在开源搜索引擎Iveely的0.8.0中,我们有提到Iveely Computing实时计算平台,因为Iveely搜索引擎也是基于这个平台做的开发,因此,我们可以利用这个平台,轻松构建分布式实时应用程序。在开始构建程序之前,请按照这里部署Iveely Computing,确定部署无误之后,我们可以从下面代码开始学习。
不管是hadoop还是storm,都会在入门的时候,有一个WordCount示例,Iveely Computing也不例外。
首先,WordCount代码如下:
/* * To change this license header, choose License Headers in Project Properties. * To change this template file, choose Tools | Templates * and open the template in the editor. */ package iveely.computing.example; import com.iveely.computing.api.FieldsDeclarer; import com.iveely.computing.api.IInput; import com.iveely.computing.api.IOutput; import com.iveely.computing.api.StreamChannel; import com.iveely.computing.api.TopologyBuilder; import com.iveely.computing.api.TopologySubmitter; import com.iveely.computing.api.Tuple; import java.util.HashMap; import java.util.Random; import java.util.TreeMap; /** * * @author liufanping@iveely.com */ public class WordCount { public static class WordInput extends IInput { /** * Output data to collector. */ private StreamChannel _channel; /** * Count of emitted. */ private int emitCount = 0; @Override public void start(HashMap<String, Object> conf, StreamChannel channel) { _channel = channel; } @Override public void declareOutputFields(FieldsDeclarer declarer) { declarer.declare(new String[]{"word", "defaultCount"}, new Integer[]{0}); } @Override public void nextTuple() { final String[] words = new String[]{"iveely", "mike", "jackson", "golda", "bertels", "blue", "china", "pan", "qq", "baidu", "ABC", "desk", "pen", "music", "play", "mouse", "mac", "windows", "microsoft", "c++", "java"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; final int count = 1; System.out.println(getName() + ":" + word); _channel.emit(word, count); } @Override public void end(HashMap<String, Object> conf) { } @Override public void toOutput() { _channel.addOutputTo(new WordOutputA()); _channel.addOutputTo(new WordOutputB()); } } public static class WordOutputA extends IOutput { /** * Output data to collector. */ private StreamChannel _channel; @Override public void start(HashMap<String, Object> conf, StreamChannel channel) { _channel = channel; } @Override public void declareOutputFields(FieldsDeclarer declarer) { declarer.declare(new String[]{"word", "totalCount"}, null); } @Override public void execute(Tuple tuple) { String word = (String) tuple.get(0).toString(); Integer defaultCount = Integer.parseInt(tuple.get(1).toString()); _channel.emit(word, defaultCount); } @Override public void end(HashMap<String, Object> conf) { // Output map to data base or others. } @Override public void toOutput() { } } public static class WordOutputB extends IOutput { private TreeMap<String, Integer> map; /** * Output data to collector. */ private StreamChannel _channel; @Override public void start(HashMap<String, Object> conf, StreamChannel channel) { map = new TreeMap<>(); _channel = channel; } @Override public void declareOutputFields(FieldsDeclarer declarer) { declarer.declare(new String[]{"word", "totalCount"}, null); } @Override public void execute(Tuple tuple) { String word = (String) tuple.get(0).toString(); System.out.println(this.getName() + ":" + word); Integer defaultCount = Integer.parseInt(tuple.get(1).toString()); if (map.containsKey(word)) { int currentCount = map.get(word); map.put(word, defaultCount + currentCount); } else { map.put(word, defaultCount); } } @Override public void end(HashMap<String, Object> conf) { // Output map to data base or others. } @Override public void toOutput() { } } public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder("WordCount"); builder.setInput(new WordInput(), 1); builder.setSlave(2); builder.isLocalMode = false; builder.setOutput(new WordOutputB(), 2); builder.setOutput(new WordOutputA(), 2); TopologySubmitter.submit(builder, args); } }
其次,代码解析。
代码中,包含了四个部分:
类WordInput:数据输入流,继承IInput,用于数据的产生。
类WordOutputA:数据输出流,继承IOutput , 用于数据的输出。
类WordOutputB:同上。
main函数:执行的入口。
第三,IInput。
public void start(HashMap<String, Object> conf, StreamChannel channel)
类似于初始化函数,在开始产生数据之前,做的一些初始化,conf是一些配置文件,在main函数中的ToplogyBuilder中通过put方法设置进去,属于用户自定义数据。StreamChannel是流数据发送接口。在调用NextTuple的时候,会用到。本函数仅会调用一次。
public void declareOutputFields(FieldsDeclarer declarer)
用于声明本次产生数据的格式,declarer.declare(new String[]{"word", "defaultCount"}, new Integer[]{0, 1}); 表示输出数据有两个字段,第一个是word本身,第二个是默认的数量,后面new Integer[]{0}非常重要,是数据分发的分组,0表示,按照“word”本身进行分组。这样不同的word就会分发到不同的处理节点,本函数仅会调用一次。
public void nextTuple()
是数据的真正产生源,此方法会不断被调用,直到产生数据完成,产生完成是通过_channel.emitEnd();方法来表示完成,是需要手动调用,否则程序将会一直无休止运行下去,产生数据之后,一定要记得将数据发送出去:_channel.emit(word, count);。
public void end(HashMap<String, Object> conf)
当在nextTuple中调用_channel.emitEnd();之后,会调用此方法,此方法类似于程序推出前的清理工作,此方法仅调用一次。
public void toOutput()
此方法表示声明数据输出到的下一步流程。例如_channel.addOutputTo(new WordOutputA());表示输出的数据将会被WordOutoutA继续处理。当然可以addOutputTo到更多的IOutput。
第四,IOutput。
在IOutput中,大部分均和IInput中类似,不同的在于IInput中有一个nextTuple,而在IOutput中,是 public void execute(Tuple tuple),此方法的调用方式为接收到数据之后才会触发。IOutput依然是可以输出到多个IOutput中去。
第五,main函数。
main函数是程序的执行入口,对于我们的程序依然是,main函数中,包含两种模式,在这两种模式下,代码略有不同。
调试模式
用于本地调试,不需要部署Iveely Computing,可断点,跟调试一个普通程序一样,此刻main函数应该是这样。
public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder("WordCount"); builder.setInput(new WordInput(), 1); builder.setSlave(2); builder.isLocalMode = true; builder.setOutput(new WordOutputB(), 2); builder.setOutput(new WordOutputA(), 2); TopologySubmitter.submit(builder, args); }
在builder参数中的isLocalMode设置为true即可。
部署模式
用于将程序提交到Iveely Computing,在调试模式下,确定程序无误之后,提交给Iveely Computing运行。
public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder("WordCount"); builder.setInput(new WordInput(), 1); builder.setSlave(2); builder.isLocalMode = false; builder.setOutput(new WordOutputB(), 2); builder.setOutput(new WordOutputA(), 2); TopologySubmitter.submit(builder, args); }
其余代码并无太大区别。其中setSlave表示期望给多个少机器节点运行,此处是两个,当不能满足这么多节点的时候,会根据当前最多的节点给予分配。 builder.setInput(new WordInput(), 1); 中的第二个参数表示给予1个线程读取数据。
编译文件到jar,参考这里的提交应用程序到Iveely Computing,并查看运行情况。
总结:上述是利用WordCount示例,大致对Iveely Computing的API做了一个介绍,如果有任何疑问,请邮件我:liufanping@iveely.com。