zoukankan      html  css  js  c++  java
  • simple简单消息队列

    一:介绍

    1.优缺点

      简单,但是耦合性较高。

      这种模式是生产者与消费者一一对应,就是一个生产者,有一个消费者来消费。

      如果,多个消费者想消费一个队列中的消息就不适合了。这种情况在后面会接着介绍。

    2.进入官网

      进入 Get Started

      

      然后进入Tutorials

      

      发现简单消息队列

      

    二:新建项目

    1.新建maven项目

      

    2.pom文件

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <!-- Maven build for the RabbitMQ simple-queue examples. -->
     3 <project xmlns="http://maven.apache.org/POM/4.0.0"
     4          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     5          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     6     <modelVersion>4.0.0</modelVersion>
     7 
     8     <groupId>mq</groupId>
     9     <artifactId>rabbitmqTest</artifactId>
    10     <version>1.0-SNAPSHOT</version>
    11 
    12     <dependencies>
    13         <!-- RabbitMQ Java client: https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    14         <dependency>
    15             <groupId>com.rabbitmq</groupId>
    16             <artifactId>amqp-client</artifactId>
    17             <version>3.6.2</version>
    18         </dependency>
    19         <!-- Logging facade: https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
    20         <dependency>
    21             <groupId>org.slf4j</groupId>
    22             <artifactId>slf4j-api</artifactId>
    23             <version>1.7.10</version>
    24         </dependency>
    25         <!-- SLF4J to log4j binding: https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
    26         <!-- Kept at the same version as slf4j-api and in the default (compile) scope, -->
    27         <!-- so the binding is on the classpath when the examples are started from main(). -->
    28         <dependency>
    29             <groupId>org.slf4j</groupId>
    30             <artifactId>slf4j-log4j12</artifactId>
    31             <version>1.7.10</version>
    32         </dependency>
    33         <!-- Logging backend: https://mvnrepository.com/artifact/log4j/log4j -->
    34         <dependency>
    35             <groupId>log4j</groupId>
    36             <artifactId>log4j</artifactId>
    37             <version>1.2.17</version>
    38         </dependency>
    39         <!-- Unit tests only: https://mvnrepository.com/artifact/junit/junit -->
    40         <dependency>
    41             <groupId>junit</groupId>
    42             <artifactId>junit</artifactId>
    43             <version>4.11</version>
    44             <scope>test</scope>
    45         </dependency>
    46 
    47     </dependencies>
    48 
    49 </project>

    3.公共类

      获取连接

     1 package com.mq.utils;
     2 
     3 import com.rabbitmq.client.Connection;
     4 import com.rabbitmq.client.ConnectionFactory;
     5 
     6 /**
     7  * Static helper that opens AMQP connections for the examples.
     8  */
     9 public final class ConnectionUtil {
    10     private static final String HOST = "127.0.0.1";
    11     /** AMQP port. */
    12     private static final int PORT = 5672;
    13     private static final String VIRTUAL_HOST = "/cjhost";
    14     private static final String USERNAME = "caojun";
    15     // NOTE(review): hard-coded credentials are acceptable only in a tutorial; externalize them in real code.
    16     private static final String PASSWORD = "123456";
    17 
    18     /** Not instantiable: the class only exposes a static helper. */
    19     private ConnectionUtil() {
    20     }
    21 
    22     /**
    23      * Opens a new connection with the settings defined above.
    24      *
    25      * @return the newly opened connection; the caller is expected to close it
    26      * @throws Exception if the connection cannot be established
    27      */
    28     public static Connection getConnection() throws Exception {
    29         ConnectionFactory factory = new ConnectionFactory();
    30         factory.setHost(HOST);
    31         factory.setPort(PORT);
    32         factory.setVirtualHost(VIRTUAL_HOST);
    33         factory.setUsername(USERNAME);
    34         factory.setPassword(PASSWORD);
    35         return factory.newConnection();
    36     }
    37 }

    4.项目结构

      

    三:生产者

    1.程序

     1 package com.mq.send;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 
     7 /**
     8  * Producer of the simple-queue example: publishes one message and exits.
     9  */
    10 public class SimpleSend {
    11     private static final String QUEUE_NAME = "test_simple_queue";
    12 
    13     /**
    14      * Declares the queue, publishes a single text message to it and releases the connection.
    15      *
    16      * @param args unused
    17      * @throws Exception if connecting, declaring the queue or publishing fails
    18      */
    19     public static void main(String[] args) throws Exception {
    20         Connection connection = ConnectionUtil.getConnection();
    21         try {
    22             Channel channel = connection.createChannel();
    23             // Flags after the name: durable, exclusive, autoDelete; no extra arguments.
    24             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    25 
    26             String strBody = "Hello Mq";
    27 
    28             // Empty exchange name selects the default exchange; the routing key is the queue name.
    29             // Encode explicitly as UTF-8 so the bytes do not depend on the platform charset.
    30             channel.basicPublish("", QUEUE_NAME, null, strBody.getBytes("UTF-8"));
    31             System.out.println("send strBody:" + strBody);
    32 
    33             channel.close();
    34         } finally {
    35             // Runs on failure too, so the connection is not leaked; skipped when the
    36             // connection is already closed to avoid masking the original exception.
    37             if (connection.isOpen()) {
    38                 connection.close();
    39             }
    40         }
    41     }
    42 }

    2.运行

      控制台:

      

      管理平台:

      

    3.使用管理平台获取消息

      这个时候,队列中的消息就会被消费掉。

      

    四:消费者

    1.程序一

      这个程序中的API是3.4之前的,现在还能用

     1 package com.mq.receive;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 import com.rabbitmq.client.QueueingConsumer;
     7 import com.rabbitmq.client.QueueingConsumer.Delivery;
     8 
     9 /**
    10  * Consumer of the simple-queue example, written against the older blocking
    11  * {@code QueueingConsumer} API.
    12  */
    13 public class SimpleReceive {
    14     private static final String QUEUE_NAME = "test_simple_queue";
    15 
    16     /**
    17      * Subscribes to the queue and prints every message body it receives.
    18      * Loops until the process is stopped or fetching a delivery throws.
    19      *
    20      * @param args unused
    21      * @throws Exception if connecting, subscribing or fetching a delivery fails
    22      */
    23     public static void main(String[] args) throws Exception {
    24         Connection connection = ConnectionUtil.getConnection();
    25         try {
    26             Channel channel = connection.createChannel();
    27             // Declare the queue here as well, so the consumer also works when it is
    28             // started before any producer has created the queue.
    29             channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    30             QueueingConsumer consumer = new QueueingConsumer(channel);
    31             // Second argument true = automatic acknowledgement.
    32             channel.basicConsume(QUEUE_NAME, true, consumer);
    33             while (true) {
    34                 // Blocks until the next message arrives.
    35                 Delivery delivery = consumer.nextDelivery();
    36                 // Decode explicitly as UTF-8 instead of relying on the platform charset.
    37                 String strBody = new String(delivery.getBody(), "UTF-8");
    38                 System.out.println("receive strBody:" + strBody);
    39             }
    40         } finally {
    41             // Reached only when the loop is left by an exception; release the connection
    42             // unless it has already been shut down.
    43             if (connection.isOpen()) {
    44                 connection.close();
    45             }
    46         }
    47     }
    48 }

    2.效果

      

    3.程序二

      这个是新的api

     1 package com.mq.receive;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.*;
     5 import com.rabbitmq.client.QueueingConsumer.Delivery;
     6 
     7 import java.io.IOException;
     8 
     9 public class SimpleReceive {
    10     private static final String QUENE_NAME = "test_simple_queue";
    11 
    12     public static void main(String[] args) throws Exception {
    13         newApi();
    14     }
    15     public static void newApi()throws Exception{
    16         //获取一个连接
    17         Connection connection = ConnectionUtil.getConnection();
    18         //创建通道
    19         Channel channel = connection.createChannel();
    20         //创建队列声明
    21         channel.queueDeclare(QUENE_NAME,false,false,false,null);
    22         //创建消费者
    23         DefaultConsumer consumer=new DefaultConsumer(channel){
    24             @Override
    25             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    26                String strBody=new String(body,"utf-8");
    27                System.out.println("receive new strBody:"+strBody);
    28             }
    29         };
    30         //监听队列
    31         channel.basicConsume(QUENE_NAME,true,consumer);
    32     }
    33 
    34     /**
    35      * 这个是老的API
    36      * @throws Exception
    37      */
    38     public static void oldApi() throws Exception {
    39         //获取一个连接
    40         Connection connection = ConnectionUtil.getConnection();
    41         //创建通道
    42         Channel channel = connection.createChannel();
    43         //定义消费者
    44         QueueingConsumer consumer = new QueueingConsumer(channel);
    45         //监听队列
    46         channel.basicConsume(QUENE_NAME, true, consumer);
    47         while (true) {
    48             Delivery delivery = consumer.nextDelivery();
    49             String strBody = new String(delivery.getBody());
    50             System.out.println("receive strBody:" + strBody);
    51         }
    52     }
    53 }

    4.效果  

      

  • 相关阅读:
    [LeetCode]2. Add Two Numbers链表相加
    Integration between Dynamics 365 and Dynamics 365 Finance and Operation
    向视图列添加自定义图标和提示信息 -- PowerApps / Dynamics365
    Update the Power Apps portals solution
    Migrate portal configuration
    Use variable to setup related components visible
    Loyalty management on Retail of Dynamic 365
    Modern Fluent UI controls in Power Apps
    Change screen size and orientation of a canvas app in Power App
    Communication Plan for Power Platform
  • 原文地址:https://www.cnblogs.com/juncaoit/p/8570703.html
Copyright © 2011-2022 走看看