zoukankan      html  css  js  c++  java
  • 10.RabbitMQ Fanout类型交换机

    Fanout类型交换机忽略Routing Key,它将消息传递到所有与它绑定的队列上。

     
    10.RabbitMQ Fanout类型交换机
    10.RabbitMQ Fanout类型交换机
     
    10.RabbitMQ Fanout类型交换机
    10.RabbitMQ Fanout类型交换机
    10.RabbitMQ Fanout类型交换机
     
    Producer.java
    package com.test.fanout;
     
    import com.rabbitmq.client.*;
     
    import java.io.IOException;
    import java.lang.String;
    import java.lang.System;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Scanner;
     
    public class Producer {
     
        public static void main(String[] args) throws Exception {
          //使用默认端口连接MQ
            ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("admin");
        factory.setPassword("admin");
            factory.setHost("192.168.169.142"); //使用默认端口5672
            Connection conn = factory.newConnection(); //声明一个连接
            Channel channel = conn.createChannel(); //声明消息通道
       
    String message = "hello world!";
    String queueName1 = "queue_fanout1";
    String queueName2 = "queue_fanout2";
    String queueName3 = "queue_fanout3";
    String exchangeName = "test.fanout";
    //Routing Key
    channel.queueDeclare(queueName1, false, false, false, null);
    channel.queueDeclare(queueName2, false, false, false, null);
    channel.queueDeclare(queueName3, false, false, false, null);
    channel.exchangeDeclare(exchangeName, "fanout", false, false, null);
     
    channel.queueBind(queueName1, exchangeName, "");
    channel.queueBind(queueName2, exchangeName, "");
    channel.queueBind(queueName3, exchangeName, "");
     
    channel.basicPublish(exchangeName, "",
    MessageProperties.TEXT_PLAIN, message.getBytes());
     
    System.out.println("Message "" + message + "" sent successfully.");
     
    channel.close();
    conn.close();
        }
     
    }
     
    Consumer.java
    package com.test.fanout;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.ShutdownSignalException;
     
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
     
    //通过channel.basicAck向服务器发送回执,删除服务上的消息
    public class Consumer implements com.rabbitmq.client.Consumer{
    private Channel channel;
     
        public static void main(String[] args) throws Exception {
          //使用默认端口连接MQ
            ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("admin");
        factory.setPassword("admin");
            factory.setHost("192.168.169.142"); //使用默认端口5672
            Connection conn = factory.newConnection(); //声明一个连接
            Channel channel = conn.createChannel(); //声明消息通道
     
    String queueName = args[0];//"queue_fanout1";
     
    channel.queueDeclare(queueName, false, false, false, null);
     
    Consumer consumer = new Consumer();
    consumer.channel = channel;
     
    channel.basicConsume(queueName, false, consumer);
        }
     
    @Override
    public void handleConsumeOk(String consumerTag) {
    // TODO Auto-generated method stub
    System.out.println("Consumer "" + consumerTag + "" has subscribed.");
    }
     
    @Override
    public void handleCancelOk(String consumerTag) {
    // TODO Auto-generated method stub
    }
     
    @Override
    public void handleCancel(String consumerTag) throws IOException {
    // TODO Auto-generated method stub
    }
     
    @Override
    public void handleDelivery(String consumerTag, Envelope env,
    BasicProperties props, byte[] body) throws IOException {
    // TODO Auto-generated method stub
    System.out.println("Message "" + new String(body) + "" received.");
    channel.basicAck(env.getDeliveryTag(), false);
    }
     
    @Override
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
    // TODO Auto-generated method stub
    }
     
    @Override
    public void handleRecoverOk(String consumerTag) {
    // TODO Auto-generated method stub
    }
    }
     
     
  • 相关阅读:
    angular2监听页面大小变化
    angular如何引入公共JS
    angular使用Md5加密
    angular4模块中标签添加背景图
    angular使用sass的scss语法
    设置angular公共样式表
    更改angular的默认端口
    angular模拟web API
    RN与webview通讯
    shell通过ping检测整个网段IP的网络状态脚本
  • 原文地址:https://www.cnblogs.com/zzpblogs/p/8168817.html
Copyright © 2011-2022 走看看