59、Flink实战:Flink之FlinkCDC(下)

目录

0、 相关文章链接;

1、 FlinkCDC1.x中存在的痛点;

2、 FlinkCDC2.x的设计目标;

3、 FlinkCDC2.x的设计实现;

3、 1.整体概览;

3、 2.Chunk切分;

3、 3.Chunk分配;

3、 4.Chunk读取;

3、 5.Chunk汇报;

3、 6.Chunk分配;

4、 FlinkCDC2.x的核心原理分析;

4、 1.BinlogChunk中开始读取位置源码;

4、 2.读取低位点到高位点之间的Binlog;


0. 相关文章链接

[Flink文章汇总][Flink]

1. FlinkCDC1.x中存在的痛点

![*][nbsp]

2. FlinkCDC2.x的设计目标

![*][nbsp 1]

3. FlinkCDC2.x的设计实现

3.1. 整体概览

在对于有主键的表做初始化模式,整体的流程主要分为 5 个阶段:

1、 Chunk切分;
2、 Chunk分配;(实现并行读取数据&CheckPoint);
3、 Chunk读取;(实现无锁读取);
4、 Chunk汇报;
5、 Chunk分配;

![*][nbsp 2]

3.2. Chunk切分

![*][nbsp 3]

根据Netflix DBlog 的论文中的无锁算法原理,对于目标表按照主键进行数据分片,设置每个切片的区间为左闭右开或者左开右闭来保证数据的连续性。

3.3. Chunk分配

![*][nbsp 4]

将划分好的 Chunk 分发给多个 SourceReader,每个 SourceReader 读取表中的一部分数据, 实现了并行读取的目标。

同时在每个 Chunk 读取的时候可以单独做 CheckPoint,某个 Chunk 读取失败只需要单独执行该 Chunk 的任务,而不需要像 1.x 中失败了只能从头读取。

若每个SourceReader 保证了数据一致性,则全表就保证了数据一致性。

3.4. Chunk读取

![*][nbsp 5]

读取可以分为 5 个阶段:

1、 SourceReader读取表数据之前先记录当前的Binlog位置信息记为低位点;
2、 SourceReader将自身区间内的数据查询出来并放置在buffer中;
3、 查询完成之后记录当前的Binlog位置信息记为高位点;
4、 在增量部分消费从低位点到高位点的Binlog;
5、 根据主键,对buffer中的数据进行修正并输出;

通过以上5个阶段可以保证每个Chunk最终的输出就是在高位点时该Chunk中最新的数据,但是目前只是做到了保证单个 Chunk 中的数据一致性。

3.5. Chunk汇报

![*][nbsp 6]

在Snapshot Chunk 读取完成之后,有一个汇报的流程,如上图所示,即 SourceReader 需要将 Snapshot Chunk 完成信息汇报给 SourceEnumerator。

3.6. Chunk分配

![*][nbsp 7]

FlinkCDC 是支持全量+增量数据同步的,在 SourceEnumerator 接收到所有的 SnapshotChunk 完成信息之后,还有一个消费增量数据(Binlog) 的任务,此时是通过下发 Binlog Chunk给任意一个 SourceReader 进行单并发读取来实现的。

4. FlinkCDC2.x的核心原理分析

4.1. Binlog Chunk 中开始读取位置源码

MySqlHybridSplitAssigner

private MySqlBinlogSplit createBinlogSplit() {

final List assignedSnapshotSplit = snapshotSplitAssigner

.getAssignedSplits()

.values()

.stream()

.sorted(Comparator.comparing(MySqlSplit::splitId))

.collect(Collectors.toList());

Map<String, BinlogOffset> splitFinishedOffsets = snapshotSplitAssigner.getSplitFinishedOffsets();

final List finishedSnapshotSplitInfos = new ArrayList<>();

final Map<TableId, TableChanges.TableChange> tableSchemas = new HashMap<>();

BinlogOffset minBinlogOffset = BinlogOffset.INITIAL_OFFSET;

for (MySqlSnapshotSplit split : assignedSnapshotSplit) {

// find the min binlog offset

BinlogOffset binlogOffset = splitFinishedOffsets.get(split.splitId());

if (binlogOffset.compareTo(minBinlogOffset) < 0) {

minBinlogOffset = binlogOffset;

}

finishedSnapshotSplitInfos.add(

new FinishedSnapshotSplitInfo(

split.getTableId(),

split.splitId(),

split.getSplitStart(),

split.getSplitEnd(),

binlogOffset));

tableSchemas.putAll(split.getTableSchemas());

}

final MySqlSnapshotSplit lastSnapshotSplit = assignedSnapshotSplit.get(assignedSnapshotSplit.size() - 1).asSnapshotSplit();

return new MySqlBinlogSplit(

BINLOG_SPLIT_ID,

lastSnapshotSplit.getSplitKeyType(),

minBinlogOffset,

BinlogOffset.NO_STOPPING_OFFSET,

finishedSnapshotSplitInfos,

tableSchemas

);

}

