使用事物TridentTopology 持久化数据到MySQL 1、构建拓扑JDBCTopology类 package storm.trident.mysql; import java.util.Arrays; import java.util.Map; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.trident.TridentState; import org.apache.storm.trident.TridentTopology; import org.apache.storm.trident.operation.BaseFunction; import org.apache.storm.trident.operation.CombinerAggregator; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.spout.IBatchSpout; import org.apache.storm.trident.state.StateType; import org.apache.storm.trident.testing.FixedBatchSpout; import org.apache.storm.trident.testing.MemoryMapState; import org.apache.storm.trident.tuple.TridentTuple; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; /** * 事物Trident-MySQL Topology * @author mengyao * */ @SuppressWarnings("all") public class JDBCTopology { public static void main(String[] args) { TridentTopology topology = new TridentTopology(); //Spout数据源 FixedBatchSpout spout = new FixedBatchSpout(new Fields("tels"), 7, new Values("189111 3"), new Values("135111 7"), new Values("189111 2"), new Values("158111 5"), new Values("159111 6"), new Values("159111 3"), new Values("158111 5") ); spout.setCycle(false); //State持久化配置属性 JDBCStateConfig config = new JDBCStateConfig(); config.setDriver("com.mysql.jdbc.Driver"); config.setUrl("jdbc:mysql://localhost:3306/test"); config.setUsername("root"); config.setPassword("123456"); config.setBatchSize(10); config.setCacheSize(10); config.setType(StateType.TRANSACTIONAL); config.setCols("tel"); config.setColVals("sum"); config.setTable("tbl_tel"); topology.newStream("spout", spout) .each(new Fields("tels"), new KeyValueFun(), new Fields("tel", "money")) .groupBy(new Fields("tel")) .persistentAggregate(JDBCState.getFactory(config), new Fields("money"), new SumCombinerAgg(), new Fields("sum")); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test1", new Config(), topology.build()); } } @SuppressWarnings("all") class KeyValueFun extends BaseFunction { @Override public void execute(TridentTuple tuple, TridentCollector collector) { String record = tuple.getString(0); collector.emit(new Values(record.split(" ")[0], record.split(" ")[1])); } } @SuppressWarnings("all") class SumCombinerAgg implements CombinerAggregator<Long> { @Override public Long init(TridentTuple tuple) { return Long.parseLong(tuple.getString(0)); } @Override public Long combine(Long val1, Long val2) { Long val = val1+val2; System.out.println(val); return val; } @Override public Long zero() { return 0L; } } 2、构建基于IBackingMap的JDBCState类 package storm.trident.mysql; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.storm.task.IMetricsContext; import org.apache.storm.trident.state.OpaqueValue; import org.apache.storm.trident.state.State; import org.apache.storm.trident.state.StateFactory; import org.apache.storm.trident.state.StateType; import org.apache.storm.trident.state.TransactionalValue; import org.apache.storm.trident.state.map.CachedMap; import org.apache.storm.trident.state.map.IBackingMap; import org.apache.storm.trident.state.map.NonTransactionalMap; import org.apache.storm.trident.state.map.OpaqueMap; import org.apache.storm.trident.state.map.TransactionalMap; @SuppressWarnings("all") public class JDBCState<T> implements IBackingMap<T> { private static JDBCStateConfig config; JDBCState(JDBCStateConfig config){ this.config = config; } @Override public List<T> multiGet(List<List<Object>> keys) { StringBuilder sqlBuilder = new StringBuilder("SELECT ").append(config.getCols()) .append(","+config.getColVals()) .append(",txid") .append(" FROM "+config.getTable()) .append(" WHERE ") .append(config.getCols()) .append("='"); JDBCUtil jdbcUtil = new JDBCUtil(config.getDriver(),config.getUrl(),config.getUsername(),config.getPassword()); List<Object> result = new ArrayList<Object>(); Map<String, Object> map = null; for (List<Object> list : keys) { Object key = list.get(0); map = jdbcUtil.queryForMap(sqlBuilder.toString()+key+"'"); System.out.println(sqlBuilder.toString()+key+"'"+" 【"+map); Bean itemBean = (Bean)map.get(key); long txid=0L; long val=0L; if (itemBean!=null) { val=itemBean.getSum(); txid=itemBean.getTxid(); } if (config.getType()==StateType.OPAQUE) { result.add(new OpaqueValue(txid, val)); } else if (config.getType()==StateType.TRANSACTIONAL) { result.add(new TransactionalValue(txid, val)); } else { result.add(val); } } return (List<T>) result; } @Override public void multiPut(List<List<Object>> keys, List<T> vals) { //构建新增SQL StringBuilder sqlBuilder = new StringBuilder("INSERT INTO ").append(config.getTable()) .append("("+config.getCols()) .append(","+config.getColVals()) .append(",txid") .append(",time") .append(") VALUES "); for (int i = 0; i < keys.size(); i++) { List<Object> key = keys.get(i); if (config.getType()==StateType.TRANSACTIONAL) { TransactionalValue val = (TransactionalValue)vals.get(i); sqlBuilder.append("("); sqlBuilder.append(key.get(0)); sqlBuilder.append(","); sqlBuilder.append(val.getVal()); sqlBuilder.append(","); sqlBuilder.append(val.getTxid()); sqlBuilder.append(",NOW()"); sqlBuilder.append("),"); } } sqlBuilder.setLength(sqlBuilder.length()-1); System.out.println(sqlBuilder.toString()); //新增数据 JDBCUtil jdbcUtil = new JDBCUtil(config.getDriver(),config.getUrl(),config.getUsername(),config.getPassword()); jdbcUtil.insert(sqlBuilder.toString()); } public static Factory getFactory(JDBCStateConfig config) { return new Factory(config); } static class Factory implements StateFactory { private static JDBCStateConfig config; public Factory(JDBCStateConfig config) { this.config = config; } @Override public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { final CachedMap map = new CachedMap(new JDBCState(config), config.getCacheSize()); System.out.println(config); if(config.getType()==StateType.OPAQUE) { return OpaqueMap.build(map); } else if(config.getType()==StateType.TRANSACTIONAL){ return TransactionalMap.build(map); }else { return NonTransactionalMap.build(map); } } } } 3、构建基于IBackingMap的JDBCStateConfig配置类 package storm.trident.mysql; import java.util.List; import org.apache.storm.trident.state.StateType; @SuppressWarnings("all") public class JDBCStateConfig { private String url; private String driver; private String username; private String password; private String table; private int batchSize; private String cols; private String colVals; private int cacheSize = 100; private StateType type = StateType.OPAQUE; public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public String getDriver() { return driver; } public void setDriver(String driver) { this.driver = driver; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } public String getTable() { return table; } public void setTable(String table) { this.table = table; } public int getBatchSize() { return batchSize; } public void setBatchSize(int batchSize) { this.batchSize = batchSize; } public String getCols() { return cols; } public void setCols(String cols) { this.cols = cols; } public String getColVals() { return colVals; } public void setColVals(String colVals) { this.colVals = colVals; } public int getCacheSize() { return cacheSize; } public void setCacheSize(int cacheSize) { this.cacheSize = cacheSize; } public StateType getType() { return type; } public void setType(StateType type) { this.type = type; } @Override public String toString() { return "Test2StateConfig [url=" + url + ", driver=" + driver + ", username=" + username + ", password=" + password + ", table=" + table + ", batchSize=" + batchSize + ", cols=" + cols + ", colVals=" + colVals + ", cacheSize=" + cacheSize + ", type=" + type + "]"; } } 4、构建JDBC工具类和实体Bean package storm.trident.mysql; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.HashMap; import java.util.Map; public class JDBCUtil { private String driver; private String url; private String username; private String password; private Connection connection; private PreparedStatement ps; private ResultSet rs; public JDBCUtil(String driver, String url, String username, String password) { this.driver = driver; this.url = url; this.username = username; this.password = password; init(); } void init(){ try { Class.forName(driver); } catch (ClassNotFoundException e) { e.printStackTrace(); } } public boolean insert(String sql) { int state = 0; try { connection = DriverManager.getConnection(url, username, password); ps = connection.prepareStatement(sql); state = ps.executeUpdate(); } catch (SQLException e) { e.printStackTrace(); } finally { try { ps.close(); connection.close(); } catch (SQLException e) { e.printStackTrace(); } } if (state>0) { return true; } return false; } public Map<String, Object> queryForMap(String sql) { Map<String, Object> result = new HashMap<String, Object>(); try { connection = DriverManager.getConnection(url, username, password); ps = connection.prepareStatement(sql); rs = ps.executeQuery(); if(rs.next()){ Bean iteBean=new Bean(rs.getString("tel"), rs.getLong("sum"), rs.getLong("txid"), null); result.put(rs.getString("tel"), iteBean); } } catch (SQLException e) { e.printStackTrace(); } finally { try { ps.close(); connection.close(); } catch (SQLException e) { e.printStackTrace(); } } return result; } public String getDriver() { return driver; } public void setDriver(String driver) { this.driver = driver; } public String getUrl() { return url; } public void setUrl(String url) { this.url = url; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPassword() { return password; } public void setPassword(String password) { this.password = password; } } package storm.trident.mysql; public class Bean { private String tel; private long sum; private long txid; private String time; public Bean(){ } public Bean(String tel, long sum, long txid, String time) { super(); this.tel = tel; this.sum = sum; this.txid = txid; this.time = time; } public String getTel() { return tel; } public void setTel(String tel) { this.tel = tel; } public long getSum() { return sum; } public void setSum(long sum) { this.sum = sum; } public long getTxid() { return txid; } public void setTxid(long txid) { this.txid = txid; } public String getTime() { return time; } public void setTime(String time) { this.time = time; } @Override public String toString() { return "Bean [tel=" + tel + ", sum=" + sum + ", txid=" + txid + ", time=" + time + "]"; } }