zoukankan      html  css  js  c++  java
  • RocketMQ入门(生产者)_2

    从 RocketMQ环境搭建_1 我们已经建立了MQ的Server,接下来就是简单的生产和消费的过程。

    1. rocketMQ的源码中有个示例代码example  ,我们从Apache官网中可以下载源码source找到example,进行学习。

    下载地址:http://rocketmq.apache.org/docs/quick-start/

    建立简单的工程,mvn最主要依赖client

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.3.1</version>
    </dependency>

     在做transactionProducer时,发现无法消费,问题是需要依赖parent,因此借鉴demo中的mvn依赖:

        <parent>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-all</artifactId>
            <version>4.3.0</version>
        </parent>
    
        <modelVersion>4.0.0</modelVersion>
        <packaging>jar</packaging>
        <artifactId>rocketmq-example</artifactId>
        <name>rocketmq-example ${project.version}</name>
    
        <dependencies>
            <dependency>
                <groupId>${project.groupId}</groupId>
                <artifactId>rocketmq-client</artifactId>
            </dependency>
            <dependency>
                <groupId>${project.groupId}</groupId>
                <artifactId>rocketmq-srvutil</artifactId>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
            </dependency>
             <dependency>
                <groupId>org.javassist</groupId>
                <artifactId>javassist</artifactId>
            </dependency>
            <dependency>
                <groupId>io.openmessaging</groupId>
                <artifactId>openmessaging-api</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-openmessaging</artifactId>
                <version>4.3.0</version> 
            </dependency>
        </dependencies>

    创建Producer类:

    /**
     * 简单生产者
     * 
     * @author DennyZhao
     * @since 2018/10/29
     * @version 1.0
     */
    public class Producer {
    
        /**
         * main方法
         * 
         * @param args
         * @throws InterruptedException
         * @throws MQBrokerException
         * @throws RemotingException
         * @throws MQClientException
         * @throws UnsupportedEncodingException
         */
        public static void main(String[] args) throws MQClientException, RemotingException, MQBrokerException,
                InterruptedException, UnsupportedEncodingException {
            //创建生产者实例,并确定生产组
            DefaultMQProducer producer = new DefaultMQProducer("fruitProducerGroup");
            // 指定服务NameServer服务
            producer.setNamesrvAddr("192.168.68.137:9876;192.168.68.138:9876;");
            // 生产者启动
            producer.start();
            String[] fruitArray = { "apple", "strawbarry", "pear", "banana", "orange" };
            for (String fruit : fruitArray) {
                // 创建消息
                Message message = new Message("fruit", "common", fruit.getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 发送消息
                SendResult result = producer.send(message);
                SendStatus sendStatus = result.getSendStatus();
                // 获取回执
                System.out.println(result);
                if (sendStatus == SendStatus.SEND_OK) {
                    System.out.println("信息发送成功!");
                } else {
                    System.out.println("信息发送失败!");
                }
            }
            // 关闭生产者
            producer.shutdown();
        }
    }

     创建Consumer类:

    /**
     * 消費者群體
     * @author DennyZhao
     * @since 2018/10/29
     * @version 1.0
     */
    public class Consumer {
    
        public static void main(String[] args) throws MQClientException {
            //创建消费者实例和组
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("fruitConsumerGroup");
            // 指定nameServer服务地址
            consumer.setNamesrvAddr("192.168.68.137:9876;192.168.68.138:9876;");
            // 订阅消费Topic
            consumer.subscribe("fruit", "*");
            // 订阅从何地方开始读(先进先出,还是先进后出)
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            // 添加监听
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext arg1) {
                    if(msgList != null && msgList.size() > 0) {
                        MessageExt msg = msgList.get(0);
                        System.out.println(msg);
                        try {
                            System.out.println(new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
                        } catch (UnsupportedEncodingException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                
            });
            // 启动消费者
            consumer.start();
        }
    }

    ※注意:启动最好先启动消费者然后再启动服务者。

    从RocketMQ-console中可以看到创建的消费者群和生产者群。至此,简单的生产消费就算大功告成。

    配置说明:

    1.producer-生产者类型

    • 1. NormalProducer         (普通生产者)
    • 2. OrderProducer           (严格顺序生产者,例如:订单创建,付款,发货等)
    • 3. TransactionProducer  (事务生产者)

    msgId和msgKey非常关键:msgId是mq自动生成的,可在控制台message中查找数据。

                                                msgKey大多是业务主键key,用于跟踪数据,比如订单号等。

    msg中有个很重要的属性:在producer端放置:msg.putUserProperty([key],  [value]);  //是个map,可放内容,在consumer端获取

    主要可选参数:

            // 设置超过多大进行compress压缩
            producer.setCompressMsgBodyOverHowmuch(1024 * 10);
            // 设置发送失败的尝试次数。
            producer.setRetryTimesWhenSendFailed(3);
            // 设置如果返回值不是send_ok,是否要重新发送
            producer.setRetryAnotherBrokerWhenNotStoreOK(false);
            // 设置限制最大的文件大小
            producer.setMaxMessageSize(1024*50);
            // 设置默认主题对应的队列数
            producer.setDefaultTopicQueueNums(4);
            //创建新的topic
            producer.createTopic("1121", "vegetables", 4);
            // 设置发送超时时间 ms
            producer.setSendMsgTimeout(1000);

    OrderProducer 采用将有序内容放在单个queue,保证消费的顺序进行。可参见示例中的order代码。

             Producer类不同点展示,发送消息:

                // 发送消息
                SendResult result = producer.send(message, new MessageQueueSelector() {
                    public MessageQueue select(List<MessageQueue> msgList, Message message, Object queueId) {
                        return msgList.get(Integer.valueOf(queueId.toString()));
                    }}, 0); //这个0表示将这些msg放入到队列0中

             Consumer类不同点展示,接受消息:

    // 添加监听
            consumer.registerMessageListener(new MessageListenerOrderly() {
    
                //获取数据,防止一次获取太多无法消化,可一次取单个条数。
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgList, ConsumeOrderlyContext arg1) {
                    if(msgList != null && msgList.size() > 0) {
                        MessageExt msg = msgList.get(0);
                        System.out.println(msg);
                            System.out.println(msg.getBody());
                    }
                    return ConsumeOrderlyStatus.SUCCESS;
                }
                
            });

     TransactionProducer :

           用于解决事务同步,尤其是金融方面,在单体应用中我们可以通过数据库事务来控制,但大的电商系统表和表可能分属于不同的数据库,数据库事务则失效。

           比如微信转银行操作:我们从银行扣款100元到微信,如果微信增加100元,而此时出现问题,银行扣款没有成功这样理论上是要回滚微信的增加100元。

           一般分布式采用2pc(2 phase commit)模式(两阶段提交协议:预留和确认),安全性高,但是因为长连接导致长时间等待。

          而RocketMQ采用两阶段补偿型,TCC(Try-Confirm-Cancel)的简称。

         应用场景:买2张票(春运回家,从天津->上海->武汉),先买预留,然后在规定时间内付款则commit,否则过期后rollback.(无论哪个)

        异常回溯:在3.2.6+版本的非商业版已经取消,

    需要手动回查参见别人的文章RocketMQ事务消息回查设计方案

     常对应参数:

    /**
     * 做数据反查轮询用 UN KNOW,时会用到反查目前已经 deprecated
     */
    producer.setCheckThreadPoolMaxSize(5);
    producer.setCheckThreadPoolMinSize(2);
    producer.setCheckRequestHoldMax(200);//回查最大数

     生产者修改:添加

    sendMessage变化:TransactionSendResult 
    producer变化: TransactionMQProducer
    添加监听: TransactionListener
    因非商业3.2.6取消回查:因此 producer.setExecutorService(executorService);没有作用,本来是用于开启多线程进行回查用
    /**
     * 事务生产者
     * 
     * @author DennyZhao
     * @since 2018/10/31
     * @version 1.0
     */
    public class Producer {
    
        /**
         * main方法
         * 
         * @param args
         * @throws InterruptedException
         * @throws MQBrokerException
         * @throws RemotingException
         * @throws MQClientException
         * @throws UnsupportedEncodingException
         */
        public static void main(String[] args) throws MQClientException, RemotingException, MQBrokerException,
                InterruptedException, UnsupportedEncodingException {
            //创建生产者实例,并确定生产组
            TransactionMQProducer producer = new TransactionMQProducer("transProducerGroup");
            TransactionListener transListener = new FruitTransactionListener();
            // 指定服务NameServer服务
            producer.setNamesrvAddr("192.168.68.137:9876;192.168.68.138:9876;");
    //        // 设置超过多大进行compress压缩
    //        producer.setCompressMsgBodyOverHowmuch(1024 * 10);
    //        // 设置发送失败的尝试次数。
    //        producer.setRetryTimesWhenSendFailed(3);
    //        // 设置如果返回值不是send_ok,是否要重新发送
    //        producer.setRetryAnotherBrokerWhenNotStoreOK(true);
    //        // 设置限制最大的文件大小
    //        producer.setMaxMessageSize(1024*50);
    //        // 设置默认主题对应的队列数
    //        producer.setDefaultTopicQueueNums(4);
    //        // 设置发送超时时间 ms
    //        producer.setSendMsgTimeout(1000);
            /**
             * 做数据反查轮询用
             */
    //        producer.setCheckThreadPoolMaxSize(5);
    //        producer.setCheckThreadPoolMinSize(2);
    //        producer.setCheckRequestHoldMax(200);//回查最大数
            
            
            
            ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
    
                public Thread newThread(Runnable r) {
                    Thread th = new Thread(r);
                    th.setName("client-transaction-msg-check-thread");
                    System.out.println("id:" + th.getId());
                    System.out.println("name:" + th.getName());
                    return th;
                }
           });
            producer.setExecutorService(executorService);
            /**
             * 添加监听
             */
            producer.setTransactionListener(transListener);
            
            // 生产者启动
            producer.start();
            String[] fruitArray = { "apple-苹果", "strawbarry-草莓", "pear-梨子", "banana-香蕉", "orange-橘子"};
            for (String fruit : fruitArray) {
                // 创建消息
                Message message = new Message("transactionFruit", "common", "key"+fruit, fruit.getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 发送消息
                TransactionSendResult result = producer.sendMessageInTransaction(message, "abcd");
                 Thread.sleep(10);
                SendStatus sendStatus = result.getSendStatus();
                // 获取回执
                System.out.println(result);
                if (sendStatus == SendStatus.SEND_OK) {
                    System.out.println("信息发送成功!");
                } else {
                    System.out.println("信息发送失败!");
                }
            }
            int j = 0;
            while(j <500) {
                Thread.sleep(1000);
                j++;
            }
            // 关闭生产者
            producer.shutdown();
        }

     监听:

    因回查被取消因此:checkLocalTransaction(MessageExt msg)没有作用了,所以如果
    LocalTransactionState.UNKNOW 将无法处理,会使得topic一直处于不显示状态。
    /**
     * 事务执行监听
     * @author DennyZhao
     *
     */
    public class FruitTransactionListener implements TransactionListener {
    
        /**
         * 执行事务,事务成功commit,不成功rollback,未知unknown
         * msg Message
         * arg 附加参数,用于处理传递内容加以判断,使用
         * 
         */
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            System.out.println((String) arg);
            // 执行事务处
            // 假设以水果入库为例:苹果,香蕉 commit,梨子 rollback, 橘子和草莓不知道怎么处理
            System.out.println(msg + "---executeLocal");
            System.out.println(msg.getTransactionId());
            String fruit = "";
            try {
                fruit = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            
            if(StringUtils.contains(fruit, "苹果")
                    || StringUtils.contains(fruit, "香蕉")) {//提交
                return LocalTransactionState.COMMIT_MESSAGE;
            }else if(StringUtils.contains(fruit, "梨")) {//回滚
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }else { //通过轮询去处理
                return LocalTransactionState.UNKNOW;
            }
        }
    
        /**
         * 轮询反查,对于unknow的内容,进行反查获取结果
         */
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            System.out.println(msg + "---checkAgain");
            String fruit = "";
            try {
                fruit = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            //从库中反查知道库中 存在葡萄,不存在橘子 
            if(StringUtils.contains(fruit, "葡萄")) {//提交
                return LocalTransactionState.COMMIT_MESSAGE;
            }else if(StringUtils.contains(fruit, "橘子")) {//回滚
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            return LocalTransactionState.UNKNOW;
        }

    错误说明:

    1. No route info of this topic

    因在启动broker时,参数中未设置可自动创建topic,因此生产者创建topic被认为不合法,需要在console中先创建topic,或者服务端先创建topic。

    2. transactionProducer 生产后无法消费

    mvn依赖中缺少 <parent>rocketmq-all</parent>导致。

    3. 事务回查无效

     版本 3.2.6 后rocketmq取消了事务回查机制,如果丢失需要自己手动通过key值回查.

     

  • 相关阅读:
    SpringBoot详解(二)——
    SpringBoot详解(一)——
    数据库三大范式
    Mysql备份
    mysql索引
    mysql事务
    几种数据库查找的案例
    点击加载更多
    layer、弹出框
    验证码倒计时
  • 原文地址:https://www.cnblogs.com/DennyZhao/p/9868834.html
Copyright © 2011-2022 走看看