Spring Boot and Apache Pulsar Integration in Practice: Building a High-Performance Messaging System
2026/5/14 22:21:12

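1. Introduction

Apache Pulsar is a cloud-native distributed messaging and streaming platform. Its high throughput, low latency, multi-tenancy, and flexible messaging model make it a common choice for message delivery in modern distributed systems.

Spring Boot supports Pulsar through the Spring for Apache Pulsar module, which Spring Boot auto-configures from version 3.2 onward. This article walks through integrating the two: environment configuration, producer and consumer implementation, messaging patterns, and advanced features.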

2. Environment setup and dependencies

2.1 Adding the dependency

Add the Spring for Apache Pulsar starter to pom.xml:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-pulsar</artifactId>
</dependency>

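2.2 Configuration file

Configure the Pulsar connection in application.yml:

spring:
  pulsar:
    client:
      service-url: pulsar://localhost:6650
      connection-timeout: 10s
      operation-timeout: 30s
    admin:
      # The admin endpoint has its own section in Spring Boot's Pulsar properties.
      service-url: http://localhost:8080
    producer:
      topic-name: my-topic
      batching-enabled: true
    consumer:
      subscription:
        name: my-subscription
        type: exclusive
# The batch limits (max messages 1000, max bytes 131072) and the 30s ack timeout
# are assumed here to have no dedicated Boot properties; set them through
# ProducerBuilderCustomizer / ConsumerBuilderCustomizer beans instead.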

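2.3 Pulsar configuration class

Spring Boot auto-configures the client, the template, and the factories from the properties above, so a custom configuration class is only needed when you want to build them yourself:

@Configuration
public class PulsarConfig {

    @Bean
    public PulsarClient pulsarClient(PulsarProperties properties) throws PulsarClientException {
        PulsarProperties.Client client = properties.getClient();
        // ClientBuilder has no adminUrl(); the admin endpoint belongs to PulsarAdmin.
        // The timeout setters take an int plus a TimeUnit rather than a Duration.
        return PulsarClient.builder()
                .serviceUrl(client.getServiceUrl())
                .connectionTimeout((int) client.getConnectionTimeout().toMillis(), TimeUnit.MILLISECONDS)
                .operationTimeout((int) client.getOperationTimeout().toMillis(), TimeUnit.MILLISECONDS)
                .build();
    }

    // Factory class names and constructors as in Spring for Apache Pulsar 1.x;
    // verify them against the version you use.
    @Bean
    public PulsarProducerFactory<String> producerFactory(PulsarClient pulsarClient) {
        return new DefaultPulsarProducerFactory<>(pulsarClient);
    }

    @Bean
    public PulsarConsumerFactory<String> consumerFactory(PulsarClient pulsarClient) {
        return new DefaultPulsarConsumerFactory<>(pulsarClient, List.of());
    }
}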

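3. Producer implementation

3.1 Sending a simple message

@Service
public class PulsarProducerService {

    private final PulsarTemplate<String> pulsarTemplate;

    public PulsarProducerService(PulsarTemplate<String> pulsarTemplate) {
        this.pulsarTemplate = pulsarTemplate;
    }

    // Blocks until the broker acknowledges the message.
    // Note: in Spring for Apache Pulsar 1.0.x the send methods declare the checked
    // PulsarClientException; later versions throw an unchecked exception instead.
    public void sendMessage(String topic, String message) {
        pulsarTemplate.send(topic, message);
    }

    // Returns immediately; the future completes with the broker-assigned MessageId.
    public CompletableFuture<MessageId> sendAsyncMessage(String topic, String message) {
        return pulsarTemplate.sendAsync(topic, message);
    }
}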

3.2 Sending a message with properties

// PulsarTemplate's fluent builder exposes the underlying TypedMessageBuilder,
// which is where message properties are set.
public void sendMessageWithProperties(String topic, String message, Map<String, String> properties) {
    pulsarTemplate.newMessage(message)
            .withTopic(topic)
            .withMessageCustomizer(builder -> builder.properties(properties))
            .send();
}

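3.3 Sending messages in bulk

// PulsarTemplate has no list-based send. Issue asynchronous sends and let the
// producer's batching (section 2.2) group them on the wire. This assumes a
// version whose sendAsync declares no checked exception.
public void sendBatchMessages(String topic, List<String> messages) {
    List<CompletableFuture<MessageId>> pending = messages.stream()
            .map(msg -> pulsarTemplate.sendAsync(topic, msg))
            .collect(Collectors.toList());
    // Wait until the broker has acknowledged every message.
    CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
}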
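The batching limits from section 2.2 (1000 messages, 131072 bytes) describe a flush rule: a batch is sent as soon as either limit is reached. The following is a minimal sketch of that rule in plain Java. `BatchAccumulator` is a hypothetical class written for illustration, not the Pulsar client's internal batcher, and it leaves out the time-based flush that a real producer also applies.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: not the Pulsar client's internal batcher.
class BatchAccumulator {
    private final int maxMessages;
    private final int maxBytes;
    private final List<String> current = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();
    private int currentBytes = 0;

    BatchAccumulator(int maxMessages, int maxBytes) {
        this.maxMessages = maxMessages;
        this.maxBytes = maxBytes;
    }

    // Adds a message and flushes when either the count or the byte limit is reached.
    void add(String message) {
        int size = message.getBytes(StandardCharsets.UTF_8).length;
        // Flush first if this message would push the open batch over the byte limit.
        if (!current.isEmpty() && currentBytes + size > maxBytes) {
            flush();
        }
        current.add(message);
        currentBytes += size;
        if (current.size() >= maxMessages || currentBytes >= maxBytes) {
            flush();
        }
    }

    // Closes the open batch, if any, and records it as sent.
    void flush() {
        if (current.isEmpty()) {
            return;
        }
        flushed.add(new ArrayList<>(current));
        current.clear();
        currentBytes = 0;
    }

    List<List<String>> flushedBatches() {
        return flushed;
    }

    public static void main(String[] args) {
        BatchAccumulator acc = new BatchAccumulator(3, 1024);
        for (int i = 0; i < 7; i++) {
            acc.add("msg-" + i);
        }
        acc.flush(); // send whatever is left in the open batch
        System.out.println(acc.flushedBatches());
    }
}
```

With a limit of three messages, seven sends end up in three batches (3 + 3 + 1). Larger limits mean fewer network round trips at the cost of higher per-message latency.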

4. Consumer implementation

4.1 Annotation-driven consumers

@Component
public class PulsarConsumerService {

    @PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
        processMessage(message);
    }

    // SubscriptionType has no "Durable" value: durability is a subscription mode
    // and is the default. Shared is used here as an example subscription type.
    @PulsarListener(subscriptionName = "durable-subscription",
            topics = "persistent://public/default/my-topic",
            subscriptionType = SubscriptionType.Shared)
    public void receivePersistentMessage(String message) {
        System.out.println("Received persistent message: " + message);
    }

    private void processMessage(String message) {
        // business logic goes here
    }
}

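4.2 Manual acknowledgment

// Acknowledgement and AckMode live in org.springframework.pulsar.listener;
// the message-ID header constant is PulsarHeaders.MESSAGE_ID.
@PulsarListener(subscriptionName = "manual-ack-subscription", topics = "my-topic", ackMode = AckMode.MANUAL)
public void receiveWithManualAck(String message,
        @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
        Acknowledgement acknowledgement) {
    try {
        processMessage(message);
        acknowledgement.acknowledge(); // confirm successful processing
    } catch (Exception e) {
        acknowledgement.nack();        // ask the broker to redeliver the message
    }
}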

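4.3 Batch consumption

// batch = true delivers a list per invocation. The annotation is assumed to have no
// batchSize attribute; the batch size comes from the consumer's batch-receive policy.
@PulsarListener(subscriptionName = "batch-subscription", topics = "my-topic", batch = true)
public void receiveBatchMessages(List<String> messages) {
    System.out.println("Received batch of " + messages.size() + " messages");
    messages.forEach(this::processMessage);
}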

5. Messaging patterns

5.1 Point-to-point

@Service
public class PointToPointService {

    private final PulsarTemplate<String> pulsarTemplate;

    public PointToPointService(PulsarTemplate<String> pulsarTemplate) {
        this.pulsarTemplate = pulsarTemplate;
    }

    public void sendTask(String task) {
        pulsarTemplate.send("tasks-topic", task);
    }
}

@Component
public class TaskConsumer {

    // Exclusive allows a single consumer on the subscription, so each task is
    // handled exactly once; use Shared to spread tasks across several workers.
    @PulsarListener(subscriptionName = "task-consumer", topics = "tasks-topic",
            subscriptionType = SubscriptionType.Exclusive)
    public void processTask(String task) {
        System.out.println("Processing task: " + task);
    }
}

5.2 Publish/subscribe

@Service
public class NotificationService {

    private final PulsarTemplate<String> pulsarTemplate;

    public NotificationService(PulsarTemplate<String> pulsarTemplate) {
        this.pulsarTemplate = pulsarTemplate;
    }

    public void publishNotification(String notification) {
        pulsarTemplate.send("notifications-topic", notification);
    }
}

// Each subscriber uses its own subscription name, so both receive every notification.
@Component
public class NotificationSubscriber1 {

    @PulsarListener(subscriptionName = "notification-subscriber-1", topics = "notifications-topic")
    public void receiveNotification(String notification) {
        System.out.println("Subscriber 1 received: " + notification);
    }
}

@Component
public class NotificationSubscriber2 {

    @PulsarListener(subscriptionName = "notification-subscriber-2", topics = "notifications-topic")
    public void receiveNotification(String notification) {
        System.out.println("Subscriber 2 received: " + notification);
    }
}
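The difference between the two patterns above comes down to subscription names: every subscription on a topic receives each message once, while consumers that share one subscription divide the messages among themselves. A small in-memory model can make this concrete. `TopicModel` is a hypothetical class that only mimics the delivery rule, using round-robin for the shared case; it is not how the broker is implemented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the delivery rule; not Pulsar itself.
class TopicModel {
    // subscription name -> inboxes of the consumers attached to that subscription
    private final Map<String, List<List<String>>> subscriptions = new LinkedHashMap<>();
    // subscription name -> number of messages dispatched so far
    private final Map<String, Integer> dispatched = new LinkedHashMap<>();

    // Attaches a new consumer to the named subscription and returns its inbox.
    List<String> subscribe(String subscriptionName) {
        List<String> inbox = new ArrayList<>();
        subscriptions.computeIfAbsent(subscriptionName, k -> new ArrayList<>()).add(inbox);
        dispatched.putIfAbsent(subscriptionName, 0);
        return inbox;
    }

    // Every subscription sees the message once. Consumers inside the same
    // subscription take turns (round-robin), as with a Shared subscription.
    void publish(String message) {
        for (Map.Entry<String, List<List<String>>> entry : subscriptions.entrySet()) {
            List<List<String>> consumers = entry.getValue();
            int count = dispatched.get(entry.getKey());
            consumers.get(count % consumers.size()).add(message);
            dispatched.put(entry.getKey(), count + 1);
        }
    }

    public static void main(String[] args) {
        TopicModel topic = new TopicModel();
        List<String> subscriber1 = topic.subscribe("notification-subscriber-1");
        List<String> subscriber2 = topic.subscribe("notification-subscriber-2");
        List<String> workerA = topic.subscribe("workers");
        List<String> workerB = topic.subscribe("workers");
        for (int i = 0; i < 4; i++) {
            topic.publish("m" + i);
        }
        System.out.println(subscriber1 + " " + subscriber2 + " " + workerA + " " + workerB);
    }
}
```

In the usage above the two notification subscribers each see all four messages, while the two workers on the shared "workers" subscription see two each.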

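5.3 Request/reply

// PulsarTemplate offers no sendAndReceive, and a listener's return value is not
// sent back as a reply. The pattern is built here from a reply topic plus a
// correlation ID carried as a message property.
@Service
public class RequestReplyService {

    private final PulsarTemplate<String> pulsarTemplate;
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    public RequestReplyService(PulsarTemplate<String> pulsarTemplate) {
        this.pulsarTemplate = pulsarTemplate;
    }

    public CompletableFuture<String> sendRequest(String request) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        pulsarTemplate.newMessage(request)
                .withTopic("request-topic")
                .withMessageCustomizer(builder -> builder.property("correlationId", correlationId))
                .sendAsync();
        return reply;
    }

    @PulsarListener(subscriptionName = "reply-listener", topics = "reply-topic")
    public void onReply(Message<String> reply) {
        CompletableFuture<String> future = pending.remove(reply.getProperty("correlationId"));
        if (future != null) {
            future.complete(reply.getValue());
        }
    }
}

@Component
public class RequestHandler {

    private final PulsarTemplate<String> pulsarTemplate;

    public RequestHandler(PulsarTemplate<String> pulsarTemplate) {
        this.pulsarTemplate = pulsarTemplate;
    }

    @PulsarListener(subscriptionName = "request-handler", topics = "request-topic")
    public void handleRequest(Message<String> request) {
        System.out.println("Handling request: " + request.getValue());
        pulsarTemplate.newMessage("Response for: " + request.getValue())
                .withTopic("reply-topic")
                .withMessageCustomizer(builder ->
                        builder.property("correlationId", request.getProperty("correlationId")))
                .sendAsync();
    }
}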
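Whatever transport carries the messages, request/reply over topics needs a way to match an incoming reply to the request that caused it. A common approach is a correlation ID attached to both messages. The sketch below shows only that bookkeeping, with no broker involved; `CorrelationRegistry` and its method names are hypothetical, and the `transport` callback stands in for whatever actually sends the request.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical helper: tracks requests that are still waiting for a reply.
class CorrelationRegistry {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Registers a request, hands (correlationId, payload) to the transport,
    // and returns a future that completes when the matching reply arrives.
    CompletableFuture<String> request(String payload, BiConsumer<String, String> transport) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> reply = new CompletableFuture<>();
        pending.put(correlationId, reply);
        transport.accept(correlationId, payload);
        return reply;
    }

    // Called for every incoming reply. Returns false when the ID is unknown,
    // for example a duplicate or a reply that arrived after cleanup.
    boolean onReply(String correlationId, String payload) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        return reply != null && reply.complete(payload);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

A production version would also expire entries whose reply never arrives, for instance with `CompletableFuture.orTimeout`, so that the map cannot grow without bound.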

6. Advanced features

6.1 Message partitioning

// The partition key is a per-message setting, not a producer-factory option.
// Messages that share a key are routed to the same partition of a partitioned topic.
public void sendWithKey(String topic, String key, String message) {
    pulsarTemplate.newMessage(message)
            .withTopic(topic)
            .withMessageCustomizer(builder -> builder.key(key))
            .send();
}
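A partition key works because the key is hashed and the hash is reduced to a partition index, so equal keys always land on the same partition and keep their relative order. The sketch below illustrates the idea with Java's `String.hashCode()`. It is a simplified stand-in: the hash function the Pulsar client actually uses is configurable and is not reproduced here, and `KeyPartitioner` is a hypothetical name.

```java
// Simplified illustration of key-based routing; not Pulsar's exact algorithm.
class KeyPartitioner {

    // Maps a message key to a partition index in the range [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Clear the sign bit so the modulo result is never negative.
        int hash = key.hashCode() & Integer.MAX_VALUE;
        return hash % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition.
        System.out.println(partitionFor("order-42", 4) == partitionFor("order-42", 4));
    }
}
```

One consequence worth noting: changing the number of partitions changes the mapping, so per-key ordering is only guaranteed while the partition count stays fixed.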

6.2 Message compression

// Compression is a ProducerBuilder option. Spring Boot applies
// ProducerBuilderCustomizer beans to the auto-configured producer factory.
@Bean
public ProducerBuilderCustomizer<String> compressionCustomizer() {
    return builder -> builder.compressionType(CompressionType.LZ4);
}
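Compression pays off most on batches of similar messages, because repeated field names and values compress well. The sketch below demonstrates the effect with the JDK's built-in DEFLATE codec. This is a deliberate substitution: LZ4 is not part of the JDK, so `java.util.zip` is used only to show the size reduction, and `CompressionDemo` is a hypothetical class name.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Uses DEFLATE from the JDK as a stand-in for LZ4, purely for illustration.
class CompressionDemo {

    static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int written = deflater.deflate(buffer);
            out.write(buffer, 0, written);
        }
        deflater.end();
        return out.toByteArray();
    }

    static byte[] decompress(byte[] input) {
        Inflater inflater = new Inflater();
        inflater.setInput(input);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        try {
            while (!inflater.finished()) {
                int read = inflater.inflate(buffer);
                if (read == 0 && inflater.needsInput()) {
                    break; // truncated input: stop instead of looping forever
                }
                out.write(buffer, 0, read);
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException("corrupt compressed data", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] payload = "{\"eventType\":\"ORDER_CREATED\"}".repeat(100).getBytes();
        byte[] compressed = compress(payload);
        System.out.println(payload.length + " bytes -> " + compressed.length + " bytes");
    }
}
```

The trade-off is CPU time on both producer and consumer, which is why faster codecs such as LZ4 are often preferred for messaging workloads.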

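6.3 Transactions

@Service
public class TransactionalMessageService {

    private final PulsarTemplate<String> pulsarTemplate;

    public TransactionalMessageService(PulsarTemplate<String> pulsarTemplate) {
        this.pulsarTemplate = pulsarTemplate;
    }

    // Both sends commit or roll back together. This assumes transaction support is
    // available and enabled (Spring for Apache Pulsar 1.1 or later with
    // spring.pulsar.transaction.enabled=true) and that the broker runs with its
    // transaction coordinator turned on.
    @Transactional
    public void sendTransactionalMessages(String topic1, String topic2, String message1, String message2) {
        pulsarTemplate.send(topic1, message1);
        pulsarTemplate.send(topic2, message2);
    }
}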

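6.4 Dead letter queue

@Configuration
public class DeadLetterConfig {

    // The policy is a regular bean built with the Pulsar client's DeadLetterPolicy builder.
    @Bean
    public DeadLetterPolicy deadLetterPolicy() {
        return DeadLetterPolicy.builder()
                .maxRedeliverCount(3)
                .deadLetterTopic("dead-letter-topic")
                .build();
    }
}

@Component
public class MainTopicConsumer {

    // deadLetterPolicy refers to the bean by name; dead lettering needs a Shared subscription.
    @PulsarListener(
            subscriptionName = "dlq-subscription",
            topics = "main-topic",
            subscriptionType = SubscriptionType.Shared,
            deadLetterPolicy = "deadLetterPolicy")
    public void receiveMessage(String message) {
        throw new RuntimeException("Simulated processing failure");
    }
}

@Component
public class DeadLetterConsumer {

    @PulsarListener(subscriptionName = "dlq-consumer", topics = "dead-letter-topic")
    public void processDeadLetter(String message) {
        System.out.println("Processing dead letter: " + message);
    }
}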
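The dead letter settings above (a redelivery limit of 3 and a separate dead letter topic) describe a simple control flow: retry a failing message a bounded number of times, then park it somewhere else so it stops blocking the subscription. The following sketch simulates that flow in memory. `RedeliverySimulator` is hypothetical, and the exact point at which a real broker counts a redelivery may differ from this simplified model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified in-memory model of bounded redelivery followed by dead lettering.
class RedeliverySimulator {
    final List<String> processed = new ArrayList<>();
    final List<String> deadLetters = new ArrayList<>();

    // Delivers the message until the handler succeeds or the redelivery limit is
    // exhausted. Returns the total number of delivery attempts.
    int deliver(String message, int maxRedeliverCount, Consumer<String> handler) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                handler.accept(message);
                processed.add(message);
                return attempts;
            } catch (RuntimeException e) {
                int redeliveries = attempts - 1; // the first attempt is not a redelivery
                if (redeliveries >= maxRedeliverCount) {
                    deadLetters.add(message); // give up and park the message
                    return attempts;
                }
            }
        }
    }

    public static void main(String[] args) {
        RedeliverySimulator simulator = new RedeliverySimulator();
        int attempts = simulator.deliver("bad-message", 3, m -> {
            throw new RuntimeException("Simulated processing failure");
        });
        System.out.println(attempts + " attempts, dead letters: " + simulator.deadLetters);
    }
}
```

In this model a message that always fails is attempted once plus three redeliveries before it is moved aside, which is why a dead letter consumer should expect messages that have already failed several times.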

7. Schema support

7.1 JSON Schema

public class User {
    private String name;
    private int age;
    // getters and setters
}

@Service
public class JsonProducerService {

    private final PulsarTemplate<User> pulsarTemplate;

    public JsonProducerService(PulsarTemplate<User> pulsarTemplate) {
        this.pulsarTemplate = pulsarTemplate;
    }

    // Complex types need an explicit schema unless a type mapping is configured.
    public void sendUser(User user) {
        pulsarTemplate.send("user-topic", user, Schema.JSON(User.class));
    }
}

@Component
public class JsonConsumer {

    @PulsarListener(subscriptionName = "json-consumer", topics = "user-topic",
            schemaType = SchemaType.JSON)
    public void receiveUser(User user) {
        System.out.println("Received user: " + user.getName());
    }
}

7.2 Avro Schema

@Data // Lombok: generates getters, setters, equals, hashCode, and toString
public class Order {
    private String orderId;
    private String productId;
    private int quantity;
    private double totalAmount;
}

@Configuration
public class AvroSchemaConfig {

    // Derives an Avro schema from the Order class via reflection.
    @Bean
    public Schema<Order> orderSchema() {
        return Schema.AVRO(Order.class);
    }
}

8. Monitoring and management

8.1 Metrics

@Component
public class PulsarMetrics {

    private final PulsarClient pulsarClient;

    public PulsarMetrics(PulsarClient pulsarClient) {
        this.pulsarClient = pulsarClient;
    }

    public Map<String, Object> getProducerStats(String topic) {
        Map<String, Object> stats = new HashMap<>();
        // Placeholder: collect producer statistics here (for example from Producer.getStats()).
        return stats;
    }

    public Map<String, Object> getConsumerStats(String topic) {
        Map<String, Object> stats = new HashMap<>();
        // Placeholder: collect consumer statistics here (for example from Consumer.getStats()).
        return stats;
    }
}

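8.2 Health check

@Component
public class PulsarHealthIndicator implements HealthIndicator {

    private final PulsarClient pulsarClient;

    public PulsarHealthIndicator(PulsarClient pulsarClient) {
        this.pulsarClient = pulsarClient;
    }

    @Override
    public Health health() {
        try {
            // getPartitionsForTopic returns a CompletableFuture; wait for it with a
            // timeout so that a connection failure actually surfaces here.
            pulsarClient.getPartitionsForTopic("health-check-topic").get(5, TimeUnit.SECONDS);
            return Health.up().withDetail("pulsar", "connected").build();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
            return Health.down().withDetail("pulsar", "disconnected")
                    .withDetail("error", String.valueOf(e.getMessage())).build();
        }
    }
}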
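A health check built on an asynchronous client call has one subtlety: calling a method that returns a `CompletableFuture` does not throw when the operation fails. The failure only becomes visible when the future is awaited. The sketch below isolates that point using plain JDK futures; `AsyncProbe` is a hypothetical helper, and the string results stand in for a real health status type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper showing why an async probe must be awaited with a timeout.
class AsyncProbe {

    // Returns "UP" if the probe completes normally within the timeout, otherwise "DOWN".
    static String check(Supplier<CompletableFuture<?>> probe, long timeoutMillis) {
        try {
            probe.get().get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "UP";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return "DOWN";
        } catch (ExecutionException | TimeoutException | RuntimeException e) {
            // Failed future, no answer in time, or the probe itself threw.
            return "DOWN";
        }
    }

    public static void main(String[] args) {
        System.out.println(check(() -> CompletableFuture.completedFuture("ok"), 100));
        System.out.println(check(
                () -> CompletableFuture.failedFuture(new IllegalStateException("no broker")), 100));
    }
}
```

The timeout matters as much as the wait: a probe that never completes would otherwise block the health endpoint indefinitely.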

9. Case study: an order messaging system

@Service
public class OrderMessageService {

    private final PulsarTemplate<OrderEvent> pulsarTemplate;

    public OrderMessageService(PulsarTemplate<OrderEvent> pulsarTemplate) {
        this.pulsarTemplate = pulsarTemplate;
    }

    public void publishOrderCreated(Order order) {
        OrderEvent event = new OrderEvent();
        event.setOrderId(order.getId());
        event.setEventType("ORDER_CREATED");
        event.setPayload(order);
        // An explicit JSON schema is passed because OrderEvent is a complex type.
        pulsarTemplate.send("order-events", event, Schema.JSON(OrderEvent.class));
    }

    public void publishOrderPaid(Long orderId) {
        OrderEvent event = new OrderEvent();
        event.setOrderId(orderId);
        event.setEventType("ORDER_PAID");
        pulsarTemplate.send("order-events", event, Schema.JSON(OrderEvent.class));
    }
}

@Component
public class OrderEventListener {

    private final InventoryService inventoryService;
    private final NotificationService notificationService;

    public OrderEventListener(InventoryService inventoryService, NotificationService notificationService) {
        this.inventoryService = inventoryService;
        this.notificationService = notificationService;
    }

    // Routes each event to the service responsible for it.
    @PulsarListener(subscriptionName = "order-event-listener", topics = "order-events",
            schemaType = SchemaType.JSON)
    public void handleOrderEvent(OrderEvent event) {
        switch (event.getEventType()) {
            case "ORDER_CREATED" -> inventoryService.reserveStock(event.getOrderId());
            case "ORDER_PAID" -> notificationService.sendOrderConfirmation(event.getOrderId());
            default -> System.out.println("Unknown event type: " + event.getEventType());
        }
    }
}

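10. Summary

This article covered the main steps of integrating Spring Boot with Apache Pulsar:

  1. Environment configuration: adding the dependency, connection settings, producer and consumer factories
  2. Producer implementation: synchronous and asynchronous sends, bulk sends, message properties
  3. Consumer implementation: annotation-driven listeners, manual acknowledgment, batch consumption
  4. Messaging patterns: point-to-point, publish/subscribe, request/reply
  5. Advanced features: partitioning, compression, transactions, dead letter queues
  6. Schema support: JSON, Avro, and other data formats
  7. Monitoring and management: metrics collection, health checks

With these building blocks you can put together an efficient and reliable messaging system on Spring Boot and Pulsar. In a real project, choose the messaging pattern and tune the configuration parameters according to the workload's throughput, ordering, and delivery requirements.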
