zoukankan      html  css  js  c++  java
  • Java操作RocketMQ

    一、目录展示

      

    二、导入依赖

    <dependency>
          <groupId>com.alibaba.rocketmq</groupId>
          <artifactId>rocketmq-client</artifactId>
          <version>3.0.10</version>
        </dependency>
        <dependency>
          <groupId>com.alibaba.rocketmq</groupId>
          <artifactId>rocketmq-all</artifactId>
          <version>3.0.10</version>
          <type>pom</type>
        </dependency>
        <dependency>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-classic</artifactId>
          <version>1.1.1</version>
        </dependency>
        <dependency>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-core</artifactId>
          <version>1.1.1</version>
        </dependency>

    三、提供者

    package com.zn.tests;
    
    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    
    /**
     * 生产者
     */
    public class Provider {
        public static void main(String[] args) throws MQClientException {
            //创建一个生产者
            DefaultMQProducer producer=new DefaultMQProducer("rmq-group");
            //设置NameServer地址
            producer.setNamesrvAddr("192.168.33.135:9876;192.168.33.136:9876");
            //设置生产者实例名称
            producer.setInstanceName("producer");
            //启动生产者
            producer.start();
    
            try {
                //发送消息
                for (int i=1;i<=10;i++){
                    //模拟网络延迟,每秒发送一次MQ
                    Thread.sleep(1000);
                    //创建消息,topic主题名称  tags临时值代表小分类, body代表消息体
                    Message message=new Message("itmayiedu-topic","TagA",("itmayiedu-"+i).getBytes());
                    //发送消息
                    SendResult sendResult=producer.send(message);
                    System.out.println("来了来了:"+sendResult.toString());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            producer.shutdown();
        }
    }

    四、消费者

    package com.zn.tests;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class Consumer {
        public static void main(String[] args) throws MQClientException {
            //创建消费者
            DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group");
            //设置NameServer地址
            consumer.setNamesrvAddr("192.168.33.135:9876;192.168.33.136:9876");
            //设置实例名称
            consumer.setInstanceName("consumer");
            //订阅topic
            consumer.subscribe("itmayiedu-topic","TagA");
    
            //监听消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    //获取消息
                    for (MessageExt messageExt:list){
                        //RocketMQ由于是集群环境,所有产生的消息ID可能会重复
                        System.out.println(messageExt.getMsgId()+"---"+new String(messageExt.getBody()));
                    }
                    //接受消息状态 1.消费成功    2.消费失败   队列还有
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            //启动消费者
            consumer.start();
            System.out.println("consumer Started!");
        }
    }

    五、控制台效果

    提供者:

      

    消费者:

      

    六、RocketMQ控制台效果

    测试前:

      

     测试后:

      

  • 相关阅读:
    Jenkins的插件管理(安装和更新插件)
    [Flutter] MacOS/Windows Flutter 环境走一遍
    [Sw] 使用 Swoole Server task/协程 处理大数据量异步任务时注意
    [Sw] Swoole-4.2.9 可以尝试愉快应用 Swoole 协程
    [PHP] 常备的现代 PHP 项目开发准备
    [SF] Symfony 标准 HttpFoundationRequest 实现分析
    [Linux] umask 从三类人群的权限中拿走权限数字
    [Design] 后端程序的高并发与异步
    [Linux]系统管理: 进程管理(ps/top/pstree/kill/pkill), 工作管理, 系统资源查看, 系统定时任务
    [FE] 有效开展一个前端项目-V2 (vuejs-templates/webpack)
  • 原文地址:https://www.cnblogs.com/Zzzzn/p/12322622.html
Copyright © 2011-2022 走看看