当前位置: 首页 > news >正文

网站开发好什么进行界面的优化wordpress微信公众号企业版

网站开发好什么进行界面的优化,wordpress微信公众号企业版,宁波网站建设设计公司,杭州网站建设哪家设计好文章目录 01 引言02 连接器依赖2.1 kafka连接器依赖2.2 base基础依赖 03 使用方法04 序列化器05 指标监控06 项目源码实战6.1 包结构6.2 pom.xml依赖6.3 配置文件6.4 创建sink作业 01 引言 KafkaSink 可将数据流写入一个或多个 Kafka topic 实战源码地址,一键下载可用#xf… 文章目录 01 引言02 连接器依赖2.1 kafka连接器依赖2.2 base基础依赖 03 使用方法04 序列化器05 指标监控06 项目源码实战6.1 包结构6.2 pom.xml依赖6.3 配置文件6.4 创建sink作业 01 引言 KafkaSink 可将数据流写入一个或多个 Kafka topic 实战源码地址,一键下载可用https://gitee.com/shawsongyue/aurora.git 模块aurora_flink_connector_kafka 主类KafkaSinkStreamingJob02 连接器依赖 2.1 kafka连接器依赖 !--kafka依赖 start--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion3.0.2-1.18/version/dependency!--kafka依赖 end--2.2 base基础依赖 若是不引入该依赖项目启动直接报错Exception in thread main java.lang.NoClassDefFoundError: org/apache/flink/connector/base/source/reader/RecordEmitterdependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-base/artifactIdversion1.18.0/version/dependency03 使用方法 Kafka sink 提供了构建类来创建 KafkaSink 的实例 DataStreamString stream ...;KafkaSinkString sink KafkaSink.Stringbuilder().setBootstrapServers(brokers).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(topic-name).setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();stream.sinkTo(sink);以下属性在构建 KafkaSink 时是必须指定的 Bootstrap servers, setBootstrapServers(String) 消息序列化器Serializer, setRecordSerializer(KafkaRecordSerializationSchema) 如果使用DeliveryGuarantee.EXACTLY_ONCE 的语义保证则需要使用 setTransactionalIdPrefix(String)04 序列化器 构建时需要提供 KafkaRecordSerializationSchema 来将输入数据转换为 Kafka 的 ProducerRecord。Flink 提供了 schema 构建器 以提供一些通用的组件例如消息键key/消息体value序列化、topic 选择、消息分区同样也可以通过实现对应的接口来进行更丰富的控制。 其中消息体value序列化方法和 topic 的选择方法是必须指定的此外也可以通过 setKafkaKeySerializer(Serializer) 或 setKafkaValueSerializer(Serializer) 来使用 Kafka 提供而非 Flink 提供的序列化器 KafkaRecordSerializationSchema.builder().setTopicSelector((element) - {your-topic-selection-logic}).setValueSerializationSchema(new SimpleStringSchema()).setKeySerializationSchema(new SimpleStringSchema()).setPartitioner(new FlinkFixedPartitioner()).build();05 容错恢复 KafkaSink 总共支持三种不同的语义保证DeliveryGuarantee。对于 DeliveryGuarantee.AT_LEAST_ONCE 和 DeliveryGuarantee.EXACTLY_ONCEFlink checkpoint 必须启用。默认情况下 KafkaSink 使用 DeliveryGuarantee.NONE。 以下是对不同语义保证的解释DeliveryGuarantee.NONE 不提供任何保证消息有可能会因 Kafka broker 的原因发生丢失或因 Flink 的故障发生重复。DeliveryGuarantee.AT_LEAST_ONCE: sink 在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。消息不会因 Kafka broker 端发生的事件而丢失但可能会在 Flink 重启时重复因为 Flink 会重新处理旧数据。DeliveryGuarantee.EXACTLY_ONCE: 该模式下Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。因此如果 consumer 只读取已提交的数据参见 Kafka consumer 配置 isolation.level在 Flink 发生重启时不会发生数据重复。然而这会使数据在 checkpoint 完成时才会可见因此请按需调整 checkpoint 的间隔。请确认事务 ID 的前缀transactionIdPrefix对不同的应用是唯一的以保证不同作业的事务 不会互相影响此外强烈建议将 Kafka 的事务超时时间调整至远大于 checkpoint 最大间隔 最大重启时间否则 Kafka 对未提交事务的过期处理会导致数据丢失。 05 指标监控 Kafka sink 会在不同的范围Scope中汇报下列指标。 范围指标用户变量描述类型算子currentSendTimen/a发送最近一条数据的耗时。该指标反映最后一条数据的瞬时值。Gauge 06 项目源码实战 6.1 包结构 6.2 pom.xml依赖 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.xsy/groupIdartifactIdaurora_flink_connector_kafka/artifactIdversion1.0-SNAPSHOT/version!--属性设置--properties!--java_JDK版本--java.version11/java.version!--maven打包插件--maven.plugin.version3.8.1/maven.plugin.version!--编译编码UTF-8--project.build.sourceEncodingUTF-8/project.build.sourceEncoding!--输出报告编码UTF-8--project.reporting.outputEncodingUTF-8/project.reporting.outputEncoding!--json数据格式处理工具--fastjson.version1.2.75/fastjson.version!--log4j版本--log4j.version2.17.1/log4j.version!--flink版本--flink.version1.18.0/flink.version!--scala版本--scala.binary.version2.11/scala.binary.version/properties!--通用依赖--dependencies!-- json --dependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion${fastjson.version}/version/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-scala_2.12/artifactIdversion${flink.version}/version/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/version/dependency!--集成外部依赖--!--集成日志框架 start--dependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-slf4j-impl/artifactIdversion${log4j.version}/version/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-api/artifactIdversion${log4j.version}/version/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-core/artifactIdversion${log4j.version}/version/dependency!--集成日志框架 end--!--kafka依赖 start--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion3.0.2-1.18/version/dependency!--kafka依赖 end--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-base/artifactIdversion1.18.0/version/dependency/dependencies!--编译打包--buildfinalName${project.name}/finalName!--资源文件打包--resourcesresourcedirectorysrc/main/resources/directory/resourceresourcedirectorysrc/main/java/directoryincludesinclude**/*.xml/include/includes/resource/resourcespluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-shade-plugin/artifactIdversion3.1.1/versionexecutionsexecutionphasepackage/phasegoalsgoalshade/goal/goalsconfigurationartifactSetexcludesexcludeorg.apache.flink:force-shading/excludeexcludeorg.google.code.flindbugs:jar305/excludeexcludeorg.slf4j:*/excludeexcluderorg.apache.logging.log4j:*/excluder/excludes/artifactSetfiltersfilterartifact*:*/artifactexcludesexcludeMETA-INF/*.SF/excludeexcludeMETA-INF/*.DSA/excludeexcludeMETA-INF/*.RSA/exclude/excludes/filter/filterstransformerstransformer implementationorg.apache.maven.plugins.shade.resource.ManifestResourceTransformermainClassorg.aurora.KafkaStreamingJob/mainClass/transformer/transformers/configuration/execution/executions/plugin/plugins!--插件统一管理--pluginManagementplugins!--maven打包插件--plugingroupIdorg.springframework.boot/groupIdartifactIdspring-boot-maven-plugin/artifactIdversion${spring.boot.version}/versionconfigurationforktrue/forkfinalName${project.build.finalName}/finalName/configurationexecutionsexecutiongoalsgoalrepackage/goal/goals/execution/executions/plugin!--编译打包插件--pluginartifactIdmaven-compiler-plugin/artifactIdversion${maven.plugin.version}/versionconfigurationsource${java.version}/sourcetarget${java.version}/targetencodingUTF-8/encodingcompilerArgsarg-parameters/arg/compilerArgs/configuration/plugin/plugins/pluginManagement/build!--配置Maven项目中需要使用的远程仓库--repositoriesrepositoryidaliyun-repos/idurlhttps://maven.aliyun.com/nexus/content/groups/public//urlsnapshotsenabledfalse/enabled/snapshots/repository/repositories!--用来配置maven插件的远程仓库--pluginRepositoriespluginRepositoryidaliyun-plugin/idurlhttps://maven.aliyun.com/nexus/content/groups/public//urlsnapshotsenabledfalse/enabled/snapshots/pluginRepository/pluginRepositories/project6.3 配置文件 1application.properties #kafka集群地址 kafka.bootstrapServerslocalhost:9092 #kafka主题 kafka.topictopic_a #kafka消费者组 kafka.groupaurora_group2log4j2.properties rootLogger.levelINFO rootLogger.appenderRef.console.refConsoleAppender appender.console.nameConsoleAppender appender.console.typeCONSOLE appender.console.layout.typePatternLayout appender.console.layout.pattern%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n log.fileD:\\tmprootLogger.levelINFO rootLogger.appenderRef.console.refConsoleAppender appender.console.nameConsoleAppender appender.console.typeCONSOLE appender.console.layout.typePatternLayout appender.console.layout.pattern%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n log.fileD:\\tmp6.4 创建sink作业 package com.aurora;import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.KafkaSourceBuilder; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory;import java.util.ArrayList;/*** author 浅夏的猫* description kafka 连接器使用demo作业* datetime 22:21 2024/2/1*/ public class KafkaSinkStreamingJob {private static final Logger logger LoggerFactory.getLogger(KafkaSinkStreamingJob.class);public static void main(String[] args) throws Exception {//1.获取参数//定义文件路径String propertiesFilePath E:\\project\\aurora_dev\\aurora_flink_connector_kafka\\src\\main\\resources\\application.properties;//方式一:直接使用内置工具类ParameterTool paramsMap ParameterTool.fromPropertiesFile(propertiesFilePath);//2.初始化kafka参数String bootstrapServers paramsMap.get(kafka.bootstrapServers);String topic paramsMap.get(kafka.topic);KafkaSinkString sink KafkaSink.Stringbuilder()//设置kafka地址.setBootstrapServers(bootstrapServers)//设置消息序列号方式.setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(topic).setValueSerializationSchema(new SimpleStringSchema()).build())//至少一次.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();//4.创建Flink运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();ArrayListString listData new ArrayList();listData.add(test);listData.add(java);listData.add(c);DataStreamSourceString dataStreamSource env.fromCollection(listData);//5.数据简单处理SingleOutputStreamOperatorString flatMap dataStreamSource.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String record, CollectorString collector) throws Exception {logger.info(正在处理kafka数据:{}, record);collector.collect(record);}});//数据输出算子flatMap.sinkTo(sink);//6.启动服务//开启flink的checkpoint功能每隔1000ms启动一个检查点设置checkpoint的声明周期env.enableCheckpointing(1000);//checkpoint高级选项设置//设置checkpoint的模式为exactly-once这也是默认值env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//确保检查点之间至少有500ms间隔即checkpoint的最小间隔env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);//确保检查必须在1min之内完成否则就会被丢弃掉即checkpoint的超时时间env.getCheckpointConfig().setCheckpointTimeout(60000);//同一时间只允许操作一个检查点env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);//程序即使被cancel后也会保留checkpoint数据以便根据实际需要恢复到指定的checkpointenv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//设置statebackend,指定state和checkpoint的数据存储位置(checkpoint的数据必须得有一个可以持久化存储的地方env.getCheckpointConfig().setCheckpointStorage(file:///E:/flink/checkPoint);env.execute();}}
http://www.hkea.cn/news/14315197/

