zoukankan      html  css  js  c++  java
  • rocketmq学习

    官网地址


    安装name server和broker

    git clone https://github.com/apache/incubator-rocketmq.git
    
    cd incubator-rocketmq
    
    mvn clean package install -Prelease-all assembly:assembly -U
    
    然后target目录下的apache-rocketmq-all就是我们需要的
    
    把apache-rocketmq-all抽出来,移到apache-rocketmq-all目录下
    
    执行nohup sh bin/mqnamesrv & 启动name server
    
    tail -f ~/logs/rocketmqlogs/namesrv.log 查看name server日志
    
    执行nohup sh bin/mqbroker -n localhost:9876 & 启动Broker
    
    tail -f ~/logs/rocketmqlogs/broker.log 查看Broker日志
    

    demo

    添加依赖

    <!--rocketmq-->
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.0.0-incubating</version>
    </dependency>
    

    消费者

    /**
     * @author fengzp
     * @date 2017/3/31下午5:10
     * @email fengzp@gzyitop.com
     * @company 广州易站通计算机科技有限公司
     */
    public class ConsumerMQ {
    
        public static void main(String[] args) throws MQClientException, InterruptedException {
    
            /**
             * ConsumerGroupName需要由应用来保证唯一
             */
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupName");
    
            /**
             * 指定服务端和端口
             */
            consumer.setNamesrvAddr("localhost:9876");
    
            /**
             * 订阅指定topic下tags为TagName的消息; "TagA || TagB || TagC" 代表订阅TagA和TagB和TagC的消息; "*" 代表订阅所有消息
             */
            consumer.subscribe("TopicName", "TagName");
    
    
            /**
             * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
             * 如果非第一次启动,那么按照上次消费的位置继续消费
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
            /**
             * * 默认msgs里只有一条消息,可以通过consumer.setConsumeMessageBatchMaxSize();来设置批量接收消息
             */
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    for (MessageExt msg : list) {
                        /**
                         * msg.getMsgId(); //msg唯一id
                         * msg.getTopic();
                         * msg.getTags();
                         */
                        System.out.println(new String(msg.getBody()));
                    }
    
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
    
            System.out.println("Consumer Started.");
        }
    }    
    

    提供者

    /**
    * @author fengzp
    * @date 2017/3/31下午5:07
    * @email fengzp@gzyitop.com
    * @company 广州易站通计算机科技有限公司
    */
    public class ProducerMQ {
    
        public static void main(String[] args) throws MQClientException, InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer("GroupName");
    
            producer.setNamesrvAddr("localhost:9876");
            producer.setInstanceName("InstanceName");
            producer.start();
    
            try {
                for (int i = 0; i < 3; i++) {
                    Message msg = new Message("TopicName", "TagName", (new Date() + " fengzp hao shuai a " + i).getBytes());
    
                    SendResult sendResult = producer.send(msg);
    
                    System.out.println(sendResult.getMsgId() + " : " + sendResult.getSendStatus().name());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            /**
             * 应用退出时,需要调用shutdown方法来在MetaQ服务器上注销自己
             */
            producer.shutdown();
        }
    }
    

    测试

    先启动消费者,然后启动提供者

    提供者:

    消费者:

    消息成功发送,并且触发了订阅。



    这里有些不错的介绍rocketmq的文章

    特性

    客户端实践

  • 相关阅读:
    通过设置P3P头来实现跨域访问COOKIE
    随心所欲玩复制 详解robocopy
    MySQL的mysqldump工具的基本用法
    uvm_void 寂静的空宇
    Chisel语言
    IP-XACT IP IEEE交换格式
    SystemC简介
    ( 转)UVM验证方法学之一验证平台
    (转)让你彻底理解:静态时序分析
    (转)存储芯片入门漫谈
  • 原文地址:https://www.cnblogs.com/andyfengzp/p/6652423.html
Copyright © 2011-2022 走看看