zoukankan      html  css  js  c++  java
  • 消息中间件--"rocketmq"02之QuickStart

    依赖

    <dependency>
        <groupId>com.alibaba.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>3.2.6</version>
    </dependency>
    
    
    

    QuickStart

    producer

    
    //伪代码 producer
    
    DefaultMQProducer producer = new DefaultMQProducer ("ProducerGroup");
    producer.setNamesrvAddr("192.168.1.121:9876;192.168.1.122:9876");
    producer.start();
    
    Message msg = new Message("TopicTest","TagTest","hello world".getBytes("utf-8"));
    
    SendResult sendResult = producer.send(msg);
    
    producer.shutdown();
    
    
    

    consumer

    
    //伪代码 consumer
    
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ProducerGroup");
    consumer.setNamesrvAddr("192.168.1.121:9876;192.168.1.122:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUMER_FROM_FIRST_OFFSET);
    comsumer.subscribe("TopicTest","*");
    consumer.registerMessageListener(new MessageListenerConcurrently(){
        
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context){
            
            for(MessageExt ext:msgs){
                System.out.println(new Date()+ new String(ext.getBody(),"utf-8"));
            }
            
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            
        }
        
        
    });
    
    consumer.start();
    
    );
    
    

    QuickStart 结论

    • 无论生产者、消费者都必须给出GroupName,而且具有唯一性!这个组名称,是维护在应用系统级别上的,比如在生产端指定一个名称:ProducerGroupName,这个名称是需要应用系统来保证唯一性的,一类Producer的集合的名称,
      这类producer通常发送一类消息,且发送逻辑一致。同理消费者也是这样。

    • rocketmq默认是将消息持久化了,当一个消费组消费完消息以后,broker 并没有把消息删掉,而是持久化了。现在我们测试一下,现在换一个消费组,改变ComsumerGroupName再次消费的话,可以将broker上面的持久化的所有消息都消费掉。

    • 生产到哪个Topic的哪个Tag下,消费者也是从Topic的哪个Tag进行消费,可见这个Tag有点类似于JMS Selector机制,即实现消息的过滤

    • 生产者、消费者需要设置NameServer地址。

    • 这里,采用的是Consumer Push的方式,即设置Listener机制回调,相当于开启了一个线程。以后为大家介绍Consumer Pull的方式。


    这里消费消息是没有什么顺序的,以后我们在来谈消息的顺序性。

  • 相关阅读:
    决定搬家
    Deklarit3.0的确不错,推荐一下。
    [Linux] 安装samba
    如何远程连接非默认端口SQL Server
    [c#] for和foreach
    svn linux客户端安装
    [c#] HttpContext.Cache和AppFabric的性能对比
    [ms sql server]计算今天是第几周
    ajax readyState的五种状态详解
    清空sql server日志
  • 原文地址:https://www.cnblogs.com/leihuazhe/p/7689572.html
Copyright © 2011-2022 走看看