21、SkyWalking源码分析Collector接收Trace数据

1. 概述

分布式链路追踪系统,链路的追踪大体流程如下:

1、 Agent收集Trace数据;
2、 Agent发送Trace数据给Collector;
3、 Collector接收Trace数据
4、 Collector存储Trace数据到存储器,例如,数据库;

本文主要分享【第三部分】 SkyWalking Collector 接收 Trace 数据

友情提示:Collector 接收到 TraceSegment 的数据,对应的类是 Protobuf 生成的。考虑到更加易读易懂,本文使用 TraceSegment 相关的原始类

大体流程如下:

*

  • Collector 接收到 TraceSegment 数据后,进行构建
  • 【蓝色流程】构建成功,进行流式处理,最终存储到存储器( 例如,ES / H2 )。
  • 【粉色流程】构建失败,写入 Buffer 文件进行暂存。
  • 【绿色流程】后台线程,定时读取 Buffer 文件,重新提交构建。

什么是构建

从TraceSegment 数据中,会构建出更多的数据维度,如下图所示:

*

构建的过程,本文只分享调用的过程,具体怎么生成新的数据,数据的流式处理与存储,在 《SkyWalking 源码解析 —— Collector 存储 Trace 数据》 详细解析。

为什么构建会失败

在TraceSegment 里的数据结构,例如操作名( operationName )和操作编号( operationId ) ,在 《SkyWalking 源码分析 —— Agent 收集 Trace 数据》 中我们可以看到,考虑到网络传输,优先使用 operationId ,若不存在( 例如操作还未注册,或者注册了 Agent 未同步到本地 ),则使用 operationName

但是,Collector 构建过程时,要求的是 operationId ,如果传递的是 operationName 时,需要将 operationName 转换成 operationId 。若此时 operationName 未注册时,则无法获取到 operationId ,导致构建失败

那么有胖友可能有疑惑,在构建过程中,注册 operationName 呢?答案是不行,
《SkyWalking 源码分析 —— Agent DictionaryManager 字典管理》「2.2 操作的同步 API」 中,我们可以看到,operationName 的注册,是异步的过程。因而,即使构建的过程中,调用注册,也无法获得 operationId

涉及的逻辑点比较多,如果胖友理解不能,下面我们可以直接看代码。

2. TraceSegmentServiceHandler

我们先来看看 API 的定义,TraceSegmentService.proto ,如下图所示:

*

TraceSegmentServiceHandler#collect(Application, StreamObserver<ApplicationMapping>), 代码如下:

  • 第 51 行:调用 ITraceSegmentService#send(UpstreamSegment) 方法,处理一条 TraceSegment 。

2.1 TraceSegmentService

org.skywalking.apm.collector.agent.stream.service.trace.ITraceSegmentService ,继承 Service 接口,TraceSegment 服务接口。

org.skywalking.apm.collector.agent.stream.worker.trace.ApplicationIDService ,实现 IApplicationIDService 接口,TraceSegment 服务实现类。

  • 实现了 #send(UpstreamSegment) 方法,代码如下:

    • 第 40 至 41 行:创建 SegmentParse 对象,后调用 SegmentParse#parse(UpstreamSegment, Source) 方法,解析并处理 TraceSegment 。

2.2 SegmentParse

org.skywalking.apm.collector.agent.stream.parser.SegmentParse ,Segment 解析器。属性如下:

#parse(UpstreamSegment, Source) 方法,解析并处理 TraceSegment 。在该方法里,我们会看到,本文开头提到的【构造】。整个构造的过程,实际分成两步:1)预构建;2)执行构建。代码如下:

  • 第 88 至 89 行:从 segment 参数中,解析出 :

  • traceIds ,关联的链路追踪全局编号

  • segmentObject ,TraceSegmentObject 对象。

  • 第 91 行:创建 SegmentDecorator 对象。该对象的用途,在 「2.3 Standardization 标准化」 统一解析。

  • ——– 构建失败 ——–

  • 第 94 行:调用 #preBuild(List<UniqueId>, SegmentDecorator) 方法,预构建

  • 第 97 至 99 行:调用 #writeToBufferFile() 方法,将 TraceSegment 写入 Buffer 文件暂存。为什么会判断 source == Source.Agent 呢?#parse(UpstreamSegment, Source) 方法的调用,共有两个 Source

  • 目前我们看到 TraceSegmentService 的调用使用的是 Source.Agent

  • 而后台线程,定时调用该方法重新构建使用的是 Source.Buffer ,如果不加盖判断,会预构建失败重复写入。

  • 第 100 行:返回 false ,表示构建失败。

  • ——– 构建成功 ——–

  • 第 106 行:调用 #notifyListenerToBuild() 方法,通知 Span 监听器们,执行构建各自的数据。在 《SkyWalking 源码解析 —— Collector 存储 Trace 数据》 详细解析。

  • 第 109 行:调用 buildSegment(id, dataBinary) 方法,执行构建 TraceSegment 。

  • 第 110 行:返回 true ,表示构建成功。

  • 第 112 至 115 行:发生 InvalidProtocolBufferException 异常,返回 false ,表示构建失败。

