zoukankan      html  css  js  c++  java
  • RocketMq 广播模式下 确保顺序消费

    生产者(指定队列推送,或者默认创建主题时就创建一个队列):
    发送消息时,如果服务器上不存在该 topic,会自动创建;自动创建时的队列数由生产者的 defaultTopicQueueNums 决定,默认值为 4,改成 1 即可。对于已经存在的主题,只能在发送时指定队列。

    package com.apacherocketmq.test;
    import java.io.UnsupportedEncodingException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.List;
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.MessageQueueSelector;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    import com.apacherocketmq.connection.RocketProducer;
    public class BroadcastProducer
    {
        private static int i = 0;
        private static int index = 0;
        private static long count =1L;
        @SuppressWarnings("unused")
        public static void main(String[] args)
                throws UnsupportedEncodingException, MQClientException, RemotingException, MQBrokerException, InterruptedException
        {
            DefaultMQProducer producer = RocketProducer.newInstance();
            producer.setProducerGroup("GroupE");
            producer.start();
            while (true)
            {
                if (i == 0)
                {
                    i = 1;
                    index++;
                } else if (i == 1)
                {
                    i = 0;
                }
                Message message = new Message("GroupETopicA", "TagE", "e",
                        (" " + time() + " " + i + " " + index).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(message, new MessageQueueSelector()
                {
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg)
                    {
                        //0号队列
                        return mqs.get(0);
                    }
                }, "ss");
    //            Thread.sleep(2000);
                System.out.println(count++);
            }
            // producer.shutdown();
        }
        private static String time()
        {
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            return df.format(new Date());
        }
    }

    消费者(从指定队列取,其他队列的抛弃,若主题只有一个队列则无所谓):

    package com.apacherocketmq.test;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.List;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    import com.apacherocketmq.connection.RocketMQPushConsumer;
    import com.jdbc.MyJDBCUtil;
    public class BroadcastConsumerTEST
    {
        public static void main(String[] args) throws MQClientException
        {
            DefaultMQPushConsumer consumer = RocketMQPushConsumer.newInstance();
            // 消息模型,支持以下两种:集群消费(clustering),广播消费(broadcasting)
            consumer.setMessageModel(MessageModel.BROADCASTING);
            consumer.setConsumerGroup("GroupE");
            consumer.subscribe("GroupETopicA", "*");
            // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
            // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // 从指定时间消费
            // consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()
            // - (1000 * 60 * 30)));
            consumer.setConsumeThreadMax(1);
            consumer.setConsumeThreadMin(1);
            consumer.registerMessageListener(new MessageListenerConcurrently()
            {
                private boolean isFirst = true;
                private long i = 0L;
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
                {
                    if (isFirst)
                    {
                        i = query("JYLQ");
                        isFirst = false;
                    }
                    long offset = msgs.get(0).getQueueOffset();
                    int s = msgs.get(0).getQueueId();
                    if (s != 0)
                    {
                        //抛弃其他队列数据
                        System.err.println("QueueId NOT 0");
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    if (offset > i)
                    {
                        update(offset);
                        String ss = offset + new String(msgs.get(0).getBody()) + " N 
    ";
                        System.out.println(ss);
                    } else
                    {
                        String t = offset + new String(msgs.get(0).getBody()) + " O 
    ";
                        System.err.println(t);
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                private long query(String param)
                {
                    long i = 0;
                    String SQL = "SELECT CONFIG_VALUE FROM common_sys_config WHERE CONFIG_ITEM=?";
                    ResultSet result = MyJDBCUtil.query(SQL, param);
                    try
                    {
                        result.next();
                        i = result.getLong("CONFIG_VALUE");
                    } catch (SQLException e)
                    {
                        e.printStackTrace();
                    }
                    return i;
                }
                private void update(long param)
                {
                    String SQL1 = "UPDATE common_sys_config SET CONFIG_VALUE=? WHERE CONFIG_ITEM='JYLQ'";
                    MyJDBCUtil.execute(SQL1, String.valueOf(param));
                }
            });
            consumer.start();
            System.out.println("Broadcast ConsumerA Started.");
        }
        public static void waitFor(long i)
        {
            try
            {
                Thread.sleep(1000);
            } catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        }
    }
  • 相关阅读:
    Java-数据字符串进行四舍五入
    Git本地安装及汉化
    Navicat安装教程
    Jdk+maven安装
    系列文章
    @PathVariable
    feign组件
    Lombok
    常见Jar包的用途
    iOS archive(归档)的总结
  • 原文地址:https://www.cnblogs.com/virtulreal/p/9749853.html
Copyright © 2011-2022 走看看