SpringBoot整合Pulsar的实现示例

SpringBoot整合Pulsar的实现示例

目录

一、添加pom.xml依赖

二、Pulsar 参数类

三、Pulsar 配置类

四、不同消费数据类型的监听器

五、Pulsar的核心服务类

六、Pulsar整合Spring Cloud

一、添加pom.xml依赖 <parent>     <groupId>org.springframework.boot</groupId>     <artifactId>spring-boot-starter-parent</artifactId>     <version>2.7.0</version> </parent> <dependencies>     <dependency>         <groupId>org.springframework.boot</groupId>         <artifactId>spring-boot-starter-web</artifactId>     </dependency>     <dependency>         <groupId>org.apache.pulsar</groupId>         <artifactId>pulsar-client</artifactId>         <version>2.10.0</version>     </dependency>     <dependency>         <groupId>org.projectlombok</groupId>         <artifactId>lombok</artifactId>         <version>1.18.24</version>         <scope>provided</scope>     </dependency> </dependencies> <build>     <plugins>         <plugin>             <groupId>org.apache.maven.plugins</groupId>             <artifactId>maven-compiler-plugin</artifactId>             <configuration>                 <source>8</source>                 <target>8</target>             </configuration>         </plugin>     </plugins> </build>     二、Pulsar 参数类 import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import java.util.Map; /**  * @Author: huangyibo  * @Date: 2022/5/28 2:32  * @Description: Pulsar 参数类  */ @Component @ConfigurationProperties(prefix = "tdmq.pulsar") @Data public class PulsarProperties {     /**      * 接入地址      */     private String serviceurl;     /**      * 命名空间tdc      */     private String tdcNamespace;     /**      * 角色tdc的token      */     private String tdcToken;     /**      * 集群name      */     private String cluster;     /**      * topicMap      */     private Map<String, String> topicMap;     /**      * 订阅      */     private Map<String, String> subMap;     /**      * 开关 on:Consumer可用 ||||| off:Consumer断路      */     private String onOff; } 三、Pulsar 配置类 import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /**  * @Author: huangyibo  * @Date: 2022/5/28 2:33  * @Description: Pulsar 配置类  */ @Configuration @EnableConfigurationProperties(PulsarProperties.class) public class PulsarConfig {     @Autowired     PulsarProperties pulsarProperties;     @Bean     public PulsarClient getPulsarClient() {         try {             return PulsarClient.builder()                     .authentication(AuthenticationFactory.token(pulsarProperties.getTdcToken()))                     .serviceUrl(pulsarProperties.getServiceurl())                     .build();         } catch (PulsarClientException e) {             System.out.println(e);             throw new RuntimeException("初始化Pulsar Client失败");         }     } } 四、不同消费数据类型的监听器 import com.yibo.pulsar.pojo.User; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageListener; import org.springframework.stereotype.Component; /**  * @Author: huangyibo  * @Date: 2022/5/28 2:37  * @Description:  */ @Component public class UserMessageListener implements MessageListener<User> {     @Override     public void received(Consumer<User> consumer, Message<User> msg) {         try {             User user = msg.getValue();             System.out.println(user);             consumer.acknowledge(msg);         } catch (Exception e) {             consumer.negativeAcknowledge(msg);         }     } } import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageListener; import org.springframework.stereotype.Component; /**  * @Author: huangyibo  * @Date: 2022/5/28 2:37  * @Description:  */ @Component public class StringMessageListener implements MessageListener<String> {     @Override     public void received(Consumer<String> consumer, Message<String> msg) {         try {             System.out.println(msg.getValue());             consumer.acknowledge(msg);         } catch (Exception e) {             consumer.negativeAcknowledge(msg);         }     } } 五、Pulsar的核心服务类 import com.yibo.pulsar.common.listener.StringMessageListener; import com.yibo.pulsar.common.listener.UserMessageListener; import com.yibo.pulsar.pojo.User; import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; /**  * @Author: huangyibo  * @Date: 2022/5/28 2:35  * @Description: Pulsar的核心服务类  */ @Component public class PulsarCommon {     @Autowired     private PulsarProperties pulsarProperties;     @Autowired     private PulsarClient client;     @Autowired     private UserMessageListener userMessageListener;     @Autowired     private StringMessageListener stringMessageListener;     /**      * 创建一个生产者       * @param topic     topic name      * @param schema    schema方式      * @param <T>       泛型      * @return          Producer生产者      */     public <T> Producer<T> createProducer(String topic, Schema<T> schema) {         try {             return client.newProducer(schema)                     .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)                     .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)                     .sendTimeout(10, TimeUnit.SECONDS)                     .blockIfQueueFull(true)                     .create();         } catch (PulsarClientException e) {             throw new RuntimeException("初始化Pulsar Producer失败");         }     }     /**      *       * @param topic             topic name      * @param subscription      sub name      * @param messageListener   MessageListener的自定义实现类      * @param schema            schema消费方式      * @param <T>               泛型      * @return                  Consumer消费者      */     public <T> Consumer<T> createConsumer(String topic, String subscription,                                    MessageListener<T> messageListener, Schema<T> schema) {         try {             return client.newConsumer(schema)                     .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)                     .subscriptionName(subscription)                     .ackTimeout(10, TimeUnit.SECONDS)                     .subscriptionType(SubscriptionType.Shared)                     .messageListener(messageListener)                     .subscribe();         } catch (PulsarClientException e) {             throw new RuntimeException("初始化Pulsar Consumer失败");         }     }     /**      * 异步发送一条消息      * @param message       消息体      * @param producer      生产者实例      * @param <T>           消息泛型      */     public <T> void sendAsyncMessage(T message, Producer<T> producer) {         producer.sendAsync(message).thenAccept(msgId -> {         });     }     /**      * 同步发送一条消息      * @param message       消息体      * @param producer      生产者实例      * @param <T>           泛型      * @throws PulsarClientException      */     public <T> void sendSyncMessage(T message, Producer<T> producer) throws PulsarClientException {         MessageId send = producer.send(message);         System.out.println();         System.out.println();         System.out.println();         System.out.println();         System.out.println(send);     }     //-----------consumer-----------     @Bean(name = "comment-publish-topic-consumer")     public Consumer<String> getCommentPublishTopicConsumer() {         return this.createConsumer(pulsarProperties.getTopicMap().get("comment-publish-topic"),                 pulsarProperties.getSubMap().get("comment-publish-topic-test"),                 stringMessageListener, Schema.STRING);     }     @Bean(name = "reply-publish-topic-consumer")     public Consumer<User> getReplyPublishTopicConsumer() {         return this.createConsumer(pulsarProperties.getTopicMap().get("reply-publish-topic"),                 pulsarProperties.getSubMap().get("reply-publish-topic-test"),                 userMessageListener, AvroSchema.of(User.class));     }     //-----------producer-----------     @Bean(name = "comment-publish-topic-producer")     public Producer<String> getCommentPublishTopicProducer() {         return this.createProducer(pulsarProperties.getTopicMap().get("comment-publish-topic"),Schema.STRING);     }     @Bean(name = "reply-publish-topic-producer")     public Producer<User> getReplyPublishTopicProducer() {         return this.createProducer(pulsarProperties.getTopicMap().get("reply-publish-topic"), AvroSchema.of(User.class));     } } 六、Pulsar整合Spring Cloud

