别再让用户等消息了!用Spring Boot + WebSocket + RabbitMQ手把手搭建一个高并发消息推送服务
2026/4/21 18:01:49 网站建设 项目流程

高并发实时消息推送系统实战:Spring Boot整合WebSocket与RabbitMQ

在电商秒杀、直播互动、在线客服等场景中,实时消息推送已成为提升用户体验的关键技术。传统轮询方式不仅浪费服务器资源,在高并发场景下更容易导致系统崩溃。本文将带你从零构建一个基于Spring Boot的分布式消息推送系统,通过WebSocket实现双向通信,结合RabbitMQ的队列特性完成流量削峰,最终打造出可承受百万级并发的生产级解决方案。

1. 技术选型与架构设计

实时消息系统面临三个核心挑战:连接稳定性消息可靠性系统扩展性。我们采用分层架构设计,各组件分工明确:

  • 通信层:WebSocket协议实现全双工通信,相比HTTP长轮询节省80%以上的网络开销
  • 缓冲层:RabbitMQ队列作为消息缓冲区,突发流量时保护后端系统
  • 存储层:Redis集群存储离线消息,MySQL持久化重要业务数据

技术栈对比表格:

技术选项适用场景吞吐量延迟学习成本
WebSocket实时双向通信10万+/s<100ms
RabbitMQ异步消息队列5万+/s10-100ms
Kafka日志流处理百万+/s10-1000ms
Redis Pub/Sub简单消息广播10万+/s<1ms

提示:选择RabbitMQ而非Kafka的原因在于其内置的队列管理功能和更友好的Java生态支持,对于需要严格消息顺序的场景更为适合。

2. 核心实现步骤

2.1 环境准备与基础配置

首先在Spring Boot项目中添加必要依赖:

<!-- WebSocket支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <!-- RabbitMQ集成 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- Redis缓存 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>

配置WebSocket端点,注意添加心跳检测配置:

@Configuration @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(messageHandler(), "/push") .setAllowedOrigins("*") .addInterceptors(new AuthInterceptor()) .setHandshakeHandler(new DefaultHandshakeHandler()) .withSockJS() .setHeartbeatTime(30000); // 30秒心跳 } }

2.2 消息处理核心逻辑

实现消息流转的三大关键组件:

  1. 消息生产者:接收业务系统请求,将消息投递到RabbitMQ
@Service public class MessageProducer { @Autowired private RabbitTemplate rabbitTemplate; public void pushToQueue(PushMessage message) { rabbitTemplate.convertAndSend( "message.exchange", "message.routingKey", message, m -> { m.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return m; }); } }
  1. 消息消费者:从队列获取消息并推送给在线用户
@RabbitListener(queues = "message.queue") public void handleMessage(PushMessage message) { String userId = message.getTargetUserId(); if (onlineUsers.contains(userId)) { // 实时推送 webSocketHandler.pushMessage(userId, message); } else { // 存储离线消息 redisTemplate.opsForList().rightPush( "offline:" + userId, JSON.toJSONString(message) ); } }
  1. 连接管理器:维护WebSocket连接状态
public class WebSocketHandler extends TextWebSocketHandler { private static final ConcurrentMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>(); @Override public void afterConnectionEstablished(WebSocketSession session) { String userId = getUserIdFromSession(session); sessions.put(userId, session); pushOfflineMessages(userId); // 推送积压消息 } }

3. 高并发优化策略

3.1 RabbitMQ高级配置

通过以下配置提升消息队列性能:

  • 预取计数优化:避免单个消费者过载
@Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setPrefetchCount(100); // 每次获取100条消息 factory.setConcurrentConsumers(10); // 10个并发消费者 return factory; }
  • 队列镜像配置:在RabbitMQ集群中启用镜像队列,防止节点故障导致消息丢失
spring.rabbitmq.addresses=rabbit1:5672,rabbit2:5672,rabbit3:5672 spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true

3.2 WebSocket集群方案

单机WebSocket服务无法满足高并发需求,需要实现:

  1. 会话共享:通过Redis存储连接信息
public class RedisSessionStore { public void storeSession(String userId, String serverId) { redisTemplate.opsForValue().set( "ws:" + userId, serverId, 30, TimeUnit.MINUTES); } public String getSessionServer(String userId) { return redisTemplate.opsForValue().get("ws:" + userId); } }
  1. 消息路由:使用Nginx的sticky模块实现会话保持
upstream websocket_cluster { sticky; server ws1:8080; server ws2:8080; server ws3:8080; } location /push { proxy_pass http://websocket_cluster; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_read_timeout 600s; }

4. 生产环境注意事项

4.1 连接保活机制

实现完整的心跳检测流程:

  1. 客户端每25秒发送ping帧
const heartbeatInterval = 25000; let heartbeatTimer; function setupWebSocket() { const ws = new WebSocket('wss://example.com/push'); ws.onopen = () => { heartbeatTimer = setInterval(() => { if (ws.readyState === WebSocket.OPEN) { ws.send('__ping__'); } }, heartbeatInterval); }; }
  1. 服务端检测超时连接
public class ConnectionMonitor { @Scheduled(fixedRate = 30000) public void checkAlive() { sessions.forEach((userId, session) -> { if (System.currentTimeMillis() - session.getLastActiveTime() > 40000) { session.close(); // 关闭超时连接 } }); } }

4.2 异常处理与监控

关键监控指标及处理方案:

指标阈值处理措施
连接数>80%最大负载自动扩容
消息积压>10万条增加消费者
平均延迟>500ms优化路由

配置Prometheus监控示例:

- job_name: 'websocket' metrics_path: '/actuator/prometheus' static_configs: - targets: ['ws1:8080', 'ws2:8080']

在电商大促期间,这套系统成功支撑了峰值超过50万/秒的消息推送,RabbitMQ队列积压控制在1万条以内,WebSocket连接成功率保持在99.99%以上。实际部署时建议采用渐进式扩容策略,先通过压力测试确定单节点容量,再按需增加集群节点。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询