4.2. 读取低位点到高位点之间的Binlog

BinlogSplitReader

```java 
/**
 * Returns the record should emit or not.
 *
 * <p>The watermark signal algorithm is the binlog split reader only sends the binlog event
 * that
 * belongs to its finished snapshot splits. For each snapshot split, the binlog event is valid
 * since the offset is after its high watermark.
 *
 * <pre> E.g: the data input is :
 * snapshot-split-0 info : [0, 1024) highWatermark0
 * snapshot-split-1 info : [1024, 2048) highWatermark1
 * the data output is:
 * only the binlog event belong to [0, 1024) and offset is after highWatermark0
 * should send,
 * only the binlog event belong to [1024, 2048) and offset is after highWatermark1 should
 * send.
 * </pre>
 */
private boolean shouldEmit(SourceRecord sourceRecord) {
    if (isDataChangeRecord(sourceRecord)) {
   
     
        TableId tableId = getTableId(sourceRecord);
        BinlogOffset position = getBinlogPosition(sourceRecord);
        // aligned, all snapshot splits of the table has reached max highWatermark
        if (position.isAtOrBefore(maxSplitHighWatermarkMap.get(tableId))) {
            return true;
        }
        Object[] key =
                getSplitKey(
                        currentBinlogSplit.getSplitKeyType(),
                        sourceRecord,
                        statefulTaskContext.getSchemaNameAdjuster()
                );
        for (FinishedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
            if (RecordUtils.splitKeyRangeContains(
                    key, splitInfo.getSplitStart(), splitInfo.getSplitEnd())
                    && position.isAtOrBefore(splitInfo.getHighWatermark())) {
                return true;
            }
        }
        // not in the monitored splits scope, do not emit
        return false;
    }
    // always send the schema change event and signal event
    // we need record them to state of Flink
    return true;
}

--------------------

**注:**此博客根据某马2020年贺岁视频改编而来 -> [B站网址][B]

**注:其他相关文章链接由此进 ->**[Flink文章汇总][Flink]

**注:**此博文为介绍FlinkCDC2.x的相关知识,对应FlinkCDC的详细使用可以查看[Flink(58):Flink之FlinkCDC(上)][Flink_58_Flink_FlinkCDC]

**注:**此博文中的相关内容和图片截取至网上Flink相关公开内容

--------------------

  


版权声明:本文不是「本站」原创文章,版权归原作者所有 | [原文地址:][Link 1]


[Flink]: https://blog.csdn.net/yang_shibiao/article/details/122570051
[nbsp]: https://cloud.cxykk.com/images/2024/2/6/101/1707184885421.png
[nbsp 1]: https://cloud.cxykk.com/images/2024/2/6/101/1707184892864.png
[nbsp 2]: https://cloud.cxykk.com/images/2024/2/6/102/1707184978470.png
[nbsp 3]: https://cloud.cxykk.com/images/2024/2/6/103/1707184985783.png
[nbsp 4]: https://cloud.cxykk.com/images/2024/2/6/103/1707184993355.png
[nbsp 5]: https://cloud.cxykk.com/images/2024/2/6/103/1707185004873.png
[nbsp 6]: https://cloud.cxykk.com/images/2024/2/6/103/1707185012987.png
[nbsp 7]: https://cloud.cxykk.com/images/2024/2/6/103/1707185021080.png
[B]: https://www.bilibili.com/video/BV1oX4y1K7kM
[Flink_58_Flink_FlinkCDC]: https://blog.csdn.net/yang_shibiao/article/details/122774389
[Link 1]: https://yangshibiao.blog.csdn.net/article/details/122781363