zoukankan      html  css  js  c++  java
  • RabbitMQ Hello world(二)

    简介:

    Rabbitmq 是消息代理中间件,它接收并转发消息。你可以把它想象成一个邮局:当你把邮件放到邮箱时,你可以确定某一位邮递员会准确地把邮件送到收件人手中。在这个比喻中,rabbitmq 同时扮演了邮箱、邮局和邮递员的角色。
    Rabbitmq和邮局的区别是,它不处理纸张,它接收,存储,转发二进制文件消息。
     

    Rabbitmq 消息服务中几个术语。 

    • 生产者:发送消息。消息的产生者。
    • 队列:在Rabbitmq内部,如同邮箱。尽管消息会在RabbitMq和你的应用之间流转,但是它们只能被存储在队列里。队列只受主机内存和磁盘容量的限制,本质上是一个大的消息缓冲区。生产者发送消息到队列中,消费者从队列中接收数据。
    • 消费者:接收消息,大部分接收程序等待接收消息。
     

    Hello world

    Maven
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.7.3</version>
      </dependency> 
    RabitMqConnectionConfig.java
    package com.rabitmq.config;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * Static helper that opens and closes connections to the RabbitMQ broker
     * used by the demo producer and consumer.
     *
     * <p>Host, port and credentials are compile-time constants of this class.
     * The class only exposes static members and is therefore not instantiable.
     */
    public final class RabitMqConnectionConfig {

        /** Broker host name. */
        private static final String RABIT_MQ_HOST = "localhost";

        /** Broker AMQP port. */
        private static final int RABIT_MQ_PORT = 5672;

        /** User name sent to the broker when connecting. */
        private static final String RABIT_MQ_USER_NAME = "guest";

        /** Password sent to the broker when connecting. */
        private static final String RABIT_MQ_USER_PASSWD = "guest";

        /** Name of the queue shared by the demo producer and consumer. */
        public static final String RABIT_TEST_QUEUE = "test_rabit_mq";

        /** Utility class: static members only, no instances. */
        private RabitMqConnectionConfig() {
        }

        /**
         * Opens a new connection to the broker using the constants of this class.
         *
         * @return a newly created connection; the caller is responsible for closing it
         * @throws IOException if the connection could not be established
         * @throws TimeoutException if connecting timed out
         */
        public static Connection getConnection() throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(RABIT_MQ_HOST);
            connectionFactory.setPort(RABIT_MQ_PORT);
            connectionFactory.setUsername(RABIT_MQ_USER_NAME);
            connectionFactory.setPassword(RABIT_MQ_USER_PASSWD);
            return connectionFactory.newConnection();
        }

        /**
         * Closes the given connection if there is one and it is still open.
         *
         * @param connection the connection to close; {@code null} is ignored
         * @throws IOException if closing the connection fails
         */
        public static void closeConnection(Connection connection) throws IOException {
            // Skip connections that are already closed so that a second close is a no-op.
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        }
    }
    RabitMqProducer.java
    package com.rabitmq.producer;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabitmq.config.RabitMqConnectionConfig;
    
    /**
     * Demo publisher that sends a batch of text messages to the queue named by
     * {@link RabitMqConnectionConfig#RABIT_TEST_QUEUE}.
     */
    public class RabitMqProducer {

        /** Number of messages published by a single {@link #send(String)} call. */
        private static final int MESSAGE_COUNT = 1000;

        /** Name of this publisher; it is embedded in every message it sends. */
        private final String producerName;

        /**
         * Creates a publisher.
         *
         * @param producerName name that is prefixed to every message sent by this instance
         */
        public RabitMqProducer(String producerName) {
            this.producerName = producerName;
        }

        /**
         * Opens a connection and a channel, declares the test queue and publishes
         * {@code MESSAGE_COUNT} numbered copies of {@code message} to it.
         *
         * <p>The channel and the connection are always closed before this method
         * returns. I/O and timeout failures are printed to standard error and are
         * not propagated to the caller.
         *
         * @param message payload text included in every published message
         */
        public void send(String message) {
            // try-with-resources closes the channel first, then the connection,
            // even when declaring the queue or publishing fails.
            try (Connection connection = RabitMqConnectionConfig.getConnection();
                    Channel channel = connection.createChannel()) {
                Map<String, Object> queueArguments = new HashMap<String, Object>();
                queueArguments.put("x-queue-type", "classic");
                // durable = true, exclusive = false, autoDelete = false
                channel.queueDeclare(RabitMqConnectionConfig.RABIT_TEST_QUEUE, true, false, false, queueArguments);
                for (int i = 0; i < MESSAGE_COUNT; i++) {
                    String sendMessage = "producerName:" + this.producerName + ",message:" + message + "----" + i;
                    // Empty exchange name with the queue name as routing key.
                    channel.basicPublish("", RabitMqConnectionConfig.RABIT_TEST_QUEUE, null,
                            sendMessage.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                    System.out.println("sendMessage:[" + sendMessage + "]");
                }
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }

        /**
         * Runs four publishers one after another, each sending the same text.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            String message = "你好棒呀";
            new RabitMqProducer("pro_aaa").send(message);
            new RabitMqProducer("pro_bbb").send(message);
            new RabitMqProducer("pro_ccc").send(message);
            new RabitMqProducer("pro_ddd").send(message);
        }
    }
    RabitMqConsumer.java
    package com.rabitmq.consumer;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.CancelCallback;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DeliverCallback;
    import com.rabbitmq.client.Delivery;
    import com.rabitmq.config.RabitMqConnectionConfig;
    
    /**
     * Demo consumer that subscribes to the queue named by
     * {@link RabitMqConnectionConfig#RABIT_TEST_QUEUE} and prints every message
     * it receives.
     */
    public class RabitMqConsumer {

        /** Creates a consumer; it holds no state of its own. */
        public RabitMqConsumer() {
        }

        /**
         * Opens a connection and a channel, declares the test queue and registers
         * callbacks that print each delivered message and each cancellation.
         *
         * <p>On success the connection and channel are intentionally left open so
         * that deliveries keep arriving after this method returns. If setting up
         * the subscription fails, the failure is printed to standard error and the
         * connection opened by this call is closed again.
         *
         * @param consumerName label printed together with every received message
         * @return always the empty string
         */
        public String receive(final String consumerName) {
            System.out.println( consumerName + "  receive start" );
            Connection connection = null;
            try {
                connection = RabitMqConnectionConfig.getConnection();
                Channel channel = connection.createChannel();
                Map<String, Object> queueArguments = new HashMap<>();
                queueArguments.put("x-queue-type", "classic");
                // durable = true, exclusive = false, autoDelete = false
                channel.queueDeclare(RabitMqConnectionConfig.RABIT_TEST_QUEUE, true, false, false, queueArguments);

                DeliverCallback onDeliver = (String consumerTag, Delivery message) -> {
                    String messageStr = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("consumerName :" + consumerName + " ,deliver call back" + consumerTag + ",message:" + messageStr);
                };
                CancelCallback onCancel = (String consumerTag) ->
                        System.out.println("consumerName :" + consumerName + " ,Cancel call back :" + consumerTag);

                // autoAck = true: deliveries are acknowledged automatically.
                channel.basicConsume(RabitMqConnectionConfig.RABIT_TEST_QUEUE, true, onDeliver, onCancel);
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
                // The subscription was not established, so nothing will use this
                // connection; release it instead of leaking it.
                closeAfterFailure(connection);
            }
            return "";
        }

        /**
         * Closes a connection whose subscription setup failed, printing (not
         * propagating) any error raised while closing.
         */
        private static void closeAfterFailure(Connection connection) {
            if (connection == null || !connection.isOpen()) {
                return;
            }
            try {
                RabitMqConnectionConfig.closeConnection(connection);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Starts four consumers, each on its own connection.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            new RabitMqConsumer().receive("consumer_aaa");
            new RabitMqConsumer().receive("consumer_bbb");
            new RabitMqConsumer().receive("consumer_ccc");
            new RabitMqConsumer().receive("consumer_ddd");
        }
    }
     
     
     
     
     
     
     
     
     
     
     

  • 相关阅读:
    C#关于MSMQ通过HTTP远程发送专有队列消息的问题
    ASP.NET中进行消息处理(MSMQ) 三
    ASP.NET中进行消息处理(MSMQ) 二
    ASP.NET中进行消息处理(MSMQ) 一
    日志插件 log4net 的使用
    在64位windows下使用instsrv.exe和srvany.exe创建windows服务
    Windows下MemCache多端口安装配置
    把页面上DIV元素生成图片
    memcached协议
    没钱买珍珠首饰,能够画一个
  • 原文地址:https://www.cnblogs.com/--net/p/12806820.html
Copyright © 2011-2022 走看看