MongoDB是大数据技术中常用的NoSQL数据库,它提供了大量的查询、聚合等操作函数,对于需要大量查询的日志系统来说,MongoDB是大数据日志存储的福音。Storm的高级编程技术Trident也提供了与MongoDB集成的方法,但官方只提供了新增(插入)的处理,对于常用的修改操作并未提供接口。本文提供了一种使用Trident进行MongoDB修改操作的方式,并且对持久化的数据提供了输出的拓展操作,具体代码见下方:
// ===== net/baiqu/storm/trident/enums/MethodTypeEnum.java =====
package net.baiqu.storm.trident.enums;

import java.util.Objects;

/**
 * Request method types recognised by the log parser.
 *
 * <p>Date-Time: 2018/09/05 15:14<br>
 * Company: Baiqu
 *
 * @author fangyuanjie
 * @version 1.0.0
 */
public enum MethodTypeEnum {

    /** GET request. */
    GET("GET", "GET请求"),

    /** POST request. */
    POST("POST", "POST请求");

    private String code;
    private String desc;

    MethodTypeEnum(String code, String desc) {
        this.code = code;
        this.desc = desc;
    }

    public String getCode() {
        return code;
    }

    // NOTE(review): enum constants are singletons, so this setter mutates state
    // shared by every user of the constant. Kept only for interface compatibility.
    public void setCode(String code) {
        this.code = code;
    }

    public String getDesc() {
        return desc;
    }

    // NOTE(review): see setCode - mutating an enum constant affects all callers.
    public void setDesc(String desc) {
        this.desc = desc;
    }

    /**
     * Looks up a constant by its code.
     *
     * @param code the code to match (compared with {@link Objects#equals}, so null is allowed)
     * @return the matching constant, or {@code null} when no constant has that code
     */
    public static MethodTypeEnum getByCode(String code) {
        for (MethodTypeEnum candidate : values()) {
            if (Objects.equals(candidate.getCode(), code)) {
                return candidate;
            }
        }
        return null;
    }
}

// ===== net/baiqu/storm/trident/filter/FormatFilter.java =====
package net.baiqu.storm.trident.filter;

import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;

/**
 * Format filter: keeps only tuples whose "str" field is a JSON object that carries
 * "reqTime", "headers" and "reqURI", and whose "headers", "uriArgs" and "bodyData"
 * entries can be read as JSON objects.
 *
 * <p>Copyright: Copyright (c) 2018/9/10 14:28<br>
 * Company: Baiqu
 *
 * @author tangzhe
 * @version 1.0.0
 */
public class FormatFilter extends BaseFilter {

    /**
     * Decides whether the tuple is passed downstream.
     *
     * @param tuple tuple carrying the raw message in field "str"
     * @return {@code true} when the message passes every format check, {@code false} otherwise
     */
    @Override
    public boolean isKeep(TridentTuple tuple) {
        String message = tuple.getStringByField("str");
        System.out.println(this.getClass().getSimpleName() + "->message:" + message);

        if (StringUtils.isBlank(message)) {
            System.out.println(this.getClass().getSimpleName() + ": 消息不能为空!");
            return false;
        }

        JSONObject jsonObject;
        try {
            jsonObject = JSONObject.parseObject(message);
        } catch (Exception e) {
            System.out.println(this.getClass().getSimpleName() + ": 消息格式有误!");
            return false;
        }
        // parseObject may hand back null (e.g. for the literal text "null");
        // treat that as a malformed message instead of dereferencing it below.
        if (jsonObject == null) {
            System.out.println(this.getClass().getSimpleName() + ": 消息格式有误!");
            return false;
        }

        try {
            // The typed accessors are inside the try so that a wrongly-typed value
            // rejects this tuple rather than propagating out of the filter.
            if (jsonObject.getLong("reqTime") == null
                    || jsonObject.getJSONObject("headers") == null
                    || jsonObject.getString("reqURI") == null) {
                System.out.println(this.getClass().getSimpleName() + ": 请求信息不能为空!");
                return false;
            }
            // Called only to verify that these entries are readable as JSON objects.
            jsonObject.getJSONObject("uriArgs");
            jsonObject.getJSONObject("bodyData");
        } catch (JSONException | ClassCastException | NumberFormatException e) {
            System.out.println(this.getClass().getSimpleName() + ": 请求信息格式有误!");
            return false;
        }
        return true;
    }
}

