当前位置: 首页 > news >正文

专业做pc+手机网站免备案自助建站网站

专业做pc+手机网站,免备案自助建站网站,网站商城网络整合营销,企业网站做多大尺寸文章目录 01 引言02 连接器依赖2.1 kafka连接器依赖2.2 base基础依赖 03 使用方法04 序列化器05 指标监控06 项目源码实战6.1 包结构6.2 pom.xml依赖6.3 配置文件6.4 创建sink作业 01 引言 KafkaSink 可将数据流写入一个或多个 Kafka topic 实战源码地址,一键下载可用#xf… 文章目录 01 引言02 连接器依赖2.1 kafka连接器依赖2.2 base基础依赖 03 使用方法04 序列化器05 指标监控06 项目源码实战6.1 包结构6.2 pom.xml依赖6.3 配置文件6.4 创建sink作业 01 引言 KafkaSink 可将数据流写入一个或多个 Kafka topic 实战源码地址,一键下载可用https://gitee.com/shawsongyue/aurora.git 模块aurora_flink_connector_kafka 主类KafkaSinkStreamingJob02 连接器依赖 2.1 kafka连接器依赖 !--kafka依赖 start--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion3.0.2-1.18/version/dependency!--kafka依赖 end--2.2 base基础依赖 若是不引入该依赖项目启动直接报错Exception in thread main java.lang.NoClassDefFoundError: org/apache/flink/connector/base/source/reader/RecordEmitterdependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-base/artifactIdversion1.18.0/version/dependency03 使用方法 Kafka sink 提供了构建类来创建 KafkaSink 的实例 DataStreamString stream ...;KafkaSinkString sink KafkaSink.Stringbuilder().setBootstrapServers(brokers).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(topic-name).setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();stream.sinkTo(sink);以下属性在构建 KafkaSink 时是必须指定的 Bootstrap servers, setBootstrapServers(String) 消息序列化器Serializer, setRecordSerializer(KafkaRecordSerializationSchema) 如果使用DeliveryGuarantee.EXACTLY_ONCE 的语义保证则需要使用 setTransactionalIdPrefix(String)04 序列化器 构建时需要提供 KafkaRecordSerializationSchema 来将输入数据转换为 Kafka 的 ProducerRecord。Flink 提供了 schema 构建器 以提供一些通用的组件例如消息键key/消息体value序列化、topic 选择、消息分区同样也可以通过实现对应的接口来进行更丰富的控制。 其中消息体value序列化方法和 topic 的选择方法是必须指定的此外也可以通过 setKafkaKeySerializer(Serializer) 或 setKafkaValueSerializer(Serializer) 来使用 Kafka 提供而非 Flink 提供的序列化器 KafkaRecordSerializationSchema.builder().setTopicSelector((element) - {your-topic-selection-logic}).setValueSerializationSchema(new SimpleStringSchema()).setKeySerializationSchema(new SimpleStringSchema()).setPartitioner(new FlinkFixedPartitioner()).build();05 容错恢复 KafkaSink 总共支持三种不同的语义保证DeliveryGuarantee。对于 DeliveryGuarantee.AT_LEAST_ONCE 和 DeliveryGuarantee.EXACTLY_ONCEFlink checkpoint 必须启用。默认情况下 KafkaSink 使用 DeliveryGuarantee.NONE。 以下是对不同语义保证的解释DeliveryGuarantee.NONE 不提供任何保证消息有可能会因 Kafka broker 的原因发生丢失或因 Flink 的故障发生重复。DeliveryGuarantee.AT_LEAST_ONCE: sink 在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。消息不会因 Kafka broker 端发生的事件而丢失但可能会在 Flink 重启时重复因为 Flink 会重新处理旧数据。DeliveryGuarantee.EXACTLY_ONCE: 该模式下Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。因此如果 consumer 只读取已提交的数据参见 Kafka consumer 配置 isolation.level在 Flink 发生重启时不会发生数据重复。然而这会使数据在 checkpoint 完成时才会可见因此请按需调整 checkpoint 的间隔。请确认事务 ID 的前缀transactionIdPrefix对不同的应用是唯一的以保证不同作业的事务 不会互相影响此外强烈建议将 Kafka 的事务超时时间调整至远大于 checkpoint 最大间隔 最大重启时间否则 Kafka 对未提交事务的过期处理会导致数据丢失。 05 指标监控 Kafka sink 会在不同的范围Scope中汇报下列指标。 范围指标用户变量描述类型算子currentSendTimen/a发送最近一条数据的耗时。该指标反映最后一条数据的瞬时值。Gauge 06 项目源码实战 6.1 包结构 6.2 pom.xml依赖 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.xsy/groupIdartifactIdaurora_flink_connector_kafka/artifactIdversion1.0-SNAPSHOT/version!--属性设置--properties!--java_JDK版本--java.version11/java.version!--maven打包插件--maven.plugin.version3.8.1/maven.plugin.version!--编译编码UTF-8--project.build.sourceEncodingUTF-8/project.build.sourceEncoding!--输出报告编码UTF-8--project.reporting.outputEncodingUTF-8/project.reporting.outputEncoding!--json数据格式处理工具--fastjson.version1.2.75/fastjson.version!--log4j版本--log4j.version2.17.1/log4j.version!--flink版本--flink.version1.18.0/flink.version!--scala版本--scala.binary.version2.11/scala.binary.version/properties!--通用依赖--dependencies!-- json --dependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion${fastjson.version}/version/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-scala_2.12/artifactIdversion${flink.version}/version/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/version/dependency!--集成外部依赖--!--集成日志框架 start--dependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-slf4j-impl/artifactIdversion${log4j.version}/version/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-api/artifactIdversion${log4j.version}/version/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-core/artifactIdversion${log4j.version}/version/dependency!--集成日志框架 end--!--kafka依赖 start--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion3.0.2-1.18/version/dependency!--kafka依赖 end--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-base/artifactIdversion1.18.0/version/dependency/dependencies!--编译打包--buildfinalName${project.name}/finalName!--资源文件打包--resourcesresourcedirectorysrc/main/resources/directory/resourceresourcedirectorysrc/main/java/directoryincludesinclude**/*.xml/include/includes/resource/resourcespluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-shade-plugin/artifactIdversion3.1.1/versionexecutionsexecutionphasepackage/phasegoalsgoalshade/goal/goalsconfigurationartifactSetexcludesexcludeorg.apache.flink:force-shading/excludeexcludeorg.google.code.flindbugs:jar305/excludeexcludeorg.slf4j:*/excludeexcluderorg.apache.logging.log4j:*/excluder/excludes/artifactSetfiltersfilterartifact*:*/artifactexcludesexcludeMETA-INF/*.SF/excludeexcludeMETA-INF/*.DSA/excludeexcludeMETA-INF/*.RSA/exclude/excludes/filter/filterstransformerstransformer implementationorg.apache.maven.plugins.shade.resource.ManifestResourceTransformermainClassorg.aurora.KafkaStreamingJob/mainClass/transformer/transformers/configuration/execution/executions/plugin/plugins!--插件统一管理--pluginManagementplugins!--maven打包插件--plugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactIdversion${spring.boot.version}/versionconfigurationforktrue/forkfinalName${project.build.finalName}/finalName/configurationexecutionsexecutiongoalsgoalrepackage/goal/goals/execution/executions/plugin!--编译打包插件--pluginartifactIdmaven-compiler-plugin/artifactIdversion${maven.plugin.version}/versionconfigurationsource${java.version}/sourcetarget${java.version}/targetencodingUTF-8/encodingcompilerArgsarg-parameters/arg/compilerArgs/configuration/plugin/plugins/pluginManagement/build!--配置Maven项目中需要使用的远程仓库--repositoriesrepositoryidaliyun-repos/idurlhttps://maven.aliyun.com/nexus/content/groups/public//urlsnapshotsenabledfalse/enabled/snapshots/repository/repositories!--用来配置maven插件的远程仓库--pluginRepositoriespluginRepositoryidaliyun-plugin/idurlhttps://maven.aliyun.com/nexus/content/groups/public//urlsnapshotsenabledfalse/enabled/snapshots/pluginRepository/pluginRepositories/project6.3 配置文件 1application.properties #kafka集群地址 kafka.bootstrapServerslocalhost:9092 #kafka主题 kafka.topictopic_a #kafka消费者组 kafka.groupaurora_group2log4j2.properties rootLogger.levelINFO rootLogger.appenderRef.console.refConsoleAppender appender.console.nameConsoleAppender appender.console.typeCONSOLE appender.console.layout.typePatternLayout appender.console.layout.pattern%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n log.fileD:\\tmprootLogger.levelINFO rootLogger.appenderRef.console.refConsoleAppender appender.console.nameConsoleAppender appender.console.typeCONSOLE appender.console.layout.typePatternLayout appender.console.layout.pattern%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n log.fileD:\\tmp6.4 创建sink作业 package com.aurora;import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.KafkaSourceBuilder; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.util.ArrayList;/*** author 浅夏的猫* description kafka 连接器使用demo作业* datetime 22:21 2024/2/1*/ public class KafkaSinkStreamingJob {private static final Logger logger LoggerFactory.getLogger(KafkaSinkStreamingJob.class);public static void main(String[] args) throws Exception {//1.获取参数//定义文件路径String propertiesFilePath E:\\project\\aurora_dev\\aurora_flink_connector_kafka\\src\\main\\resources\\application.properties;//方式一:直接使用内置工具类ParameterTool paramsMap ParameterTool.fromPropertiesFile(propertiesFilePath);//2.初始化kafka参数String bootstrapServers paramsMap.get(kafka.bootstrapServers);String topic paramsMap.get(kafka.topic);KafkaSinkString sink KafkaSink.Stringbuilder()//设置kafka地址.setBootstrapServers(bootstrapServers)//设置消息序列号方式.setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(topic).setValueSerializationSchema(new SimpleStringSchema()).build())//至少一次.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();//4.创建Flink运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();ArrayListString listData new ArrayList();listData.add(test);listData.add(java);listData.add(c);DataStreamSourceString dataStreamSource env.fromCollection(listData);//5.数据简单处理SingleOutputStreamOperatorString flatMap dataStreamSource.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String record, CollectorString collector) throws Exception {logger.info(正在处理kafka数据:{}, record);collector.collect(record);}});//数据输出算子flatMap.sinkTo(sink);//6.启动服务//开启flink的checkpoint功能每隔1000ms启动一个检查点设置checkpoint的声明周期env.enableCheckpointing(1000);//checkpoint高级选项设置//设置checkpoint的模式为exactly-once这也是默认值env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//确保检查点之间至少有500ms间隔即checkpoint的最小间隔env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//确保检查必须在1min之内完成否则就会被丢弃掉即checkpoint的超时时间env.getCheckpointConfig().setCheckpointTimeout(60000);//同一时间只允许操作一个检查点env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//程序即使被cancel后也会保留checkpoint数据以便根据实际需要恢复到指定的checkpointenv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//设置statebackend,指定state和checkpoint的数据存储位置(checkpoint的数据必须得有一个可以持久化存储的地方env.getCheckpointConfig().setCheckpointStorage(file:///E:/flink/checkPoint);env.execute();}}
http://www.hkea.cn/news/14285870/

