zoukankan      html  css  js  c++  java
  • simple简单消息队列

    一:介绍

    1.优缺点

      简单,但是耦合性较高。

      这种模式是生产者与消费者一一对应,就是一个产生者,有一个消费者来消费。

      如果,多个消费者想消费一个队列中的消息就不适合了。这种情况在后面会接着介绍。

    2.进入官网

      进入get start

      

      然后进入Tutorials

      

      发现简单消息队列

      

    二:新建项目

    1.新建maven项目

      

    2.pomwenjian  

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <project xmlns="http://maven.apache.org/POM/4.0.0"
     3          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     4          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     5     <modelVersion>4.0.0</modelVersion>
     6 
     7     <groupId>mq</groupId>
     8     <artifactId>rabbitmqTest</artifactId>
     9     <version>1.0-SNAPSHOT</version>
    10 
    11     <dependencies>
    12         <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    13         <dependency>
    14             <groupId>com.rabbitmq</groupId>
    15             <artifactId>amqp-client</artifactId>
    16             <version>3.6.2</version>
    17         </dependency>
    18         <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
    19         <dependency>
    20             <groupId>org.slf4j</groupId>
    21             <artifactId>slf4j-api</artifactId>
    22             <version>1.7.10</version>
    23         </dependency>
    24         <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
    25         <dependency>
    26             <groupId>org.slf4j</groupId>
    27             <artifactId>slf4j-log4j12</artifactId>
    28             <version>1.7.5</version>
    29             <scope>test</scope>
    30         </dependency>
    31         <!-- https://mvnrepository.com/artifact/log4j/log4j -->
    32         <dependency>
    33             <groupId>log4j</groupId>
    34             <artifactId>log4j</artifactId>
    35             <version>1.2.17</version>
    36         </dependency>
    37         <!-- https://mvnrepository.com/artifact/junit/junit -->
    38         <dependency>
    39             <groupId>junit</groupId>
    40             <artifactId>junit</artifactId>
    41             <version>4.11</version>
    42             <scope>test</scope>
    43         </dependency>
    44 
    45 
    46     </dependencies>
    47 
    48 </project>

    3.公共类

      获取连接

     1 package com.mq.utils;
     2 
     3 import com.rabbitmq.client.Connection;
     4 import com.rabbitmq.client.ConnectionFactory;
     5 
     6 public class ConnectionUtil {
     7     /**
     8      * 获取connection连接
     9      */
    10     public static Connection getConnection()throws Exception{
    11         //定义一个连接工厂
    12         ConnectionFactory factory=new ConnectionFactory();
    13         //设置服务地址
    14         factory.setHost("127.0.0.1");
    15         //设置AMQP端口
    16         factory.setPort(5672);
    17         //vhost
    18         factory.setVirtualHost("/cjhost");
    19         //用户名
    20         factory.setUsername("caojun");
    21         //密码
    22         factory.setPassword("123456");
    23         //返回连接
    24         return factory.newConnection();
    25     }
    26 }

    4.项目结构

      

    三:生产者

    1.程序

     1 package com.mq.send;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 
     7 public class SimpleSend {
     8     private static final String QUENE_NAME="test_simple_queue";
     9     public static void main(String[] args) throws Exception {
    10         //获取一个连接
    11         Connection connection= ConnectionUtil.getConnection();
    12         //从连接中获取一个通道
    13         Channel channel=connection.createChannel();
    14         //创建队列声明
    15         channel.queueDeclare(QUENE_NAME,false,false,false,null);
    16 
    17         //消息
    18         String strBody="Hello Mq";
    19 
    20         //发送
    21         channel.basicPublish("",QUENE_NAME,null,strBody.getBytes());
    22         System.out.println("send strBody:"+strBody);
    23 
    24         //关闭连接
    25         channel.close();
    26         connection.close();
    27     }
    28 }

    2.运行

      控制台:

      

      管理平台:

      

    3.使用管理平台获取消息

      这个时候,队列中的消息就会被消费掉。

      

    四:消费者

    1.程序一

      这个程序中的API是3.4之前的,现在还能用

     1 package com.mq.receive;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.Channel;
     5 import com.rabbitmq.client.Connection;
     6 import com.rabbitmq.client.QueueingConsumer;
     7 import com.rabbitmq.client.QueueingConsumer.Delivery;
     8 
     9 public class SimpleReceive {
    10     private static final String QUENE_NAME="test_simple_queue";
    11     public static void main(String[] args) throws Exception {
    12         //获取一个连接
    13         Connection connection= ConnectionUtil.getConnection();
    14         //创建通道
    15         Channel channel=connection.createChannel();
    16         //定义消费者
    17         QueueingConsumer consumer=new QueueingConsumer(channel);
    18         //监听队列
    19         channel.basicConsume(QUENE_NAME,true,consumer);
    20         while (true){
    21             Delivery delivery=consumer.nextDelivery();
    22             String strBody=new String(delivery.getBody());
    23             System.out.println("receive strBody:"+strBody);
    24         }
    25     }
    26 }

    2.效果

      

    3.程序二

      这个是新的api

     1 package com.mq.receive;
     2 
     3 import com.mq.utils.ConnectionUtil;
     4 import com.rabbitmq.client.*;
     5 import com.rabbitmq.client.QueueingConsumer.Delivery;
     6 
     7 import java.io.IOException;
     8 
     9 public class SimpleReceive {
    10     private static final String QUENE_NAME = "test_simple_queue";
    11 
    12     public static void main(String[] args) throws Exception {
    13         newApi();
    14     }
    15     public static void newApi()throws Exception{
    16         //获取一个连接
    17         Connection connection = ConnectionUtil.getConnection();
    18         //创建通道
    19         Channel channel = connection.createChannel();
    20         //创建队列声明
    21         channel.queueDeclare(QUENE_NAME,false,false,false,null);
    22         //创建消费者
    23         DefaultConsumer consumer=new DefaultConsumer(channel){
    24             @Override
    25             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    26                String strBody=new String(body,"utf-8");
    27                System.out.println("receive new strBody:"+strBody);
    28             }
    29         };
    30         //监听队列
    31         channel.basicConsume(QUENE_NAME,true,consumer);
    32     }
    33 
    34     /**
    35      * 这个是老的API
    36      * @throws Exception
    37      */
    38     public static void oldApi() throws Exception {
    39         //获取一个连接
    40         Connection connection = ConnectionUtil.getConnection();
    41         //创建通道
    42         Channel channel = connection.createChannel();
    43         //定义消费者
    44         QueueingConsumer consumer = new QueueingConsumer(channel);
    45         //监听队列
    46         channel.basicConsume(QUENE_NAME, true, consumer);
    47         while (true) {
    48             Delivery delivery = consumer.nextDelivery();
    49             String strBody = new String(delivery.getBody());
    50             System.out.println("receive strBody:" + strBody);
    51         }
    52     }
    53 }

    4.效果  

      

  • 相关阅读:
    JAVA 之 JSTL
    IDEA 之 ERROR:无法在web.xml或使用此应用程序部署的jar文件中解析绝对uri:[http://java.sun.com/jsp/jstl/core]
    JAVA 之 EL表达式
    IDEA 之 ERROR:端口被占用
    【ubuntu】windows+ubuntu 设置windows为第一启动项
    【ubuntu】Error: environment block too small. Press any key to continue
    Navicat premium15安装破解教程
    通过django中间件和python魔法方法实现自定义session(通过文件存储session)
    每日作业 7/2
    每日作业 7/1
  • 原文地址:https://www.cnblogs.com/juncaoit/p/8570703.html
Copyright © 2011-2022 走看看