// ===== net/baiqu/storm/trident/function/OperateLogParseFunction.java =====
package net.baiqu.storm.trident.function;

import com.alibaba.fastjson.JSONObject;
import net.baiqu.storm.trident.enums.MethodTypeEnum;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

import java.util.Date;

/**
 * Log parsing function: turns the raw JSON message in field "str" into the
 * individual operate-log values.
 *
 * <p>Copyright: Copyright (c) 2018/9/10 14:34<br>
 * Company: Baiqu
 *
 * @author tangzhe
 * @version 1.0.0
 */
public class OperateLogParseFunction extends BaseFunction {

    /**
     * Emits, in order: appId, host, requestURI, method, ip, requestTime, userId,
     * username, role, memo and the current time.
     *
     * <p>This function does not null-check "headers" or "reqTime"; in the topology
     * shown in this listing {@code FormatFilter} runs first and rejects messages
     * lacking them.
     *
     * @param tuple     tuple carrying the raw message in field "str"
     * @param collector collector receiving the parsed values
     */
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String message = tuple.getStringByField("str");
        JSONObject jsonObject = JSONObject.parseObject(message);
        System.out.println(this.getClass().getSimpleName() + "->message: " + message);

        JSONObject headers = jsonObject.getJSONObject("headers");
        String method = jsonObject.getString("method");

        // Request parameters come from "uriArgs" for GET and from "bodyData" for POST;
        // any other method (or a missing entry) yields an empty parameter object.
        JSONObject params = null;
        if (MethodTypeEnum.GET.getCode().equals(method)) {
            params = jsonObject.getJSONObject("uriArgs");
        } else if (MethodTypeEnum.POST.getCode().equals(method)) {
            params = jsonObject.getJSONObject("bodyData");
        }
        if (params == null) {
            params = new JSONObject();
        }

        String appId = jsonObject.getString("appId");
        String userId = params.getString("userId");
        String ip = jsonObject.getString("ip");
        String host = headers.getString("host");
        String requestURI = jsonObject.getString("reqURI");
        String username = params.getString("username");
        String role = params.getString("role");
        String memo = params.getString("memo");
        // "reqTime" is multiplied by 1000 before building the Date
        // (presumably epoch seconds -> milliseconds; confirm with the producer).
        Date requestTime = new Date(jsonObject.getLong("reqTime") * 1000);

        collector.emit(new Values(appId, host, requestURI, method, ip, requestTime,
                userId, username, role, memo, new Date()));
    }
}

// ===== net/baiqu/storm/trident/function/OperatePrintFunction.java =====
package net.baiqu.storm.trident.function;

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;

/**
 * Result recording function: prints whether the persist step reported success.
 *
 * <p>Copyright: Copyright (c) 2018/9/10 16:33<br>
 * Company: Baiqu
 *
 * @author tangzhe
 * @version 1.0.0
 */
public class OperatePrintFunction extends BaseFunction {

    /**
     * Reads field "result" and prints a success or failure line. Nothing is emitted.
     *
     * @param input     tuple carrying field "result"
     * @param collector unused
     */
    @Override
    public void execute(TridentTuple input, TridentCollector collector) {
        String result = input.getStringByField("result");
        boolean succeeded = "success".equalsIgnoreCase(result);
        if (succeeded) {
            System.out.println(this.getClass().getSimpleName() + "->: 插入mongo成功");
        } else {
            System.out.println(this.getClass().getSimpleName() + "->: 插入mongo失败");
        }
    }
}

// ===== net/baiqu/storm/trident/state/MyMongoStateUpdater.java =====
package net.baiqu.storm.trident.state;

import org.apache.storm.mongodb.trident.state.MongoState;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.state.BaseStateUpdater;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

import java.util.List;

/**
 * State updater for the stock {@link MongoState} that additionally emits a
 * "success" / "fail" value describing the outcome of the batch.
 *
 * <p>Copyright: Copyright (c) 2018/9/10 16:29<br>
 * Company: Baiqu
 *
 * @author tangzhe
 * @version 1.0.0
 */
public class MyMongoStateUpdater extends BaseStateUpdater<MongoState> {

    /**
     * Delegates the batch to {@code state.updateState} and emits the outcome.
     *
     * @param state     the Mongo state to write to
     * @param tuples    the batch of tuples
     * @param collector receives a single value: "success" or "fail"
     */
    @Override
    public void updateState(MongoState state, List<TridentTuple> tuples, TridentCollector collector) {
        try {
            state.updateState(tuples, collector);
            collector.emit(new Values("success"));
        } catch (Exception e) {
            // Deliberately not rethrown: the failure is reported downstream as "fail".
            e.printStackTrace();
            collector.emit(new Values("fail"));
        }
    }
}

// ===== net/baiqu/storm/trident/state/OperateMongoState.java =====
package net.baiqu.storm.trident.state;

import com.google.common.collect.Lists;
import com.mongodb.client.model.Filters;
import org.apache.commons.lang.Validate;
import org.apache.storm.mongodb.common.MongoDBClient;
import org.apache.storm.mongodb.common.mapper.MongoMapper;
import org.apache.storm.mongodb.trident.state.MongoState;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.tuple.TridentTuple;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

/**
 * Custom Trident state that upserts tuples into a MongoDB collection
 * (a {@code $set} update with upsert enabled) instead of plain inserts.
 *
 * <p>Date-Time: 2018/09/10 13:50<br>
 * Company: Baiqu
 *
 * @author tangzhe
 * @version 1.0.0
 */
public class OperateMongoState implements State {

    // Fixed: the logger was previously created for MongoState.class.
    private static final Logger LOG = LoggerFactory.getLogger(OperateMongoState.class);

    private OperateMongoState.Options options;
    private MongoDBClient mongoClient;
    private Map map;

    /**
     * @param map     the configuration map handed over by the state factory (stored, not read here)
     * @param options connection and mapping options
     */
    protected OperateMongoState(Map map, OperateMongoState.Options options) {
        this.options = options;
        this.map = map;
    }

    /** Serializable builder-style holder for the connection URL, collection name and mapper. */
    public static class Options implements Serializable {
        private String url;
        private String collectionName;
        private MongoMapper mapper;

        public OperateMongoState.Options withUrl(String url) {
            this.url = url;
            return this;
        }

        public OperateMongoState.Options withCollectionName(String collectionName) {
            this.collectionName = collectionName;
            return this;
        }

        public OperateMongoState.Options withMapper(MongoMapper mapper) {
            this.mapper = mapper;
            return this;
        }
    }

    /** Validates the options and creates the MongoDB client. */
    protected void prepare() {
        Validate.notEmpty(options.url, "url can not be blank or null");
        Validate.notEmpty(options.collectionName, "collectionName can not be blank or null");
        Validate.notNull(options.mapper, "MongoMapper can not be null");

        this.mongoClient = new MongoDBClient(options.url, options.collectionName);
    }

    @Override
    public void beginCommit(Long txid) {
        LOG.debug("beginCommit is noop.");
    }

    @Override
    public void commit(Long txid) {
        LOG.debug("commit is noop.");
    }

    /**
     * Upserts every tuple of the batch, matching existing documents on "logDate".
     *
     * <p>Fixes over the previous version: an empty batch is a no-op instead of an
     * index error; every tuple is written (previously only the first one was, and
     * the rest of the batch was dropped); and "logDate" is read as a plain value,
     * because {@code OperateLogParseFunction} emits a {@code java.util.Date} there
     * and reading it as a String fails.
     *
     * @param tuples    the batch of tuples; may be empty
     * @param collector unused
     */
    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
        if (tuples == null || tuples.isEmpty()) {
            return;
        }
        for (TridentTuple tuple : tuples) {
            Document document = options.mapper.toDocument(tuple);
            this.mongoClient.update(
                    Filters.eq("logDate", tuple.getValueByField("logDate")),
                    new Document("$set", document),
                    true);
        }
    }
}

// ===== net/baiqu/storm/trident/state/OperateMongoStateFactory.java =====
package net.baiqu.storm.trident.state;

import org.apache.storm.task.IMetricsContext;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.state.StateFactory;

import java.util.Map;

/**
 * Factory that builds and prepares {@link OperateMongoState} instances.
 *
 * <p>Date-Time: 2018/09/10 13:50<br>
 * Company: Baiqu
 *
 * @author tangzhe
 * @version 1.0.0
 */
public class OperateMongoStateFactory implements StateFactory {

    private OperateMongoState.Options options;

    public OperateMongoStateFactory(OperateMongoState.Options options) {
        this.options = options;
    }

    /**
     * Creates a state from the stored options and calls {@code prepare()} on it.
     * {@code metrics}, {@code partitionIndex} and {@code numPartitions} are not used.
     */
    @Override
    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
        OperateMongoState state = new OperateMongoState(conf, options);
        state.prepare();
        return state;
    }
}

