zoukankan      html  css  js  c++  java
  • (二) storm的基本使用

    SimpleTopology.java

    View Code
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    
    /**
     * Hello world!
     *
     */
    public class SimpleTopology
    {
        public static void main( String[] args ) throws Exception
        {
            TopologyBuilder topologyBuilder=new TopologyBuilder();
            topologyBuilder.setSpout("simple-spout", new SimpleSpout(),1);
            topologyBuilder.setBolt("simple-bilt",new SimpleBolt1(), 3).shuffleGrouping("simple-spout");
            topologyBuilder.setBolt("wordcounter", new SimpleBolt2(), 3).fieldsGrouping("simple-bilt", new Fields("info"));
            topologyBuilder.setBolt("word-to-upper", new SimpleBolt4(),5).shuffleGrouping("simple-spout");
            topologyBuilder.setBolt("store", new SimpleBolt3(),10).shuffleGrouping("word-to-upper");
            Config config=new Config();
            config.setDebug(true);
            if(null!=args&&args.length>0){
                //使用集群模式运行
                config.setNumWorkers(1);
                StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
            }
            else{
                //使用本地模式运行
                config.setMaxTaskParallelism(1);
                LocalCluster cluster=new LocalCluster();
                cluster.submitTopology("simple", config, topologyBuilder.createTopology());
            }
        }
    }

    SimpleSpout.java

    View Code
    import java.util.List;
    import java.util.Map;
    
    import redis.clients.jedis.Jedis;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    
    public class SimpleSpout extends BaseRichSpout{
    
        /**
         *
         */
        private static final long serialVersionUID = -6335251364034714629L;
        private SpoutOutputCollector collector;
        private Jedis jedis;
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("source"));
        }
    
        @SuppressWarnings("rawtypes")
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
                this.collector=collector;
                jedis=new Jedis("192.168.180.101", 6379);
        }
    
        public void nextTuple() {
              List<String> messages=jedis.brpop(3600,"msg_queue");
              if(!messages.isEmpty()){
                  for (String msg : messages) {
                      collector.emit(new Values(msg));
                }
              }
        }
    
    }

    SimpleBolt1.java

    View Code
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    public class SimpleBolt1 extends BaseBasicBolt {
    
        /**
         *
         */
        private static final long serialVersionUID = -5266922733759958473L;
    
        public void execute(Tuple input, BasicOutputCollector collector) {
            String message=input.getString(0);
            if(null!=message.trim()){
                collector.emit(new Values(message));
            }
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("info"));
        }
    
    }

    SimpleBolt2.java

    View Code
    import java.util.HashMap;
    import java.util.Map;
    
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    public class SimpleBolt2 extends BaseBasicBolt {
    
        /**
         *
         */
        private static final long serialVersionUID = 2246728833921545676L;
        Integer id;
        String name;
        Map<String, Integer> counters;
    
        @SuppressWarnings("rawtypes")
        public void prepare(Map stormConf, TopologyContext context) {
            this.counters=new HashMap<String, Integer>();
            this.name=context.getThisComponentId();
            this.id=context.getThisTaskId();
            System.out.println(String.format("componentId:%s",this.name));
        }
    
        public void execute(Tuple input, BasicOutputCollector collector) {
            String word=input.getString(0);
            if(counters.containsKey(word)){
                Integer c=counters.get(word);
                counters.put(word, c+1);
            }
            else{
                counters.put(word, 1);
            }
            collector.emit(new Values(word,counters.get(word)));
            System.out.println(String.format("stats result is:%s:%s", word,counters.get(word)));
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word","count"));
        }
    
    }

    SimpleBolt3.java

    View Code
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    
    public class SimpleBolt3 extends BaseBasicBolt{
    
        /**
         *
         */
        private static final long serialVersionUID = 9140971206523366543L;
    
        public void execute(Tuple input, BasicOutputCollector collector) {
            String word=input.getString(0);
            StoreDatabase.insertRow(word);
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word"));
        }
    
    }

    StoreDatabase.java

    View Code
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.sql.Statement;
    
    public class StoreDatabase {
        public static Connection connection;
        public static Statement stmt;
        static {
            String dbDriver = "com.mysql.jdbc.Driver";
            String dbUrl = "jdbc:mysql://192.168.187.16/blog";
            String user = "zhxia";
            String password = "admin";
            try {
                Class.forName(dbDriver);
    
            } catch (ClassNotFoundException ex) {
                ex.printStackTrace();
            }
            try {
                connection = DriverManager.getConnection(dbUrl, user, password);
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    
        public static int insertRow(String word){
            int effectRows=0;
            String sql=String.format("insert into words(word)values('%s')", word);
            try{
                stmt=connection.createStatement();
                effectRows=stmt.executeUpdate(sql);
                stmt.close();
            }
            catch (SQLException e) {
                e.printStackTrace();
                System.err.println("数据插入失败");
            }
            return effectRows;
        }
    }

    SimpleBolt4.java

    View Code
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    public class SimpleBolt4  extends BaseBasicBolt{
    
        /**
         *
         */
        private static final long serialVersionUID = -8025390241512976224L;
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    
        public void execute(Tuple input, BasicOutputCollector collector) {
            String word=input.getString(0);
            if(null!=word&&word.trim()!=""){
                String upper=word.trim().toUpperCase();
                System.out.println(String.format("upper word is:%s", upper));
                collector.emit(new Values(upper));
            }
        }
    
    }

    pom.xml

    View Code
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.haozu.app</groupId>
        <artifactId>app-storm</artifactId>
        <packaging>jar</packaging>
        <version>1.0</version>
        <name>app-storm</name>
        <url>http://maven.apache.org</url>
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>3.8.1</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>storm</groupId>
                <artifactId>storm</artifactId>
                <scope>provided</scope>
                <version>0.8.1</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.21</version>
            </dependency>
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>2.0.0</version>
                <type>jar</type>
                <scope>compile</scope>
            </dependency>
        </dependencies>
        <repositories>
            <repository>
                <id>haozu</id>
                <name>haozu repositories</name>
                <url>http://nexus.dev.haozu.com:10010/nexus/content/groups/public</url>
                <layout>default</layout>
            </repository>
        </repositories>
        <build>
            <plugins>
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <archive>
                            <manifest>
                                <mainClass>com.haozu.app.SimpleTopology</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>

    打包:

    mvn clean assembly:assembly -Dmaven.test.skip=true

    提交作业并运行:

    本地模式运行:storm jar target/app-storm-1.0-jar-with-dependencies.jar  com.haozu.app.SimpleTopology 

    集群模式运行:storm jar target/app-storm-1.0-jar-with-dependencies.jar  com.haozu.app.SimpleTopology "test"

  • 相关阅读:
    论频谱中负频率成分的物理意义(转载)
    VS2008的glaux库
    通过域名显示IP列表
    Shader errorX3205的解决
    Curl, Divergence, Circulation
    关于FIONREAD命令的作用
    Cairngorm的结构及开发使用(2)(转)
    结合Flex Builder和Flash CS4制作一个中国地图的应用(转)
    大型高并发高负载网站的系统架构(转)
    Cairngorm的结构及开发使用(4)(转)
  • 原文地址:https://www.cnblogs.com/xiazh/p/2827732.html
Copyright © 2011-2022 走看看