zoukankan      html  css  js  c++  java
  • Trident整合MongoDB

    MongoDB是大数据技术中常用的NoSQL数据库,它提供了大量的查询、聚合等操作函数,对于查询量很大的日志系统来说,MongoDB可谓大数据日志存储的福音。Storm的高级编程技术Trident也提供了与MongoDB集成的方法,但官方只提供了新增(插入)的处理,对于常用的修改操作并未提供接口。本文提供了一种使用Trident进行MongoDB修改操作的方式,并且对持久化后的数据提供了输出的拓展操作,具体代码见下方:

    import java.util.Objects;
    
    /**
     * <p>
     * Date-Time: 2018/09/05   15:14
     * Company: 百趣
     * </p>
     * 请求类型枚举
     *
     * @author fangyuanjie
     * @version 1.0.0
     */
    
    public enum MethodTypeEnum {
    
        // GET请求
        GET("GET", "GET请求"),
    
        // POST请求
        POST("POST", "POST请求");
    
        private String code;
        private String desc;
    
        /**
         * Returns the code string currently held by this constant.
         *
         * @return the value of the {@code code} field
         */
        public String getCode() {
            return this.code;
        }
    
        /**
         * Overwrites the code held by this constant.
         *
         * <p>NOTE(review): the field belongs to the enum constant itself, so the new value is
         * what every later {@code getCode()} call on that constant returns.
         *
         * @param code the new code value
         */
        public void setCode(final String code) {
            this.code = code;
        }
    
        /**
         * Returns the description text currently held by this constant.
         *
         * @return the value of the {@code desc} field
         */
        public String getDesc() {
            return this.desc;
        }
    
        /**
         * Overwrites the description held by this constant.
         *
         * @param desc the new description text
         */
        public void setDesc(final String desc) {
            this.desc = desc;
        }
    
        /**
         * Creates a constant with its initial code and description.
         *
         * @param code code string stored in the {@code code} field
         * @param desc description stored in the {@code desc} field
         */
        MethodTypeEnum(String code, String desc) {
            this.desc = desc;
            this.code = code;
        }
    
        /**
         * Looks up the constant whose code equals the given value.
         *
         * <p>Constants are examined in declaration order and the first match wins. The comparison
         * is null-safe ({@link Objects#equals(Object, Object)}).
         *
         * @param code code to search for
         * @return the first matching constant, or {@code null} when none matches
         */
        public static MethodTypeEnum getByCode(String code) {
            MethodTypeEnum matched = null;
            for (MethodTypeEnum candidate : values()) {
                if (Objects.equals(candidate.getCode(), code)) {
                    matched = candidate;
                    break;
                }
            }
            return matched;
        }
    
    }import com.alibaba.fastjson.JSONException;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.commons.lang.StringUtils;
    import org.apache.storm.trident.operation.BaseFilter;
    import org.apache.storm.trident.tuple.TridentTuple;
    
    /**
     * <p>
     * Copyright: Copyright (c) 2018/9/10 14:28
     * <p>
     * Company: 百趣
     * <p>
     *  格式过滤
     * @author tangzhe
     * @version 1.0.0
     */
    public class FormatFilter extends BaseFilter {
    
        /**
         * Decides whether a raw log message is well-formed enough to be passed on.
         *
         * <p>The tuple is kept only when its {@code str} field is non-blank, parses to a JSON
         * object, has non-null {@code reqTime}, {@code headers} and {@code reqURI}, and
         * {@code uriArgs} / {@code bodyData} can be read as JSON objects. Each rejection reason
         * is printed to standard output. Malformed input never propagates an exception out of
         * this method; it only causes the tuple to be dropped.
         *
         * @param tuple tuple whose {@code str} field holds the raw message
         * @return {@code true} to keep the tuple, {@code false} to drop it
         */
        @Override
        public boolean isKeep(TridentTuple tuple) {
            final String tag = this.getClass().getSimpleName();
            String message = tuple.getStringByField("str");
            System.out.println(tag + "->message:" + message);
            if (StringUtils.isBlank(message)) {
                System.out.println(tag + ": 消息不能为空!");
                return false;
            }

            JSONObject jsonObject;
            try {
                jsonObject = JSONObject.parseObject(message);
            } catch (Exception e) {
                System.out.println(tag + ": 消息格式有误!");
                return false;
            }
            // parseObject can hand back null without throwing (e.g. for the JSON literal null);
            // treat that as a malformed message instead of dereferencing it below.
            if (jsonObject == null) {
                System.out.println(tag + ": 消息格式有误!");
                return false;
            }

            boolean requiredMissing;
            try {
                // The typed getters convert the stored value and may throw unchecked exceptions
                // other than JSONException when a field has the wrong type, so every read is
                // guarded by one catch of RuntimeException.
                requiredMissing = jsonObject.getLong("reqTime") == null
                        || jsonObject.getJSONObject("headers") == null
                        || jsonObject.getString("reqURI") == null;
                if (!requiredMissing) {
                    // Optional sections: only probe that they are readable as JSON objects.
                    jsonObject.getJSONObject("uriArgs");
                    jsonObject.getJSONObject("bodyData");
                }
            } catch (RuntimeException e) {
                System.out.println(tag + ": 请求信息格式有误!");
                return false;
            }

            if (requiredMissing) {
                System.out.println(tag + ": 请求信息不能为空!");
                return false;
            }
            return true;
        }
    
    }import com.alibaba.fastjson.JSONObject;
    import net.baiqu.storm.trident.enums.MethodTypeEnum;
    import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.Date;
    
    /**
     * <p>
     * Copyright: Copyright (c) 2018/9/10 14:34
     * <p>
     * Company: 百趣
     * <p>
     * 日志解析函数
     * @author tangzhe
     * @version 1.0.0
     */
    public class OperateLogParseFunction extends BaseFunction {
    
        /**
         * Parses one raw log message and emits its fields as a single tuple.
         *
         * <p>Request parameters are taken from {@code uriArgs} for GET and from {@code bodyData}
         * for POST; for any other method (or when that section is absent) an empty object is used,
         * so {@code userId}, {@code username}, {@code role} and {@code memo} come out as null.
         * Emitted order: appId, host, requestURI, method, ip, requestTime, userId, username, role,
         * memo, and the current time.
         *
         * <p>NOTE(review): {@code reqTime} is multiplied by 1000 before building the date, which
         * presumes it is expressed in epoch seconds - confirm against the producer.
         *
         * @param tuple     tuple whose {@code str} field holds the JSON message
         * @param collector collector that receives the parsed values
         */
        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            final String raw = tuple.getStringByField("str");
            final JSONObject log = JSONObject.parseObject(raw);
            System.out.println(this.getClass().getSimpleName() + "->message: " + raw);

            final JSONObject headers = log.getJSONObject("headers");
            final String method = log.getString("method");

            // Pick the parameter section that matches the HTTP method (GET is checked first).
            final JSONObject selected = MethodTypeEnum.GET.getCode().equals(method)
                    ? log.getJSONObject("uriArgs")
                    : (MethodTypeEnum.POST.getCode().equals(method)
                            ? log.getJSONObject("bodyData")
                            : null);
            final JSONObject params = (selected == null) ? new JSONObject() : selected;

            final String appId = log.getString("appId");
            final String userId = params.getString("userId");
            final String ip = log.getString("ip");
            final String host = headers.getString("host");
            final String requestURI = log.getString("reqURI");
            final String username = params.getString("username");
            final String role = params.getString("role");
            final String memo = params.getString("memo");
            final long reqTime = log.getLong("reqTime");
            final Date requestTime = new Date(reqTime * 1000);

            final Values parsed = new Values(appId, host, requestURI, method, ip, requestTime,
                    userId, username, role, memo, new Date());
            collector.emit(parsed);
        }
    
    }import org.apache.storm.trident.operation.BaseFunction;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.tuple.TridentTuple;
    
    /**
     * <p>
     * Copyright: Copyright (c) 2018/9/10 16:33
     * <p>
     * Company: 百趣
     * <p>
     * 结果记录函数
     * @author tangzhe
     * @version 1.0.0
     */
    public class OperatePrintFunction extends BaseFunction {
    
        /**
         * Prints whether the persistence step reported success.
         *
         * <p>Reads the {@code result} field and compares it case-insensitively with
         * {@code "success"}; any other value (including null) is reported as a failure.
         * Nothing is emitted to the collector.
         *
         * @param input     tuple carrying the {@code result} field
         * @param collector unused
         */
        @Override
        public void execute(TridentTuple input, TridentCollector collector) {
            final boolean succeeded = "success".equalsIgnoreCase(input.getStringByField("result"));
            final String verdict = succeeded ? "->: 插入mongo成功" : "->: 插入mongo失败";
            System.out.println(this.getClass().getSimpleName() + verdict);
        }
    }import org.apache.storm.mongodb.trident.state.MongoState;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.state.BaseStateUpdater;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.List;
    
    /**
     * <p>
     * Copyright: Copyright (c) 2018/9/10 16:29
     * <p>
     * Company: 百趣
     * <p>
     *
     * @author tangzhe
     * @version 1.0.0
     */
    public class MyMongoStateUpdater extends BaseStateUpdater<MongoState> {
    
        /**
         * Delegates the batch to the given {@link MongoState} and emits the outcome.
         *
         * <p>Emits a single value {@code "success"} after the delegate returns. If anything in
         * that path throws an {@link Exception}, the stack trace is printed, {@code "fail"} is
         * emitted instead, and the exception is not rethrown.
         *
         * @param state     state that performs the write
         * @param tuples    tuples of the current batch
         * @param collector collector that receives the outcome value
         */
        @Override
        public void updateState(MongoState state, List<TridentTuple> tuples,
                                TridentCollector collector) {
            final String okMarker = "success";
            final String failMarker = "fail";
            try {
                state.updateState(tuples, collector);
                collector.emit(new Values(okMarker));
            } catch (Exception writeFailure) {
                writeFailure.printStackTrace();
                collector.emit(new Values(failMarker));
            }
        }
    }import com.google.common.collect.Lists;
    import com.mongodb.client.model.Filters;
    import org.apache.commons.lang.Validate;
    import org.apache.storm.mongodb.common.MongoDBClient;
    import org.apache.storm.mongodb.common.mapper.MongoMapper;
    import org.apache.storm.mongodb.trident.state.MongoState;
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.state.State;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.bson.Document;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.Serializable;
    import java.util.List;
    import java.util.Map;
    
    /**
     * <p>
     * Date-Time: 2018/09/10   13:50
     * Company: 百趣
     * </p>
     *
     * @author tangzhe
     * @version 1.0.0
     */
    public class OperateMongoState implements State {
    
        // Logger is created for this class so its output is attributed to OperateMongoState.
        private static final Logger LOG = LoggerFactory.getLogger(OperateMongoState.class);
    
        private OperateMongoState.Options options;
        private MongoDBClient mongoClient;
        private Map map;
    
        /**
         * Stores the configuration map and the options; no connection is opened here
         * (that happens in {@code prepare()}).
         *
         * @param map     configuration map handed over by the factory
         * @param options connection settings and tuple mapper
         */
        protected OperateMongoState(Map map, OperateMongoState.Options options) {
            this.map = map;
            this.options = options;
        }
    
        /**
         * Serializable settings holder for {@code OperateMongoState}, filled through chained
         * {@code with...} calls that each return the same instance.
         */
        public static class Options implements Serializable {
            private String url;
            private String collectionName;
            private MongoMapper mapper;

            /**
             * Sets the connection URL.
             *
             * @param url connection URL
             * @return this instance, for chaining
             */
            public Options withUrl(String url) {
                this.url = url;
                return this;
            }

            /**
             * Sets the name of the target collection.
             *
             * @param collectionName collection name
             * @return this instance, for chaining
             */
            public Options withCollectionName(String collectionName) {
                this.collectionName = collectionName;
                return this;
            }

            /**
             * Sets the mapper that converts tuples to documents.
             *
             * @param mapper tuple-to-document mapper
             * @return this instance, for chaining
             */
            public Options withMapper(MongoMapper mapper) {
                this.mapper = mapper;
                return this;
            }
        }
    
        /**
         * Validates the options and creates the MongoDB client.
         *
         * <p>The URL and collection name are checked with {@code Validate.notEmpty} and the mapper
         * with {@code Validate.notNull}, in that order, before the client is constructed from the
         * URL and collection name.
         */
        protected void prepare() {
            final Options cfg = this.options;
            Validate.notEmpty(cfg.url, "url can not be blank or null");
            Validate.notEmpty(cfg.collectionName, "collectionName can not be blank or null");
            Validate.notNull(cfg.mapper, "MongoMapper can not be null");

            mongoClient = new MongoDBClient(cfg.url, cfg.collectionName);
        }
    
        /**
         * Transaction-start hook; this state only writes a debug log line.
         *
         * @param txid transaction id (unused)
         */
        @Override
        public void beginCommit(Long txid) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("beginCommit is noop.");
            }
        }
    
        /**
         * Transaction-commit hook; this state only writes a debug log line.
         *
         * @param txid transaction id (unused)
         */
        @Override
        public void commit(Long txid) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("commit is noop.");
            }
        }
    
        /**
         * Upserts every tuple of the batch into the collection, keyed by its {@code logDate}.
         *
         * <p>Each tuple is converted with the configured mapper and written as a {@code $set}
         * update with upsert enabled, filtered on the tuple's own {@code logDate} value. An empty
         * batch results in no write.
         *
         * @param tuples    tuples of the current batch
         * @param collector collector of the batch (not used by this method)
         */
        public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
            for (TridentTuple tuple : tuples) {
                Document document = options.mapper.toDocument(tuple);
                // Read the key as a raw value: logDate is not guaranteed to be a String (the
                // parse function in this project emits a java.util.Date), and
                // getStringByField would fail on the cast.
                Object logDate = tuple.getValueByField("logDate");
                this.mongoClient.update(
                        Filters.eq("logDate", logDate),
                        new Document("$set", document), true);
            }
        }
    
    }import org.apache.storm.task.IMetricsContext;
    import org.apache.storm.trident.state.State;
    import org.apache.storm.trident.state.StateFactory;
    
    import java.util.Map;
    
    /**
     * State factory that builds an {@code OperateMongoState} from a fixed set of options.
     * <p>
     * Date-Time: 2018/09/10   13:50
     * Company: 百趣
     * </p>
     *
     * @author tangzhe
     * @version 1.0.0
     */
    public class OperateMongoStateFactory implements StateFactory {

        /** Options handed to every state this factory creates. */
        private final OperateMongoState.Options options;

        /**
         * @param options connection settings and mapper for the states to be created
         */
        public OperateMongoStateFactory(OperateMongoState.Options options) {
            this.options = options;
        }

        /**
         * Creates a new state from the given configuration and the stored options, and calls
         * its {@code prepare()} before returning it.
         *
         * @param conf           configuration map passed on to the state
         * @param metrics        metrics context (unused)
         * @param partitionIndex partition index (unused)
         * @param numPartitions  number of partitions (unused)
         * @return the prepared state
         */
        @Override
        public State makeState(Map conf, IMetricsContext metrics,
                               int partitionIndex, int numPartitions) {
            final OperateMongoState prepared = new OperateMongoState(conf, this.options);
            prepared.prepare();
            return prepared;
        }

    }
    package net.baiqu.storm.trident.state;
    
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.state.BaseStateUpdater;
    import org.apache.storm.trident.tuple.TridentTuple;
    import org.apache.storm.tuple.Values;
    
    import java.util.List;
    
    /**
     * State updater that forwards a batch to {@code OperateMongoState} and then emits the
     * {@code userId} of the batch's first tuple.
     * <p>
     * Date-Time: 2018/09/10   13:50
     * Company: 百趣
     * </p>
     *
     * @author tangzhe
     * @version 1.0.0
     */
    public class OperateMongoStateUpdater extends BaseStateUpdater<OperateMongoState> {

        /**
         * Hands the batch to the state, then emits one value: the {@code userId} field of the
         * first tuple.
         *
         * @param state     state that performs the write
         * @param tuples    tuples of the current batch; the first element is read after the write
         * @param collector collector that receives the emitted {@code userId}
         */
        @Override
        public void updateState(OperateMongoState state, List<TridentTuple> tuples, TridentCollector collector) {
            state.updateState(tuples, collector);
            final TridentTuple head = tuples.get(0);
            collector.emit(new Values(head.getStringByField("userId")));
        }

    }
    package net.baiqu.storm.trident.topology;
    
    import kafka.api.OffsetRequest;
    import net.baiqu.storm.trident.filter.FormatFilter;
    import net.baiqu.storm.trident.function.OperateLogParseFunction;
    import net.baiqu.storm.trident.function.OperatePrintFunction;
    import net.baiqu.storm.trident.state.MyMongoStateUpdater;
    import net.baiqu.storm.trident.util.TridentMongoFactory;
    import net.baiqu.storm.utils.Constants;
    import org.apache.commons.lang.StringUtils;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.StringScheme;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.kafka.trident.TransactionalTridentKafkaSpout;
    import org.apache.storm.kafka.trident.TridentKafkaConfig;
    import org.apache.storm.spout.SchemeAsMultiScheme;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.tuple.Fields;
    
    /**
     * Entry point that wires the operate-log Trident topology: Kafka spout, format filter,
     * log parsing, persistence to MongoDB, and printing of the reported result.
     * <p>
     * Date-Time: 2018/09/10   13:50
     * Company: 百趣
     * </p>
     *
     * @author tangzhe
     * @version 1.0.0
     */
    public class OperateLogTridentTopology {

        /** Name used for the local demo run and as fallback when no name is given on the command line. */
        private static final String DEFAULT_TOPOLOGY_NAME = "operateLogTridentTopology";

        /**
         * Builds and submits the topology.
         *
         * <p>When {@code Constants.MODE} equals {@code "demo"} (case-insensitive) the spout starts
         * from the latest Kafka offset and the topology runs in a {@link LocalCluster}; otherwise
         * it is submitted through {@link StormSubmitter}.
         *
         * @param args optional; {@code args[0]} is the topology name for cluster submission.
         *             When absent, {@code "operateLogTridentTopology"} is used.
         */
        public static void main(String[] args) {
            final boolean demoMode = StringUtils.equalsIgnoreCase("demo", Constants.MODE);

            TridentTopology topology = new TridentTopology();

            BrokerHosts hosts = new ZkHosts(Constants.ZK_HOSTS);
            String topic = Constants.KAFKA_LOG_TOPIC;
            String id = Constants.KAFKA_SPOUT_ID;

            TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(hosts, topic, id);
            kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

            // In demo mode, start reading from the latest offset
            if (demoMode) {
                kafkaConfig.startOffsetTime = OffsetRequest.LatestTime();
            }

            TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);

            // Fields produced by the parse function and consumed by the persistence step;
            // declared once so the two lists cannot drift apart.
            Fields logFields = new Fields("appId", "host", "requestURI", "method", "ip",
                    "requestTime", "userId", "username", "role", "memo", "logDate");

            Stream stream = topology.newStream("kafkaSpout", kafkaSpout).parallelismHint(1);
            stream.shuffle().each(new Fields("str"), new FormatFilter())
                    .parallelismHint(1)
                    .shuffle().each(new Fields("str"), new OperateLogParseFunction(), logFields)
                    .parallelismHint(1)
                    .partitionPersist(TridentMongoFactory.getMongoInsertState(),
                            logFields,
                            new MyMongoStateUpdater(),
                            new Fields("result"))
                    .parallelismHint(1)
                    .newValuesStream().shuffle().each(
                            new Fields("result"), new OperatePrintFunction(), new Fields("none"))
                    .parallelismHint(1);

            Config config = new Config();
            if (demoMode) {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology(DEFAULT_TOPOLOGY_NAME, config, topology.build());
            } else {
                // Fall back to the default name instead of failing on a missing argument.
                String topologyName = (args != null && args.length > 0)
                        ? args[0]
                        : DEFAULT_TOPOLOGY_NAME;
                config.setNumWorkers(1);
                config.put(Config.NIMBUS_HOST, Constants.NIMBUS_HOST);
                config.put(Config.NIMBUS_THRIFT_PORT, Constants.NIMBUS_THRIFT_PORT);
                config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
                try {
                    StormSubmitter.submitTopology(topologyName, config, topology.build());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

    }
    package net.baiqu.storm.trident.util;
    
    import net.baiqu.storm.trident.state.OperateMongoState;
    import net.baiqu.storm.trident.state.OperateMongoStateFactory;
    import net.baiqu.storm.utils.Constants;
    import org.apache.commons.lang.StringUtils;
    import org.apache.storm.mongodb.common.mapper.MongoMapper;
    import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
    import org.apache.storm.mongodb.trident.state.MongoState;
    import org.apache.storm.mongodb.trident.state.MongoStateFactory;
    import org.apache.storm.trident.state.StateFactory;
    
    /**
     * <p>
     * Copyright: Copyright (c) 2018/9/10 14:56
     * <p>
     * Company: 百趣
     * <p>
     * Factory for the Trident MongoDB state factories used by the operate-log topology.
     * @author tangzhe
     * @version 1.0.0
     */
    public class TridentMongoFactory {

        /**
         * Connection URL with credentials. Only {@code @} in the password is percent-encoded.
         * NOTE(review): other reserved characters in the user name or password are passed through
         * unchanged - confirm the credentials never contain them.
         */
        public static final String URL = "mongodb://" + Constants.MONGODB_USERNAME + ":"
                + escapePassword(Constants.MONGODB_PASSWORD)
                + "@" + Constants.MONGODB_HOSTS + ":" + Constants.MONGODB_PORT + "/"
                + Constants.MONGODB_DATABASE + "?connectTimeoutMS=" + Constants.MONGODB_TIMEOUT;

        /** Connection URL without credentials. */
        public static final String URL2 = "mongodb://" + Constants.MONGODB_HOSTS + ":" + Constants.MONGODB_PORT + "/"
                + Constants.MONGODB_DATABASE + "?connectTimeoutMS=" + Constants.MONGODB_TIMEOUT;

        /** Passed as the collection name to both state option builders. */
        public static final String OPERATE_LOG_DB = "operate_log";

        /** Tuple fields copied into each document; shared by both factories so they stay in sync. */
        private static final String[] LOG_FIELDS = {
                "appId", "host", "requestURI", "method", "ip", "requestTime",
                "userId", "username", "role", "memo", "logDate"
        };

        /**
         * Builds a state factory that inserts documents using the bundled {@link MongoState}.
         *
         * @return a {@link MongoStateFactory} configured for the operate-log collection
         */
        public static StateFactory getMongoInsertState() {
            MongoState.Options options = new MongoState.Options()
                    .withUrl(getUrl())
                    .withCollectionName(OPERATE_LOG_DB)
                    .withMapper(newLogMapper());

            return new MongoStateFactory(options);
        }

        /**
         * Builds a state factory that updates documents using the custom
         * {@link OperateMongoState}.
         *
         * @return an {@link OperateMongoStateFactory} configured for the operate-log collection
         */
        public static StateFactory getMongoUpdateState() {
            OperateMongoState.Options options = new OperateMongoState.Options()
                    .withUrl(getUrl())
                    .withCollectionName(OPERATE_LOG_DB)
                    .withMapper(newLogMapper());

            return new OperateMongoStateFactory(options);
        }

        /**
         * Selects the connection URL: the credentialed one when a user name is configured,
         * otherwise the plain one.
         */
        private static String getUrl() {
            return StringUtils.isNotBlank(Constants.MONGODB_USERNAME) ? URL : URL2;
        }

        /** Creates a fresh mapper over the shared operate-log field list. */
        private static MongoMapper newLogMapper() {
            // clone so no mapper can observe changes made through another reference to the array
            return new SimpleMongoMapper().withFields(LOG_FIELDS.clone());
        }

        /**
         * Percent-encodes {@code @} in the password. A null password becomes an empty string
         * so that class initialisation does not fail when no password is configured.
         */
        private static String escapePassword(String password) {
            return password == null ? "" : password.replace("@", "%40");
        }

    }
  • 相关阅读:
    如何使用pip安装PythonMySQLdb模块?
    Linux:信号(1):signal函数、pause函数、alarm函数
    python字符串前面加上'r'的作用
    在LINUX中 用Ctrl+z挂起的命令怎么切回到原任务的命令窗口
    A*寻路初探 GameDev.net
    [3d跑酷] Xcode5 打包 发布配置
    [cb]NGUI组件基类之 UIWidget
    [cb]Unity 项目架构
    使用Unity3D的50个技巧:Unity3D最佳实践
    Doxygen Tool For Unity
  • 原文地址:https://www.cnblogs.com/tangzhe/p/9621123.html
Copyright © 2011-2022 走看看