zoukankan      html  css  js  c++  java
  • Spout的实现步骤

    Spout的实现步骤:
    ·        对文件的改变进行分开的监听,并监视文件夹下有无新日志文件加入。
    ·        在数据得到了字段的说明后,将其转换成tuple。
    ·        声明Spout和Bolt之间的分组,并决定tuple发送给Bolt的途径。

    Spout的详细编码在Listing Three中显示。

    Listing ThreeSpoutopennextTupledelcareOutputFields方法的逻辑。
    1.  public void open( Map conf, TopologyContext context,SpoutOutputCollector collector )  
    2.  {   
    3.             _collector = collector;  
    4.           try  
    5.           {  
    6.           fileReader  =  new BufferedReader(new FileReader(new File(file)));
    7.           }
    8.           catch (FileNotFoundException e)
    9.           {
    10.          System.exit(1);   
    11.          }
    12. }                                                         
    13.  
    14. public void nextTuple()
    15. {  
    16.          protected void ListenFile(File file)
    17.          {
    18.          Utils.sleep(2000);  
    19.          RandomAccessFile access = null;
    20.          String line = null;  
    21.             try  
    22.             {
    23.                 while ((line = access.readLine()) != null)
    24.                 {
    25.                     if (line !=null)
    26.                     {  
    27.                          String[] fields=null;
    28.                           if (tupleInfo.getDelimiter().equals("|"))  fields = line.split("\"+tupleInfo.getDelimiter());  
    29.                           else  
    30.                           fields = line.split  (tupleInfo.getDelimiter());  
    31.                           if (tupleInfo.getFieldList().size() == fields.length)  _collector.emit(new Values(fields));
    32.                     }
    33.                }
    34.             }
    35.             catch (IOException ex){ }
    36.             }
    37. }  
    38.  
    39. public void declareOutputFields(OutputFieldsDeclarer declarer)
    40. {  
    41.       String[] fieldsArr = new String [tupleInfo.getFieldList().size()];
    42.       for(int i=0; i<tupleInfo.getFieldList().size(); i++)
    43.       {
    44.               fieldsArr = tupleInfo.getFieldList().get(i).getColumnName();
    45.       }
    46. declarer.declare(new Fields(fieldsArr));
    47. }     

    declareOutputFileds()决定了tuple发射的格式,这种话Bolt就能够用类似的方法将tuple译码。Spout持续对日志文件的数据的变更进行监听,一旦有加入Spout就会进行读入而且发送给Bolt进行处理。

    很多其它精彩内容请关注:http://bbs.superwu.cn
    关注超人学院微信二维码:
  • 相关阅读:
    JDK1.8-Stream API使用
    JDK1.8-Collectors方法介绍
    SpringBoot程序启动原理及自动化配置的原理
    SpringBoot之spring.factories
    Spring注入Bean的几种方式
    SpringBoot+Security+JWT实现单点登录
    SpringCloudConfig + CloudBus + WebHooks +RibbitMQ,实现配置集中管理和自动刷新
    SpringBoot的WebMvcConfigurer介绍
    Spring 事务的理解
    4-1 自动生成spider模板的命令
  • 原文地址:https://www.cnblogs.com/zfyouxi/p/5176363.html
Copyright © 2011-2022 走看看