后来发现如上代码会导致BUG-> 在更新Nacos配置之后 Consumer会挂掉
经排查发现结果是由于@RefreshScope注解导致,此注解将摧毁Bean,PulsarConsumer和Producer都将被摧毁,只是说Producer将在下⼀次调⽤中完成重启,Consumer则不能重启,因为没有调⽤,那么怎么解决呢?

就是发布系列事件以刷新容器

import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; /**  * @Author: huangyibo  * @Date: 2022/5/28 2:34  * @Description:  */ @Component @Slf4j public class RefreshPulsarListener implements ApplicationListener {     @Autowired     ApplicationContext applicationContext;     @Override     public void onApplicationEvent(ApplicationEvent event) {         if (event.getSource().equals("__refreshAll__")) {             log.info("Nacos配置中心配置修改 重启Pulsar====================================");             log.info("重启PulsarClient,{}", applicationContext.getBean("getPulsarClient"));             log.info("重启PulsarConsumer,{}", applicationContext.getBean("comment-publish-topic-consumer"));             log.info("重启PulsarConsumer,{}", applicationContext.getBean("reply-publish-topic-consumer"));         }     } }

参考:

https://wenku.baidu.com/view/4d3337ab6b0203d8ce2f0066f5335a8102d266a7.html

https://gitee.com/zhaoyuxuan66/pulsar-springcloud_boot-demo/tree/master/

https://blog.csdn.net/weixin_56227932/article/details/122897075

http://www.zzvips.com/article/219361.html

https://mp.weixin.qq.com/s/4w0eucDNcrYrsiDXHzLwuQ

到此这篇关于SpringBoot整合Pulsar的实现示例的文章就介绍到这了,更多相关SpringBoot整合Pulsar内容请搜索易知道(ezd.cc)以前的文章或继续浏览下面的相关文章希望大家以后多多支持易知道(ezd.cc)!

推荐阅读

    学习写字楼新选择6000元主流配置

    学习写字楼新选择6000元主流配置,,这种配置需要考虑双核心的办公和娱乐平台,充分考虑办公室的办公需求和娱乐需求,以约6000元的预算和cost-e

    酷睿I7 配置

    酷睿I7 配置,配置,玩家国度啦华硕 Rampage II Extreme(3800元)如果米不够,也可以把Extreme改为Gene,不过是小板内存推荐金士顿6G DDR3 2000骇

    提高3A四核羿龙II游戏配置的性能

    提高3A四核羿龙II游戏配置的性能,,以节能环保为主题的IT产业,目前3A低端平台处理器、主板芯片组、独立开发卡性能突出,特别是在与AMD的处理

    opporeno8参数配置及价格

    opporeno8参数配置及价格,面部,亿元,Oppo的荣誉2020年1月4日,接近屏幕关闭传感器是否支持双卡:支持oppor11splus什么时候上市的Oppo R11S P

    查看配置:酷睿i3530集展示办公平台

    查看配置:酷睿i3530集展示办公平台,,由于时间和精力的关系,我们不可能对所有的配置进行评论,希望我们能理解,我希望我们的评论能在那些需要帮

    3500元超额值学生娱乐结构的优化配置

    3500元超额值学生娱乐结构的优化配置,,作为一个DIY的主流用户领域的学生,每个用户51学生攒机的高峰。因为学生用户没有稳定的收入来源,攒机

    魅蓝note6性能参数有哪些

    魅蓝note6性能参数有哪些,摄像头,蓝牙,魅蓝note6性能参数有哪些魅力蓝色Note6最好拍照。电池寿命更长。蓝色Note6使用高通 snapdragon 625

    金蝶易记账售后服务电话

    金蝶易记账售后服务电话,,1.人工客服电话是多少075客服专线075-5292 22668*QQ公司人工客服专线:075-5292 22668【24小时服务热线】你好一一