zoukankan      html  css  js  c++  java
  • rocketmq 学习笔记

    参考官网 http://rocketmq.apache.org/docs/quick-start/        安装启动nameService 和broker,

    broker配置文件在distribution/target/apache-rocketmq 目录,进入文件默认有几个配制

    rocketMq  阿里文档文档地址:http://git.gupaoedu.com/jinjian/rocketmq.pdf/blob/master/rocketMq.pdf (自己花百度文库积分换的......心痛)

    快速入门:

    //依赖
    <dependencies> <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.2.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.7</version> </dependency> </dependencies>

    生产者:Producer 

    package org.personal.rocketmq;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * Created by Administrator on 2018/7/8.
     */
    public class RocketProducerMqDemo {
    
        public static void main(String[] args) {
         final    DefaultMQProducer producer=new DefaultMQProducer("produceGroup");
            producer.setNamesrvAddr("192.168.149.133:9876");
            try {
                producer.start();
                for (int i = 0; i < 1; i++) {
                    try {
                        {
                            Message msg = new Message("TopicTest1",// topic
                                    "TagA",// tag
                                    "s1000000"+i,// key
                                    ("Hello rocketMq"+i).getBytes());// body
    
                            SendResult sendResult = producer.send(msg);
                            System.out.println(sendResult);
                        }
    
                        {
                            Message msg = new Message("TopicTest2",// topic
                                    "TagB",// tag
                                    "siID0034",// key
                                    ("Hello MetaQB"+i).getBytes());// body
                            SendResult sendResult = producer.send(msg);
                            System.out.println(sendResult);
                        }
    
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    
    
    
            } catch (MQClientException e) {
                e.printStackTrace();
            }
    
            producer.shutdown();
    //        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    //            public void run() {
    //                producer.shutdown();
    //            }
    //        }));
    //        System.exit(0);//关掉虚拟机,正常退出
    
        }
    
    }

    消费者:Consumer

    package org.personal.rocketmq;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    /**
     * Created by Administrator on 2018/7/11.
     */
    public class RocketConsumerMqDemo {
    
        public static void main(String[] args) throws MQClientException {
    
            DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("ConsumerGroup3");
            consumer.setNamesrvAddr("192.168.149.133:9876");
            consumer.setInstanceName("Consumber");
            /**
             * 订阅指定topic下tags分别等于TagA或TagC或TagD
             */
            consumer.subscribe("TopicTest1", "TagA || TagC || TagD");
            /**
             * 订阅指定topic下所有消息<br>
             * 注意:一个consumer对象可以订阅多个topic
             */
            consumer.subscribe("TopicTest2", "*");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    
                    System.out.println(Thread.currentThread().getName()
                            + " Receive New Messages: " + msgs.size());
    
                    MessageExt msg = msgs.get(0);
                    if (msg.getTopic().equals("TopicTest1")) {
                        //执行TopicTest1的消费逻辑
                        if(msg.getTags() != null){
                            if("TagA".equals(msg.getTags())){//执行TagA的消费
                                System.out.println(new String(msg.getBody())+msg.toString());
                            }else if ("TagC".equals(msg.getTags())){//执行TagC的消费
                                System.out.println(new String(msg.getBody())+msg.toString());
                            }else if ("TagD".equals(msg.getTags())){//执行TagD的消费
                                System.out.println(new String(msg.getBody())+msg.toString());
                            }
                        }
                    } else if (msg.getTopic().equals("TopicTest2")) {
                        System.out.println(new String(msg.getBody()));
                    }
    
                    //返回状态
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    
                }
            });
    
            /**
             * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
             */
            consumer.start();
            
            System.out.println("ConsumerStarted.");
        }
    }

    broker入门(单机配置和基本概念): https://www.jianshu.com/p/cc108aeb08ac

    rocketMq源码分析:http://www.yunai.me/RocketMQ/why-read-RocketMQ-source-code/

    broker整体储存结构:https://www.cnblogs.com/tommyli/p/5081846.html   https://www.jianshu.com/p/bc85c0695da0

    broker  Consumequeue储存说明:https://blog.csdn.net/meilong_whpu/article/details/76921208

    broker indexServices索引: https://blog.csdn.net/rodbate/article/details/78763379   https://blog.csdn.net/quhongwei_zhanqiu/article/details/39153195

    broke集群部署: http://blog.51cto.com/leexide/2106360

    rocketmq和kafka对比: https://blog.csdn.net/chunlongyu/article/details/54576649

  • 相关阅读:
    查找链表中是否有环linked-list-cycle
    reverse-integer
    AVL树之 Java的实现
    single-number
    Best Time to Buy and Sell Stock II
    maximun-depth-of-binary-tree
    minimun-depth-of-binary-tree
    剑指offer--矩阵中的路径
    grep的几个参数
    fsck和badlocks
  • 原文地址:https://www.cnblogs.com/jinjian91/p/9280571.html
Copyright © 2011-2022 走看看