zoukankan      html  css  js  c++  java
  • trident-mongo实现更新操作

        MongoDB是大数据技术中常用的NoSql型数据库,它提供的大量的查询、聚合等操作函数,对于大量查询的日志系统来说,该MongoDB是大数据日志存储的福音。Storm的高级编程技术Trident,也提供了与Mongo集成的方法,但官方只提供了新增的处理,对于常用的修改操作并未提供接口,本文提供了一种使用Trident进行mongoDB修改操作的方式,并且对持久化的数据提供了输出的拓展操作,具体代码见下方:

    1.自定以:MyMongoState

    package com.storm.trident.state;
    
    import com.mongodb.client.model.Filters;
    import org.apache.commons.lang.Validate;
    import org.apache.storm.mongodb.common.MongoDBClient;
    import org.apache.storm.mongodb.common.mapper.MongoMapper;
    import org.apache.storm.mongodb.trident.state.MongoState;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.state.State;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.bson.Document;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.Serializable;
    import java.util.List;
    import java.util.Map;
    
    /**
     * 自定的mongo状态
     */
    public class MyMongoState implements State {
    
        private static final Logger LOG = LoggerFactory.getLogger(MongoState.class);
    
        private MyMongoState.Options options;
        private MongoDBClient mongoClient;
        private Map map;
    
        public MyMongoState(Map map, MyMongoState.Options options) {
            this.options = options;
            this.map = map;
        }
    
        public static class Options implements Serializable {
            private String url;
            private String collectionName;
            private MongoMapper mapper;
    
            public MyMongoState.Options withUrl(String url) {
                this.url = url;
                return this;
            }
    
            public MyMongoState.Options withCollectionName(String collectionName) {
                this.collectionName = collectionName;
                return this;
            }
    
            public MyMongoState.Options withMapper(MongoMapper mapper) {
                this.mapper = mapper;
                return this;
            }
        }
    
        protected void prepare() {
            Validate.notEmpty(options.url, "url can not be blank or null");
            Validate.notEmpty(options.collectionName, "collectionName can not be blank or null");
            Validate.notNull(options.mapper, "MongoMapper can not be null");
    
            this.mongoClient = new MongoDBClient(options.url, options.collectionName);
        }
    
        public void beginCommit(Long txid) {
            LOG.debug("beginCommit is noop.");
        }
    
        public void commit(Long txid) {
            LOG.debug("commit is noop.");
        }
    
        public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
            for (TridentTuple tuple : tuples) {
                Document document = options.mapper.toDocument(tuple);
                this.mongoClient.update(Filters.eq("userId", tuple.getInteger(0))
                        , new Document("$set", document), true);
            }
        }
    }

    2.自定义状态工厂:MyMongoStateFactory

    package com.storm.trident.state;
    
    import org.apache.storm.task.IMetricsContext;
    import org.apache.storm.trident.state.State;
    import org.apache.storm.trident.state.StateFactory;
    
    import java.util.Map;
    
    /**
     * 自定义的mongo状态工厂
     */
    public class MyMongoStateFactory implements StateFactory {
    
        private MyMongoState.Options options;
    
        public MyMongoStateFactory(MyMongoState.Options options) {
            this.options = options;
        }
    
        public State makeState(Map conf, IMetricsContext metrics,
                               int partitionIndex, int numPartitions) {
            MyMongoState state = new MyMongoState(conf, options);
            state.prepare();
            return state;
        }
    
    }

    3.自定义状态修改类:MyMongoStateUpdater

    package com.storm.trident.state;
    
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.state.BaseStateUpdater;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.List;
    
    /**
     * 自定义的状态更新
     */
    public class MyMongoStateUpdater extends BaseStateUpdater<MyMongoState> {
    
        public void updateState(MyMongoState state, List<TridentTuple> tuples,
                                TridentCollector collector) {
            state.updateState(tuples, collector);
            System.out.println(tuples.get(0).get(0) + "---" + tuples.get(0).get(1));
            collector.emit(new Values(tuples.get(0).get(0)));
        }
    
    }

    4.自定义创建状态工厂类:TridentMongoFactory

    package com.storm.trident.factory;
    
    import com.storm.trident.state.MyMongoState;
    import com.storm.trident.state.MyMongoStateFactory;
    import org.apache.storm.mongodb.common.mapper.MongoMapper;
    import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
    import org.apache.storm.mongodb.trident.state.MongoState;
    import org.apache.storm.mongodb.trident.state.MongoStateFactory;
    import org.apache.storm.trident.state.StateFactory;
    
    public class TridentMongoFactory {
        public static final String url = "mongodb://127.0.0.1:27017/test";
        public static final String collectionName = "user";
    
        /**
         * 使用jar包中的类实现持久化
         * @return
         */
        public static StateFactory getMongoInsertState() {
            MongoMapper mapper = new SimpleMongoMapper()
                    .withFields("userId", "name");
    
            MongoState.Options options = new MongoState.Options()
                    .withUrl(url)
                    .withCollectionName(collectionName)
                    .withMapper(mapper);
    
            return new MongoStateFactory(options);
        }
    
        /**
         * 使用自定state实现更新mongo
         * @return
         */
        public static StateFactory getMongoModifyState() {
            MongoMapper mapper = new SimpleMongoMapper()
                    .withFields("userId", "name");
    
            MyMongoState.Options options = new MyMongoState.Options()
                    .withUrl(url)
                    .withCollectionName(collectionName)
                    .withMapper(mapper);
    
            return new MyMongoStateFactory(options);
        }
    }

    5.自定义输出函数:PrintFunction

    package com.storm.trident.function;
    
    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    
    /**
     * 自定义输出函数
     */
    public class PrintFunction extends BaseFunction {
        public static final Long serialVersionUID = 1L;
    
        public void execute(TridentTuple tuple, TridentCollector collector) {
            System.out.println(tuple.get(0));
            collector.emit(tuple);
        }
    }

    6.创建Topology

    package com.storm.trident.topology;
    
    import com.google.common.collect.ImmutableList;
    import com.storm.trident.factory.TridentMongoFactory;
    import com.storm.trident.function.PrintFunction;
    import com.storm.trident.state.MyMongoStateUpdater;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.mongodb.trident.state.MongoStateUpdater;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.state.StateFactory;
    import org.apache.storm.trident.testing.FeederBatchSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    public class TridentDemoTopology {
        public static void main(String[] args) {
            TridentTopology topology = new TridentTopology();
    
            //创建spout
            FeederBatchSpout spout = new FeederBatchSpout(ImmutableList.of("userId", "name"));
    
            //mongoState-新增
            StateFactory factory = TridentMongoFactory.getMongoInsertState();
    
            //mongoState-修改
            StateFactory modifyFactory = TridentMongoFactory.getMongoModifyState();
    
            //创建流
            topology.newStream("spout", spout)
                    .partitionPersist(modifyFactory, new Fields("userId", "name"), new MyMongoStateUpdater(), new Fields("userId"))
                    .newValuesStream().each(new Fields("userId"),new PrintFunction(),new Fields("id")).parallelismHint(1);
    
            Config config = new Config();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("tridentDemo", config, topology.build());
    
            //模拟发送数据
            spout.feed(ImmutableList.of(new Values(1, "zhangsan1")));
            spout.feed(ImmutableList.of(new Values(2, "lisi2")));
            spout.feed(ImmutableList.of(new Values(3, "wangwu3")));
            spout.feed(ImmutableList.of(new Values(4, "zhaoliu4")));
        }
    }

    按如上代码进行操作,就是进行MongoDB的更新啦。

  • 相关阅读:
    AtCoder Beginner Contest 064 D
    ZOJ 3956 Course Selection System [01背包]
    理解01背包
    模块(二)
    内置函数+递归+模块使用
    函数进阶
    使用markdown编辑器
    函数进阶(二)
    函数进阶(一)
    函数基础
  • 原文地址:https://www.cnblogs.com/jpejie/p/9609153.html
Copyright © 2011-2022 走看看