MyException - 我的异常网
当前位置:我的异常网» 开源软件 » RocketMQ最佳实践(3)开发spring-boot-starter-roc

RocketMQ最佳实践(3)开发spring-boot-starter-rocketmq实现与spring boot项目的整合

www.MyException.Cn  网友分享于:2013-10-22  浏览:0次
RocketMQ最佳实践(三)开发spring-boot-starter-rocketmq实现与spring boot项目的整合
最近在使用spring boot/spring cloud搭建做微服务架构,发现spring boot官方提供的starter中居然没有集成RocketMQ惊讶word天,顿时激发我的创作基情啊有木有大笑
上面这张截图来自spring boot官方文档,为啥官方提供了JMS、AMQP和Kafka却偏偏少了RocketMQ呢,我认为是因为目前RocketMQ在国外并不普及,而且才捐献给apache不久,需要一段时间,那么如此看来,写一个spring-boot-starter-rocketmq还是比较有意义的。
 
but,本人水平毕竟有限,写的东西自然没法和spring相比,这个版本的starter参考了JMS的starter来封装,虽然不够尽善尽美,但还是极具实用价值的微笑

编写spring-boot-starter-rocketmq

 

 

创建一个Maven项目名字就叫spring-boot-starter-rocketmq,其pom.xml文件内容如下:

 

[html] view plain copy
 
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  3.     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
  4.   
  5.     <groupId>com.bqjr</groupId>  
  6.     <version>0.0.1-SNAPSHOT</version>  
  7.     <name>spring-boot-starter-rocketmq</name>  
  8.     <description>Starter for using RocketMQ</description>  
  9.   
  10.     <parent>  
  11.         <groupId>org.springframework.boot</groupId>  
  12.         <artifactId>spring-boot-starter-parent</artifactId>  
  13.         <version>1.5.3.RELEASE</version>  
  14.         <relativePath/>  
  15.     </parent>  
  16.     <properties>  
  17.         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  
  18.         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>  
  19.         <java.version>1.8</java.version>  
  20.         <rocketmq.version>4.0.0-incubating</rocketmq.version>  
  21.     </properties>  
  22.       
  23.     <modelVersion>4.0.0</modelVersion>  
  24.     <artifactId>spring-boot-starter-rocketmq</artifactId>  
  25.   
  26.   
  27.     <dependencies>  
  28.         <dependency>  
  29.             <groupId>org.springframework.boot</groupId>  
  30.             <artifactId>spring-boot-starter</artifactId>  
  31.         </dependency>  
  32.         <!-- RocketMq客户端相关依赖 -->  
  33.         <dependency>  
  34.             <groupId>org.apache.rocketmq</groupId>  
  35.             <artifactId>rocketmq-client</artifactId>  
  36.             <version>${rocketmq.version}</version>  
  37.         </dependency>  
  38.         <dependency>  
  39.             <groupId>org.apache.rocketmq</groupId>  
  40.             <artifactId>rocketmq-common</artifactId>  
  41.             <version>${rocketmq.version}</version>  
  42.         </dependency>  
  43.           
  44.         <dependency>  
  45.             <groupId>org.projectlombok</groupId>  
  46.             <artifactId>lombok</artifactId>  
  47.             <version>1.16.10</version><!--$NO-MVN-MAN-VER$-->  
  48.         </dependency>  
  49.     </dependencies>  
  50.   
  51.   
  52. </project>  

编写配置类RocketmqProperties,这个类的属性对应application.properties文件中的配置项,目前只提供核心的一些配置支持,其他性能优化方面的配置参数可自行扩展

 

 

[java] view plain copy
 
  1. /** 
  2.  * @author jiangjb 
  3.  */  
  4. @Data  
  5. @ConfigurationProperties(PREFIX)  
  6. public class RocketmqProperties {  
  7.   
  8.     public static final String PREFIX = "spring.extend.rocketmq";  
  9.   
  10.     private String namesrvAddr;  
  11.     private String instanceName;  
  12.     private String clientIP;  
  13.     private ProducerConfig producer;  
  14.     private ConsumerConfig consumer;  
  15.       
  16. }  

编写配置解析类RocketmqAutoConfiguration,这个类主要初始化了三个Bean:defaultProducer用来发送普通消息、transactionProducer用来发送事务消息以及pushConsumer用来接收订阅的所有topic下的消息,并派发给不同的tag的消费者。

