Contents: 01 Introduction · 02 Connector dependencies (2.1 Kafka connector dependency, 2.2 base dependency) · 03 Usage · 04 Serializers · 05 Fault tolerance · 06 Metrics · 07 Hands-on project (7.1 Package structure, 7.2 pom.xml dependencies, 7.3 Configuration files, 7.4 Creating the sink job)

01 Introduction

KafkaSink writes a data stream to one or more Kafka topics.

Source code for this article (ready to clone and run): https://gitee.com/shawsongyue/aurora.git
Module: aurora_flink_connector_kafka
Main class: KafkaSinkStreamingJob

02 Connector dependencies
2.1 Kafka connector dependency

<!-- kafka dependency start -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.0.2-1.18</version>
</dependency>
<!-- kafka dependency end -->

2.2 base dependency

Without this dependency the job fails immediately at startup with: Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/connector/base/source/reader/RecordEmitter

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.18.0</version>
</dependency>

03 Usage
The Kafka sink provides a builder class to construct a KafkaSink instance:

DataStream<String> stream = ...;

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("topic-name")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

stream.sinkTo(sink);

The following properties are required when building a KafkaSink:

- Bootstrap servers: setBootstrapServers(String)
- Record serializer: setRecordSerializer(KafkaRecordSerializationSchema)
- If the DeliveryGuarantee.EXACTLY_ONCE semantic is used, a transactional id prefix must also be set via setTransactionalIdPrefix(String)

04 Serializers

When building the sink you must provide a KafkaRecordSerializationSchema that turns each input element into a Kafka ProducerRecord. Flink offers a schema builder that covers the common parts — key/value serialization, topic selection, and message partitioning — and richer control is possible by implementing the interface directly. The value serialization method and the topic must always be specified; in addition, setKafkaKeySerializer(Serializer) or setKafkaValueSerializer(Serializer) can be used to plug in serializers provided by Kafka itself rather than by Flink.
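As noted above, the interface can also be implemented directly instead of using the schema builder. A minimal sketch, assuming a fixed topic name "topic_a" and the class name SimpleRecordSerializer (both illustrative, not part of the project):

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

// Sketch: serialize each String element into a keyless ProducerRecord
// targeting a fixed topic. Topic and class name are assumptions.
public class SimpleRecordSerializer implements KafkaRecordSerializationSchema<String> {

    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element,
                                                    KafkaSinkContext context,
                                                    Long timestamp) {
        // no key; the value is the UTF-8 encoded element
        return new ProducerRecord<>("topic_a", null,
                element.getBytes(StandardCharsets.UTF_8));
    }
}

An instance would then be passed to setRecordSerializer(new SimpleRecordSerializer()) in place of the builder-produced schema.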
KafkaRecordSerializationSchema.builder()
        .setTopicSelector((element) -> {your-topic-selection-logic})
        .setValueSerializationSchema(new SimpleStringSchema())
        .setKeySerializationSchema(new SimpleStringSchema())
        .setPartitioner(new FlinkFixedPartitioner())
        .build();

05 Fault tolerance
KafkaSink supports three delivery guarantees (DeliveryGuarantee). For DeliveryGuarantee.AT_LEAST_ONCE and DeliveryGuarantee.EXACTLY_ONCE, Flink checkpointing must be enabled. By default KafkaSink uses DeliveryGuarantee.NONE. The guarantees are:

- DeliveryGuarantee.NONE: no guarantee at all. Messages can be lost due to Kafka broker failures and duplicated due to Flink failures.
- DeliveryGuarantee.AT_LEAST_ONCE: at checkpoint time the sink waits until all records buffered in the Kafka producer have been acknowledged. No messages are lost because of broker-side events, but duplicates are possible when Flink restarts, since Flink reprocesses old input data.
- DeliveryGuarantee.EXACTLY_ONCE: all records are written through Kafka transactions that are committed when the checkpoint completes. Consumers that read only committed data (see the Kafka consumer setting isolation.level) therefore see no duplicates after a Flink restart. However, records become visible only once the checkpoint completes, so adjust the checkpoint interval accordingly. Make sure the transactional id prefix (transactionIdPrefix) is unique per application, so that the transactions of different jobs do not interfere with each other. In addition, it is strongly recommended to raise the Kafka transaction timeout well above (checkpoint interval + maximum restart time); otherwise Kafka expires uncommitted transactions and data is lost.
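To make the EXACTLY_ONCE requirements above concrete, here is a hedged sketch of a sink configured for it. The brokers and topic variables are assumed to exist, the prefix "my-job-tx" is illustrative, and the 15-minute transaction timeout is an assumption that must stay below the broker's transaction.max.timeout.ms:

Properties producerProps = new Properties();
// raise the transaction timeout well above checkpoint interval + max restart time
producerProps.setProperty("transaction.timeout.ms", "900000"); // 15 min, assumed

KafkaSink<String> exactlyOnceSink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)                      // assumed variable
        .setKafkaProducerConfig(producerProps)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(topic)                           // assumed variable
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // must be unique across applications sharing the Kafka cluster
        .setTransactionalIdPrefix("my-job-tx")             // illustrative prefix
        .build();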
06 Metrics

The Kafka sink reports the following metric in the listed scope:

Scope    | Metric          | User variables | Description                                                                   | Type
Operator | currentSendTime | n/a            | Time it took to send the last record; a momentary value for the latest record | Gauge
07 Hands-on project

7.1 Package structure

7.2 pom.xml dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xsy</groupId>
    <artifactId>aurora_flink_connector_kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- properties -->
    <properties>
        <!-- JDK version -->
        <java.version>11</java.version>
        <!-- maven compiler plugin version -->
        <maven.plugin.version>3.8.1/maven.plugin.version>
        <!-- source encoding -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- report encoding -->
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!-- JSON handling -->
        <fastjson.version>1.2.75</fastjson.version>
        <!-- log4j version -->
        <log4j.version>2.17.1</log4j.version>
        <!-- flink version -->
        <flink.version>1.18.0</flink.version>
        <!-- scala version -->
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <!-- common dependencies -->
    <dependencies>
        <!-- json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- logging framework start -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <!-- logging framework end -->
        <!-- kafka dependency start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.0.2-1.18</version>
        </dependency>
        <!-- kafka dependency end -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.18.0</version>
        </dependency>
    </dependencies>

    <!-- build and packaging -->
    <build>
        <finalName>${project.name}</finalName>
        <!-- resource packaging -->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.aurora.KafkaSinkStreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <!-- plugin management -->
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                    <configuration>
                        <fork>true</fork>
                        <finalName>${project.build.finalName}</finalName>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <!-- compiler plugin -->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>${maven.plugin.version}</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                        <encoding>UTF-8</encoding>
                        <compilerArgs>
                            <arg>-parameters</arg>
                        </compilerArgs>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!-- remote repositories -->
    <repositories>
        <repository>
            <id>aliyun-repos</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    <!-- plugin repositories -->
    <pluginRepositories>
        <pluginRepository>
            <id>aliyun-plugin</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>

7.3 Configuration files
1) application.properties

# Kafka cluster address
kafka.bootstrapServers=localhost:9092
# Kafka topic
kafka.topic=topic_a
# Kafka consumer group
kafka.group=aurora_group

2) log4j2.properties
rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmp

7.4 Creating the sink job
package com.aurora;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;

/**
 * @author 浅夏的猫
 * @description Kafka connector demo job
 * @datetime 22:21 2024/2/1
 */
public class KafkaSinkStreamingJob {

    private static final Logger logger = LoggerFactory.getLogger(KafkaSinkStreamingJob.class);

    public static void main(String[] args) throws Exception {
        // 1. Load parameters from the properties file
        String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink_connector_kafka\\src\\main\\resources\\application.properties";
        // Option 1: use the built-in utility class directly
        ParameterTool paramsMap = ParameterTool.fromPropertiesFile(propertiesFilePath);

        // 2. Initialize the Kafka parameters
        String bootstrapServers = paramsMap.get("kafka.bootstrapServers");
        String topic = paramsMap.get("kafka.topic");

        // 3. Build the sink
        KafkaSink<String> sink = KafkaSink.<String>builder()
                // Kafka broker addresses
                .setBootstrapServers(bootstrapServers)
                // record serializer
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                // at-least-once delivery
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        // 4. Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ArrayList<String> listData = new ArrayList<>();
        listData.add("test");
        listData.add("java");
        listData.add("c");
        DataStreamSource<String> dataStreamSource = env.fromCollection(listData);

        // 5. Simple processing
        SingleOutputStreamOperator<String> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String record, Collector<String> collector) throws Exception {
                logger.info("processing kafka record: {}", record);
                collector.collect(record);
            }
        });

        // sink operator
        flatMap.sinkTo(sink);

        // 6. Start the job
        // enable checkpointing: trigger a checkpoint every 1000 ms
        env.enableCheckpointing(1000);
        // advanced checkpoint options
        // set the checkpointing mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // keep at least 500 ms between checkpoints (minimum pause)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // discard a checkpoint if it does not complete within 1 minute (timeout)
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // allow at most one concurrent checkpoint
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // retain checkpoint data after cancellation so the job can be restored from it
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // set the checkpoint storage: checkpoint data needs a durable location
        env.getCheckpointConfig().setCheckpointStorage("file:///E:/flink/checkPoint");

        env.execute();
    }
}
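One way to check that the job above actually delivered its records is a small read-back job using KafkaSource, sketched here under the assumption that the broker, topic, and group match application.properties; the class name VerifySinkJob is hypothetical:

package com.aurora;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical helper job: consume topic_a from the beginning and print
// whatever the sink wrote. Values mirror application.properties.
public class VerifySinkJob {
    public static void main(String[] args) throws Exception {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("topic_a")
                .setGroupId("aurora_group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                .print();
        env.execute("verify-sink-output");
    }
}

Running it should print test, java, and c once the sink job has completed at least one checkpoint.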