zoukankan      html  css  js  c++  java
  • RabbitMQ 入门指南(Java)

    RabbitMQ是一个受欢迎的消息代理,通常用于应用程序之间或者程序的不同组件之间通过消息来进行集成。本文简单介绍了如何使用 RabbitMQ,假定你已经配置好了rabbitmq服务器。

    RabbitMQ是用Erlang,对于主要的编程语言都有驱动或者客户端。我们这里要用的是Java,所以先要获得Java客户端。。下面是Java客户端的maven依赖的配置。

    <dependency>

            <groupId>com.rabbitmq</groupId>

            <artifactId>amqp-client</artifactId>

            <version>3.0.4</version>

    </dependency>

    RabbitMQ这样的消息代理可用来模拟不同的场景,例如点对点的消息分发或者订阅/推送。我们的程序足够简单,有两个基本的组件,一个生产者用于产生消息,还有一个消费者用来使用产生的消息。

    在这个例子里,生产者会产生大量的消息,每个消息带有一个序列号,另一个线程中的消费者会使用这些消息。

    抽象类EndPoint:

    我们首先写一个类,将产生产者和消费者统一为 EndPoint类型的队列。不管是生产者还是消费者, 连接队列的代码都是一样的,这样可以通用一些

    package co.syntx.examples.rabbitmq;

    import java.io.IOException;

    import com.rabbitmq.client.Channel;

    import com.rabbitmq.client.Connection;

    import com.rabbitmq.client.ConnectionFactory;

    /**

     * Represents a connection with a queue

     * @author syntx

     *

     */

    public abstract class EndPoint{

         

        protected Channel channel;

        protected Connection connection;

        protected String endPointName;

         

        public EndPoint(String endpointName) throws IOException{

             this.endPointName = endpointName;

             

             //Create a connection factory

             ConnectionFactory factory = new ConnectionFactory();

             

             //hostname of your rabbitmq server

             factory.setHost("localhost");

             

             //getting a connection

             connection = factory.newConnection();

             

             //creating a channel

             channel = connection.createChannel();

             

             //declaring a queue for this channel. If queue does not exist,

             //it will be created on the server.

             channel.queueDeclare(endpointName, false, false, false, null);

        }

         

         

        /**

         * 关闭channelconnection。并非必须,因为隐含是自动调用的。

         * @throws IOException

         */

         public void close() throws IOException{

             this.channel.close();

             this.connection.close();

         }

    }

    生产者:

    生产者类的任务是向队列里写一条消息。我们使用Apache Commons Lang把可序列化的Java对象转换成 byte 数组。commons langmaven依赖如下:

    <dependency>

    <groupId>commons-lang</groupId>

    <artifactId>commons-lang</artifactId>

    <version>2.6</version>

    </dependency>

    package co.syntx.examples.rabbitmq;

    import java.io.IOException;

    import java.io.Serializable;

    import org.apache.commons.lang.SerializationUtils;

    /**

     * The producer endpoint that writes to the queue.

     * @author syntx

     *

     */

    public class Producer extends EndPoint{

         

        public Producer(String endPointName) throws IOException{

            super(endPointName);

        }

        public void sendMessage(Serializable object) throws IOException {

            channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));

        }  

    }

    消费者:

    消费者可以以线程方式运行,对于不同的事件有不同的回调函数,其中最主要的是处理新消息到来的事件。

    package co.syntx.examples.rabbitmq;

    import java.io.IOException;

    import java.util.HashMap;

    import java.util.Map;

    import org.apache.commons.lang.SerializationUtils;

    import com.rabbitmq.client.AMQP.BasicProperties;

    import com.rabbitmq.client.Consumer;

    import com.rabbitmq.client.Envelope;

    import com.rabbitmq.client.ShutdownSignalException;

    /**

     * 读取队列的程序端,实现了Runnable接口。

     * @author syntx

     *

     */

    public class QueueConsumer extends EndPoint implements Runnable, Consumer{

         

        public QueueConsumer(String endPointName) throws IOException{

            super(endPointName);       

        }

         

        public void run() {

            try {

                //start consuming messages. Auto acknowledge messages.

                channel.basicConsume(endPointName, true,this);

            } catch (IOException e) {

                e.printStackTrace();

            }

        }

        /**

         * Called when consumer is registered.

         */

        public void handleConsumeOk(String consumerTag) {

            System.out.println("Consumer "+consumerTag +" registered");    

        }

        /**

         * Called when new message is available.

         */

        public void handleDelivery(String consumerTag, Envelope env,

                BasicProperties props, byte[] body) throws IOException {

            Map map = (HashMap)SerializationUtils.deserialize(body);

            System.out.println("Message Number "+ map.get("message number") + " received.");

             

        }

        public void handleCancel(String consumerTag) {}

        public void handleCancelOk(String consumerTag) {}

        public void handleRecoverOk(String consumerTag) {}

        public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}

    }

    Putting it together:

    在下面的测试类中,先运行一个消费者线程,然后开始产生大量的消息,这些消息会被消费者取走。

    package co.syntx.examples.rabbitmq;

    import java.io.IOException;

    import java.sql.SQLException;

    import java.util.HashMap;

    public class Main {

        public Main() throws Exception{

             

            QueueConsumer consumer = new QueueConsumer("queue");

            Thread consumerThread = new Thread(consumer);

            consumerThread.start();

             

            Producer producer = new Producer("queue");

             

            for (int i = 0; i < 100000; i++) {

                HashMap message = new HashMap();

                message.put("message number", i);

                producer.sendMessage(message);

                System.out.println("Message Number "+ i +" sent.");

            }

        }

         

        /**

         * @param args

         * @throws SQLException

         * @throws IOException

         */

        public static void main(String[] args) throws Exception{

          new Main();

        }

    }

  • 相关阅读:
    浅析如何保证vuex中的state动态添加属性的响应式及解决deep watch / computed监听vuex state对象属性变化不生效的问题
    浅析security遇到java.lang.IllegalArgumentException:Cannot pass null or empty values to constructor问题处理
    Java AES加解密报错javax.crypto.IllegalBlockSizeException: Input length must be multiple of 16 when decrypting with padded cipher的问题原因及2种解决方式
    浅析Vue3中如何通过vmodel实现父子组件的双向数据绑定及利用computed简化父子组件双向绑定
    浅析pdfbox将pdf文件转图片报错Cannot read JPEG2000 image的问题及JPEG与JPEG2000介绍了解
    浅析Vue3中vuex的基本使用、模块化及如何使用mapState/mapGetters和mapActions
    浅析Object.assign()基本用法(对象合并、同名属性覆盖、仅1个参数时直接返回、target不是对象会转成对象、源对象位置为非对象时不同的处理规则字符串的特殊情况、拷贝的属性限制)及需要注意的点(浅拷贝、同名属性替换、数组的处理把索引当属性替换、取值函数先取值再拷贝)和常见应用(给对象添加属性、合并多个对象、给属性设置默认值)
    浅析setup如何通过ref获取子组件实例中的DOM结构/数据/方法及获取子组件实例数据都是空的处理(defineExpose API 的使用)、Vue3模板引用refs、在组合式API中使用template refs、for循环中如何获取及重置refs、如何监听模板引用
    解决uniapp ios播放本地视频不显示controls的问题、uniapp video开始播放如何设置默认全屏
    Deep learning:三十四(用NN实现数据的降维)
  • 原文地址:https://www.cnblogs.com/quxiuke/p/6131753.html
Copyright © 2011-2022 走看看