Commit 12a5190a authored by jianhaijun's avatar jianhaijun

kafka订阅方式:1、先WebSocket 连接: ws://localhost:8080/ws-native

2、发布消息:/topic/test-topic
parent 5f4ce944
......@@ -26,6 +26,13 @@ public class JwtAuthenticationFilter extends OncePerRequestFilter {
this.jwtUtil = jwtUtil;
}
// @Override
// protected boolean shouldNotFilter(HttpServletRequest request) {
// String path = request.getServletPath();
// // 放行 WebSocket 相关路径
// return path.startsWith("/ws") || path.startsWith("/topic") || path.startsWith("/queue");
// }
@Override
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
throws ServletException, IOException {
......
......@@ -51,9 +51,14 @@ public class SecurityConfig {
.requestMatchers("/api/auth/**").permitAll()
.requestMatchers("/api/public/**").permitAll()
.requestMatchers("/ws/**").permitAll()
.requestMatchers("/topic/**").permitAll()
.requestMatchers("/ws-native/**").permitAll()
.requestMatchers("/ws-other/**").permitAll()
.requestMatchers("/topic/test-topic/**").permitAll()
.requestMatchers("/queue/**").permitAll()
.requestMatchers("/kafka-test.html").permitAll()
.requestMatchers("/kafka/producter-topic").permitAll()
// .requestMatchers("/**").permitAll()
// .requestMatchers("/login/**").permitAll()
.anyRequest().authenticated()
)
......
......@@ -10,16 +10,19 @@ import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerCo
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic", "/queue");
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// SockJS
registry.addEndpoint("/ws")
.setAllowedOriginPatterns("*")
.withSockJS();
// 原生 WebSocket
registry.addEndpoint("/ws-native")
.setAllowedOriginPatterns("*");
registry.addEndpoint("/ws-other")
.setAllowedOriginPatterns("*");
}
}
\ No newline at end of file
package com.example.demo.controller;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
@RestController
@RequestMapping("/kafka")
public class kafkaController {
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String TOPIC_NAME = "test-topic";
/**
* 触发生产消息,给kafka发送消息
*/
@PostMapping("/producter-topic")
public void productertopic(@RequestBody String phone) {
// 配置生产者属性
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// 创建生产者实例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
try {
// 发布消息
String key = "key1";
String value = "Hello, Kafka!";
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, key, value);
// 同步发送
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("消息发送成功,主题: " + metadata.topic() + ", 分区: " + metadata.partition() + ", 偏移量: " + metadata.offset());
} catch (ExecutionException | InterruptedException e) {
System.err.println("消息发送失败: " + e.getMessage());
}
} finally {
// 关闭生产者
producer.close();
}
}
}
\ No newline at end of file
......@@ -24,7 +24,7 @@ public class KafkaConsumerExample {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
String topic = "test-topic";
String topic = "test-topic1";
consumer.subscribe(Collections.singletonList(topic));
// 拉取消息并处理
......
......@@ -19,7 +19,7 @@ public class KafkaProducerExample {
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 发送消息到主题
String topic = "test-topic";
String topic = "test-topic1";
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key" + i, "value" + i);
producer.send(record, (metadata, exception) -> {
......
......@@ -17,5 +17,7 @@ public class KafkaWebSocketListener {
public void listen(String message) {
System.out.println("Received from Kafka: " + message);
messagingTemplate.convertAndSend("/topic/" + TOPIC_NAME, message);
}
}
\ No newline at end of file
package com.example.demo.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaWebSocketListener1 {
private static final String TOPIC_NAME = "test-topic1";
@Autowired
private SimpMessagingTemplate messagingTemplate;
@KafkaListener(topics = TOPIC_NAME, groupId = "websocket-group")
public void listen(String message) {
System.out.println("Received from Kafka: " + message);
messagingTemplate.convertAndSend("/topic/" + TOPIC_NAME, message);
}
}
\ No newline at end of file
<template>
<div class="kafka-subscribe">
<h2>Kafka 消息订阅</h2>
<div class="controls">
<button @click="connect" :disabled="connected">连接 WebSocket</button>
<button @click="disconnect" :disabled="!connected">断开</button>
</div>
<div class="status" :class="{ connected: connected, disconnected: !connected }">
状态: {{ connected ? '已连接' : '未连接' }}
</div>
<div class="info">
订阅主题: /topic/test-topic
</div>
<h3>收到的消息:</h3>
<div class="messages">
<div v-for="(msg, index) in messages" :key="index" class="message">
{{ msg }}
</div>
</div>
</div>
</template>
<script>
import SockJS from 'sockjs-client';
import Stomp from 'stomp-websocket';
export default {
name: 'KafkaSubscriber',
data() {
return {
connected: false,
messages: [],
stompClient: null
};
},
methods: {
connect() {
console.log('Connecting to WebSocket...');
// 创建 SockJS 连接
const socket = new SockJS('http://localhost:8080/ws');
// 创建 STOMP 客户端
this.stompClient = Stomp.over(socket);
// 可选:开启调试
// this.stompClient.debug = (str) => console.log('STOMP:', str);
// 连接
this.stompClient.connect(
{},
(frame) => {
console.log('Connected:', frame);
this.connected = true;
this.messages.push(`[${new Date().toLocaleTimeString()}] 连接成功`);
// 订阅主题
this.stompClient.subscribe('/topic/test-topic', (message) => {
console.log('Received:', message.body);
this.messages.push(`[${new Date().toLocaleTimeString()}] ${message.body}`);
});
this.messages.push(`[${new Date().toLocaleTimeString()}] 已订阅 /topic/test-topic`);
},
(error) => {
console.error('Connection error:', error);
this.connected = false;
this.messages.push(`[${new Date().toLocaleTimeString()}] 连接失败: ${error}`);
}
);
},
disconnect() {
if (this.stompClient) {
this.stompClient.disconnect();
this.stompClient = null;
}
this.connected = false;
this.messages.push(`[${new Date().toLocaleTimeString()}] 已断开连接`);
}
},
beforeUnmount() {
this.disconnect();
}
};
</script>
<style scoped>
.kafka-subscribe {
max-width: 800px;
margin: 0 auto;
padding: 20px;
}
.controls button {
padding: 10px 20px;
margin-right: 10px;
cursor: pointer;
}
.controls button:disabled {
cursor: not-allowed;
opacity: 0.6;
}
.status {
padding: 10px;
margin: 10px 0;
font-weight: bold;
}
.status.connected {
color: green;
background: #e8f5e9;
}
.status.disconnected {
color: red;
background: #ffebee;
}
.info {
padding: 10px;
background: #e3f2fd;
margin: 10px 0;
}
.messages {
border: 1px solid #ccc;
padding: 10px;
height: 300px;
overflow-y: auto;
background: #f9f9f9;
}
.message {
padding: 8px;
margin: 5px 0;
background: white;
border-radius: 4px;
font-size: 14px;
}
</style>
\ No newline at end of file
......@@ -12,7 +12,7 @@
.disconnected { color: red; }
#messages { border: 1px solid #ccc; padding: 10px; height: 300px; overflow-y: auto; margin-top: 10px; background: #f9f9f9; }
.message { padding: 8px; margin: 5px 0; background: white; border-radius: 4px; }
.info { color: #666; font-size: 12px; margin-top: 10px; }
#debug { margin-top: 20px; padding: 10px; background: #eee; font-size: 12px; max-height: 150px; overflow-y: auto; }
</style>
</head>
<body>
......@@ -25,70 +25,112 @@
<div id="status">状态: 未连接</div>
<div class="info">
<strong>说明:</strong>连接后会自动订阅 /topic/test-topic,Kafka 发送的消息会实时显示在这里。
</div>
<h3>调试日志:</h3>
<div id="debug"></div>
<h3>收到的消息:</h3>
<div id="messages"></div>
<script>
let ws = null;
var stompClient = null;
var logDiv = document.getElementById('debug');
var ws = null;
function log(msg) {
var div = document.createElement('div');
div.textContent = new Date().toLocaleTimeString() + ' ' + msg;
logDiv.appendChild(div);
logDiv.scrollTop = logDiv.scrollHeight;
console.log(msg);
}
function parseFrame(data) {
var lines = data.split('\n');
var command = lines.shift();
var headers = {};
var body = '';
var i = 0;
for (; i < lines.length; i++) {
var line = lines[i];
if (line === '') break;
var idx = line.indexOf(':');
if (idx > 0) {
headers[line.substring(0, idx)] = line.substring(idx + 1);
}
}
if (i + 1 < lines.length) {
body = lines.slice(i + 1).join('\n');
}
return { command: command, headers: headers, body: body };
}
function connect() {
console.log('Connecting to WebSocket...');
ws = new WebSocket('ws://localhost:8080/ws');
log('开始连接...');
// 使用原生 WebSocket
log('创建 WebSocket 连接...');
ws = new WebSocket('ws://localhost:8080/ws-other');
ws.onopen = function() {
console.log('Connected!');
document.getElementById('status').textContent = '状态: 已连接';
document.getElementById('status').className = 'connected';
// 发送 STOMP CONNECT
ws.send('CONNECT\naccept-version:1.2\n\n\x00');
console.log('Sent CONNECT');
// 1秒后订阅主题
setTimeout(function() {
ws.send('SUBSCRIBE\nid:sub-1\ndestination:/topic/test-topic\n\n\x00');
console.log('Subscribed to /topic/test-topic');
addMessage('系统', '已订阅 test-topic');
}, 1000);
log('WebSocket 打开');
// 发送 STOMP CONNECT 帧
var frame = 'CONNECT\naccept-version:1.2\n\n\x00';
ws.send(frame);
log('发送 CONNECT 帧');
};
ws.onmessage = function(event) {
console.log('Received:', event.data);
addMessage('Kafka', event.data);
ws.onmessage = function(e) {
log('收到消息: ' + e.data);
var frame = parseFrame(e.data);
if (frame.command === 'CONNECTED') {
log('STOMP 连接成功');
document.getElementById('status').textContent = '状态: 已连接';
document.getElementById('status').className = 'connected';
// 订阅主题
var subFrame = 'SUBSCRIBE\nid:sub-1\ndestination:/topic/test-topic1\n\n\x00';
ws.send(subFrame);
log('发送 SUBSCRIBE 帧');
addMessage('系统', '已订阅 /topic/test-topic1');
} else if (frame.command === 'MESSAGE') {
log('收到消息: ' + frame.body);
addMessage('Kafka', frame.body);
} else if (frame.command === 'ERROR') {
log('错误: ' + frame.body);
}
};
ws.onerror = function(error) {
console.error('Error:', error);
document.getElementById('status').textContent = '状态: 连接失败';
document.getElementById('status').className = 'disconnected';
ws.onerror = function(e) {
log('WebSocket 错误');
};
ws.onclose = function(event) {
console.log('Closed:', event.code);
document.getElementById('status').textContent = '状态: 已断开 (' + event.code + ')';
ws.onclose = function(e) {
log('WebSocket 关闭: ' + e.code);
document.getElementById('status').textContent = '状态: 已断开 (' + e.code + ')';
document.getElementById('status').className = 'disconnected';
};
}
function disconnect() {
if (ws) {
ws.send('UNSUBSCRIBE\nid:sub-1\n\n\x00');
ws.close();
ws = null;
}
document.getElementById('status').textContent = '状态: 已断开';
document.getElementById('status').className = 'disconnected';
}
function addMessage(label, text) {
const div = document.createElement('div');
var div = document.createElement('div');
div.className = 'message';
div.textContent = new Date().toLocaleTimeString() + ' [' + label + '] ' + text;
document.getElementById('messages').appendChild(div);
document.getElementById('messages').scrollTop = document.getElementById('messages').scrollHeight;
}
log('页面加载完成');
</script>
</body>
</html>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment