zoukankan      html  css  js  c++  java
  • Rabbitmq(3) work queues

    轮询分发(round-robin):消息被依次轮流分给各个消费者,无论处理快慢,每个消费者分到的消息数量是一样的

    公平分发(fair dispatch):能者多劳。消费者通过 basicQos(1) 限制一次只接收一条未确认的消息,并关闭自动应答、改为手动确认,处理得快的消费者就会分到更多消息

    *****公平分发

    生产者

    package com.aynu.bootamqp.service;
    
    import com.aynu.bootamqp.commons.utils.Amqp;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Producer for the work-queue demo: publishes a fixed number of text messages
     * to the {@code hello} queue through the default exchange ({@code ""}).
     *
     * <p>No prefetch limit is set here. {@code basicQos} limits how many
     * unacknowledged messages the broker pushes to a <em>consumer</em>; calling it
     * on a channel that only publishes has no effect, so fair dispatch is
     * configured in the consumers instead.
     */
    public class Send {

        /** Queue shared by the producer and both consumers. */
        private static final String QUEUE_NAME = "hello";

        /** Number of messages published per run. */
        private static final int MESSAGE_COUNT = 50;

        /**
         * Declares the queue, publishes {@link #MESSAGE_COUNT} messages and closes
         * the channel and the connection.
         *
         * @param args unused
         * @throws IOException      if declaring, publishing or closing fails
         * @throws TimeoutException if opening or closing times out
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            // try-with-resources closes the channel first, then the connection,
            // even when declaring or publishing throws.
            try (Connection connection = Amqp.getConnection();
                 Channel channel = connection.createChannel()) {
                // Arguments after the name: durable, exclusive, autoDelete, arguments.
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);

                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    String message = "hello mm" + i;
                    System.out.println(message);
                    // Empty exchange name = default exchange, routed by queue name.
                    channel.basicPublish("", QUEUE_NAME, null,
                            message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                }
            }
        }
    }

    消费者1

    package com.aynu.bootamqp.service;
    
    import com.aynu.bootamqp.commons.utils.Amqp;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Consumer 1 of the fair-dispatch demo: reads messages from the {@code hello}
     * queue, simulates two seconds of work per message and acknowledges each
     * message manually.
     */
    public class Receive {

        /** Queue shared by the producer and both consumers. */
        private static final String QUEUE_NAME = "hello";

        /** Simulated processing time per message, in milliseconds. */
        private static final long WORK_MILLIS = 2000L;

        /**
         * Declares the queue and registers a consumer on it. The connection is
         * not closed here, so the consumer keeps receiving deliveries.
         *
         * @param args unused
         * @throws IOException      if a broker operation fails
         * @throws TimeoutException if opening the connection times out
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = Amqp.getConnection();
            final Channel channel = connection.createChannel();
            // Arguments after the name: durable, exclusive, autoDelete, arguments.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Prefetch 1: the broker delivers the next message only after the
            // previous one has been acknowledged.
            channel.basicQos(1);

            DefaultConsumer consumer = new DefaultConsumer(channel) {

                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("receive" + msg);

                    boolean interrupted = false;
                    try {
                        Thread.sleep(WORK_MILLIS);
                    } catch (InterruptedException e) {
                        // Remember the interrupt; it is restored after the ack so
                        // the acknowledgement itself is not disturbed.
                        interrupted = true;
                    }

                    try {
                        // Manual acknowledgement of this single delivery
                        // (multiple = false).
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    } finally {
                        if (interrupted) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            };

            // Automatic acknowledgement is switched off; handleDelivery acks
            // each message itself.
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }

    消费者2

    package com.aynu.bootamqp.service;
    
    import com.aynu.bootamqp.commons.utils.Amqp;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    /**
     * Consumer 2 of the fair-dispatch demo: reads messages from the {@code hello}
     * queue, simulates one second of work per message and acknowledges each
     * message manually.
     */
    public class Receive2 {

        /** Queue shared by the producer and both consumers. */
        private static final String QUEUE_NAME = "hello";

        /** Simulated processing time per message, in milliseconds. */
        private static final long WORK_MILLIS = 1000L;

        /**
         * Declares the queue and registers a consumer on it. The connection is
         * not closed here, so the consumer keeps receiving deliveries.
         *
         * @param args unused
         * @throws IOException      if a broker operation fails
         * @throws TimeoutException if opening the connection times out
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = Amqp.getConnection();
            final Channel channel = connection.createChannel();
            // Arguments after the name: durable, exclusive, autoDelete, arguments.
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // Prefetch 1: the broker delivers the next message only after the
            // previous one has been acknowledged.
            channel.basicQos(1);

            DefaultConsumer consumer = new DefaultConsumer(channel) {

                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("receive2222" + msg);

                    boolean interrupted = false;
                    try {
                        Thread.sleep(WORK_MILLIS);
                    } catch (InterruptedException e) {
                        // Remember the interrupt; it is restored after the ack so
                        // the acknowledgement itself is not disturbed.
                        interrupted = true;
                    }

                    try {
                        // Manual acknowledgement of this single delivery
                        // (multiple = false).
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    } finally {
                        if (interrupted) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            };

            // Automatic acknowledgement is switched off; handleDelivery acks
            // each message itself.
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
  • 相关阅读:
    记支付宝接口对接,涉及到提取证书SN号的解决方案
    Second Level Cache for Entity Framework 6.1
    记一个dynamic的坑
    使用EntityFramwork[6.1]进行级联保存的时候出现异常
    转:Transform Web.Config when Deploying a Web Application Project
    转:程序员如何增加收入
    超实用的JavaScript技巧及最佳实践(下)
    超实用的JavaScript技巧及最佳实践(上)
    Oracle PL/SQL入门语法点
    轻量级IOC框架:Ninject (下)
  • 原文地址:https://www.cnblogs.com/mm163/p/10702553.html
Copyright © 2011-2022 走看看