工作中刚接触mq消息业务,其实也就是监听一下别的项目发送的消息然后进行对应的转发,但是监听的mq会有多个,而且转发的地址也可能有多个,这里就使用spring集成的方式!记录一下实现方式:
监听多个mq配置,主要还是在xml或者配置类里进行配置多个,这里以两个为例:
properties文件中配置好多个mq的tcp地址,
<!-- mq配置 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${amq.tpl.server}" /> </bean> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!-- <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> </bean> --> <!-- 监听的消息队列 --> <bean id="wechatQueueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>templateQueue</value> </constructor-arg> <!--可继续配置多个队列 --> </bean> <!-- 消息监听器配置,引用制定的mq服务器与监听队列-> <bean id="templateMessageListener" class="com.zhuzher.amq.listener.TemplateMessageListener"/> <bean id="templateMessageContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="wechatQueueDestination" /> <property name="messageListener" ref="templateMessageListener" /> </bean>
多个,mq就切换不同的地址,配置不同的连接工厂就可以了,然后再配置监听器!
然后就是消息转发了,这里采用httpclient调用接口方式实现,然后将地址配置在数据库中,达到高可扩展的目的,为了提高性能还可以在项目启动的时候把地址加载的内存中,取地址就从内存中获取,并提供一个刷新的接口即可!
这里直接贴上内存类的代码:
//存储转发地址 public class ForwardAddressHelper { private static Logger log=Logger.getLogger(ForwardAddressHelper.class); @Autowired PmsForwardService pmsForwardService; //存储转发地址 private static List<PmsForwardAddress> address = new ArrayList<>(); private static ForwardAddressHelper forwardAddressHelper=null;//单例 //私有化构造函数 private ForwardAddressHelper(){} public static ForwardAddressHelper getInstance() { if (forwardAddressHelper == null) { synchronized (ForwardAddressHelper.class) { if (forwardAddressHelper == null) { forwardAddressHelper = new ForwardAddressHelper(); } } } return forwardAddressHelper; } /** * 初始化转发地址 */ public void init(){ if(ForwardAddressHelper.address.size()==0){ System.out.println("----------------初始化成功----------------"); initAddress(); } } /** * 重载转发地址 */ public void reLoad() { ForwardAddressHelper.address.clear(); init(); } /** * 初始化转发地址数据 */ private void initAddress(){ log.info("--------------转发地址初始化-----------"); ForwardAddressHelper.address.addAll(pmsForwardService.queryAddress()); } /** * 获取所有转发地址 */ public static List<PmsForwardAddress> getAddress(){ return ForwardAddressHelper.address; } }
,然后只需要在spring容器启动的时候调用这个init方法就可以了,这里有两种,一种是监听器方式,还有一种是xml配置,这里我就直接使用xml了:
lazy-init="false" :表示容器加载立即执行
<bean id="forwardAddressHelper" lazy-init="false" class="com.helper.ForwardAddressHelper" init-method="init"/>
,然后就是写消息监听类了,实现具体业务,由于已经配置了监听器,所以直接写就行,这里直接上代码,具体业务就是用httpclient调一遍接口,消息内容是接口的参数:
public class PmsMessageListener implements MessageListener { static Logger log=Logger.getLogger(PmsMessageListener.class); static final Gson GSON = new Gson(); @Autowired ForwardAddressHelper forwardAddressHelper; @Override public void onMessage(Message message) { log.debug("监听器接收到消息:"+message); if(null == message || !(message instanceof TextMessage))return; TextMessage textMessage = (TextMessage) message; String text = null; try { text = textMessage.getText();log.debug("message:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } if(StringUtil.isBlank(text))return; Map<String, String> messageMap = GSON.fromJson(text, new TypeToken<Map<String, String>>(){}.getType()); pmsForward(messageMap); } //消息转发-获取参数中对应参数调用对应接口 public void pmsForward(Map<String, String> map){ List<PmsForwardAddress> address = forwardAddressHelper.getAddress();//从内存获取转发地址 //封装参数 List<NameValuePair> params = new ArrayList<NameValuePair>(); Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator(); while(iterator.hasNext()){ params.add(new BasicNameValuePair(iterator.next().getKey(),iterator.next().getValue())); } address.forEach(x->{ CloseableHttpResponse response = null; CloseableHttpClient httpClient = HttpClients.createDefault(); try { URIBuilder builder = new URIBuilder(x.getAddress()); builder.setParameters(params); HttpGet get = new HttpGet(builder.build()); response = httpClient.execute(get); if(response != null && response.getStatusLine().getStatusCode() == 200)log.info("消息转发成功"); } catch (Exception e) {e.printStackTrace();log.error("消息转发失败"); } finally { try { httpClient.close();if(response != null)response.close(); } catch (IOException e) {e.printStackTrace();} } }); } }
PmsMessageListener 这个类配置在了xml文件中,会监听我们指定的mq的消息队列,只要有消息来就会取数据库里配置的接口一一调用!