zoukankan      html  css  js  c++  java
  • storm-jdbc详解

                                        今天来说说Storm集成Jdbc是如何完成的,代码如下:

    写入数据:

    先来讲讲官方API:

    Map hikariConfigMap = Maps.newHashMap();
    hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
    hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
    hikariConfigMap.put("dataSource.user","root");
    hikariConfigMap.put("dataSource.password","password");
    ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
    
    String tableName = "user_details";//表名
    JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
    
    JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                                        .withTableName("user")
                                        .withQueryTimeoutSecs(30);
                                        Or//这里使用表名就不要用下面的SQL了,2选1
    JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                                        .withInsertQuery("insert into user values (?,?)")
                                        .withQueryTimeoutSecs(30);                    

    如果storm元组的字段与你打算写入的数据库表中的列名称具有相同的名称。要使用 SimpleJdbcMapper。

    Map hikariConfigMap = Maps.newHashMap();
    hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
    hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
    hikariConfigMap.put("dataSource.user","root");
    hikariConfigMap.put("dataSource.password","password");
    ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
    String tableName = "user_details";
    JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
    //以上是整个表直接加载进去,但是如果你的ID是自增的或者有其他值是默认的,则需要使用inster to sql语句了。

    例如,如果你的插入查询是Insert into user (user_id, user_name) values (?,?)

    List<Column> columnSchema = Lists.newArrayList(
        new Column("user_id", java.sql.Types.INTEGER),
        new Column("user_name", java.sql.Types.VARCHAR));
    JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);

    读取数据就很简单了,我就不去讲解官方API,直接贴我写的代码了:

    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    import java.util.Map;
    /**
     * @author cwc
     * @date 2018年5月31日  
     * @description:存储数据的spout,我的读与写共用的这一个spout
     * @version 1.0.0 
     */
    public class JdbcSpout extends BaseRichSpout {
    	private static final long serialVersionUID = 1L;
    	private SpoutOutputCollector collector;
    
    	@Override
    	public void nextTuple() {
    		try {
    			Thread.sleep(100);
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    		//这里因为时间原因,我就简写2个值,你可以自己造点别的类型的假数据跑跑
            this.collector.emit(new Values("peter",111));
            System.out.println("信息加载中---------------------");
    	}
    
    	@Override
    	public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
    		this.collector =collector;
    	}
    
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("name", "age"));
    	}
    
    }
    import java.util.Map;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    
    /**
     * @author cwc
     * @date 2018年5月30日  
     * @description:打印拿到的数据
     * @version 1.0.0 
     */
    public class JdbcOutBolt extends BaseRichBolt{
    
    	private OutputCollector collector;
    	@Override
    	public void execute(Tuple tuple) {
    				String str =tuple.getString(0);
    				System.out.println(str);
    				
    	}
    
    	@Override
    	public void prepare(Map arg0, TopologyContext arg1, OutputCollector collector) {
    		// TODO Auto-generated method stub
    		this.collector=collector;
    	}
    
    	@Override
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    		declarer.declare(new Fields("JdbcOutBolt"));
    	}
    	
    	
    }
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
    import org.apache.storm.jdbc.bolt.JdbcLookupBolt;
    import org.apache.storm.jdbc.common.Column;
    import org.apache.storm.jdbc.common.ConnectionProvider;
    import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
    import org.apache.storm.jdbc.mapper.JdbcMapper;
    import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
    import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
    import java.sql.Types;
    import com.google.common.collect.Lists;
    /**
     * @author cwc
     * @date 2018年5月31日  
     * @description:
     * @version 1.0.0 
     */
    public class JdbcMain {
    	public static void main(String[] args){
    		
    			Map<String,Object> hikariConfigMap = new HashMap<String, Object>(){{
    			put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
    			put("dataSource.url", "jdbc:mysql://localhost:3306/mytest?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai");
    			put("dataSource.user", "root");
    			put("dataSource.password", "0992");}};
    			ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
    			
    			JdbcWrite( connectionProvider);
    //			JdbcRead( connectionProvider);
    	}
    	/**
    	 * 写
    	 * @param connectionProvider
    	 */
    	public static void JdbcWrite(ConnectionProvider connectionProvider){
    		List<Column> columnSchema = Lists.newArrayList(
    				new Column("name", java.sql.Types.VARCHAR),
    			    new Column("age", java.sql.Types.INTEGER));
    			JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
    			JdbcInsertBolt PersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                        .withInsertQuery(" insert into storms(name,age) values (?,?) ")
                        .withQueryTimeoutSecs(30);   
    		TopologyBuilder builder = new TopologyBuilder();
    		builder.setSpout("jdbc-save", new JdbcSpout(), 2);
    		builder.setBolt("save",  PersistanceBolt, 1).shuffleGrouping("jdbc-save");
    		Config conf = new Config();
    		LocalCluster cluster = new LocalCluster();
    		cluster.submitTopology("test", conf, builder.createTopology());
    	}
    	/**
    	 * 读
    	 * @param connectionProvider
    	 */
    	public static void JdbcRead(ConnectionProvider connectionProvider){
    		Fields outputFields = new Fields("name", "age");
    		List<Column> queryParamColumns = Lists.newArrayList(new Column("age", Types.INTEGER));
    		String selectSql = "select name,age from storms where age = ?";
    		SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
    		JdbcLookupBolt JdbcLookupBolt = new JdbcLookupBolt(connectionProvider, selectSql, lookupMapper)
    		        .withQueryTimeoutSecs(30);
    		
    		TopologyBuilder builder = new TopologyBuilder();
    		builder.setSpout("jdbc-reader", new JdbcSpout(), 2);
    		builder.setBolt("read",  JdbcLookupBolt, 1).shuffleGrouping("jdbc-reader");
    		builder.setBolt("JdbcOutBolt",new JdbcOutBolt(), 1).shuffleGrouping("");
    		Config conf = new Config();
    		LocalCluster cluster = new LocalCluster();
    		cluster.submitTopology("test", conf, builder.createTopology());
    	}
    }

    我spout共用的一个spout哈,大家注意下。

    我这节写的比较简单,大家想看看深度,可以去看看这篇博客,博主写的比较好 ~~

    https://blog.csdn.net/yidan7063/article/details/79147692

    有事没事看了来个评论,共勉共勉。。







    深夜码文不易,若对看官有帮助,望看官可以在右侧打赏。
  • 相关阅读:
    Spring Boot整合RabbitMQ
    程序员都在用的 IDEA 插件(不断更新)
    Skywalking03:Skywalking本地调试
    Skywalking01:Skywalking介绍
    Skywalking05:在Skywalking RocketBot上添加监控图表
    Skywalking04:扩展Metric监控信息
    文档,不仅仅是文档。
    pyecharts的绘图原理详解
    JS的新趋势State of JS 2020(来自网络)
    花半秒钟看透事物本质
  • 原文地址:https://www.cnblogs.com/wanchen-chen/p/12934129.html
Copyright © 2011-2022 走看看