zoukankan      html  css  js  c++  java
  • java 操作 RabbitMQ 发送、接收消息

    例子1

    Producer.java

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * Minimal RabbitMQ producer: connects to the broker, declares the queue
     * {@link #QUEUE_NAME} and publishes a single UTF-8 text message to it.
     */
    public class Producer {
        /** Name of the queue the message is published to. */
        public final static String QUEUE_NAME="rabbitMQ_test2";

        /**
         * Publishes one "Hello RabbitMQ" message and then releases the channel and
         * the connection, even when declaring the queue or publishing fails.
         *
         * @param args unused
         * @throws IOException      if a broker operation fails
         * @throws TimeoutException if connecting or closing the channel times out
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            // Create the connection factory and point it at the broker.
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("100.51.15.10");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);

            // Open a new connection. Everything after this point runs inside
            // try/finally so the connection is not leaked when a later step throws.
            Connection connection = factory.newConnection();
            try {
                // Open a channel on the connection.
                Channel channel = connection.createChannel();
                try {
                    // Declare the queue (durable, exclusive and autoDelete are all false).
                    channel.queueDeclare(QUEUE_NAME, false, false, false, null);

                    // Publish through the default exchange (""), routed by queue name.
                    String message = "Hello RabbitMQ";
                    channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
                    System.out.println("Producer Send +'" + message + "'");
                } finally {
                    // Skip close() on a channel the broker already shut down, so a
                    // secondary "already closed" error cannot mask the real failure.
                    if (channel.isOpen()) {
                        channel.close();
                    }
                }
            } finally {
                if (connection.isOpen()) {
                    connection.close();
                }
            }
        }
    }

    Customer.java

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP;
    
    /**
     * Minimal RabbitMQ consumer: connects to the broker, declares the queue
     * {@code rabbitMQ_test2} and prints every message delivered from it.
     */
    public class Customer {
        private final static String QUEUE_NAME = "rabbitMQ_test2";

        /**
         * Registers a printing consumer on the queue and returns; the connection is
         * left open so deliveries keep arriving.
         *
         * @param args unused
         * @throws IOException      if a broker operation fails
         * @throws TimeoutException if connecting to the broker times out
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            // Broker location and credentials.
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("100.51.15.10");
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("admin");
            connectionFactory.setPort(5672);

            // One connection, one channel on top of it.
            Connection brokerConnection = connectionFactory.newConnection();
            final Channel ch = brokerConnection.createChannel();

            // Declare the queue we are interested in.
            ch.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println("Customer Waiting Received messages");

            // Second argument true = automatic acknowledgement of deliveries.
            ch.basicConsume(QUEUE_NAME, true, newPrintingConsumer(ch));
        }

        /**
         * Builds a consumer bound to the given channel whose handleDelivery callback
         * decodes each message body as UTF-8 and prints it.
         */
        private static Consumer newPrintingConsumer(Channel channel) {
            return new DefaultConsumer(channel) {
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String text = new String(body, "UTF-8");
                    System.out.println("Customer Received '" + text + "'");
                }
            };
        }
    }

    执行

    Producer.java

    Producer Send +'Hello RabbitMQ'
    Producer Send +'Hello RabbitMQ'

    Customer.java

    Customer Received 'Hello RabbitMQ'
    Customer Received 'Hello RabbitMQ'
    

    例子2

    首先写一个类,将生产者和消费者统一为 EndPoint 类型的队列。不管是生产者还是消费者,连接队列的代码都是一样的,这样可以通用一些。

    EndPoint.java

    //package co.syntx.examples.rabbitmq;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * Base class for objects that talk to a single RabbitMQ queue.
     * <p>
     * The constructor opens a connection and a channel and declares the queue named
     * by {@code endpointName}; subclasses use the protected {@link #channel} and
     * {@link #endPointName} fields to publish or consume.
     *
     * @author syntx
     */
    public abstract class EndPoint{

        /** Channel opened on {@link #connection}. */
        protected Channel channel;
        /** Connection to the broker. */
        protected Connection connection;
        /** Name of the queue this end point is bound to. */
        protected String endPointName;

        /**
         * Connects to the broker and declares the queue.
         *
         * @param endpointName name of the queue to declare; created on the server
         *                     if it does not exist yet
         * @throws IOException if connecting (including a connection timeout),
         *                     opening the channel, or declaring the queue fails
         */
        public EndPoint(String endpointName) throws IOException{
            this.endPointName = endpointName;

            //Create a connection factory
            ConnectionFactory factory = new ConnectionFactory();

            //hostname of your rabbitmq server
            factory.setHost("100.51.15.10");
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setPort(5672);

            //getting a connection
            try{
                connection = factory.newConnection();
            }catch (TimeoutException ex) {
                // Previously the timeout was printed and connection set to null,
                // which made the createChannel() call below fail with a
                // NullPointerException. Surface the real cause instead.
                throw new IOException("Timed out connecting to RabbitMQ at "
                        + factory.getHost() + ":" + factory.getPort(), ex);
            }

            // If anything below fails the object is never handed to the caller,
            // so nobody could close the connection; abort it here in that case.
            boolean ready = false;
            try {
                //creating a channel
                channel = connection.createChannel();
                if (channel == null) {
                    throw new IOException("No channel available on the RabbitMQ connection");
                }

                //declaring a queue for this channel. If queue does not exist,
                //it will be created on the server.
                channel.queueDeclare(endpointName, false, false, false, null);
                ready = true;
            } finally {
                if (!ready) {
                    connection.abort();
                }
            }
        }


        /**
         * Closes the channel and then the connection.
         * <p>
         * The connection is closed even when closing the channel fails. A timeout
         * while closing the channel is printed and otherwise ignored, as before.
         * Calling this on an already closed end point does nothing.
         * <p>
         * NOTE(review): the original comment said this call is optional because
         * closing happens implicitly. Nothing in this class closes the connection
         * automatically, so callers should invoke this when they are done.
         *
         * @throws IOException if closing the channel or the connection fails
         */
        public void close() throws IOException{
            try{
                if (this.channel != null && this.channel.isOpen()) {
                    this.channel.close();
                }
            } catch (TimeoutException ex){
                System.out.println("ex" + ex);
            } finally {
                if (this.connection != null && this.connection.isOpen()) {
                    this.connection.close();
                }
            }
        }
    }

    Producer2.java

    import java.io.IOException;
    import java.io.Serializable;
    
    import org.apache.commons.lang.SerializationUtils;
    
    /**
     * End point that publishes serializable objects to its queue.
     */
    public class Producer2 extends EndPoint{

        /**
         * Connects to the broker and declares the queue via the {@code EndPoint}
         * constructor.
         *
         * @param endPointName name of the queue to publish to
         * @throws IOException if the end point cannot be set up
         */
        public Producer2(String endPointName) throws IOException{
            super(endPointName);
        }

        /**
         * Serializes the object and publishes the resulting bytes through the
         * default exchange (""), using the queue name as routing key.
         *
         * @param object the message to send
         * @throws IOException if publishing fails
         */
        public void sendMessage(Serializable object) throws IOException {
            final byte[] payload = SerializationUtils.serialize(object);
            channel.basicPublish("", endPointName, null, payload);
        }
    }

    QueueConsumer.java

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.commons.lang.SerializationUtils;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.ShutdownSignalException;
    
    
    public class QueueConsumer extends EndPoint implements Runnable, Consumer{
    
        public QueueConsumer(String endPointName) throws IOException{
            super(endPointName);
        }
    
        public void run() {
            try {
                //start consuming messages. Auto acknowledge messages.
                channel.basicConsume(endPointName, true,this);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * Called when consumer is registered.
         */
        public void handleConsumeOk(String consumerTag) {
            System.out.println("Consumer "+consumerTag +" registered");
        }
    
        /**
         * Called when new message is available.
         */
        public void handleDelivery(String consumerTag, Envelope env,
                                   BasicProperties props, byte[] body) throws IOException {
            Map map = (HashMap)SerializationUtils.deserialize(body);
            System.out.println("Message Number "+ map.get("message number") + " received.");
    
        }
    
        public void handleCancel(String consumerTag) {}
        public void handleCancelOk(String consumerTag) {}
        public void handleRecoverOk(String consumerTag) {}
        public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}
    }

    Main.java

    import java.io.IOException;
    import java.sql.SQLException;
    import java.util.HashMap;
    
    /**
     * Demo driver: starts a {@code QueueConsumer} on the queue "queue" and sends
     * five numbered messages to it through a {@code Producer2}.
     */
    public class Main {
        /**
         * Runs the demo.
         * <p>
         * The producer is closed once all messages are sent, also when sending
         * fails. The consumer is deliberately left open so it can keep receiving.
         *
         * @throws Exception if an end point cannot be set up or a send fails
         */
        public Main() throws Exception{

            QueueConsumer consumer = new QueueConsumer("queue");
            Thread consumerThread = new Thread(consumer);
            consumerThread.start();

            Producer2 producer = new Producer2("queue");
            try {
                for (int i = 0; i < 5; i++) {
                    // Kept as HashMap (not Map) because sendMessage needs a Serializable.
                    HashMap<String, Integer> message = new HashMap<String, Integer>();
                    message.put("message number", i);
                    producer.sendMessage(message);
                    System.out.println("Message Number "+ i +" sent.");
                }
            } finally {
                // The producer's connection was previously never closed.
                producer.close();
            }
        }

        /**
         * Entry point; constructs {@link Main} and prints an end marker.
         *
         * @param args unused
         * @throws Exception propagated from the constructor
         */
        public static void main(String[] args) throws Exception{
            new Main();
            System.out.println("##############end...");
        }
    }
  • 相关阅读:
    BZOJ5296 [CQOI2018] 破解D-H协议 【数学】【BSGS】
    Codeforces963C Frequency of String 【字符串】【AC自动机】
    Codeforces962F Simple Cycles Edges 【双连通分量】【dfs树】
    Hello World
    Codeforces963C Cutting Rectangle 【数学】
    BZOJ5203 [NEERC2017 Northern] Grand Test 【dfs树】【构造】
    20160422 --Switch…case 总结; 递归算法
    20160421字符串类型;日期时间类型数学类型
    20160420冒泡排序和查找
    20160419 while练习,复习
  • 原文地址:https://www.cnblogs.com/kaituorensheng/p/6591581.html
Copyright © 2011-2022 走看看