南皮县网站建设,wordpress底部添加工信部链接,制作网站学什么,云主机 网站吗Debezium日常分享系列之#xff1a;异步 Debezium 嵌入式引擎 动机目标非目标保留Kafka Connect模型计划的更改线程池并行运行源任务存储偏移量并发处理CDC事件禁用CDC事件的完全排序自定义记录处理器并行处理记录的选项存储偏移量引擎状态和生命周期防止资源泄漏异常处理退出… Debezium日常分享系列之异步 Debezium 嵌入式引擎 动机目标非目标保留Kafka Connect模型计划的更改线程池并行运行源任务存储偏移量并发处理CDC事件禁用CDC事件的完全排序自定义记录处理器并行处理记录的选项存储偏移量引擎状态和生命周期防止资源泄漏异常处理退出任务轮询循环引擎关闭辅助接口和对象其他未来可靠的更改测试 动机
Debezium有两种基本类型。一种类型是Kafka Connect的源连接器。另一种是独立的引擎可以嵌入到用户应用程序中或者包装在Debezium server,中这是一个独立的应用程序。在第一种情况下连接器的生命周期和连接器任务的执行由Kafka Connect管理而在后一种情况下任务的生命周期完全由Debezium引擎本身管理并且Debezium项目完全控制引擎内部任务的执行方式。DebeziumEngine接口的当前实现EmebeddedEngine以串行方式执行所有步骤。这包括执行事件转换和事件序列化。事件序列化不是由EmebddedEngine直接提供的而是由其扩展ConvertingEngine提供的它作为event processing的一部分实现。此外当前的EmebddedEngine实现不支持执行多个源任务即使源连接器支持比如SQL Server连接器。只执行第一个任务无论连接器配置提供了多少个任务。在大数据集和多核服务器时代使用单个线程处理来自数据库的所有CDC事件是一个明显的限制因素。提供一个新的DebeziumEngine接口的实现可以并行运行一些任务可以显著提高性能。新的实现和相关的更改还应该针对良好的测试覆盖率和测试新实现以及任何其他未来实现的易用性。
目标
提供一个新的实现允许在给定的连接器中运行多个任务如果连接器提供多个任务。在专用线程中运行可能耗时的代码例如事件转换或序列化。可选择禁用消息的完全排序以进一步提速。准备好未来的更改和新功能包括切换到Kafka Streams、通过REST调用将源任务委托给外部工作节点特别是为了能够在Kubernetes集群中的多个Pod上水平扩展连接器负载、与Debezium k8s操作员和UI更好地集成以及将Debezium引擎作为Quarkus扩展提供。另一个高级目标是调整当前的Debezium测试套件以使用DebeziumEngine接口而不是硬编码的EmebddedEngine实现。这样可以轻松切换到任何其他DebeziumEngine的实现从而使新的实现能够轻松地与当前的测试套件进行测试。
非目标
改变DebeziumEngine接口。对Debezium引擎进行任何其他更改。在连接器内部实现任何并行化例如在多个线程中跟踪数据库CDC更改。移除对Kafka Connect API的依赖。添加对在一个Debezium引擎中运行多个连接器的支持参见。添加对运行接收器连接器的支持。
保留Kafka Connect模型
由于Debezium引擎的主要目标是能够在Kafka之外执行Debezium可能会觉得为什么不利用这个机会摆脱Kafka的依赖。原因很简单这将是一个过于复杂的更改不仅影响到Debezium引擎。例如删除WorkerConfig将需要删除OffsetBackingStore这将需要删除OffsetStorageReader等等这将导致Debezium核心和连接器的重大变化。因此这应该作为一个单独的任务来完成需要一个专门的DDD来描述所有的更改并提出Kafka Connect接口和类的替代方案。
计划的更改
线程池
并发处理的实现将基于Java Executors框架。由于Debezium目前基于Java 11该项目引入的新并发特性即虚拟线程和结构化并发目前无法使用。然而预计将来将切换到虚拟线程一旦Debezium基于Java 21或更高版本。ThreadPoolExecutor将用于创建和管理线程池。ThreadPoolExecutor将由工厂方法Executors.newFixedThreadPool(int nThreads)创建。将有两个线程池一个用于并行运行任务另一个用于处理CDC事件流水线。在本文档的上下文中CDC事件流水线指的是记录转换的链最终还包括记录序列化和由用户提供的消费者进行处理。任务线程池中的工作线程数量将与任务数量相同。CDC事件流水线的工作线程数量将通过配置由用户指定或者使用默认值。默认值将为底层机器的核心数因为运行任务不是CPU密集型的操作。
并行运行源任务
Kafka Connect SourceTask的生命周期将被分成一个独立的任务。这些任务将在由Executors.newSingleThreadExecutor()创建的专用线程中执行。如果出现RetriableException任务将被重新启动。在其他情况下所有其他任务将被优雅地停止并且引擎将因任务异常而失败。
存储偏移量
并行运行任务对于存储偏移量似乎不是一个问题。任务与连接器/数据库的分区之间存在1:1的映射关系例如在SQL Server的情况下每个数据库都是一个单独的分区并且为每个数据库创建一个任务因此每个任务应该在偏移哈希映射中读取或写入自己的键。因此从多个任务读取偏移量不应引入任何并发问题并且对于写入OffsetStorageWriter被明确标记为线程安全。
并发处理CDC事件
当前的DebeziumEngine API部分将事件处理委托给用户提供的ChangeConsumer实现。更具体地说事件以批处理的方式作为列表传递给ChangeConsumer.handleBatch()这在许多情况下都是更有效的方式因为事件通常提交给另一个系统时批处理更高效。但是这导致我们无法构建完整的事件处理流水线这个流水线将在专用线程中运行。在将事件批量传递给ChangeConsumer之前会对记录应用用户定义的单条消息转换。在ConvertingEngineBuilder的情况下事件还将被序列化为SourceRecords然后传递给ChangeConsumer。这两个任务将在上述的ThreadPoolExecutor中并行运行。在将批处理传递给ChangeConsumer之前将等待批处理中所有记录的任务完成。实现还应尝试优化ConvertingEngineBuilder的记录序列化/反序列化。当前记录在传递给ChangeConsumer之前被序列化为SourceRecords然后在ChangeConsumer调用RecordCommitter时再次反序列化。在将记录处理委托给用户提供的ChangeConsumer的情况下似乎没有简单的方法可以避免这个序列化/反序列化过程而不破坏现有的API。下面概述了一个可能的解决方案尽管有点复杂。另一方面如果记录的处理由用户提供的Consumer完成我们可以通过存储原始记录来轻松避免反序列化步骤。记录将被传递给转换链序列化传递给提供的消费者如果成功则将原始存储的记录传递给RecordCommitter无需再次反序列化。
禁用CDC事件的完全排序
从性能上来说可以通过跳过消息排序并按照它们准备好的顺序传递消息来进一步提高速度。这在消息顺序不重要的场景下是有意义的例如在底层数据库仅接收插入操作的场景或者是在接收端应用程序通过消费CDC事件进行排序的情况下。虽然并不经常使用但DebeziumEngine还提供了其他处理更改记录的方法。用户可以提供一个仅处理记录的Consumer函数而不是实现ChangeConsumer。在这种情况下我们可以为处理CDC记录创建一个完整的流水线而且我们不需要以批处理的方式将记录传递给用户的实现。这允许我们单独处理每条消息如果处理一条记录需要更长时间例如由于记录的大小其他记录不需要被阻塞。这将导致消息的总顺序被打破。然而如上所述在某些特定的场景中这可能是有意义且可取的。新的实现应该提供一个选项来禁用记录的完全排序。这只允许在用户不通过ChangeConsumer提供记录处理的情况下使用。当用户不通过ChangeConsumer提供记录处理并且启用了记录的总排序这将是默认情况用于处理CDC记录的流水线将在单独的线程中运行但实现必须确保记录的总排序将被保留。
自定义记录处理器
当用户希望对记录进行更复杂的处理时仅提供简单的事件消费者是不够的用户必须实现ChangeConsumer。然而正如前面提到的这种方法的一个缺点是每个记录在处理后必须反序列化回来才能提交。一个可能的解决方案是完全将记录处理的控制权交给用户包括应用转换链和序列化记录。当然这意味着向用户实现公开一些DebeziumEngine内部对象-即转换链、序列化器和用于可能的并行处理的执行器服务。ChangeConsumer的这种泛化可以如下所示 /*** Generalization of {link DebeziumEngine.ChangeConsumer}, giving complete control over the records processing.* Processor is initialized with all the required engine internals, like chain of transformations, to be able to implement whole record processing chain.* Implementations can provide e.g. serial or parallel processing of the change records.*/Incubatingpublic interface RecordProcessorR {/*** Initialize the processor with object created and managed by {link DebeziumEngine}, which are needed for records processing.** param recordService {link ExecutorService} which allows to run processing of individual records in parallel* param transformations chain of transformations to be applied on every individual record* param serializer converter converting {link SourceRecord} into desired format* param committer implementation of {link DebeziumEngine.RecordCommitter} responsible for committing individual records as well as batches*/void initialize(final ExecutorService recordService, final Transformations transformations, final FunctionSourceRecord, R serializer,final RecordCommitter committer);/*** Processes a batch of records provided by the source connector.* Implementations are assumed to use {link DebeziumEngine.RecordCommitter} to appropriately commit individual records and the batch itself.** param records List of {link SourceRecord} provided by the source connector to be processed.* throws InterruptedException*/void processRecords(final ListSourceRecord records) throws InterruptedException;}在当前的实现中该接口仅在内部使用但如果社区将来需要完全控制记录处理则可以稍后通过 SPI 公开实现。
并行处理记录的选项
总结起来以下是提供给用户的并行处理选项
对于每条记录并行运行转换链等待整个批处理转换完成然后将批处理传递给用户提供的ChangeConsumer。如果用户将ChangeConsumer提供给Builder并且引擎没有提供转换器则选择此选项。并行运行转换链并对每条记录进行序列化等待整个批处理转换完成然后将批处理传递给用户提供的ChangeConsumer。如果用户将ChangeConsumer提供给Builder并且引擎提供了转换器则选择此选项。并行运行记录的转换链。等待结果并按照原始批处理中的顺序一个接一个地将转换后的批处理应用于用户提供的Consumer。如果用户将Consumer提供给Builder并且引擎没有提供转换器则选择此选项。并行运行记录的转换链和序列化。等待结果并按照原始批处理中的顺序一个接一个地将转换后的批处理应用于用户提供的Consumer。如果用户将Consumer提供给Builder并且引擎提供了转换器则选择此选项。并行运行记录的转换链并由用户提供的Consumer进行消费。如果用户将Consumer提供给Builder引擎没有提供转换器并且将选项CONSUME_RECORDS_ASYNC设置为true则选择此选项。并行运行记录的转换链、序列化和由用户提供的Consumer进行消费。如果用户将Consumer提供给Builder引擎提供了转换器并且将选项CONSUME_RECORDS_ASYNC设置为true则选择此选项。
存储偏移量
与任务的并行执行不同在记录的并行处理中提交正确的偏移量非常重要以避免错过任何事件。假设我们有一个事件链由源连接器以记录R1-R2-R3的形式实现我们并行处理它们。如果调度程序先选择处理R2和R3的线程并且处理R1的线程需要等待那么可能会发生例如R3的处理作为第一个完成R3的偏移量被提交的情况。如果此时关闭引擎则下次启动引擎将从R3开始并且R1以及可能的R2将被引擎忽略。这将破坏Debezium提供的至少一次保证。因此我们需要始终只提交其所有前置记录已经处理并已提交的记录的偏移量。可能的记录处理流程在上一段中列出。前两个选项很简单-事件的处理委托给了用户提供的ChangeConsumer因此记录的提交也由它处理正确的提交顺序不是引擎的责任。当用户提供的Consumer按顺序运行时下两个选项也很简单- Consumer按顺序在转换后的批处理上运行并且记录在按顺序由Consumer消耗时逐个提交。剩下的两个选项并行运行整个链。在这些情况下记录提交不能成为链的一部分否则我们可能会像上面描述的那样丢失记录。引擎必须等待第一个事件的处理管道被执行然后提交记录。然后它必须等待第二个记录被处理等等直到整个事件批处理被处理。这将确保至少一次交付。另一方面这可能会增加引擎重新启动后重复记录的数量但是如果需要异步记录处理则用户必须接受这个缺点。
引擎状态和生命周期
引擎的状态将由AtomicReferenceState state变量描述。状态枚举将包含以下元素
CREATING-正在启动引擎这主要意味着引擎对象正在创建或已经创建但尚未调用run()方法。INITIALIZING-在run()方法的开头切换到此状态初始化连接器时处于此状态并在启动连接器本身和调用DebeziumEngine.ConnectorCallback.connectorStarted()回调时处于此状态。CREATING_TASKS-成功启动连接器后切换到此状态正在创建和初始化任务的配置。STARTING_TASKS-任务正在启动每个任务在单独的线程中执行保持在此阶段直到任务启动、任务启动失败或 TASK_MANAGEMENT_TIMEOUT_MS选项指定的时间已过。POLLING_TASKS-任务轮询已经开始这是生成数据的主要阶段引擎会在此阶段保持直到开始关闭或抛出异常。STOPPING-引擎正在停止因为调用了引擎的close()方法或抛出了异常在此阶段存储偏移量停止处理记录的ExecutorService、任务和连接器。STOPPED-引擎已经停止最终状态不能从此状态进一步移动任何在此状态下对引擎对象的调用都应该失败。
可能的状态转换
CREATING - INITIALIZINGINITIALIZING - CREATING_TASKSCREATING_TASKS - STARTING_TASKSSTARTING_TASKS - POLLING_TASKS(CREATING | INITIALIZING | CREATING_TASKS | STARTING_TASKS | POLLING_TASKS) - STOPPINGSTOPPING - STOPPED
防止资源泄漏
需要特别注意的引擎阶段是任务启动阶段。在此阶段正在创建数据库连接如果发生意外情况或在启动任务时关闭引擎可能会导致各种资源泄漏例如未关闭的复制槽。为了防止这种情况发生不可以通过调用引擎的close()方法将其从STARTING_TASKS状态转换为STOPPING状态。此外STARTING_TASKS必须完全完成。即使其中一个线程无法启动其运行的任务主线程引擎线程也必须等待所有其他任务完成无论成功与否后才能进入STOPPING状态。通常情况下STARTING_TASKS - STOPPING的转换是可能的但只能在从启动任务的方法中抛出异常的情况下并且只能在所有启动任务的线程都完成之前发生。
异常处理
可重试的异常会在其发生的位置进行处理相关操作将重试直到 ERRORS_MAX_RETRIES 尝试用尽。与现有的 EmebeddedEngine 实现相反在此时任务不会重新启动TODO重新思考为什么当前实现中任务会重新启动。之后异常会向上传播到堆栈。任何未被捕获以进行重试的异常都会进一步传播。所有异常都应该在引擎 run() 方法的 catch 块中处理。一旦遇到任何异常引擎应该进入 STOPPING 状态并开始关闭引擎。
退出任务轮询循环
任务在以下情况下退出轮询循环
将引擎状态更改为除了 POLLING_TASKS 之外的任何其他状态唯一可能性是更改为 STOPPING 状态从任务的 poll() 方法中抛出异常或在处理记录的批处理过程中抛出异常通过关闭正在处理记录的 ExecutorService 间接关闭引擎时如果提交另一条记录进行处理将会抛出异常然而这不应该发生因为线程应该事先注意到引擎状态已经改变 - 在处理下一批之前
一旦当前批次被处理退出任务轮询循环应该在合理的快速时间内发生。当调用引擎的关闭方法时正在运行记录处理的 ExecutorService 会被优雅地关闭。这意味着当前正在处理的记录将等待处理完成但不会接受任何其他新的记录进行处理即使它们已经被调度。主线程最多等待 POLLING_SHUTDOWN_TIMEOUT_MS 毫秒来等待 ExecutorService 关闭即处理提交的记录然后 ExecutorService 将被立即关闭。因此可以通过将 POLLING_SHUTDOWN_TIMEOUT_MS 设置为零来实现立即关闭而不等待记录被处理。为立即关闭添加专用方法需要添加一个新的公共方法该方法不是 DebeziumEngine API 的一部分目前似乎没有这样的需求因为可以将 POLLING_SHUTDOWN_TIMEOUT_MS 设置为合理的较小值。如果有用户需求可以在将来添加该方法。
引擎关闭
在引擎关闭期间如果引擎至少达到 STARTING_TASKS 状态所有任务都应停止。在调用任务关闭之前会调用并等待用于处理 CDC 记录的 ExecutorService 的关闭。每个任务在调用其关闭之前还应提交一个偏移量并且引擎会等待任务停止。用户可以设置 TASK_MANAGEMENT_TIMEOUT_MS 选项也用于任务启动来调整等待任务关闭的时间。一旦所有任务关闭完成连接器将停止。无论先前的引擎状态如何连接器都应该停止。引擎达到 STOPPED 状态不应再有其他操作。如果用户想要重新启动引擎必须重新创建引擎对象。
辅助接口和对象
为了减少需要传递给各种引擎方法和对象的参数数量创建几个辅助对象是方便的即连接器和任务上下文。上下文应持有对长期存在的对象的引用通常是在创建引擎、连接器或任务时创建的例如OffsetStorageReader或OffsetStorageWriter。辅助的DebeziumSourceConnector和DebeziumSourceTask将持有这些上下文。由于我们长期希望将Debezium引擎与Kafka Connect API解耦这可能是朝着这个方向迈出的第一步这些对象可以作为Kafka Connect对象的替代品。正如在开始时提到的我们不能直接用我们的实现替换这些对象。因此这些对象还将分别包含对Kafka Connect连接器和任务对象的引用并在需要时提供Connect对象。这些接口是高度实验性的可能会在未来进行更改或完全删除。主要目的是探索逐步从Kafka Connect对象中移除的方向是否可行。建议的辅助接口如下
Incubating
public interface DebeziumSourceConnector {/*** Returns the {link DebeziumSourceConnectorContext} for this DebeziumSourceConnector.* return the DebeziumSourceConnectorContext for this connector*/DebeziumSourceConnectorContext context();/*** Initialize the connector with its {link DebeziumSourceConnectorContext} context.* param context {link DebeziumSourceConnectorContext} containing references to auxiliary objects.*/void initialize(DebeziumSourceConnectorContext context);
}Incubating
public interface DebeziumSourceConnectorContext {/*** Returns the {link OffsetStorageReader} for this DebeziumConnectorContext.* return the OffsetStorageReader for this connector.*/OffsetStorageReader offsetStorageReader();/*** Returns the {link OffsetStorageWriter} for this DebeziumConnectorContext.* return the OffsetStorageWriter for this connector.*/OffsetStorageWriter offsetStorageWriter();
}Incubating
public interface DebeziumSourceTask {/*** Returns the {link DebeziumSourceTaskContext} for this DebeziumSourceTask.* return the DebeziumSourceTaskContext for this task*/DebeziumSourceTaskContext context();
}Incubating
public interface DebeziumSourceTaskContext {/*** Gets the configuration with which the task has been started.*/MapString, String config();/*** Gets the {link OffsetStorageReader} for this SourceTask.*/OffsetStorageReader offsetStorageReader();/*** Gets the {link OffsetStorageWriter} for this SourceTask.*/OffsetStorageWriter offsetStorageWriter();/*** Gets the {link OffsetCommitPolicy} for this task.*/OffsetCommitPolicy offsetCommitPolicy();/*** Gets the {link Clock} which should be used with {link OffsetCommitPolicy} for this task.*/Clock clock();/*** Gets the transformations which the task should apply to source events before passing them to the consumer.*/Transformations transformations();
}其他未来可靠的更改
切换到虚拟线程应该很简单只需要切换到适当的ExecutorService即可例如使用Executors.newVirtualThreadPerTaskExecutor()而不是Executors.newFixedThreadPool()。切换到结构化并发应该几乎和切换到虚拟线程一样容易。将SourceTasks分成自包含的任务并在不同的线程中并行运行它们应该为通过gRPC在不同的机器上执行它们提供坚实的基础。主要问题是如何交换在远程机器上运行任务所需的对象而所需的对象可能很复杂甚至事先不知道例如用户提供的ChangeConsumer因此可能很难甚至不可能为其提供protobuf表示。版本3中的Protobuf提供了对映射的支持可以轻松地将所有配置选项传递给远程机器。因此目前最简单的方法似乎是在此引擎中提供几种可以运行的模式之一其中之一是“task-executor”它将推迟初始化和启动并将通过gRPC完成一旦它从引擎获取任务配置。在这种情况下引擎只充当运行任务的其他节点的协调器。这将需要较小的重构主要是引擎的run()方法的实现但考虑到实施这一点可能需要单独的领域驱动设计目前似乎是可以接受的。概念验证应该作为实施的一部分或作为后续任务进行。到目前为止对Debezium operator或UI没有特别的要求。然而将功能分离成细粒度的函数应该可以使将任何引擎功能暴露给外部服务变得平滑和容易。根据Quarkus扩展指南即使在当前实现中EmbeddedEngine应该适用于Quarkus扩展。新的实现应该也能够与Quarkus无缝集成。类似于gRPC概念验证应该作为后续任务进行。
测试
测试套件将改为仅使用DebeziumEngine API。大多数使用DebeziumEngine的测试都继承自AbstractConnectorTest其中创建了一个DebeziumEngine实例。AbstractConnectorTest将包含一个受保护的方法负责创建DebeziumEngine。当切换到新的DebeziumEngine实现时只需要调整这个单一方法即可完成切换。这也允许在需要的情况下在特定的测试中更改引擎实现。将来如果有这种需要测试套件可以通过将DebeziumEngine实现作为参数进行参数化。这将允许我们对多个引擎实现运行测试套件。然而在不久的将来我们不期望会有这种需要所以目前在AbstractConnectorTest中的一个专用方法应该足够了。由于现有的测试套件基于EmbeddedEngine实现该实现提供比DebeziumEngine接口更丰富的API在整个测试套件中无法完全使用DebeziumEngine API。在大多数情况下可以使用DebeziumEngine API或者引入一些辅助方法除了一种情况 - EmbeddedEngine#runWithTask()。它公开了用于测试的引擎Kafka Connect SourceTask。如果我们去掉这个方法将会丢失一些重要的测试。为了保留测试引擎源任务的能力同时不破坏DebeziumEngine的封装将引入一个新的接口TestingDebeziumEngine。它将属于debezium-embedded模块的测试包。接口应该只包含一个方法runWithTask(ConsumerSourceTask consumer)
public interface TestingDebeziumEngineT extends DebeziumEngineT {/*** Run consumer function with engine task, e.g. in case of Kafka with {link SourceTask}.* Effectively expose engine internal task for testing.*/void runWithTask(ConsumerSourceTask consumer);
}将来如果从测试的角度来看公开任何其他DebeziumEngine内部或添加方便的测试方法是有益的我们可能会向此接口添加更多方法。然而这类方法的数量应该尽可能少并且只有在非常充分的理由下才应该添加方法因为如果实现需要使用Debezium测试套件进行测试接口将强制DebeziumEngine实现也实现所有这些方法。由于此项工作的主要驱动因素是性能因此此项工作的重要部分将是为DebeziumEngine开发性能测试。应该有两种类型的测试 - JMH基准测试和更健壮的端到端性能测试。 JMH基准测试可以模仿debezium-microbenchmark-oracle的JMH测试可能使用SimpleSourceConnector或其某些修改。端到端性能测试应该包括从至少一个数据库可能是PostgreSQL流式传输数据并将数据流式传输到“/dev/null”消费者以最小化接收方消费者的影响。数据可能是由在Debezium-performance下开发的工具生成的。