[java] view plain copy
 
  1. /** 
  2.  * @author jiangjb 
  3.  */  
  4. @Configuration  
  5. @EnableConfigurationProperties(RocketmqProperties.class)  
  6. @ConditionalOnProperty(prefix = PREFIX, value = "namesrvAddr")  
  7. public class RocketmqAutoConfiguration {  
  8.       
  9.     @Autowired  
  10.     private RocketmqProperties properties;  
  11.       
  12.     @Value("${spring.application.name}")  
  13.     private String producerGroupName;  
  14.       
  15.     @Value("${spring.application.name}")  
  16.     private String consumerGroupName;  
  17.       
  18.     @Autowired  
  19.     private ApplicationEventPublisher publisher;  
  20.     /** 
  21.      * 初始化向rocketmq发送普通消息的生产者 
  22.      */  
  23.     @Bean  
  24.     @ConditionalOnProperty(prefix = PREFIX, value = "producer.instanceName")  
  25.     public DefaultMQProducer defaultProducer() throws MQClientException{  
  26.         /** 
  27.          * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br> 
  28.          * 注意:ProducerGroupName需要由应用来保证唯一<br> 
  29.          * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键, 
  30.          * 因为服务器会回查这个Group下的任意一个Producer 
  31.          */  
  32.         DefaultMQProducer producer = new DefaultMQProducer(producerGroupName);  
  33.         producer.setNamesrvAddr(properties.getNamesrvAddr());  
  34.         producer.setInstanceName(properties.getProducer().getInstanceName());  
  35.         producer.setVipChannelEnabled(false);  
  36.   
  37.         /** 
  38.          * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br> 
  39.          * 注意:切记不可以在每次发送消息时,都调用start方法 
  40.          */  
  41.         producer.start();  
  42.         System.out.println("RocketMq defaultProducer Started.");  
  43.         return producer;  
  44.     }  
  45.       
  46.     /** 
  47.      * 初始化向rocketmq发送事务消息的生产者 
  48.      */  
  49.     @Bean  
  50.     @ConditionalOnProperty(prefix = PREFIX, value = "producer.tranInstanceName")  
  51.     public TransactionMQProducer transactionProducer() throws MQClientException{  
  52.         /** 
  53.          * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br> 
  54.          * 注意:ProducerGroupName需要由应用来保证唯一<br> 
  55.          * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键, 
  56.          * 因为服务器会回查这个Group下的任意一个Producer 
  57.          */  
  58.         TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroupName");  
  59.         producer.setNamesrvAddr(properties.getNamesrvAddr());  
  60.         producer.setInstanceName(properties.getProducer().getTranInstanceName());  
  61.           
  62.         // 事务回查最小并发数  
  63.         producer.setCheckThreadPoolMinSize(2);  
  64.         // 事务回查最大并发数  
  65.         producer.setCheckThreadPoolMaxSize(2);  
  66.         // 队列数  
  67.         producer.setCheckRequestHoldMax(2000);  
  68.           
  69.         //TODO 由于社区版本的服务器阉割调了消息回查的功能,所以这个地方没有意义  
  70.         //TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();  
  71.         //producer.setTransactionCheckListener(transactionCheckListener);  
  72.           
  73.         /** 
  74.          * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br> 
  75.          * 注意:切记不可以在每次发送消息时,都调用start方法 
  76.          */  
  77.         producer.start();  
  78.           
  79.         System.out.println("RocketMq TransactionMQProducer Started.");  
  80.         return producer;  
  81.     }  
  82.     /** 
  83.      * 初始化rocketmq消息监听方式的消费者 
  84.      */  
  85.     @Bean  
  86.     @ConditionalOnProperty(prefix = PREFIX, value = "consumer.instanceName")  
  87.     public DefaultMQPushConsumer pushConsumer() throws MQClientException{  
  88.         /** 
  89.          * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br> 
  90.          * 注意:ConsumerGroupName需要由应用来保证唯一 
  91.          */  
  92.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroupName);  
  93.         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  94.         consumer.setNamesrvAddr(properties.getNamesrvAddr());  
  95.         consumer.setInstanceName(properties.getConsumer().getInstanceName());  
  96.         consumer.setConsumeMessageBatchMaxSize(1);//设置批量消费,以提升消费吞吐量,默认是1  
  97.           
  98.           
  99.         /** 
  100.          * 订阅指定topic下tags 
  101.          */  
  102.         List<String> subscribeList = properties.getConsumer().getSubscribe();  
  103.         for (String sunscribe : subscribeList) {  
  104.             consumer.subscribe(sunscribe.split(":")[0], sunscribe.split(":")[1]);  
  105.         }  
  106.           
  107.          consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {  
  108.   
  109.             MessageExt msg = msgs.get(0);  
  110.               
  111.             try {  
  112.                 //默认msgs里只有一条消息,可以通过设置consumeMessageBatchMaxSize参数来批量接收消息  
  113.                 System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size());  
  114.                 //发布消息到达的事件,以便分发到每个tag的监听方法  
  115.                 this.publisher.publishEvent(new RocketmqEvent(msg,consumer));   
  116.                 System.out.println("消息到达事件已经发布成功!");  
  117.             } catch (Exception e) {  
  118.                 e.printStackTrace();  
  119.                 if(msg.getReconsumeTimes()<=3){//重复消费3次  
  120.                     //TODO 进行日志记录  
  121.                     return ConsumeConcurrentlyStatus.RECONSUME_LATER;  
  122.                 } else {  
  123.                     //TODO 消息消费失败,进行日志记录  
  124.                 }  
  125.             }  
  126.               
  127.             //如果没有return success,consumer会重复消费此信息,直到success。  
  128.             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
  129.         });  
  130.   
  131.      new Thread(new Runnable() {  
  132.         @Override  
  133.         public void run() {  
  134.             try {  
  135.                 Thread.sleep(5000);//延迟5秒再启动,主要是等待spring事件监听相关程序初始化完成,否则,回出现对RocketMQ的消息进行消费后立即发布消息到达的事件,然而此事件的监听程序还未初始化,从而造成消息的丢失  
  136.                 /** 
  137.                  * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br> 
  138.                  */  
  139.                 try {  
  140.                     consumer.start();  
  141.                 } catch (Exception e) {  
  142.                     System.out.println("RocketMq pushConsumer Start failure!!!.");  
  143.                     e.printStackTrace();  
  144.                 }  
  145.                   
  146.                 System.out.println("RocketMq pushConsumer Started.");  
  147.   
  148.             } catch (InterruptedException e) {  
  149.                 e.printStackTrace();  
  150.             }  
  151.         }  
  152.   
  153.         }).start();  
  154.           
  155.         return consumer;  
  156.     }  
  157.   
  158. }  

