zoukankan      html  css  js  c++  java
  • 如何快速写一个分布式实时应用程序

        在开源搜索引擎Iveely的0.8.0中,我们有提到Iveely Computing实时计算平台,因为Iveely搜索引擎也是基于这个平台做的开发,因此,我们可以利用这个平台,轻松构建分布式实时应用程序。在开始构建程序之前,请按照这里部署Iveely Computing,确定部署无误之后,我们可以从下面代码开始学习。

        不管是hadoop还是storm,都会在入门的时候,有一个WordCount示例,Iveely Computing也不例外。

        首先,WordCount代码如下

    /*
     * To change this license header, choose License Headers in Project Properties.
     * To change this template file, choose Tools | Templates
     * and open the template in the editor.
     */
    package iveely.computing.example;
    
    import com.iveely.computing.api.FieldsDeclarer;
    import com.iveely.computing.api.IInput;
    import com.iveely.computing.api.IOutput;
    import com.iveely.computing.api.StreamChannel;
    import com.iveely.computing.api.TopologyBuilder;
    import com.iveely.computing.api.TopologySubmitter;
    import com.iveely.computing.api.Tuple;
    import java.util.HashMap;
    import java.util.Random;
    import java.util.TreeMap;
    
    /**
     *
     * @author liufanping@iveely.com
     */
    public class WordCount {
    
        public static class WordInput extends IInput {
    
            /**
             * Output data to collector.
             */
            private StreamChannel _channel;
    
            /**
             * Count of emitted.
             */
            private int emitCount = 0;
    
            @Override
            public void start(HashMap<String, Object> conf, StreamChannel channel) {
                _channel = channel;
            }
    
            @Override
            public void declareOutputFields(FieldsDeclarer declarer) {
                declarer.declare(new String[]{"word", "defaultCount"}, new Integer[]{0});
            }
    
            @Override
            public void nextTuple() {
                final String[] words = new String[]{"iveely", "mike", "jackson", "golda", "bertels", "blue", "china", "pan", "qq", "baidu", "ABC", "desk", "pen", "music", "play", "mouse", "mac", "windows", "microsoft", "c++", "java"};
                final Random rand = new Random();
                final String word = words[rand.nextInt(words.length)];
                final int count = 1;
                System.out.println(getName() + ":" + word);
                _channel.emit(word, count);
            }
    
            @Override
            public void end(HashMap<String, Object> conf) {
    
            }
    
            @Override
            public void toOutput() {
                _channel.addOutputTo(new WordOutputA());
                _channel.addOutputTo(new WordOutputB());
            }
        }
    
        public static class WordOutputA extends IOutput {
    
            /**
             * Output data to collector.
             */
            private StreamChannel _channel;
    
            @Override
            public void start(HashMap<String, Object> conf, StreamChannel channel) {
                _channel = channel;
            }
    
            @Override
            public void declareOutputFields(FieldsDeclarer declarer) {
                declarer.declare(new String[]{"word", "totalCount"}, null);
            }
    
            @Override
            public void execute(Tuple tuple) {
                String word = (String) tuple.get(0).toString();
                Integer defaultCount = Integer.parseInt(tuple.get(1).toString());
                _channel.emit(word, defaultCount);
            }
    
            @Override
            public void end(HashMap<String, Object> conf) {
                // Output map to data base or others.
            }
    
            @Override
            public void toOutput() {
    
            }
        }
    
        public static class WordOutputB extends IOutput {
    
            private TreeMap<String, Integer> map;
    
            /**
             * Output data to collector.
             */
            private StreamChannel _channel;
    
            @Override
            public void start(HashMap<String, Object> conf, StreamChannel channel) {
                map = new TreeMap<>();
                _channel = channel;
            }
    
            @Override
            public void declareOutputFields(FieldsDeclarer declarer) {
                declarer.declare(new String[]{"word", "totalCount"}, null);
            }
    
            @Override
            public void execute(Tuple tuple) {
                String word = (String) tuple.get(0).toString();
                System.out.println(this.getName() + ":" + word);
                Integer defaultCount = Integer.parseInt(tuple.get(1).toString());
                if (map.containsKey(word)) {
                    int currentCount = map.get(word);
                    map.put(word, defaultCount + currentCount);
                } else {
                    map.put(word, defaultCount);
                }
            }
    
            @Override
            public void end(HashMap<String, Object> conf) {
                // Output map to data base or others.
            }
    
            @Override
            public void toOutput() {
    
            }
        }
    
        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder("WordCount");
            builder.setInput(new WordInput(), 1);
            builder.setSlave(2);
            builder.isLocalMode = false;
            builder.setOutput(new WordOutputB(), 2);
            builder.setOutput(new WordOutputA(), 2);
            TopologySubmitter.submit(builder, args);
        }
    }

         其次,代码解析

         代码中,包含了四个部分:

         类WordInput:数据输入流,继承IInput,用于数据的产生。

         类WordOutputA:数据输出流,继承IOutput , 用于数据的输出。

         类WordOutputB:同上。

         main函数:执行的入口。

         第三,IInput

         public void start(HashMap<String, Object> conf, StreamChannel channel)

         类似于初始化函数,在开始产生数据之前,做的一些初始化,conf是一些配置文件,在main函数中的ToplogyBuilder中通过put方法设置进去,属于用户自定义数据。StreamChannel是流数据发送接口。在调用NextTuple的时候,会用到。本函数仅会调用一次。

         public void declareOutputFields(FieldsDeclarer declarer)

         用于声明本次产生数据的格式,declarer.declare(new String[]{"word", "defaultCount"}, new Integer[]{0, 1}); 表示输出数据有两个字段,第一个是word本身,第二个是默认的数量,后面new Integer[]{0}非常重要,是数据分发的分组,0表示,按照“word”本身进行分组。这样不同的word就会分发到不同的处理节点,本函数仅会调用一次。

         public void nextTuple()

         是数据的真正产生源,此方法会不断被调用,直到产生数据完成,产生完成是通过_channel.emitEnd();方法来表示完成,是需要手动调用,否则程序将会一直无休止运行下去,产生数据之后,一定要记得将数据发送出去:_channel.emit(word, count);。

         public void end(HashMap<String, Object> conf)

         当在nextTuple中调用_channel.emitEnd();之后,会调用此方法,此方法类似于程序推出前的清理工作,此方法仅调用一次。

         public void toOutput()

         此方法表示声明数据输出到的下一步流程。例如_channel.addOutputTo(new WordOutputA());表示输出的数据将会被WordOutoutA继续处理。当然可以addOutputTo到更多的IOutput。

         第四,IOutput

         在IOutput中,大部分均和IInput中类似,不同的在于IInput中有一个nextTuple,而在IOutput中,是 public void execute(Tuple tuple),此方法的调用方式为接收到数据之后才会触发。IOutput依然是可以输出到多个IOutput中去。

         第五,main函数

         main函数是程序的执行入口,对于我们的程序依然是,main函数中,包含两种模式,在这两种模式下,代码略有不同。

         调试模式

         用于本地调试,不需要部署Iveely Computing,可断点,跟调试一个普通程序一样,此刻main函数应该是这样。

    public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder("WordCount");
            builder.setInput(new WordInput(), 1);
            builder.setSlave(2);
            builder.isLocalMode = true;
            builder.setOutput(new WordOutputB(), 2);
            builder.setOutput(new WordOutputA(), 2);
            TopologySubmitter.submit(builder, args);
        }

         在builder参数中的isLocalMode设置为true即可。

         部署模式

         用于将程序提交到Iveely Computing,在调试模式下,确定程序无误之后,提交给Iveely Computing运行。

     public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder("WordCount");
            builder.setInput(new WordInput(), 1);
            builder.setSlave(2);
            builder.isLocalMode = false;
            builder.setOutput(new WordOutputB(), 2);
            builder.setOutput(new WordOutputA(), 2);
            TopologySubmitter.submit(builder, args);
        }

          其余代码并无太大区别。其中setSlave表示期望给多个少机器节点运行,此处是两个,当不能满足这么多节点的时候,会根据当前最多的节点给予分配。 builder.setInput(new WordInput(), 1); 中的第二个参数表示给予1个线程读取数据。

          编译文件到jar,参考这里的提交应用程序到Iveely Computing,并查看运行情况。

          总结:上述是利用WordCount示例,大致对Iveely Computing的API做了一个介绍,如果有任何疑问,请邮件我:liufanping@iveely.com。

         

          背景:开源搜索引擎Iveely 0.8.0发布,终见天日

  • 相关阅读:
    第六章 实验报告
    第三次实验报告
    第五章 循环结构课后反思
    第二次实验报告
    第一次实验报告
    第一次作业
    第九章
    指针实验报告
    第七次实验报告
    第六章
  • 原文地址:https://www.cnblogs.com/liufanping/p/4494129.html
Copyright © 2011-2022 走看看