zoukankan      html  css  js  c++  java
  • ActiveMQ监听消息并进行转发,监听不同的mq服务器和不同的队列

    工作中刚接触mq消息业务,其实也就是监听一下别的项目发送的消息然后进行对应的转发,但是监听的mq会有多个,而且转发的地址也可能有多个,这里就使用spring集成的方式!记录一下实现方式:

    监听多个mq配置,主要还是在xml或者配置类里进行配置多个,这里以两个为例:

    properties文件中配置好多个mq的tcp地址,

    <!-- mq配置 -->
    		<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    			<property name="brokerURL" value="${amq.tpl.server}" />
    		</bean>
    		<bean id="connectionFactory"
    			class="org.springframework.jms.connection.SingleConnectionFactory">
    			<property name="targetConnectionFactory" ref="targetConnectionFactory" />
    		</bean>
    		<!-- <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    			<property name="connectionFactory" ref="connectionFactory" />
    		</bean> -->
    		<!-- 监听的消息队列 -->
    		<bean id="wechatQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    			<constructor-arg>
    				<value>templateQueue</value>
    			</constructor-arg>
    			<!--可继续配置多个队列 -->
    		</bean>
    		<!-- 消息监听器配置,引用制定的mq服务器与监听队列->
    		<bean id="templateMessageListener" class="com.zhuzher.amq.listener.TemplateMessageListener"/> 
    		<bean id="templateMessageContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
    		    <property name="connectionFactory" ref="connectionFactory" />  
    		    <property name="destination" ref="wechatQueueDestination" />  
    		    <property name="messageListener" ref="templateMessageListener" />  
    		</bean>
    

      多个,mq就切换不同的地址,配置不同的连接工厂就可以了,然后再配置监听器!

    然后就是消息转发了,这里采用httpclient调用接口方式实现,然后将地址配置在数据库中,达到高可扩展的目的,为了提高性能还可以在项目启动的时候把地址加载的内存中,取地址就从内存中获取,并提供一个刷新的接口即可!

    这里直接贴上内存类的代码:

    //存储转发地址
    public class ForwardAddressHelper {
    	private static Logger log=Logger.getLogger(ForwardAddressHelper.class);
    	@Autowired PmsForwardService pmsForwardService;
    	//存储转发地址
    	private static List<PmsForwardAddress> address = new ArrayList<>();
    	private static ForwardAddressHelper forwardAddressHelper=null;//单例
    	//私有化构造函数
    	private  ForwardAddressHelper(){}
    	public static ForwardAddressHelper getInstance() {
            if (forwardAddressHelper == null) {
                synchronized (ForwardAddressHelper.class) {
                    if (forwardAddressHelper == null) {
                    	forwardAddressHelper = new ForwardAddressHelper();
                    }
                }
            }
            return forwardAddressHelper;
        }
    	/**
    	 * 初始化转发地址
    	 */
    	public void init(){
    		if(ForwardAddressHelper.address.size()==0){
    			System.out.println("----------------初始化成功----------------");
    			initAddress();
    		}
    	}
    	/**
    	 * 重载转发地址
    	 */
    	public void reLoad() {
    		ForwardAddressHelper.address.clear();
    		init();
    	}
    	/**
    	 * 初始化转发地址数据
    	 */
    	private void initAddress(){
    		log.info("--------------转发地址初始化-----------");
    		ForwardAddressHelper.address.addAll(pmsForwardService.queryAddress());
    	}
    	/**
    	 * 获取所有转发地址
    	 */
    	public static List<PmsForwardAddress> getAddress(){
    		return ForwardAddressHelper.address;
    	}
    }
    

      ,然后只需要在spring容器启动的时候调用这个init方法就可以了,这里有两种,一种是监听器方式,还有一种是xml配置,这里我就直接使用xml了:

    lazy-init="false" :表示容器加载立即执行
    <bean id="forwardAddressHelper" lazy-init="false" class="com.helper.ForwardAddressHelper" init-method="init"/>

    ,然后就是写消息监听类了,实现具体业务,由于已经配置了监听器,所以直接写就行,这里直接上代码,具体业务就是用httpclient调一遍接口,消息内容是接口的参数:
    public class PmsMessageListener implements MessageListener {
          
    	static Logger log=Logger.getLogger(PmsMessageListener.class);
    	static final Gson GSON = new Gson();
    	@Autowired ForwardAddressHelper forwardAddressHelper;
    	@Override
    	public void onMessage(Message message) {
    		log.debug("监听器接收到消息:"+message);
    		if(null == message || !(message instanceof TextMessage))return;
    		TextMessage textMessage = (TextMessage) message;
    		String text = null;
    		try {
    			text = textMessage.getText();log.debug("message:"+textMessage.getText());
    		} catch (JMSException e) { e.printStackTrace(); }
    		if(StringUtil.isBlank(text))return;
    		Map<String, String> messageMap = GSON.fromJson(text, new TypeToken<Map<String, String>>(){}.getType());
    		pmsForward(messageMap);
    	}
    	//消息转发-获取参数中对应参数调用对应接口
    	public void pmsForward(Map<String, String> map){
    		List<PmsForwardAddress> address = forwardAddressHelper.getAddress();//从内存获取转发地址
    		//封装参数
    		List<NameValuePair> params = new ArrayList<NameValuePair>();
    		Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();
    		while(iterator.hasNext()){
    			params.add(new BasicNameValuePair(iterator.next().getKey(),iterator.next().getValue()));
    		}
    		address.forEach(x->{
    			CloseableHttpResponse response = null;
    			CloseableHttpClient httpClient = HttpClients.createDefault();
    			try {
    				URIBuilder builder = new URIBuilder(x.getAddress());
    			    builder.setParameters(params);
    			    HttpGet get = new HttpGet(builder.build());
    			    response = httpClient.execute(get);
    			    if(response != null && response.getStatusLine().getStatusCode() == 200)log.info("消息转发成功");
    			} catch (Exception e) {e.printStackTrace();log.error("消息转发失败");
    			} finally {
    	            try { httpClient.close();if(response != null)response.close();
    	            } catch (IOException e) {e.printStackTrace();}
    			}
    		});
    	}
    
    }
    

      

    PmsMessageListener 这个类配置在了xml文件中,会监听我们指定的mq的消息队列,只要有消息来就会取数据库里配置的接口一一调用!
  • 相关阅读:
    我理解的优秀软件工程师
    Hello 博客园!
    线程安全与可重入函数之间的区别与联系
    linux-粘滞位的使用
    死锁产生的四个必要条件及处理死锁的策略
    数据结构—位图
    Linux下进度条的简单实现
    Linux-find命令
    Linux文件3个时间点(access time,modify time,change time)
    各种排序算法的实现、总结
  • 原文地址:https://www.cnblogs.com/houzheng/p/9686815.html
Copyright © 2011-2022 走看看