当前位置: 首页 > news >正文

烟台网站建设团队做网站时搜索的代码是什么

烟台网站建设团队,做网站时搜索的代码是什么,长沙有哪些做网站的公司,墨星写作网站文章目录 01 KafkaSink 版本导言02 KafkaSink 基本概念03 KafkaSink 工作原理1.初始化连接2.定义序列化模式3.创建KafkaSink算子4.创建数据源5.将数据流添加到KafkaSink6.内部工作机制 04 KafkaSink参数配置05 KafkaSink 应用依赖06 KafkaSink 快速入门6.1 包结构6.2 项目… 文章目录 01 KafkaSink 版本导言02 KafkaSink 基本概念03 KafkaSink 工作原理1.初始化连接2.定义序列化模式3.创建KafkaSink算子4.创建数据源5.将数据流添加到KafkaSink6.内部工作机制 04 KafkaSink参数配置05 KafkaSink 应用依赖06 KafkaSink 快速入门6.1 包结构6.2 项目配置6.3 pom文件6.4 Flink集成KafkaSink作业6.5 验证 07 总结 01 KafkaSink 版本导言 Flink版本 本文主要是基于Flink1.14.4 版本 导言 Apache Flink 作为流式处理领域的先锋为实时数据处理提供了强大而灵活的解决方案。其中KafkaSink 是 Flink 生态系统中的关键组件之一扮演着将 Flink 处理的数据可靠地发送到 Kafka 主题的角色。本文将深入探讨 KafkaSink 的工作原理、配置和最佳实践帮助读者全面掌握在 Flink 中使用 KafkaSink 的技巧和方法。 02 KafkaSink 基本概念 KafkaSink 是 Apache Flink 提供的用于将流式数据发送到 Kafka 的连接器。它允许 Flink 应用程序将经过处理的数据以高效和可靠的方式传输到 Kafka 主题从而实现流处理与消息队列的无缝集成。 特性和优势 Exactly-Once 语义 KafkaSink 提供 Exactly-Once 语义确保数据不会丢失也不会重复写入 Kafka 主题。这是通过 Flink 提供的端到端一致性保障的一部分。高性能 KafkaSink 被设计为高性能的组件能够处理大规模的数据流并以低延迟将数据发送到 Kafka。其底层使用 Kafka 生产者 API充分利用 Kafka 的并发性和批量处理能力。配置灵活 用户可以通过配置参数定制 KafkaSink 的行为包括 Kafka 服务器地址、主题名称、生产者配置等。这种灵活性使得 KafkaSink 可以适应不同场景和需求。Exactly-Once Sink Semantics KafkaSink 通过 Kafka 生产者的事务支持确保在发生故障时能够保持数据的一致性即使在 Flink 任务重新启动后也能继续从上次中断的地方进行。 03 KafkaSink 工作原理 KafkaSink是Apache Flink中用于将流式数据写入Apache Kafka的关键组件。其工作原理涉及几个主要步骤同时我将介绍一些源码片段以解释其内部实现。 1.初始化连接 用户需要配置Kafka连接属性包括Kafka服务器地址、序列化器等。在Flink中这通常通过创建Properties对象来完成。 // 创建KafksSink配置Properties properties new Properties();properties.setProperty(ProducerConfig.ACKS_CONFIG, 1);properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, 0);properties.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);2.定义序列化模式 KafkaRecordSerializationSchema 是 Apache Flink 中用于将数据流转换为 Kafka 记录record的序列化模式Serialization Schema。它允许将 Flink 数据流中的元素转换为 Kafka 生产者记录并定义了如何序列化元素的逻辑。 在 Flink 中当你想要将数据发送到 Kafka 主题需要一个序列化模式来将 Flink 数据流中的元素序列化为 Kafka 记录。而 KafkaRecordSerializationSchema 就是为此目的而设计的。 // 序列化模式 KafkaRecordSerializationSchemaString recordSerializer KafkaRecordSerializationSchema.builder()//设置对哪个主题进行序列化.setTopic(topic_a)//设置数据值序列化方式.setValueSerializationSchema(new SimpleStringSchema())//设置数据key序列化方式.setKeySerializationSchema(new SimpleStringSchema()).build();3.创建KafkaSink算子 使用Flink提供的KafkaSink类创建一个Kafka生产者实例。以下是简化的源码片段展示了如何创建实例 注意如果传递保证选择Exactly Once (精确一次)需要设置 客户端的超时时间 否则会报错 Caused by: org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms)需要设置 transaction.timeout.ms 小于15分钟后续会专门出一篇关于这个传递保证的博客讲述。 // 创建KafkaSink算子 KafkaSinkString kafkaSink KafkaSink.Stringbuilder()//设置kafka各种参数.setKafkaProducerConfig(properties)//设置序列化模式.setRecordSerializer(recordSerializer)//设置传递保证//At Most Once (至多一次) 系统保证消息要么被成功传递一次要么根本不被传递。这种保证意味着消息可能会丢失但不会被传递多//At Least Once (至少一次) 系统保证消息至少会被传递一次但可能会导致消息的重复传递。这种保证确保了消息的不丢失但应用//Exactly Once (精确一次) 系统保证消息会被确切地传递一次而没有任何重复。这是最高级别的传递保证确保消息不会丢失且不会.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)//设置集群地址.setBootstrapServers(127.0.0.1:9092)//设置事务前缀.setTransactionalIdPrefix(flink_).build();4.创建数据源 创建数据源每隔1000ms下发一笔数据 // 生成一个数据流 SourceFunctionString sourceFunction new SourceFunctionString() {Overridepublic void run(SourceContextString sourceContext) throws Exception {while (true) {String id UUID.randomUUID().toString();sourceContext.collect( id);logger.info(正在下发数据:{},id);Thread.sleep(1000);}}Overridepublic void cancel() {}// 创建数据源 DataStreamSourceString dataStreamSource env.addSource(sourceFunction).setParallelism(1);5.将数据流添加到KafkaSink 在Flink应用程序中通过addSink()方法将要写入Kafka主题数据流添加到KafkaSink以下是一个简化的示例 // 数据流数据通过KafkaSink算子写入kafka dataStreamSource.sinkTo(kafkaSink).setParallelism(1);// 执行任务 env.execute(KafkaSinkStreamJobDemo);6.内部工作机制 KafkaSink会将接收到的数据流分区为若干个并行的数据流每个并行数据流由一个Kafka生产者实例负责向Kafka主题写入数据。这样可以提高写入的吞吐量和并行度。 以下是源码中的一部分展示了KafkaSink是如何将数据发送到Kafka的 Override public void invoke(IN value, Context context) throws Exception {// 将数据发送到Kafka主题producer.send(new ProducerRecord(topic, value.toString())); }KafkaSink的源码相对复杂涉及到与Kafka的交互、并行处理、容错等方面的实现。 总的来说KafkaSink通过整合Flink和Kafka的功能提供了一种高效、可靠的方式将流式数据写入Kafka主题适用于各种实时数据处理场景。 04 KafkaSink参数配置 需要根据具体的安全需求和环境配置 Kafka 的安全性参数。建议查阅最新版本的 Kafka 文档以获取详细的安全配置指南https://kafka.apache.org/documentation/#producerconfigs 在 Apache Flink 中ProducerConfig 是用于配置 Kafka 生产者的类它是 Kafka 客户端库中的一部分。下面是一些常见的配置选项及其解释 bootstrap.servers 集群的地址列表用于初始化连接。生产者会从这些服务器中选择一个 broker 进行连接。 public static final String BOOTSTRAP_SERVERS_CONFIG bootstrap.servers;metadata.max.age.ms 元数据的最大缓存时间。在此时间内生产者将重复使用已经获取的元数据而不会向服务器发送新的元数据请求 public static final String METADATA_MAX_AGE_CONFIG metadata.max.age.ms;batch.size 控制批量发送到 Kafka 的消息大小。当消息积累到一定大小时生产者会将它们一起发送到 Kafka 以提高效率 public static final String BATCH_SIZE_CONFIG batch.size; acks 消息确认机制控制生产者收到确认的方式。可以是“all”所有副本都确认“1”至少一个副本确认或“0”不需要确认 public static final String ACKS_CONFIG acks; linger.ms 生产者在发送批量消息前等待的时间以使更多的消息聚合成一个批次。默认是0表示立即发送 public static final String LINGER_MS_CONFIG linger.ms; request.timeout.ms 发送请求到 Kafka 服务器的超时时间 public static final String REQUEST_TIMEOUT_MS_CONFIG request.timeout.ms; delivery.timeout.ms 这个参数在 Kafka 生产者的配置中是存在的它表示生产者在发送消息后等待生产者确认的最大时间。如果在这段时间内没有收到确认生产者将重试发送消息或者抛出异常具体取决于 retries 参数的配置 public static final String DELIVERY_TIMEOUT_MS_CONFIG delivery.timeout.ms; client.id 用于区分不同生产者实例的客户端 ID public static final String CLIENT_ID_CONFIG client.id; send.buffer.bytes Kafka 消费者用于网络 socket 发送数据的缓冲区大小 public static final String SEND_BUFFER_CONFIG send.buffer.bytes; receive.buffer.bytes Kafka 消费者用于网络 socket 接收数据的缓冲区大小 public static final String RECEIVE_BUFFER_CONFIG receive.buffer.bytes; max.request.size 单个请求发送的最大字节数 public static final String MAX_REQUEST_SIZE_CONFIG max.request.size; reconnect.backoff.ms 用于控制在与 Kafka 服务器连接断开后重新连接的时间间隔。具体来说它定义了在发起重新连接尝试之间等待的时间量以毫秒为单位。如果连接失败生产者将在此时间间隔之后尝试重新连接到 Kafka 服务器 public static final String RECONNECT_BACKOFF_MS_CONFIG reconnect.backoff.ms; reconnect.backoff.max.ms 用于控制重新连接的最大退避时间。具体来说它定义了在发起重新连接尝试之间等待的最长时间量以毫秒为单位。如果连接失败生产者将在此时间间隔之后尝试重新连接到 Kafka 服务器 public static final String RECONNECT_BACKOFF_MAX_MS_CONFIG reconnect.backoff.max.ms; max.block.ms 当 Kafka 队列已满时生产者将阻塞的最长时间毫秒超时后会抛出异常 public static final String MAX_BLOCK_MS_CONFIG max.block.ms; buffer.memory 生产者用于缓冲等待发送到服务器的消息的内存大小。默认是33554432字节32MB public static final String BUFFER_MEMORY_CONFIG buffer.memory; retries 生产者发送失败后的重试次数。默认是0表示不重试 public static final String RETRIES_CONFIG retries; key.serializer 用于序列化消息键的序列化器类。通常是指实现了Serializer接口的类的全限定名 public static final String KEY_SERIALIZER_CLASS_CONFIG key.serializer; value.serializer 用于序列化消息值的序列化器类 public static final String VALUE_SERIALIZER_CLASS_CONFIG value.serializer; connections.max.idle.ms 客户端与服务器保持空闲连接的最长时间毫秒。默认值为 540000即 9 分钟。例如900000 表示客户端与服务器保持空闲连接的最长时间为 15 分钟 public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG connections.max.idle.ms; partitioner.class 用于指定消息将被发送到哪个分区的算法即分区器的实现类。Kafka 中的主题topic通常被划分为多个分区每个分区都包含有序的消息序列。分区器决定了生产者发送的消息应该被分配到哪个分区中。 通过配置 partitioner.class用户可以自定义分区算法以满足特定的业务需求。Kafka 提供了默认的分区器也允许用户根据自己的逻辑实现自定义的分区器。 例如以下是配置 partitioner.class 的示例 partitioner.classcom.example.CustomPartitioner 在这个示例中com.example.CustomPartitioner 是用户自定义的分区器类的全限定名。该类必须实现 Kafka 提供的 org.apache.kafka.clients.producer.Partitioner 接口该接口定义了确定消息应该被发送到哪个分区的方法。 自定义分区器可以根据消息的内容、键如果有、以及其他上下文信息灵活地决定消息应该被发送到哪个分区。这样的自定义分区策略可以帮助实现一些特定的业务逻辑例如确保相关的消息被发送到相同的分区以提高消费的局部性。 在没有显式配置 partitioner.class 的情况下Kafka 使用默认的分区器该分区器根据消息的键如果有或者采用轮询的方式将消息平均分配到所有分区。 public static final String PARTITIONER_CLASS_CONFIG partitioner.class; interceptor.classes 用于指定一组拦截器类。拦截器类是实现 Kafka 接口 org.apache.kafka.clients.producer.ProducerInterceptor 或者 org.apache.kafka.clients.consumer.ConsumerInterceptor 的类用于在生产者或消费者发送或接收消息之前或之后对消息进行处理。 拦截器允许用户对消息进行自定义的预处理或后处理。这些操作可以包括但不限于 对消息进行加工、转换、过滤。在消息发送或接收之前或之后记录日志。对消息的时间戳或键进行修改。 通过配置 interceptor.classes 参数可以指定一组拦截器类并且它们将按顺序应用于每个消息。这样的拦截器链使得在消息处理过程中可以执行多个不同的操作。 例如以下是配置 interceptor.classes 的示例 interceptor.classescom.example.MyProducerInterceptor, com.example.MyConsumerInterceptor 在这个示例中com.example.MyProducerInterceptor 和 com.example.MyConsumerInterceptor 是用户定义的拦截器类的全限定名。这两个类必须分别实现 Kafka 提供的 org.apache.kafka.clients.producer.ProducerInterceptor 和 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口。 需要注意的是拦截器类的顺序很重要。拦截器将按照它们在 interceptor.classes 参数中声明的顺序依次应用于每个消息。如果需要确保拦截器按照特定的顺序应用可以通过配置参数来指定顺序。 拦截器提供了一种灵活的方式来实现特定的消息处理逻辑同时也允许用户对消息进行监控和记录。 public static final String INTERCEPTOR_CLASSES_CONFIG interceptor.classes; enable.idempotence public static final String ENABLE_IDEMPOTENCE_CONFIG enable.idempotence; transaction.timeout.ms public static final String TRANSACTION_TIMEOUT_CONFIG transaction.timeout.ms; transactional.id 用于启用生产者的幂等性。幂等性是指对于同一个生产者实例无论消息发送多少次最终只会产生一条副本实际上是一个幂等序列的性质。这可以防止由于网络错误、重试或者生产者重新启动等情况导致的重复消息。 启用生产者的幂等性可以通过设置 enable.idempotence 参数为 true 来实现。例如 enable.idempotencetrue 启用幂等性会自动设置一些与幂等性相关的配置例如 acks 配置将被设置为 “all”确保所有的 ISRIn-Sync Replicas都已经接收到消息。max.in.flight.requests.per.connection 将被设置为 1以确保在一个连接上只有一个未确认的请求。 幂等性对于确保消息传递的精确一次语义非常重要。在启用幂等性的情况下生产者会为每条消息分配一个唯一的序列号以便在重试发生时 Broker 能够正确地识别并去重重复的消息。 需要注意的是启用幂等性会对性能产生一些开销因为它引入了额外的序列号和一些额外的网络开销。在生产环境中需要仔细评估幂等性对性能的影响并根据实际需求权衡性能和可靠性。 public static final String TRANSACTIONAL_ID_CONFIG transactional.id; security.providers 参数已经被 Kafka 移除了。在较早的 Kafka 版本中这个参数可能被用于指定安全性相关的提供者。然而从 Kafka 2.0 开始Kafka 已经采用了基于 JAASJava Authentication and Authorization Service的身份验证和授权机制这个参数不再被使用。 现在Kafka 的安全性配置主要包括以下几个方面 身份验证机制Authentication MechanismsKafka 支持多种身份验证机制如SSL/TLS、SASLSimple Authentication and Security Layer、OAuth等。通过配置 security.protocol 参数选择所需的身份验证机制。授权机制Authorization MechanismsKafka 使用 ACLAccess Control Lists来控制对主题和分区的访问权限。可以通过配置 authorizer.class.name 参数选择 ACL 的实现类。加密通信Encryption可以通过配置 SSL/TLS 来对 Kafka 通信进行加密以保护数据在传输过程中的安全性。客户端配置Client Configuration客户端需要根据服务端的安全配置进行相应的配置如设置 SSL/TLS 的信任证书、SASL 的认证信息等。 需要根据具体的安全需求和环境配置 Kafka 的安全性参数。建议查阅最新版本的 Kafka 文档以获取详细的安全配置指南。 public static final String SECURITY_PROVIDERS_CONFIG security.providers; retry.backoff.ms 用于定义在发生可重试的发送错误后生产者在进行重试之前等待的时间间隔以毫秒为单位。 当生产者发送消息到 Kafka 时可能会遇到一些可重试的错误例如网络问题、Kafka 服务器繁忙等。retry.backoff.ms 允许在出现这些可重试错误后等待一段时间然后再次尝试发送消息以避免频繁的重试。这样的设计有助于在短时间内解决暂时性的问题而不至于对 Kafka 服务器造成额外的负担。 具体而言如果发生了可重试的错误生产者将等待 retry.backoff.ms 指定的时间间隔然后进行下一次重试。如果重试依然失败生产者可能会继续进行更多的重试每次之间间隔逐渐增加以避免过度压力和频繁的连接尝试。 默认情况下retry.backoff.ms 的值通常是 100 毫秒但可以根据实际需求和环境进行调整 public static final String RETRY_BACKOFF_MS_CONFIG retry.backoff.ms; compression.type 控制发送到 Kafka 的消息是否压缩。可以是“none”、“gzip”、“snappy”或“lz4” public static final String COMPRESSION_TYPE_CONFIG compression.type; metrics.sample.window.ms 用于配置 Kafka Broker 的参数用于定义度量指标metrics的采样窗口的时间跨度以毫秒为单位。 具体来说这个参数指定了度量指标的采样窗口的持续时间。在这个时间段内Kafka Broker 会收集和计算各种指标比如吞吐量、延迟、请求处理时间等。然后这些度量指标可以被监控工具或者外部系统使用以便实时地监控 Kafka Broker 的运行状态和性能指标。 通过调整 metrics.sample.window.ms 这个参数可以改变度量指标采样的时间窗口长度以适应不同的监控和性能分析需求。较短的采样窗口可以提供更加实时的性能指标但也会增加系统资源的开销而较长的采样窗口则可以减少资源开销但会牺牲一些实时性。 默认情况下metrics.sample.window.ms 的值通常是 30000 毫秒30秒但根据具体的 Kafka 集群配置和监控需求可以进行调整。 public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG metrics.sample.window.ms; metrics.num.samples 用于配置 Kafka Broker 的参数用于指定在每个度量指标采样窗口中收集的样本数量。 具体来说度量指标metrics是用于监视 Kafka Broker 运行状态和性能的关键数据比如吞吐量、延迟、请求处理时间等。而 metrics.num.samples 参数则控制了在每个采样窗口内收集多少个样本。这些样本可以用于计算度量指标的平均值、最大值、最小值等统计信息。 通过调整 metrics.num.samples 这个参数可以平衡度量指标的准确性和资源消耗之间的权衡。较大的样本数量可以提供更加准确的度量指标统计信息但会增加系统资源的开销而较小的样本数量则可以减少资源消耗但可能会牺牲一些准确性。 默认情况下metrics.num.samples 的值通常是 2但根据具体的 Kafka 集群配置和监控需求可以进行调整。 public static final String METRICS_NUM_SAMPLES_CONFIG metrics.num.samples; metrics.recording.level 用于配置度量指标metrics的记录级别。这个参数决定了哪些度量指标会被记录和汇报。 具体来说metrics.recording.level 可以设置为以下几个级别之一 INFO记录常规的度量指标如吞吐量、延迟等。DEBUG记录更详细的度量指标信息可能包括更多的细节和较低级别的度量指标。TRACE记录非常详细的度量指标信息包括所有细节和最低级别的度量指标。 通过调整 metrics.recording.level 这个参数可以灵活地控制记录的度量指标的级别以满足不同场景下的监控和分析需求。例如在生产环境中通常会将记录级别设置为 INFO 或者 DEBUG以便实时监控 Kafka 集群的运行状态和性能指标而在调试或者故障排查时可以将记录级别设置为 TRACE以获取更详细的信息。 默认情况下metrics.recording.level 的值通常是 INFO但可以根据具体的需求和环境进行调整。 public static final String METRICS_RECORDING_LEVEL_CONFIG metrics.recording.level; metric.reporters 用于指定要使用的度量指标metrics报告器。度量指标报告器负责将 Kafka Broker 收集到的度量指标信息发送到指定的位置以供监控和分析使用。 具体来说metric.reporters 参数接受一个逗号分隔的报告器类名列表这些报告器类名必须实现 Kafka 的 org.apache.kafka.common.metrics.MetricsReporter 接口。通过配置这个参数可以启用不同的度量指标报告器并将度量指标信息发送到不同的目的地比如日志、JMX、Graphite、InfluxDB 等。 例如可以使用以下配置启用 JMX 报告器和日志报告器 metric.reportersjmx, kafka.metrics.KafkaMetricsReporter 这样配置后Kafka Broker 将同时使用 JMX 报告器和日志报告器将度量指标信息发送到 JMX 和日志中。 默认情况下metric.reporters 参数为空表示不使用任何度量指标报告器。在实际部署中根据监控和分析需求可以配置不同的度量指标报告器来收集和报告度量指标信息。 public static final String METRIC_REPORTER_CLASSES_CONFIG metric.reporters; max.in.flight.requests.per.connection 用于控制在任何给定时间内允许向单个 Broker 发送的未确认请求的最大数量。 在 Kafka 中生产者发送消息到 Broker 时可以选择等待服务器确认acknowledgement消息发送成功后再发送下一条消息或者继续发送下一条消息而不等待前一条消息的确认。当生产者选择继续发送下一条消息时这些未确认的消息就会处于 “in-flight” 状态。 max.in.flight.requests.per.connection 参数就是用来限制在这种情况下的未确认请求的数量。如果未确认请求的数量达到了这个限制生产者将会阻塞直到有一些请求被确认才会继续发送新的请求。 通过调整 max.in.flight.requests.per.connection 参数可以平衡生产者的吞吐量和消息传递的可靠性之间的权衡。较大的值可以提高生产者的吞吐量因为它允许更多的消息在未确认状态下发送而较小的值可以提高消息传递的可靠性因为它限制了未确认请求的数量从而减少了消息丢失的风险。 默认情况下max.in.flight.requests.per.connection 的值是 5。根据应用程序的要求和实际情况可以适当地调整这个参数的值。 public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION max.in.flight.requests.per.connection;05 KafkaSink 应用依赖 !-- Flink kafka 连接器依赖 start -- dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka_2.12/artifactIdversion1.14.4/version /dependency !-- Flink kafka 连接器依赖 end --06 KafkaSink 快速入门 6.1 包结构 6.2 项目配置 log4j2.properties rootLogger.levelINFO rootLogger.appenderRef.console.refConsoleAppender appender.console.nameConsoleAppender appender.console.typeCONSOLE appender.console.layout.typePatternLayout appender.console.layout.pattern%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n log.fileD:\\tmproot Logger.levelINFO6.3 pom文件 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.aurora/groupIdartifactIdaurora_kafka_connector/artifactIdversion1.0-SNAPSHOT/version!--属性设置--properties!--java_JDK版本--java.version1.8/java.version!--maven打包插件--maven.plugin.version3.8.1/maven.plugin.version!--编译编码UTF-8--project.build.sourceEncodingUTF-8/project.build.sourceEncoding!--输出报告编码UTF-8--project.reporting.outputEncodingUTF-8/project.reporting.outputEncoding!--json数据格式处理工具--fastjson.version1.2.75/fastjson.version!--log4j版本--log4j.version2.17.1/log4j.version!--flink版本--flink.version1.14.4/flink.version!--scala版本--scala.binary.version2.12/scala.binary.version/properties!--依赖管理--dependencies!-- fastJson工具类依赖 start --dependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion${fastjson.version}/version/dependency!-- fastJson工具类依赖 end --!-- log4j日志框架依赖 start --dependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-slf4j-impl/artifactIdversion${log4j.version}/version/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-api/artifactIdversion${log4j.version}/version/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-core/artifactIdversion${log4j.version}/version/dependency!-- log4j日志框架依赖 end --!-- Flink基础依赖 start --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-scala_${scala.binary.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients_${scala.binary.version}/artifactIdversion${flink.version}/version/dependency!-- Flink基础依赖 end --!-- Flink kafka 连接器依赖 start --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka_${scala.binary.version}/artifactIdversion${flink.version}/version/dependency!-- Flink kafka 连接器依赖 end --/dependencies!--编译打包--buildfinalName${project.name}/finalName!--资源文件打包--resourcesresourcedirectorysrc/main/resources/directory/resourceresourcedirectorysrc/main/java/directoryincludesinclude**/*.xml/include/includes/resource/resourcespluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-shade-plugin/artifactIdversion3.1.1/versionexecutionsexecutionphasepackage/phasegoalsgoalshade/goal/goalsconfigurationartifactSetexcludesexcludeorg.apache.flink:force-shading/excludeexcludeorg.google.code.flindbugs:jar305/excludeexcludeorg.slf4j:*/excludeexcluderorg.apache.logging.log4j:*/excluder/excludes/artifactSetfiltersfilterartifact*:*/artifactexcludesexcludeMETA-INF/*.SF/excludeexcludeMETA-INF/*.DSA/excludeexcludeMETA-INF/*.RSA/exclude/excludes/filter/filterstransformerstransformerimplementationorg.apache.maven.plugins.shade.resource.ManifestResourceTransformermainClasscom.aurora.demo,ElasticsearchSinkStreamingJobDemo/mainClass/transformer/transformers/configuration/execution/executions/plugin/plugins!--插件统一管理--pluginManagementplugins!--maven打包插件--plugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactIdversion${spring.boot.version}/versionconfigurationforktrue/forkfinalName${project.build.finalName}/finalName/configurationexecutionsexecutiongoalsgoalrepackage/goal/goals/execution/executions/plugin!--编译打包插件--pluginartifactIdmaven-compiler-plugin/artifactIdversion${maven.plugin.version}/versionconfigurationsource${java.version}/sourcetarget${java.version}/targetencodingUTF-8/encodingcompilerArgsarg-parameters/arg/compilerArgs/configuration/plugin/plugins/pluginManagement/build/project6.4 Flink集成KafkaSink作业 package com.aurora;import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.kafka.clients.producer.ProducerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.util.Properties; import java.util.UUID;/*** 描述Flink集成kafkaSink实现数据流写入Kafka集群** author 浅夏的猫* version 1.0.0* date 2024-02-18 20:52:25*/ public class KafkaSinkStreamJobDemo {private static final Logger logger LoggerFactory.getLogger(KafkaSinkStreamJobDemo.class);public static void main(String[] args) {try {logger.info(开始启动作业!!!);// 创建Flink运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 创建KafksSink配置Properties properties new Properties();properties.setProperty(ProducerConfig.ACKS_CONFIG, 1);properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, 0);properties.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);// 序列化模式KafkaRecordSerializationSchemaString recordSerializer KafkaRecordSerializationSchema.builder()//设置对哪个主题进行序列化.setTopic(topic_a)//设置数据值序列化方式.setValueSerializationSchema(new SimpleStringSchema())//设置数据key序列化方式.setKeySerializationSchema(new SimpleStringSchema()).build();// 创建KafkaSink算子KafkaSinkString kafkaSink KafkaSink.Stringbuilder()//设置kafka各种参数.setKafkaProducerConfig(properties)//设置序列化模式.setRecordSerializer(recordSerializer)//设置传递保证//At Most Once (至多一次) 系统保证消息要么被成功传递一次要么根本不被传递。这种保证意味着消息可能会丢失但不会被传递多次。//At Least Once (至少一次) 系统保证消息至少会被传递一次但可能会导致消息的重复传递。这种保证确保了消息的不丢失但应用程序需要能够处理重复消息的情况。//Exactly Once (精确一次) 系统保证消息会被确切地传递一次而没有任何重复。这是最高级别的传递保证确保消息不会丢失且不会被重复.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)//设置集群地址.setBootstrapServers(127.0.0.1:9092)//设置事务前缀.setTransactionalIdPrefix(flink_).build();// 生成一个数据流SourceFunctionString sourceFunction new SourceFunctionString() {Overridepublic void run(SourceContextString sourceContext) throws Exception {while (true) {String id UUID.randomUUID().toString();sourceContext.collect( id);logger.info(正在下发数据:{},id);Thread.sleep(1000);}}Overridepublic void cancel() {}};// 创建数据源DataStreamSourceString dataStreamSource env.addSource(sourceFunction).setParallelism(1);// 数据流数据通过KafkaSink算子写入kafkadataStreamSource.sinkTo(kafkaSink).setParallelism(1);// 执行任务env.execute(KafkaSinkStreamJobDemo);} catch (Exception e) {e.printStackTrace();}} } 6.5 验证 构建并运行 Flink 应用确保应用能够成功发送数据到 Kafka 主题。你可以通过 Kafka Consumer 来验证是否成功接收到了消息。 这个简单的示例展示了如何使用 Kafka Sink 集成到流处理系统中并且它是可运行的。在实际应用中你可以根据需要配置更多参数例如序列化器、acks 级别、以及其他相关的生产者和 Kafka 配置。 通过kafka命令启动一个消费者观察是否实时消费到数据 #windows kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic topic_a#linux kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_a07 总结 Kafka Sink 是实现流处理到 Kafka 集群的关键组件之一。通过上述示例你可以开始使用 Kafka Sink 将你的流处理数据发送到 Kafka从而实现可靠的消息传递。在实际应用中确保根据业务需求和性能要求调整配置参数以获得最佳的性能和稳定性。
http://www.hkea.cn/news/14307982/

