zoukankan      html  css  js  c++  java
  • RabbitMQ-简单队列

    模型

    graph LR
    生产者 -->id[queue]
    id[queue]--> 消费者
    

    获取连接

    public static Connection getConnection() throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("localhost");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/vhost_ct");
            connectionFactory.setUsername("guest");
            connectionFactory.setPassword("guest");
            return connectionFactory.newConnection();
    }
    

    生产者Producer

    
    public class Send {
     
        private static final String QUEUE_NAME = "test_simple_queue";
     
        public static void main(String[] a) throws IOException, TimeoutException {
            //获取一个链接
            Connection connection = ConnectionUtil.getConnection();
            //从链接中获取一个通道
            Channel channel = connection.createChannel();
     
            //创建队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
     
            String msg = "hello_simple";
            //发布
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            System.out.println("send:" + msg);
            channel.close();
            connection.close();
        }
    }
    

    消费者 Consumer

    public class Recv {
     
        private static final String QUEUE_NAME = "test_simple_queue";
     
        public static void main(String[] a) throws IOException, TimeoutException {
            //获取一个链接
            Connection connection = ConnectionUtil.getConnection();
            //从链接中获取一个通道
            Channel channel = connection.createChannel();
     
            //定义队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
     
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                //获取到达的消息
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    System.out.println(msg);
                }
            };
            //监听队列
            channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
        }
    }
    

    不足之处

    耦合性较高,生产者和消费者一一对应,修改队列名,生产者和消费者需同时修改

  • 相关阅读:
    IB(InterBase Server) 的完整连接格式
    jna
    编写基于Prototype的Javascript动画类
    Go——使用 go mod ——有依赖的Go工程
    pkgconfig—— PKG_CONFIG_PATH——Makefile——pkgconfig的作用与使用
    Go——Goland Debug报错Version of Delve is too old for this version of Go
    NATS——NATS Streaming 是什么 (转)
    Go——Go语言调用C语言
    go get安装包超时处理
    NATS—基础介绍 (转自 https://www.cnblogs.com/yorkyang/p/8392172.html)
  • 原文地址:https://www.cnblogs.com/jsersudo/p/13793065.html
Copyright © 2011-2022 走看看