• Storm integration with MySQL and with HDFS

Storm integration with MySQL

The code consists of two classes, DataSpout and JdbcInsertBoltMain:

    DataSpout

    
    
package com.gec.demo.stormToMysql;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;
import java.util.Random;

public class DataSpout extends BaseRichSpout {

    // sample words to emit
    private static String[] words = new String[]{"hadoop", "yarn", "mapreduce"};
    // auto-incrementing id emitted alongside each word
    private static int _id = 0;
    private SpoutOutputCollector collector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // emit an (_id, word) pair with a randomly chosen word
        this.collector.emit(new Values(_id++, words[new Random().nextInt(words.length)]));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // the declared fields must line up with the columns of the insert statement
        outputFieldsDeclarer.declare(new Fields("_id", "word"));
    }
}
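SimpleJdbcMapper (used in the main class below) reads the column metadata of t_word from MySQL, so the table must exist before the topology starts, with columns matching the declared fields. Here is a minimal sketch of a matching schema created over plain JDBC; the CreateTable class, the column types, and the IF NOT EXISTS clause are assumptions of mine, while the host, database, and credentials are taken from JdbcInsertBoltMain below:

package com.gec.demo.stormToMysql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateTable {
    public static void main(String[] args) throws Exception {
        // hypothetical DDL: two columns matching the spout's ("_id", "word") fields
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://hadoop-001:3306/storm_db", "root", "123456");
             Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS t_word ("
                    + "_id INT PRIMARY KEY, "
                    + "word VARCHAR(64))");
        }
    }
}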

    
    
     
    JdbcInsertBoltMain

    
    
package com.gec.demo.stormToMysql;

import com.google.common.collect.Maps;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.apache.storm.topology.TopologyBuilder;

import java.util.Map;

public class JdbcInsertBoltMain {
    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        // HikariCP connection-pool settings for the target MySQL instance
        Map<String, Object> hikariConfigMap = Maps.newHashMap();
        hikariConfigMap.put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
        hikariConfigMap.put("dataSource.url", "jdbc:mysql://hadoop-001:3306/storm_db");
        hikariConfigMap.put("dataSource.user", "root");
        hikariConfigMap.put("dataSource.password", "123456");
        ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);

        // maps the tuple fields to the columns of t_word
        String tableName = "t_word";
        JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
        JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                .withInsertQuery("insert into t_word values (?,?)")
                .withQueryTimeoutSecs(30);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("dataspout", new DataSpout());
        // add the bolt that integrates Storm with JDBC
        builder.setBolt("userPersistanceBolt", userPersistanceBolt).localOrShuffleGrouping("dataspout");

        Config config = new Config();
        if (null != args && args.length > 0) {
            // a topology name was given on the command line: submit to a real cluster
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            // otherwise run in an in-process local cluster
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("JdbcInsertBoltMain", config, builder.createTopology());
        }
    }
}
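Once the topology is running, every tuple from DataSpout becomes a row in t_word. A quick way to confirm that inserts are landing, sketched over plain JDBC (the CheckInserts class and the count query are my own additions, not part of the original):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckInserts {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://hadoop-001:3306/storm_db", "root", "123456");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM t_word")) {
            // the count should keep growing while the topology runs
            if (rs.next()) {
                System.out.println("rows in t_word: " + rs.getInt(1));
            }
        }
    }
}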

The result: rows are continuously inserted into the t_word table (the original screenshot is not preserved).

Storm integration with HDFS

The code consists of two classes, DataSpout and StromHdfsMain:

    DataSpout

package com.gec.demo.stormtohdfs;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;
import java.util.Random;

public class DataSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    // sample words to emit
    private static String[] datas = new String[]{"hello", "world", "java", "hadoop"};

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /*
     * Called by Storm in a loop.
     */
    @Override
    public void nextTuple() {
        // pick a random word
        String data = datas[new Random().nextInt(datas.length)];
        // send it to the downstream component
        collector.emit(new Values(data));
    }

    /**
     * Declares the fields of the emitted tuples.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
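As the comment above says, Storm calls nextTuple() in a tight loop, so this spout emits as fast as the framework can drive it. For local experiments it can be useful to throttle emission. A minimal variant of nextTuple() using Storm's Utils.sleep, which slots into the DataSpout above (the 100 ms pause is an arbitrary assumption):

import org.apache.storm.utils.Utils;

@Override
public void nextTuple() {
    // pick a random word and send it downstream
    String data = datas[new Random().nextInt(datas.length)];
    collector.emit(new Values(data));
    // throttle: pause 100 ms between emissions (arbitrary choice)
    Utils.sleep(100);
}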

    
    
    StromHdfsMain

package com.gec.demo.stormtohdfs;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.topology.TopologyBuilder;

public class StromHdfsMain {
    public static void main(String[] args) throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        // use "|" instead of "," as the field delimiter
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("|");

        /**
         * Two kinds of file-control policy are used here:
         * a sync policy based on tuple count, and a rotation policy based on file size.
         */
        // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);
        // rotate files when they reach 5 MB
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

        // write output files under /stormToHdfs/ on HDFS
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/stormToHdfs/");
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://hadoop-001:9000")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);

        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("dataSpout", new DataSpout());
        topologyBuilder.setBolt("hdfsBolt", hdfsBolt).localOrShuffleGrouping("dataSpout");

        Config config = new Config();
        if (args != null && args.length > 0) {
            // a topology name was given on the command line: submit to a real cluster
            StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
        } else {
            // otherwise run in an in-process local cluster
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("stormToHdfs", config, topologyBuilder.createTopology());
        }
    }
}
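FileSizeRotationPolicy is only one of the rotation policies storm-hdfs ships; rotation can also be driven by time. A sketch of swapping in TimedRotationPolicy, with the rest of the HdfsBolt setup unchanged (the 10-minute interval is an assumption):

import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;

// rotate to a new HDFS file every 10 minutes instead of every 5 MB
FileRotationPolicy rotationPolicy =
        new TimedRotationPolicy(10.0f, TimedRotationPolicy.TimeUnit.MINUTES);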

    
    

The result: delimited records are written to files under /stormToHdfs/ on HDFS (the original screenshot is not preserved).