相关文章:

  • 网站建设中的注册和登录页面抖音推广计划
  • 网站建设与管理是什么工作做印刷的网站有哪些
  • 网站开发语言为 php用wordpress做开放的wiki
  • 慈云寺网站建设网页打不开显示404要怎么处理
  • app ui设计欣赏 网站网站后台管理系统 静态页面
  • 深圳电商网站公司wordpress中php代码只能一行一行写
  • 网站设计和平面设计定西市小企业网站建设
  • 淄博网站制作制作专业北京翻译公司
  • 山西笑傲网站建设推广西安做义工网站
  • 基层建设网站海南网站建设制作
  • 安徽盛绿建设网站单位网站开发合同范本
  • 网站开发软件三剑客做饲料机械的网站
  • 做微商建自己的网站有用吗虚拟主机如何做多个网站
  • 石家庄网站建设加王道下拉制作企业网页的公司
  • 淘宝导购网站模版阿里云虚拟主机做多个网站
  • 什么网站可以发布信息如何免费让网站上线
  • 邯郸网站设计哪家专业wordpress插件改图标
  • 网站上怎么做福彩卖家ps制作个人网站首页
  • 网站建设销售工资成都信用
  • 肇庆网站建设咨询如何将WORDPRESS主题换成英文
  • 网站搜索防止攻击wordpress站群管理系统
  • 浙江省建设工程质量管理协会网站邯郸做网站多少钱
  • dede做招聘网站wordpress分站
  • 怎样给网站做推广做网站必须要电脑吗
  • 手机壳在线设计网站建站宝盒里的手机网站
  • 衡阳县做淘宝网站建设机械 网站源码
  • 网上做效果图网站移动网站建设价格便宜
  • dede怎么做网站集团酒店网站建设
  • 徐州制作手机网站建设网站的必要与可行性
  • 网站开发那种语言好判断 摘要wordpress