zoukankan      html  css  js  c++  java
  • (三)RocketMq入门之独立线程处理业务

    一、示例代码

    这段代码实现了一个独立线程监听在一个特殊的消息队列上,一旦收到消息就处理并发送给MQ,然后推送给所有的消费者。

     1 import com.alibaba.rocketmq.client.exception.MQBrokerException;
     2 import com.alibaba.rocketmq.client.exception.MQClientException;
     3 import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
     4 import com.alibaba.rocketmq.client.producer.SendResult;
     5 import com.alibaba.rocketmq.client.producer.SendStatus;
     6 import com.alibaba.rocketmq.common.message.Message;
     7 import com.alibaba.rocketmq.remoting.exception.RemotingException;
     8 
     9 import java.util.concurrent.LinkedBlockingQueue;
    10 
    11 
    12 public class ThreadMqProducer implements Runnable {
    13     public static LinkedBlockingQueue queue = new LinkedBlockingQueue();
    14     private DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
    15 
    16     public static void main(String[] args) {
    17         ThreadMqProducer tpd = new ThreadMqProducer();
    18         tpd.init();
    19 
    20         Thread t1 = new Thread(tpd);
    21         t1.setName("mq-thread");
    22         t1.start();
    23     }
    24 
    25     public void init() {
    26         producer.setNamesrvAddr("172.18.4.114:9876");
    27         producer.setInstanceName("producer");
    28 
    29         try {
    30             producer.start();
    31         } catch (MQClientException e) {
    32             e.printStackTrace();
    33         }
    34     }
    35 
    36     public void release() {
    37         producer.shutdown();
    38     }
    39 
    40     public int send2MQ(String body) {
    41         Message msg = new Message("TopicA-test", "TagA", body.getBytes());
    42         SendResult sendResult;
    43 
    44         try {
    45             sendResult = producer.send(msg);
    46 
    47             if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
    48                 return 0;
    49             }
    50         } catch (MQClientException e) {
    51             // TODO Auto-generated catch block 
    52             e.printStackTrace();
    53         } catch (RemotingException e) {
    54             // TODO Auto-generated catch block 
    55             e.printStackTrace();
    56         } catch (MQBrokerException e) {
    57             // TODO Auto-generated catch block 
    58             e.printStackTrace();
    59         } catch (InterruptedException e) {
    60             // TODO Auto-generated catch block 
    61             e.printStackTrace();
    62         }
    63 
    64         return -1;
    65     }
    66 
    67     @Override
    68     public void run() {
    69         // TODO Auto-generated method stub 
    70         while (true) {
    71             try {
    72                 String body = queue.take();
    73                 System.out.println(
    74                     "take a message from queue... send notify to rocketmq!");
    75                 send2MQ(body);
    76             } catch (InterruptedException e) {
    77                 // TODO Auto-generated catch block 
    78                 e.printStackTrace();
    79             }
    80         }
    81     }
    82 }
  • 相关阅读:
    如何在原生微信小程序中实现数据双向绑定
    【推荐】开源项目minapp-重新定义微信小程序的开发
    iKcamp|基于Koa2搭建Node.js实战(含视频)☞ 规范与部署
    iKcamp|基于Koa2搭建Node.js实战(含视频)☞ 错误处理
    系列3|走进Node.js之多进程模型
    手把手教你撸一个 Webpack Loader
    iKcamp|基于Koa2搭建Node.js实战(含视频)☞ 记录日志
    React Native 网络层分析
    如何实现VM框架中的数据绑定
    iKcamp|基于Koa2搭建Node.js实战(含视频)☞ 解析JSON
  • 原文地址:https://www.cnblogs.com/yoyotl/p/6782748.html
Copyright © 2011-2022 走看看