编写基于spring事件传播机制的事件类RocketmqEvent,用来定义上面的consumer接收到消息后的发布的事件。

 

 

[java] view plain copy
 
  1. /** 
  2.  *  
  3.  * @author jiangjb 
  4.  * 
  5.  */  
  6. @Data  
  7. @EqualsAndHashCode(callSuper=false)  
  8. public class RocketmqEvent extends ApplicationEvent{  
  9.     private static final long serialVersionUID = -4468405250074063206L;  
  10.       
  11.     private DefaultMQPushConsumer consumer;  
  12.     private MessageExt messageExt;  
  13.     private String topic;  
  14.     private String tag;  
  15.     private byte[] body;  
  16.       
  17.     public RocketmqEvent(MessageExt msg,DefaultMQPushConsumer consumer) throws Exception {  
  18.         super(msg);  
  19.         this.topic = msg.getTopic();  
  20.         this.tag = msg.getTags();  
  21.         this.body = msg.getBody();  
  22.         this.consumer = consumer;  
  23.         this.messageExt = msg;  
  24.     }  
  25.   
  26.     public String getMsg() {  
  27.         try {  
  28.             return new String(this.body,"utf-8");  
  29.         } catch (UnsupportedEncodingException e) {  
  30.             return null;  
  31.         }  
  32.     }  
  33.       
  34.     public String getMsg(String code) {  
  35.         try {  
  36.             return new String(this.body,code);  
  37.         } catch (UnsupportedEncodingException e) {  
  38.             return null;  
  39.         }  
  40.     }  
  41.       
  42. }  

然后运行maven的编译、打包

 

编写测试项目rocketmq-starter-test

pom.xml中加入上面的starter的依赖
[html] view plain copy
 
  1. <dependency>  
  2.     <groupId>com.bqjr</groupId>  
  3.     <artifactId>spring-boot-starter-rocketmq</artifactId>  
  4.     <version>0.0.1-SNAPSHOT</version>  
  5. </dependency>  
