Commit 5f4ce944 authored by jianhaijun's avatar jianhaijun

kafka调试页面提交

parent 1521acd5
# 前端订阅 Kafka test-topic 实现方案
## 需求概述
前端通过 WebSocket 订阅 Kafka 中的 test-topic 主题,实时接收消息推送。
## 技术方案
### 架构设计
- Spring Boot 后端配置 WebSocket + STOMP
- 后端监听 Kafka 消息,通过 WebSocket 转发到前端
- 前端使用 STOMP 客户端订阅主题
### 涉及文件
- 修改:`pom.xml` - 添加 WebSocket 和 Spring Kafka 依赖
- 新建:`src/main/java/com/example/demo/config/WebSocketConfig.java` - WebSocket 配置
- 新建:`src/main/java/com/example/demo/kafka/KafkaWebSocketListener.java` - Kafka 监听器
- 新建:`src/main/resources/static/test.html` - 前端测试页面
## 实现详情
### 1. 依赖配置
- spring-boot-starter-websocket
- spring-kafka
### 2. WebSocketConfig
- 启用 STOMP 消息代理
- 端点:`/ws`
- 消息主题前缀:`/topic`
### 3. KafkaWebSocketListener
- 使用 @KafkaListener 监听 test-topic
- 收到消息后通过 SimpMessagingTemplate 转发
### 4. 前端页面
- 原生 WebSocket + STOMP 协议
- 连接 ws://localhost:8080/ws
- 订阅 /topic/test-topic
## 预期效果
前端连接 WebSocket 后订阅 /topic/test-topic,即可实时接收 Kafka 推送的消息。
\ No newline at end of file
# 前端订阅 Kafka test-topic 实现总结
## 任务完成情况
所有 5 个任务已完成:
### 1. 添加依赖 (pom.xml)
- `spring-boot-starter-websocket` - WebSocket 支持
- `spring-kafka` - Spring Kafka 集成
- `kafka-clients` 版本更新为 3.6.0
### 2. WebSocketConfig.java
- 路径: `src/main/java/com/example/demo/config/WebSocketConfig.java`
- 启用 STOMP 消息代理
- 端点: `/ws` (支持 SockJS)
- 消息前缀: `/topic`, `/queue`
### 3. KafkaWebSocketListener.java
- 路径: `src/main/java/com/example/demo/kafka/KafkaWebSocketListener.java`
- 监听 `test-topic` 主题
- 收到消息后通过 WebSocket 转发到前端
### 4. 前端测试页面
- 路径: `src/main/resources/static/kafka-test.html`
- 使用原生 WebSocket + STOMP 协议
### 5. Security 配置
- 放行了 `/ws/**`, `/topic/**`, `/queue/**`, `/kafka-test.html`
### 6. Kafka 配置 (application.yaml)
- bootstrap-servers: localhost:9092
- consumer group: websocket-group
## 测试步骤
1. 启动 Spring Boot 应用 (端口 8080)
2. 启动 Kafka 服务 (localhost:9092)
3. 访问 http://localhost:8080/kafka-test.html
4. 点击"连接 WebSocket"按钮
5. 运行 KafkaProducerExample 发送消息
6. 页面会实时显示接收到的消息
\ No newline at end of file
# 前端订阅 Kafka test-topic 实现任务
- [x] Task 1: 添加 WebSocket 和 Kafka 依赖到 pom.xml
- [x] Task 2: 创建 WebSocketConfig 配置类
- [x] Task 3: 创建 KafkaWebSocketListener 监听器
- [x] Task 4: 创建前端测试页面
- [x] Task 5: 配置 Security 放行 WebSocket 端点
# WebSocket 后端配置规格文档
## 需求概述
为 Spring Boot 后端创建 WebSocket 支持,使前端可以订阅 Kafka 中的 test-topic 主题。
## 技术方案
### 架构设计
- 使用 Spring WebSocket (STOMP 协议)
- 后端订阅 Kafka topic,通过 WebSocket 推送到前端
- 前端通过 STOMP 客户端连接 WebSocket,订阅主题
### 涉及文件
- 新建:`src/main/java/com/example/demo/config/WebSocketConfig.java` - WebSocket 配置
- 新建:`src/main/java/com/example/demo/websocket/KafkaWebSocketController.java` - Kafka 消息转发控制器
## 实现详情
### 1. WebSocketConfig
- 启用 WebSocket 支持
- 配置 STOMP 端点 `/ws`
- 允许跨域访问
### 2. KafkaWebSocketController
- 注入 Kafka consumer 或使用监听器
- 将 Kafka 消息通过 WebSocket 转发给订阅的前端
- 主题路径:`/topic/test-topic`
### 依赖
pom.xml 已有 kafka-clients 2.6.0,需添加:
- spring-boot-starter-websocket
## 预期效果
前端连接 `ws://localhost:8080/ws`,订阅 `/topic/test-topic` 即可接收 Kafka 推送的消息。
\ No newline at end of file
# WebSocket 后端实现总结
## 任务完成情况
已完成所有 4 个任务:
1. **添加依赖** - 在 `pom.xml` 中添加了:
- `spring-boot-starter-websocket` - WebSocket 支持
- `spring-kafka` - Spring Kafka 集成
2. **创建 WebSocketConfig** - `src/main/java/com/example/demo/config/WebSocketConfig.java`
- 启用 STOMP WebSocket 消息代理
- 配置端点 `/ws`(支持 SockJS)
- 消息前缀 `/app`,推送主题 `/topic`
3. **创建 KafkaWebSocketListener** - `src/main/java/com/example/demo/kafka/KafkaWebSocketListener.java`
- 使用 `@KafkaListener` 监听 `test-topic`
- 收到消息后通过 `SimpMessagingTemplate` 推送到 WebSocket
4. **KafkaProducerExample** - 已存在,无需修改
## 前端连接方式
前端需要使用 STOMP 客户端连接:
```javascript
// 使用 stomp-js 或 sockjs-client
const socket = new SockJS('http://localhost:8080/ws');
const stompClient = Stomp.over(socket);
stompClient.connect({}, () => {
stompClient.subscribe('/topic/test-topic', (message) => {
console.log('Received:', message.body);
});
});
```
## 依赖版本
- Spring Boot: 3.2.0
- Spring Kafka: 由 parent 管理
- kafka-clients: 2.6.0
- zookeeper: 3.5.3-beta
\ No newline at end of file
# WebSocket 后端实现任务
- [x] Task 1: 添加 spring-boot-starter-websocket 依赖到 pom.xml
- [x] Task 2: 创建 WebSocketConfig 配置类
- [x] Task 3: 创建 Kafka 消息监听器并通过 WebSocket 转发
- [x] Task 4: 更新 KafkaProducerExample 添加消息发送示例
...@@ -155,6 +155,32 @@ ...@@ -155,6 +155,32 @@
<artifactId>spring-boot-starter-security</artifactId> <artifactId>spring-boot-starter-security</artifactId>
</dependency> </dependency>
<!-- Kafka Clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.0</version>
</dependency>
<!-- Zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.3-beta</version>
</dependency>
<!-- WebSocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- Spring Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>4.0.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>weather_forecast</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>my_test</name>
<description>my_test</description>
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<properties>
<java.version>21</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webmvc</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>4.0.1</version>
</dependency>
<!-- 数据库驱动(以MySQL为例) -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.22</version>
<!-- 关键:删除scope标签,默认compile作用域 -->
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webmvc-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter-test</artifactId>
<version>4.0.1</version>
<scope>test</scope>
</dependency>
<!-- Spring Data Redis -->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>2.7.0</version>
</dependency>
<!-- Jedis Redis Client -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.0.0</version>
</dependency>
<!-- FastJSON -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version>
</dependency>
<!-- Spring MVC -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>5.3.31</version>
</dependency>
<!-- Servlet API -->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>4.0.1</version>
<scope>provided</scope>
</dependency>
<!-- JJWT for JWT handling -->
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-api</artifactId>
<version>0.11.5</version>
</dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-impl</artifactId>
<version>0.11.5</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-jackson</artifactId>
<version>0.11.5</version>
<scope>runtime</scope>
</dependency>
<!-- Jackson for JSON processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.mybatis.generator</groupId>
<artifactId>mybatis-generator-maven-plugin</artifactId>
<version>1.3.3</version>
<configuration>
<configurationFile>src/main/resources/generatorConfig_bak.xml</configurationFile>
<verbose>true</verbose>
<overwrite>true</overwrite>
</configuration>
<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.29</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<annotationProcessorPaths>
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
...@@ -50,6 +50,10 @@ public class SecurityConfig { ...@@ -50,6 +50,10 @@ public class SecurityConfig {
.authorizeHttpRequests(authz -> authz .authorizeHttpRequests(authz -> authz
.requestMatchers("/api/auth/**").permitAll() .requestMatchers("/api/auth/**").permitAll()
.requestMatchers("/api/public/**").permitAll() .requestMatchers("/api/public/**").permitAll()
.requestMatchers("/ws/**").permitAll()
.requestMatchers("/topic/**").permitAll()
.requestMatchers("/queue/**").permitAll()
.requestMatchers("/kafka-test.html").permitAll()
// .requestMatchers("/login/**").permitAll() // .requestMatchers("/login/**").permitAll()
.anyRequest().authenticated() .anyRequest().authenticated()
) )
......
package com.example.demo.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic", "/queue");
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws")
.setAllowedOriginPatterns("*")
.withSockJS();
}
}
\ No newline at end of file
package com.example.demo.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
// 配置Kafka消费者属性
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 创建消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
String topic = "test-topic";
consumer.subscribe(Collections.singletonList(topic));
// 拉取消息并处理
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Consumed message from topic %s: key = %s, value = %s%n",
record.topic(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
}
}
package com.example.demo.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class KafkaProducerExample {
public static void main(String[] args) {
// 配置Kafka生产者属性
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 创建生产者实例
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 发送消息到主题
String topic = "test-topic";
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key" + i, "value" + i);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
exception.printStackTrace();
} else {
System.out.println("Sent message to topic " + metadata.topic() +
", partition " + metadata.partition() +
", offset " + metadata.offset());
}
});
}
// 关闭生产者
producer.close();
}
}
package com.example.demo.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaWebSocketListener {
private static final String TOPIC_NAME = "test-topic";
@Autowired
private SimpMessagingTemplate messagingTemplate;
@KafkaListener(topics = TOPIC_NAME, groupId = "websocket-group")
public void listen(String message) {
System.out.println("Received from Kafka: " + message);
messagingTemplate.convertAndSend("/topic/" + TOPIC_NAME, message);
}
}
\ No newline at end of file
package com.example.demo.kafka;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZookeeperExample implements Watcher {
private static final int SESSION_TIMEOUT = 5000;
private ZooKeeper zk;
private CountDownLatch connectedSignal = new CountDownLatch(1);
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
ZookeeperExample example = new ZookeeperExample();
example.connect("localhost:2181");
// 创建节点
String path = example.zk.create("/test-node", "test-data".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println("Created node: " + path);
// 获取节点数据
Stat stat = new Stat();
byte[] data = example.zk.getData("/test-node", example, stat);
System.out.println("Node data: " + new String(data));
// 更新节点数据
example.zk.setData("/test-node", "updated-data".getBytes(), -1);
System.out.println("Updated node data");
// 删除节点
example.zk.delete("/test-node", -1);
System.out.println("Deleted node");
example.close();
}
public void connect(String hosts) throws IOException, InterruptedException {
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
connectedSignal.await();
}
public void close() throws InterruptedException {
if (zk != null) {
zk.close();
}
}
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
connectedSignal.countDown();
}
}
}
\ No newline at end of file
...@@ -33,6 +33,18 @@ spring: ...@@ -33,6 +33,18 @@ spring:
host: 47.121.195.174 host: 47.121.195.174
port: 6379 port: 6379
# Kafka配置
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: websocket-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# SpringBoot应用端口配置 # SpringBoot应用端口配置
server: server:
port: 8080 port: 8080
......
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>Kafka WebSocket 测试</title>
<style>
body { font-family: Arial, sans-serif; padding: 20px; max-width: 800px; margin: 0 auto; }
h2 { color: #333; }
button { padding: 10px 20px; margin: 5px; cursor: pointer; }
#status { font-weight: bold; margin: 10px 0; }
.connected { color: green; }
.disconnected { color: red; }
#messages { border: 1px solid #ccc; padding: 10px; height: 300px; overflow-y: auto; margin-top: 10px; background: #f9f9f9; }
.message { padding: 8px; margin: 5px 0; background: white; border-radius: 4px; }
.info { color: #666; font-size: 12px; margin-top: 10px; }
</style>
</head>
<body>
<h2>Kafka test-topic 订阅测试</h2>
<div>
<button onclick="connect()">连接 WebSocket</button>
<button onclick="disconnect()">断开</button>
</div>
<div id="status">状态: 未连接</div>
<div class="info">
<strong>说明:</strong>连接后会自动订阅 /topic/test-topic,Kafka 发送的消息会实时显示在这里。
</div>
<h3>收到的消息:</h3>
<div id="messages"></div>
<script>
let ws = null;
function connect() {
console.log('Connecting to WebSocket...');
ws = new WebSocket('ws://localhost:8080/ws');
ws.onopen = function() {
console.log('Connected!');
document.getElementById('status').textContent = '状态: 已连接';
document.getElementById('status').className = 'connected';
// 发送 STOMP CONNECT
ws.send('CONNECT\naccept-version:1.2\n\n\x00');
console.log('Sent CONNECT');
// 1秒后订阅主题
setTimeout(function() {
ws.send('SUBSCRIBE\nid:sub-1\ndestination:/topic/test-topic\n\n\x00');
console.log('Subscribed to /topic/test-topic');
addMessage('系统', '已订阅 test-topic');
}, 1000);
};
ws.onmessage = function(event) {
console.log('Received:', event.data);
addMessage('Kafka', event.data);
};
ws.onerror = function(error) {
console.error('Error:', error);
document.getElementById('status').textContent = '状态: 连接失败';
document.getElementById('status').className = 'disconnected';
};
ws.onclose = function(event) {
console.log('Closed:', event.code);
document.getElementById('status').textContent = '状态: 已断开 (' + event.code + ')';
document.getElementById('status').className = 'disconnected';
};
}
function disconnect() {
if (ws) {
ws.send('UNSUBSCRIBE\nid:sub-1\n\n\x00');
ws.close();
ws = null;
}
}
function addMessage(label, text) {
const div = document.createElement('div');
div.className = 'message';
div.textContent = new Date().toLocaleTimeString() + ' [' + label + '] ' + text;
document.getElementById('messages').appendChild(div);
document.getElementById('messages').scrollTop = document.getElementById('messages').scrollHeight;
}
</script>
</body>
</html>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment