  • Getting Started with Storm translation, Chapter 8, part 2

    Please credit the source when reposting: http://blog.csdn.net/lonelytrooper/article/details/12435641

    The Bolts

    First, let's look at the standard bolts in this topology:

    public class UserSplitterBolt implements IBasicBolt {
        private static final long serialVersionUID = 1L;

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream("users", new Fields("txid", "tweet_id", "user"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }

        @Override
        public void prepare(Map stormConf, TopologyContext context) {
        }

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String tweet = input.getStringByField("tweet");
            String tweetId = input.getStringByField("tweet_id");
            StringTokenizer strTok = new StringTokenizer(tweet, " ");
            TransactionAttempt tx = (TransactionAttempt) input.getValueByField("txid");
            HashSet<String> users = new HashSet<String>();
            while (strTok.hasMoreTokens()) {
                String user = strTok.nextToken();
                // Ensure this is an actual user, and that it's not repeated in the tweet
                if (user.startsWith("@") && !users.contains(user)) {
                    collector.emit("users", new Values(tx, tweetId, user));
                    users.add(user);
                }
            }
        }

        @Override
        public void cleanup() {
        }
    }

    As mentioned earlier in this chapter, the UserSplitterBolt receives tuples, parses the text of the tweets, and emits the words preceded by @, that is, the Twitter users. The HashtagSplitterBolt works in a very similar way.

    public class HashtagSplitterBolt implements IBasicBolt {
        private static final long serialVersionUID = 1L;

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declareStream("hashtags", new Fields("txid", "tweet_id", "hashtag"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }

        @Override
        public void prepare(Map stormConf, TopologyContext context) {
        }

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String tweet = input.getStringByField("tweet");
            String tweetId = input.getStringByField("tweet_id");
            StringTokenizer strTok = new StringTokenizer(tweet, " ");
            TransactionAttempt tx = (TransactionAttempt) input.getValueByField("txid");
            HashSet<String> words = new HashSet<String>();
            while (strTok.hasMoreTokens()) {
                String word = strTok.nextToken();
                if (word.startsWith("#") && !words.contains(word)) {
                    collector.emit("hashtags", new Values(tx, tweetId, word));
                    words.add(word);
                }
            }
        }

        @Override
        public void cleanup() {
        }
    }

    Now let's see what happens in the UserHashtagJoinBolt. The first important thing to notice is that it is a BaseBatchBolt. This means the execute method is run on the received tuples without emitting any new tuples. Eventually, when the batch is complete, Storm calls the finishBatch method.

    public void execute(Tuple tuple) {
        String source = tuple.getSourceStreamId();
        String tweetId = tuple.getStringByField("tweet_id");
        if ("hashtags".equals(source)) {
            String hashtag = tuple.getStringByField("hashtag");
            add(tweetHashtags, tweetId, hashtag);
        } else if ("users".equals(source)) {
            String user = tuple.getStringByField("user");
            add(userTweets, user, tweetId);
        }
    }
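    The excerpt shows only execute. The maps and helper methods it relies on (tweetHashtags, userTweets, add, plus the getters called from finishBatch below) are not part of the excerpt, so here is a plausible sketch of those omitted declarations. It assumes the batch ID is a TransactionAttempt and that the declared output fields mirror the emit in finishBatch; treat the exact generics, modifiers, and field names as assumptions rather than the book's literal code:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    import backtype.storm.coordination.BatchOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBatchBolt;
    import backtype.storm.transactional.TransactionAttempt;
    import backtype.storm.tuple.Fields;

    public class UserHashtagJoinBolt extends BaseBatchBolt<TransactionAttempt> {
        // tweet id -> hashtags that appeared in that tweet
        private final HashMap<String, Set<String>> tweetHashtags = new HashMap<String, Set<String>>();
        // user -> ids of the tweets that mention that user
        private final HashMap<String, Set<String>> userTweets = new HashMap<String, Set<String>>();

        private BatchOutputCollector collector;
        private TransactionAttempt id;

        @Override
        public void prepare(Map conf, TopologyContext context,
                BatchOutputCollector collector, TransactionAttempt id) {
            this.collector = collector;
            this.id = id;
        }

        // Add `value` to the set stored under `key`, creating the set on first use.
        private void add(HashMap<String, Set<String>> map, String key, String value) {
            Set<String> values = map.get(key);
            if (values == null) {
                values = new HashSet<String>();
                map.put(key, values);
            }
            values.add(value);
        }

        private Set<String> getUserTweets(String user) {
            return userTweets.get(user);
        }

        private Set<String> getTweetHashtags(String tweet) {
            return tweetHashtags.get(tweet);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Field names assumed from the emit in finishBatch below.
            declarer.declare(new Fields("txid", "user", "hashtag", "count"));
        }

        // execute(Tuple) as shown above; finishBatch() as shown below.
    }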

    Because you need to associate the hashtags in a tweet with the users mentioned in the same tweet and count how many times each pair occurs, you need to join the two streams coming from the previous bolts. This is done for the whole batch; once it is complete, the finishBatch method gets called.

    @Override
    public void finishBatch() {
        for (String user : userTweets.keySet()) {
            Set<String> tweets = getUserTweets(user);
            HashMap<String, Integer> hashtagsCounter = new HashMap<String, Integer>();
            for (String tweet : tweets) {
                Set<String> hashtags = getTweetHashtags(tweet);
                if (hashtags != null) {
                    for (String hashtag : hashtags) {
                        Integer count = hashtagsCounter.get(hashtag);
                        if (count == null)
                            count = 0;
                        count++;
                        hashtagsCounter.put(hashtag, count);
                    }
                }
            }
            for (String hashtag : hashtagsCounter.keySet()) {
                int count = hashtagsCounter.get(hashtag);
                collector.emit(new Values(id, user, hashtag, count));
            }
        }
    }

    In this method, a tuple is generated and emitted for each user-hashtag pair together with the number of times it occurs.

    You can see the complete code, ready to download, on GitHub.

    Committer Bolts

    As you already know, batches of tuples are sent through the topology by the coordinator and the emitters. Those batches are processed in parallel, with no particular order.

    Committer bolts are special batch bolts that either implement the ICommitter interface or have been set with the setCommitterBolt method of the TransactionalTopologyBuilder. The main difference from regular batch bolts is that the finishBatch method of a committer bolt is executed when the batch is ready to be committed, which happens only after all previous transactions have been committed successfully. Additionally, finishBatch methods are executed sequentially: if the batch with transaction ID 1 and the batch with transaction ID 2 are being processed in parallel in the topology, the finishBatch method of the committer bolt processing the batch with transaction ID 2 runs only once the finishBatch method for the batch with transaction ID 1 has finished without errors.
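    To make the wiring concrete, here is a hedged sketch of how these bolts could be assembled into the transactional topology. The topology ID, spout ID, and TweetsTransactionalSpout class are assumptions carried over from the first part of this chapter; the component IDs ("users-splitter", "hashtag-splitter", "user-hashtag-merger") match the ones the committer bolt checks in its execute method below:

    // A sketch, not the book's literal listing: builder and spout names are assumed.
    TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder(
            "test", "spout", new TweetsTransactionalSpout());

    builder.setBolt("users-splitter", new UserSplitterBolt(), 4)
            .shuffleGrouping("spout");
    builder.setBolt("hashtag-splitter", new HashtagSplitterBolt(), 4)
            .shuffleGrouping("spout");

    // Group on tweet_id so a tweet's users and hashtags meet in the same task.
    builder.setBolt("user-hashtag-merger", new UserHashtagJoinBolt(), 4)
            .fieldsGrouping("users-splitter", "users", new Fields("tweet_id"))
            .fieldsGrouping("hashtag-splitter", "hashtags", new Fields("tweet_id"));

    // RedisCommiterCommiterBolt implements ICommitter, so setBolt suffices here;
    // setCommitterBolt is the alternative for batch bolts that do not implement it.
    builder.setBolt("redis-committer", new RedisCommiterCommiterBolt())
            .globalGrouping("users-splitter", "users")
            .globalGrouping("hashtag-splitter", "hashtags")
            .globalGrouping("user-hashtag-merger");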

    The implementation of this class looks like this:

    public class RedisCommiterCommiterBolt extends BaseTransactionalBolt
            implements ICommitter {
        public static final String LAST_COMMITED_TRANSACTION_FIELD = "LAST_COMMIT";

        TransactionAttempt id;
        BatchOutputCollector collector;
        Jedis jedis;

        @Override
        public void prepare(Map conf, TopologyContext context,
                BatchOutputCollector collector, TransactionAttempt id) {
            this.id = id;
            this.collector = collector;
            this.jedis = new Jedis("localhost");
        }

        HashMap<String, Long> hashtags = new HashMap<String, Long>();
        HashMap<String, Long> users = new HashMap<String, Long>();
        HashMap<String, Long> usersHashtags = new HashMap<String, Long>();

        private void count(HashMap<String, Long> map, String key, int count) {
            Long value = map.get(key);
            if (value == null)
                value = (long) 0;
            value += count;
            map.put(key, value);
        }

        @Override
        public void execute(Tuple tuple) {
            String origin = tuple.getSourceComponent();
            if ("users-splitter".equals(origin)) {
                String user = tuple.getStringByField("user");
                count(users, user, 1);
            } else if ("hashtag-splitter".equals(origin)) {
                String hashtag = tuple.getStringByField("hashtag");
                count(hashtags, hashtag, 1);
            } else if ("user-hashtag-merger".equals(origin)) {
                String hashtag = tuple.getStringByField("hashtag");
                String user = tuple.getStringByField("user");
                String key = user + ":" + hashtag;
                Integer count = tuple.getIntegerByField("count");
                count(usersHashtags, key, count);
            }
        }

        @Override
        public void finishBatch() {
            String lastCommitedTransaction = jedis.get(LAST_COMMITED_TRANSACTION_FIELD);
            String currentTransaction = "" + id.getTransactionId();

            if (currentTransaction.equals(lastCommitedTransaction))
                return;

            Transaction multi = jedis.multi();
            multi.set(LAST_COMMITED_TRANSACTION_FIELD, currentTransaction);

            Set<String> keys = hashtags.keySet();
            for (String hashtag : keys) {
                Long count = hashtags.get(hashtag);
                multi.hincrBy("hashtags", hashtag, count);
            }

            keys = users.keySet();
            for (String user : keys) {
                Long count = users.get(user);
                multi.hincrBy("users", user, count);
            }

            keys = usersHashtags.keySet();
            for (String key : keys) {
                Long count = usersHashtags.get(key);
                multi.hincrBy("users_hashtags", key, count);
            }

            multi.exec();
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }

    All of this is quite straightforward, but there is one very important detail in the finishBatch method.

    ...
    multi.set(LAST_COMMITED_TRANSACTION_FIELD, currentTransaction);
    ...

    Here you are storing the ID of the last committed transaction in the database. Why would you do that? Remember that when a transaction fails, Storm replays it as many times as necessary. If you don't make sure that you have already processed the transaction, you could overcount, and the whole transactional semantics of the topology would be pointless. So remember: store the ID of the last committed transaction, and check against it before committing.
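    One design detail worth spelling out: the LAST_COMMIT key is written inside the same MULTI/EXEC block as the counter increments, so recording the transaction ID and applying its updates happen as a single atomic step in Redis, and a crash between the two can never leave half a commit behind. Factored out, the replay-safety check is just this (a minimal sketch using the bolt's own fields; the helper name is hypothetical):

    // Returns true when this batch was already committed by a previous attempt,
    // in which case finishBatch must change nothing to avoid double counting.
    private boolean alreadyCommitted() {
        String lastCommitted = jedis.get(LAST_COMMITED_TRANSACTION_FIELD);
        String current = "" + id.getTransactionId();
        return current.equals(lastCommitted);
    }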
