zoukankan      html  css  js  c++  java
  • rabbitMQ的简单实例——amqp协议带数据回写机制

    rabbitMQ是一种高性能的消息队列,支持或者说它实现了AMQP协议(advanced message queue protocol高级消息队列协议)。

    下面简单讲一讲一个小例子。我们首先要部署好rabbitMQ,然后实现一个生产者—消费者,生产者向rabbit中发布一个消息,消费者去rabbit取这个消息,在正确收到这个消息后,消费者会通过返回队列回写通知生产者自己收到了消息。

    windows下部署rabbit非常简单,先安装erlang运行时,然后安装rabbitMQ安装文件即可,都是exe的,很简单。然后找到rabbit的sbin目录里的bat即可启动rabbitMQ。

    下面是producer—consumer代码:

    package com.hzfi.rabbitmq;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    import com.rabbitmq.client.ShutdownSignalException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.ConsumerCancelledException;
    import com.rabbitmq.client.QueueingConsumer;
    
    public class Producer {
        private final static String QUEUE_NAME = "myQueue"; //上送队列
        
        public static void main(String[] args) throws IOException, TimeoutException{
            String replyQueueName = null;   //返回队列名
            
            ConnectionFactory connFactory = null;
            Connection conn = null;
            Channel channel = null;
            try{
            connFactory = new ConnectionFactory();
            connFactory.setHost("localhost");
            conn = connFactory.newConnection();
            channel = conn.createChannel();
            //返回队列
            replyQueueName = channel.queueDeclare().getQueue();
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(replyQueueName, true, consumer);
    
            String corrId = java.util.UUID.randomUUID().toString(); //用来表示返回队列结果的id,唯一
            BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();
            String msg = "linyang@hzfi.cn";
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            channel.basicPublish("", QUEUE_NAME, props, msg.getBytes());
            System.out.println("producer has published: "" + msg + """);
            
            while(true){
                Thread.sleep(1000);
                Delivery delivery = consumer.nextDelivery();
                System.out.println("from server reply:" + new String(delivery.getBody()));
            }
            }catch(IOException ioe){
                ioe.printStackTrace();
            }catch(TimeoutException toe){
                toe.printStackTrace();
            } catch (ShutdownSignalException e) {
                e.printStackTrace();
            } catch (ConsumerCancelledException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally{
                if(channel!=null)   channel.close();
                if(conn!=null)  conn.close();
            }
        }
    }
    package com.hzfi.rabbitmq;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.QueueingConsumer.Delivery;
    
    public class Consumer {
        private final static String QUEUE_NAME = "myQueue";
        public static void main(String[] args) throws IOException, TimeoutException{
            ConnectionFactory connFactory = null;
            Connection conn = null;
            Channel channel = null;
            try{
            connFactory = new ConnectionFactory();
            connFactory.setHost("localhost");
            conn = connFactory.newConnection();
            channel = conn.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println("listening for event message...");
            
            QueueingConsumer consumer = new QueueingConsumer(channel);
            channel.basicConsume(QUEUE_NAME, true, consumer);
            while(true){
                Thread.sleep(1000);
                Delivery delivery = consumer.nextDelivery();
                BasicProperties props = delivery.getProperties();
                BasicProperties reply_props = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();
                String msg = new String(delivery.getBody(),"utf-8");
                System.out.println("receive msg:" + msg);
                String retMsg = "ok, give you reply:" + new String(msg.getBytes(),"utf-8");
                System.out.println("Consumer中的返回队列名" + props.getReplyTo());
                channel.basicPublish( "", props.getReplyTo(), reply_props, retMsg.getBytes());
            }
            }catch(IOException ioe){
                ioe.printStackTrace();
            }catch(TimeoutException toe){
                toe.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally{
                if(channel!=null)   channel.close();
                if(conn!=null)  conn.close();
            }
        }
    }

     开启RabbitMQ的后台管理服务(是个web页面)

         sbin>rabbitmq-plugins enable rabbitmq_management

         访问地址 http://localhost:15672/     id/psw: guest/guest

         可以对队列,用户,权限等进行管理,例如,默认情况下密码是任意,如上代码所示,ConnectionFactory仅仅设置了主机名,并未设置用户名和密码。

         我们可以新建或修改一个用户名和密码,如下图:

    这样,我们上面的代码也要做相应的调整:

    ConnectionFactory connFactory = new ConnectionFactory();
    connFactory.setHost("localhost");
    connFactory.setUsername("guest");
    connFactory.setPassword("123");    
  • 相关阅读:
    统计学习方法学习笔记(一)--极大似然估计与贝叶斯估计原理及区别
    数据过拟合解决方法
    LSTM基础
    异方差产生与解决
    人工免疫相关算法
    Svm相关
    sscanf,sscanf_s及其相关用法
    C语言数组初始化
    生产者和消费者
    Linux线程-创建
  • 原文地址:https://www.cnblogs.com/lyhero11/p/5128856.html
Copyright © 2011-2022 走看看