zoukankan      html  css  js  c++  java
  • RabbitMQ术语

    工作队列:Working Queue

    分配:多个客户端接收同一个Queue,如何做负载均衡(分配)。

        Round-robin分配:多个接收端接收同一个Queue时,采用了Round-robin分配算法,即伦叫调度-依次分配给各个接收方。

    消息确认:

        默认开启了消息确认(接收方接收到消息后,立即向服务器发回确认)。消息接收方处理完消息后,向服务器发送消息确认,服务器在删除该消息。对于耗时的work/task可以先关闭自动消息确认,在work/task完成后,再手动发回确认。

        channel.basicConsume("hello", false /*关闭自动消息确认*/, consumer);

        /*work/task完成之后*/

        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

    持久化:

        Server端的Queue持久化:

            Tip:如果已经声明了同名非持久化的Queue,再次声明无效。

            发送方和接收方都需要指定该参数。

            boolean durable = true;

            channel.queueDeclare("task_queue", durable, false, false, null);

        Message持久化:

            channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

    负载均衡:

        为了解决各个接收端工作量相差太大的问题(有的一直busy,有的空闲比较多),突破Round-robin。最多为当前接收方发送一条消息,如果接收方还未处理完这条消息(也即,还没有发回确认),就不要再给他分配消息了,应该把当前消息分配给其他空闲接收方。

        int prefetchCount = 1;

        channel.basicQos(prefetchCount);

    路由(Routing):

        固定关键词路由(direct):

            使用类型为direct的exchange,发送具有RoutingKey标签的消息给订阅该关键词的Queue。

            场景示例:消息发送方发送了类型为[error][info]的两种消息,写磁盘的消息接受者只接受error类型的消息,Console打印两者。

            发送方:

                ConnectionFactory factory = new ConnectionFactory();

                factory.setHost("localhost");

                Connection connection = factory.newConnection();

                Channel channel = connection.createChannel();

                cahannel.exchangeDeclare(EXCHANGE_NAME, "direct" /*exchange类型为direct*/);

                channel.basiPublish(EXCHANGE_NAME, "info" /*RouingKey为info*/, null, message.getBytes);

                channel.close();

                connection.close();

            接收方:

                ConnectionFactory factory = new ConnectionFactory();

                factory.setHost("localhost");

                Connection connection = factory.newConnection();

                Channel channel = connection.createChannel();

                channel.exchangeDeclare(EXCHANGE_NAME, "direct" /*exchange类型为direct*/);

                //创建匿名Queue

                String queueName = channel.queueDeclare().getQueue();

                //订阅某个关键词,绑定到匿名Queue中

                channel.queueBind(queueName, EXCHANGE_NAME, "error")

                channel.queueBind(queueName, EXCHANGE_NAME, "error");

                

                QueueingConsumer consumer = new QueueingCounsumer(channel);

                channel.basicConsume(queueName, true, consumer);

                Consumer.Delivery delivery = new consumer.nextDelivery();

                String message = new String(delivery.getBody());

                String routingKey = delivery.getEnvelope();

        关键词模式路由(topic):

            这种模式可以看做对Routing的扩展。Routing只能使用固定关键词,而Topics模式可以订阅模糊关键词。     关键词必须是一组word,由点号分隔。例               如"xxx.yyy.zzz",限定255 bytes。

            *表示一个word;

            #表示0个或者多个word; 

            发送方:

                ConnectionFactory factory = new ConnectionFactory();

                factory.setHost("localhost");

                Connection connection = factory.newConnection();

                Channel channel = connection.createChannel();

                channel.exchangeDeclare(EXCHANGE_NAME, "topic" /*exchange类型*/);

                channel.basiPublish(EXCHANGE_NAME, "xxx.yyy" /*RouingKey*/, null, message.getBytes);

                channel.close();

                connection.close();

            接收方:

                ConnectionFactory factory = new ConnectionFactory();

                factory.setHost("localhost");

                Connection connection = factory.newConnection();

                Channel channel = connection.createChannel();

                channel.exchangeDeclare(EXCHANGE_NAME, "topic" /*exchange类型为topic*/);

                //创建匿名Queue

                String queueName = channel.queueDeclare().getQueue();

                //订阅某个关键词,绑定到匿名Queue中

                channel.queueBind(queueName, EXCHANGE_NAME, "*.yyy")

                QueueingConsumer consumer = new QueueingCounsumer(channel);

                channel.basicConsume(queueName, true, consumer);

                QueueingConsumer.Delivery delivery = consumer.nextDelivery();

                String message = new String(delivery.getBody());

                String routingKey = delivery.getEnvelope(); //可获取路由关键词

        

  • 相关阅读:
    iOS序列化与反序列化
    iOS官方文档阅读 基本格式指北
    iOS实现地图半翻页效果--老代码备用参考
    AVQueuePlayer
    ios音频视频资料--备用
    ios coredata 老代码备用参考
    uitableviewcell 自适应大小 参考
    触发UIButton长按事件
    gsoap 学习 1-由wsdl文件生成h头文件
    gsoap简介
  • 原文地址:https://www.cnblogs.com/man-li/p/4193202.html
Copyright © 2011-2022 走看看