zoukankan      html  css  js  c++  java
  • RabbitMQ 使用

    发消息

    <!-- Thread pool (4-256 threads, queue capacity 128) referenced as the executor of the caching connection factory below -->
    <task:executor id="baseSiteToOmsTaskExecutor" pool-size="4-256" queue-capacity="128" />
    <!-- RabbitMQ client connection factory; host, port, credentials and virtual host are read from the rabbitmq.baseSiteToOms.* placeholders -->
    <bean id="baseSiteToOmsRabbitConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
    <property name="host" value="${rabbitmq.baseSiteToOms.host}" />
    <property name="port" value="${rabbitmq.baseSiteToOms.port}" />
    <property name="username" value="${rabbitmq.baseSiteToOms.username}" />
    <property name="password" value="${rabbitmq.baseSiteToOms.password}" />
    <property name="virtualHost" value="${rabbitmq.baseSiteToOms.VirtualHost}" />
    <!-- <property name="connectionTimeout" value="${rabbitmq.connection.timeout}" /> -->
    </bean>
    <!-- Spring AMQP caching connection factory wrapping the client factory above; channel cache size is 25 -->
    <bean id="baseSiteToOmsRabbitConnFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg ref="baseSiteToOmsRabbitConnectionFactory" />
    <property name="channelCacheSize" value="25" />
    <property name="executor" ref="baseSiteToOmsTaskExecutor" />
    </bean>
    <!-- With the admin declared below, the exchange and queue defined in this producer configuration are created automatically on the RabbitMQ server -->
    <rabbit:admin connection-factory="baseSiteToOmsRabbitConnFactory" />

    <!-- Create the RabbitTemplate (message template) bean; its exchange is taken from rabbitmq.baseSiteToOms.ExchangeName -->
    <bean id="baseSiteToOmsRabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <constructor-arg ref="baseSiteToOmsRabbitConnFactory"></constructor-arg>
    <property name="exchange" value="${rabbitmq.baseSiteToOms.ExchangeName}" />
    </bean>
    <!-- Define the queue (never auto-deleted, not exclusive; durability comes from rabbitmq.baseSiteToOms.QueueDurable) -->
    <rabbit:queue name="${rabbitmq.baseSiteToOms.QueueName}" durable="${rabbitmq.baseSiteToOms.QueueDurable}"
    auto-delete="false" exclusive="false" />


    <!-- Define the fanout exchange and bind the queue declared above to it -->
    <rabbit:fanout-exchange name="${rabbitmq.baseSiteToOms.ExchangeName}"
    durable="${rabbitmq.baseSiteToOms.ExchangeDurable}" auto-delete="false">
    <rabbit:bindings>
    <!-- Site synchronization -->
    <rabbit:binding queue="${rabbitmq.baseSiteToOms.QueueName}"></rabbit:binding>
    </rabbit:bindings>
    </rabbit:fanout-exchange>

    /**
     * Message producer that publishes JSON-serialized messages through the
     * {@code baseSiteToOmsRabbitTemplate} bean.
     *
     * <p>The injected {@link RabbitTemplate} is shared, so this class never
     * mutates its configuration; the routing key is supplied per send call.
     */
    @Service("baseSiteRabitToOmsSender")
    public class BaseSiteRabitToOmsSender extends MessageProducer {

        @Resource(name = "baseSiteToOmsRabbitTemplate")
        private RabbitTemplate rabbitTemplate;

        private final Logger log = LoggerFactory.getLogger(this.getClass());

        /**
         * Serializes {@code message} to JSON and publishes it to the template's
         * configured exchange, using {@code queue} as the routing key. If the
         * first attempt throws a {@link RuntimeException}, the same JSON payload
         * is sent once more.
         *
         * @param message the object to serialize and publish
         * @param queue   the routing key to publish with
         * @return a successful {@code DetailResult} with an empty detail string when
         *         either attempt succeeds; a failed {@code DetailResult} carrying
         *         the second exception's {@code toString()} when both attempts fail
         */
        @Override
        public DetailResult sendByQueue(Object message, String queue) {
            String messageJson = JSONObject.toJSONString(message);

            try {
                // Pass the routing key per call instead of calling setRoutingKey on the
                // shared template, so concurrent senders cannot overwrite each other's key.
                this.rabbitTemplate.convertAndSend(queue, messageJson);
                this.log.debug("send message " + messageJson, "OTHER");
            } catch (RuntimeException firstFailure) {
                this.log.error("send failed ", "EXCEPTION", firstFailure);

                try {
                    // Retry with the same JSON payload as the first attempt (not the raw
                    // object), so consumers always receive one consistent format.
                    this.rabbitTemplate.convertAndSend(queue, messageJson);
                } catch (RuntimeException secondFailure) {
                    this.log.error("send failed again", "EXCEPTION", secondFailure);
                    return new DetailResult(false, secondFailure.toString());
                }
            }

            return new DetailResult(true, "");
        }
    }

    接收消息
    <!-- Thread pool (4-256 threads, queue capacity 128) referenced as the executor of the caching connection factory below -->
    <task:executor id="taskExecutor" pool-size="4-256" queue-capacity="128" />  
    <!-- RabbitMQ client connection factory; host, port, credentials and virtual host are read from the rabbitmq.* placeholders -->
    <bean id="rabbitConnectionFactory" class="com.rabbitmq.client.ConnectionFactory">
    <property name="host" value="${rabbitmq.host}" />
    <property name="port" value="${rabbitmq.port}" />
    <property name="username" value="${rabbitmq.username}" />
    <property name="password" value="${rabbitmq.password}" />
    <property name="virtualHost" value="${rabbitmq.vhost}" />
    <!-- <property name="connectionTimeout" value="${rabbitmq.connection.timeout}" /> -->
    </bean>
    <!-- Spring AMQP caching connection factory wrapping the client factory above; channel cache size is 25 -->
    <bean id="rabbitConnFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <constructor-arg ref="rabbitConnectionFactory" />
    <property name="channelCacheSize" value="25" />
    <property name="executor" ref="taskExecutor" />
    </bean>
    <!-- With the admin declared below, the exchange and queue defined in this configuration are created automatically on the RabbitMQ server -->
    <rabbit:admin connection-factory="rabbitConnFactory" />
    <!-- Create the RabbitTemplate (message template) bean; its exchange is taken from rabbitmq.ReciveExchangeName -->
    <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <constructor-arg ref="rabbitConnFactory"></constructor-arg>
    <property name="exchange" value="${rabbitmq.ReciveExchangeName}" />
    </bean>
    <!-- Queue to receive from (declared non-durable, never auto-deleted, not exclusive) -->
    <rabbit:queue name="${rabbitmq.ReciveQueueName}" durable="false" auto-delete="false" exclusive="false" />

    <!-- Message receiver -->
    <bean id="messageConsumer" class="com.ycgwl.rosefinch.module.basedev.server.customer.MessageConsumer"></bean>

    <!-- Queue listener (observer / listener mode): when a message arrives, the listener object registered on the corresponding queue is notified -->

    <rabbit:listener-container connection-factory="rabbitConnFactory" concurrency="5">
    <rabbit:listener queues="${rabbitmq.ReciveQueueName}" ref="messageConsumer"/>
    </rabbit:listener-container>
  • 相关阅读:
    3、Nginx负载均衡实现的策略
    2、Nginx 是如何实现并发的?为什么 Nginx 不使用多线程?Nginx常见的优化手段有哪些?502错误可能原因有哪些?
    1、HTTP 的负载均衡?Nginx负载均衡
    用 Python 手写十大经典排序算法
    处理TypeError: testFunc() missing 1 required positional argument: 'self' -- 没有实例化对象的错误
    Socket技术详解
    MAC终端常用命令
    接口自动化测试框架 -- reudom
    如何在Pypi发布上传你自己的Python库
    Docker数据目录迁移解决方案
  • 原文地址:https://www.cnblogs.com/zfzf1/p/7278532.html
Copyright © 2011-2022 走看看