发送方:
package com.heyang.agumasterCrawler;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.heyang.agumasterCrawler.codename.BaseCodeNameCrawler;
import com.heyang.agumasterCrawler.codename.FenghuangCrawler;
import com.heyang.agumasterCrawler.entity.Stock;
import com.heyang.agumasterCrawler.entity.StockBundle;
import com.heyang.agumasterCrawler.sender.Sender;

/**
 * Spring Boot entry point of the crawler.
 *
 * <p>On start-up it asks a {@link FenghuangCrawler} for its stock list, wraps the
 * list in a {@link StockBundle}, serialises the bundle to JSON and passes the JSON
 * text to the injected {@link Sender} under the queue name {@code "stockQueue"}.
 */
@SpringBootApplication
public class AgumasterCrawlerApplication implements CommandLineRunner {

    /** Injected by Spring; used to publish the serialised bundle. */
    @Autowired
    private Sender sender;

    /**
     * Crawls the stock list once and publishes it as a single JSON message.
     *
     * @param args command-line arguments forwarded by Spring Boot (not used here)
     * @throws Exception if crawling, JSON serialisation or sending fails
     */
    @Override
    public void run(String... args) throws Exception {
        BaseCodeNameCrawler codeNameCrawler = new FenghuangCrawler();
        List<Stock> stocks = codeNameCrawler.getStockList();

        StockBundle bundle = bundleOf(stocks);
        String payload = new ObjectMapper().writeValueAsString(bundle);

        sender.send("stockQueue", payload);
    }

    /**
     * Builds the bundle that describes a crawled stock list.
     *
     * @param stocks the crawled stocks
     * @return a bundle carrying the source label, the element count, the list
     *         itself and the message type {@code "stock"}
     */
    private static StockBundle bundleOf(List<Stock> stocks) {
        StockBundle bundle = new StockBundle();
        bundle.setSource("凤凰财经");
        bundle.setCount(stocks.size());
        bundle.setStockList(stocks);
        bundle.setType("stock");
        return bundle;
    }

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        SpringApplication.run(AgumasterCrawlerApplication.class, args);
    }
}
传到MQ的消息:
{"type":"stock","source":"凤凰财经","count":3791,"stockList":[{"id":0,"code":"688466","name":"N金科"},{"id":1,"code":"000825","name":"太钢不锈"},{"id":2,"code":"300022","name":"吉峰科技"},{"id":3,"code":"002536","name":"飞龙股份"},{"id":4,"code":"300459","name":"金科文化"},{"id":5,"code":"00240... ]}
接收方:
package com.ufo.hy.agumaster.mq;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ufo.hy.agumaster.entity.Stock;
import com.ufo.hy.agumaster.service.StockService;

/**
 * Used to receive stock code/names.
 *
 * <p>Listens on the queue {@code "stockQueue"}. Each message is expected to be a
 * JSON object with the fields {@code type}, {@code source}, {@code count} and
 * {@code stockList}; messages whose {@code type} is {@code "stock"} are turned
 * into {@link Stock} objects and handed to {@link StockService#batchUpdate}.
 *
 * @author Heyang
 */
@Component
@RabbitListener(queues="stockQueue")
public class Receiver {
    private final Logger logger = LoggerFactory.getLogger(Receiver.class);

    /** Reused for every message instead of building a new mapper per delivery. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Autowired
    protected StockService stkService=null;

    /**
     * Handles one text message from the queue.
     *
     * <p>Messages that are not of type {@code "stock"} are ignored. Failures are
     * logged and not rethrown, as in the original code.
     * NOTE(review): swallowing presumably keeps the broker from redelivering a
     * bad message; confirm against the listener container's ack/requeue settings.
     *
     * @param receivedMsg the raw JSON text of the message
     */
    @RabbitHandler
    public void QueueReceive(String receivedMsg) {
        logger.info("Got mas:{}", receivedMsg);

        try {
            JsonNode root = MAPPER.readTree(receivedMsg);

            // path() yields a "missing" node instead of null, so an absent "type"
            // is treated like any other non-stock message rather than causing an NPE.
            if (root == null || !"stock".equals(root.path("type").asText())) {
                logger.debug("Ignoring message that is not of type \"stock\".");
                return;
            }

            // Without a source the update cannot be attributed; the original code
            // also aborted here (via an NPE), so keep aborting but say why.
            if (!root.hasNonNull("source")) {
                logger.warn("Ignoring stock message without a \"source\" field.");
                return;
            }
            String source = root.get("source").asText();
            String count = root.path("count").asText();
            logger.info("Got {} stocks from {}.",count,source);

            List<Stock> stockList = parseStocks(root.path("stockList"));

            int[] arr = stkService.batchUpdate(stockList,source,null);
            if (arr != null && arr.length >= 2) {
                int inserted = arr[0], updated = arr[1];
                logger.info("Updated {},inserted {}.",updated,inserted);
            } else {
                logger.warn("batchUpdate returned no insert/update counts.");
            }
        } catch (Exception ex) {
            // Route the failure through the logger (with stack trace) instead of stderr.
            logger.error("Failed to process message from stockQueue.", ex);
        }
    }

    /**
     * Converts the child nodes of the {@code stockList} node into {@link Stock} objects.
     *
     * <p>Entries whose {@code code} or {@code name} is absent or JSON {@code null}
     * are skipped with a warning, so one malformed entry neither aborts the batch
     * nor ends up stored as the literal text {@code "null"}.
     *
     * @param listNode the {@code stockList} node; may be a missing node
     * @return the parsed stocks, possibly empty, each created with id {@code 0}
     */
    private List<Stock> parseStocks(JsonNode listNode) {
        List<Stock> stocks = new ArrayList<>(listNode.size());
        for (JsonNode entry : listNode) {
            if (!entry.hasNonNull("code") || !entry.hasNonNull("name")) {
                logger.warn("Skipping stock entry without code/name: {}", entry);
                continue;
            }
            String code = entry.get("code").asText();
            String name = entry.get("name").asText();
            stocks.add(new Stock(0,code,name));
        }
        return stocks;
    }
}