zoukankan      html  css  js  c++  java
  • RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer

    1.  添加依赖

      pom.xml如下:

    复制代码
     <dependency>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-client</artifactId>
          <version>4.3.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-srvutil</artifactId>
          <version>4.3.1</version>
        </dependency>
        <dependency>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-classic</artifactId>
          <version>1.2.3</version>
        </dependency>
        <dependency>
          <groupId>org.javassist</groupId>
          <artifactId>javassist</artifactId>
            <version>3.23.1-GA</version>
        </dependency>
        <dependency>
          <groupId>io.openmessaging</groupId>
          <artifactId>openmessaging-api</artifactId>
          <version>0.3.0-alpha</version>
        </dependency>
        <dependency>
          <groupId>org.apache.rocketmq</groupId>
          <artifactId>rocketmq-openmessaging</artifactId>
          <version>4.3.1</version>
        </dependency>
    复制代码

    2. Producer 的开发步骤

      1. 实例化Producer Group,如下:

      DefaultMQProducer producer = new DefaultMQProducer("my-producer-group");

      2. 设置namesrvAddr,集群环境多个nameserver用;分割,如下:

    producer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876");

      3. 调用start()方法启动:

     producer.start();

      4. 发送消息

    复制代码
     for (int i = 0; i < 10; i++) {
                //构建实例,第一个参数为topic,第二个参数为tabs,第三个参数为消息体
                Message message = new Message("MyQuickStartTopic","tabA",("Hello World:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult result = producer.send(message);
                System.out.println(result);
            }
    复制代码

      5. 关闭生产者(根据自己需求确定是够需要关闭)

     producer.shutdown();

      完整示例如下:

    复制代码
    package com.wangx.rocketmq.quickstart;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.io.UnsupportedEncodingException;
    
    /**
     * 创建一个消费者
     */
    public class Producer {
    
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
            //1. 实例化一个producer group
            DefaultMQProducer producer = new DefaultMQProducer("my-producer-group");
    
            //2. 设置namesrvAddr,集群环境多个nameserver用;分割
    
            producer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876");
            //3. 启动
            producer.start();
            // 4. 发送消息
            for (int i = 0; i < 10; i++) {
                //构建实例,第一个参数为topic,第二个参数为tabs,第三个参数为消息体
                Message message = new Message("MyQuickStartTopic","tabA",("Hello World:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult result = producer.send(message);
                System.out.println(result);
            }
            //关闭生产者
            producer.shutdown();
    
        }
    }
    复制代码

      使用方式可以说非常简单了。

    3. Consumer开发步骤

      1. 实例化Consumer Group,如下:

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group");

      2. 设置namesrvAddr,集群环境多个nameserver用;分割,如下:

    producer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876");

      3. 设置从什么位置开始都

    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

      4. 订阅topic.

    consumer.subscribe("MyQuickStartTopic", "*");

      5. 注册消息监听器

     consumer.registerMessageListener();

      6. 重写MessageListenerConcurrently接口的consumeMessage()方法

      完整代码如下:

    复制代码
    package com.wangx.rocketmq.quickstart;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    /**
     * 创建一个消费者
     */
    public class Consumer {
        public static void main(String[] args) throws MQClientException {
            //实例化一个consumer组
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group");
            //设置setNamesrvAddr,同生产者
            consumer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876");
    
            //设置消息读取方式,这里设置的是队尾开始读取
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    
            //设置订阅主题,第二个参数为过滤tabs的条件,可以写为tabA|tabB过滤Tab,*表示接受所有
            consumer.subscribe("MyQuickStartTopic", "*");
    
            //注册消息监听
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    try {
                        //的到MessageExt
                        MessageExt messageExt = list.get(0);
                        String topic = messageExt.getTopic();
                        String message = new String(messageExt.getBody(),"UTF-8");
                        int queueId = messageExt.getQueueId();
                        System.out.println("收到来自topic:" + topic + ", queueId:" + queueId + "的消息:" + message);
    
                    } catch (Exception e) {
                        //失败,请求稍后重发
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    //成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
    
        }
    }
    复制代码

      consumeMessage返回一个枚举的两种状态,成功表示接受成功,否则返回稍后重发的状态。这里注意,启动的时候需要consumer先启动,因为它需要在生产者之前先订阅,否则将会收不到生产在consumer生产的消息,造成消息丢失。

      启动consumer,在启动producer

      producer控制台

      consumer控制台:

      rocketmq-console信息:

    可以看到,我们前面部署的集群环境也是能够实现消息的负载均衡的,会使两个broker上都创建topic,且都能够接收生产者生产的消息。

      进入topic,可以看到新增了两个我们自定义的topic

    可能会出现的问题:

      RemotingTooMuchRequestException: sendDefaultImpl call timeout

      在客户端运行Producer时,可能会出现如上异常,这是因为从 Windows 上开发连接 虚拟机中的 nameServer 时要经过 Linux 系统的防火墙,而防火墙一般都会有超时的机制,在网络连接长时间不传输数据时,会关闭这个 TCP 的会话,关闭后再读写,就有可能导致这个异常。

      解决办法就是关闭防火墙,ubuntu下命令如下:

      

      contOS下命令如下:

      systemctl stop firewalld.service #停止firewall
      systemctl disable firewalld.service #禁止firewall开机启动
      firewall-cmd --state #查看默认防火墙状态(关闭后显示notrunning,开启后显示running)

    原文 RocketMQ学习笔记(6)----RocketMQ的Client的使用 Producer/Consumer

  • 相关阅读:
    pytorch实现BiLSTM+CRF用于NER(命名实体识别)
    pytorch中如何处理RNN输入变长序列padding
    pytorch nn.LSTM()参数详解
    Pytorch的LSTM的理解
    转:pytorch版的bilstm+crf实现sequence label
    【Tensorflow】tf.nn.atrous_conv2d如何实现空洞卷积?膨胀卷积
    iOS iphone5屏幕适配 autosizing
    IOS文件存储小结
    IIS6_IIS7日志文件位置
    xcode中没有autoSizing的设置
  • 原文地址:https://www.cnblogs.com/xiaoshen666/p/10867592.html
Copyright © 2011-2022 走看看