zoukankan      html  css  js  c++  java
  • RocketMQ生产者和消费者

    RocketMQ生产者和消费者

      注:生产者在生产数据时,指定数据的key,然后消费者进行数据消费时,获取到key,与redis中保存的key做判断

      如果不相同代表之前没有人进行消费,处理消费,保存到redis当中

      当有第二个消费者时,如果拿到的消息与redis中相同代表之前已已经有人消费

      就进行数据签收,防止后续消费者同样拿到重复消费数据

      注:消费者的消费逻辑失败时,可以通过设置返回状态达到消息重试的结果。

      消息重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。

      每次重试后,消息ID都不一致,所以不能使用消息ID判断幂等。

    生产者

     
    private static Map<String,Object> map=new HashMap<>();
        public static void main(String[] args) throws Exception {
            //创建消费者
            DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("mckz-group");
            //设置NameServer地址
            consumer.setNamesrvAddr("192.168.3.100:9876;192.168.3.200:9876");
            //设置实例名称
            consumer.setInstanceName("mckz");
            //订阅Topic
            consumer.subscribe("mckz_topic","TagA");
            //监听消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    //获取消息
                    for(MessageExt ext:msgs){
                        //判断redis中有没有当前消息key
                        if(map.containsKey(ext.getKeys())){
                            System.out.println("已经消费.......");
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
                        //RocketMQ由于是集群环境,所以产生的消息ID可能会重复
                        System.out.println(ext.getMsgId()+"----------"+new String(ext.getBody()));
                        //将当前Key保存到redis当中
                        map.put(ext.getKeys(),ext);
                    }
              /*如果出现异常可进行try进行捕获*/
    /*try {
                        int a=5/0;
                    }catch (Exception e){
                        //接受消息状态 
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }*/
    
                    /*try {
                        Thread.sleep(60000);
                    }catch (Exception e){
                        e.printStackTrace();
                    }*/
    
                    //接受消息状态 
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            //启动消费者
            consumer.start();
        }
     

    消费者

     
    public static void main(String[] args) throws Exception {
            //创建一个生产者
            DefaultMQProducer producer=new DefaultMQProducer("mckz_group");
            //设置NameServer地址
            producer.setNamesrvAddr("192.168.3.100:9876;192.168.3.200:9876");
            //设置生产者实例名称
            producer.setInstanceName("mckz");
            //启动生产者
            producer.start();
            //发送消息
            for (int i = 1; i <=10 ; i++) {
                Thread.sleep(1000); //模拟网络延迟
                //创建消息  topic代表主题名称     tags代表小分类     body代表消息体
                Message message=new Message("mckz_topic","TagA",("wdksoft-"+i).getBytes());
                //消息的唯一标识
                message.setKeys("订单编号"+i);
                //发送消息
                SendResult send = producer.send(message);
                System.out.println(send.toString());
            }
        }
  • 相关阅读:
    ....
    CodeForces 375A(同余)
    POJ 2377 Bad Cowtractors (最小生成树)
    POJ 1258 AgriNet (最小生成树)
    HDU 1016 Prime Ring Problem(全排列)
    HDU 4460 Friend Chains(bfs)
    POJ 2236 Wireless Network(并查集)
    POJ 2100 Graveyard Design(尺取)
    POJ 2110 Mountain Walking(二分/bfs)
    CodeForces 1059B Forgery(模拟)
  • 原文地址:https://www.cnblogs.com/danxun/p/12337159.html
Copyright © 2011-2022 走看看