zoukankan      html  css  js  c++  java
  • Storm-Mongodb详解

    这两天研究Trident,踩坑踩得遭不住,来和大家分享下。

    首先写入数据库:

    先看官方API给我们提供的方法:

    // Minimal MongoMapper implementation that supports the MongoInsertBolt example below.
    public class SimpleMongoMapper implements MongoMapper {
        private String[] fields;

        /**
         * Copies the configured tuple fields into a new BSON document.
         *
         * @param tuple tuple whose values are read by field name
         * @return a document holding one entry per configured field, appended in the
         *         order the field names were given to {@link #withFields(String...)}
         */
        @Override
        public Document toDocument(ITuple tuple) {
            final Document result = new Document();
            for (int i = 0; i < fields.length; i++) {
                final String name = fields[i];
                result.append(name, tuple.getValueByField(name));
            }
            return result;
        }

        /**
         * Sets the names of the tuple fields to copy into each document.
         *
         * @param fields tuple field names
         * @return this mapper, so calls can be chained
         */
        public SimpleMongoMapper withFields(String... fields) {
            this.fields = fields;
            return this;
        }
    }
    // Write to the database: hand the mapper to an insert bolt.
    String collectionName = "wordcount";
    String url = "mongodb://127.0.0.1:27017/test";

    MongoMapper mapper = new SimpleMongoMapper().withFields("word", "count");

    MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);

    一起把官网关于更新的mapper写了:

    public class SimpleMongoUpdateMapper implements MongoMapper {
        private String[] fields;

        /**
         * Builds a {@code $set} update document from the configured tuple fields.
         *
         * @param tuple tuple whose values are read by field name
         * @return a document of the form {@code {"$set": {field: value, ...}}}
         */
        @Override
        public Document toDocument(ITuple tuple) {
            final Document assignments = new Document();
            for (final String name : fields) {
                final Object value = tuple.getValueByField(name);
                assignments.append(name, value);
            }
            final Document update = new Document("$set", assignments);
            return update;
        }

        /**
         * Sets the names of the tuple fields to include in the update.
         *
         * @param fields tuple field names
         * @return this mapper, so calls can be chained
         */
        public SimpleMongoUpdateMapper withFields(String... fields) {
            this.fields = fields;
            return this;
        }
    }

    下面是自己的代码,从spout到最后的存储与更新:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    /**
     * Fake-data spout: every call to {@link #nextTuple()} emits one randomly chosen
     * (last name, company name) pair.
     *
     * <p>The two values are declared as the output fields {@code word} and {@code hello}.
     *
     * @author cwc
     * @date 2018-06-01
     * @version 1.0.0
     */
    public class MongodbSpout extends BaseRichSpout{
        private static final long serialVersionUID = 1L;

        /** Values emitted as output field {@code word}. */
        private static final String[] LAST_NAMES = {
            "anderson", "watson", "ponting", "dravid", "lara"
        };

        /** Values emitted as output field {@code hello}; paired by index with {@link #LAST_NAMES}. */
        private static final String[] COMPANY_NAMES = {
            "abc", "dfg", "pqr", "ecd", "awe"
        };

        private SpoutOutputCollector collector;

        /** Created once in {@link #open} instead of once per emitted tuple. */
        private Random random;

        /**
         * Stores the collector used for emitting and creates the random source.
         *
         * @param conf topology configuration (unused)
         * @param context topology context (unused)
         * @param spoutOutputCollector collector used by {@link #nextTuple()}
         */
        @Override
        @SuppressWarnings("rawtypes")
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector spoutOutputCollector) {
            this.collector = spoutOutputCollector;
            this.random = new Random();
        }

        /** Emits one tuple whose two values share the same randomly drawn index. */
        @Override
        public void nextTuple() {
            // The bound follows the data, so adding a name does not require touching this method.
            int index = random.nextInt(LAST_NAMES.length);
            this.collector.emit(new Values(LAST_NAMES[index], COMPANY_NAMES[index]));
            System.out.println("数据来袭!!!!!!");
        }

        /**
         * Declares the two output fields.
         *
         * @param declarer receives the field declaration
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // The number of declared fields must match the number of values emitted in nextTuple().
            declarer.declare(new Fields("word","hello"));
        }

    }
    
    import org.apache.storm.mongodb.common.mapper.MongoMapper;
    import org.apache.storm.tuple.ITuple;
    import org.bson.Document;
    
    /**
     * Maps a tuple to a BSON document by copying a configured list of fields.
     *
     * @author cwc
     * @date 2018-06-01
     * @version 1.0.0
     */
    public class SimpleMongoMapper implements MongoMapper {
        // Declared explicitly for consistency with SimpleMongoUpdateMapper, which does the same.
        private static final long serialVersionUID = 1L;

        private String[] fields;

        /**
         * Copies every configured field of the tuple into a new document.
         *
         * @param tuple tuple whose values are read by field name
         * @return a document with one entry per configured field
         */
        @Override
        public Document toDocument(ITuple tuple) {
            Document document = new Document();
            for (String field : fields) {
                document.append(field, tuple.getValueByField(field));
            }
            return document;
        }

        /**
         * Sets the names of the tuple fields to copy into each document.
         *
         * @param fields tuple field names
         * @return this mapper, so calls can be chained
         */
        public SimpleMongoMapper withFields(String... fields) {
            this.fields = fields;
            return this;
        }
    }

    import org.apache.storm.mongodb.common.mapper.MongoMapper;
    import org.apache.storm.tuple.ITuple;
    import org.bson.Document;
    
    /**
     * Mapper used for updates: wraps the selected tuple fields in a {@code $set} document.
     *
     * @author cwc
     * @date 2018-06-05
     * @version 1.0.0
     */
    public class SimpleMongoUpdateMapper implements MongoMapper {
        private static final long serialVersionUID = 1L;
        private String[] fields;

        /**
         * Builds the update document for one tuple.
         *
         * @param tuple tuple whose values are read by field name
         * @return a document of the form {@code {"$set": {field: value, ...}}}
         */
        @Override
        public Document toDocument(ITuple tuple) {
            final Document assignments = new Document();
            for (int i = 0; i < fields.length; i++) {
                final String name = fields[i];
                assignments.append(name, tuple.getValueByField(name));
            }
            return new Document("$set", assignments);
        }

        /**
         * Sets the names of the tuple fields to include in the update.
         *
         * @param fields tuple field names
         * @return this mapper, so calls can be chained
         */
        public SimpleMongoUpdateMapper withFields(String... fields) {
            this.fields = fields;
            return this;
        }
    }

    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    
    /**
     * Terminal bolt that prints the first value of every tuple it receives.
     *
     * @author cwc
     * @date 2018-05-30
     * @version 1.0.0
     */
    public class MongoOutBolt extends BaseRichBolt{
        private static final long serialVersionUID = 1L;
        private OutputCollector collector;

        /**
         * Prints the tuple's first value to standard error, then acknowledges the tuple.
         *
         * @param tuple incoming tuple; its value at index 0 is read as a string
         */
        @Override
        public void execute(Tuple tuple) {
            String str = tuple.getString(0);
            System.err.println(str);
            // Acknowledge explicitly: nothing else in this bolt reports the tuple as processed.
            collector.ack(tuple);
        }

        /**
         * Keeps the collector so {@link #execute(Tuple)} can acknowledge tuples.
         *
         * @param arg0 topology configuration (unused)
         * @param arg1 topology context (unused)
         * @param collector collector used to acknowledge tuples
         */
        @Override
        public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
            this.collector=collector;
        }

        /**
         * Declares a single output field named {@code MongoOutBolt}.
         *
         * @param declarer receives the field declaration
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("MongoOutBolt"));
        }

    }
    

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.mongodb.bolt.MongoInsertBolt;
    import org.apache.storm.mongodb.bolt.MongoLookupBolt;
    import org.apache.storm.mongodb.bolt.MongoUpdateBolt;
    import org.apache.storm.mongodb.common.QueryFilterCreator;
    import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
    import org.apache.storm.mongodb.common.mapper.MongoLookupMapper;
    import org.apache.storm.mongodb.common.mapper.MongoMapper;
    import org.apache.storm.mongodb.common.mapper.SimpleMongoLookupMapper;
    import org.apache.storm.topology.TopologyBuilder;
    
    /**
     * storm-mongodb demo entry point: builds topologies that insert into, update and look up
     * documents in a MongoDB collection, fed by {@code MongodbSpout}.
     *
     * @author cwc
     * @date 2018-06-01
     * @version 1.0.0
     */
    public class MongodbMain {
        private static final String MONGO_URL = "mongodb://172.xx.xx.x:27017/test";
        private static final String COLLECTION_NAME = "storm";

        /** How long a local-mode topology runs before the in-process cluster is shut down. */
        private static final long LOCAL_RUN_MILLIS = 100000L;

        /**
         * Runs one of the demos; the other two calls are kept as comments so they can be swapped in.
         *
         * @param args if non-empty, {@code args[0]} is the Nimbus host and the topology is submitted
         *             to that cluster; otherwise the topology runs in a local cluster
         */
        public static void main(String[]args){
    //		lookMongodb(MONGO_URL, COLLECTION_NAME, args);
    //		writeMongodb(MONGO_URL, COLLECTION_NAME, args);
            updateMongodb(MONGO_URL, COLLECTION_NAME,args);
        }

        /**
         * Inserts every spout tuple into MongoDB as a new document.
         *
         * @param url MongoDB connection URL
         * @param collectionName target collection
         * @param args see {@link #main(String[])}
         */
        public static void writeMongodb(String url,String collectionName,String[] args){
            // The mapped names must be fields the spout actually declares ("word", "hello");
            // asking the tuple for any other field name fails at runtime.
            MongoMapper mapper = new SimpleMongoMapper()
                    .withFields("word", "hello");
            MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("mongodb-save", new MongodbSpout(), 2);
            builder.setBolt("save",  insertBolt, 1).shuffleGrouping("mongodb-save");

            runTopology(builder, args);
        }

        /**
         * Updates MongoDB documents, using the tuple's {@code word} field to build the query filter.
         *
         * @param url MongoDB connection URL
         * @param collectionName target collection
         * @param args see {@link #main(String[])}
         */
        public static void updateMongodb(String url,String collectionName,String[] args){
            MongoMapper mapper = new SimpleMongoUpdateMapper()
                    .withFields("word", "hello");
            QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
                    .withField("word");

            MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("mongodb-update", new MongodbSpout(), 2);
            builder.setBolt("update",  updateBolt, 1).shuffleGrouping("mongodb-update");

            runTopology(builder, args);
        }

        /**
         * Looks documents up in MongoDB, filtering on the tuple's {@code word} field, and forwards
         * the lookup bolt's output to {@code MongoOutBolt} for printing.
         *
         * @param url MongoDB connection URL
         * @param collectionName collection to query
         * @param args see {@link #main(String[])}
         */
        public static void lookMongodb(String url,String collectionName,String[] args){
            MongoLookupMapper mapper = new SimpleMongoLookupMapper()
                    .withFields("word", "hello");
            QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
                    .withField("word");
            MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper);

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("mongodb-look", new MongodbSpout(), 2);
            builder.setBolt("mongodb-out", lookupBolt, 1).shuffleGrouping("mongodb-look");
            builder.setBolt("out", new MongoOutBolt(), 1).shuffleGrouping("mongodb-out");

            runTopology(builder, args);
        }

        /**
         * Submits the built topology to a remote cluster when arguments are given, otherwise runs it
         * in a local cluster for {@link #LOCAL_RUN_MILLIS} milliseconds and then shuts that cluster down.
         */
        private static void runTopology(TopologyBuilder builder, String[] args) {
            Config conf = new Config();
            String name = MongodbMain.class.getSimpleName();

            if (args != null && args.length > 0) {
                String nimbus = args[0];
                conf.put(Config.NIMBUS_HOST, nimbus);
                conf.setNumWorkers(3);
                try {
                    StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
                } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                    e.printStackTrace();
                }
                return;
            }

            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            boolean interrupted = false;
            try {
                cluster.submitTopology(name, conf, builder.createTopology());
                Thread.sleep(LOCAL_RUN_MILLIS);
            } catch (InterruptedException e) {
                interrupted = true;
            } finally {
                cluster.shutdown();
                if (interrupted) {
                    // Restore the flag only after shutdown so the shutdown itself is not cut short.
                    Thread.currentThread().interrupt();
                }
            }
        }

    }

    关于storm读取mongodb暂时还有些问题,因为时间原因,过段时间再进行解决。

    再看看Trident代码:

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.mongodb.common.mapper.MongoMapper;
    import org.apache.storm.mongodb.trident.state.MongoState;
    import org.apache.storm.mongodb.trident.state.MongoStateFactory;
    import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.state.StateFactory;
    import org.apache.storm.trident.testing.FixedBatchSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    import com.sunsheen.jfids.bigdata.storm.demo.mongodb.MongodbSpout;
    import com.sunsheen.jfids.bigdata.storm.demo.mongodb.SimpleMongoMapper;
    
    /**
     * @author cwc
     * @date 2018年6月5日  
     * @description:Storm-mongodb写入高级接口,写入普通数据
     * @version 1.0.0 
     */
    public class MongoTridentState {
    	public static void main(String[]args){
    		String url = "mongodb://172.xxx.xxx.xxx:27017/test";
    		String collectionName = "storm";
    		
    		Config conf = new Config();
    		conf.setMaxSpoutPending(3);
    		if (args != null && args.length > 0) { 
    			//服务器
    			try {
    				StormSubmitter.submitTopology(args[1], conf, mongoTrident(url,collectionName));
    			} catch (AlreadyAliveException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			} catch (InvalidTopologyException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			} catch (AuthorizationException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			}
    			} else { 
    				//本地
    		LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf,  mongoTrident(url,collectionName));
            try {
    			Thread.sleep(100000);
    			cluster.shutdown();
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    		}
    	}
    	/**
    	 * 写入到mongodb
    	 * @param url
    	 * @param collectionName
    	 * @return
    	 */
    	public static StormTopology mongoTrident(String url,String collectionName){
    		//测试专用
    //		FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence", "key"), 5000, new Values("the cow jumped over the moon", 1l),
    //	            new Values("the man went to the store and bought some candy", 2l), new Values("four score and seven years ago", 3l),
    //	            new Values("how many apples can you eat", 4l), new Values("to be or not to be the person", 5l));
    //	    spout.setCycle(true);
    		MongodbSpout spout =new MongodbSpout();
    		
    		MongoMapper mapper = new SimpleMongoMapper()
    	                .withFields("word");
    
    	        MongoState.Options options = new MongoState.Options()
    	                .withUrl(url)
    	                .withCollectionName(collectionName)
    	                .withMapper(mapper);
    
    	        StateFactory factory = new MongoStateFactory(options);
    
    	        TridentTopology topology = new TridentTopology();
    	        Stream stream = topology.newStream("stream", spout);
    	        stream.partitionPersist(factory, new Fields("word"),  new MongoStateUpdater(), new Fields());
    	        return topology.build();
    	}
    }

    关于storm-mongodb的详解就暂时写到这里,改天有时间再进行补充,研究。



    深夜码文不易,若对看官有帮助,望看官可以在右侧打赏。
  • 相关阅读:
    使用union all 遇到的问题(俩条sql语句行数的和 不等于union all 后的 行数的和 !);遗留问题 怎么找到 相差的呐俩条数据 ?
    78W的数据使用forall 进行批量转移;
    oracle 不能是用变量来作为列名和表名 ,但使用动态sql可以;
    oracle 查询优化及sql改写
    (十三)Jmeter之Bean Shell 的使用(二)
    (十二)Jmeter之Bean Shell的使用(一)
    (十一)Jmeter另一种调试工具 HTTP Mirror Server
    Image Processing and Computer Vision_Review:A survey of recent advances in visual feature detection(Author's Accepted Manuscript)——2014.08
    Image Processing and Computer Vision_Review:Local Invariant Feature Detectors: A Survey——2007.11
    Image Processing and Computer Vision_Review:A survey of recent advances in visual feature detection—2014.08
  • 原文地址:https://www.cnblogs.com/wanchen-chen/p/12934128.html
Copyright © 2011-2022 走看看