zoukankan      html  css  js  c++  java
  • Storm-Mongodb详解




    public class SimpleMongoMapper implements MongoMapper {
        private String[] fields;
        public Document toDocument(ITuple tuple) {
            Document document = new Document();
            for(String field : fields){
                document.append(field, tuple.getValueByField(field));
            return document;
        public SimpleMongoMapper withFields(String... fields) {
            this.fields = fields;
            return this;
    String url = "mongodb://";
    String collectionName = "wordcount";
    MongoMapper mapper = new SimpleMongoMapper()
            .withFields("word", "count");
    MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);


    public class SimpleMongoUpdateMapper implements MongoMapper {
        private String[] fields;
        public Document toDocument(ITuple tuple) {
            Document document = new Document();
            for(String field : fields){
                document.append(field, tuple.getValueByField(field));
            return new Document("$set", document);
        public SimpleMongoUpdateMapper withFields(String... fields) {
            this.fields = fields;
            return this;


    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
     * @author cwc
     * @date 2018年6月1日  
     * @description:假数据生产厂
     * @version 1.0.0 
    public class MongodbSpout extends BaseRichSpout{
    	private static final long serialVersionUID = 1L;
    	private SpoutOutputCollector collector;
         * 作为字段word输出
        private static final Map<Integer, String> LASTNAME = new HashMap<Integer, String>();  
        static {  
            LASTNAME.put(0, "anderson");  
            LASTNAME.put(1, "watson");  
            LASTNAME.put(2, "ponting");  
            LASTNAME.put(3, "dravid");  
            LASTNAME.put(4, "lara");  
         * 作为字段val输出
        private static final Map<Integer, String> COMPANYNAME = new HashMap<Integer, String>();  
        static {  
            COMPANYNAME.put(0, "abc");  
            COMPANYNAME.put(1, "dfg");  
            COMPANYNAME.put(2, "pqr");  
            COMPANYNAME.put(3, "ecd");  
            COMPANYNAME.put(4, "awe");  
        public void open(Map conf, TopologyContext context,  
                SpoutOutputCollector spoutOutputCollector) {  
            this.collector = spoutOutputCollector;  
        public void nextTuple() {  
            final Random rand = new Random();  
            int randomNumber = rand.nextInt(5); 
            this.collector.emit (new Values(LASTNAME.get(randomNumber),COMPANYNAME.get(randomNumber)));
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("word","hello"));
    import org.apache.storm.mongodb.common.mapper.MongoMapper;
    import org.apache.storm.tuple.ITuple;
    import org.bson.Document;
     * @author cwc
     * @date 2018年6月1日  
     * @description:
     * @version 1.0.0 
    public class SimpleMongoMapper implements MongoMapper {
    	 private String[] fields;
    	    public Document toDocument(ITuple tuple) {
    	        Document document = new Document();
    	        for(String field : fields){
    	            document.append(field, tuple.getValueByField(field));
    	        return document;
    	    public SimpleMongoMapper withFields(String... fields) {
    	        this.fields = fields;
    	        return this;

    import org.apache.storm.mongodb.common.mapper.MongoMapper;
    import org.apache.storm.tuple.ITuple;
    import org.bson.Document;
     * @author cwc
     * @date 2018年6月5日  
     * @description: 用于更新数据的mapper
     * @version 1.0.0 
    public class SimpleMongoUpdateMapper implements MongoMapper {
    	private static final long serialVersionUID = 1L;
    	private String[] fields;
        public Document toDocument(ITuple tuple) {
            Document document = new Document();
            for(String field : fields){
                document.append(field, tuple.getValueByField(field));
            return new Document("$set", document);
        public SimpleMongoUpdateMapper withFields(String... fields) {
            this.fields = fields;
            return this;

    import java.util.Map;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
     * @author cwc
     * @date 2018年5月30日  
     * @description:打印拿到的数据
     * @version 1.0.0 
    public class MongoOutBolt extends BaseRichBolt{
    	private static final long serialVersionUID = 1L;
    	private OutputCollector collector;
    	public void execute(Tuple tuple) {
    				String str =tuple.getString(0);
    //				String strs =tuple.getString(1);
    	public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
    		// TODO Auto-generated method stub
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("MongoOutBolt"));

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.mongodb.bolt.MongoInsertBolt;
    import org.apache.storm.mongodb.bolt.MongoUpdateBolt;
    import org.apache.storm.mongodb.common.QueryFilterCreator;
    import org.apache.storm.mongodb.common.SimpleQueryFilterCreator;
    import org.apache.storm.mongodb.common.mapper.MongoMapper;
    import org.apache.storm.topology.TopologyBuilder;
     * @author cwc
     * @date 2018年6月1日  
     * @description:storm-mongodb的写入,更新,读取
     * @version 1.0.0 
    public class MongodbMain {
    	private static String url = "mongodb://172.xx.xx.x:27017/test";
    	private static String collectionName = "storm";
    	public static void main(String[]args){
    //		lookMongodb(url, collectionName, args);
    //		writeMongodb(url, collectionName,args);
    		updateMongodb(url, collectionName,args);
    	 * 将数据写入到Mongodb
    	 * @param url
    	 * @param collectionName
    	public static void writeMongodb(String url,String collectionName,String[] args){
    		MongoMapper mapper = new SimpleMongoMapper()
    		        .withFields("word", "val","xx");
    		MongoInsertBolt insertBolt = new MongoInsertBolt(url, collectionName, mapper);
    		TopologyBuilder builder = new TopologyBuilder();
    		builder.setSpout("mongodb-save", new MongodbSpout(), 2);
    		builder.setBolt("save",  insertBolt, 1).shuffleGrouping("mongodb-save");
    		Config conf = new Config();
    		String name = MongodbMain.class.getSimpleName();
    		if (args != null && args.length > 0) {
    		String nimbus = args[0];
    		conf.put(Config.NIMBUS_HOST, nimbus);
    		try {
    			StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
    		} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
    			// TODO Auto-generated catch block
    		} else {
    		LocalCluster cluster = new LocalCluster();
    		cluster.submitTopology(name, conf, builder.createTopology());
    		try {
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    	 * 更新mongodb数据
    	 * @param url
    	 * @param collectionName
    	public static void updateMongodb(String url,String collectionName,String[] args){
    		MongoMapper mapper =new SimpleMongoUpdateMapper()
    				.withFields("word", "hello");
    		QueryFilterCreator updateQueryCreator = new SimpleQueryFilterCreator()
            MongoUpdateBolt updateBolt = new MongoUpdateBolt(url, collectionName, updateQueryCreator, mapper);
            TopologyBuilder builder = new TopologyBuilder();
    		builder.setSpout("mongodb-update", new MongodbSpout(), 2);
    		builder.setBolt("update",  updateBolt, 1).shuffleGrouping("mongodb-update");
    		Config conf = new Config();
    		String name = MongodbMain.class.getSimpleName();
    		if (args != null && args.length > 0) {
    		String nimbus = args[0];
    		conf.put(Config.NIMBUS_HOST, nimbus);
    		try {
    			StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
    		} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
    			// TODO Auto-generated catch block
    		} else {
    		LocalCluster cluster = new LocalCluster();
    		cluster.submitTopology(name, conf, builder.createTopology());
    		try {
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    	 * 读取mongodb数据
    	 * @param url
    	 * @param collectionName
    	public static void lookMongodb(String url,String collectionName,String[] args){
    		MongodbSpout spout =new MongodbSpout();
    		 MongoLookupMapper mapper = new SimpleMongoLookupMapper()
    	                .withFields("word", "hello");
    		 QueryFilterCreator filterCreator = new SimpleQueryFilterCreator()
    		 MongoLookupBolt lookupBolt = new MongoLookupBolt(url, collectionName, filterCreator, mapper);
    		TopologyBuilder builder = new TopologyBuilder();
    		builder.setSpout("mongodb-look", new MongodbSpout(), 2);
    		builder.setBolt("mongodb-out", lookupBolt, 1).shuffleGrouping("mongodb-look");
    		builder.setBolt("out", new MongoOutBolt(), 1).shuffleGrouping("mongodb-out");
    		Config conf = new Config();
    		String name = MongodbMain.class.getSimpleName();
    		if (args != null && args.length > 0) {
    		String nimbus = args[0];
    		conf.put(Config.NIMBUS_HOST, nimbus);
    		try {
    			StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
    		} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
    			// TODO Auto-generated catch block
    		} else {
    		LocalCluster cluster = new LocalCluster();
    		cluster.submitTopology(name, conf, builder.createTopology());
    		try {
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block

    关于storm读取mongodb暂时还有些问题,因为时间原因 过段时间进行解决。


    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.mongodb.common.mapper.MongoMapper;
    import org.apache.storm.mongodb.trident.state.MongoState;
    import org.apache.storm.mongodb.trident.state.MongoStateFactory;
    import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.state.StateFactory;
    import org.apache.storm.trident.testing.FixedBatchSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import com.sunsheen.jfids.bigdata.storm.demo.mongodb.MongodbSpout;
    import com.sunsheen.jfids.bigdata.storm.demo.mongodb.SimpleMongoMapper;
     * @author cwc
     * @date 2018年6月5日  
     * @description:Storm-mongodb写入高级接口,写入普通数据
     * @version 1.0.0 
    public class MongoTridentState {
    	public static void main(String[]args){
    		String url = "mongodb://172.xxx.xxx.xxx:27017/test";
    		String collectionName = "storm";
    		Config conf = new Config();
    		if (args != null && args.length > 0) { 
    			try {
    				StormSubmitter.submitTopology(args[1], conf, mongoTrident(url,collectionName));
    			} catch (AlreadyAliveException e) {
    				// TODO Auto-generated catch block
    			} catch (InvalidTopologyException e) {
    				// TODO Auto-generated catch block
    			} catch (AuthorizationException e) {
    				// TODO Auto-generated catch block
    			} else { 
    		LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf,  mongoTrident(url,collectionName));
            try {
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    	 * 写入到mongodb
    	 * @param url
    	 * @param collectionName
    	 * @return
    	public static StormTopology mongoTrident(String url,String collectionName){
    //		FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence", "key"), 5000, new Values("the cow jumped over the moon", 1l),
    //	            new Values("the man went to the store and bought some candy", 2l), new Values("four score and seven years ago", 3l),
    //	            new Values("how many apples can you eat", 4l), new Values("to be or not to be the person", 5l));
    //	    spout.setCycle(true);
    		MongodbSpout spout =new MongodbSpout();
    		MongoMapper mapper = new SimpleMongoMapper()
    	        MongoState.Options options = new MongoState.Options()
    	        StateFactory factory = new MongoStateFactory(options);
    	        TridentTopology topology = new TridentTopology();
    	        Stream stream = topology.newStream("stream", spout);
    	        stream.partitionPersist(factory, new Fields("word"),  new MongoStateUpdater(), new Fields());
    	        return topology.build();


  • 相关阅读:
    【Azure Redis 缓存】使用Azure Redis服务时候,如突然遇见异常,遇见命令Timeout performing SET xxxxxx等情况,如何第一时间查看是否有Failover存在呢?
    【Azure Redis 缓存】Azure Redis出现了超时问题后,记录一步一步的排查出异常的客户端连接和所执行命令的步骤
    【Azure Developer】Azure REST API: 如何通过 API查看 Recovery Services Vaults(恢复保管库)的备份策略信息? 如备份中是否含有虚拟机的Disk
    【Azure Redis 缓存】云服务Worker Role中调用StackExchange.Redis,遇见莫名异常(RedisConnectionException: UnableToConnect on xxx 或 No connection is available to service this operation: xxx)
    【Azure 环境】Azure通知中心(Notification Hub)使用百度推送平台解说
    【Azure 应用服务】Azure Function App使用SendGrid发送邮件遇见异常消息The operation was canceled,分析源码渐入最源端
    规模化敏捷 LeSS(三):LeSS Huge 是怎样炼成的?
  • 原文地址:https://www.cnblogs.com/wanchen-chen/p/12934128.html
Copyright © 2011-2022 走看看