zoukankan      html  css  js  c++  java
  • Sender(agumaster_crawler)->RabbitMQ->Receiver(agumaster)

    发送方:

    package com.heyang.agumasterCrawler;
    
    import java.util.List;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.heyang.agumasterCrawler.codename.BaseCodeNameCrawler;
    import com.heyang.agumasterCrawler.codename.FenghuangCrawler;
    import com.heyang.agumasterCrawler.entity.Stock;
    import com.heyang.agumasterCrawler.entity.StockBundle;
    import com.heyang.agumasterCrawler.sender.Sender;
    
    /**
     * Spring Boot application class of the crawler.
     * <p>
     * Its {@link #run(String...)} method obtains a stock list from a
     * {@link FenghuangCrawler}, wraps it in a {@link StockBundle}, serializes the
     * bundle to JSON and passes the JSON text to the injected {@link Sender}
     * together with the queue name "stockQueue".
     */
    @SpringBootApplication
    public class AgumasterCrawlerApplication implements CommandLineRunner {
        /** Injected component used to publish the serialized bundle. */
        @Autowired
        private Sender sender;

        /**
         * Starts the Spring application.
         *
         * @param args command-line arguments forwarded to {@link SpringApplication#run}
         */
        public static void main(String[] args) {
            SpringApplication.run(AgumasterCrawlerApplication.class, args);
        }

        /**
         * Crawls the stock list, serializes it as a JSON bundle and sends it.
         *
         * @param args command-line arguments (not used)
         * @throws Exception if crawling, JSON serialization or sending fails
         */
        @Override
        public void run(String... args) throws Exception {
            final BaseCodeNameCrawler codeNameCrawler = new FenghuangCrawler();
            final StockBundle bundle = buildBundle(codeNameCrawler.getStockList());

            final String json = new ObjectMapper().writeValueAsString(bundle);
            sender.send("stockQueue", json);
        }

        /**
         * Builds the bundle that is serialized and sent.
         *
         * @param stocks the crawled stocks; its size is recorded as the bundle count
         * @return a bundle with source, count, stock list and type filled in
         */
        private StockBundle buildBundle(List<Stock> stocks) {
            final StockBundle bundle = new StockBundle();
            bundle.setSource("凤凰财经");
            bundle.setCount(stocks.size());
            bundle.setStockList(stocks);
            bundle.setType("stock");
            return bundle;
        }
    }

    传到MQ的消息:

    {"type":"stock","source":"凤凰财经","count":3791,"stockList":[{"id":0,"code":"688466","name":"N金科"},{"id":1,"code":"000825","name":"太钢不锈"},{"id":2,"code":"300022","name":"吉峰科技"},{"id":3,"code":"002536","name":"飞龙股份"},{"id":4,"code":"300459","name":"金科文化"},{"id":5,"code":"00240...
    ]}

    接收方:

    package com.ufo.hy.agumaster.mq;
    
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.ufo.hy.agumaster.entity.Stock;
    import com.ufo.hy.agumaster.service.StockService;
    
    /**
     * Used to receive stock code/names
     * @author Heyang
     *
     */
    @Component
    @RabbitListener(queues="stockQueue")
    public class Receiver {
        private final Logger logger = LoggerFactory.getLogger(Receiver.class);
        
        @Autowired
        protected StockService stkService=null;
        
        @RabbitHandler
        public void QueueReceive(String receivedMsg) {
            logger.info("Got mas:"+receivedMsg);
            ObjectMapper mapper = new ObjectMapper();
            
            try {
                JsonNode node = mapper.readTree(receivedMsg);
                String type=node.get("type").asText();
                
                if("stock".equals(type)) {
                    JsonNode listNode=node.path("stockList");
                    String source=node.get("source").asText();
                    String count=node.get("count").asText();
                    logger.info("Got {} stocks from {}.",count,source);
    
                    // 遍历list节点的子节点
                    List<Stock> stockList=new ArrayList<Stock>();
                    Iterator<JsonNode> iterator = listNode.elements();
                    while (iterator.hasNext()) {
                        JsonNode stock = iterator.next();
                        String code=stock.get("code").asText();
                        String name=stock.get("name").asText();
                        
                        stockList.add(new Stock(0,code,name));
                    }
                    
                    int[] arr=stkService.batchUpdate(stockList,source,null);
                    int inserted=arr[0],updated=arr[1];
                    
                    logger.info("Updated {},inserted {}.",updated,inserted);
                }                   
                        
            }catch(Exception ex) {
                ex.printStackTrace();
            }
        }
    } 
  • 相关阅读:
    P2P之UDP穿透NAT的原理与实现
    Jmeter压力测试工具安装及使用教程
    整合Solr与tomcat以及第一个core的配置
    windows下Redis安装及利用java操作Redis
    spring整合Jersey 无法注入service的问题
    MySQL-Navicat连接MySQL出现1251或1130报错的解决方法
    华为VRP
    开发工具-Sublime
    服务器-Windows 2003 R2-取消多用户登录-多个用户登录显示不同的界面解决方法
    服务器-惠普 HP ProLiant-linux系统-RAID信息查看和阵列卡操作-hpacucli工具使用
  • 原文地址:https://www.cnblogs.com/heyang78/p/12862970.html
Copyright © 2011-2022 走看看