zoukankan      html  css  js  c++  java
  • RabbitMQ小记(二)

    1、RabbitMQ相关介绍

    (1)RabbitMQ整体上是一个生产者和消费者模型,主要负责接收、存储、转发消息。RabbitMQ整体结构图如下:

     

     (2)生产者:发送消息的一方,生产者创建一条消息,发布到RabbitMQ上,消息一般分为两部分:消息体和标签,消息体是带有业务逻辑结构的数据,也可以进一步对消息体进行序列化,标签用来描述这条消息。

         消费者:接收消息的一方,消费者创建一条连接,接到RabbitMQ服务器上的队列上,当消费者消费一条队列上的消息时,只是消费消息体,标签自动丢弃,所以消费者不会知道生产者是谁。

        Broker:消息中间服务节点,一个RabbitMQ Broker可以看作是一个RabbitMQ的实例,也可看作一台rabbitMQ的服务器。

        队列:Queue,RabbitMQ的内部对象,用于存储消息。多个消费者可以订阅一个队列,不支持队列层面的广播消费。

        交换器:Exchange,生产者创建消息,把消息交给交换器,有交换器把消息发送到一个或多个队列上。如果交换器发送队列失败,消息会返回给生产者或者丢弃。RabbitMQ中交换器有四种类型:fanout、direct、topic、headers。

        fanout:四种交换器中其一,会把消息发送到所有与交换器绑定的队列上。

        direct:四种交换器其二,会把消息发送到bindingKey和RoutingKey完全匹配的队列上。

        topic:四种交换器其三,与direct相似,会把消息发送到bindingKey和RoutingKey完全匹配的队列上,但匹配规则不同。

        headers:四种交换器其四,根据消息中的headers的属性来进行匹配,性能差,基本不会使用。

        bindingKey:绑定键,RabbitMQ中通过绑定键把交换器和对列关联起来,与RoutingKey配合使用。

        RoutdingKey:路由键,生产者将消息发送给交换器时会指定一个RoutingKey,当bindingKey和RoutingKey完全匹配时,消息会被放到对应的队列上。

    1、SpringBoot整合RabbitMQ

    (1)环境配置

    项目采用maven构建系统,所以需要在pom.xml的文件中引入RabbitMQ的相关依赖:

    <!--引入RabbitMQ的相关依赖-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.2.6.RELEASE</version>
    </dependency>
    在application.yml中配置RabbitMQ相关信息:
    Spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5727
        username: guest
        password: guest

    (2)初始化RabbitMQ连接

    public class RabbitMQConfig {
    Logger logger = LoggerFactory.getLogger(RabbitMQConfig.class);
    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private int port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    /**
    * rabbitmq连接
    * */
    @Bean
    public Connection rabbitMQ_Config(){
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    factory.setPort(port);
    factory.setUsername(username);
    factory.setPassword(password);
    Connection conn = null;
    try {
    //创建连接
    conn = factory.newConnection();
    } catch (IOException e) {
    logger.error("创建MQ连接失败!",e);
    e.printStackTrace();
    } catch (TimeoutException e) {
    logger.error("创建MQ连接失败!",e);
    e.printStackTrace();
    }
    return conn;
    }
    @Bean
    /**
    * rabbitmq连接.uri方式
    * */
    public Connection rabbitMQ_Config_Uri(){
    ConnectionFactory factory = new ConnectionFactory();
    Connection conn = null;
    try {
    factory.setUri("amqp://"+username+":"+password+"@"+host+":"+port+"/virtualHost");
    try {
    conn = factory.newConnection();
    } catch (IOException e) {
    logger.error("创建MQ连接失败!",e);
    e.printStackTrace();
    } catch (TimeoutException e) {
    logger.error("创建MQ连接失败!",e);
    e.printStackTrace();
    }
    } catch (URISyntaxException e) {
    logger.error("创建MQ连接失败!",e);
    e.printStackTrace();
    } catch (NoSuchAlgorithmException e) {
    logger.error("创建MQ连接失败!",e);
    e.printStackTrace();
    } catch (KeyManagementException e) {
    logger.error("创建MQ连接失败!",e);
    e.printStackTrace();
    }
    return conn;
    }
    }

    (3)生产者

    /**
    * 生产者
    * */
    @Bean
    public boolean direct_MQ_Producer(String excahngeName, String exchangeType,String Queue,String Binding,byte[] array) throws Exception{

    boolean flag = false;
    Connection con = rabbitMQ_Config();
    //创建通道
    Channel channel = con.createChannel();
    try {
    //创建交换机
    channel.exchangeDeclare(excahngeName,exchangeType,true);
    //创建队列
    channel.queueDeclare(Queue,true,false,false,null);
    //将交换机与队列绑定
    channel.queueBind(Queue,excahngeName,Binding);
    //发送消息
    channel.basicPublish(excahngeName,Binding, MessageProperties.PERSISTENT_TEXT_PLAIN,array);
    flag = true;
    } catch (IOException e) {
    logger.error("发送消息失败!",e);
    e.printStackTrace();
    }finally {
    //关闭通道连接
    channel.close();
    //关闭连接
    con.close();
    }
    return flag;
    }

    (4)消费者   

    /**
    * 推模式消费者,一次消费多条
    * */
    @Bean
    public String[] direct_MQ_Consumer(String Queue) throws IOException, TimeoutException {
    Connection con = rabbitMQ_Config();
    Channel channel = null;
    final String[] message = {""};
    try {
    channel = con.createChannel();
    //设置客户端最多接收未被接收的数目
    channel.basicQos(64);
    Channel finalChannel = channel;
    //通过重写DefaultConsumer方法来实现消费者消息
    Consumer consumer = new DefaultConsumer(finalChannel){
    @Override
    public void handleDelivery(String s1, Envelope envelope1, AMQP.BasicProperties basicProperties1, byte[] bytes) throws IOException{
    message[0] = new String(bytes);
    System.out.println("recv messahe : " + new String(bytes));
    try {
    TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    finalChannel.basicAck(envelope1.getDeliveryTag(),false);
    }
    };
    channel.basicConsume(Queue,consumer);
    TimeUnit.SECONDS.sleep(5);
    } catch (IOException e) {
    logger.error("创建通道失败!",e);
    e.printStackTrace();
    }catch (InterruptedException e) {
    e.printStackTrace();
    }finally {
    //关闭资源
    channel.close();
    con.close();
    }
    return message;
    }

    /**
    * 拉模式消费者,一次消费一条
    *
    * */
    @Bean
    public String direct_MQ_Consumer1(String Queue) throws IOException, TimeoutException {
    Connection con = rabbitMQ_Config();
    Channel channel = null;
    String message = "";
    try {
    channel = con.createChannel();
    GetResponse getResponse = channel.basicGet(Queue,false);
    message = new String(getResponse.getBody());
    channel.basicAck(getResponse.getEnvelope().getDeliveryTag(),false);
    TimeUnit.SECONDS.sleep(5);
    } catch (IOException e) {
    logger.error("创建通道失败!",e);
    e.printStackTrace();
    }catch (InterruptedException e) {
    e.printStackTrace();
    }finally {
    //关闭资源
    channel.close();
    con.close();
    }
    return message;
    }
  • 相关阅读:
    微信WeixinJSBridge API
    微信内置浏览器的JsAPI(WeixinJSBridge续)[转载]
    一套简单可依赖的Javascript库
    一款轻量级移动web开发框架
    传说中的WeixinJSBridge和微信rest接口
    点击网页分享按钮,触发微信分享功能
    Metronic前端模板
    AdminLTE前端模板
    Nginx如何配置静态文件直接访问
    架构设计流程
  • 原文地址:https://www.cnblogs.com/carblack/p/12639260.html
Copyright © 2011-2022 走看看