storm_jdbc: the most complete version

    Introduction: the code here is organized by bolt vs. Trident. The write and read methods may live in the same class, and at the end there is a test class that shows how to use everything.

    JdbcSpout: this spout is shared by both the write path and the read path; the details are explained in the comments.

    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    import com.google.common.collect.Lists;
    
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    /**
     * @author cwc
     * @date 2018-05-31
     * @description: spout that supplies the test data; the read and write topologies share this one spout
     * @version 1.0.0 
     */
    public class JdbcSpout extends BaseRichSpout {
    	public static Random random =new Random();
    	private static final long serialVersionUID = 1L;
    	private SpoutOutputCollector collector;
    	//mock data
    	public static final List<Values> rows = Lists.newArrayList(
    	            new Values("peter",random.nextInt(80),1),
    	            new Values("bob",random.nextInt(60),2),
    	            new Values("alice",random.nextInt(100),2));
    
    	@Override
    	public void nextTuple() {
    		  Random rand = new Random();
    	      Values row = rows.get(rand.nextInt(rows.size()));//nextInt's upper bound is exclusive, so this can pick any of the mock rows
    	      
    //	      this.collector.emit(new Values("bob"));//emit a single field for the placeholder (lookup) queries
    	      this.collector.emit(row);//emit all three fields for the write topologies
    	      System.out.println(row);
    	      Thread.yield();
    	}
    
    	
    	@Override
    	public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
    		this.collector =collector;
    	}
    
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		 declarer.declare(new Fields("name","age","sex"));//用于存储写入
    //		 declarer.declare(new Fields("name"));//用于占位符查询的字段
    	}
    
    }
    

      

    The JDBC bolt class (JdbcOperationBolt); pay attention to the comments:

    import java.util.List;
    import java.util.Objects;
    
    import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
    import org.apache.storm.jdbc.bolt.JdbcLookupBolt;
    import org.apache.storm.jdbc.common.Column;
    import org.apache.storm.jdbc.common.ConnectionProvider;
    import org.apache.storm.jdbc.mapper.JdbcMapper;
    import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
    import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
    import org.apache.storm.tuple.Fields;
    
    import com.sunsheen.jfids.bigdata.storm.common.CustomConnectionUtils;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * @author cwc
     * @date 2018-09-28
     * @version 1.0.0 
     * @description: JDBC operations against the database:
     *                 writing, either through a placeholder SQL statement or through a whole-table insert bolt,
     *                 and reading, by binding tuple fields to the ? placeholders of a select statement
     */
    public class JdbcOperationBolt {
        private static ConnectionProvider cp=CustomConnectionUtils.getConnectionProvider();
        
        private static Logger logger = LoggerFactory.getLogger(JdbcOperationBolt.class);
        
        /**
         * Writes to the database by column: pass a column schema and a placeholder
         * insert statement, and the bolt inserts through that statement.
         * @param columnSchema columns to insert
         * @param sqlString insert SQL with ? placeholders
         * @return the configured JdbcInsertBolt, or null if the arguments are empty
         */
        public static JdbcInsertBolt getInsertBolt(List<Column> columnSchema,String sqlString){
            if(columnSchema!=null && !columnSchema.isEmpty() && sqlString!=null && !sqlString.isEmpty()){
                JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
                JdbcInsertBolt insertBolt = new JdbcInsertBolt(cp, simpleJdbcMapper)
                        .withInsertQuery(sqlString)
                        .withQueryTimeoutSecs(30);
                return insertBolt;
            }
            logger.error("The column schema and the insert SQL must not be empty!");
            return null;
        }
        
        
        /**
         * Writes to the database by table name: pass just the table name and the
         * whole table is written. Note that the fields emitted in Storm must match
         * the table's columns, because the mapper reads the schema from the table metadata.
         * @param tableName table name
         * @return the configured JdbcInsertBolt, or null if the table name is empty
         */
        public static JdbcInsertBolt getInsertBolt(String tableName){
            if(tableName!=null && !tableName.isEmpty()){
                JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName,CustomConnectionUtils.getConnectionProvider());
                JdbcInsertBolt insertBolt = new JdbcInsertBolt(cp, simpleJdbcMapper)
                        .withTableName(tableName)
                        .withQueryTimeoutSecs(30);
                return insertBolt;
            }
            logger.error("The table name must not be empty!");
            return null;
        }
        
        /**
         * Reads data: builds a lookup bolt from a select statement and the columns
         * bound to its placeholders.
         * @param outputFields fields to emit with the query results
         * @param queryParamColumns fields bound to the ? placeholders
         * @param sqlString the select statement
         * @return the configured JdbcLookupBolt, or null if any argument is empty
         */
        public static JdbcLookupBolt getJdbcLookupBolt(Fields outputFields,List<Column> queryParamColumns,String sqlString){
            if(outputFields!=null && outputFields.size()>0 && queryParamColumns!=null && !queryParamColumns.isEmpty() && sqlString!=null && !sqlString.isEmpty()){
                SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
                JdbcLookupBolt jdbcLookupBolt = new JdbcLookupBolt(cp, sqlString, lookupMapper)
                        .withQueryTimeoutSecs(30);
                return jdbcLookupBolt;
            }
            logger.error("The output fields, the placeholder columns and the select SQL must not be empty!");
            return null;
        }
    }
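
    The test class later in this post only exercises the whole-table overload of getInsertBolt. For completeness, here is a minimal sketch (not part of the original post) of wiring the placeholder-SQL overload into a topology; it assumes the same jdbc_test table with name/age/sex columns and that JdbcSpout and JdbcOperationBolt are on the classpath.

    import java.sql.Types;
    import java.util.List;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.jdbc.common.Column;
    import org.apache.storm.topology.TopologyBuilder;
    
    import com.google.common.collect.Lists;
    
    public class InsertByColumnsSketch {
        public static void main(String[] args) throws Exception {
            // The columns must line up with the fields emitted by JdbcSpout ("name", "age", "sex")
            // and with the ? placeholders of the insert statement, in the same order.
            List<Column> columnSchema = Lists.newArrayList(
                    new Column("name", Types.VARCHAR),
                    new Column("age", Types.INTEGER),
                    new Column("sex", Types.INTEGER));
            String insertSql = "insert into jdbc_test(name,age,sex) values (?,?,?)";
    
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("jdbc-save", new JdbcSpout(), 1);
            builder.setBolt("save", JdbcOperationBolt.getInsertBolt(columnSchema, insertSql), 1)
                   .shuffleGrouping("jdbc-save");
    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("insert-by-columns", new Config(), builder.createTopology());
            Thread.sleep(30000);   // let the local topology run for a while, then shut it down
            cluster.shutdown();
        }
    }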

    The code that builds the database connection is pasted separately below, because it is buried fairly deep in my utility class.

    /**
     * Builds the ConnectionProvider configuration that storm-jdbc needs.
     * @return a HikariCP-backed ConnectionProvider
     */
    public static ConnectionProvider getConnectionProvider(){
        Map<String,Object> hikariConfigMap = new HashMap<String, Object>(){{
            put("dataSourceClassName", JdbcClassName);
            put("dataSource.url", JdbcdbUrl);
            put("dataSource.user", JdbcUserName);
            put("dataSource.password", JdbcPassWord);}};
        ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
        return connectionProvider;
    }
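
    The JdbcClassName, JdbcdbUrl, JdbcUserName and JdbcPassWord constants above are fields of my CustomConnectionUtils class and are not shown in the post. Purely as an illustration, a minimal version of that class for a MySQL data source behind HikariCP might look like the sketch below; the data source class name, URL and credentials are assumptions you would replace with your own settings.

    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.storm.jdbc.common.ConnectionProvider;
    import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
    
    public class CustomConnectionUtils {
        // Assumed values for illustration only; point these at your own database.
        private static final String JdbcClassName = "com.mysql.cj.jdbc.MysqlDataSource";
        private static final String JdbcdbUrl     = "jdbc:mysql://localhost:3306/test";
        private static final String JdbcUserName  = "root";
        private static final String JdbcPassWord  = "root";
    
        public static ConnectionProvider getConnectionProvider(){
            // HikariCP instantiates the data source class and applies the dataSource.* properties to it.
            Map<String,Object> hikariConfigMap = new HashMap<String, Object>(){{
                put("dataSourceClassName", JdbcClassName);
                put("dataSource.url", JdbcdbUrl);
                put("dataSource.user", JdbcUserName);
                put("dataSource.password", JdbcPassWord);}};
            return new HikariCPConnectionProvider(hikariConfigMap);
        }
    }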

    The JDBC Trident class (JdbcTridentStates):

    import java.util.List;
    import java.util.Objects;
    
    import org.apache.storm.jdbc.common.Column;
    import org.apache.storm.jdbc.common.ConnectionProvider;
    import org.apache.storm.jdbc.mapper.JdbcMapper;
    import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
    import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
    import org.apache.storm.jdbc.trident.state.JdbcState;
    import org.apache.storm.jdbc.trident.state.JdbcStateFactory;
    import org.apache.storm.tuple.Fields;
    
    import com.sunsheen.jfids.bigdata.storm.common.CustomConnectionUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * @author cwc
     * @date 2018-09-28
     * @version 1.0.0 
     * @description: JDBC Trident state factories: write by columns, write by table name, and read
     */
    public class JdbcTridentStates {
    
        private static ConnectionProvider cp=CustomConnectionUtils.getConnectionProvider();
        
        private static Logger logger = LoggerFactory.getLogger(JdbcTridentStates.class);
        
        /**
         * Trident write by columns: pass a column schema and a placeholder insert
         * statement, and the state inserts through that statement.
         * @param columnSchema columns to insert
         * @param sqlString insert SQL with ? placeholders
         * @return the configured JdbcStateFactory, or null if the arguments are empty
         */
        public static JdbcStateFactory getJdbcStateFactory(List<Column> columnSchema,String sqlString){
            if(columnSchema!=null && !columnSchema.isEmpty() && sqlString!=null && !sqlString.isEmpty()){
                JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
                JdbcState.Options options = new JdbcState.Options()
                        .withConnectionProvider(cp)
                        .withMapper(simpleJdbcMapper)
                        .withInsertQuery(sqlString)
                        .withQueryTimeoutSecs(200);
                JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
                return jdbcStateFactory;
            }
            logger.error("The column schema and the insert SQL must not be empty!");
            return null;
        }
        
        /**
         * Trident write by table name: pass just the table name and the whole table
         * is written. Note that the fields emitted in Storm must match the table's
         * columns, because the mapper reads the schema from the table metadata.
         * @param tableName table name
         * @return the configured JdbcStateFactory, or null if the table name is empty
         */
        public static JdbcStateFactory getJdbcStateFactory(String tableName){
            if(tableName!=null && !tableName.isEmpty()){
                JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName,CustomConnectionUtils.getConnectionProvider());
                JdbcState.Options options = new JdbcState.Options()
                        .withConnectionProvider(cp)
                        .withMapper(simpleJdbcMapper)
                        .withTableName(tableName)
                        .withQueryTimeoutSecs(200);
                JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
                return jdbcStateFactory;
            }
            logger.error("The table name must not be empty!");
            return null;
        }
        
        /**
         * Trident read: builds a select state from a select statement and the columns
         * bound to its placeholders.
         * @param outputFields fields to emit with the query results
         * @param queryParamColumns fields bound to the ? placeholders
         * @param sqlString the select statement
         * @return the configured JdbcStateFactory, or null if any argument is empty
         */
        public static JdbcStateFactory getJdbcSelectState(Fields outputFields,List<Column> queryParamColumns,String sqlString){
            if(outputFields!=null && outputFields.size()>0 && queryParamColumns!=null && !queryParamColumns.isEmpty() && sqlString!=null && !sqlString.isEmpty()){
                SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
                JdbcState.Options options = new JdbcState.Options()
                        .withConnectionProvider(cp)
                        .withJdbcLookupMapper(lookupMapper)
                        .withSelectQuery(sqlString)
                        .withQueryTimeoutSecs(30);
                JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
                return jdbcStateFactory;
            }
            logger.error("The output fields, the placeholder columns and the select SQL must not be empty!");
            return null;
        }
        
    }

    The test class:

    import java.util.List;
    import java.util.UUID;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.jdbc.bolt.JdbcLookupBolt;
    import org.apache.storm.jdbc.common.Column;
    import org.apache.storm.jdbc.trident.state.JdbcStateFactory;
    import org.apache.storm.jdbc.trident.state.JdbcUpdater;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentState;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.tuple.Fields;
    import java.sql.Types;
    import com.google.common.collect.Lists;
    import com.sunsheen.jfids.bigdata.storm.bolt.JdbcOperationBolt;
    import com.sunsheen.jfids.bigdata.storm.trident.JdbcTridentStates;
    /**
     * @author cwc
     * @date 2018-09-31
     * @description: test class for reading and writing with the Storm JDBC integration
     * @version 3.0.0 
     */
    public class JdbcMain {
        
        private static String insertSql=" insert into jdbc_test(name,age,sex) values (?,?,?) ";
        private static String selectSql="select age,sex from jdbc_test where name = ?";
        private static String tableName="jdbc_test";
        private static Fields outputFields = new Fields("age", "sex");//the fields returned by the query
        
        private static List<Column> queryParamColumns = Lists.newArrayList(new Column("name", Types.VARCHAR));//the field bound to the ? placeholder
        
        private static List<Column> columnSchema = Lists.newArrayList(
                new Column("name", java.sql.Types.VARCHAR),
                new Column("age", java.sql.Types.INTEGER),
                new Column("sex", java.sql.Types.INTEGER));
        
        
        public static void main(String[] args){
                JdbcWrite(columnSchema,insertSql,tableName);
                JdbcTrident(columnSchema,insertSql,tableName);
                JdbcRead(outputFields,queryParamColumns,selectSql);
                JdbcReadTrident(outputFields,queryParamColumns,selectSql);
        }
        
        /**
         * Writes data to the database through the plain JDBC bolt.
         * @param columnSchema columns to insert (used by the placeholder-SQL mode)
         * @param sqlString insert statement that goes with the column schema
         * @param tableName table name for the whole-table insert mode
         */
        public static void JdbcWrite(List<Column> columnSchema,String sqlString,String tableName){
            Config conf = new Config();
            
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("jdbc-save", new JdbcSpout(), 2);
            builder.setBolt("save", JdbcOperationBolt.getInsertBolt(tableName), 1).shuffleGrouping("jdbc-save");//getInsertBolt根据参数的不同,切换字段或全表插入的模式
            
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            try {
                Thread.sleep(100000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            cluster.shutdown();
        }
        
        /**
         * Writes data to the database through the JDBC Trident state.
         * @param columnSchema columns to insert (used by the placeholder-SQL mode)
         * @param sqlString insert statement that goes with the column schema
         * @param tableName table name for the whole-table insert mode
         */
        public static void  JdbcTrident(List<Column> columnSchema,String sqlString,String tableName){
            TridentTopology topology = new TridentTopology();
            Config config = new Config();
            
    //        JdbcStateFactory jdbcStateFactory=JdbcTridentStates.getJdbcStateFactory(columnSchema, insertSql);//by-column insert
            JdbcStateFactory jdbcStateFactory=JdbcTridentStates.getJdbcStateFactory(tableName);
            
            Stream stream = topology.newStream(UUID.randomUUID().toString(), new JdbcSpout());
            TridentState state = topology.newStaticState(jdbcStateFactory);
            
            //persist the stream into the database: the state factory writes to its configured table, batch by batch
            stream.partitionPersist(jdbcStateFactory, new Fields("name", "age","sex"), new JdbcUpdater(), new Fields());
           
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(UUID.randomUUID().toString(), config, topology.build());
            try {
                Thread.sleep(100000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
        }//the sleep keeps the local cluster from running forever if you forget to stop it; don't make it too short, or the topology will be killed before it finishes and you will get errors
            cluster.shutdown();
        }
        
        /**
         * Reads data through the JDBC lookup bolt.
         * @param outputFields fields to emit with the query results
         * @param queryParamColumns fields bound to the ? placeholders
         * @param selectSql the select statement
         */
        public static void JdbcRead(Fields outputFields,List<Column> queryParamColumns,String selectSql){
            JdbcLookupBolt JdbcLookupBolt = JdbcOperationBolt.getJdbcLookupBolt(outputFields, queryParamColumns, selectSql);
            
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("jdbc-reader", new JdbcSpout(), 2);
            builder.setBolt("read",  JdbcLookupBolt, 1).shuffleGrouping("jdbc-reader");
            builder.setBolt("JdbcOutBolt",new JdbcOutBolt(), 1).shuffleGrouping("read");
            Config conf = new Config();
            
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(UUID.randomUUID().toString(), conf, builder.createTopology());
            try {
                Thread.sleep(100000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
        }//the sleep keeps the local cluster from running forever if you forget to stop it; don't make it too short, or the topology will be killed before it finishes and you will get errors
        cluster.shutdown();
        }
        
        /**
         * Reads data through the JDBC Trident state.
         * @param outputFields fields to emit with the query results; these are Storm field names, not database column names
         * @param queryParamColumns fields bound to the ? placeholders, i.e. the fields coming in from the spout
         * @param selectSql the select statement; the column names are fixed in the SQL, the fields above only carry the values through the topology
         */
        public static void JdbcReadTrident(Fields outputFields,List<Column> queryParamColumns,String selectSql){
            TridentTopology topology = new TridentTopology();
            JdbcStateFactory jdbcStateFactory = JdbcTridentStates.getJdbcSelectState(outputFields, queryParamColumns, selectSql);
            
            Stream stream = topology.newStream(UUID.randomUUID().toString(), new JdbcSpout());
            TridentState state = topology.newStaticState(jdbcStateFactory);
    //         stream.partitionPersist(jdbcStateFactory, outputFields, new JdbcUpdater(),outputFields);//adapt this to your own needs; a stateQuery-based read is sketched after this class
            
            Config conf = new Config();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(UUID.randomUUID().toString(), conf, topology.build());
            try {
                Thread.sleep(100000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
        }//the sleep keeps the local cluster from running forever if you forget to stop it; don't make it too short, or the topology will be killed before it finishes and you will get errors
        cluster.shutdown();
        }
        
        
    }
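
    As posted, JdbcReadTrident builds the select state but never actually queries it (the partitionPersist line is commented out). Below is a minimal sketch, not part of the original code, of how the read could be completed with a stateQuery; it assumes JdbcSpout is in its lookup configuration (emitting only the name field) and prints the joined tuples with Trident's built-in Debug filter.

    import java.sql.Types;
    import java.util.UUID;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.jdbc.common.Column;
    import org.apache.storm.jdbc.trident.state.JdbcQuery;
    import org.apache.storm.jdbc.trident.state.JdbcStateFactory;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentState;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.operation.builtin.Debug;
    import org.apache.storm.tuple.Fields;
    
    import com.google.common.collect.Lists;
    
    public class JdbcReadTridentSketch {
        public static void main(String[] args) throws Exception {
            TridentTopology topology = new TridentTopology();
            // Same select state as JdbcReadTrident: select age,sex from jdbc_test where name = ?
            JdbcStateFactory selectState = JdbcTridentStates.getJdbcSelectState(
                    new Fields("age", "sex"),
                    Lists.newArrayList(new Column("name", Types.VARCHAR)),
                    "select age,sex from jdbc_test where name = ?");
    
            // JdbcSpout must be in its lookup configuration, emitting only the "name" field.
            Stream stream = topology.newStream(UUID.randomUUID().toString(), new JdbcSpout());
            TridentState state = topology.newStaticState(selectState);
    
            // For each incoming "name", run the placeholder query, append "age"/"sex" to the tuple,
            // and print the joined tuple with the built-in Debug filter.
            stream.stateQuery(state, new Fields("name"), new JdbcQuery(), new Fields("age", "sex"))
                  .each(new Fields("name", "age", "sex"), new Debug());
    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(UUID.randomUUID().toString(), new Config(), topology.build());
            Thread.sleep(30000);   // let the local topology run, then shut it down
            cluster.shutdown();
        }
    }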

     Addendum: a bolt that prints the data read back from the database.

    import java.util.Map;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    
    /**
     * @author cwc
     * @date 2018-05-30
     * @description: prints the tuples handed to this bolt
     * @version 1.0.0 
     */
    public class JdbcOutBolt extends BaseRichBolt{
    
        private OutputCollector collector;
        @Override
        public void execute(Tuple tuple) {
            
                    Object str =tuple.getValue(0);
                    Object str2 =tuple.getInteger(1);
                    System.out.println(str+"-->"+str2);
                    
        }
    
        @Override
        public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
            // TODO Auto-generated method stub
            this.collector=collector;
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("JdbcOutBolt"));
        }
        
        
    }

    The code itself should make the main points clear; if you have any questions, leave a comment under my blog.

Original article: https://www.cnblogs.com/wanchen-chen/p/9720301.html