wap网站快速开发,网站建设自己在家接单,wordpress 导入文章,广告公司名称大全最新作者#xff1a;来自 vivo 互联网服务器团队- Pang Haiyun 介绍 Kafka Streams 的原理架构#xff0c;常见配置以及在监控场景的应用。
一、背景
在当今大数据时代#xff0c;实时数据处理变得越来越重要#xff0c;而监控数据的实时性和可靠性是监控能力建设最重要的一环… 作者来自 vivo 互联网服务器团队- Pang Haiyun 介绍 Kafka Streams 的原理架构常见配置以及在监控场景的应用。
一、背景
在当今大数据时代实时数据处理变得越来越重要而监控数据的实时性和可靠性是监控能力建设最重要的一环。随着监控业务需求的变化和技术的发展需要能够实时处理和分析庞大的数据流。作为一种流式处理平台Kafka Streams 为处理实时数据提供了强大的支持。本文将重点介绍如何利用 Kafka Streams 进行实时数据处理包括其基本原理、功能和实际应用。通过本文的学习读者将能够深入了解 Kafka Streams 的优势、在监控场景的应用及实践。
二、Kafka Streams 的基本概念
Kafka Streams 是一个开源的流式处理框架基于 Kafka 消息队列构建能够处理无限量的数据流。与传统的批处理不同Kafka Streams 允许用户以流式处理的方式实时处理数据而且处理延迟仅为毫秒级。
通过 Kafka Streams 用户可以进行数据的实时转换、聚合、过滤等操作同时能够与 Kafka Connect 和 Kafka Producer/Consumer 无缝集成。Kafka Streams 也是一个客户端程序库用于处理和分析存储在 Kafka 中的数据并将得到的数据写回 Kafka 或发送到外部系统。
Kafka、Storm、Flink 和 Spark 是大数据领域常用的工具和框架。
1、区别 Kafka 是一个分布式消息系统主要用于构建实时数据管道和事件驱动的应用程序。它提供了高吞吐量、持久性、可伸缩性和容错性主要用于数据的发布和订阅。 Storm 是一个分布式实时计算系统用于处理实时数据流。它提供了低延迟、高吞吐量的实时计算能力适用于实时数据处理和流式计算。 Flink 是一个流处理引擎提供了精确一次的状态处理和事件时间处理等特性。它支持流处理和批处理并提供了统一的 API 和运行时环境。 Spark 是一个通用的大数据处理框架提供了批处理和流处理的功能。Spark 提供了丰富的数据处理和计算功能包括 SQL 查询、机器学习、图处理等。
2、Kafka 的优势 持久性和可靠性Kafka 提供了数据持久化的功能能够确保数据不丢失并且支持数据的持久存储和重放。 可伸缩性Kafka 集群可以很容易地进行水平扩展支持大规模数据处理和高并发访问。 灵活性Kafka 可以与各种不同的数据处理框架集成作为数据源或数据目的地使其在实时数据处理的场景中具有广泛的适用性。
总的来说Kafka 的优势在于其高吞吐量、持久性和可靠性以及灵活的集成能力使其成为构建实时数据管道和事件驱动应用程序的理想选择。
2.1 Stream 处理拓扑
2.1.1 流
流是 Kafka Streams 提出的最重要的抽象概念它表示一个无限的不断更新的数据集。流是一个有序的可重放反复的使用不可变的容错序列数据记录的格式是键值对key-value。这里的 key 主要记录的是 value 的索引决定了 Kafka 和 Kafka Streams 中数据的分区即数据如何路由到 Topic 的特定分区。value 是主要后续处理器要处理的数据。 2.1.2 处理器拓扑
处理器拓扑是一个由流边缘连接的流处理节点的图。通过 Kafka Streams 我们可以编写一个或多个的计算逻辑的处理器拓扑用于对数据进行多步骤的处理。
2.1.3 流处理器
流处理器是处理器拓扑中的一个节点它表示一个处理的步骤用来转换流中的数据从拓扑中的上游处理器一次接受一个输入消息并且随后产生一个或多个输出消息到其下游处理器中。
在拓扑中有两个特别的处理器 源处理器Source Processor源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个 Kafka 主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。 sink 处理器Sink Processorsink 处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的 Kafka 主题。 (图片来源: Kafka 官网)
Kafka Streams 提供2种方式来定义流处理器拓扑Kafka Streams DSL 提供了更常用的数据转换操作如 map 和 filter低级别 Processor API 允许开发者定义和连接自定义的处理器以及和状态仓库交互。处理器拓扑仅仅是流处理代码的逻辑抽象。
2.2 时间
在流处理方面有一些重要的时间概念它们是建模和集成一些操作的重要元素例如定义窗口的时间界限。
时间在流中的常见概念如下 事件时间 - 当一个事件或数据记录发生的时间点就是最初创建的“源头”。 处理时间 - 事件或数据消息发生在流处理应用程序处理的时间点。即记录已被消费。处理时间可能是毫秒小时或天等。比原始事件时间要晚。 摄取时间 - 事件或数据记录是 Kafka broker 存储在 topic 分区的时间点。与事件时间的差异是当记录由 Kafka broker 追加到目标 topic 时生成的摄取时间戳而不是消息创建时间“源头”。与处理时间的差异是处理时间是流处理应用处理记录时的时间。比如如果一个记录从未被处理那么就没有处理时间但仍然有摄取时间。
Kafka Streams 通过 TimestampExtractor 接口为每个数据记录分配一个时间戳。该接口的具体实现了基于数据记录的实际内容检索或计算获得时间戳例如嵌入时间戳字段提供的事件时间语义或使用其他的方法比如在处理时返回当前的 wall-clock墙钟时间从而产生了流应用程序的处理时间语义。因此开发者可以根据自己的业务需要选择执行不同的时间。例如每条记录时间戳描述了流的时间增长尽管记录在 stream 中是无序的并利用时间依赖性来操作如 join。
最后当一个 Kafka Streams 应用程序写入记录到 Kafka 时它将分配时间戳到新的消息。时间戳分配的方式取决于上下文 当通过处理一些输入记录例如在 process()函数调用中触发的 context.forward()生成新的输出记录时输出记录时间戳直接从输入记录时间戳继承。 当通过周期性函数如 punctuate()生成新的输出记录时。输出记录时间戳被定义为流任务的当前内部时间通过 context.timestamp() 获取。 对于聚合生成的聚合更新的记录时间戳将被最新到达的输入记录触发更新。
本部分简要介绍了 Kafka Streams 的基本概念下一部分将介绍 Kafka Streams 的在监控场景的应用实践。
三、Kafka Streams 在监控场景的应用
3.1 链路分布示意图 3.2 示例使用 Kafka Streams 来处理实时数据
流式处理引擎如 Kafka Streams与监控数据 ETL 可以为业务运维带来诸多好处例如实时数据分析、实时监控、事件驱动的架构等。在本部分我们将重点介绍 Kafka Streams 与监控数据 ETL 的集成以及如何在监控数据 ETL 中利用 Kafka Streams 进行实时数据处理。
在监控数据ETL架构中Kafka Streams 扮演着举足轻重的角色。它可以作为一个独立的数据处理服务来处理实时的数据流并将处理结果输出到其他存储组件例如ES、VM等中。同时它也可以作为多个数据源之间的数据交换和通信的桥梁扮演着数据总线的角色。Kafka Streams 的高可用性、高吞吐量和流式处理能力使得它成为监控数据ETL架构中的重要组件之一。
下面给出一个示例演示了如何将 Kafka Streams 作为监控数据 ETL 来处理实时的数据。假设我们有一个监控数据流 TopicA我们希望对这些数据进行实时的分析并将分析结果输出到另一个 TopicB。我们可以创建一个 Kafka Streams 来处理这个需求
//创建配置类
Properties props new Properties();
//设置订阅者
props.put(StreamsConfig.APPLICATION_ID_CONFIG, stream-processing-service);
//设置servers地址
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka-broker1:9092);StreamsBuilder builder new StreamsBuilder();
//构建流
KStreamString, String userActions builder.stream(TopicA);
//对流进行处理
KTableString, Long userClickCounts userActions.filter((key, value) - value.contains(click)).groupBy((key, value) - value.split(:)[0]).count();
//流写回Kafka
userClickCounts.toStream().to(TopicB, Produced.with(Serdes.String(), Serdes.Long()));KafkaStreams streams new KafkaStreams(builder.build(), props);streams.start();
在这个示例中我们创建了一个 Kafka Streams 监控数据 ETL用于处理实时的监控数据流。它对数据进行了过滤、分组和统计分析并将结果输出到 TopicB。通过这个 ETL我们可以很容易地实现实时的数据处理功能并且能够与其他数据源和数据存储组件进行无缝的集成。
3.3 监控 ETL 的流处理示意图 本部分介绍了 Kafka Streams 的在监控场景的应用实践下一部分将深入探讨 Kafka Streams 的运作原理及实时数据处理的常见操作并阐述 Kafka Streams 如何实现这些操作。
四、监控数据 ETL 中 Kafka Streams 的运作原理
4.1 架构
Kafka Streams 通过生产者和消费者并利用 Kafka 自有的能力来提供数据平行性分布式协调性故障容错和操作简单性从而简化了应用程序的开发在本节中我们将描述 Kafka Streams 是如何工作的。
下图展示了 Kafka Streams 应用程序的解剖图让我们来看一下。 (图片来源: Kafka 官网)
Kafka 消费者通过消费1个或多个 Topic 拿到数据形成输入 Kafka 流经过处理器拓扑对数据进行统一处理形成输出 Kafka 流将数据写入1个或多个出流 Topic这是 kafka 流整体的运行流程。
4.1.1 Stream 分区和任务
Kafka 分区数据的消息层用于存储和传输Kafka Streams 分区数据用于处理 在这两种情况下这种分区规划和设计使数据具有弹性可扩展高性能和高容错的能力。Kafka Streams 使用了分区和任务的概念基于 Kafka 主题分区的并行性模型。在并发环境里Kafka Streams 和 Kafka 之间有着紧密的联系 每个流分区是完全有序的数据记录队列并映射到 Kafka 主题的分区。 流的数据消息与主题的消息映射。 数据记录中的 keys 决定了 Kafka 和 Kafka Streams 中数据的分区即如何将数据路由到指定的分区。
应用程序的处理器拓扑通过将其分成多个任务来进行扩展更具体点说Kafka Streams 根据输入流分区创建固定数量的任务其中每个任务分配一个输入流的分区列表即Kafka 主题。分区对任务的分配不会改变因此每个任务是应用程序并行性的固定单位。然后任务可以基于分配的分区实现自己的处理器拓扑他们还可以为每个分配的分区维护一个缓冲并从这些记录缓冲一次一个地处理消息。作为结果流任务可以独立和并行的处理而无需手动干预。
重要的是要理解 Kafka Streams 不是资源管理器而是可在任何地方都能“运行”的流处理应用程序库。多个实例的应用程序在同一台机器上执行或分布多个机器上并且任务可以通过该库自动的分发到这些运行的实例上。分区对任务的分配永远不会改变如果一个应用程式实例失败则这些被分配的任务将自动地在其他的实例重新创建并从相同的流分区继续消费。
下面展示了2个分区每个任务分配了输出流的1个分区。 (图片来源: Kafka 官网)
4.1.2 线程模型
Kafka Streams 允许用户配置线程数可用于平衡处理应用程序的实例。每个线程的处理器拓扑独立的执行一个或多个任务。例如下面展示了一个流线程运行2个流任务。 (图片来源: Kafka 官网)
启动更多的流线程或更多应用程序实例只需复制拓扑逻辑即复制代码到不同的机器上运行达到并行处理处理不同的 Kafka 分区子集的目的。要注意的是这些线程之间不共享状态。因此无需协调内部的线程。这使它非常简单在应用实例和线程之间并行拓扑。Kafka 主题分区的分配是通过 Kafka Streams 利用 Kafka 的协调功能在多个流线程之间透明处理。
如上所述Kafka Streams 扩展流处理应用程序是很容易的你只需要运行你的应用程序实例Kafka Streams 负责在实例中运行的任务之间分配分区。你可以启动多个应用程序线程处理多个输入的 Kafka 主题分区。这样所有运行中的应用实例每个线程即运行的任务至少有一个输入分区可以处理。
4.1.3 故障容错
Kafka Streams 基于 Kafka 分区的高可用和副本故障容错能力。因此当流数据持久到 Kafka即使应用程序故障如果需要重新处理它它也是可用的。Kafka Streams 中的任务利用 Kafka 消费者客户端提供的故障容错的能力来处理故障。如果任务故障Kafka Streams 将自动的在剩余运行中的应用实例重新启动该任务。
此外Kafka Streams 还确保了本地状态仓库对故障的稳定性。对于每个状态仓库都维持一个追踪所有的状态更新的变更日志主题。这些变更日志主题也分区因此每个本地状态存储实例在任务访问仓里都有自己的专用的变更日志分区。变更主题日志也启用了日志压缩以便可以安全的清除旧数据以防止主题无限制的增长。如果任务失败并在其他的机器上重新运行则 Kafka Streams 在恢复新启动的任务进行处理之前重放相应的变更日志主题保障在故障之前将其关联的状态存储恢复。故障处理对于终端用户是完全透明的。
请注意任务重新初始化的成本通常主要取决于通过重放状态仓库变更日志主题来恢复状态的时间。为了减少恢复时间用户可以配置他们的应用程序增加本地状态的备用副本即完全的复制状态。当一个任务迁移发生时Kafka Streams 尝试去分配任务给应用实例提前配置了备用副本的应用实例就可以减少任务重新初始化的成本。
4.2 创建流
记录流KStreams或变更日志流KTable或GlobalkTable可以从一个或多个 Kafka 主题创建源流而 KTable 和 GlobalKTable只能从单个主题创建源流。
KStreamBuilder builder new KStreamBuilder();KStreamString, GenericRecord source1 builder.stream(topic1, topic2);
KTableString, GenericRecord source2 builder.table(topic3, stateStoreName);
GlobalKTableString, GenericRecord source2 builder.globalTable(topic4, globalStoreName);
4.3 流回写 Kafka
在处理结束后开发者可以通过 KStream.to 和 KTable.to 将最终的结果流连续不断的写回 Kafka 主题。
joined.to(topic4);
如果已经通过上面的to方法写入到一个主题中但是如果你还需要继续读取和处理这些消息可以从输出主题构建一个新流Kafka Streams 提供了便利的方法through:
// equivalent to
//
// joined.to(topic4);
// materialized builder.stream(topic4);
KStream materialized joined.through(topic4);
4.4 流程序的配置与启执行
除了定义的 topology开发者还需要在运行它之前在 StreamsConfig 配置他们的应用程序Kafka Streams 配置的完整列表可以在这里找到。
Kafka Streams 中指定配置和生产者、消费者客户端类似通常你创建一个 java.util.Properties设置必要的参数并通过 Properties 实例构建一个 StreamsConfig 实例。
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
Properties settings new Properties();
// Set a few key parameters
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, my-first-streams-application);
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka-broker1:9092);
settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper1:2181);// Any further settings
settings.put(... , ...);// Create an instance of StreamsConfig from the Properties instance
StreamsConfig config new StreamsConfig(settings);
除了 Kafka Streams 自己配置参数你也可以为 Kafka 内部的消费者和生产者指定参数。根据你应用的需要。类似于 Streams 设置你可以通过 StreamsConfig 设置任何消费者和/或生产者配置。请注意一些消费者和生产者配置参数使用相同的参数名。例如用于配置 TCP 缓冲的 send.buffer.bytes 或 receive.buffer.bytes。用于控制客户端请求重试的 request.timeout.ms 和 retry.backoff.ms。如果需要为消费者和生产者设置不同的值可以使用 consumer. 或 producer. 作为参数名称的前缀。
Properties settings new Properties();// Example of a normal setting for Kafka Streams
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka-broker-01:9092);
// Customize the Kafka consumer settings
streamsSettings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
// Customize a common client setting for both consumer and producer
settings.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, 100L);
// Customize different values for consumer and producer
settings.put(consumer. ConsumerConfig.RECEIVE_BUFFER_CONFIG, 1024 * 1024);
settings.put(producer. ProducerConfig.RECEIVE_BUFFER_CONFIG, 64 * 1024);// Alternatively, you can use
settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.RECEIVE_BUFFER_CONFIG), 1024 * 1024);
settings.put(StremasConfig.producerConfig(ProducerConfig.RECEIVE_BUFFER_CONFIG), 64 * 1024);
你可以在应用程序代码中的任何地方使用 Kafka Streams 常见的是在应用程序的 main() 方法中使用。
首先先创建一个 KafkaStreams 实例其中构造函数的第一个参数用于定义一个 topology builderStreams DSL的KStreamBuilder或 Processor API 的 TopologyBuilder。
第二个参数是上面提到的 StreamsConfig 的实例。
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.TopologyBuilder;
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
KStreamBuilder builder ...; // when using the Kafka Streams DSL
//
// OR
//
TopologyBuilder builder ...; // when using the Processor API
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config ...;
KafkaStreams streams new KafkaStreams(builder, config);
在这点上内部结果已经初始化但是处理还没有开始。你必须通过调用 start() 方法启动 Kafka Streams 线程
// Start the Kafka Streams instance
streams.start();
捕获任何意外的异常设置 java.lang.Thread.UncaughtExceptionHandler。每当流线程由于意外终止时将调用此处理程序。
streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {public uncaughtException(Thread t, throwable e) {// here you should examine the exception and perform an appropriate action!}
);
close() 方法结束程序。
// Stop the Kafka Streams instance
streams.close();
现在运行你的应用程序像其他的 Java 应用程序一样Kafka Sterams 没有任何特殊的要求。同样你也可以打包成 jar通过以下方式运行
# Start the application in class com.example.MyStreamsApp
# from the fat jar named path-to-app-fatjar.jar.
$ java -cp path-to-app-fatjar.jar com.example.MyStreamsApp
当应用程序实例开始运行时定义的处理器拓扑将被初始化成1个或多个流任务可以由实例内的流线程并行的执行。如果处理器拓扑定义了状态仓库则这些状态仓库在初始化流任务期间重新构建。这一点要理解当如上所诉的启动你的应用程序时实际上 Kafka Streams 认为你发布了一个实例。现实场景中更常见的是你的应用程序有多个实例并行运行如其他的 JVM 中或别的机器上。在这种情况下Kafka Streams 会将任务从现有的实例中分配给刚刚启动的新实例。
五、监控数据 ETL 中 Kafka Streams 参数及其调优
5.1 必配参数 bootstrap.servers这是 Kafka 集群的地址列表Kafka Streams 使用它来初始化与 Kafka 的连接。 key.deserializer 和 value.deserializer这些配置定义了流中键和值的序列化和反序列化器。 auto.offset.reset当没有初始偏移量或偏移量无效时这个配置定义了 Kafka Streams 如何处理。 group.id这对于使用 Kafka Streams 的消费者组来说很重要它定义了消费者组的ID。
5.2 基础参数 num.stream.threads定义 Kafka Streams 应用程序中的线程数默认与处理器的逻辑核心数相等。 state.dir定义 Kafka Streams 存储状态的本地目录。 threading.max.instances定义每个主题分区的最大线程实例数默认与分区数相等。 threading.instances定义每个主题分区的线程实例数默认与分区数相等。
5.3 消费者参数 enable.auto.commit自动提交偏移量默认值为true建议设置为false以便更好地控制偏移量的提交。 commit.interval.ms提交偏移量的频率默认值为5000ms可以根据需要进行调整。 max.poll.records一次拉取的消息数量默认值为1000可以根据网络带宽和处理能力进行调整。
5.4 生产者参数 batch.size批量发送消息的大小默认值通常是16384字节可以根据网络带宽和 Kafka 集群的性能进行调整。 linger.ms消息在生产者缓冲区中的最小停留时间默认值为100ms可以根据需要进行调整。 compression.type压缩类型可以提高网络带宽利用率但会增加 CPU 开销。默认值为none可以根据需要设置为gzip、“snappy或lz4”。
对于 Kafka 的调优参数可以根据实际的应用场景和性能需求进行调整以达到最佳的性能和稳定性。
六、监控数据 ETL 中 Kafka Streams 的分区倾斜问题原因和解决方式
6.1 原因
分区倾斜是监控数据 ETL 的 Kafka Streams 在处理大规模数据流时遇到的常见问题。分区倾斜指的是在一个流处理应用程序中某个分区的消息消费速度远远慢于其他分区或某个分区的延迟积压数据远大于其他分区导致 Kafka Streams 的实时性受到限制。
产生分区倾斜的原因可能包括 数据分布不均匀原始数据在 Kafka 主题的分区中分布不均匀导致某些分区的消息量远大于其他分区。 消费者实例数量不足在 Kafka Streams 应用程序中消费者的实例数量不足无法充分处理所有分区的消息。 消费者负载不均衡消费者的负载不均衡包括但不限于某些消费者实例处理的分区数大于其他实例导致某些消费者实例处理的消息量远大于其他实例。 消费者实例负载不均衡消费者实例性能不一致或性能被挤占导致消费能力不均衡消费速率异常小于平均消费速率
6.2 解决方案 数据均衡策略在设计 Kafka 主题分区分配策略时可以采用如轮询Round-robin或范围Range等均衡策略使得数据在各个分区之间均匀分布。 增加消费者实例根据应用程序的实际情况适当增加消费者的实例数量以提高整个系统的处理能力例如扩容。 负载均衡策略在消费者组内部实现负载均衡如使用均匀分配消费者Uniform Distribution Consumer等策略确保消费者实例之间的负载均衡例如重启或剔除倾斜分区实例使 Kafka Streams 的分区进行重新分配。 优化消费者处理逻辑分析消费者处理消息的速度慢的原因优化处理逻辑提高消费者的处理能力。 调整批次大小和窗口函数通过调整 Kafka Streams 的批次大小和窗口函数等参数降低消费者的处理压力。 使用侧输出对于一些处理速度较慢的分区可以考虑使用侧输出将部分消息引流至其他系统处理减轻消费者负载。
七、总结
本文介绍了 Kafka Streams 在监控场景中的应用阐述了 Kafka Streams 的基本概念包括流、处理器拓扑、流处理器、时间概念等举例说明了 Kafka Streams 在监控实时数据ETL中的具体应用并详细解释了 Kafka Streams 的运作原理包括其架构、创建流、流回写 Kafka、流程序配置与启执行等内容。文章还介绍了 Kafka Streams 的参数及其调优方法以及可能出现的分区倾斜问题及其解决方法。
本文意在让读者对于 Kafka 流在监控业务的实际应用有所认识并且了解 Kafka 流的基本概念和原理阅读本文后对构建自己 Kafka 流应用程序有所帮助能够理解在监控数据 ETL 常见分区倾斜的原理和解决方式。 引用Kafka 官网 https://kafka.apache.org/