相关文章:

  • 多个wordpress站点同步上海58同城招聘网最新招聘
  • 东莞企业网站模板建站做电影分享网站违法吗
  • 精品网站建设平台怎样做直播网站app
  • wordpress站点如何添加百度分享代码dede做视频网站
  • 网站建设文化服务郑州搭建网站公司
  • 国外 作品集 网站石排网站仿做
  • 网站建设 个体经营范围网站上传后
  • 无锡网站排名系统微网站开发制作
  • 品牌网站建设费用要多少潍坊网站建设优化
  • 淘宝内部卷网站建设上海小学网站建设招标
  • 做网站的ui如何用代码制作小程序
  • o2o网站建设哪家好福建省漳州市建设厅网站
  • 网站建设相关资料整理的重要性天河区营销型网站建设
  • 做网站怎么设置背景网站建设 南昌
  • 查询做导员的网站定制衣柜设计方案
  • 建设永久网站网站开发的岗位与分工
  • 广州专业网站制作平台沈阳建设学院
  • 万网云服务器怎么上传网站吗网站开发项目实训
  • 延安商城网站开发设计wordpress5.0更新内容
  • cms网站开发教程保定定兴网站建设
  • gps定位网站建设新余 网站建站 设计 公司
  • 河北外贸网站建设妇产科医生免费咨询
  • 公众号开发河北米云自贡网站优化
  • 宁夏固原建设网站网站开发的实例教程
  • 百度信息流代理上海企业网站排名优化
  • 贵州网站建设模板手机企业网站设计理念
  • 网站公司网站建设什么叫网页
  • 网站建设公司清明雨上宁波网站开发
  • 个人网站 如何做推广郑州网站推广 汉狮网络
  • 西宁 网站建设小程序登录入口在哪