邦邻网站建设熊掌号,WordPress文章怎么折叠,个人简历电子版填写免费模板,太原做手机网站建设RDD Cache缓存 RDD通过Cache或者Persist方法将前面的计算结果缓存#xff0c;默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存#xff0c;而是触发后面的action时#xff0c;该RDD将会被缓存在计算节点的内存中#xff0c;并供…RDD Cache缓存 RDD通过Cache或者Persist方法将前面的计算结果缓存默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存而是触发后面的action时该RDD将会被缓存在计算节点的内存中并供后面重用。 1代码实现
object cache01 {def main(args: Array[String]): Unit {//1.创建SparkConf并设置App名称val conf: SparkConf new SparkConf().setAppName(SparkCoreTest).setMaster(local[*])//2.创建SparkContext该对象是提交Spark App的入口val sc: SparkContext new SparkContext(conf)//3. 创建一个RDD读取指定位置文件:hello atguigu atguiguval lineRdd: RDD[String] sc.textFile(input1)//3.1.业务逻辑val wordRdd: RDD[String] lineRdd.flatMap(line line.split( ))val wordToOneRdd: RDD[(String, Int)] wordRdd.map {word {println(************)(word, 1)}}//3.5 cache操作会增加血缘关系不改变原有的血缘关系println(wordToOneRdd.toDebugString)//3.4 数据缓存。wordToOneRdd.cache()//3.6 可以更改存储级别// wordToOneRdd.persist(StorageLevel.MEMORY_AND_DISK_2)//3.2 触发执行逻辑wordToOneRdd.collect()println(-----------------)println(wordToOneRdd.toDebugString)//3.3 再次触发执行逻辑wordToOneRdd.collect()//4.关闭连接sc.stop()}
}
2源码解析
mapRdd.cache()
def cache(): this.type persist()
def persist(): this.type persist(StorageLevel.MEMORY_ONLY)object StorageLevel {val NONE new StorageLevel(false, false, false, false)val DISK_ONLY new StorageLevel(true, false, false, false)val DISK_ONLY_2 new StorageLevel(true, false, false, false, 2)val MEMORY_ONLY new StorageLevel(false, true, false, true)val MEMORY_ONLY_2 new StorageLevel(false, true, false, true, 2)val MEMORY_ONLY_SER new StorageLevel(false, true, false, false)val MEMORY_ONLY_SER_2 new StorageLevel(false, true, false, false, 2)val MEMORY_AND_DISK new StorageLevel(true, true, false, true)val MEMORY_AND_DISK_2 new StorageLevel(true, true, false, true, 2)val MEMORY_AND_DISK_SER new StorageLevel(true, true, false, false)val MEMORY_AND_DISK_SER_2 new StorageLevel(true, true, false, false, 2)val OFF_HEAP new StorageLevel(true, true, true, false, 1) 注意默认的存储级别都是仅在内存存储一份。在存储级别的末尾加上“_2”表示持久化的数据存为两份。 缓存有可能丢失或者存储于内存的数据由于内存不足而被删除RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换丢失的数据会被重算由于RDD的各个Partition是相对独立的因此只需要计算丢失的部分即可并不需要重算全部Partition。
3自带缓存算子 Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是在实际使用的时候如果想重用数据仍然建议调用persist或cache。
object cache02 {def main(args: Array[String]): Unit {//1.创建SparkConf并设置App名称val conf: SparkConf new SparkConf().setAppName(SparkCoreTest).setMaster(local[*])//2.创建SparkContext该对象是提交Spark App的入口val sc: SparkContext new SparkContext(conf)//3. 创建一个RDD读取指定位置文件:hello atguigu atguiguval lineRdd: RDD[String] sc.textFile(input1)//3.1.业务逻辑val wordRdd: RDD[String] lineRdd.flatMap(line line.split( ))val wordToOneRdd: RDD[(String, Int)] wordRdd.map {word {println(************)(word, 1)}}// 采用reduceByKey自带缓存val wordByKeyRDD: RDD[(String, Int)] wordToOneRdd.reduceByKey(__)//3.5 cache操作会增加血缘关系不改变原有的血缘关系println(wordByKeyRDD.toDebugString)//3.4 数据缓存。//wordByKeyRDD.cache()//3.2 触发执行逻辑wordByKeyRDD.collect()println(-----------------)println(wordByKeyRDD.toDebugString)//3.3 再次触发执行逻辑wordByKeyRDD.collect()//4.关闭连接sc.stop()}
} 访问http://localhost:4040/jobs/页面查看第一个和第二个job的DAG图。说明增加缓存后血缘依赖关系仍然有但是第二个job取的数据是从缓存中取的。 RDD CheckPoint检查点
1检查点是通过将RDD中间结果写入磁盘。
2为什么要做检查点 由于血缘依赖过长会造成容错成本过高这样就不如在中间阶段做检查点容错如果检查点之后有节点出现问题可以从检查点开始重做血缘减少了开销。
3检查点存储路径Checkpoint的数据通常是存储在HDFS等容错、高可用的文件系统
4检查点数据存储格式为二进制的文件
5检查点切断血缘在Checkpoint的过程中该RDD的所有依赖于父RDD中的信息将全部被移除。
6检查点触发时间对RDD进行checkpoint操作并不会马上被执行必须执行Action操作才能触发。但是检查点为了数据安全会从血缘关系的最开始执行一遍 7设置检查点步骤
1设置检查点数据存储路径sc.setCheckpointDir(./checkpoint1)
2调用检查点方法wordToOneRdd.checkpoint()
8代码实现 object checkpoint01 {def main(args: Array[String]): Unit {//1.创建SparkConf并设置App名称val conf: SparkConf new SparkConf().setAppName(SparkCoreTest).setMaster(local[*])//2.创建SparkContext该对象是提交Spark App的入口val sc: SparkContext new SparkContext(conf)// 需要设置路径否则抛异常Checkpoint directory has not been set in the SparkContextsc.setCheckpointDir(./checkpoint1)//3. 创建一个RDD读取指定位置文件:hello atguigu atguiguval lineRdd: RDD[String] sc.textFile(input1)//3.1.业务逻辑val wordRdd: RDD[String] lineRdd.flatMap(line line.split( ))val wordToOneRdd: RDD[(String, Long)] wordRdd.map {word {(word, System.currentTimeMillis())}}//3.5 增加缓存,避免再重新跑一个job做checkpoint
// wordToOneRdd.cache()//3.4 数据检查点针对wordToOneRdd做检查点计算wordToOneRdd.checkpoint()//3.2 触发执行逻辑wordToOneRdd.collect().foreach(println)// 会立即启动一个新的job来专门的做checkpoint运算//3.3 再次触发执行逻辑wordToOneRdd.collect().foreach(println)wordToOneRdd.collect().foreach(println)Thread.sleep(10000000)//4.关闭连接sc.stop()}
}
9执行结果 访问http://localhost:4040/jobs/页面查看4个job的DAG图。其中第2个图是checkpoint的job运行DAG图。第3、4张图说明检查点切断了血缘依赖关系。 1只增加checkpoint没有增加Cache缓存打印
第1个job执行完触发了checkpoint第2个job运行checkpoint并把数据存储在检查点上。第3、4个job数据从检查点上直接读取。
(hadoop,1577960215526)
。。。。。。
(hello,1577960215526)
(hadoop,1577960215609)
。。。。。。
(hello,1577960215609)
(hadoop,1577960215609)
。。。。。。
(hello,1577960215609)
2增加checkpoint也增加Cache缓存打印
第1个job执行完数据就保存到Cache里面了第2个job运行checkpoint直接读取Cache里面的数据并把数据存储在检查点上。第3、4个job数据从检查点上直接读取。
(hadoop,1577960642223)
。。。。。。
(hello,1577960642225)
(hadoop,1577960642223)
。。。。。。
(hello,1577960642225)
(hadoop,1577960642223)
。。。。。。
(hello,1577960642225) 缓存和检查点区别
1Cache缓存只是将数据保存起来不切断血缘依赖。Checkpoint检查点切断血缘依赖。
2Cache缓存的数据通常存储在磁盘、内存等地方可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统可靠性高。
3建议对checkpoint()的RDD使用Cache缓存这样checkpoint的job只需从Cache缓存中读取数据即可否则需要再从头计算一次RDD。
4如果使用完了缓存可以通过unpersist方法释放缓存
检查点存储到HDFS集群
如果检查点数据存储到HDFS集群要注意配置访问集群的用户名。否则会报访问权限异常。
object checkpoint02 {def main(args: Array[String]): Unit {// 设置访问HDFS集群的用户名System.setProperty(HADOOP_USER_NAME,atguigu)//1.创建SparkConf并设置App名称val conf: SparkConf new SparkConf().setAppName(SparkCoreTest).setMaster(local[*])//2.创建SparkContext该对象是提交Spark App的入口val sc: SparkContext new SparkContext(conf)// 需要设置路径.需要提前在HDFS集群上创建/checkpoint路径sc.setCheckpointDir(hdfs://hadoop102:9000/checkpoint)//3. 创建一个RDD读取指定位置文件:hello atguigu atguiguval lineRdd: RDD[String] sc.textFile(input1)//3.1.业务逻辑val wordRdd: RDD[String] lineRdd.flatMap(line line.split( ))val wordToOneRdd: RDD[(String, Long)] wordRdd.map {word {(word, System.currentTimeMillis())}}//3.4 增加缓存,避免再重新跑一个job做checkpointwordToOneRdd.cache()//3.3 数据检查点针对wordToOneRdd做检查点计算wordToOneRdd.checkpoint()//3.2 触发执行逻辑wordToOneRdd.collect().foreach(println)//4.关闭连接sc.stop()}
}