zoukankan      html  css  js  c++  java
  • rocketmq-client使用

    一、添加依赖

    <!-- RocketMQ  -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.6.0</version>
    </dependency>

     2个核心接口,3个默认实现。

    interface  MQProducer
                 --- DefaultMQProducer
    interface  MQConsumer
                 --- DefaultMQPushConsumer
                 --- DefaultMQPullConsumer

    DefaultMQProducer是MQProducer的唯一默认实现,其实现 MQProducer 接口的时候 还继承了 ClientConfig类 (客户端配置类),可以配置如 sendMsgTimeout超时时间,producerGroup 生产者组 最大消息容量和是否启用压缩等。

    关键方法是 send(Message) 发送一个消息到MQ。

    DefaultMQPushConsumer 包含很多可以配置的信息,最主要的有:
      messageModel 消息模型 支持以下两种 1、集群消费 2、广播消费
      messageListener 消息监听器
      consumeThreadMin 消费线程池数量 默认10
      consumeThreadMax 消费线程池数量 默认20
    Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法, MessageListenerOrderly有序的,MessageListenerConcurrently 无序的。
    关键方法有registerMessageListener 注册监听器等。

    二、生产者

    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    /**
     * 生产者
     */
    public class RocketProducer {
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
            //设置生产者组名
            DefaultMQProducer producer = new DefaultMQProducer("my-group-name-A");
            //指定nameServer的地址, 多个地址用分号分隔
            producer.setNamesrvAddr("localhost:9876");
            //启动实例
            producer.start();
            Message message = new Message("topic-name-A","tag-name-A","Message : My blog address".getBytes());
            producer.send(message);
            System.out.println("Message sent");
            //关闭生产者,释放资源
            producer.shutdown();
        }
    }

    三、消费者

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import java.util.List;
    
    /**
     * 消费者
     */
    public class RocketConsumer {
        public static void main(String[] args) throws MQClientException{
            //设置消费者组名
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-group-name-A");
            //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //NameServer地址, 多个地址用分号(;)分隔
            consumer.setNamesrvAddr("localhost:9876");
            //参数1:topic名字 参数2:tag名字
            consumer.subscribe("topic-name-A", "tag-name-A");
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        System.out.println(new String(msg.getBody()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            //启动,会一直监听消息
            consumer.start();
            System.out.println("Consumer Started!");
            //consumer.shutdown();
        }
    }
  • 相关阅读:
    jzoj5377 开拓
    JZOJ5371 组合数问题
    JZOJ 10043 第k小数
    联赛emacs配置
    11.7 NOIP总复习总结
    cogs791 [HAOI2012] 音量调节
    bzoj1968 [Ahoi2005]COMMON 约数研究
    cogs 1330 [HNOI2008]玩具装箱toy
    cogs2479 偏序 cdq+树套树
    【CJOJ2433】陌上花开 CDQ分治
  • 原文地址:https://www.cnblogs.com/myitnews/p/12906600.html
Copyright © 2011-2022 走看看