2.2.1 预构建

#preBuild(List<UniqueId>, SegmentDecorator) 方法,前置构建,用于通过不同的监听器,对 TraceSegment 进行构建,生成不同的数据。在该过程中,会发生我们在文章头所说的,”为什么构建会失败“。代码如下:

2.2.2 写入 Buffer 文件

#writeToBufferFile(id, upstreamSegment) 方法,将 TraceSegment 写入 Buffer 文件。代码如下:


org.skywalking.apm.collector.agent.stream.parser.standardization.SegmentStandardizationWorker ,继承 AbstractLocalAsyncWorker 抽象类,TraceSegment 标准化 Worker ,负责将接收到的 TraceSegment 异步写到 Buffer 文件。

2.3 Standardization 标准化

本小节涉及到的类如下图:

*

我们先来说说,什么叫 standardization 标准化?其实就是我们在文章开头说的”例如将 operationName 转换成 operationId“。

2.3.1 StandardBuilder

org.skywalking.apm.collector.agent.stream.parser.standardization.StandardBuilder ,标准化 Builder 接口。

  • 定义了 #toBuilder() 接口方法,转换成 Builder 。感觉这个接口方法怪怪的?不要捉急,等会看一个实现类就明白了。

StandardBuilder 有三个实现类:

怎么都是装饰者呢,而且恰好和一个数据结构对应?以 SpanDecorator 为例子,代码如下:

  • spanObject 属性,SpanObject ,Span 的 Protobuf 数据对象。

  • standardBuilder 属性,SpanObject 的 Builder 对象。

  • isOrigin 属性,是否是原始对象。

  • isOrigin = true ,使用 spanObject属性 。

  • isOrigin = false ,使用 standardBuilder 属性。

  • 在 Protobuf 里,数据修改值时,需要使用对应的 Builder 对象。通过使用装饰者设计模式,对使用者屏蔽细节,调用也更加方便。下面在来看看如下方法,是不是就更加明白了:

  • #toBuilder() 实现方法,创建 SpanObject 对应的 Builder ,并修改 isOrigin = false 。另外,会调用 standardBuilder 属性的 #toBuilder() 方法,目前在项目里,此处的 standardBuilder 属性为 SegmentDecorator 。

SegmentDecorator 、ReferenceDecorator 和 SpanDecorator 目的一致。

2.3.2 IdExchanger

org.skywalking.apm.collector.agent.stream.parser.standardization.IdExchanger ,编号兑换器接口

IdExchanger 有三个实现类:


ReferenceIdExchanger#exchange(standardBuilder, applicationId) 方法,代码如下:

SpanIdExchanger#exchange(standardBuilder, applicationId) 方法,类似,已经添加代码注释,胖友自己阅读理解。

3. Buffer 文件

本小节涉及到的类如下图:

*

我们先来看看 Buffer 包括哪些文件:

```java

yunai$ pwd
/Users/yunai/Java/buffer
yunai$ ls
data_20171205004132.sw offset_20171205004132.sw
```

  • Data 文件,记录 TraceSegment 具体数据,通过 SegmentBufferManager 管理。
  • Offset 文件,记录偏移,包括写入文件的名字和偏移,读取文件的名字和偏移,通过 OffsetManager 管理。
  • 从命名上,我们可以看出,这两种文件,文件名字格式为 类型_${时间}.sw ,并且相同类型,同时可以存在多个

org.skywalking.apm.collector.agent.stream.buffer.BufferFileConfig ,Buffer 文件配置 。

org.skywalking.apm.collector.agent.stream.buffer.Offset ,偏移 。

下面,我们来一起看看 Buffer 文件的初始化、写入、读取的三种操作过程。

3.1 初始化

SegmentBufferManager#initialize(ModuleManager) 方法,初始化 Offset 文件、Data 文件、定期读取 Buffer 文件的任务。代码如下:

  • 第 58 行:调用 OffsetManager#initialize() 方法,初始化 Offset 文件。

  • 第 60 至 63 行:创建 Buffer 文件夹成功( 意味着该文件夹不存在 ),调用 #newDataFile() ,创建 Data 文件。代码如下:

  • 第 116 至 119 行:创建的 Data 文件。文件名格式为,data_${yyyyMMddHHmmss}.sw

    • 第 121 行:调用 OffsetManager#setWriteOffset(writeFileName, writeFileOffset) 方法,设置 Offset 的写入的文件名和偏移。
  • 第 124 至 126 行:关闭的 Data 文件的 outputStream

  • 第 129 至 130 行:创建的 Data 文件的 outputStream

  • 第 66 至 77 行:获得 Offset 的写入的 Data 文件,并创建对应的 outputStream 。

  • 第 80 行:调用 SegmentBufferReader#initialize(ModuleManager) 方法,初始化定期读取 Buffer 文件的任务。


