zoukankan      html  css  js  c++  java
  • 消息中间件RabbitMq的代码使用案例

     正文前先来一波福利推荐:

    福利一:

    百万年薪架构师视频,该视频可以学到很多东西,是本人花钱买的VIP课程,学习消化了一年,为了支持一下女朋友公众号也方便大家学习,共享给大家。

    福利二:

    毕业答辩以及工作上各种答辩,平时积累了不少精品PPT,现在共享给大家,大大小小加起来有几千套,总有适合你的一款,很多是网上是下载不到。

    获取方式:

    微信关注 精品3分钟 ,id为 jingpin3mins,关注后回复   百万年薪架构师 ,精品收藏PPT  获取云盘链接,谢谢大家支持!

    ------------------------正文开始---------------------------

    消费者:

    ---------------------- 构造初始化:

    public RabbitMqReceiver(String host, int port, String username, String password) 
    {
    connFactory = new ConnectionFactory();
    connFactory.setHost(host);
    connFactory.setPort(port);
    connFactory.setUsername(username);
    connFactory.setPassword(password);
    }
    ********************************************************************************


    ---------------------- 构造初始化:
        public Channel createChannel() throws IOException {
    getConnection();
    Channel channel = connection.createChannel();
    if (channel != null) {
               int prefetchCount = 1;
               channel.basicQos(prefetchCount);//最多为当前接收方发送一条消息。如果接收方还未处理完毕消息,还没有回发确认,就不要再给他分配消息了,应该把当前消息分配给其它空闲接收方。
            boolean durable = true; //Server端的Queue持久化
            channel.queueDeclare("task_queue", durable, false, false, null); 
    logger.info(mqInfo.getAddress() + ":" + mqInfo.getPort() + " MQ Receiver成功创建Channel");
    } else {
    logger.info(mqInfo.getAddress() + ":" + mqInfo.getPort() + " MQ Receiver创建Channel失败");
    }

    return channel;
    }
    ********************************************************************************

    ---------------------- 取得connection实例:
    private void getConnection() throws IOException
    {
    synchronized (this) {
    if (connection == null || !connection.isOpen()) {
    connection = connFactory.newConnection();
    if (connection != null) {
    logger.info(mqInfo.getAddress() + ":" + mqInfo.getPort() + " MQ Receiver成功获取连接");
    } else {
    logger.info(mqInfo.getAddress() + ":" + mqInfo.getPort() + " MQ Receiver获取连接失败");
    }
    } else {
    logger.info(mqInfo.getAddress() + ":" + mqInfo.getPort() + " MQ Receiver连接已存在,复用此连接");
    }
    }
    }
    ********************************************************************************

    ----------------------获取Consumer实例:
    public QueueingConsumer createConsumer(Channel channel, String queueName) throws IOException {
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(queueName, true, consumer); //自动消息确认打开,默认开启了消息确认(接收方接收到消息后,立即向服务器发回确认)。消息接收方处理完消息后,向服务器发送消息确认,服务器再删除该消息。

    return consumer;
    }
    ********************************************************************************

    ----------------------从从rabbitMQ提取消息并转换为对象:
    private String getMessageFromMQ() {
    String message = StringUtils.EMPTY;
    String source = StringUtils.EMPTY;
    try {
    message = receiver.nextMessage(checkNotNull(consumer), 1000);
    source = message;
    } catch (ShutdownSignalException e) {
    logger.error("", e);
    } catch (ConsumerCancelledException e) {
    logger.error("consumer exception", e);
    } catch (InterruptedException e) {
    logger.error("timeout exception", e);
    }
    try {
    if (StringUtils.isNotBlank(message)) {
    message = checkNotNull(StringUtils.substringAfter(message, "yyy:"), "xxx");
    message = checkNotNull(StringEscapeUtils.unescapeJava(message), "unescape error");
    int size = message.length();
    if (size > 1) {
    message = checkNotNull(message.substring(0, message.length() - 1), "get json-data error");// 去掉末尾的”
    } else {
    logger.warn(String.format("数据异常,message=%s", source));
    }
    }
    } catch (Throwable e) {
    logger.error(String.format("数据异常,message=%s", source), e);
    }
    return message;
    }
    ********************************************************************************

    ----------------------每次读取一条消息:

    public String nextMessage(QueueingConsumer consumer, long timeOut) throws ShutdownSignalException, ConsumerCancelledException, InterruptedException {

    QueueingConsumer.Delivery delivery;
    if (timeOut > 0) {
    delivery = consumer.nextDelivery(timeOut);
    } else {
    delivery = consumer.nextDelivery();
    }
    if (delivery == null) {
    return StringUtils.EMPTY;
    }

    String message = new String(delivery.getBody());
    return message;
    }
    ********************************************************************************
    
    
    ---------------------- 在storm中创建mq实例:
    SpoutOutputCollector collector;
    RabbitMqReceiver receiver;
    Channel channel;
    QueueingConsumer consumer;
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)  //初始化调用一次
    {
    this.collector = collector;
    receiver = checkNotNull(new RabbitMqReceiver(conf.get("crash.mq.host").toString(),
    Integer.valueOf(conf.get("crash.mq.port").toString()), conf.get("crash.mq.user").toString(),
    conf.get("crash.mq.pwd").toString()), "receiver is null");
    try {
    channel = checkNotNull(receiver.createChannel(), "channel is null");
    consumer = checkNotNull(receiver.createConsumer(channel, conf.get("crash.mq.channel").toString()),
    "comsumer is null");
    } catch (Exception e) {
    logger.error("init mq-client error:", e);
    }
    }


    ---------------------- 在storm中循环执行获得消息实例:
    @Override
    public void nextTuple()
    {
    String message = getMessageFromMQ();
    }

    生产者:
    --------------------------------------------------:


    private final static String QUEUE_NAME = "hello2";// 队列名不能重复 之前已有就会失败
    public class Producer {  
    
        private final static String QUEUE_NAME = "hello2";// 队列名不能重复 之前已有就会失败  
    
        public static void main(String[] argv) throws java.io.IOException {  
    
            /* 使用工厂类建立Connection和Channel,并且设置参数 */  
            ConnectionFactory factory = new ConnectionFactory();  
            factory.setHost("192.168.10.111");// MQ的IP  
            factory.setPort(5672);// MQ端口  
            factory.setUsername("asdf");// MQ用户名  
            factory.setPassword("123456");// MQ密码  
            Connection connection = factory.newConnection();  
            Channel channel = connection.createChannel();  
    
            /* 创建消息队列,并且发送消息 */  
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);  
            String message = "消息2";  
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());  //Message持久化
            System.out.println("生产了个'" + message + "'");  
    
            /* 关闭连接 */  
            channel.close();  
            connection.close();  
        }  
    
    }
    

      



  • 相关阅读:
    数据结构与算法-字符串与字符串匹配算法
    操作系统-PV操作的原理和几种常见问题
    操作系统-进程(8)-临界区管理
    利用队列实现逐行打印杨辉三角形的前n行
    操作系统-进程(7)死锁和银行家算法
    计算机网络-网络层(6)ICMP协议
    操作系统-进程(6)管程
    计算机网络-链路层(5)点对点链路控制
    操作系统-进程(5)进程通信机制
    RTSP/RTMP流媒体协议网页无插件直播视频平台浏览器请求地址自带尾缀符解释说明
  • 原文地址:https://www.cnblogs.com/gxyandwmm/p/11019315.html
Copyright © 2011-2022 走看看