zoukankan      html  css  js  c++  java
  • RabbitMQ Java queue

    1.连接抽象封装

    package com.pccw.rabbitmq;
    
    import com.rabbitmq.client.Channel;
    import java.io.IOException;
    import com.rabbitmq.client.*;
    
    /**
     * Base class for objects that talk to a single RabbitMQ queue.
     *
     * <p>Constructing an endpoint opens a connection to the broker, creates a
     * channel on it and declares the queue named by {@code endpointName}.
     * Call {@link #close()} when done to release the channel and connection.
     */
    public abstract class EndPoint {
    
        /** Channel used for all broker operations of this endpoint. */
        protected Channel channel;
        /** Connection that owns {@link #channel}. */
        protected Connection connection;
        /** Name of the queue this endpoint declares and uses. */
        protected String endPointName;
    
        /**
         * Connects to the broker and declares the queue.
         *
         * @param endpointName name of the queue to declare
         * @throws IOException if connecting, creating the channel or declaring
         *         the queue fails; the connection is closed again before the
         *         exception propagates
         */
        public EndPoint(String endpointName) throws IOException {
            this.endPointName = endpointName;
    
            // Create a connection factory
            ConnectionFactory factory = new ConnectionFactory();
    
            // hostname of your rabbitmq server
            factory.setHost("192.168.220.132");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
    
            connection = factory.newConnection();
    
            // From here on a live connection exists; make sure it does not
            // leak if channel creation or queue declaration fails.
            boolean initialized = false;
            try {
                // creating a channel
                channel = connection.createChannel();
    
                // declaring a queue for this channel. If queue does not exist,
                // it will be created on the server.
                // Flags: durable=false, exclusive=false, autoDelete=false, no extra arguments.
                channel.queueDeclare(endpointName, false, false, false, null);
                initialized = true;
            } finally {
                if (!initialized) {
                    try {
                        connection.close();
                    } catch (Exception ignored) {
                        // Best-effort cleanup: the original failure is the one
                        // the caller needs to see, so do not mask it.
                    }
                }
            }
        }
    
        /**
         * Closes the channel and then the connection.
         *
         * <p>The connection is closed even when closing the channel fails, so a
         * broken channel cannot leave the connection open.
         *
         * @throws IOException if closing the channel or the connection fails
         */
        public void close() throws IOException {
            try {
                if (this.channel != null) {
                    this.channel.close();
                }
            } finally {
                if (this.connection != null) {
                    this.connection.close();
                }
            }
        }
    }
    View Code

    2.生产者

    package com.pccw.rabbitmq;
    
    import java.io.IOException;
    import java.io.Serializable;
    
    import org.apache.commons.lang.SerializationUtils;
    
    /**
     * Endpoint that publishes Java objects to its queue.
     *
     * <p>Objects are converted to bytes with
     * {@code SerializationUtils.serialize} before being published.
     */
    public class Producer extends EndPoint {
    
        /**
         * Creates a producer bound to the given queue.
         *
         * @param endPointName name of the queue to publish to
         * @throws IOException if the underlying endpoint cannot be set up
         */
        public Producer(String endPointName) throws IOException {
            super(endPointName);
        }
    
        /**
         * Serializes {@code object} and publishes it to this endpoint's queue.
         *
         * @param object the message to send
         * @throws IOException if publishing fails
         */
        public void sendMessage(Serializable object) throws IOException {
            final byte[] payload = SerializationUtils.serialize(object);
            // Empty exchange name with the queue name as routing key; no message properties.
            channel.basicPublish("", endPointName, null, payload);
        }
    }
    View Code

    3.消费者

    package com.pccw.rabbitmq;
    
    import java.io.IOException;
    import java.util.*;
    
    import org.apache.commons.lang.SerializationUtils;
    
    import com.rabbitmq.client.*;
    
    /**
     * Endpoint that consumes messages from its queue and prints the
     * {@code "message number"} entry of each map-shaped message.
     *
     * <p>The instance is its own RabbitMQ {@code Consumer} callback; running it
     * (directly or on a thread) registers it with the broker in
     * auto-acknowledge mode.
     */
    public class QueueConsumer extends EndPoint implements Runnable, Consumer {
    
        /**
         * Creates a consumer bound to the given queue.
         *
         * @param endPointName name of the queue to consume from
         * @throws IOException if the underlying endpoint cannot be set up
         */
        public QueueConsumer(String endPointName) throws IOException {
            super(endPointName);
        }
    
        /**
         * Registers this object as a consumer of the queue.
         * A registration failure is reported on standard error.
         */
        public void run() {
            try {
                // start consuming messages. Auto acknowledge messages.
                channel.basicConsume(endPointName, true, this);
            } catch (IOException e) {
                System.err.println("Failed to start consuming from queue '" + endPointName + "'");
                e.printStackTrace();
            }
        }
    
        /**
         * Called when consumer is registered.
         */
        public void handleConsumeOk(String consumerTag) {
            System.out.println("Consumer " + consumerTag + " registered");
        }
    
        /**
         * Called when new message is available.
         *
         * <p>The body is deserialized and, when it is a {@link Map}, its
         * {@code "message number"} entry is printed. Any other payload is
         * reported on standard error and skipped instead of failing with a
         * {@link ClassCastException} inside the delivery callback.
         *
         * @param consumerTag tag of the consumer the message was delivered to
         * @param envelope    delivery metadata (unused)
         * @param properties  message properties (unused)
         * @param body        serialized message payload
         */
        public void handleDelivery(String consumerTag, Envelope envelope,
                com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body)
                throws IOException {
            // NOTE(review): this is Java-native deserialization of bytes read from
            // the broker. Only safe while every publisher on this queue is trusted;
            // consider a data-only format (e.g. JSON) otherwise.
            Object payload = SerializationUtils.deserialize(body);
            if (!(payload instanceof Map)) {
                String type = (payload == null) ? "null" : payload.getClass().getName();
                System.err.println("Ignoring message with unexpected payload type: " + type);
                return;
            }
            Map<?, ?> map = (Map<?, ?>) payload;
            System.out.println(map.get("message number"));
        }
    
        /** Consumer cancellation callback; intentionally a no-op. */
        public void handleCancel(String consumerTag) {
        }
    
        /** Cancel-ok callback; intentionally a no-op. */
        public void handleCancelOk(String consumerTag) {
        }
    
        /** Recover-ok callback; intentionally a no-op. */
        public void handleRecoverOk(String consumerTag) {
        }
    
        /** Shutdown callback; intentionally a no-op. */
        public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {
        }
    
    }
    View Code

    4.生产者测试类

    package com.pccw.rabbitmq;
    
    import java.util.HashMap;
    
    /**
     * Demo entry point that publishes one map-shaped message to the queue
     * named {@code "queue"} and then closes the producer.
     */
    public class MainProducer {
    
        /**
         * Opens a producer, sends the demo message(s) and closes the producer.
         *
         * @throws Exception if connecting, publishing or closing fails
         */
        public MainProducer() throws Exception{
          
            Producer producer = new Producer("queue");
            
            // Close the producer even when sending fails, so its connection
            // is not left open.
            try {
                for (int i = 0; i < 1; i++) {
                    // Kept as a concrete HashMap: sendMessage requires a Serializable argument.
                    HashMap<String, String> message = new HashMap<String, String>();
                    message.put("message number", "中国"+i);
                    producer.sendMessage(message);
                    System.out.println("Message Number "+ i +" sent.");
                }
            } finally {
                producer.close();
            }
        }
        
        /** Runs the demo. */
        public static void main(String[] args) throws Exception{
          new MainProducer();
        }
    }
    View Code

    5.消费者测试类

    package com.pccw.rabbitmq;
    
    public class MainConsumer {
        public MainConsumer() throws Exception{
            
            QueueConsumer consumer = new QueueConsumer("queue");
            Thread consumerThread = new Thread(consumer);
            consumerThread.start();
        }
      
        public static void main(String[] args) throws Exception{
          new MainConsumer();
        }
    }
    View Code
  • 相关阅读:
    前端-html/css
    数据结构-python
    接口测试-并发处理
    接口测试-高级运用
    接口测试-模拟网络请求
    接口测试-基础
    Jenkins-基础
    appium安装及环境搭建、入门
    Week12-unittest单元测试
    Redis在windows下安装与配置
  • 原文地址:https://www.cnblogs.com/rigid/p/7380693.html
Copyright © 2011-2022 走看看