// ===== net/baiqu/storm/trident/state/OperateMongoStateUpdater.java =====
package net.baiqu.storm.trident.state;

import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.state.BaseStateUpdater;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

import java.util.List;

/**
 * State updater for {@link OperateMongoState}; after writing the batch it emits
 * the "userId" of the first tuple.
 *
 * <p>Date-Time: 2018/09/10 13:50<br>
 * Company: Baiqu
 *
 * @author tangzhe
 * @version 1.0.0
 */
public class OperateMongoStateUpdater extends BaseStateUpdater<OperateMongoState> {

    /**
     * @param state     the state to write to
     * @param tuples    the batch of tuples; an empty batch is ignored
     * @param collector receives one value: the "userId" of the first tuple
     */
    @Override
    public void updateState(OperateMongoState state, List<TridentTuple> tuples, TridentCollector collector) {
        // Guard: there is no first tuple to read "userId" from in an empty batch.
        if (tuples == null || tuples.isEmpty()) {
            return;
        }
        state.updateState(tuples, collector);
        String userId = tuples.get(0).getStringByField("userId");
        collector.emit(new Values(userId));
    }
}

// ===== net/baiqu/storm/trident/topology/OperateLogTridentTopology.java =====
package net.baiqu.storm.trident.topology;

import kafka.api.OffsetRequest;
import net.baiqu.storm.trident.filter.FormatFilter;
import net.baiqu.storm.trident.function.OperateLogParseFunction;
import net.baiqu.storm.trident.function.OperatePrintFunction;
import net.baiqu.storm.trident.state.MyMongoStateUpdater;
import net.baiqu.storm.trident.util.TridentMongoFactory;
import net.baiqu.storm.utils.Constants;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.kafka.trident.TransactionalTridentKafkaSpout;
import org.apache.storm.kafka.trident.TridentKafkaConfig;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;

/**
 * Trident topology: Kafka spout -> format filter -> log parsing -> MongoDB persist
 * -> result printing.
 *
 * <p>Date-Time: 2018/09/10 13:50<br>
 * Company: Baiqu
 *
 * @author tangzhe
 * @version 1.0.0
 */
public class OperateLogTridentTopology {

    /** Topology name used in demo mode, and as the fallback when no argument is given. */
    private static final String DEFAULT_TOPOLOGY_NAME = "operateLogTridentTopology";