发送消息测试类producerDemo
[java] view plain copy
 
  1. /** 
  2.  *  
  3.  * @author jiangjb 
  4.  * 
  5.  */  
  6. @RestController  
  7. public class producerDemo {  
  8.       
  9.     @Autowired  
  10.     private DefaultMQProducer defaultProducer;  
  11.       
  12.     @Autowired  
  13.     private TransactionMQProducer transactionProducer;  
  14.       
  15.     @Value("${spring.extend.rocketmq.producer.topic}")  
  16.     private String producerTopic;  
  17.       
  18.     @RequestMapping(value = "/sendMsg", method = RequestMethod.GET)  
  19.     public void sendMsg() {  
  20.          Message msg = new Message(producerTopic,// topic  
  21.                  "TagA",// tag  
  22.                  "OrderID001",// key  
  23.                  ("Hello jyqlove333").getBytes());// body  
  24.          try {  
  25.             defaultProducer.send(msg,new SendCallback(){  
  26.                   
  27.                 @Override  
  28.                 public void onSuccess(SendResult sendResult) {  
  29.                      System.out.println(sendResult);  
  30.                      //TODO 发送成功处理  
  31.                 }  
  32.                   
  33.                 @Override  
  34.                 public void onException(Throwable e) {  
  35.                      System.out.println(e);  
  36.                     //TODO 发送失败处理  
  37.                 }  
  38.             });  
  39.         } catch (Exception e) {  
  40.             e.printStackTrace();  
  41.         }  
  42.     }  
  43.       
  44.     @RequestMapping(value = "/sendTransactionMsg", method = RequestMethod.GET)  
  45.     public String sendTransactionMsg() {  
  46.         SendResult sendResult = null;  
  47.         try {  
  48.             //构造消息  
  49.             Message msg = new Message(producerTopic,// topic  
  50.                     "TagA",// tag  
  51.                     "OrderID001",// key  
  52.                     ("Hello jyqlove333").getBytes());// body  
  53.               
  54.             //发送事务消息,LocalTransactionExecute的executeLocalTransactionBranch方法中执行本地逻辑  
  55.             sendResult = transactionProducer.sendMessageInTransaction(msg, (Message msg1,Object arg) -> {  
  56.                 int value = 1;  
  57.                   
  58.                 //TODO 执行本地事务,改变value的值  
  59.                 //===================================================  
  60.                 System.out.println("执行本地事务。。。完成");  
  61.                 if(arg instanceof Integer){  
  62.                     value = (Integer)arg;  
  63.                 }  
  64.                 //===================================================  
  65.                   
  66.                 if (value == 0) {  
  67.                     throw new RuntimeException("Could not find db");  
  68.                 } else if ((value % 5) == 0) {  
  69.                     return LocalTransactionState.ROLLBACK_MESSAGE;  
  70.                 } else if ((value % 4) == 0) {  
  71.                     return LocalTransactionState.COMMIT_MESSAGE;  
  72.                 }  
  73.                 return LocalTransactionState.ROLLBACK_MESSAGE;  
  74.             }, 4);  
  75.             System.out.println(sendResult);  
  76.         } catch (Exception e) {  
  77.             e.printStackTrace();  
  78.         }  
  79.         return sendResult.toString();  
  80.     }  
  81. }  

消费消息测试类consumerDemo
[java] view plain copy
 
  1. /** 
  2.  *  
  3.  * @author jiangjb 
  4.  * 
  5.  */  
  6. @Component  
  7. public class consumerDemo {  
  8.       
  9.     @EventListener(condition = "#event.topic=='TopicTest1' && #event.tag=='TagA'")  
  10.     public void rocketmqMsgListen(RocketmqEvent event) {  
  11.         DefaultMQPushConsumer consumer = event.getConsumer();  
  12.         try {  
  13.             System.out.println("com.bqjr.consumerDemo监听到一个消息达到:" + event.getMsg("gbk"));  
  14.             //TODO 进行业务处理  
  15.         } catch (Exception e) {  
  16.             if(event.getMessageExt().getReconsumeTimes()<=3){//重复消费3次  
  17.                 try {  
  18.                     consumer.sendMessageBack(event.getMessageExt(), 2);  
  19.                 } catch (Exception e1) {  
  20.                     //TODO 消息消费失败,进行日志记录  
  21.                 }  
  22.             } else {  
  23.                 //TODO 消息消费失败,进行日志记录  
  24.                   
  25.             }  
  26.         }  
  27.     }  
  28. }  

来,测试一把

在浏览器中访问:http://10.89.0.144:12306/sendMsg,控制台输出如下:

