  • Storm integration with MySQL and with HDFS

Storm integration with MySQL

The code consists of the following classes:

    DataSpout

    
    
package com.gec.demo.stormToMysql;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

public class DataSpout extends BaseRichSpout {

    private static String[] words = new String[]{
            "hadoop", "yarn", "mapreduce"
    };
    private static int _id = 0;
    private SpoutOutputCollector collector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // Emit an auto-incrementing id together with a random word.
        this.collector.emit(new Values(_id++, words[new Random().nextInt(words.length)]));
        // Throttle the spout so it does not flood the downstream bolt.
        Utils.sleep(500);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Field names must match what the JDBC mapper expects.
        outputFieldsDeclarer.declare(new Fields("_id", "word"));
    }
}
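The JdbcInsertBolt in the next class writes these (_id, word) tuples into a MySQL table named t_word, which must already exist. A minimal setup sketch using plain JDBC; the column types are an assumption, since the original post does not show the DDL:

package com.gec.demo.stormToMysql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateWordTable {
    public static void main(String[] args) throws Exception {
        // Connection details taken from JdbcInsertBoltMain below.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://hadoop-001:3306/storm_db", "root", "123456");
             Statement stmt = conn.createStatement()) {
            // Two columns matching the spout's output fields; the INT/VARCHAR
            // types are assumed, not taken from the original post.
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS t_word ("
                    + "_id INT PRIMARY KEY, "
                    + "word VARCHAR(64))");
        }
    }
}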

    
    
     
    JdbcInsertBoltMain

    
    
package com.gec.demo.stormToMysql;

import com.google.common.collect.Maps;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.apache.storm.topology.TopologyBuilder;

import java.util.Map;

public class JdbcInsertBoltMain {
    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        // HikariCP connection-pool settings for the target MySQL instance.
        Map<String, Object> hikariConfigMap = Maps.newHashMap();
        hikariConfigMap.put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
        hikariConfigMap.put("dataSource.url", "jdbc:mysql://hadoop-001:3306/storm_db");
        hikariConfigMap.put("dataSource.user", "root");
        hikariConfigMap.put("dataSource.password", "123456");
        ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);

        // SimpleJdbcMapper reads the column metadata of t_word and maps
        // same-named tuple fields onto the table's columns.
        String tableName = "t_word";
        JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
        JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                .withInsertQuery("insert into t_word values (?,?)")
                .withQueryTimeoutSecs(30);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("dataspout", new DataSpout());
        // Attach the storm-jdbc insert bolt downstream of the spout.
        builder.setBolt("userPersistanceBolt", userPersistanceBolt).localOrShuffleGrouping("dataspout");

        Config config = new Config();
        if (args != null && args.length > 0) {
            // Submit to a real cluster when a topology name is given.
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            // Otherwise run in an in-process local cluster for testing.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("JdbcInsertBoltMain", config, builder.createTopology());
        }
    }
}
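SimpleJdbcMapper, as used above, queries the metadata of t_word at startup and maps same-named tuple fields onto its columns. It also accepts an explicit column list, which skips the metadata lookup. A sketch of that variant; the column names and SQL types here are assumptions matching the spout's fields and the t_word sketch above:

package com.gec.demo.stormToMysql;

import com.google.common.collect.Lists;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;

import java.sql.Types;
import java.util.List;

public class ExplicitMapperSketch {
    public static JdbcMapper buildMapper() {
        // Column names must match the spout's declared fields ("_id", "word").
        List<Column> schema = Lists.newArrayList(
                new Column("_id", Types.INTEGER),
                new Column("word", Types.VARCHAR));
        // With an explicit schema, no database round-trip is needed at startup.
        return new SimpleJdbcMapper(schema);
    }
}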

Result (screenshot in the original post): each emitted (_id, word) tuple appears as a row in the t_word table.

Storm integration with HDFS

The code consists of the following classes:

    DataSpout

package com.gec.demo.stormtohdfs;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

public class DataSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private static String[] datas = new String[]{
            "hello", "world", "java", "hadoop"
    };

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Called by Storm in a loop.
     */
    @Override
    public void nextTuple() {
        // Pick a random word...
        String data = datas[new Random().nextInt(datas.length)];
        // ...and send it to the downstream component.
        collector.emit(new Values(data));
        // Throttle the spout so it does not flood the HDFS bolt.
        Utils.sleep(500);
    }

    /**
     * Declares the fields of the emitted tuples.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
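The spout above emits unanchored tuples, so Storm cannot replay data that fails downstream. A hypothetical variant with at-least-once delivery would attach a message id to each tuple and override ack/fail; this sketch is not part of the original post:

package com.gec.demo.stormtohdfs;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class ReliableDataSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    // Tuples emitted but not yet acknowledged, keyed by message id.
    private final Map<UUID, String> pending = new ConcurrentHashMap<>();
    private static final String[] DATAS = {"hello", "world", "java", "hadoop"};

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        String data = DATAS[new Random().nextInt(DATAS.length)];
        UUID msgId = UUID.randomUUID();
        pending.put(msgId, data);
        // Passing a message id makes the tuple trackable by Storm's acker.
        collector.emit(new Values(data), msgId);
    }

    @Override
    public void ack(Object msgId) {
        pending.remove(msgId); // tuple fully processed downstream
    }

    @Override
    public void fail(Object msgId) {
        String data = pending.get(msgId);
        if (data != null) {
            collector.emit(new Values(data), msgId); // replay on failure
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}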

    
    
    StromHdfsMain

package com.gec.demo.stormtohdfs;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.topology.TopologyBuilder;

public class StromHdfsMain {
    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        // Use "|" instead of "," as the field delimiter.
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("|");

        /*
         * File control policies, one of each kind:
         * - sync policy: how many tuples before syncing to the filesystem;
         * - rotation policy: how large a file may grow before rotating.
         */
        // Sync the filesystem after every 1k tuples.
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);
        // Rotate files when they reach 5 MB.
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/stormToHdfs/");
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://hadoop-001:9000")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("dataSpout", new DataSpout());
        topologyBuilder.setBolt("hdfsBolt", hdfsBolt).localOrShuffleGrouping("dataSpout");

        Config config = new Config();
        if (args != null && args.length > 0) {
            StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("stormToHdfs", config, topologyBuilder.createTopology());
        }
    }
}
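By default the HdfsBolt keeps writing and rotating files inside /stormToHdfs/. storm-hdfs also supports rotation actions that run each time a file rotates; a sketch that moves every completed file into a separate directory (the /stormToHdfs/done/ destination path is an assumption):

package com.gec.demo.stormtohdfs;

import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.common.rotation.MoveFileAction;

public class HdfsBoltWithMoveSketch {
    public static HdfsBolt build() {
        // Same bolt configuration as StromHdfsMain, plus a rotation action.
        return new HdfsBolt()
                .withFsUrl("hdfs://hadoop-001:9000")
                .withFileNameFormat(new DefaultFileNameFormat().withPath("/stormToHdfs/"))
                .withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter("|"))
                .withRotationPolicy(new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB))
                .withSyncPolicy(new CountSyncPolicy(1000))
                // After each rotation, move the finished file to the
                // (assumed) /stormToHdfs/done/ directory.
                .addRotationAction(new MoveFileAction().toDestination("/stormToHdfs/done/"));
    }
}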

    
    

Result (screenshot in the original post): files of pipe-delimited words appear under /stormToHdfs/ in HDFS.
  • Original article: https://www.cnblogs.com/Transkai/p/10909090.html