No long preamble, straight to the good stuff!
This is the companion code for the book 《从零开始学Storm》 (Learning Storm from Scratch) by Zhao Bixia, published in 2014.
The storm-starter project contains a wide variety of examples of using Storm. The project is hosted on GitHub at: http://github.com/nathanmarz/storm-starter
The package structure of the storm-starter project:
The topology structure of the storm-starter project:
Importing via a new Maven project
The steps to import the storm-starter project as a new Maven project are as follows:
1. Create a new Maven project; the project name can be anything, e.g. storm-starter.
2. Copy all files under the src/jvm directory of the storm-starter project root into the Maven project's src/main/java directory.
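For the copied sources to compile, the new Maven project also needs the Storm jars on its classpath. As a minimal sketch: the pre-Apache storm artifact of this era was published to the Clojars repository under the "storm" group; the version shown here (0.8.2) is an assumption, so match it to the one the book uses:

<repositories>
  <repository>
    <id>clojars.org</id>
    <url>https://clojars.org/repo</url>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.8.2</version>
    <!-- "provided" keeps the storm jars out of your topology jar (a real cluster
         already has them) while Maven still puts them on the compile/test
         classpath, so LocalCluster runs from the IDE keep working -->
    <scope>provided</scope>
  </dependency>
</dependencies>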
storm-starter-master/src/jvm/storm/starter/BasicDRPCTopology.java:
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * This topology is a basic example of doing distributed RPC on top of Storm. It implements a function that appends a
 * "!" to any string you send the DRPC function.
 * <p/>
 * See https://github.com/nathanmarz/storm/wiki/Distributed-RPC for more information on doing distributed RPC on top of
 * Storm.
 */
public class BasicDRPCTopology {
  public static class ExclaimBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String input = tuple.getString(1);
      collector.emit(new Values(tuple.getValue(0), input + "!"));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("id", "result"));
    }
  }

  public static void main(String[] args) throws Exception {
    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
    builder.addBolt(new ExclaimBolt(), 3);

    Config conf = new Config();

    if (args == null || args.length == 0) {
      LocalDRPC drpc = new LocalDRPC();
      LocalCluster cluster = new LocalCluster();

      cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));

      for (String word : new String[]{ "hello", "goodbye" }) {
        System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));
      }

      cluster.shutdown();
      drpc.shutdown();
    }
    else {
      conf.setNumWorkers(3);
      StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
    }
  }
}
storm-starter-master/src/jvm/storm/starter/ExclamationTopology.java:
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;

/**
 * This is a basic example of a Storm topology.
 */
public class ExclamationTopology {

  public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
      _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }
  }

  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("word", new TestWordSpout(), 10);
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);
      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    else {
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, builder.createTopology());
      Utils.sleep(10000);
      cluster.killTopology("test");
      cluster.shutdown();
    }
  }
}
storm-starter-master/src/jvm/storm/starter/ManualDRPC.java:
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.drpc.DRPCSpout;
import backtype.storm.drpc.ReturnResults;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class ManualDRPC {
  public static class ExclamationBolt extends BaseBasicBolt {

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("result", "return-info"));
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String arg = tuple.getString(0);
      Object retInfo = tuple.getValue(1);
      collector.emit(new Values(arg + "!!!", retInfo));
    }
  }

  public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    LocalDRPC drpc = new LocalDRPC();

    DRPCSpout spout = new DRPCSpout("exclamation", drpc);
    builder.setSpout("drpc", spout);
    builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
    builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");

    LocalCluster cluster = new LocalCluster();
    Config conf = new Config();
    cluster.submitTopology("exclaim", conf, builder.createTopology());

    System.out.println(drpc.execute("exclamation", "aaa"));
    System.out.println(drpc.execute("exclamation", "bbb"));
  }
}
storm-starter-master/src/jvm/storm/starter/PrintSampleStream.java:
/*
// to use this example, uncomment the twitter4j dependency information in the project.clj,
// uncomment storm.starter.spout.TwitterSampleSpout, and uncomment this class

package storm.starter;

import storm.starter.spout.TwitterSampleSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import storm.starter.bolt.PrinterBolt;

public class PrintSampleStream {
  public static void main(String[] args) {
    String username = args[0];
    String pwd = args[1];

    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("spout", new TwitterSampleSpout(username, pwd));
    builder.setBolt("print", new PrinterBolt())
        .shuffleGrouping("spout");

    Config conf = new Config();

    LocalCluster cluster = new LocalCluster();

    cluster.submitTopology("test", conf, builder.createTopology());

    Utils.sleep(10000);
    cluster.shutdown();
  }
}
*/
storm-starter-master/src/jvm/storm/starter/ReachTopology.java:
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseBatchBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.*;

/**
 * This is a good example of doing complex Distributed RPC on top of Storm. This program creates a topology that can
 * compute the reach for any URL on Twitter in realtime by parallelizing the whole computation.
 * <p/>
 * Reach is the number of unique people exposed to a URL on Twitter. To compute reach, you have to get all the people
 * who tweeted the URL, get all the followers of all those people, unique that set of followers, and then count the
 * unique set. It's an intense computation that can involve thousands of database calls and tens of millions of
 * follower records.
 * <p/>
 * This Storm topology does every piece of that computation in parallel, turning what would be a computation that takes
 * minutes on a single machine into one that takes just a couple seconds.
 * <p/>
 * For the purposes of demonstration, this topology replaces the use of actual DBs with in-memory hashmaps.
 * <p/>
 * See https://github.com/nathanmarz/storm/wiki/Distributed-RPC for more information on Distributed RPC.
 */
public class ReachTopology {
  public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
    put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
    put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
    put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
  }};

  public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
    put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
    put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
    put("tim", Arrays.asList("alex"));
    put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
    put("adam", Arrays.asList("david", "carissa"));
    put("mike", Arrays.asList("john", "bob"));
    put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
  }};

  public static class GetTweeters extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      Object id = tuple.getValue(0);
      String url = tuple.getString(1);
      List<String> tweeters = TWEETERS_DB.get(url);
      if (tweeters != null) {
        for (String tweeter : tweeters) {
          collector.emit(new Values(id, tweeter));
        }
      }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("id", "tweeter"));
    }
  }

  public static class GetFollowers extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      Object id = tuple.getValue(0);
      String tweeter = tuple.getString(1);
      List<String> followers = FOLLOWERS_DB.get(tweeter);
      if (followers != null) {
        for (String follower : followers) {
          collector.emit(new Values(id, follower));
        }
      }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("id", "follower"));
    }
  }

  public static class PartialUniquer extends BaseBatchBolt {
    BatchOutputCollector _collector;
    Object _id;
    Set<String> _followers = new HashSet<String>();

    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
      _collector = collector;
      _id = id;
    }

    @Override
    public void execute(Tuple tuple) {
      _followers.add(tuple.getString(1));
    }

    @Override
    public void finishBatch() {
      _collector.emit(new Values(_id, _followers.size()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("id", "partial-count"));
    }
  }

  public static class CountAggregator extends BaseBatchBolt {
    BatchOutputCollector _collector;
    Object _id;
    int _count = 0;

    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
      _collector = collector;
      _id = id;
    }

    @Override
    public void execute(Tuple tuple) {
      _count += tuple.getInteger(1);
    }

    @Override
    public void finishBatch() {
      _collector.emit(new Values(_id, _count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("id", "reach"));
    }
  }

  public static LinearDRPCTopologyBuilder construct() {
    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
    builder.addBolt(new GetTweeters(), 4);
    builder.addBolt(new GetFollowers(), 12).shuffleGrouping();
    builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower"));
    builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new Fields("id"));
    return builder;
  }

  public static void main(String[] args) throws Exception {
    LinearDRPCTopologyBuilder builder = construct();

    Config conf = new Config();

    if (args == null || args.length == 0) {
      conf.setMaxTaskParallelism(3);
      LocalDRPC drpc = new LocalDRPC();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("reach-drpc", conf, builder.createLocalTopology(drpc));

      String[] urlsToTry = new String[]{ "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com" };
      for (String url : urlsToTry) {
        System.out.println("Reach of " + url + ": " + drpc.execute("reach", url));
      }

      cluster.shutdown();
      drpc.shutdown();
    }
    else {
      conf.setNumWorkers(6);
      StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
    }
  }
}
storm-starter-master/src/jvm/storm/starter/RollingTopWords.java:
package storm.starter;

import backtype.storm.Config;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.starter.bolt.IntermediateRankingsBolt;
import storm.starter.bolt.RollingCountBolt;
import storm.starter.bolt.TotalRankingsBolt;
import storm.starter.util.StormRunner;

/**
 * This topology does a continuous computation of the top N words that the topology has seen in terms of cardinality.
 * The top N computation is done in a completely scalable way, and a similar approach could be used to compute things
 * like trending topics or trending images on Twitter.
 */
public class RollingTopWords {

  private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;
  private static final int TOP_N = 5;

  private final TopologyBuilder builder;
  private final String topologyName;
  private final Config topologyConfig;
  private final int runtimeInSeconds;

  public RollingTopWords() throws InterruptedException {
    builder = new TopologyBuilder();
    topologyName = "slidingWindowCounts";
    topologyConfig = createTopologyConfiguration();
    runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;

    wireTopology();
  }

  private static Config createTopologyConfiguration() {
    Config conf = new Config();
    conf.setDebug(true);
    return conf;
  }

  private void wireTopology() throws InterruptedException {
    String spoutId = "wordGenerator";
    String counterId = "counter";
    String intermediateRankerId = "intermediateRanker";
    String totalRankerId = "finalRanker";
    builder.setSpout(spoutId, new TestWordSpout(), 5);
    builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
    builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId,
        new Fields("obj"));
    builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
  }

  public void run() throws InterruptedException {
    StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds);
  }

  public static void main(String[] args) throws Exception {
    new RollingTopWords().run();
  }
}
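RollingTopWords delegates local submission to storm.starter.util.StormRunner, which is not reproduced in this post. A minimal sketch of what that helper needs to do, matching how it is called above (the body is an assumption based on the upstream storm-starter utility):

package storm.starter.util;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;

public final class StormRunner {

  private static final int MILLIS_IN_SEC = 1000;

  private StormRunner() {
  }

  public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf,
      int runtimeInSeconds) throws InterruptedException {
    // run the topology on an in-process cluster for the requested time, then tear it down
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology(topologyName, conf, topology);
    Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
    cluster.killTopology(topologyName);
    cluster.shutdown();
  }
}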
storm-starter-master/src/jvm/storm/starter/SingleJoinExample.java:
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.testing.FeederSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import storm.starter.bolt.SingleJoinBolt;

public class SingleJoinExample {
  public static void main(String[] args) {
    FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
    FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("gender", genderSpout);
    builder.setSpout("age", ageSpout);
    builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age")))
        .fieldsGrouping("gender", new Fields("id"))
        .fieldsGrouping("age", new Fields("id"));

    Config conf = new Config();
    conf.setDebug(true);

    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("join-example", conf, builder.createTopology());

    for (int i = 0; i < 10; i++) {
      String gender;
      if (i % 2 == 0) {
        gender = "male";
      }
      else {
        gender = "female";
      }
      genderSpout.feed(new Values(i, gender));
    }

    for (int i = 9; i >= 0; i--) {
      ageSpout.feed(new Values(i, i + 20));
    }

    Utils.sleep(2000);
    cluster.shutdown();
  }
}
storm-starter-master/src/jvm/storm/starter/TransactionalGlobalCount.java:
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.MemoryTransactionalSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBatchBolt;
import backtype.storm.topology.base.BaseTransactionalBolt;
import backtype.storm.transactional.ICommitter;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.transactional.TransactionalTopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * This is a basic example of a transactional topology. It keeps a count of the number of tuples seen so far in a
 * database. The source of data and the databases are mocked out as in memory maps for demonstration purposes. This
 * class is defined in depth on the wiki at https://github.com/nathanmarz/storm/wiki/Transactional-topologies
 */
public class TransactionalGlobalCount {
  public static final int PARTITION_TAKE_PER_BATCH = 3;
  public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
    put(0, new ArrayList<List<Object>>() {{
      add(new Values("cat"));
      add(new Values("dog"));
      add(new Values("chicken"));
      add(new Values("cat"));
      add(new Values("dog"));
      add(new Values("apple"));
    }});
    put(1, new ArrayList<List<Object>>() {{
      add(new Values("cat"));
      add(new Values("dog"));
      add(new Values("apple"));
      add(new Values("banana"));
    }});
    put(2, new ArrayList<List<Object>>() {{
      add(new Values("cat"));
      add(new Values("cat"));
      add(new Values("cat"));
      add(new Values("cat"));
      add(new Values("cat"));
      add(new Values("dog"));
      add(new Values("dog"));
      add(new Values("dog"));
      add(new Values("dog"));
    }});
  }};

  public static class Value {
    int count = 0;
    BigInteger txid;
  }

  public static Map<String, Value> DATABASE = new HashMap<String, Value>();
  public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";

  public static class BatchCount extends BaseBatchBolt {
    Object _id;
    BatchOutputCollector _collector;

    int _count = 0;

    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
      _collector = collector;
      _id = id;
    }

    @Override
    public void execute(Tuple tuple) {
      _count++;
    }

    @Override
    public void finishBatch() {
      _collector.emit(new Values(_id, _count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("id", "count"));
    }
  }

  public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
    TransactionAttempt _attempt;
    BatchOutputCollector _collector;

    int _sum = 0;

    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector,
        TransactionAttempt attempt) {
      _collector = collector;
      _attempt = attempt;
    }

    @Override
    public void execute(Tuple tuple) {
      _sum += tuple.getInteger(1);
    }

    @Override
    public void finishBatch() {
      Value val = DATABASE.get(GLOBAL_COUNT_KEY);
      Value newval;
      if (val == null || !val.txid.equals(_attempt.getTransactionId())) {
        newval = new Value();
        newval.txid = _attempt.getTransactionId();
        if (val == null) {
          newval.count = _sum;
        }
        else {
          newval.count = _sum + val.count;
        }
        DATABASE.put(GLOBAL_COUNT_KEY, newval);
      }
      else {
        newval = val;
      }
      _collector.emit(new Values(_attempt, newval.count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("id", "sum"));
    }
  }

  public static void main(String[] args) throws Exception {
    MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
    TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
    builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout");
    builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");

    LocalCluster cluster = new LocalCluster();

    Config config = new Config();
    config.setDebug(true);
    config.setMaxSpoutPending(3);

    cluster.submitTopology("global-count-topology", config, builder.buildTopology());

    Thread.sleep(3000);
    cluster.shutdown();
  }
}
storm-starter-master/src/jvm/storm/starter/TransactionalWords.java:
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.MemoryTransactionalSpout;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseTransactionalBolt;
import backtype.storm.transactional.ICommitter;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.transactional.TransactionalTopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * This class defines a more involved transactional topology than TransactionalGlobalCount. This topology processes a
 * stream of words and produces two outputs:
 * <p/>
 * 1. A count for each word (stored in a database)
 * 2. The number of words for every bucket of 10 counts. So it stores in the database how many words have appeared
 * 0-9 times, how many have appeared 10-19 times, and so on.
 * <p/>
 * A batch of words can cause the bucket counts to decrement for some buckets and increment for others as words move
 * between buckets as their counts accumulate.
 */
public class TransactionalWords {
  public static class CountValue {
    Integer prev_count = null;
    int count = 0;
    BigInteger txid = null;
  }

  public static class BucketValue {
    int count = 0;
    BigInteger txid;
  }

  public static final int BUCKET_SIZE = 10;

  public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
  public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();

  public static final int PARTITION_TAKE_PER_BATCH = 3;

  public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
    put(0, new ArrayList<List<Object>>() {{
      add(new Values("cat"));
      add(new Values("dog"));
      add(new Values("chicken"));
      add(new Values("cat"));
      add(new Values("dog"));
      add(new Values("apple"));
    }});
    put(1, new ArrayList<List<Object>>() {{
      add(new Values("cat"));
      add(new Values("dog"));
      add(new Values("apple"));
      add(new Values("banana"));
    }});
    put(2, new ArrayList<List<Object>>() {{
      add(new Values("cat"));
      add(new Values("cat"));
      add(new Values("cat"));
      add(new Values("cat"));
      add(new Values("cat"));
      add(new Values("dog"));
      add(new Values("dog"));
      add(new Values("dog"));
      add(new Values("dog"));
    }});
  }};

  public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter {
    Map<String, Integer> _counts = new HashMap<String, Integer>();
    BatchOutputCollector _collector;
    TransactionAttempt _id;

    int _count = 0;

    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
      _collector = collector;
      _id = id;
    }

    @Override
    public void execute(Tuple tuple) {
      String key = tuple.getString(1);
      Integer curr = _counts.get(key);
      if (curr == null) curr = 0;
      _counts.put(key, curr + 1);
    }

    @Override
    public void finishBatch() {
      for (String key : _counts.keySet()) {
        CountValue val = COUNT_DATABASE.get(key);
        CountValue newVal;
        // only apply the update if this transaction id hasn't been committed for this key yet
        if (val == null || !val.txid.equals(_id.getTransactionId())) {
          newVal = new CountValue();
          newVal.txid = _id.getTransactionId();
          if (val != null) {
            newVal.prev_count = val.count;
            newVal.count = val.count;
          }
          newVal.count = newVal.count + _counts.get(key);
          COUNT_DATABASE.put(key, newVal);
        }
        else {
          newVal = val;
        }
        _collector.emit(new Values(_id, key, newVal.count, newVal.prev_count));
      }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("id", "key", "count", "prev-count"));
    }
  }

  public static class Bucketize extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
      int curr = tuple.getInteger(2);
      Integer prev = tuple.getInteger(3);

      int currBucket = curr / BUCKET_SIZE;
      Integer prevBucket = null;
      if (prev != null) {
        prevBucket = prev / BUCKET_SIZE;
      }

      if (prevBucket == null) {
        collector.emit(new Values(attempt, currBucket, 1));
      }
      else if (currBucket != prevBucket) {
        collector.emit(new Values(attempt, currBucket, 1));
        collector.emit(new Values(attempt, prevBucket, -1));
      }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("attempt", "bucket", "delta"));
    }
  }

  public static class BucketCountUpdater extends BaseTransactionalBolt {
    Map<Integer, Integer> _accum = new HashMap<Integer, Integer>();
    BatchOutputCollector _collector;
    TransactionAttempt _attempt;

    int _count = 0;

    @Override
    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector,
        TransactionAttempt attempt) {
      _collector = collector;
      _attempt = attempt;
    }

    @Override
    public void execute(Tuple tuple) {
      Integer bucket = tuple.getInteger(1);
      Integer delta = tuple.getInteger(2);
      Integer curr = _accum.get(bucket);
      if (curr == null) curr = 0;
      _accum.put(bucket, curr + delta);
    }

    @Override
    public void finishBatch() {
      for (Integer bucket : _accum.keySet()) {
        BucketValue currVal = BUCKET_DATABASE.get(bucket);
        BucketValue newVal;
        if (currVal == null || !currVal.txid.equals(_attempt.getTransactionId())) {
          newVal = new BucketValue();
          newVal.txid = _attempt.getTransactionId();
          newVal.count = _accum.get(bucket);
          if (currVal != null) newVal.count += currVal.count;
          BUCKET_DATABASE.put(bucket, newVal);
        }
        else {
          newVal = currVal;
        }
        _collector.emit(new Values(_attempt, bucket, newVal.count));
      }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("id", "bucket", "count"));
    }
  }

  public static void main(String[] args) throws Exception {
    MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
    TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
    builder.setBolt("count", new KeyedCountUpdater(), 5).fieldsGrouping("spout", new Fields("word"));
    builder.setBolt("bucketize", new Bucketize()).noneGrouping("count");
    builder.setBolt("buckets", new BucketCountUpdater(), 5).fieldsGrouping("bucketize", new Fields("bucket"));

    LocalCluster cluster = new LocalCluster();

    Config config = new Config();
    config.setDebug(true);
    config.setMaxSpoutPending(3);

    cluster.submitTopology("top-n-topology", config, builder.buildTopology());

    Thread.sleep(3000);
    cluster.shutdown();
  }
}
storm-starter-master/src/jvm/storm/starter/WordCountTopology.java:
package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import storm.starter.spout.RandomSentenceSpout;

import java.util.HashMap;
import java.util.Map;

/**
 * This topology demonstrates Storm's stream groupings and multilang capabilities.
 */
public class WordCountTopology {
  public static class SplitSentence extends ShellBolt implements IRichBolt {

    public SplitSentence() {
      super("python", "splitsentence.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
      return null;
    }
  }

  public static class WordCount extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      String word = tuple.getString(0);
      Integer count = counts.get(word);
      if (count == null)
        count = 0;
      count++;
      counts.put(word, count);
      collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word", "count"));
    }
  }

  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("spout", new RandomSentenceSpout(), 5);

    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);
      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    else {
      conf.setMaxTaskParallelism(3);

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("word-count", conf, builder.createTopology());

      Thread.sleep(10000);

      cluster.shutdown();
    }
  }
}
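The SplitSentence bolt above is a multilang bolt: ShellBolt launches "python splitsentence.py" and speaks Storm's multilang protocol with it over stdin/stdout. The script itself is not listed in this post; a minimal sketch of what storm-starter ships, relying on the storm.py adapter that comes with Storm's multilang resources (treat the exact contents as an assumption):

import storm

class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        # the single input field is the sentence; emit one output tuple per word
        words = tup.values[0].split(" ")
        for word in words:
            storm.emit([word])

SplitSentenceBolt().run()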
storm-starter-master/src/jvm/storm/starter/spout/RandomSentenceSpout.java:
package storm.starter.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

public class RandomSentenceSpout extends BaseRichSpout {
  SpoutOutputCollector _collector;
  Random _rand;

  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    _rand = new Random();
  }

  @Override
  public void nextTuple() {
    Utils.sleep(100);
    String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
    String sentence = sentences[_rand.nextInt(sentences.length)];
    _collector.emit(new Values(sentence));
  }

  @Override
  public void ack(Object id) {
  }

  @Override
  public void fail(Object id) {
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }
}
storm-starter-master/src/jvm/storm/starter/spout/TwitterSampleSpout.java:
/*
package storm.starter.spout;

import backtype.storm.Config;
import twitter4j.conf.ConfigurationBuilder;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;

public class TwitterSampleSpout extends BaseRichSpout {
  SpoutOutputCollector _collector;
  LinkedBlockingQueue<Status> queue = null;
  TwitterStream _twitterStream;
  String _username;
  String _pwd;

  public TwitterSampleSpout(String username, String pwd) {
    _username = username;
    _pwd = pwd;
  }

  @Override
  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    queue = new LinkedBlockingQueue<Status>(1000);
    _collector = collector;

    StatusListener listener = new StatusListener() {
      @Override
      public void onStatus(Status status) {
        queue.offer(status);
      }

      @Override
      public void onDeletionNotice(StatusDeletionNotice sdn) {
      }

      @Override
      public void onTrackLimitationNotice(int i) {
      }

      @Override
      public void onScrubGeo(long l, long l1) {
      }

      @Override
      public void onException(Exception e) {
      }
    };

    TwitterStreamFactory fact =
        new TwitterStreamFactory(new ConfigurationBuilder().setUser(_username).setPassword(_pwd).build());
    _twitterStream = fact.getInstance();
    _twitterStream.addListener(listener);
    _twitterStream.sample();
  }

  @Override
  public void nextTuple() {
    Status ret = queue.poll();
    if (ret == null) {
      Utils.sleep(50);
    }
    else {
      _collector.emit(new Values(ret));
    }
  }

  @Override
  public void close() {
    _twitterStream.shutdown();
  }

  @Override
  public Map<String, Object> getComponentConfiguration() {
    Config ret = new Config();
    ret.setMaxTaskParallelism(1);
    return ret;
  }

  @Override
  public void ack(Object id) {
  }

  @Override
  public void fail(Object id) {
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("tweet"));
  }
}
*/
storm-starter-master/src/jvm/storm/starter/bolt/AbstractRankerBolt.java:
package storm.starter.bolt;

import backtype.storm.Config;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.apache.log4j.Logger;
import storm.starter.tools.Rankings;
import storm.starter.util.TupleHelpers;

import java.util.HashMap;
import java.util.Map;

/**
 * This abstract bolt provides the basic behavior of bolts that rank objects according to their count.
 * <p/>
 * It uses a template method design pattern for {@link AbstractRankerBolt#execute(Tuple, BasicOutputCollector)} to
 * allow actual bolt implementations to specify how incoming tuples are processed, i.e. how the objects embedded within
 * those tuples are retrieved and counted.
 */
public abstract class AbstractRankerBolt extends BaseBasicBolt {

  private static final long serialVersionUID = 4931640198501530202L;
  private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 2;
  private static final int DEFAULT_COUNT = 10;

  private final int emitFrequencyInSeconds;
  private final int count;
  private final Rankings rankings;

  public AbstractRankerBolt() {
    this(DEFAULT_COUNT, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
  }

  public AbstractRankerBolt(int topN) {
    this(topN, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
  }

  public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {
    if (topN < 1) {
      throw new IllegalArgumentException("topN must be >= 1 (you requested " + topN + ")");
    }
    if (emitFrequencyInSeconds < 1) {
      throw new IllegalArgumentException(
          "The emit frequency must be >= 1 seconds (you requested " + emitFrequencyInSeconds + " seconds)");
    }
    count = topN;
    this.emitFrequencyInSeconds = emitFrequencyInSeconds;
    rankings = new Rankings(count);
  }

  protected Rankings getRankings() {
    return rankings;
  }

  /**
   * This method functions as a template method (design pattern).
   */
  @Override
  public final void execute(Tuple tuple, BasicOutputCollector collector) {
    if (TupleHelpers.isTickTuple(tuple)) {
      getLogger().debug("Received tick tuple, triggering emit of current rankings");
      emitRankings(collector);
    }
    else {
      updateRankingsWithTuple(tuple);
    }
  }

  abstract void updateRankingsWithTuple(Tuple tuple);

  private void emitRankings(BasicOutputCollector collector) {
    collector.emit(new Values(rankings.copy()));
    getLogger().debug("Rankings: " + rankings);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("rankings"));
  }

  @Override
  public Map<String, Object> getComponentConfiguration() {
    Map<String, Object> conf = new HashMap<String, Object>();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
    return conf;
  }

  abstract Logger getLogger();
}
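AbstractRankerBolt (and RollingCountBolt further below) branches on TupleHelpers.isTickTuple, a small utility that is not listed in this post. A sketch of the check, assuming the upstream storm-starter helper: a tick tuple is recognizable because it arrives from Storm's system component on the system tick stream:

package storm.starter.util;

import backtype.storm.Constants;
import backtype.storm.tuple.Tuple;

public final class TupleHelpers {

  private TupleHelpers() {
  }

  public static boolean isTickTuple(Tuple tuple) {
    // tick tuples are emitted by the __system component on the __tick stream
    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
        && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
  }
}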
storm-starter-master/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java:
package storm.starter.bolt;

import backtype.storm.tuple.Tuple;
import org.apache.log4j.Logger;
import storm.starter.tools.Rankable;
import storm.starter.tools.RankableObjectWithFields;

/**
 * This bolt ranks incoming objects by their count.
 * <p/>
 * It assumes the input tuples to adhere to the following format: (object, object_count, additionalField1,
 * additionalField2, ..., additionalFieldN).
 */
public final class IntermediateRankingsBolt extends AbstractRankerBolt {

  private static final long serialVersionUID = -1369800530256637409L;
  private static final Logger LOG = Logger.getLogger(IntermediateRankingsBolt.class);

  public IntermediateRankingsBolt() {
    super();
  }

  public IntermediateRankingsBolt(int topN) {
    super(topN);
  }

  public IntermediateRankingsBolt(int topN, int emitFrequencyInSeconds) {
    super(topN, emitFrequencyInSeconds);
  }

  @Override
  void updateRankingsWithTuple(Tuple tuple) {
    Rankable rankable = RankableObjectWithFields.from(tuple);
    super.getRankings().updateWith(rankable);
  }

  @Override
  Logger getLogger() {
    return LOG;
  }
}
storm-starter-master/src/jvm/storm/starter/bolt/PrinterBolt.java:
package storm.starter.bolt;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class PrinterBolt extends BaseBasicBolt {

  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    System.out.println(tuple);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer ofd) {
  }
}
storm-starter-master/src/jvm/storm/starter/bolt/RollingCountBolt.java:
package storm.starter.bolt;

import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.apache.log4j.Logger;
import storm.starter.tools.NthLastModifiedTimeTracker;
import storm.starter.tools.SlidingWindowCounter;
import storm.starter.util.TupleHelpers;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

/**
 * This bolt performs rolling counts of incoming objects, i.e. sliding window based counting.
 * <p/>
 * The bolt is configured by two parameters, the length of the sliding window in seconds (which influences the output
 * data of the bolt, i.e. how it will count objects) and the emit frequency in seconds (which influences how often the
 * bolt will output the latest window counts). For instance, if the window length is set to an equivalent of five
 * minutes and the emit frequency to one minute, then the bolt will output the latest five-minute sliding window every
 * minute.
 * <p/>
 * The bolt emits a rolling count tuple per object, consisting of the object itself, its latest rolling count, and the
 * actual duration of the sliding window. The latter is included in case the expected sliding window length (as
 * configured by the user) is different from the actual length, e.g. due to high system load. Note that the actual
 * window length is tracked and calculated for the window, and not individually for each object within a window.
 * <p/>
 * Note: During the startup phase you will usually observe that the bolt warns you about the actual sliding window
 * length being smaller than the expected length. This behavior is expected and is caused by the way the sliding window
 * counts are initially "loaded up". You can safely ignore this warning during startup (e.g. you will see this warning
 * during the first ~ five minutes of startup time if the window length is set to five minutes).
 */
public class RollingCountBolt extends BaseRichBolt {

  private static final long serialVersionUID = 5537727428628598519L;
  private static final Logger LOG = Logger.getLogger(RollingCountBolt.class);
  private static final int NUM_WINDOW_CHUNKS = 5;
  private static final int DEFAULT_SLIDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60;
  private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SLIDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS;
  private static final String WINDOW_LENGTH_WARNING_TEMPLATE =
      "Actual window length is %d seconds when it should be %d seconds"
          + " (you can safely ignore this warning during the startup phase)";

  private final SlidingWindowCounter<Object> counter;
  private final int windowLengthInSeconds;
  private final int emitFrequencyInSeconds;
  private OutputCollector collector;
  private NthLastModifiedTimeTracker lastModifiedTracker;

  public RollingCountBolt() {
    this(DEFAULT_SLIDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
  }

  public RollingCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) {
    this.windowLengthInSeconds = windowLengthInSeconds;
    this.emitFrequencyInSeconds = emitFrequencyInSeconds;
    counter = new SlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds,
        this.emitFrequencyInSeconds));
  }

  private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) {
    return windowLengthInSeconds / windowUpdateFrequencyInSeconds;
  }

  @SuppressWarnings("rawtypes")
  @Override
  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
    lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds,
        this.emitFrequencyInSeconds));
  }

  @Override
  public void execute(Tuple tuple) {
    if (TupleHelpers.isTickTuple(tuple)) {
      LOG.debug("Received tick tuple, triggering emit of current window counts");
      emitCurrentWindowCounts();
    }
    else {
      countObjAndAck(tuple);
    }
  }

  private void emitCurrentWindowCounts() {
    Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
    int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
    lastModifiedTracker.markAsModified();
    if (actualWindowLengthInSeconds != windowLengthInSeconds) {
      LOG.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
    }
    emit(counts, actualWindowLengthInSeconds);
  }

  private void emit(Map<Object, Long> counts, int actualWindowLengthInSeconds) {
    for (Entry<Object, Long> entry : counts.entrySet()) {
      Object obj = entry.getKey();
      Long count = entry.getValue();
      collector.emit(new Values(obj, count, actualWindowLengthInSeconds));
    }
  }

  private void countObjAndAck(Tuple tuple) {
    Object obj = tuple.getValue(0);
    counter.incrementCount(obj);
    collector.ack(tuple);
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds"));
  }

  @Override
  public Map<String, Object> getComponentConfiguration() {
    Map<String, Object> conf = new HashMap<String, Object>();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
    return conf;
  }
}
storm-starter-master/src/jvm/storm/starter/bolt/SingleJoinBolt.java:
package storm.starter.bolt;

import backtype.storm.Config;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.TimeCacheMap;

import java.util.*;

public class SingleJoinBolt extends BaseRichBolt {
  OutputCollector _collector;
  Fields _idFields;
  Fields _outFields;
  int _numSources;
  TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>> _pending;
  Map<String, GlobalStreamId> _fieldLocations;

  public SingleJoinBolt(Fields outFields) {
    _outFields = outFields;
  }

  @Override
  public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    _fieldLocations = new HashMap<String, GlobalStreamId>();
    _collector = collector;
    int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
    _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
    _numSources = context.getThisSources().size();
    Set<String> idFields = null;
    for (GlobalStreamId source : context.getThisSources().keySet()) {
      Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
      Set<String> setFields = new HashSet<String>(fields.toList());
      if (idFields == null)
        idFields = setFields;
      else
        idFields.retainAll(setFields);

      for (String outfield : _outFields) {
        for (String sourcefield : fields) {
          if (outfield.equals(sourcefield)) {
            _fieldLocations.put(outfield, source);
          }
        }
      }
    }
    _idFields = new Fields(new ArrayList<String>(idFields));

    if (_fieldLocations.size() != _outFields.size()) {
      throw new RuntimeException("Cannot find all outfields among sources");
    }
  }

  @Override
  public void execute(Tuple tuple) {
    List<Object> id = tuple.select(_idFields);
    GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
    if (!_pending.containsKey(id)) {
      _pending.put(id, new HashMap<GlobalStreamId, Tuple>());
    }
    Map<GlobalStreamId, Tuple> parts = _pending.get(id);
    if (parts.containsKey(streamId))
      throw new RuntimeException("Received same side of single join twice");
    parts.put(streamId, tuple);
    if (parts.size() == _numSources) {
      _pending.remove(id);
      List<Object> joinResult = new ArrayList<Object>();
      for (String outField : _outFields) {
        GlobalStreamId loc = _fieldLocations.get(outField);
        joinResult.add(parts.get(loc).getValueByField(outField));
      }
      _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);

      for (Tuple part : parts.values()) {
        _collector.ack(part);
      }
    }
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(_outFields);
  }

  private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {
    @Override
    public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {
      for (Tuple tuple : tuples.values()) {
        _collector.fail(tuple);
      }
    }
  }
}
storm-starter-master/src/jvm/storm/starter/bolt/TotalRankingsBolt.java:
package storm.starter.bolt;

import backtype.storm.tuple.Tuple;
import org.apache.log4j.Logger;
import storm.starter.tools.Rankings;

/**
 * This bolt merges incoming {@link Rankings}.
 * <p/>
 * It can be used to merge intermediate rankings generated by {@link IntermediateRankingsBolt} into a final,
 * consolidated ranking. To do so, configure this bolt with a globalGrouping on {@link IntermediateRankingsBolt}.
 */
public final class TotalRankingsBolt extends AbstractRankerBolt {

  private static final long serialVersionUID = -8447525895532302198L;
  private static final Logger LOG = Logger.getLogger(TotalRankingsBolt.class);

  public TotalRankingsBolt() {
    super();
  }

  public TotalRankingsBolt(int topN) {
    super(topN);
  }

  public TotalRankingsBolt(int topN, int emitFrequencyInSeconds) {
    super(topN, emitFrequencyInSeconds);
  }

  @Override
  void updateRankingsWithTuple(Tuple tuple) {
    Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
    super.getRankings().updateWith(rankingsToBeMerged);
    super.getRankings().pruneZeroCounts();
  }

  @Override
  Logger getLogger() {
    return LOG;
  }
}
storm-starter-master/src/jvm/storm/starter/tools/NthLastModifiedTimeTracker.java:
package storm.starter.tools;

import backtype.storm.utils.Time;
import org.apache.commons.collections.buffer.CircularFifoBuffer;

/**
 * This class tracks the time-since-last-modify of a "thing" in a rolling fashion.
 * <p/>
 * For example, create a 5-slot tracker to track the five most recent time-since-last-modify.
 * <p/>
 * You must manually "mark" that the "thing" you want to track -- in terms of modification times -- has just been
 * modified.
 */
public class NthLastModifiedTimeTracker {

  private static final int MILLIS_IN_SEC = 1000;

  private final CircularFifoBuffer lastModifiedTimesMillis;

  public NthLastModifiedTimeTracker(int numTimesToTrack) {
    if (numTimesToTrack < 1) {
      throw new IllegalArgumentException(
          "numTimesToTrack must be greater than zero (you requested " + numTimesToTrack + ")");
    }
    lastModifiedTimesMillis = new CircularFifoBuffer(numTimesToTrack);
    initLastModifiedTimesMillis();
  }

  private void initLastModifiedTimesMillis() {
    long nowCached = now();
    for (int i = 0; i < lastModifiedTimesMillis.maxSize(); i++) {
      lastModifiedTimesMillis.add(Long.valueOf(nowCached));
    }
  }

  private long now() {
    return Time.currentTimeMillis();
  }

  public int secondsSinceOldestModification() {
    long modifiedTimeMillis = ((Long) lastModifiedTimesMillis.get()).longValue();
    return (int) ((now() - modifiedTimeMillis) / MILLIS_IN_SEC);
  }

  public void markAsModified() {
    updateLastModifiedTime();
  }

  private void updateLastModifiedTime() {
    lastModifiedTimesMillis.add(now());
  }
}
storm-starter-master/src/jvm/storm/starter/tools/Rankable.java:
package storm.starter.tools;

public interface Rankable extends Comparable<Rankable> {

  Object getObject();

  long getCount();

  /**
   * Note: We do not defensively copy the object wrapped by the Rankable. It is passed as is.
   *
   * @return a defensive copy
   */
  Rankable copy();
}
storm-starter-master/src/jvm/storm/starter/tools/RankableObjectWithFields.java:
package storm.starter.tools;

import backtype.storm.tuple.Tuple;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import java.io.Serializable;
import java.util.List;

/**
 * This class wraps an object and its associated count, including any additional data fields.
 * <p/>
 * This class can be used, for instance, to track the number of occurrences of an object in a Storm topology.
 */
public class RankableObjectWithFields implements Rankable, Serializable {

  private static final long serialVersionUID = -9102878650001058090L;
  private static final String toStringSeparator = "|";

  private final Object obj;
  private final long count;
  private final ImmutableList<Object> fields;

  public RankableObjectWithFields(Object obj, long count, Object... otherFields) {
    if (obj == null) {
      throw new IllegalArgumentException("The object must not be null");
    }
    if (count < 0) {
      throw new IllegalArgumentException("The count must be >= 0");
    }
    this.obj = obj;
    this.count = count;
    fields = ImmutableList.copyOf(otherFields);
  }

  /**
   * Construct a new instance based on the provided {@link Tuple}.
   * <p/>
   * This method expects the object to be ranked in the first field (index 0) of the provided tuple, and the number of
   * occurrences of the object (its count) in the second field (index 1). Any further fields in the tuple will be
   * extracted and tracked, too. These fields can be accessed via {@link RankableObjectWithFields#getFields()}.
   *
   * @param tuple
   *
   * @return new instance based on the provided tuple
   */
  public static RankableObjectWithFields from(Tuple tuple) {
    List<Object> otherFields = Lists.newArrayList(tuple.getValues());
    Object obj = otherFields.remove(0);
    Long count = (Long) otherFields.remove(0);
    return new RankableObjectWithFields(obj, count, otherFields.toArray());
  }

  public Object getObject() {
    return obj;
  }

  public long getCount() {
    return count;
  }

  /**
   * @return an immutable list of any additional data fields of the object (may be empty but will never be null)
   */
  public List<Object> getFields() {
    return fields;
  }

  @Override
  public int compareTo(Rankable other) {
    long delta = this.getCount() - other.getCount();
    if (delta > 0) {
      return 1;
    }
    else if (delta < 0) {
      return -1;
    }
    else {
      return 0;
    }
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof RankableObjectWithFields)) {
      return false;
    }
    RankableObjectWithFields other = (RankableObjectWithFields) o;
    return obj.equals(other.obj) && count == other.count;
  }

  @Override
  public int hashCode() {
    int result = 17;
    int countHash = (int) (count ^ (count >>> 32));
    result = 31 * result + countHash;
    result = 31 * result + obj.hashCode();
    return result;
  }

  public String toString() {
    StringBuffer buf = new StringBuffer();
    buf.append("[");
    buf.append(obj);
    buf.append(toStringSeparator);
    buf.append(count);
    for (Object field : fields) {
      buf.append(toStringSeparator);
      buf.append(field);
    }
    buf.append("]");
    return buf.toString();
  }

  /**
   * Note: We do not defensively copy the wrapped object and any accompanying fields. We do guarantee, however, to
   * return a defensive (shallow) copy of the List object that is wrapping any accompanying fields.
   *
   * @return a copy of this instance
   */
  @Override
  public Rankable copy() {
    List<Object> shallowCopyOfFields = ImmutableList.copyOf(getFields());
    return new RankableObjectWithFields(getObject(), getCount(), shallowCopyOfFields);
  }
}
storm-starter-master/src/jvm/storm/starter/tools/Rankings.java:
package storm.starter.tools;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;

public class Rankings implements Serializable {

  private static final long serialVersionUID = -1549827195410578903L;
  private static final int DEFAULT_COUNT = 10;

  private final int maxSize;
  private final List<Rankable> rankedItems = Lists.newArrayList();

  public Rankings() {
    this(DEFAULT_COUNT);
  }

  public Rankings(int topN) {
    if (topN < 1) {
      throw new IllegalArgumentException("topN must be >= 1");
    }
    maxSize = topN;
  }

  /**
   * Copy constructor.
   *
   * @param other
   */
  public Rankings(Rankings other) {
    this(other.maxSize());
    updateWith(other);
  }

  /**
   * @return the maximum possible number (size) of ranked objects this instance can hold
   */
  public int maxSize() {
    return maxSize;
  }

  /**
   * @return the number (size) of ranked objects this instance is currently holding
   */
  public int size() {
    return rankedItems.size();
  }

  /**
   * The returned defensive copy is only "somewhat" defensive. We do, for instance, return a defensive copy of the
   * enclosing List instance, and we do try to defensively copy any contained Rankable objects, too. However, the
   * contract of {@link storm.starter.tools.Rankable#copy()} does not guarantee that any Object's embedded within
   * a Rankable will be defensively copied, too.
   *
   * @return a somewhat defensive copy of ranked items
   */
  public List<Rankable> getRankings() {
    List<Rankable> copy = Lists.newLinkedList();
    for (Rankable r : rankedItems) {
      copy.add(r.copy());
    }
    return ImmutableList.copyOf(copy);
  }

  public void updateWith(Rankings other) {
    for (Rankable r : other.getRankings()) {
      updateWith(r);
    }
  }

  public void updateWith(Rankable r) {
    synchronized (rankedItems) {
      addOrReplace(r);
      rerank();
      shrinkRankingsIfNeeded();
    }
  }

  private void addOrReplace(Rankable r) {
    Integer rank = findRankOf(r);
    if (rank != null) {
      rankedItems.set(rank, r);
    }
    else {
      rankedItems.add(r);
    }
  }

  private Integer findRankOf(Rankable r) {
    Object tag = r.getObject();
    for (int rank = 0; rank < rankedItems.size(); rank++) {
      Object cur = rankedItems.get(rank).getObject();
      if (cur.equals(tag)) {
        return rank;
      }
    }
    return null;
  }

  private void rerank() {
    Collections.sort(rankedItems);
    Collections.reverse(rankedItems);
  }

  private void shrinkRankingsIfNeeded() {
    if (rankedItems.size() > maxSize) {
      rankedItems.remove(maxSize);
    }
  }

  /**
   * Removes ranking entries that have a count of zero.
   */
  public void pruneZeroCounts() {
    int i = 0;
    while (i < rankedItems.size()) {
      if (rankedItems.get(i).getCount() == 0) {
        rankedItems.remove(i);
      }
      else {
        i++;
      }
    }
  }

  public String toString() {
    return rankedItems.toString();
  }

  /**
   * Creates a (defensive) copy of itself.
   */
  public Rankings copy() {
    return new Rankings(this);
  }
}
storm-starter-master/src/jvm/storm/starter/tools/SlidingWindowCounter.java:
package storm.starter.tools;

import java.io.Serializable;
import java.util.Map;

/**
 * This class counts objects in a sliding window fashion.
 * <p/>
 * It is designed 1) to give multiple "producer" threads write access to the counter, i.e. being able to increment
 * counts of objects, and 2) to give a single "consumer" thread (e.g. {@link PeriodicSlidingWindowCounter}) read access
 * to the counter. Whenever the consumer thread performs a read operation, this class will advance the head slot of the
 * sliding window counter. This means that the consumer thread indirectly controls where writes of the producer threads
 * will go to. Also, by itself this class will not advance the head slot.
 * <p/>
 * A note for analyzing data based on a sliding window count: During the initial <code>windowLengthInSlots</code>
 * iterations, this sliding window counter will always return object counts that are equal or greater than in the
 * previous iteration. This is the effect of the counter "loading up" at the very start of its existence. Conceptually,
 * this is the desired behavior.
 * <p/>
 * To give an example, using a counter with 5 slots which for the sake of this example represent 1 minute of time each:
 * <p/>
 * <pre>
 * {@code
 * Sliding window counts of an object X over time
 *
 * Minute (timeline):
 * 1    2    3    4    5    6    7    8
 *
 * Observed counts per minute:
 * 1    1    1    1    0    0    0    0
 *
 * Counts returned by counter:
 * 1    2    3    4    4    3    2    1
 * }
 * </pre>
 * <p/>
 * As you can see in this example, for the first <code>windowLengthInSlots</code> iterations (here: the first five
 * minutes) the counter will always return counts equal or greater than in the previous iteration (1, 2, 3, 4, 4).
 * This initial load effect needs to be accounted for whenever you want to perform analyses such as trending topics;
 * otherwise your analysis algorithm might falsely identify the object to be trending as the counter seems to observe
 * continuously increasing counts. Also, note that during the initial load phase <em>every object</em> will exhibit
 * increasing counts.
 * <p/>
 * At a high level, the counter exhibits the following behavior: If you asked the example counter after two minutes,
 * "How often did you count the object during the past five minutes?", then it should reply "I have counted it 2 times
 * in the past five minutes", implying that it can only account for the last two of those five minutes because the
 * counter was not running before that time.
 *
 * @param <T> The type of those objects we want to count.
 */
public final class SlidingWindowCounter<T> implements Serializable {

  private static final long serialVersionUID = -2645063988768785810L;

  private SlotBasedCounter<T> objCounter;
  private int headSlot;
  private int tailSlot;
  private int windowLengthInSlots;

  public SlidingWindowCounter(int windowLengthInSlots) {
    if (windowLengthInSlots < 2) {
      throw new IllegalArgumentException(
          "Window length in slots must be at least two (you requested " + windowLengthInSlots + ")");
    }
    this.windowLengthInSlots = windowLengthInSlots;
    this.objCounter = new SlotBasedCounter<T>(this.windowLengthInSlots);
    this.headSlot = 0;
    this.tailSlot = slotAfter(headSlot);
  }

  public void incrementCount(T obj) {
    objCounter.incrementCount(obj, headSlot);
  }

  /**
   * Return the current (total) counts of all tracked objects, then advance the window.
   * <p/>
   * Whenever this method is called, we consider the counts of the current sliding window to be available to and
   * successfully processed "upstream" (i.e. by the caller). Knowing this we will start counting any subsequent
   * objects within the next "chunk" of the sliding window.
   *
   * @return The current (total) counts of all tracked objects.
   */
  public Map<T, Long> getCountsThenAdvanceWindow() {
    Map<T, Long> counts = objCounter.getCounts();
    objCounter.wipeZeros();
    objCounter.wipeSlot(tailSlot);
    advanceHead();
    return counts;
  }

  private void advanceHead() {
    headSlot = tailSlot;
    tailSlot = slotAfter(tailSlot);
  }

  private int slotAfter(int slot) {
    return (slot + 1) % windowLengthInSlots;
  }
}
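The sketch below reproduces the javadoc table above: with a five-slot window and one observation of "X" per minute during the first four minutes, each call to getCountsThenAdvanceWindow() plays the role of the per-minute read, and the printed counts come out as 1, 2, 3, 4, 4, 3, 2, 1:

SlidingWindowCounter<String> counter = new SlidingWindowCounter<String>(5);
for (int minute = 1; minute <= 8; minute++) {
  if (minute <= 4) {
    counter.incrementCount("X");  // X is observed during the first four minutes only
  }
  // Reading the counts also wipes the tail slot and advances the head slot.
  Map<String, Long> counts = counter.getCountsThenAdvanceWindow();
  System.out.println("minute " + minute + ": " + counts.get("X"));
}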
SlotBasedCounter.java under storm-starter-master/src/jvm/storm/starter/tools
package storm.starter.tools;

import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * This class provides per-slot counts of the occurrences of objects.
 * <p/>
 * It can be used, for instance, as a building block for implementing sliding window counting of objects.
 *
 * @param <T> The type of those objects we want to count.
 */
public final class SlotBasedCounter<T> implements Serializable {

  private static final long serialVersionUID = 4858185737378394432L;

  private final Map<T, long[]> objToCounts = new HashMap<T, long[]>();
  private final int numSlots;

  public SlotBasedCounter(int numSlots) {
    if (numSlots <= 0) {
      throw new IllegalArgumentException("Number of slots must be greater than zero (you requested " + numSlots + ")");
    }
    this.numSlots = numSlots;
  }

  public void incrementCount(T obj, int slot) {
    long[] counts = objToCounts.get(obj);
    if (counts == null) {
      counts = new long[this.numSlots];
      objToCounts.put(obj, counts);
    }
    counts[slot]++;
  }

  public long getCount(T obj, int slot) {
    long[] counts = objToCounts.get(obj);
    if (counts == null) {
      return 0;
    }
    else {
      return counts[slot];
    }
  }

  public Map<T, Long> getCounts() {
    Map<T, Long> result = new HashMap<T, Long>();
    for (T obj : objToCounts.keySet()) {
      result.put(obj, computeTotalCount(obj));
    }
    return result;
  }

  private long computeTotalCount(T obj) {
    long[] curr = objToCounts.get(obj);
    long total = 0;
    for (long l : curr) {
      total += l;
    }
    return total;
  }

  /**
   * Reset the slot count of any tracked objects to zero for the given slot.
   *
   * @param slot
   */
  public void wipeSlot(int slot) {
    for (T obj : objToCounts.keySet()) {
      resetSlotCountToZero(obj, slot);
    }
  }

  private void resetSlotCountToZero(T obj, int slot) {
    long[] counts = objToCounts.get(obj);
    counts[slot] = 0;
  }

  private boolean shouldBeRemovedFromCounter(T obj) {
    return computeTotalCount(obj) == 0;
  }

  /**
   * Remove any object from the counter whose total count is zero (to free up memory).
   */
  public void wipeZeros() {
    Set<T> objToBeRemoved = new HashSet<T>();
    for (T obj : objToCounts.keySet()) {
      if (shouldBeRemovedFromCounter(obj)) {
        objToBeRemoved.add(obj);
      }
    }
    for (T obj : objToBeRemoved) {
      objToCounts.remove(obj);
    }
  }
}
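A minimal sketch of the per-slot bookkeeping, using only the methods shown above:

SlotBasedCounter<String> counter = new SlotBasedCounter<String>(3);
counter.incrementCount("apple", 0);
counter.incrementCount("apple", 0);
counter.incrementCount("apple", 2);
System.out.println(counter.getCount("apple", 0));      // 2
System.out.println(counter.getCounts().get("apple"));  // 3 (total across all slots)
counter.wipeSlot(0);                                   // zero out slot 0 for every tracked object
System.out.println(counter.getCounts().get("apple"));  // 1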
TridentReach.java under storm-starter-master/src/jvm/storm/starter/trident
package storm.starter.trident;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.generated.StormTopology;
import backtype.storm.task.IMetricsContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.CombinerAggregator;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Sum;
import storm.trident.state.ReadOnlyState;
import storm.trident.state.State;
import storm.trident.state.StateFactory;
import storm.trident.state.map.ReadOnlyMapState;
import storm.trident.tuple.TridentTuple;

import java.util.*;

public class TridentReach {
  public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
    put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
    put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
    put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
  }};

  public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
    put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
    put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
    put("tim", Arrays.asList("alex"));
    put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
    put("adam", Arrays.asList("david", "carissa"));
    put("mike", Arrays.asList("john", "bob"));
    put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
  }};

  public static class StaticSingleKeyMapState extends ReadOnlyState implements ReadOnlyMapState<Object> {
    public static class Factory implements StateFactory {
      Map _map;

      public Factory(Map map) {
        _map = map;
      }

      @Override
      public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
        return new StaticSingleKeyMapState(_map);
      }
    }

    Map _map;

    public StaticSingleKeyMapState(Map map) {
      _map = map;
    }

    @Override
    public List<Object> multiGet(List<List<Object>> keys) {
      List<Object> ret = new ArrayList<Object>();
      for (List<Object> key : keys) {
        Object singleKey = key.get(0);
        ret.add(_map.get(singleKey));
      }
      return ret;
    }
  }

  public static class One implements CombinerAggregator<Integer> {
    @Override
    public Integer init(TridentTuple tuple) {
      return 1;
    }

    @Override
    public Integer combine(Integer val1, Integer val2) {
      return 1;
    }

    @Override
    public Integer zero() {
      return 1;
    }
  }

  public static class ExpandList extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
      List l = (List) tuple.getValue(0);
      if (l != null) {
        for (Object o : l) {
          collector.emit(new Values(o));
        }
      }
    }
  }

  public static StormTopology buildTopology(LocalDRPC drpc) {
    TridentTopology topology = new TridentTopology();
    TridentState urlToTweeters = topology.newStaticState(new StaticSingleKeyMapState.Factory(TWEETERS_DB));
    TridentState tweetersToFollowers = topology.newStaticState(new StaticSingleKeyMapState.Factory(FOLLOWERS_DB));

    topology.newDRPCStream("reach", drpc)
        .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
        .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
        .shuffle()
        .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
        .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
        .groupBy(new Fields("follower"))
        .aggregate(new One(), new Fields("one"))
        .aggregate(new Fields("one"), new Sum(), new Fields("reach"));
    return topology.build();
  }

  public static void main(String[] args) throws Exception {
    LocalDRPC drpc = new LocalDRPC();

    Config conf = new Config();
    LocalCluster cluster = new LocalCluster();

    cluster.submitTopology("reach", conf, buildTopology(drpc));

    Thread.sleep(2000);

    System.out.println("REACH: " + drpc.execute("reach", "aaa"));
    System.out.println("REACH: " + drpc.execute("reach", "foo.com/blog/1"));
    System.out.println("REACH: " + drpc.execute("reach", "engineering.twitter.com/blog/5"));

    cluster.shutdown();
    drpc.shutdown();
  }
}
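For orientation, here is what the "reach" DRPC stream computes, written as a hedged plain-Java equivalent (the helper method is ours, not part of the original class): expand the URL to its tweeters, expand each tweeter to their followers, and count distinct followers. The groupBy/One/Sum tail of the stream performs exactly this distinct-count, but distributed.

// Hypothetical sequential equivalent of the "reach" DRPC stream above.
static int computeReach(String url) {
  Set<String> distinctFollowers = new HashSet<String>();
  List<String> tweeters = TWEETERS_DB.get(url);
  if (tweeters != null) {
    for (String tweeter : tweeters) {
      List<String> followers = FOLLOWERS_DB.get(tweeter);
      if (followers != null) {
        distinctFollowers.addAll(followers);  // Set membership deduplicates followers
      }
    }
  }
  return distinctFollowers.size();            // reach = number of distinct followers
}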
TridentWordCount.java under storm-starter-master/src/jvm/storm/starter/trident
package storm.starter.trident;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Sum;
import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.MemoryMapState;
import storm.trident.tuple.TridentTuple;

public class TridentWordCount {
  public static class Split extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
      String sentence = tuple.getString(0);
      for (String word : sentence.split(" ")) {
        collector.emit(new Values(word));
      }
    }
  }

  public static StormTopology buildTopology(LocalDRPC drpc) {
    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
        new Values("the cow jumped over the moon"),
        new Values("the man went to the store and bought some candy"),
        new Values("four score and seven years ago"),
        new Values("how many apples can you eat"),
        new Values("to be or not to be the person"));
    spout.setCycle(true);

    TridentTopology topology = new TridentTopology();
    TridentState wordCounts = topology.newStream("spout1", spout)
        .parallelismHint(16)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
        .parallelismHint(16);

    topology.newDRPCStream("words", drpc)
        .each(new Fields("args"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
        .each(new Fields("count"), new FilterNull())
        .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
    return topology.build();
  }

  public static void main(String[] args) throws Exception {
    Config conf = new Config();
    conf.setMaxSpoutPending(20);
    if (args.length == 0) {
      LocalDRPC drpc = new LocalDRPC();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
      for (int i = 0; i < 100; i++) {
        System.out.println("DRPC RESULT: " + drpc.execute("words", "cat the dog jumped"));
        Thread.sleep(1000);
      }
    }
    else {
      conf.setNumWorkers(3);
      StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
    }
  }
}
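When the topology is submitted to a real cluster (the else branch above), the "words" function is queried through the cluster's DRPC server instead of a LocalDRPC. A minimal sketch using backtype.storm.utils.DRPCClient, assuming a DRPC server at the hypothetical host drpc.local on Storm's default DRPC port 3772:

DRPCClient client = new DRPCClient("drpc.local", 3772);          // hypothetical host, default port
// Prints the summed count of the queried words; execute() may throw checked Thrift exceptions.
System.out.println(client.execute("words", "cat dog the man"));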
StormRunner.java under storm-starter-master/src/jvm/storm/starter/util
package storm.starter.util;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;

public final class StormRunner {

  private static final int MILLIS_IN_SEC = 1000;

  private StormRunner() {
  }

  public static void runTopologyLocally(StormTopology topology, String topologyName, Config conf, int runtimeInSeconds)
      throws InterruptedException {
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology(topologyName, conf, topology);
    Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
    cluster.killTopology(topologyName);
    cluster.shutdown();
  }
}
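Usage sketch: build any topology and hand it to the runner (the topology name and runtime here are arbitrary):

TopologyBuilder builder = new TopologyBuilder();
// ... wire up spouts and bolts on the builder ...
StormRunner.runTopologyLocally(builder.createTopology(), "demo", new Config(), 60);  // local run for 60 seconds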
TupleHelpers.java under storm-starter-master/src/jvm/storm/starter/util
package storm.starter.util;

import backtype.storm.Constants;
import backtype.storm.tuple.Tuple;

public final class TupleHelpers {

  private TupleHelpers() {
  }

  public static boolean isTickTuple(Tuple tuple) {
    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
        && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
  }
}
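isTickTuple() only matters for bolts that opt in to tick tuples via Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS. A hedged sketch of such a bolt (the class name and the flush logic are ours, not part of storm-starter):

package storm.starter.util;  // hypothetical placement

import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

public class PeriodicFlushBolt extends BaseRichBolt {
  private OutputCollector _collector;

  @Override
  public Map<String, Object> getComponentConfiguration() {
    Map<String, Object> conf = new HashMap<String, Object>();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);  // one tick tuple every 10 seconds
    return conf;
  }

  @Override
  public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    _collector = collector;
  }

  @Override
  public void execute(Tuple tuple) {
    if (TupleHelpers.isTickTuple(tuple)) {
      // periodic work, e.g. flush buffered counts downstream
    }
    else {
      _collector.ack(tuple);  // normal per-tuple processing would go here
    }
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // this sketch declares no output fields
  }
}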