zoukankan      html  css  js  c++  java
  • RabbitMQ java 原生代码

    rabbitMQ 的交换器有四种类型:direct、fanout、topic、headers

     以下是具体的代码:

    direct:路由键只能全部匹配,才能进入到指定队列中。其他使用

    direct生产者

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.util.Arrays;
    import java.util.List;
    
    /**
     * direct 生产者
     */
    public class DirectPro {
    
        public final static String EXCHANGE_NAME = "direct_exchange";//direct交换器名称
        public final static Integer SEND_NUM = 10;//发送消息次数
    
        public static void main(String[] args) throws Exception {
            //创建连接工厂,连接RabbitMQ
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("IP");//端口号、用户名、密码可以使用默认的
            connectionFactory.setUsername("用户名");
            connectionFactory.setPassword("密码");
            connectionFactory.setPort(5672);
            //创建连接
            Connection connection = connectionFactory.newConnection();
            //创建信道
            Channel channel = connection.createChannel();
            //在信道中设置交换器
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //交换器和队列绑定放到消费者进行
            //自定义路由键
            List<String> routeKey = Arrays.asList("key1","key2","key3");
            //发送消息
            for (int i=0;i<SEND_NUM;i++){
                String key = routeKey.get(i%routeKey.size());//发送的key
                String msg = "hello rabbitmq"+i;//发送的消息
                //消息进行发送
                channel.basicPublish(EXCHANGE_NAME,key,null,msg.getBytes());
                System.out.println("send:"+key+"==="+msg);
            }
            channel.close();
            connection.close();
        }
    
    }

    direct消费者

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * direct 消费者
     */
    public class DirectCon {
    
        public final static String EXCHANGE_NAME = "direct_exchange";//direct交换器名称
    
        public final static String QUEUE_NAME = "queue_name";
    
        public static void main(String[] args) throws Exception {
            //创建连接工厂,连接RabbitMQ
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("IP");//端口号、用户名、密码可以使用默认的
            connectionFactory.setUsername("用户名");
            connectionFactory.setPassword("密码");
            connectionFactory.setPort(5672);
            //创建连接
            Connection connection = connectionFactory.newConnection();
            //创建信道
            Channel channel = connection.createChannel();
            //在信道中设置交换器
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //声明队列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //交换器和队列绑定
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key1");
            System.out.println("waiting message.....");
    
            //声明消费者
            final Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body,"utf-8");
                    System.out.println("Received:"+envelope.getRoutingKey()+"========"+message);
                }
            };
            //消费者在指定的对队列上消费
            channel.basicConsume(QUEUE_NAME,true,consumer);
    
        }
    
    }

    3:执行结果:首先启动消费者,再启动发送者

    生产者

    消费者

    fanout:消息能发送到所有队列上,跟路由键没有任何关系。

    fanout生产者:重新定义一个交换器,只需将交换器设置成fanout就可以

      //在信道中设置交换器

      channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

    fanout消费者:重新定义一个交换器和队列,将交换器设置成fanout,绑定的key可以随便写。

      //在信道中设置交换器
      channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
      //声明队列
      channel.queueDeclare(QUEUE_NAME,false,false,false,null);
      //交换器和队列绑定
      channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"abc");

    结果

    生产者都一样

     消费者

    topic:按照*、#的匹配规则,进入到对应的队列

    topic生产者:只需将交换器设置成topic,路由键的设置必须是用点. 进行分割("key1.k","key2.k","key3.k")

    topic消费者:重新定义一个交换器和队列

    //交换器和队列绑定
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key2.*");

    这样子就只能匹配key2.开头的

     

    结果:生产者都一样

    消费者

    headers:是根据头部的消息映射到队列的。特殊的值x-match:all(全部匹配)、any(任何一个)。

    生产者

            //在信道中设置交换器
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
            //设置要发送headers值
            Map<String, Object> heardersMap = new HashMap<String, Object>();
            heardersMap.put("api", "login");
            heardersMap.put("version", 1.0);
            heardersMap.put("radom", UUID.randomUUID().toString());
            //设置消息的属性
            AMQP.BasicProperties pro = new AMQP.BasicProperties.Builder()
                    .headers(heardersMap)
                    .build();
    
            //发送消息
            for (int i=0;i<SEND_NUM;i++){
                String msg = "hello rabbitmq"+i;//发送的消息
                //消息进行发送
                channel.basicPublish(EXCHANGE_NAME,"",pro,msg.getBytes());
                System.out.println("send:"+msg);
            }
    消费者
            //在信道中设置交换器
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
    
            Map<String, Object> arguments = new HashMap<String, Object>();
            arguments.put("x-match", "any");
            arguments.put("api", "login");
            arguments.put("version", 1.0);
            arguments.put("dataType", "json");
    
            //交换器和队列绑定
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName,EXCHANGE_NAME,"",arguments);
            System.out.println("waiting message.....");
    
            //声明消费者
            final Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body,"utf-8");
                    System.out.println(" [HeaderRecv] Received '" +
                            properties.getHeaders() + "':'" + message + "'");
                }
            };
    结果:发送者一样
    消费者

     以上就是Java对原生的RabbitMQ基本使用。

     

  • 相关阅读:
    HOJ 2139 Spiderman's workout(动态规划)
    FZU 2107 Hua Rong Dao(dfs)
    Java 第十一届 蓝桥杯 省模拟赛 计算机存储中有多少字节
    Java 第十一届 蓝桥杯 省模拟赛 计算机存储中有多少字节
    Java 第十一届 蓝桥杯 省模拟赛 计算机存储中有多少字节
    Java 第十一届 蓝桥杯 省模拟赛 合法括号序列
    Java 第十一届 蓝桥杯 省模拟赛 合法括号序列
    Java 第十一届 蓝桥杯 省模拟赛 合法括号序列
    Java 第十一届 蓝桥杯 省模拟赛 无向连通图最少包含多少条边
    Java 第十一届 蓝桥杯 省模拟赛 无向连通图最少包含多少条边
  • 原文地址:https://www.cnblogs.com/orange-time/p/11630783.html
Copyright © 2011-2022 走看看