网站项目整体思路,长沙手机网站设计公司,南充建网站,卖营销软件的网站目录
一、Checkpoint 剖析
State 与 Checkpoint 概念区分
设置 Checkpoint 实战
执行代码所需的服务与遇到的问题
二、重启策略解读
重启策略意义
代码示例与效果展示
三、SavePoint
与 Checkpoint 异同
操作步骤详解
四、总结 在大数据流式处理领域#xff0c;Ap…目录
一、Checkpoint 剖析
State 与 Checkpoint 概念区分
设置 Checkpoint 实战
执行代码所需的服务与遇到的问题
二、重启策略解读
重启策略意义
代码示例与效果展示
三、SavePoint
与 Checkpoint 异同
操作步骤详解
四、总结 在大数据流式处理领域Apache Flink 凭借其卓越的性能和强大的功能占据重要地位。而理解 Flink 中的 Checkpoint检查点、重启策略以及 SavePoint保存点这些关键概念对于保障流处理任务的稳定性、容错性以及可维护性至关重要。本文将深入剖析它们的原理、用法并结合实际代码示例展示其效果希望能帮助大家更好地掌握 Flink 相关知识。 一、Checkpoint 剖析 State 与 Checkpoint 概念区分 State状态 在 Flink 中State 代表某一个 Operator算子在某一时刻的状态像常见的聚合算子 maxBy、sum 等操作过程中就会维护状态信息。比如在对数据流按某个字段做 sum 聚合时它需要记住历史数据以便持续累加计算并且这些状态数据默认存于内存之中为算子的持续、准确运行提供依据。 Checkpoint检查点 / 快照点 它是 Flink 中所有有状态的 Operator 在某一个特定时刻的 State 快照信息汇总也就是 State 的存档记录。可以简单理解为对整个作业运行时状态拍了一张 “照片”定格所有相关算子彼时的状态方便后续在故障恢复等场景使用。 设置 Checkpoint 实战
以下是一段设置 Checkpoint 的 Flink Java 代码示例 package com.bigdata.day06;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class _01CheckPointDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 在windows运行将数据提交hdfs,会出现权限问题使用这个语句解决。System.setProperty(HADOOP_USER_NAME, root);// 在这个基础之上添加快照// 第一句开启快照每隔1s保存一次快照env.enableCheckpointing(1000);// 第二句设置快照保存的位置env.setStateBackend(new FsStateBackend(hdfs://bigdata01:9820/flink/checkpoint));// 第三句 通过webui的cancel按钮取消flink的job时不删除HDFS的checkpoint目录env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//2. source-加载数据DataStreamSourceString dataStreamSource env.socketTextStream(localhost, 9999);SingleOutputStreamOperatorTuple2String, Integer mapStream dataStreamSource.map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String s) throws Exception {String[] arr s.split(,);return Tuple2.of(arr[0], Integer.valueOf(arr[1]));}});//3. transformation-数据处理转换SingleOutputStreamOperatorTuple2String, Integer result mapStream.keyBy(0).sum(1);result.print();//4. sink-数据输出//5. execute-执行env.execute();}
} 执行代码所需的服务与遇到的问题
启动本地的nc 启动hdfs服务。
启动代码发现有权限问题
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: useradmin, accessWRITE, inode/flink:root:supergroup:drwxr-xr-x
解决方案
System.setProperty(HADOOP_USER_NAME, root);在设置检查点之前设置一句这样带权限的语句如果是集群运行中不存在该问题。可以不设置 查看快照情况 运行刷新查看checkpoint保存的数据它会先生成一个新的文件夹然后再删除老的文件夹在某一时刻会出现两个文件夹同时存在的情况。 启动HDFS、Flink
start-dfs.sh
start-cluster.sh
数据是保存了但是并没有起作用想起作用需要在集群上运行以下演示集群上的效果
第一次运行的时候
在本地先clean, 再package 再Wagon一下 flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jarflink run -c com.bigdata.day06._01CheckPointDemo /opt/app/FlinkDemo-1.0-SNAPSHOT.jar记得先启动nc ,再启动任务否则报错
通过nc -lk 9999 输入以下内容 想查看运行结果可以通过使用的slot数量判断一下 取消flink job的运行 查看一下这次的单词统计到哪个数字了 第二次运行的时候
flink run -c 全类名 -s hdfs://hadoop10:8020/flink-checkpoint/293395ef7e496bda2eddd153a18d5212/chk-34 /opt/app/flink-test-1.0-SNAPSHOT.jar启动
flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar
-s 指定从checkpoint目录恢复状态数据 注意每个人都不一样
从上一次离开时截止的checkpoint目录 观察数据输入一个hello,1 得到新的结果hello,8 二、重启策略解读 重启策略意义 流式数据如同永不干涸的河流持续流淌一旦因某条错误数据致使程序异常退出后续海量数据丢失风险极高对企业而言这意味着数据资产受损、业务分析结果偏差等严重后果重启策略应运而生。它作为独立策略与 Checkpoint 虽无必然绑定关系即便没配置 Checkpoint 也能单独配置重启策略却在保障程序持续运行层面协同发挥关键作用。 一个流在运行过程中假如出现了程序异常问题可以进行重启比如在代码中人为添加一些异常 进行wordcount时输入了一个bug,1 人为触发异常。 注意此时如果有checkpoint ,是不会出现异常的需要将checkpoint的代码关闭再重启程序。会发现打印了异常那为什么checkpoint的时候不打印因为并没有log4j的配置文件需要搞一个这样的配置文件才行。
程序中添加log4j.properties的代码
# Global logging configuration
# Debug info warn error
log4j.rootLoggerdebug, stdout
# MyBatis logging configuration...
log4j.logger.org.mybatis.example.BlogMapperTRACE
# Console output...
log4j.appender.stdoutorg.apache.log4j.ConsoleAppender
log4j.appender.stdout.layoutorg.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern%5p [%t] - %m%n 开启检查点之后报错了程序还在运行是因为开启检查点之后程序会进行自动重启无限重启【程序错了才重启】。
//开启checkpoint默认是无限重启可以设置为不重启
//env.setRestartStrategy(RestartStrategies.noRestart());//重启3次重启时间间隔是10s
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));//2分钟内重启3次重启时间间隔是5s
env.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(2,TimeUnit.MINUTES),Time.of(5,TimeUnit.SECONDS))
);env.execute(checkpoint自动重启); //最后一句execute可以设置jobName,显示在8081界面
程序如果上传至服务器端运行可以看到重启状态 代码示例与效果展示 import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.concurrent.TimeUnit;public class Demo02 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);// 代码中不能有checkpoint不是说checkpoint不好而是太好了它已经自带重试机制了。而且是无限重启的// 通过如下方式可将重试机制关掉// env.setRestartStrategy(RestartStrategies.noRestart());//// 两种办法// 第一种办法重试3次每一次间隔10S//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 第二种写法在2分钟内重启3次每次间隔10senv.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(2,TimeUnit.MINUTES),Time.of(5,TimeUnit.SECONDS)));//2. source-加载数据DataStreamSourceString streamSource env.socketTextStream(bigdata01, 8899);streamSource.map(new MapFunctionString, Tuple2String,Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {String[] arr value.split(,);String word arr[0];if(word.equals(bug)){throw new Exception(有异常服务会挂掉.....);}// 将一个字符串变为int类型int num Integer.valueOf(arr[1]);// 第二种将字符串变为数字的方法System.out.println(Integer.parseInt(arr[1]));Tuple2String, Integer tuple2 new Tuple2(word,num);// 还有什么方法 第二种创建tuple的方法Tuple2String, Integer tuple2_2 Tuple2.of(word,num);return tuple2;}}).keyBy(tuple-tuple.f0).sum(1).print();//3. transformation-数据处理转换//4. sink-数据输出//5. execute-执行env.execute();}
} 在此代码中人为在 map 函数里设置异常触发点输入包含 “bug” 的数据时抛出异常。若开启 Checkpoint因它自带重试机制默认无限重启异常可能被掩盖需关闭 Checkpoint 相关代码才能看到异常打印情况。同时要完整看到重启策略效果如按设定的次数、间隔重启需打包代码上传至集群运行本地测试难以呈现完整现象且提交时务必确认使用的类名准确无误。 三、SavePoint 与 Checkpoint 异同 相同点 本质都是对 Flink 作业状态的一种保存方式以便后续恢复作业时复用状态保障数据处理连贯性。 不同点 Checkpoint 是 Flink 自动按设定规则周期性完成 State 快照保存旨在应对故障自动恢复场景而 SavePoint 是手动触发的快照操作提供更灵活的作业状态管理时机比如在版本升级、业务规则调整需暂停并后续重启作业场景发挥优势。 操作步骤详解 提交作业并输入数据 提交含重启策略代码打包成的 jar 包运行作业类似 flink run -c 全类名 /opt/app/flink-test-1.0-SNAPSHOT.jar输入数据观察单词对应数字变化。 执行 SavePoint 操作
以下是 -- 停止flink job并且触发savepoint操作
flink stop --savepointPath hdfs://bigdata01:9820/flink-savepoint 152e493da9cdeb327f6cbbad5a7f8e41后面的序号为Job 的ID以下是 -- 不会停止flink的job只是完成savepoint操作
flink savepoint 79f53c5c0bb3563b6b6ed3011176c411 hdfs://bigdata01:9820/flink-savepoint 备注如何正确停止一个 flink 的任务 flink stop 6a27b580aa5c6b57766ae6241d9270ce任务编号 查看与重启作业 查看最近完成作业对应的 SavePoint之后依据之前保存路径重启作业flink run -c 全类名 -s hdfs://hadoop10:8020/flink-savepoint/savepoint-79f53c-64b5d94771eb /opt/app/flink-test-1.0-SNAPSHOT.jar再次输入数据可看到基于之前状态的累加效果。 此外在集群运行 Flink 程序时默认并行度常为 1它不会按照机器的CPU核数而是按照配置文件中的一个默认值运行的。比如flink-conf.yaml web-ui 界面提交作业 这个图形化界面跟我们使用如下命令是一个效果 flink run -c com.bigdata.day06._01CheckPointDemo -s hdfs://bigdata01:9820/flink/checkpoint/bf416df7225b264fc34f8ff7e3746efe/chk-603 /opt/app/FlinkDemo-1.0-SNAPSHOT.jar 四、总结 通过对 Flink 中 Checkpoint、重启策略和 SavePoint 的详细解读与代码实践展示我们明晰它们各自在保障流处理任务稳定、容错与灵活运维层面的独特价值。合理运用这些机制能助我们打造更健壮、高效的 Flink 大数据处理应用从容应对复杂多变的业务需求与运行环境挑战后续大家可在实际项目中深入实践优化挖掘其更大潜力。