  • Storm Study Notes

    1. How can one spout read multiple streams in parallel?
    Approach: every spout and bolt component has access to the TopologyContext. This makes it possible to divide the streams among the instances of a spout.
    Example: query the spout's parallelism in the Storm cluster and this spout instance's task index, then distribute the input tracks[] streams sensibly across the spout instances. In this way a single spout component can read multiple streams in parallel.

    // in open()
    int spoutsSize = context.getComponentTasks(context.getThisComponentId()).size();
    int myIdx = context.getThisTaskIndex();
    String[] tracks = ((String) conf.get("track")).split(",");

    A complete spout (code below): the input parameter track represents multiple streams. open() uses the modulo operator (%) to initialize track, and nextTuple() reads the track's data and emits it. Because each spout instance has a different myIdx, each instance claims its own subset of the tracks, so one spout component can read multiple streams.

    //ApiStreamingSpout.java
    package twitter.streaming;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.util.Map;
    import java.util.concurrent.LinkedBlockingQueue;

    import org.apache.http.HttpResponse;
    import org.apache.http.StatusLine;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.UsernamePasswordCredentials;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.impl.client.BasicCredentialsProvider;
    import org.apache.http.impl.client.DefaultHttpClient;
    import org.apache.log4j.Logger;
    import org.json.simple.parser.JSONParser;
    import org.json.simple.parser.ParseException;

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    public class ApiStreamingSpout extends BaseRichSpout {

        static String STREAMING_API_URL = "https://stream.twitter.com/1/statuses/filter.json?track=";
        private String track;
        private String user;
        private String password;
        private DefaultHttpClient client;
        private SpoutOutputCollector collector;
        private UsernamePasswordCredentials credentials;
        private BasicCredentialsProvider credentialProvider;

        LinkedBlockingQueue<String> tweets = new LinkedBlockingQueue<String>();

        static Logger LOG = Logger.getLogger(ApiStreamingSpout.class);
        static JSONParser jsonParser = new JSONParser();

        @Override
        public void nextTuple() {
            /*
             * Create the client call
             */
            client = new DefaultHttpClient();
            client.setCredentialsProvider(credentialProvider);
            HttpGet get = new HttpGet(STREAMING_API_URL + track); // track is unique to each spout instance
            HttpResponse response;
            try {
                // Execute
                response = client.execute(get);
                StatusLine status = response.getStatusLine();
                if (status.getStatusCode() == 200) {
                    InputStream inputStream = response.getEntity().getContent();
                    BufferedReader reader = new BufferedReader(
                            new InputStreamReader(inputStream));
                    String in;
                    // Read line by line
                    while ((in = reader.readLine()) != null) {
                        try {
                            // Parse and emit
                            Object json = jsonParser.parse(in);
                            collector.emit(new Values(track, json));
                        } catch (ParseException e) {
                            LOG.error("Error parsing message from twitter", e);
                        }
                    }
                }
            } catch (IOException e) {
                LOG.error("Error in communication with twitter api ["
                        + get.getURI().toString() + "]");
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e1) {
                }
            }
        }

        /**
         * spoutsSize and myIdx let this instance claim its own subset of the
         * tracks, so one spout component reads multiple streams.
         */
        @Override
        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            int spoutsSize = context
                    .getComponentTasks(context.getThisComponentId()).size();
            int myIdx = context.getThisTaskIndex();
            String[] tracks = ((String) conf.get("track")).split(",");
            StringBuffer tracksBuffer = new StringBuffer();
            for (int i = 0; i < tracks.length; i++) {
                if (i % spoutsSize == myIdx) {
                    tracksBuffer.append(",");
                    tracksBuffer.append(tracks[i]);
                }
            }

            if (tracksBuffer.length() == 0)
                throw new RuntimeException("No track found for spout"
                        + " [spoutsSize:" + spoutsSize + ", tracks:"
                        + tracks.length + "] the amount"
                        + " of tracks must be more than the spout parallelism");

            this.track = tracksBuffer.substring(1);

            user = (String) conf.get("user");
            password = (String) conf.get("password");

            credentials = new UsernamePasswordCredentials(user, password);
            credentialProvider = new BasicCredentialsProvider();
            credentialProvider.setCredentials(AuthScope.ANY, credentials);
            this.collector = collector;
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("criteria", "tweet"));
        }
    }

     With this technique you can distribute collectors across data sources. The same technique can be applied in other scenarios, for example collecting log files from web servers. (Note: I have not tried that myself.)
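The modulo assignment done in open() can be isolated as a small plain-Java sketch, with no Storm dependency (the class and method names here are mine, not part of the original code):

```java
public class TrackPartition {

    // Return the comma-joined subset of tracks owned by task myIdx out of
    // spoutsSize parallel spout tasks -- the same logic as open() above.
    static String assignTracks(String[] tracks, int spoutsSize, int myIdx) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < tracks.length; i++) {
            if (i % spoutsSize == myIdx) {   // round-robin by index
                sb.append(",").append(tracks[i]);
            }
        }
        if (sb.length() == 0) {
            throw new RuntimeException("No track found for this spout task");
        }
        return sb.substring(1);              // drop the leading comma
    }

    public static void main(String[] args) {
        String[] tracks = "storm,kafka,redis,hbase".split(",");
        // Two parallel tasks split four tracks between them.
        System.out.println(assignTracks(tracks, 2, 0)); // storm,redis
        System.out.println(assignTracks(tracks, 2, 1)); // kafka,hbase
    }
}
```

Each task index gets a disjoint subset, and together the tasks cover all tracks, which is exactly why the spout instances do not read each other's streams.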

    2. A bolt can emit tuples to multiple streams with emit(streamId, tuple), where each stream is identified by a string streamId. Then, in the TopologyBuilder, you decide which stream to subscribe to.

      I had not tried this, and had two questions: how do you declare the extra streams, and do spouts have this capability too?

    Answers: 1. Simply declare the additional streams in the declareOutputFields() method:

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
        declarer.declareStream("second", new Fields("line2"));
    }

            2. Judging from the bolt and spout implementations, both should support it.
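To make the streamId routing concrete without a Storm cluster, here is a minimal plain-Java simulation of the idea (the class and method names are mine; Storm's real routing is wired in TopologyBuilder, e.g. via shuffleGrouping(componentId, streamId)):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class StreamRouter {

    // streamId -> subscribed consumers (stand-ins for bolts)
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    public void subscribe(String streamId, Consumer<String> bolt) {
        subscribers.computeIfAbsent(streamId, k -> new ArrayList<>()).add(bolt);
    }

    // Analogous to collector.emit(streamId, tuple): only subscribers
    // of that particular stream receive the tuple.
    public void emit(String streamId, String tuple) {
        for (Consumer<String> bolt : subscribers.getOrDefault(streamId, List.of())) {
            bolt.accept(tuple);
        }
    }

    public static void main(String[] args) {
        StreamRouter router = new StreamRouter();
        List<String> defaults = new ArrayList<>();
        List<String> seconds = new ArrayList<>();
        router.subscribe("default", defaults::add);
        router.subscribe("second", seconds::add);
        router.emit("default", "line-1");
        router.emit("second", "line-2");
        System.out.println(defaults); // [line-1]
        System.out.println(seconds);  // [line-2]
    }
}
```

A bolt subscribed only to "second" never sees tuples emitted on the default stream, which is the behavior the declareStream("second", ...) declaration above enables.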

     

    3. Does BaseRichSpout call ack automatically? Does implementing the IBasicBolt interface give automatic acking?

        BaseBasicBolt "is used to do the acking automatically", i.e. it is supposed to call ack for you. (My test result: with BaseBasicBolt, if input.ack() is not called explicitly, the Storm UI shows no acked count.) So it seems best to call input.ack() explicitly anyway. Note: the current project uses the following pattern:

    collector.emit();
    input.ack();
    

     With IBasicBolt the acking is automatic; usage is as follows, and the Storm UI does show the bolt's acked count.

    public class TotalBolt implements IBasicBolt {

        private static final long serialVersionUID = 1L;
        static Integer Total = new Integer(0);

        // Required method; note it takes a BasicOutputCollector, which
        // anchors emitted tuples and acks the input automatically.
        public void execute(Tuple input, BasicOutputCollector collector) {
            try {
                // Throws IllegalArgumentException if the field is absent.
                String provinceId = input.getStringByField("PROVINCE_ID");
                Total++;
                collector.emit(new Values(Total));
            } catch (IllegalArgumentException e) {
                // A tuple arriving on the "signals24Hour" stream resets the counter.
                if (input.getSourceStreamId().equals("signals24Hour")) {
                    Total = 0;
                }
            }
        }

        // IBasicBolt also requires these; empty implementations suffice here.
        public void prepare(Map stormConf, TopologyContext context) {}
        public void cleanup() {}
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("total"));
        }
        public Map<String, Object> getComponentConfiguration() { return null; }
    }

    4. Anchoring in bolts

    The best way to keep track of the originating spout tuple is to include a reference to it in the tuples emitted downstream. This technique is called anchoring.

    collector.emit(tuple, new Values(word));
    
    What is anchoring? (If it really enables automatic replay, it is a good choice. Test note: I could not simulate a genuine message failure, so I called collector.fail(input) explicitly; I did not see the replay in my own log output, but that is because the replay does not go through nextTuple(), so my log statements never ran; the message was in fact re-emitted.)
    Adding a new node to a given node of the tuple tree is what we call anchoring.
    Calling _collector.emit(tuple, new Values(word)) is what performs the anchoring. Anchoring is per-hop: some bolts in the chain may anchor while others do not.

    Every message is anchored this way: pass the input tuple as the first argument to emit. Because each word tuple is anchored to the input tuple, and that input tuple is the root of the tuple tree sent by the spout, the spout tuple that spawned the tree will be replayed if any word tuple fails to be processed. (That is the benefit of anchoring.)

    But could this cause tuples to be replayed and counted twice? Yes, it could.

    Storm uses a hashing algorithm on the spout tuple's messageId to determine which acker tracks the tuple tree derived from that message.
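The idea of that assignment can be sketched in plain Java as follows (illustration only; the names are mine, and Storm's actual implementation hashes over the task IDs of the acker executors):

```java
public class AckerAssignment {

    // Pick the acker task that tracks a spout tuple's tree:
    // hash the messageId and take it modulo the number of ackers.
    // floorMod keeps the result non-negative even for negative hash codes.
    static int ackerFor(Object messageId, int numAckers) {
        return Math.floorMod(messageId.hashCode(), numAckers);
    }

    public static void main(String[] args) {
        int numAckers = 3;
        Long messageId = 42L;
        int acker = ackerFor(messageId, numAckers);
        // The same messageId always maps to the same acker, so a single
        // acker task sees every ack belonging to that tuple tree.
        System.out.println(acker == ackerFor(messageId, numAckers)); // true
    }
}
```

Determinism is the point: every bolt that acks a tuple from the same tree reports to the same acker, which can therefore decide when the whole tree is complete.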

    [Figure omitted: the emit(...) method overloads of the bolt-side OutputCollector.]


     Test result: after adding the anchoring, I did not observe any concrete effect.

     The interface for automatic anchoring and acking: IBasicBolt

    A lot of bolts follow a common pattern of reading an input tuple, emitting tuples based on it, and then acking the tuple at the end of the execute method. These bolts fall into the categories of filters and simple functions. Storm has an interface called IBasicBolt that encapsulates this pattern for you. The SplitSentence example can be written as an IBasicBolt as follows:

    public class SplitSentence implements IBasicBolt {
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String sentence = tuple.getString(0);
            for (String word : sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    This implementation is simpler than the implementation from before and is semantically identical. Tuples emitted to BasicOutputCollector are automatically anchored to the input tuple, and the input tuple is acked for you automatically when the execute method completes.

    In contrast, bolts that do aggregations or joins may delay acking a tuple until after it has computed a result based on a bunch of tuples. Aggregations and joins will commonly multi-anchor their output tuples as well. These things fall outside the simpler pattern of IBasicBolt.
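The pattern IBasicBolt encapsulates, anchor every emit to the input and ack once execute returns, can be mimicked in plain Java (all names here are mine, for illustration; Storm's real wrapper class is BasicBoltExecutor):

```java
import java.util.ArrayList;
import java.util.List;

public class AutoAckDemo {

    // A stand-in for a "filter / simple function" bolt.
    interface SimpleBolt { void execute(String tuple, List<String> out); }

    static List<String> acked = new ArrayList<>();

    // Run the bolt's logic, then ack the input automatically,
    // mirroring what IBasicBolt / BasicOutputCollector do for you.
    static List<String> runWithAutoAck(SimpleBolt bolt, String tuple) {
        List<String> out = new ArrayList<>();
        bolt.execute(tuple, out);   // user logic: emit zero or more tuples
        acked.add(tuple);           // automatic ack after execute completes
        return out;
    }

    public static void main(String[] args) {
        SimpleBolt split = (t, out) -> { for (String w : t.split(" ")) out.add(w); };
        System.out.println(runWithAutoAck(split, "hello storm world")); // [hello, storm, world]
        System.out.println(acked); // [hello storm world]
    }
}
```

This also makes clear why aggregations and joins fall outside the pattern: they must not ack the input the moment execute returns, because the result depends on tuples that have not arrived yet.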

    5. Fault tolerance at each level of the cluster

    Question: after calling input.ack() explicitly, why does the log show no invocation of the spout's ack method?

        Reason: the spout's SpoutOutputCollector.emit() call did not include a messageId; in other words it used the first of the emit overloads below. With the second form, the acks do show up in the Storm UI.

    SpoutOutputCollector's emit method (the messageId may be a long or an Integer. If the messageId was null, Storm will not track the tuple and no callback will be received. The emitted values must be immutable.)

    collector.emit(new Values(str), messageID++); // messageID is of type long

    By default, a message must be fully processed within a 30-second timeout; once 30 s is exceeded, the spout's fail method is triggered. This value can be tuned to the actual cluster via conf.setMessageTimeoutSecs(int secs) on the topology's Config.
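A minimal sketch of setting that timeout when building the topology (config fragment; the topology name and builder variable are placeholders):

```java
// Config fragment (Storm 0.8.x-era API, matching the imports used above):
Config conf = new Config();
conf.setMessageTimeoutSecs(60); // fail() fires if a tree is not fully acked in 60 s (default 30 s)
// StormSubmitter.submitTopology("my-topology", conf, builder.createTopology());
```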

    [Screenshot omitted: Storm UI showing the acked counts.]

  • Original post: https://www.cnblogs.com/byrhuangqiang/p/3605091.html