zoukankan      html  css  js  c++  java
  • RabbitMQ 几种工作模式---(一)helloworld

    生产者类:

    
    
    package com..helloworld;


    import com..utils.RabbitConstant;
    import com..utils.RabbitUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {

    //TCP 物理连接
    Connection conn= RabbitUtils.getConnection();
    //创建通信“通道”,相当于TCP中的虚拟连接
    Channel channel = conn.createChannel();
    //创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
    //第一个参数:队列名称ID
    //第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
    //第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
    //第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
    //其他额外的参数, null
    channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);
    //四个参数
    //exchange 交换机,暂时用不到,在后面进行发布订阅时才会用到
    //队列名称
    //额外的设置属性
    //最后一个参数是要传递的消息字节数组
    String message = "hello!";
    channel.basicPublish("" , RabbitConstant.QUEUE_HELLOWORLD,null , message.getBytes());
    channel.close();
    conn.close();
    System.out.println("发送数据成功");
    }
    }
     

    运行后台打印:

    页面队列数据显示:

    消费者类:

    package com..helloworld;
    
    
    import com..utils.RabbitConstant;
    import com..utils.RabbitUtils;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            Connection conn= RabbitUtils.getConnection();
            //创建通道
            Channel channel =  conn.createChannel();
            channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);
            //创建一个消息消费者
            //第二个参数代表是否自动确认收到消息,false代表手动编程来确认消息,这是MQ的推荐做法
            //第三个参数要传入DefaultConsumer的实现类
            channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD, false, new Reciver(channel));
    
        }
    }
    
    class Reciver extends DefaultConsumer{
        private Channel channel;
        //重写构造函数,Channel通道对象需要从外层传入,在handleDelivery中要用到
        public Reciver(Channel channel) {
            super(channel);
            this.channel = channel;
        }
    
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
            /*super.handleDelivery(consumerTag,envelope,properties,body);*/
            String messageBody = new String(body);
            System.out.println("消费者接收到:" + messageBody);
            //签收消息,确认消息
            //envelope.getDeliveryTag() 获取这个消息的TagId
            //false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
            channel.basicAck(envelope.getDeliveryTag() , false);
        }
    }

    运行后台打印:

    页面队列数据显示:

     

  • 相关阅读:
    linux如何查看端口或服务被占用情况
    linux网络查看及配置相关命令
    linux查看程序运行相关命令
    shell脚本编写一个用真实用户去访问的vsftpd服务器
    shell脚本监控CPU和内存利用率
    小白的个人技能树(基于自动化软件测试开发实习和软件开发实习)
    MySQL 8.0.12 基于Windows 安装教程(超级详细)
    C语言 0x7fffffff是多少(也就是INT_MAX,首位是 0,其余都是1,f代表1111)
    数通知识点
    数据结构之算法基础
  • 原文地址:https://www.cnblogs.com/lifan12589/p/14303763.html
Copyright © 2011-2022 走看看