接下来就是域构建的操作
预构建要多后端采样进行处理
被采用的traceID会被记录到segemnt对象中
当TraceSegment被解析完成之后会被分配给resourceResolve进行处理
resourceResolve底层封装了dispatchManger,会将TraceSegment分配给不同的worker进行处理
实现字符串到id中的转化、生成span的时候,需要将字符串到id的转化
完成上面的解析之后,会将segment拷贝到segmentcoreinfo中进行暂存,并且依据segment的类型分配给不同的listen进行处理,输入是转化过后的segment
上面主要是segemntSpanListener对转化过后的segemnt进行再次解析
parseFirst方案的具体实现如下
对于notifyEntryListener干的事情主要是下面的形式
上面的三个listen并不会处理local的span
上面就完成了整个预构建的操作,接下来就是进行构建操作会调用segemntSpanListener的构建操作
notifyLocalListener
上述的三个 SpanListener 实现类中,全部都没有实现 LocalSpanListener 接口,所以在 trace-receiver-plugin 插件中并不会处理 Local 类型的 Span。
notifyExitListener
trace-receiver-plugin 插件中,只有 MultiScopesSpanListener 实现了 ExitSpanListener 接口,后面会单独介绍 MultiScopesSpanListener 对各类 Span 的处理,这里暂不展开。
到此,预构建(preBuild)操作就到此结束了。最后需要提醒的是,preBuild() 方法的返回值是一个 boolean 值,表示 exchange 操作是否已经将全部字符串转换成 id。
notifyListenerToBuild
如果预构建(preBuild)中的 exchange 过程已经将全部字符串转换成了相应的 id,则会通过 notifyListenerToBuild() 方法调用所有 SpanListener 实现的 build() 方法。这里重点来看 SegmentSpanListener 的实现:
首先会检测 TraceSegment 是否已被采样,它只会处理被采样的 TraceSegment。
设置 Segment 的 endpointName 字段。
将 Segment 交给 SourceReceiver 继续处理。
RecordStreamProcessor
SourceReceiver 底层封装的 DispatcherManager 会根据 Segment 选择相应的 SourceDispatcher 实现 —— SegmentDispatcher 进行分发。
SegmentDispatcher.dispatch() 方法中会将 Segment 中的数据拷贝到 SegmentRecord 对象中。
SegmentRecord 继承了 StorageData 接口,与前面介绍的 RegisterSource 以及 Metrics 的实现类似,通过注解指明了 Trace 数据存储的 index 名称的前缀(最终写入的 index 是由该前缀以及 TimeBucket 后缀两部分共同构成)以及各个字段对应的 field 名称,如下所示:
复制代码
// @Stream 注解的 name 属性指定了 index 的名称(index 前缀),processor 指定了处理该类型数据的 StreamProcessor 实现
@Stream(name = "segment", processor = RecordStreamProcessor.class...)
public class SegmentRecord extends Record {
// @Column 注解中指定了该字段在 index 中对应的 field 名称
@Setter @Getter @Column(columnName = "segment_id") private String segmentId;
@Setter @Getter @Column(columnName = "trace_id") private String traceId;
@Setter @Getter @Column(columnName = "service_id") private int serviceId;
@Setter @Getter @Column(columnName = "service_instance_id") private int serviceInstanceId;
@Setter @Getter @Column(columnName = "endpoint_name, matchQuery = true) private String endpointName;
@Setter @Getter @Column(columnName = "endpoint_id") private int endpointId;
@Setter @Getter @Column(columnName = "start_time") private long startTime;
@Setter @Getter @Column(columnName = "end_time") private long endTime;
@Setter @Getter @Column(columnName = "latency") private int latency;
@Setter @Getter @Column(columnName = "is_error") private int isError;
@Setter @Getter @Column(columnName = "data_binary") private byte[] dataBinary;
@Setter @Getter @Column(columnName = "version") private int version;
}
在 SegmentRecord 的父类 —— Record 中还定义了一个 timeBucket 字段(long 类型),对应的 field 名称是 "time_bucket"。
RecordStreamProcessor 的核心功能是为每个 Record 类型创建相应的 worker 链,这与前面介绍的 InventoryStreamProcessor 以及 MetricsStreamProcessor 类似。在 RecordStreamProcessor 中,每个 Record 类型对应的 worker 链中只有一个worker 实例 —— RecordPersistentWorker。
与前面介绍的 MetricsPersistentWorker 类型,RecordPersistentWorker 负责 SegmentRecord 数据的持久化:
如上图所示,RecordPersistentWorker 也继承了 PersistentWorker,写入流程大致如下图所示:
RecordPersistentWorker 有两个地方与 MetricsPersistentWorker 有些区别:
RecordPersistentWorker 中使用的 DataCache(以及 Window)实现是 NoMergeDataCache,它与 MergeDataCache 的唯一区别就是没有提供判断数据是否存在的 containKey() 方法,这样就只提供了缓存数据的功能,调用方无法合并重复数据。
当 NoMergeDataCache 中缓存的数据到达阈值之后,RecordPersistentWorker 会通过 RecordDAO 生成批量的 IndexRequest 请求,Trace 数据没有合并的情况,所以 RecordDAO 以及 IRecordDAO 接口没有定义 prepareBatchUpdate() 方法。
RecordDAO.perpareBatchInsert() 方法的具体实现如下:
复制代码
public IndexRequest prepareBatchInsert(Model model, Record record) throws IOException {
XContentBuilder builder = map2builder(storageBuilder.data2Map(record));
// 生成的是最终 Index 名称,这里的 Index 由前缀字符串(即"segment")+TimeBucket 两部分构成
String modelName = TimeSeriesUtils.timeSeries(model, record.getTimeBucket());
// 创建 IndexRequest 请求
return getClient().prepareInsert(modelName, record.id(), builder);
}
与 MetricsPersistentWorker 一样,RecordPersistentWorker 生成的全部 IndexRequest 请求会交给全局唯一的 BatchProcessEsDAO 实例批量发送到 ES ,完成写入。
到此,以 SegmentSpanListener 为主的 TraceSegment 处理线就介绍完了。