pulsar 是一种高效的服务器到服务器消息系统,具有多租户和高性能等特点,最初由 yahoo 开发,现由 apache 软件基金会管理。它是 apache 的顶级项目,定位为下一代云原生分布式消息流平台,融合了消息传递、存储和轻量级函数计算功能,采用计算与存储分离的架构设计,支持多租户、持久化存储、跨区域数据复制,具备强一致性、高吞吐、低延迟和高扩展性等流数据存储特性,被视为云原生时代实时消息流传输、存储和计算的理想解决方案。
Pulsar 的特性包括:
Pulsar 的架构主要包括以下组件:
Pulsar 支持四种订阅模式:
下载和安装 Pulsar 2.9.1 版本后,可以在 Linux 服务器上解压并启动单机版 Pulsar。使用命令行可以启动和终止 Pulsar 服务。
在 Spring Boot 中集成 Pulsar 需要以下步骤:
引入 Maven 依赖:
<dependency> <groupId>io.github.majusko</groupId> <artifactId>pulsar-java-spring-boot-starter</artifactId> <version>1.1.0</version> </dependency>
配置 application.yml:
pulsar: service-url: pulsar://192.168.0.105:6650
创建 Pulsar 配置类:
@Configuration public class PulsarConfig { @Bean public ProducerFactory producerFactory() { return new ProducerFactory().addProducer("testTopic", String.class); } }
定义 Topic 名称常量类:
public class TopicName { private TopicName(){} public static final String TEST_TOPIC = "testTopic"; }
创建消息生产者类:
@Component public class PulsarProducer<T> { @Resource private PulsarTemplate<T> template; public void send(String topic, T message) { try { template.send(topic, message); } catch (PulsarClientException e) { e.printStackTrace(); } } }
创建消息消费者类:
@Component public class TestTopicPulsarConsumer { private static final Logger log = LoggerFactory.getLogger(TestTopicPulsarConsumer.class); @PulsarConsumer(topic = TopicName.TEST_TOPIC, subscriptionType = SubscriptionType.Shared, clazz = String.class) public void consume(String message) { log.info("PulsarRealConsumer content:{}", message); } }
创建 PulsarController 测试发送消息:
@RestController @RequestMapping("/pulsar") public class PulsarController { @Resource private PulsarProducer<String> pulsarProducer; @PostMapping(value = "/sendMessage") public CommonResponse<String> sendMessage(@RequestParam(name = "message") String message) { pulsarProducer.send(TopicName.TEST_TOPIC, message); return CommonResponse.success("done"); } }
定义公共响应体类:
public class CommonResponse<T> { private String code; private Boolean success; private T data; public static <T> CommonResponse<T> success(T t){ return new CommonResponse<>("200",true,t); } public CommonResponse(String code, Boolean success, T data) { this.code = code; this.success = success; this.data = data; } //getter、setter方法 }
启动项目后,可以使用 Postman 测试消息发送和接收功能。
以上就是Pulsar中间件入门学习的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号