相关文章:

  • 宁波网站推广优化外包公司怎么上传网站源码
  • 招聘网站建设策划书网页小游戏4933
  • 个人什么取消网站备案四平市城市建设档案馆网站
  • 网站建设美工的职位要求域名比价网
  • 南开做网站的公司上海推广网站公司
  • aspnet网站开发的书籍香奈儿网站建设的目标
  • google网站建设龙岗网站 建设深圳信科
  • 休闲食品网站建设目的网页培训
  • 在网站如何做在ps软件做界面厚昌营销网站建设
  • 网站源码路径河南建设工程信息网官方网站
  • 灰色网站设计wordpress 手机 登陆
  • 网站建设前端网络工程专业是做什么工作的
  • 为网站网站做代理wordpress 百度熊掌号
  • 深圳龙岗做网站公司网站设计培训成都哪家好
  • jsp网站开发职位要求做网站到哪里做
  • 盘锦网站开发百度云网盘资源搜索引擎
  • 个人名下公司查询网网站推广优化怎么做最好
  • 溧水做网站做时尚网站取个名字
  • 网站建设基础课程携程网站官网
  • php ajax网站开发典型实例pdfphp网站开发师招聘
  • 网站多语言 设计做个外贸网站大概多少钱
  • 网站推广优化张店html5单页面网站
  • 购物网站二级店铺mvc南宁网站建设产品
  • 为什么推荐企业做网站中文域名转换英文域名
  • 马来西亚做公路投标网站自己做的网站如何发布
  • 晋中网站公司网站与维护
  • 沈北新区建设局网站深汕特别合作区
  • 网站制作报价开南宁网站托管
  • 中国城乡与建设部网站中国建设银行网站
  • aspx网站如何架设上海网站建设-中国互联