    /**
     * Builds the topology and submits it: to a {@link LocalCluster} when
     * {@code Constants.MODE} is "demo", otherwise through {@link StormSubmitter}.
     *
     * @param args optional; {@code args[0]} is the topology name for cluster submission
     */
    public static void main(String[] args) {
        TridentTopology topology = new TridentTopology();

        BrokerHosts hosts = new ZkHosts(Constants.ZK_HOSTS);
        String topic = Constants.KAFKA_LOG_TOPIC;
        String id = Constants.KAFKA_SPOUT_ID;

        TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(hosts, topic, id);
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        // In demo mode, set the read offset to the latest time.
        if (StringUtils.equalsIgnoreCase("demo", Constants.MODE)) {
            kafkaConfig.startOffsetTime = OffsetRequest.LatestTime();
        }
        TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(kafkaConfig);

        // The same field list is produced by the parse function and consumed by the persist step.
        Fields logFields = new Fields("appId", "host", "requestURI", "method", "ip",
                "requestTime", "userId", "username", "role", "memo", "logDate");

        Stream stream = topology.newStream("kafkaSpout", kafkaSpout).parallelismHint(1);
        stream.shuffle().each(new Fields("str"), new FormatFilter())
                .parallelismHint(1)
                .shuffle().each(new Fields("str"), new OperateLogParseFunction(), logFields)
                .parallelismHint(1)
                // Uses the stock insert state. The custom update path in this listing is
                // TridentMongoFactory.getMongoUpdateState() with OperateMongoStateUpdater,
                // which emits a userId rather than a "result" value.
                .partitionPersist(TridentMongoFactory.getMongoInsertState(), logFields,
                        new MyMongoStateUpdater(), new Fields("result"))
                .parallelismHint(1)
                .newValuesStream().shuffle().each(
                        new Fields("result"), new OperatePrintFunction(), new Fields("none"))
                .parallelismHint(1);

        Config config = new Config();
        if (StringUtils.equalsIgnoreCase("demo", Constants.MODE)) {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(DEFAULT_TOPOLOGY_NAME, config, topology.build());
        } else {
            config.setNumWorkers(1);
            config.put(Config.NIMBUS_HOST, Constants.NIMBUS_HOST);
            config.put(Config.NIMBUS_THRIFT_PORT, Constants.NIMBUS_THRIFT_PORT);
            config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 1);
            // Fixed: args[0] was read unconditionally, so starting without arguments
            // failed with an index error that was only printed.
            String topologyName = (args != null && args.length > 0) ? args[0] : DEFAULT_TOPOLOGY_NAME;
            try {
                StormSubmitter.submitTopology(topologyName, config, topology.build());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

// ===== net/baiqu/storm/trident/util/TridentMongoFactory.java =====
package net.baiqu.storm.trident.util;

import net.baiqu.storm.trident.state.OperateMongoState;
import net.baiqu.storm.trident.state.OperateMongoStateFactory;
import net.baiqu.storm.utils.Constants;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.mongodb.common.mapper.MongoMapper;
import org.apache.storm.mongodb.common.mapper.SimpleMongoMapper;
import org.apache.storm.mongodb.trident.state.MongoState;
import org.apache.storm.mongodb.trident.state.MongoStateFactory;
import org.apache.storm.trident.state.StateFactory;

/**
 * Factory class for the Trident MongoDB state factories.
 *
 * <p>Copyright: Copyright (c) 2018/9/10 14:56<br>
 * Company: Baiqu
 *
 * @author tangzhe
 * @version 1.0.0
 */
public class TridentMongoFactory {

    /**
     * Connection string with credentials; "@" in the password is replaced by "%40".
     *
     * <p>NOTE(review): only "@" is escaped, and this constant is evaluated during
     * class initialisation even when no username is configured - confirm that
     * {@code Constants.MONGODB_PASSWORD} is never null and contains no other
     * reserved characters.
     */
    public static final String URL = "mongodb://" + Constants.MONGODB_USERNAME + ":"
            + Constants.MONGODB_PASSWORD.replace("@", "%40")
            + "@" + Constants.MONGODB_HOSTS + ":" + Constants.MONGODB_PORT + "/"
            + Constants.MONGODB_DATABASE + "?connectTimeoutMS=" + Constants.MONGODB_TIMEOUT;

    /** Connection string without credentials. */
    public static final String URL2 = "mongodb://" + Constants.MONGODB_HOSTS + ":"
            + Constants.MONGODB_PORT + "/" + Constants.MONGODB_DATABASE
            + "?connectTimeoutMS=" + Constants.MONGODB_TIMEOUT;

    /** Passed as the collection name to both state option builders below. */
    public static final String OPERATE_LOG_DB = "operate_log";

    /** Tuple fields mapped into the Mongo document by both state factories. */
    private static final String[] LOG_FIELDS = {
        "appId", "host", "requestURI", "method", "ip", "requestTime",
        "userId", "username", "role", "memo", "logDate"
    };

    /**
     * Inserts into MongoDB using the state that ships with storm-mongodb.
     *
     * @return a {@link MongoStateFactory} configured for the operate-log collection
     */
    public static StateFactory getMongoInsertState() {
        MongoMapper mapper = new SimpleMongoMapper().withFields(LOG_FIELDS);

        MongoState.Options options = new MongoState.Options()
                .withUrl(getUrl())
                .withCollectionName(OPERATE_LOG_DB)
                .withMapper(mapper);

        return new MongoStateFactory(options);
    }

    /**
     * Updates MongoDB using the custom {@link OperateMongoState}.
     *
     * @return an {@link OperateMongoStateFactory} configured for the operate-log collection
     */
    public static StateFactory getMongoUpdateState() {
        MongoMapper mapper = new SimpleMongoMapper().withFields(LOG_FIELDS);

        OperateMongoState.Options options = new OperateMongoState.Options()
                .withUrl(getUrl())
                .withCollectionName(OPERATE_LOG_DB)
                .withMapper(mapper);

        return new OperateMongoStateFactory(options);
    }

    /**
     * Picks the MongoDB URL: the credentialed one when a username is configured,
     * otherwise the plain one.
     */
    private static String getUrl() {
        return StringUtils.isNotBlank(Constants.MONGODB_USERNAME) ? URL : URL2;
    }
}