再测试一下消费者,在RocketMQ控制台(RocketMQ控制台的介绍放到下一篇吧微笑)发送一条消息
查看控制台打印的消费日志
恭喜你,成功了。大笑


补充说明:

 

        本来想自定义一个叫RocketmqListener的注解来实现消息的监听的,花了大量时间去阅读和研究了spring关于EventListener注解和JmsListener注解的实现,发现目前我并不能很好的理解和掌控其设计思路,想以瓢画葫最终也没能实现,迫于五一节来临,只能使用EventListener注解代替,不过发现其实也不错。

 

http://blog.csdn.net/jayjjb/article/details/70906511

文章评论

程序员的鄙视链
程序员的鄙视链
那些争议最大的编程观点
那些争议最大的编程观点
亲爱的项目经理,我恨你
亲爱的项目经理,我恨你
一个程序员的时间管理
一个程序员的时间管理
中美印日四国程序员比较
中美印日四国程序员比较
5款最佳正则表达式编辑调试器
5款最佳正则表达式编辑调试器
旅行,写作,编程
旅行,写作,编程
程序员周末都喜欢做什么?
程序员周末都喜欢做什么?
我跳槽是因为他们的显示器更大
我跳槽是因为他们的显示器更大
Java程序员必看电影
Java程序员必看电影
Web开发人员为什么越来越懒了?
Web开发人员为什么越来越懒了?
不懂技术不要对懂技术的人说这很容易实现
不懂技术不要对懂技术的人说这很容易实现
做程序猿的老婆应该注意的一些事情
做程序猿的老婆应该注意的一些事情
初级 vs 高级开发者 哪个性价比更高?
初级 vs 高级开发者 哪个性价比更高?
漫画:程序员的工作
漫画:程序员的工作
老程序员的下场
老程序员的下场
程序员都该阅读的书
程序员都该阅读的书
代码女神横空出世
代码女神横空出世
10个调试和排错的小建议
10个调试和排错的小建议
为啥Android手机总会越用越慢?
为啥Android手机总会越用越慢?
要嫁就嫁程序猿—钱多话少死的早
要嫁就嫁程序猿—钱多话少死的早
程序员和编码员之间的区别
程序员和编码员之间的区别
程序猿的崛起——Growth Hacker
程序猿的崛起——Growth Hacker
我是如何打败拖延症的
我是如何打败拖延症的
如何区分一个程序员是“老手“还是“新手“?
如何区分一个程序员是“老手“还是“新手“?
“懒”出效率是程序员的美德
“懒”出效率是程序员的美德
聊聊HTTPS和SSL/TLS协议
聊聊HTTPS和SSL/TLS协议
 程序员的样子
程序员的样子
编程语言是女人
编程语言是女人
科技史上最臭名昭著的13大罪犯
科技史上最臭名昭著的13大罪犯
10个帮程序员减压放松的网站
10个帮程序员减压放松的网站
2013年美国开发者薪资调查报告
2013年美国开发者薪资调查报告
什么才是优秀的用户界面设计
什么才是优秀的用户界面设计
Web开发者需具备的8个好习惯
Web开发者需具备的8个好习惯
鲜为人知的编程真相
鲜为人知的编程真相
当下全球最炙手可热的八位少年创业者
当下全球最炙手可热的八位少年创业者
团队中“技术大拿”并非越多越好
团队中“技术大拿”并非越多越好
Java 与 .NET 的平台发展之争
Java 与 .NET 的平台发展之争
十大编程算法助程序员走上高手之路
十大编程算法助程序员走上高手之路
程序员应该关注的一些事儿
程序员应该关注的一些事儿
“肮脏的”IT工作排行榜
“肮脏的”IT工作排行榜
程序员眼里IE浏览器是什么样的
程序员眼里IE浏览器是什么样的
那些性感的让人尖叫的程序员
那些性感的让人尖叫的程序员
看13位CEO、创始人和高管如何提高工作效率
看13位CEO、创始人和高管如何提高工作效率
程序员最害怕的5件事 你中招了吗?
程序员最害怕的5件事 你中招了吗?
每天工作4小时的程序员
每天工作4小时的程序员
我的丈夫是个程序员
我的丈夫是个程序员
老美怎么看待阿里赴美上市
老美怎么看待阿里赴美上市
如何成为一名黑客
如何成为一名黑客
软件开发程序错误异常ExceptionCopyright © 2009-2015 MyException 版权所有