OffsetManager#initialize() 方法,初始化 Offset 文件。代码如下:

  • 第 74 行:创建 Offer 对象。该对象包含了当前分别写入和读取的文件名与偏移量。

  • 第 60 至 63 行:创建 Buffer 文件夹成功( 意味着该文件夹不存在 ),调用 #createOffsetFile() ,创建 Data 文件。代码如下:

  • 第 114 至 116 行:创建的 Offset 文件。文件名格式为,offset_${yyyyMMddHHmmss}.sw

  • 第 118 至 121 行:设置 Offset 对象的写入和读取的文件名与偏移量都为。在上面的方法,此处的【空】,在 Data 文件创建时,会重新设置 Offset 。

    • 第 123 行:调用 #flush() 方法,写入 Offset 对象到 Offset 文件。代码如下:

      • 第 131 行:调用 Offset#serialize() 方法,序列化读写偏移,格式为 ${读取文件名},${读取文件偏移量},${写入文件名},${写入文件偏移量} 。
    • 第 133 至 142 行:写入 Offset 对象到 Offset 文件。写入方式为整行,如下图所示:

      *

  • 第 82 至 94 行:获得所有 Offset 文件,删除老的 Offset 文件,保留最后一个。若不存在 Offset 文件,则调用 #createOffsetFile() 方法,创建的 Offset 文件。

  • 第 98 至 99 行:从 Offset 文件的最后一行读取,反序列化到 Offset 对象。

  • 第 103 行:创建定义任务,延迟 10 秒,间隔 3 秒,调用 #flush() 方法,定时写入 Offset 对象到 Offset 文件。注意,所以 Offset 改变时,不是立即写入 Offset 文件,而是周期性刷盘


SegmentBufferReader#initialize(ModuleManager) 方法,初始化定期读取 Buffer 文件的任务。代码如下:

  • 第 56 行:创建定时任务,延迟 3 秒,间隔 3 秒,调用 #preRead() 方法,读取 Buffer 文件,将 TraceSegment 提交给 SegmentParse 重新解析与构建处理。

3.2 写入

SegmentBufferManager#writeBuffer(UpstreamSegment) 方法,将 TraceSegment 写入 Buffer 文件,包括两个步骤:1)将 TraceSegment 写入 Data 文件;2)更新 Offset 文件的偏移。代码如下:

  • 第 94 至 95 行:调用 AbstractMessageLite#writeDelimitedTo(OutputStream) 方法,将 TraceSegment 写入 Data 文件。该方法包括 flush 操作,代码如下:

*

3.3 读取

SegmentBufferReader#preRead() 方法,读取 Buffer 文件,将 TraceSegment 提交给 SegmentParse 重新解析与构建处理。另外该方法,会删除已经读取完成的 Data 文件。代码如下:

  • ——– 读取文件存在

  • 该情况发生于,Data 文件未被读取完成

  • 第 65 行:调用 #deleteTheDataFilesBeforeReadFile(readFileName) 方法,删除比指定文件早创建的 Data 文件,基于文件名带有创建时间

  • 第 67 至 68 行:调用 #read() 方法,读取 Buffer 文件,将 TraceSegment 提交给 SegmentParse 重新解析与构建处理。另外,返回 true ,文件被全部读取完成、处理并删除。返回 false ,文件未被全部读取完成。

  • 第 133 至 134 行:创建 FileInputStream 对象,并跳转到读取位置。

  • 第 137 至 141 行:获取读取结束的位置。

  • 第 143 至 159 行:循环读取处理,直到到达读取文件上限位置

    • 第 144 至 146 行:从 Data 文件,读取一条 TraceSegment 。
    • 第 149 至 152 行:将 TraceSegment 提交给 SegmentParse 重新解析与构建处理。若解析处理失败,返回 false ,结束循环,等待下次读取处理。
    • 第 155 至 158 行:设置 Offset 对象的读取偏移。
  • 第 161 至 165 行:全部读取处理完成,关闭 InputStream ,同时删除读取的 Data 文件

  • 第 166 至 169 行:发生 IOException 异常,返回 false

  • 第 170 行:返回 true ,文件被全部读取完成、处理并删除。

  • 第 75 行:调用 #readEarliestCreateDataFile() 方法,循环顺序读取 Data 文件,直到有一个没读完。

    • 第 112 至 118 行:若第一个 Data 文件和 Offset 读取的文件相同,返回。说明,在上一次 #read() 方法里,没有读完。
    • 第 121 至 127 行:循环顺序调用 #read(readFile, readFileOffset) 方法,读取 Data 文件,直到有一个没读完。
  • ——– 读取文件不存在 ——–

  • 该情况发生于,Data 文件被全部读取完成,并且删除。

  • 第 73 行:调用 #deleteTheDataFilesBeforeReadFile(readFileName) 方法,删除比指定文件早创建的 Data 文件。

  • 第 75 行:调用 #readEarliestCreateDataFile() 方法,循环顺序读取 Data 文件,直到有一个没读完。

  • ——– 没有可读取的文件 ——–

  • 该情况发生于,Data 文件、Buffer 文件首次初始化创建,未设置可读文件名。

  • 第 79 行:调用 #readEarliestCreateDataFile() 方法,循环顺序读取 Data 文件,直到有一个没读完。

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: