zoukankan      html  css  js  c++  java
  • 初识rabbitMQ(一)

    19/5/29  对于rabbitMQ ,我已经研究了几天。 之前完全的没有接触过,所以有很多的概念,很多的坑要踩

    首先是安装 rabbitmq 这个就不记录了。

    1、引入 Maven

    <dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-amqp</artifactId>
    <version>2.0.3.RELEASE</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
    <dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.0.3.RELEASE</version>
    </dependency>



    2、配置 ,写配置文件
    <!--步骤1、配置链接工厂-->
    <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <property name="host" value="${mq.address}"/>
    <property name="port" value="${mq.port}"/>
    <property name="password" value="${mq.pwd}"/>
    <property name="username" value="${mq.user}"/>
    <property name="publisherConfirms" value="true"/>
    <property name="publisherReturns" value="true"/>
    <property name="virtualHost" value="${mq.vhost}"/>
    <property name="requestedHeartBeat" value="50"/>
    </bean>
    <!--步骤2、创建rabbitTemplate 消息模板-->
    <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <!--构造方法需要链接信息-->
    <constructor-arg ref="connectionFactory"/>
    <!--配置交换机-->
    <property name="exchange" value="${mq.exchange}"/>
    <!--配置路由键-->
    <property name="routingKey" value="${mq.routingKey}"/>
    <!--配置队列-->
    <property name="queue" value="${mq.queue}"/>
    <!--配置消息转换-->
    <property name="messageConverter" ref="serializerMessageConverter"/>
    <property name="confirmCallback" ref="rabbitTemplateConfig" />
    <property name="returnCallback" ref="rabbitTemplateConfig" />
    <property name="mandatory" value="true" />
    </bean>
    <bean id="rabbitTemplateConfig" class="mq.RabbitTemplateConfig"/>
    <!--注入消息转换器-->
    <bean id="serializerMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter"/>
    <!--引入元素文件-->
    <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="properties">
    <bean class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="locations">
    <list>
    <value>classpath:conf/value.properties</value>
    </list>
    </property>
    <property name="fileEncoding" value="UTF-8"/>
    </bean>
    </property>
    </bean>
    <!--申明消费者-->
    <bean id="rmqConsumer" class="mq.RmqConsumer" />
    <bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
    <constructor-arg ref="rmqConsumer" />
    <property name="defaultListenerMethod" value="rmqConsumeMessage"/>
    <property name="messageConverter" ref="serializerMessageConverter"/>
    </bean>
    <!--注册监听-->
    <bean id="listener" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="queueNames" value="ceshiQueues,ceshi1,ceshi2"/>
    <property name="messageListener" ref="messageListenerAdapter"/>
    <property name="acknowledgeMode" value="MANUAL"/>
    </bean>

    这个是我关于rabbitMQ 所用的配置,下面记录一下具体的作用。
    (1、)配置链接
      通过配置链接工厂从而链接到rabbitMQ
    <!--步骤1、配置链接工厂-->
    <bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
    <property name="host" value="${mq.address}"/>//链接的地址 127.0.0.1
    <property name="port" value="${mq.port}"/>//端口号 5627
    <property name="password" value="${mq.pwd}"/> //密码
    <property name="username" value="${mq.user}"/> //用户名
    <property name="publisherConfirms" value="true"/> //是否开启提交到交换机的回调
    <property name="publisherReturns" value="true"/> //是否开启发送到队列的错误回调
    <property name="virtualHost" value="${mq.vhost}"/>// 虚拟机
    <property name="requestedHeartBeat" value="50"/>//心跳时间(这个可删除,我不知道有什么用,以后有领悟再记录)
    </bean>

    属性文件中的内容
    mq.address=127.0.0.1
    mq.exchange=ceshi
    mq.routingKey=ceshiRouting
    mq.queue=ceshiQueues
    mq.port=5672
    mq.user=***
    mq.pwd=t**an****
    mq.timeout=5000
    mq.vhost=testMQ

    关于开启 ConfirmReturn 的回调 还需要在模板 rabbitTemplate 中进行设置

    <!--步骤2、创建rabbitTemplate 消息模板-->
    <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
    <!--构造方法需要链接信息-->
    <constructor-arg ref="connectionFactory"/>
    <!--配置交换机-->
    <property name="exchange" value="${mq.exchange}"/>
    <!--配置路由键-->
    <property name="routingKey" value="${mq.routingKey}"/>
    <!--配置队列-->
    <property name="queue" value="${mq.queue}"/>
    <!--配置消息转换-->
    <property name="messageConverter" ref="serializerMessageConverter"/>
    <property name="confirmCallback" ref="rabbitTemplateConfig" />
    <property name="returnCallback" ref="rabbitTemplateConfig" />
    <property name="mandatory" value="true" />
    </bean>
    注册模板类的bean 类 org.springframework.amqp.rabbit.core.RabbitTemplate  
    在其构造方法中传入链接工厂的引用, 如上 代码  重点看 下面这几行配置 
        <property name="confirmCallback" ref="rabbitTemplateConfig" />
    <property name="returnCallback" ref="rabbitTemplateConfig" />
    <property name="mandatory" value="true" />
    这个就是上面提到的 回调,<property name="mandatory" value="true" />  这个是一定要的 ,删除了会导致 returnCallback 不起效 ,下面贴上实现类代码 

    package mq;

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;

    /**
    * @author tia
    * @date 2019/5/2910:45
    */
    public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    /**
    * 是否成功发送到交换器
    * 成功、失败都会回调
    * @param correlationData
    * @param b
    * @param s
    */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
    System.out.println("消息唯一标识:"+correlationData);
    System.out.println("确认结果:"+b);
    System.out.println("失败原因:"+s);
    }

    /**
    * 是否成功发送到队列(需要设置mandatory 为true)
    * 失败回调
    * @param message
    * @param i
    * @param s
    * @param s1
    * @param s2
    */
    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
    System.out.println("消息主体:"+message);
    System.out.println("消息主体:"+i);
    System.out.println("描述:"+s);
    System.out.println("交换器:"+s1);
    System.out.println("路由键:"+s2);
    }
    }
    偷了个懒,把两个回调放在了一起 implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback  这两个实现是一定要的。
    这两个方法的作用,就是对消息进行重新发送,或是记录没有发送出去的消息,等等,看个人安排了。

    在我的配置中是没有关于 队列的创建,交换器的创建,虚拟机的创建、绑定等的内容, 这些都在RabbitMQ 的后台完成了 图个简单。

    到这里,就可以向mq发送消息了。我写的一个例子:
    package mq;

    import com.alibaba.fastjson.JSON;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import po.Message;

    import javax.annotation.Resource;
    import java.io.UnsupportedEncodingException;
    import java.util.Date;

    @RestController
    @RequestMapping(value = "/mq",produces = "text/html;charset=UTF-8")
    public class RmqProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(RmqProducer.class);
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
    *
    发送信息
    */
    public void sendMessage(String queueKey,Object msg) {
    try {
    // 发送信息
    rabbitTemplate.convertAndSend(queueKey,"1");
    } catch (Exception e) {
    LOGGER.error("rmq消费者任务处理出现异常", e);
    }
    }
    @RequestMapping("/sendMessage")
    public void sendActiveCount(String activeMap) throws UnsupportedEncodingException {
    Message message=new Message();
    message.setFrom(1234566l);
    message.setTo(754964641l);
    message.setText("你妹妹好漂亮");
    message.setDate(new Date());
    message.setFromName("你妹妹");
    String s = JSON.toJSONString(message);
    for (int i = 0; i <100 ; i++) {
    rabbitTemplate.convertAndSend("ceshi","ceshi1Routing",s,new CorrelationData("你妹妹"+i));
    }

    }
    }
    主要的内容就是这个方法 rabbitTemplate.convertAndSend("ceshi","ceshi1Routing",s,new CorrelationData("你妹妹"+i)); 哪儿都能发送。

    再看看 消费者 怎么弄,可是花了我大量的时间 去弄这个。

    <!--申明消费者-->
    <bean id="rmqConsumer" class="mq.RmqConsumer" />
    <bean id="messageListenerAdapter" class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
    <constructor-arg ref="rmqConsumer" />
    <property name="defaultListenerMethod" value="rmqConsumeMessage"/>
    <property name="messageConverter" ref="serializerMessageConverter"/>
    </bean>
    <!--注册监听-->
    <bean id="listener" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="queueNames" value="ceshiQueues,ceshi1,ceshi2"/>
    <property name="messageListener" ref="messageListenerAdapter"/>
    <property name="acknowledgeMode" value="MANUAL"/>
    </bean>
    这个监听是一定要有的,或许你可以使用注解来干掉他。
    看到这个了吗? <property name="defaultListenerMethod" value="rmqConsumeMessage"/> 这个东西就是说默认去执行你 <constructor-arg ref="rmqConsumer" /> 这个类的 这个 方法的。不过也有其他的弊端就是 通道的问题
    还有就是 如果实现了 implements ChannelAwareMessageListener 就不起效了。
    看代码:
    package mq;


    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.rabbitmq.client.Channel;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;;


    public class RmqConsumer implements ChannelAwareMessageListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(RmqConsumer.class);
    int i=0;
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
    try{
    Object ddd=null;
    JSONObject jsonObject=JSONObject.parseObject(new String(message.getBody(),"utf-8"));
    po.Message message1 = JSON.toJavaObject(jsonObject, po.Message.class);
    System.out.println(message1.toString());
    if(i++%10==0)
    System.out.println(ddd.toString());
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }catch (Exception e){
    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
    channel.basicPublish("ceshi","ceshi1Routing",true,true,null ,message.getBody());
    System.out.println(e.getMessage());
    }
    }
    }

    这个里面没有配置里提到的方法,他被我吃了。因为他不生效了。
    再说这个通道的问题 channel ,我这儿 消费者方法是不能抛出错误的,会停掉,所以只能处理, <property name="acknowledgeMode" value="MANUAL"/> 者个配置是在配置是否手动确认的。
    MANUAL 手动确认 AUTO 自动确认(默认值) 如果开启自动确认,那么 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 
    将会 报错,也就是说,他不需要手动确认的代码存在。它会默认所有的方法都进行 成功确认,这个真的很无奈。
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); 成功确认 他有两个参数 ,消息Tag 与是否批量确认。如果true 则批量确认 tag值小于该值的所有信息将被成功确认。
    message.getMessageProperties().getDeliveryTag() 消息的Tag
    如果你开启了手动确认,但并没有确认,那么你的消息就会处于未确认状态,就像这样 Unacked 100 ,Total 100, 发送100条消息,都没有确认。那rabbitMQ不会把它删除,一直堆积在内存中,后果,就看你怎么处理了.....
     
    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); 失败确认 这个方法有三个参数。 消息Tag 、是否批量确认、和 是否重新回到队列。前两个参数跟 成功确认相同, 最后一个如果为true 将重新回到队列顶端
    注意 是队列顶端,下一次消费者就会调用返回队列的消息。如果这条消息有错误,那就意味着,程序会一直进行 失败确认 返回队列 ,死循环 。
    所以 看这个 channel.basicPublish("ceshi","ceshi1Routing",true,true,null ,message.getBody()); 发送消息,它会把消息发送到队列的末尾,这样最后执行,就可以避免不消费其他正确的消息了。
     





  • 相关阅读:
    Sql Server 2008卸载后再次安装一直报错
    listbox 报错 Cannot have multiple items selected when the SelectionMode is Single.
    Sql Server 2008修改Sa密码
    学习正则表达式
    Sql Server 查询第30条数据到第40条记录数
    Sql Server 复制表
    Sql 常见面试题
    Sql Server 简单查询 异步服务器更新语句
    jQuery stop()用法以及案例展示
    CSS3打造不断旋转的CD封面
  • 原文地址:https://www.cnblogs.com/hxz-nl/p/10945613.html
Copyright © 2011-2022 走看看