zoukankan      html  css  js  c++  java
  • Java操作RocketMQ

    第一步:导入依赖

     <dependency>
          <groupId>com.alibaba.rocketmq</groupId>
          <artifactId>rocketmq-client</artifactId>
          <version>3.0.10</version>
        </dependency>
        <dependency>
          <groupId>com.alibaba.rocketmq</groupId>
          <artifactId>rocketmq-all</artifactId>
          <version>3.0.10</version>
          <type>pom</type>
        </dependency>
        <dependency>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-classic</artifactId>
          <version>1.1.1</version>
        </dependency>
        <dependency>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-core</artifactId>
          <version>1.1.1</version>
        </dependency>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.10</version>
          <scope>test</scope>
     </dependency>
    

      

    第二步:创建生产者

    package com.wish;
    
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    
    public class Producer {
    
        public static void main(String[] args) throws MQClientException {
            //创建一个消息的生产者
            // producerGroup:一般发送同样消息的Producer,归为同一个Group,应用必须设置,并保证命名唯一
            DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
            //设置名称srv地址
            producer.setNamesrvAddr("192.168.152.55:9876;192.168.152.66:9876");
            //实例名称
            producer.setInstanceName("producer");
            //启动
            producer.start();
            try {
                for (int i = 0; i < 10; i++) {
                    Thread.sleep(1000); // 每秒发送一次MQ
                    Message msg = new Message("itmayiedu-topic", // topic 主题名称
                            "TagA", // tag 临时值
                            ("itmayiedu-"+i).getBytes()// body 内容
                    );
                    //send()发送
                    SendResult sendResult = producer.send(msg);
                    //SendResult:发送消息结果
                    System.out.println(sendResult.toString());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            //关掉
            producer.shutdown();
        }
    
    }
    

      

    第三步:创建消费者

    package com.wish;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class Consumer {
        public static void main(String[] args) throws MQClientException {
            //创建一个消费者
            //consumerGroup:做同样事情的Consumer归为同一个Group,应用必须设置,并保证命名唯一
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
            //设置名称srv地址
            consumer.setNamesrvAddr("192.168.152.55:9876;192.168.152.66:9876");
            //实例名称
            consumer.setInstanceName("consumer");
            //实现订阅
            consumer.subscribe("itmayiedu-topic", "TagA");
            //注册消息监听器
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        System.out.println(msg.getMsgId()+"---"+new String(msg.getBody()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            //启动消费者
            consumer.start();
            System.out.println("Consumer Started.");
        }
    }
    

      

    第四步:分别启动消费者和生产者,查看浏览器

  • 相关阅读:
    收集座右铭
    Yii2查询语句使用不等于号
    使用jQuery获取Bootstrap Switch的值
    wamp 提示 Directive allow_call_time_pass_reference is no longer avaiable in PHP
    解决GitHub添加sshkey仍然无法访问clone远程仓库的问题
    异常-User class threw exception: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
    CDH5.16.1升级kafka0.10到1.0.1
    MacOs桌面自动被打乱的原因
    彻底解决MacOS上应用程序快捷键冲突的问题,自定义快捷键设置
    CDH5.16.1的maven依赖版本查询地址
  • 原文地址:https://www.cnblogs.com/wishsaber/p/12322200.html
Copyright © 2011-2022 走看看