zoukankan      html  css  js  c++  java
  • RabbitMQ java 原生代码

    rabbitMQ 的交换器有四种类型:direct、fanout、topic、headers

     以下是具体的代码:

    direct:路由键只能全部匹配,才能进入到指定队列中。其他使用

    direct生产者

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.util.Arrays;
    import java.util.List;
    
    /**
     * direct 生产者
     */
    public class DirectPro {
    
        public final static String EXCHANGE_NAME = "direct_exchange";//direct交换器名称
        public final static Integer SEND_NUM = 10;//发送消息次数
    
        public static void main(String[] args) throws Exception {
            //创建连接工厂,连接RabbitMQ
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("IP");//端口号、用户名、密码可以使用默认的
            connectionFactory.setUsername("用户名");
            connectionFactory.setPassword("密码");
            connectionFactory.setPort(5672);
            //创建连接
            Connection connection = connectionFactory.newConnection();
            //创建信道
            Channel channel = connection.createChannel();
            //在信道中设置交换器
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //交换器和队列绑定放到消费者进行
            //自定义路由键
            List<String> routeKey = Arrays.asList("key1","key2","key3");
            //发送消息
            for (int i=0;i<SEND_NUM;i++){
                String key = routeKey.get(i%routeKey.size());//发送的key
                String msg = "hello rabbitmq"+i;//发送的消息
                //消息进行发送
                channel.basicPublish(EXCHANGE_NAME,key,null,msg.getBytes());
                System.out.println("send:"+key+"==="+msg);
            }
            channel.close();
            connection.close();
        }
    
    }

    direct消费者

    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    /**
     * direct 消费者
     */
    public class DirectCon {
    
        public final static String EXCHANGE_NAME = "direct_exchange";//direct交换器名称
    
        public final static String QUEUE_NAME = "queue_name";
    
        public static void main(String[] args) throws Exception {
            //创建连接工厂,连接RabbitMQ
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("IP");//端口号、用户名、密码可以使用默认的
            connectionFactory.setUsername("用户名");
            connectionFactory.setPassword("密码");
            connectionFactory.setPort(5672);
            //创建连接
            Connection connection = connectionFactory.newConnection();
            //创建信道
            Channel channel = connection.createChannel();
            //在信道中设置交换器
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //声明队列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //交换器和队列绑定
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key1");
            System.out.println("waiting message.....");
    
            //声明消费者
            final Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body,"utf-8");
                    System.out.println("Received:"+envelope.getRoutingKey()+"========"+message);
                }
            };
            //消费者在指定的对队列上消费
            channel.basicConsume(QUEUE_NAME,true,consumer);
    
        }
    
    }

    3:执行结果:首先启动消费者,再启动发送者

    生产者

    消费者

    fanout:消息能发送到所有队列上,跟路由键没有任何关系。

    fanout生产者:重新定义一个交换器,只需将交换器设置成fanout就可以

      //在信道中设置交换器

      channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

    fanout消费者:重新定义一个交换器和队列,将交换器设置成fanout,绑定的key可以随便写。

      //在信道中设置交换器
      channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
      //声明队列
      channel.queueDeclare(QUEUE_NAME,false,false,false,null);
      //交换器和队列绑定
      channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"abc");

    结果

    生产者都一样

     消费者

    topic:按照*、#的匹配规则,进入到对应的队列

    topic生产者:只需将交换器设置成topic,路由键的设置必须是用点. 进行分割("key1.k","key2.k","key3.k")

    topic消费者:重新定义一个交换器和队列

    //交换器和队列绑定
    channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key2.*");

    这样子就只能匹配key2.开头的

     

    结果:生产者都一样

    消费者

    headers:是根据头部的消息映射到队列的。特殊的值x-match:all(全部匹配)、any(任何一个)。

    生产者

            //在信道中设置交换器
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
            //设置要发送headers值
            Map<String, Object> heardersMap = new HashMap<String, Object>();
            heardersMap.put("api", "login");
            heardersMap.put("version", 1.0);
            heardersMap.put("radom", UUID.randomUUID().toString());
            //设置消息的属性
            AMQP.BasicProperties pro = new AMQP.BasicProperties.Builder()
                    .headers(heardersMap)
                    .build();
    
            //发送消息
            for (int i=0;i<SEND_NUM;i++){
                String msg = "hello rabbitmq"+i;//发送的消息
                //消息进行发送
                channel.basicPublish(EXCHANGE_NAME,"",pro,msg.getBytes());
                System.out.println("send:"+msg);
            }
    消费者
            //在信道中设置交换器
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
    
            Map<String, Object> arguments = new HashMap<String, Object>();
            arguments.put("x-match", "any");
            arguments.put("api", "login");
            arguments.put("version", 1.0);
            arguments.put("dataType", "json");
    
            //交换器和队列绑定
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName,EXCHANGE_NAME,"",arguments);
            System.out.println("waiting message.....");
    
            //声明消费者
            final Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body,"utf-8");
                    System.out.println(" [HeaderRecv] Received '" +
                            properties.getHeaders() + "':'" + message + "'");
                }
            };
    结果:发送者一样
    消费者

     以上就是Java对原生的RabbitMQ基本使用。

     

  • 相关阅读:
    Redis数据模型
    Redis集群使用的一些命令(持续更新)
    Redis简单集群搭建
    观察者模式
    抽象工厂模式
    简单工厂模式及其简单Java案例代码实现
    工厂方法模式及简单Java案例代码实现
    Java中的双重检查锁(double checked locking)
    BayaiM__MYSQL千万级数据量的优化方法积累__初级菜鸟
    BayaiM__Linux安装MySQL的两种方法
  • 原文地址:https://www.cnblogs.com/orange-time/p/11630783.html
Copyright © 2011-2022 走看看