1. Trident入门
Trident
-------------------
三叉戟
storm高级抽象,支持有状态流处理;
好处是确保消费被处理一次;
以小批次方式处理输入流,得到精准一次性处理 ;
不再使用bolt,使用functions、aggreates、filters以及states。
Trident Tuple: trident top的数据模型,trident处理数据的单元;
每个tuple有预定义的字段列表构成,字段类型可以是byte;
character,integer,long,float,double,Boolean or byte array。
Trident functions: 包含修改tuple的业务逻辑,输入的是tuple的字段,输出多个tuple。
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;
/**
* 求和函数
*/
public class SumFunction extends BaseFunction {
@Override
public void execute(TridentTuple input, TridentCollector collector) {
Integer num1 = input.getInteger(0);
Integer num2 = input.getInteger(1);
int sum = num1 + num2;
collector.emit(new Values(sum));
}
}
如果tuple有a, b, c, d四个field,只有a和b作为输入传给function,functions会生成新的sum字段,
sum字段和输入的元祖进行合并,生成一个完成tuple,因此,新的tuple的总和字段个数是a, b, c, d, sum。
Trident Filter
--------------------
1. 描述
获取字段集合作为输入,输出boolean,如果反悔true,tuple在流中保留,否则删除,
a, b, c, d, sum是元祖的字段,sum作为输入传递给filter,判断sum是否为偶数,
如果是偶数,tuple(a, b, c, d, sum)保留,否则tuple删除。
2. 代码
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;
/**
* 校验是否是偶数的过滤器
*/
public class CheckEvenFilter extends BaseFilter {
@Override
public boolean isKeep(TridentTuple input) {
Integer sum = input.getInteger(0);
if (sum % 2 == 0) {
return true;
}
return false;
}
}
Trident projections
--------------------
1. 描述
投影操作中,trident值保留在投影中制定的字段,
x, y, z --> projection(x) --> x
2. 调用投影的方式
mystream.project(new fields("x"));
写一个topology
import org.apache.storm.trident.operation.BaseFunction; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.tuple.TridentTuple; public class PrintFunction extends BaseFunction { @Override public void execute(TridentTuple input, TridentCollector collector) { Integer sum = input.getInteger(0); System.out.println(this.getCLass.getSimpleName + ": " + sum); } }
import com.google.common.collect.ImmutableList; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.trident.Stream; import org.apache.storm.trident.TridentTopology; import org.apache.storm.trident.testing.FeederBatchSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; public class TridentTopologyApp { public static void main(String[] args) { // 创建topology TridentTopology topology = new TridentTopology(); // 创建spout FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("a", "b", "c", "d")); // 创建流 Stream stream = topology.newStream("spout", testSpout); stream.shuffle().each(new Fields("a", "b"), new SumFunction(), new Fields("sum")).parallelismHint(1) .shuffle().each(new Fields("sum"), new CheckEvenFilter()).parallelismHint(1) .shuffle().each(new Fields("sum"), new PrintFunction(), new Fields("xxx")).parallelismHint(1); // 本地提交 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("TridentDemo", new Config(), topology.build()); // 测试数据 testSpout.feed(ImmutableList.of(new Values(1, 2, 3, 4))); testSpout.feed(ImmutableList.of(new Values(2, 3, 4, 5))); testSpout.feed(ImmutableList.of(new Values(3, 4, 5, 6))); testSpout.feed(ImmutableList.of(new Values(4, 5, 6, 7))); } }
输出结果
SumFunction:1, 2 CheckEvenFilter:3 PrintFunction: 3 SumFunction:2, 3 CheckEvenFilter:5 PrintFunction: 5 SumFunction:3, 4 CheckEvenFilter:7 PrintFunction: 7 SumFunction:4, 5 CheckEvenFilter:9 PrintFunction: 9
加入一个求平均数的函数
import org.apache.storm.trident.operation.BaseFunction; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.tuple.TridentTuple; /** * 求平均值方法 */ public class AverageFunction extends BaseFunction { @Override public void execute(TridentTuple input, TridentCollector collector) { int a = input.getIntegerByField("a"); int b = input.getIntegerByField("b"); int c = input.getIntegerByField("c"); int d = input.getIntegerByField("d"); int sum = input.getIntegerByField("sum"); float avg = (float) ((a+b+c+d+sum) / 5.0); System.out.println(this.getClass().getSimpleName() + ": avg = " + avg); } }
import com.google.common.collect.ImmutableList; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.trident.Stream; import org.apache.storm.trident.TridentTopology; import org.apache.storm.trident.testing.FeederBatchSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; public class TridentTopologyApp { public static void main(String[] args) { // 创建topology TridentTopology topology = new TridentTopology(); // 创建spout FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("a", "b", "c", "d")); // 创建流 Stream stream = topology.newStream("spout", testSpout); stream.shuffle().each(new Fields("a", "b"), new SumFunction(), new Fields("sum")).parallelismHint(1) .shuffle().each(new Fields("sum"), new CheckEvenFilter()).parallelismHint(1) .shuffle().each(new Fields("sum"), new PrintFunction(), new Fields("res")).parallelismHint(1) .shuffle().each(new Fields("a", "b", "c", "d", "sum"), new AverageFunction(), new Fields("avg")).parallelismHint(1); // 本地提交 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("TridentDemo", new Config(), topology.build()); // 测试数据 testSpout.feed(ImmutableList.of(new Values(1, 2, 3, 4))); testSpout.feed(ImmutableList.of(new Values(2, 3, 4, 5))); testSpout.feed(ImmutableList.of(new Values(3, 4, 5, 6))); testSpout.feed(ImmutableList.of(new Values(4, 5, 6, 7))); } }
2. Trident聚合函数
分区聚合
import com.google.common.collect.ImmutableList; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.trident.Stream; import org.apache.storm.trident.TridentTopology; import org.apache.storm.trident.testing.FeederBatchSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; public class TridentTopologyApp2 { public static void main(String[] args) { // 创建topology TridentTopology topology = new TridentTopology(); // 创建spout FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("a", "b")); // 创建流 Stream stream = topology.newStream("testSpout", testSpout); stream.shuffle().each(new Fields("a", "b"), new MyFilter1()).parallelismHint(1) .global().each(new Fields("a", "b"), new MyFilter2()).parallelismHint(1) .partitionBy(new Fields("a")) //.each(new Fields("a", "b"), new MyFunction1(), new Fields("none")).parallelismHint(1) .partitionAggregate(new Fields("a"), new MyCount(), new Fields("count")) .each(new Fields("count"), new MyPrintFunction1(), new Fields("xxx")).parallelismHint(1); // 本地提交 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("TridentDemo2", new Config(), topology.build()); // 测试数据 testSpout.feed(ImmutableList.of(new Values(1, 2))); testSpout.feed(ImmutableList.of(new Values(2, 3))); testSpout.feed(ImmutableList.of(new Values(2, 4))); testSpout.feed(ImmutableList.of(new Values(3, 5))); } }
批次聚合