当前位置: 首页 > news >正文

谷歌外贸网站seo怎么做wordpress安装遇到FTP

谷歌外贸网站seo怎么做,wordpress安装遇到FTP,化妆品网站建设思路,网页自动点击软件文章目录 RDD 概述RDD 组成RDD 的作用RDD 算子分类RDD 的创建1.从外部数据源读取2.从已有的集合或数组创建3.从已有的 RDD 进行转换 RDD 常用算子大全转换算子行动算子 RDD 算子综合练习RDD 依赖关系窄依赖宽依赖宽窄依赖算子区分 RDD 血统信息血统信息的作用血统信息的组成代码… 文章目录 RDD 概述RDD 组成RDD 的作用RDD 算子分类RDD 的创建1.从外部数据源读取2.从已有的集合或数组创建3.从已有的 RDD 进行转换 RDD 常用算子大全转换算子行动算子 RDD 算子综合练习RDD 依赖关系窄依赖宽依赖宽窄依赖算子区分 RDD 血统信息血统信息的作用血统信息的组成代码案例血统信息的弊端 RDD 持久化与缓存持久化级别缓存代码案例 RDD 容错机制Checkpoint缓存与检查点的区别代码案例 DAGStage执行流程代码案例 累加器累加器的类型累加器的创建与使用代码案例 广播变量广播变量的创建与使用代码案例 RDD 概述 RDDResilient Distributed Dataset是 Spark 中的核心数据抽象代表着分布式的不可变的数据集合。 RDD 具有以下几个重要的特点和特性 分布式的RDD 将数据分布存储在集群中的多个计算节点上每个节点上都存储着数据的一个分区。这样可以实现数据的并行处理和计算。 不可变的RDD 是不可变的数据集合一旦创建就不能被修改。任何对 RDD 进行的转换操作都会生成一个新的 RDD原始的 RDD 不受影响。 可并行计算的RDD 支持并行计算可以在集群中的多个计算节点上同时进行计算。这样可以充分利用集群中的资源加速数据处理和计算。 容错性RDD 具有容错性可以在计算节点失败或数据丢失时进行恢复。它通过 RDD 的血统Lineage来记录每个 RDD 的来源和依赖关系当某个分区的数据丢失或出错时Spark 可以根据 RDD 的血统重新计算丢失的数据分区保证计算结果的正确性。 RDD 组成 RDD 的重要组成主要属性部分包括数据的分区、计算函数、依赖关系、分区函数和最佳位置等信息。这些关键组成部分共同构成了 RDD 的核心特性和功能为 Spark 提供了高效的数据处理和计算能力。 数据集合 RDD 表示了一个分布式的、不可变的数据集合。这个数据集合可以来自于外部数据源如 HDFS、本地文件系统、HBase、Cassandra 等也可以通过对其他 RDD 进行转换操作生成。 分区列表 RDD 将数据分成多个分区存储在集群中的不同计算节点上每个分区在集群中的一个计算节点上计算。分区列表描述了数据集被划分为多少个分区以及分区之间的关系。 计算函数 每个 RDD 都有一个计算函数用于描述数据的转换过程。转换操作会生成新的 RDD新的 RDD 依赖于原始的 RDD。计算函数定义了 RDD 如何通过转换操作来生成新的 RDD。 依赖关系 依赖关系描述了 RDD 之间的依赖关系即一个 RDD 如何依赖于其他 RDD。Spark 中的转换操作会生成新的 RDD新的 RDD 依赖于原始的 RDD。依赖关系通过 RDD 的血统Lineage来表示用于容错和数据恢复。 分区函数 分区函数定义了 RDD 中数据如何分布到各个分区中。默认情况下Spark 使用 hash 分区函数将数据均匀地分布到各个分区中但用户也可以根据需要自定义分区函数根据数据的特性进行分区。 最佳位置 最佳位置指的是 RDD 中数据的最佳存储位置即计算节点上离数据所在位置最近的节点。Spark 会尽量将计算任务调度到数据所在的节点上执行以减少数据传输和网络通信开销提高计算性能。 RDD 的作用 RDD 在 Spark 中扮演着多种角色包括数据抽象、并行计算、数据处理和转换以及数据持久化等为用户提供了一个高效、通用、可扩展且易用的大数据处理平台。 数据抽象RDD 是 Spark 中的核心数据抽象代表着分布式的不可变的数据集合。它可以从各种数据源创建如HDFS、HBase、本地文件系统、数据库等同时也可以通过对现有 RDD 进行转换操作生成新的 RDD。这种灵活的数据抽象使得开发者可以轻松地处理和分析各种类型的数据。 并行计算RDD 支持并行计算在集群中的多个计算节点上同时进行计算充分利用集群的资源加速数据处理和分析过程。通过 RDD 的分区机制Spark 可以将数据分配到不同的计算节点上进行并行计算提高了数据处理的效率。 数据处理和转换RDD 提供了丰富的转换操作如map、filter、reduceByKey、join等用于对数据进行转换和处理。这些转换操作可以将原始数据集合转换成各种形式的数据集合实现复杂的数据处理和分析任务。 数据持久化RDD 支持数据持久化可以将中间计算结果缓存到内存或磁盘中以加速迭代计算和交互式查询。通过持久化操作Spark 可以在迭代算法中复用中间计算结果避免重复计算提高计算性能。 RDD 算子分类 算子Operator通常是指对数据进行操作的一种函数或方法。在 Spark 中算子是指对 RDD 或 DataFrame 等数据集进行转换或行动操作的函数或方法。 在 RDD 中支持两种算子类型的操作 转换操作算子Transformations转换操作是对现有的 RDD 进行转换生成一个新的 RDD新的数据集。常见的转换操作包括map、filter、flatMap、reduceByKey、join等。转换操作是惰性的不会立即执行而是在遇到行动操作时才会触发实际的计算。 行动操作算子Actions行动操作是对 RDD 进行实际的计算并返回结果。常见的行动操作包括collect、count、reduce、saveAsTextFile、foreach等。行动操作会触发 Spark 作业的执行从而在集群中进行数据处理和计算。 RDD 的创建 1.从外部数据源读取 使用 SparkContext 或 SparkSession 中提供的方法从外部数据源如文本文件、JSON 文件、CSV 文件等读取数据创建 RDD。这种方式适用于从文件系统或数据库等外部数据源中读取数据。 package com.jsu.rdd;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext;import java.util.List;public class rddTest {public static void main(String[] args) {// 1.创建 Spark 配置对象SparkConf conf new SparkConf().setAppName(rddTest).setMaster(local[*]);// 2.创建 Spark 内容对象JavaSparkContext sc new JavaSparkContext(conf);// 3.读取外部数据源JavaRDDString stringRDD sc.textFile(src/main/resources/data/name.txt,1);// 4.获取元素内容打印输出ListString collect stringRDD.collect();for (String s : collect) {System.out.println(s);}// 5.释放资源sc.stop();}}textFile 方法是 Spark 中用于从文本文件创建 RDD 的函数。它会将文本文件的每一行作为 RDD 中的一个元素拥有两个参数 path指定要读取的文本文件的路径。可以是本地文件系统的路径也可以是分布式文件系统如 HDFS的路径。 minPartitions可选参数指定最小的分区数。如果不指定Spark 会根据文件的大小自动确定分区数。 从源码中可以看到它拥有两个重载的方法传递参数不同需要注意的是读取的文本文件编码格式必须是 UTF-8。 注本系列均采用 Java 版 Spark 3.3.1与 Scala 版 Spark 中的 API 有些许差异但逻辑是一样的。 2.从已有的集合或数组创建 使用 SparkContext 的 parallelize 方法将已有的集合或数组转换为 RDD。这种方式适用于将内存中的数据集合转换为 RDD。 package com.jsu.rdd;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext;import java.util.ArrayList; import java.util.Arrays; import java.util.List;public class rddTest2 {public static void main(String[] args) {// 1.创建 Spark 配置对象SparkConf conf new SparkConf().setAppName(rddTest2).setMaster(local[*]);// 2.创建 Spark 内容对象JavaSparkContext sc new JavaSparkContext(conf);// 3.创建集合ListInteger list Arrays.asList(1, 2, 3, 4, 5);// 4.通过集合创建 RDDJavaRDDInteger rdd sc.parallelize(list);// 5.获取元素内容打印输出System.out.println(rdd.take(5));// 6.释放资源sc.stop();}}parallelize 方法是 Spark 中用于从一个已有的集合创建 RDD 的方法。该方法将集合中的元素分发到集群中的各个计算节点上形成一个分布式的数据集方便后续进行分布式计算。参数 list要并行化的集合通常是一个 List 或者数组。 numSlices指定要将数据划分为多少个分区默认值为默认并行度即集群中可用的处理器数目。 3.从已有的 RDD 进行转换 通过对已有的 RDD 进行转换操作生成新的 RDD这种方式适用于对现有数据集进行进一步处理或分析。 package com.jsu.rdd;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext;import java.util.Arrays; import java.util.List;public class rddTest3 {public static void main(String[] args) {// 1.创建 Spark 配置对象SparkConf conf new SparkConf().setAppName(rddTest3).setMaster(local[*]);// 2.创建 Spark 内容对象JavaSparkContext sc new JavaSparkContext(conf);// 3.创建集合ListInteger list Arrays.asList(1, 2, 3, 4, 5);// 4.创建RDDJavaRDDInteger rdd1 sc.parallelize(list);// 5.通过现有RDD创建新RDDJavaRDDInteger rdd2 rdd1.map(el - el 1);// 6.获取元素内容打印输出System.out.println(rdd2.take(5));// 7.释放资源sc.stop();}}map 算子是一个转换算子它不会立即执行计算而是在遇到行动算子时才会触发实际的计算。它对每个元素逐一遍历执行转换操作形成新的 RDD。 RDD 常用算子大全 转换算子 算子作用说明示例代码map对 RDD 中的每个元素应用一个函数返回一个新的 RDD。rdd.map(x - x * 2)flatMap类似于 map但每个输入元素可以被映射为 0 个或多个输出元素返回一个扁平化的 RDD。rdd.flatMap(x - Arrays.asList(x, x * 2).iterator())filter对 RDD 中的每个元素应用一个函数返回一个只包含函数返回值为 true 的元素的新 RDD。rdd.filter(x - x 2)mapPartitions类似于 map但在 RDD 的每个分区上运行一个函数返回一个新的 RDD。rdd.mapPartitions(iterator - iterator.map(x - x * 2))mapToPair将 RDD 中的每个元素映射为一个 (K, V) 键值对返回一个新的 Pair RDD。rdd.mapToPair(x - new Tuple2(x, x * 2))mapValues只对 (K, V) 键值对 RDD 的每个 V 值应用一个函数返回一个新的 (K, V) RDD。pairRdd.mapValues(x - x * 2)mapPartitionsWithIndex类似于 mapPartitions但函数同时接收分区索引。rdd.mapPartitionsWithIndex((index, iterator) - iterator.map(x - (index, x)))repartition随机地将数据重新分区可以增加或减少分区数。rdd.repartition(10)coalesce减少分区数且尽量避免数据的移动。适用于减少分区数时使用。rdd.coalesce(2)partitionBy通过一个分区器Partitioner重新分区适用于 (K, V) 键值对 RDD。常见的分区器有 HashPartitioner 和 RangePartitioner。pairRdd.partitionBy(new HashPartitioner(3))sample从 RDD 中以指定的随机采样方式抽取样本返回一个新的 RDD。rdd.sample(false, 0.1)union返回两个 RDD 的并集。rdd1.union(rdd2)intersection返回两个 RDD 的交集。rdd1.intersection(rdd2)distinct返回一个新的 RDD只包含唯一的元素去重。rdd.distinct()groupByKey对 (K, V) 键值对 RDD 进行分组返回一个 (K, Iterable) 形式的 RDD。pairRdd.groupByKey()reduceByKey对 (K, V) 键值对 RDD 中每个键使用指定的二元操作进行聚合返回一个新的 (K, V) RDD。pairRdd.reduceByKey((x, y) - x y)aggregateByKey类似于 reduceByKey但允许返回的值与输入的值类型不同。pairRdd.aggregateByKey(zeroValue, seqOp, combOp)sortByKey对 (K, V) 键值对 RDD 按键进行排序返回一个新的 RDD。pairRdd.sortByKey()join对两个 RDD 进行内连接返回一个 (K, (V, W)) 形式的 RDD。rdd1.join(rdd2)cogroup对两个 (K, V) 和 (K, W) 键值对 RDD 进行分组返回一个 (K, (Iterable, Iterable)) 形式的 RDD。rdd1.cogroup(rdd2)cartesian返回两个 RDD 的笛卡尔积。rdd1.cartesian(rdd2)pipe将 RDD 的每个分区的内容作为输入传递给外部程序并将输出作为一个新的 RDD 返回。rdd.pipe(script.sh)zipWithIndex为 RDD 中的每个元素分配一个唯一的索引值返回一个 (元素, 索引) 的键值对形式的新 RDD。rdd.zipWithIndex() 行动算子 算子作用说明示例代码collect将 RDD 中的所有元素作为一个数组返回到驱动程序中。rdd.collect()count返回 RDD 中的元素个数。rdd.count()take返回 RDD 的前 n 个元素。rdd.take(5)top返回 RDD 中的前 n 个元素按照默认的或自定义的顺序。rdd.top(5)reduce对 RDD 的元素使用指定的二元操作进行聚合返回一个单一的结果。rdd.reduce((x, y) - x y)fold与 reduce 类似但提供了一个初始值。rdd.fold(0, (x, y) - x y)aggregate与 fold 类似但允许返回的值与输入的值类型不同。rdd.aggregate(zeroValue, seqOp, combOp)foreach对 RDD 中的每个元素应用一个函数通常用于触发执行。rdd.foreach(x - System.out.println(x))countByKey对 (K, V) 键值对 RDD 中每个键进行计数返回一个 Map。pairRdd.countByKey()saveAsTextFile将 RDD 保存到指定目录中的文本文件。rdd.saveAsTextFile(output/path)saveAsSequenceFile将 RDD 保存为 Hadoop 序列文件。pairRdd.saveAsSequenceFile(output/path)saveAsObjectFile将 RDD 以 Java 对象序列化的形式保存到指定路径。rdd.saveAsObjectFile(output/path)takeSample返回 RDD 的一个随机采样子集。rdd.takeSample(false, 5) RDD 算子综合练习 假设现有一个包含服务器访问日志的文本文件 logs.txt每一行表示一个访问记录格式如下 timestamp ip_address url response_code response_time 2023-05-01 12:34:56 192.168.0.1 /index.html 200 123 2023-05-01 12:35:01 192.168.0.2 /about.html 404 56 2023-05-01 12:35:05 192.168.0.1 /index.html 200 78 2023-05-01 12:35:10 192.168.0.3 /contact.html 200 150 2023-05-01 12:35:15 192.168.0.4 /products.html 200 200 2023-05-01 12:35:20 192.168.0.5 /index.html 200 300 2023-05-01 12:35:25 192.168.0.6 /about.html 500 450 2023-05-01 12:35:30 192.168.0.2 /index.html 200 90 2023-05-01 12:35:35 192.168.0.3 /contact.html 404 30 2023-05-01 12:35:40 192.168.0.7 /products.html 200 100 2023-05-01 12:35:45 192.168.0.8 /index.html 200 60 2023-05-01 12:35:50 192.168.0.9 /about.html 404 50 2023-05-01 12:35:55 192.168.0.10 /contact.html 200 80 2023-05-01 12:36:00 192.168.0.1 /products.html 200 120 2023-05-01 12:36:05 192.168.0.2 /index.html 200 110 2023-05-01 12:36:10 192.168.0.3 /about.html 200 200 2023-05-01 12:36:15 192.168.0.4 /contact.html 404 70 2023-05-01 08:36:20 192.168.0.5 /products.html 500 250 2023-05-01 11:36:25 192.168.0.6 /index.html 200 90 2023-05-01 09:36:30 192.168.0.7 /about.html 200 60 2023-05-01 13:36:35 192.168.0.8 /contact.html 200 180 2023-05-01 11:36:40 192.168.0.9 /products.html 200 170 2023-05-01 10:36:45 192.168.0.10 /index.html 200 220 2023-05-01 12:36:50 192.168.0.1 /about.html 404 140 2023-05-01 12:36:55 192.168.0.2 /contact.html 200 130 2023-05-01 12:37:00 192.168.0.3 /products.html 200 190 2023-05-01 12:37:05 192.168.0.4 /index.html 200 260 2023-05-01 12:37:10 192.168.0.5 /about.html 404 160 2023-05-01 19:37:15 192.168.0.6 /contact.html 200 150 2023-05-01 12:37:20 192.168.0.7 /products.html 200 80 2023-05-01 18:37:25 192.168.0.8 /index.html 200 140 2023-05-01 12:37:30 192.168.0.9 /about.html 500 210 2023-05-02 12:37:35 192.168.0.10 /contact.html 200 170 2023-05-02 11:37:05 192.168.0.4 /index.html 200 260 2023-05-02 11:37:10 192.168.0.5 /about.html 404 160 2023-05-02 11:37:15 192.168.0.6 /contact.html 200 150 2023-05-02 11:37:20 192.168.0.7 /products.html 200 80 2023-05-02 11:37:25 192.168.0.8 /index.html 200 140 2023-05-02 13:37:30 192.168.0.9 /about.html 500 210 2023-05-02 14:37:35 192.168.0.10 /contact.html 200 170读取日志文件进行解析完成下列需求 1.统计不同 URL 的访问次数 计算每个 URL 的访问次数按访问次数降序排序输出前 10 个 URL 及其访问次数。 2.计算每个 IP 地址的平均响应时间 计算每个 IP 地址的平均响应时间输出前 10 个平均响应时间最长的 IP 地址及其平均响应时间。 3.计算每小时的访问量 计算每个小时的访问量输出每小时的访问量。 4.保存结果 将以上统计结果分别保存到文件路径中。 package com.jsu.rdd;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2;public class rddTask {public static void main(String[] args) {// 1.创建 spark 对象SparkConf conf new SparkConf().setAppName(rddTask).setMaster(local[*]);JavaSparkContext spark new JavaSparkContext(conf);// 2.读取日志文件内容过滤首行表头数据JavaPairRDDString, Long rdd spark.textFile(src/main/resources/data/logs.txt).zipWithIndex().filter(el - el._2 1);// 3.对数据进行分割JavaRDDString[] splitRdd rdd.map(Tuple2::_1).map(el - el.split( ));// 4.统计不同 URL 的访问次数JavaPairRDDString, Integer urlRdd splitRdd.mapToPair(el - new Tuple2(el[3], 1)).reduceByKey(Integer::sum);urlRdd.foreach(x - System.out.println(x));System.out.println();urlRdd.repartition(1).saveAsTextFile(src/main/resources/res1);// 5.计算每个 IP 地址的平均响应时间JavaPairRDDString, Integer ipTimeRDD splitRdd.mapToPair(el - new Tuple2(el[2], new Tuple2(Integer.parseInt(el[5]), 1))).reduceByKey((x, y) - new Tuple2(x._1 y._1, x._2 x._2)).mapValues(sumCount - sumCount._1 / sumCount._2); ipTimeRDD.foreach(x - System.out.println(x));urlRdd.repartition(1).saveAsTextFile(src/main/resources/res2);// 6.计算每小时的访问量JavaPairRDDString, Integer rdd4 splitRdd.mapToPair(el - new Tuple2(el[0] el[1].substring(0, 2), 1)).reduceByKey(Integer::sum);System.out.println();rdd4.sortByKey().foreach(x - System.out.println(x));rdd4.repartition(1).saveAsTextFile(src/main/resources/res3);// 7.释放资源spark.stop();}}代码释义 1.创建 Spark 对象 初始化 Spark 配置 SparkConf 并创建 JavaSparkContext 对象设置应用程序名称为 “rddTask”运行模式为本地 (local[*])使用所有可用 CPU 核心。 2.读取日志文件内容并过滤表头数据 使用 spark.textFile 读取日志文件内容并通过 zipWithIndex 给每一行数据加上索引。过滤掉索引为 0 的表头数据。 3.对数据进行分割 通过 map 方法对每行数据进行分割将其转换为字符串数组。 4.统计不同 URL 的访问次数 使用 mapToPair 将每行数据映射为 (URL, 1) 的键值对并使用 reduceByKey 对相同的 URL 进行计数。将统计结果输出到控制台并保存到文件中。 5.计算每个 IP 地址的平均响应时间 将每行数据映射为 (IP地址, (响应时间, 1)) 的键值对。使用 reduceByKey 聚合相同 IP 地址的响应时间和计数。计算每个 IP 地址的平均响应时间并输出到控制台和文件中。 6.计算每小时的访问量 将每行数据映射为 (日期 小时, 1) 的键值对。使用 reduceByKey 计算每小时的访问次数。对结果按时间排序并输出到控制台和文件中。 7.释放资源 调用 spark.stop() 释放 Spark 资源。 RDD 依赖关系 RDD 的依赖关系描述了一个 RDD 是如何从一个或多个 RDD 派生出来的。这些依赖关系有助于 Spark 在发生失败时能够恢复丢失的分区数据并且能够高效地执行集群计算任务。 在 RDD 中的依赖关系主要有两种类型窄依赖和宽依赖。 图片来源 —— Spark 的宽依赖和窄依赖 窄依赖 窄依赖表示父 RDD 的每一个分区只被子 RDD 中的一个分区依赖使用属于一对一或者多对一。窄依赖不会引发 Shuffle 操作效率高。 为什么也属于多对一呢 因为不同父 RDD 中的某一个分区可以提供给相同的子 RDD 中的一个分区使用。 这种依赖关系在发生故障时可以更快地恢复因为只需要重新计算少量的分区即可。 案例map 和 filter import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import java.util.Arrays; import java.util.List;public class NarrowDependencyExample {public static void main(String[] args) {SparkConf conf new SparkConf().setAppName(NarrowDependencyExample).setMaster(local[*]);JavaSparkContext sc new JavaSparkContext(conf);ListInteger data Arrays.asList(1, 2, 3, 4, 5);JavaRDDInteger rdd sc.parallelize(data);// 窄依赖: map 操作JavaRDDInteger mappedRdd rdd.map(x - x * 2);// 窄依赖: filter 操作JavaRDDInteger filteredRdd mappedRdd.filter(x - x 5);filteredRdd.collect().forEach(System.out::println);sc.stop();} }在这个例子中map 和 filter 操作都属于窄依赖因为 mappedRdd 的每个分区仅依赖于 rdd 的一个分区而 filteredRdd 的每个分区仅依赖于 mappedRdd 的一个分区。 宽依赖 宽依赖表示父 RDD 中至少有一个分区对应子 RDD 的多个分区属于一对多。会涉及 Shuffle 阶段在执行计算任务时需要跨节点的数据交换效率低。 案例reduceByKey 和 join import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2;import java.util.Arrays; import java.util.List;public class WideDependencyExample {public static void main(String[] args) {SparkConf conf new SparkConf().setAppName(WideDependencyExample).setMaster(local[*]);JavaSparkContext sc new JavaSparkContext(conf);ListTuple2String, Integer data Arrays.asList(new Tuple2(a, 1),new Tuple2(b, 2),new Tuple2(a, 3),new Tuple2(b, 4));JavaPairRDDString, Integer pairRDD sc.parallelizePairs(data);// 宽依赖: reduceByKey 操作JavaPairRDDString, Integer reducedRdd pairRDD.reduceByKey(Integer::sum);reducedRdd.collect().forEach(System.out::println);sc.stop();} }在这个例子中reduceByKey 操作涉及 shuffle 阶段因为需要跨分区对相同的 key 进行合并。这意味着 reducedRdd 的每个分区可能依赖于 pairRDD 的多个分区。 宽窄依赖算子区分 只有转换算子会有宽依赖和窄依赖的划分行动算子没有这种划分因为行动算子的作用是触发计算并生成结果而不是对 RDD 进行转换和依赖构建。 我们在编写程序时要避免使用宽依赖算子在满足业务需求的情况下尽量使用窄依赖算子。 宽依赖算子会引起 Shuffle 操作的算子例如join、reduceByKey、groupByKey、sortByKey、partitionBy、distinct、intersection、repartition 等 窄依赖算子不会引发 Shuffle 操作的算子例如map、filter、flatMap、mapPartitions、mapValues、coalesce、mapPartitionsWithIndex、mapToPair、union等 宽依赖会增加 Spark 作业中的 stage阶段这是因为宽依赖的算子需要进行 Shuffle 操作而 Shuffle 操作涉及到数据在不同节点之间的重新分配和传输导致需要重新划分计算任务。 当一个 RDD 的计算依赖于前一个 RDD 的多个分区即宽依赖这就意味着需要将数据从前一个阶段的多个分区重新分配到下一个阶段的新分区中。因此每次遇到宽依赖算子时Spark 会将作业拆分为两个阶段 Stage 1执行宽依赖算子之前的所有窄依赖算子 Shuffle Operation执行宽依赖算子进行数据重分配Shuffle Stage 2执行宽依赖算子之后的所有算子。 RDD 血统信息 血统信息Lineage是指 RDD 之间的依赖关系图用于记录如何从初始输入数据通过一系列转换生成新的 RDD使 Spark 能够在任何一步出错时根据血统信息重新计算数据而不需要重新读取整个数据集。 血统信息的作用 故障恢复如果在计算过程中某个节点发生故障Spark 可以根据血统信息重新计算丢失的数据块。这样可以确保计算的正确性和完整性而不需要重新启动整个作业。 懒计算RDD 是惰性求值的只有在触发行动操作action时才会真正计算。血统信息帮助 Spark 记录转换过程只有在需要时才计算最终结果。 优化执行Spark 可以根据血统信息进行优化如合并多个窄依赖的转换步骤以减少数据传输和计算开销。 血统信息的组成 血统信息包含了 RDD 的所有依赖关系及其转换操作主要包含以下内容 初始 RDD从文件系统如 HDFS或其他数据源读取的数据。 转换操作如 map、filter、flatMap 等描述了如何从一个 RDD 生成另一个 RDD。 依赖关系表示一个 RDD 如何依赖于其他 RDD分为窄依赖和宽依赖。 代码案例 假设我们有以下代码片段 // 17 行 JavaRDDString lines sc.textFile(src/main/resources/data/logs.txt); // 18 行 JavaRDDString errors lines.filter(line - line.contains(ERROR));在这个例子中血统信息将包含以下内容 初始 RDD从文件中读取的 lines RDD。 转换操作errors RDD 由 lines RDD 通过 filter 转换生成条件是包含 ERROR。 依赖关系errors 依赖于 lines属于窄依赖。 我们在代码中可以通过 rdd.toDebugString() 方法获取指定 RDD 的血统信息如下所示 System.out.println(errors.toDebugString());输出结果 (2) MapPartitionsRDD[2] at filter at rddTestLineage.java:18 []| src/main/resources/data/logs.txt MapPartitionsRDD[1] at textFile at rddTestLineage.java:17 []| src/main/resources/data/logs.txt HadoopRDD[0] at textFile at rddTestLineage.java:17 []从后往前看 HadoopRDD[0] 表示这是从 HDFS 或本地文件系统中读取的初始 RDD称为 HadoopRDD。 MapPartitionsRDD[1] 表示这是 textFile 方法创建的 MapPartitionsRDD它表示对读取的数据进行了初步分区。 MapPartitionsRDD[2] 表示这是 errors RDD通过对 lines RDD 进行 filter 操作生成的 MapPartitionsRDD。 血统信息的弊端 虽然血统信息在 Spark 中用于跟踪 RDD 的生成和转换过程能够应对故障避免重复计算是 RDD 容错机制的基础但是过长的血统信息会带来一些弊端和问题。 1.计算开销增加 如果 RDD 血统链过长每次行动操作如 collect、count 等都需要从最初的 RDD 开始重头计算所有依赖链条中的转换操作导致计算开销和延迟显著增加。 2.容错开销增加 如果某个分区丢失Spark 会根据血统信息重新计算该分区的数据。过长的血统链会导致重新计算的步骤繁多增加恢复数据的时间和资源消耗。 3.内存和存储开销增加 维护长链条的血统信息需要占用更多的内存和存储资源尤其是对于大量中间结果和复杂计算的应用。 4.调试困难 血统链条过长会使调试过程变得复杂难以追踪数据的流动和转换尤其是在复杂的计算流程中。 那么如何解决血统信息所带来的弊端呢 这就要靠下面介绍的持久化与缓存以及检查点来进行处理了接着奏乐接着舞Lets go~ RDD 持久化与缓存 在 Spark 中RDD 是不可变且惰性求值的。默认情况下RDD 的每次计算都是从头开始的。如果一个 RDD 被多次使用为了避免重复计算可以将 RDD 进行持久化或缓存。 持久化是将 RDD 存储在内存中或者磁盘上以便后续重用时可以直接访问存储的数据而不需要重新计算。 持久化级别 MEMORY_ONLY将 RDD 以序列化的形式存储在 JVM 堆内存中。 MEMORY_AND_DISK如果内存不足则将 RDD 以序列化的形式部分存储在内存中部分存储在磁盘上。 MEMORY_ONLY_SER将 RDD 以序列化的形式存储在内存中节省空间但序列化和反序列化的开销较大。 MEMORY_AND_DISK_SER将 RDD 以序列化的形式部分存储在内存中部分存储在磁盘上节省空间但序列化和反序列化的开销较大。 DISK_ONLY将 RDD 只存储在磁盘上。 OFF_HEAP将 RDD 存储在堆外内存中适用于管理大数据集时减少 JVM 垃圾回收的影响。 在 Spark 中通过 persist 方法调用例如rdd.persist(StorageLevels.MEMORY_ONLY)。 缓存 缓存是持久化的一种简化方式通过调用 rdd.cache() 方法实现默认情况下等同持久化级别中的 MEMORY_ONLY将 RDD 以序列化的形式存储在 JVM 堆内存中。 代码案例 下面是一个展示 RDD 持久化与缓存的具体案例 package com.example.spark;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.StorageLevels; import scala.Tuple2;import java.util.Arrays;public class RDDCacheExample {public static void main(String[] args) {// 创建 Spark 配置和上下文对象SparkConf conf new SparkConf().setAppName(RDDCacheExample).setMaster(local[*]);JavaSparkContext sc new JavaSparkContext(conf);// 创建一个 RDDJavaRDDString lines sc.parallelize(Arrays.asList(2023-05-01 12:34:56 192.168.0.1 /index.html 200 123,2023-05-01 12:35:01 192.168.0.2 /about.html 404 56,2023-05-01 12:35:05 192.168.0.1 /index.html 200 78));// 分割日志数据JavaRDDString[] splitLines lines.map(line - line.split( ));// 缓存 RDDsplitLines.cache();//splitLines.persist(StorageLevels.MEMORY_ONLY);// 计算不同 URL 的访问次数JavaPairRDDString, Integer urlCounts splitLines.mapToPair(fields - new Tuple2(fields[3], 1)).reduceByKey(Integer::sum);urlCounts.foreach(x - System.out.println(URL: x._1 , Count: x._2));// 释放资源sc.stop();} }在需要多次使用同一个 RDD 时建议对 RDD 进行缓存或持久化以提高计算效率。 其它场景可以根据数据规模和资源限制选择合适的存储级别例如内存不足时可以选择 MEMORY_AND_DISK。 RDD 容错机制 Spark 中的 RDD 具有内置的容错机制通过血统Lineage信息追踪 RDD 的生成方式可以在节点失败时重算丢失的分区。但在复杂的计算过程中重算代价可能很高。为了优化这个问题Spark 提供了 Checkpoint 检查点机制。 Checkpoint Checkpoint 是将 RDD 的数据保存到可靠存储系统如 HDFS上。这会切断 RDD 的血统信息从而避免复杂计算步骤的重复执行。 在以下情况使用 Checkpoint 非常有用 血统图Lineage非常长且复杂。 需要容忍频繁的节点故障。 需要保存中间结果避免重复计算。 使用 Checkpoint 也非常简单仅需两步 设置 Checkpoint 目录 调用 checkpoint() 方法对 RDD 进行 Checkpoint 操作。 缓存与检查点的区别 特性缓存Cache检查点Checkpoint存储位置内存默认或磁盘可靠存储系统如 HDFS血统信息保留血统信息切断血统信息使用场景需要多次访问同一个 RDD重复计算代价高血统图长且复杂需要高容错性防止数据丢失恢复方式通过血统信息重新计算通过检查点存储恢复性能影响内存利用率高适合快速重用性能开销较大但提高容错性适合长时间运行和复杂计算 在实际应用中可以根据具体需求选择合适的机制例如在长血统图和复杂计算中使用检查点而在需要快速重用数据时使用缓存。 当然我们也可以同时使用 Cache 和 Checkpoint。在进行 Checkpoint 操作之前提前对 RDD 进行缓存避免在 Checkpoint 操作期间重复计算 RDD可以有效提升 Spark 应用的性能和容错能力。 代码案例 package com.example.spark;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2;import java.util.Arrays;public class RDDCheckpointExample {public static void main(String[] args) {// 创建 Spark 配置和上下文对象SparkConf conf new SparkConf().setAppName(RDDCheckpointExample).setMaster(local[*]);JavaSparkContext sc new JavaSparkContext(conf);// 设置 Checkpoint 目录sc.setCheckpointDir(src/main/resources/checkpoint);// 创建一个 RDDJavaRDDString lines sc.parallelize(Arrays.asList(2023-05-01 12:34:56 192.168.0.1 /index.html 200 123,2023-05-01 12:35:01 192.168.0.2 /about.html 404 56,2023-05-01 12:35:05 192.168.0.1 /index.html 200 78));// 分割日志数据JavaRDDString[] splitLines lines.map(line - line.split( ));// 缓存 RDDsplitLines.cache();// 进行 Checkpoint 操作splitLines.checkpoint();// 统计不同 URL 的访问次数JavaPairRDDString, Integer urlCounts splitLines.mapToPair(fields - new Tuple2(fields[3], 1)).reduceByKey(Integer::sum);urlCounts.foreach(x - System.out.println(URL: x._1 , Count: x._2));// 释放资源sc.stop();} }DAG DAG 是有向无环图Directed Acyclic Graph的简称。在计算中DAG 是一种图形结构包含一组顶点和有向边其中没有任何一个顶点可以通过一系列边回到自身。 DAG 通常用于表示依赖关系例如任务调度、表达式求值、工作流和数据处理流水线。 Spark 中的 DAG 在 Spark 中DAG 是用来表示一系列操作如转换和行动之间的依赖关系的基础结构。 当我们对 RDD 进行一系列转换如 map、filter 等时这些转换操作会形成一个逻辑上的 DAG代表数据从输入到输出的流动过程。行动操作如 count、collect 等会触发 Spark 实际执行这些操作。 DAG 划分 逻辑 DAG在用户编写 Spark 程序时转换操作会形成一个逻辑上的 DAG。这是用户代码中操作的有序集合还没有实际执行。 物理 DAG当行动操作触发时逻辑 DAG 会被转换成物理执行计划其中包含具体的执行步骤这是由 Spark 调度器生成的。 Stage 在 Spark 中DAG 被分解成多个 Stage每个 Stage 由一系列可以并行执行的任务组成。一个 Stage 通常对应于 RDD 依赖关系中的一个宽依赖如 reduceByKey、join 等而窄依赖如 map、filter 等通常可以在同一个 Stage 内完成。 Stage 的划分 窄依赖Narrow Dependency一个 RDD 的每个分区仅依赖于前一个 RDD 的一个分区。例如map 和 filter 操作。窄依赖的转换操作通常在同一个 Stage 内完成。 宽依赖Wide Dependency一个 RDD 的分区依赖于多个上一个 RDD 的分区。例如reduceByKey 和 groupByKey 操作。宽依赖的转换操作会导致一个新的 Stage 的开始。 执行流程 当一个行动操作被调用时Spark 会根据逻辑 DAG 生成物理执行计划并划分阶段。 执行过程如下 生成逻辑 DAG根据用户的转换操作生成逻辑 DAG。 划分阶段根据依赖关系窄依赖和宽依赖划分成多个 Stage。 任务调度每个 Stage 被分解成多个任务这些任务在集群的不同节点上并行执行。 执行任务被提交给集群中的节点执行节点会将数据加载到内存中进行计算。 结果返回行动操作的结果会返回给驱动程序对于 collect 等操作或者保存到存储系统中对于 saveAsTextFile 等操作。 代码案例 import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import scala.Tuple2;public class SparkDAGExample {public static void main(String[] args) {SparkConf conf new SparkConf().setAppName(Spark DAG Example).setMaster(local[*]);JavaSparkContext sc new JavaSparkContext(conf);JavaRDDString lines sc.textFile(src/main/resources/data/logs.txt);JavaRDDString errors lines.filter(line - line.contains(ERROR));JavaRDDString warnings lines.filter(line - line.contains(WARN));JavaRDDString allIssues errors.union(warnings);JavaRDDString formattedIssues allIssues.map(issue - Issue: issue);formattedIssues.saveAsTextFile(output/issues.txt);sc.stop();} }在这个程序中 textFile 读取数据形成第一个 RDD linesHadoopRDD。 filter 操作生成两个新的 RDD errors、warningsMapPartitionsRDD。 union 操作生成一个新的 RDD allIssuesUnionRDD这是一个宽依赖会触发一个新的 Stage。 map 操作生成一个新的 RDD formattedIssuesMapPartitionsRDD这是一个窄依赖。 saveAsTextFile 触发行动操作。 Stage 阶段划分 Stage 0包括 textFile、filter、union、map、saveAsTextFile 操作这些是窄依赖。 在 IDEA 中运行上面这段代码时如果没有关闭日志我们可以清楚的看到程序日志的输出过程从中可以看出这段代码的确只有一个阶段因为我们没有使用任何宽依赖算子。 累加器 累加器Accumulator是 Spark 提供的一种变量用于在并行计算中进行累加操作。它可以在所有节点之间进行数值累加操作并将结果返回给驱动程序Driver。 累加器最主要的作用是对分布式数据进行聚合。由于 Spark 中的数据处理是分布在多个节点上的累加器提供了一种方式可以跨任务累加数值从而在全局范围内进行数据聚合。 累加器的特点 累加器只能在 Spark 的转换算子如 map、filter 等中进行累加操作不能在行动算子如 collect、count 等中读取值。 累加器的值只能在驱动程序端读取并且是线程安全的。 虽然累加器可以用于其他类型的数据但最常用的还是数值累加。 累加器应用场景 在处理大规模数据时我们经常需要收集一些统计信息比如 处理了多少条记录。 有多少记录符合某些条件。 出现了多少次错误或警告。 累加器可以用来方便地收集这些统计信息并在驱动程序中进行汇总和输出。 累加器的类型 Spark 提供了几种常用的累加器类型 数值累加器LongAccumulator、DoubleAccumulator用于累加数值类型的数据。 集合累加器CollectionAccumulator用于累加集合类型的数据。 自定义累加器用户可以定义自己的累加器类型实现特定的数据累加逻辑。 累加器的创建与使用 在 Spark 中使用累加器共分为三步 创建累加器通过 SparkContext 上下文对象调用驱动程序中的累加器方法创建累加器返回一个累加器对象。 使用累加器在转换算子中对累加器进行累加操作。 获取累加值通过累加器对象调用 value() 方法获取累加器最终的值。 代码案例 以下是一个使用数值累加器统计日志文件中错误行数的示例 import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.util.LongAccumulator;public class SparkAccumulatorExample {public static void main(String[] args) {// 1. 创建 Spark 配置和上下文SparkConf conf new SparkConf().setAppName(Spark Accumulator Example).setMaster(local[*]);JavaSparkContext sc new JavaSparkContext(conf);// 2. 创建数值累加器LongAccumulator errorAccumulator sc.sc().longAccumulator(Error Lines);// 3. 读取日志文件JavaRDDString lines sc.textFile(src/main/resources/data/logs.txt);// 4. 在转换算子中使用累加器JavaRDDString errors lines.filter(line - {if (line.contains(ERROR)) {errorAccumulator.add(1);return true;} else {return false;}});// 5. 触发行动errors.collect();// 6. 打印累加器的值System.out.println(Number of error lines: errorAccumulator.value());// 7. 关闭 Spark 上下文sc.stop();} }这里主要说明一下创建与使用累加器的核心代码 创建数值累加器 LongAccumulator errorAccumulator sc.sc().longAccumulator(Error Lines);其中 sc.sc() 表示获取到 Spark 的驱动程序然后调用 longAccumulator 方法为其创建数值累加器其中传入的字符串 Error Lines 表示这个累加器的名字最终返回一个累加器对象 errorAccumulator。 使用累加器 JavaRDDString errors lines.filter(line - {if (line.contains(ERROR)) {errorAccumulator.add(1);return true;} else {return false;}});使用 filter 转换操作过滤包含 ERROR 的行并在每次遇到 ERROR 时累加器加 1。 触发行动 errors.collect();使用 collect 操作触发转换算子的执行。此时累加器会对所有分区的结果进行累加。 获取累加器的值 System.out.println(Number of error lines: errorAccumulator.value());通过创建累加器返回的对象 errorAccumulator 直接调用 value() 方法即可获得累加器最终的值默认值为 0。 广播变量 广播变量Broadcast Variable是 Spark 中的一种机制用于将一个只读变量缓存到每一个节点上使得任务可以高效地访问该变量而无需在每个任务中传输该变量的副本。 这对于需要在所有节点上共享大数据集例如查找表、配置数据特别有用因为它避免了重复传输数据从而节省了网络开销提高了性能。 广播变量的作用 减少数据传输通过将变量广播到各个节点只需一次传输而不是每个任务传输一次减少了网络开销。 提高效率任务可以直接从节点的内存中读取广播变量而不需要从驱动程序获取数据提高了访问速度。 确保一致性所有任务访问的都是相同的广播变量的副本保证了一致性。 广播变量的创建与使用 在 Spark 中使用广播变量同样分为三步 通过 SparkContext 上下文对象调用 broadcast 方法将要进行广播的变量传入其中生成一个 Broadcast 对象。 在算子中使用广播变量。在执行 RDD 的转换操作如 map、filter 等时可以使用广播变量通过 Broadcast 对象的 value 方法来访问广播的数据。 通过行动算子如 collect、count 等触发计算完成计算任务。 注意 广播到各个节点的数据应尽量保持不变因为广播变量是只读的。 代码案例 假设我们有一个日志文件每行记录了用户的活动我们需要根据用户 ID 在另一个数据集中查找用户的详细信息并进行处理。 import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast;import java.util.HashMap; import java.util.Map;public class SparkBroadcastExample {public static void main(String[] args) {// 创建 Spark 配置和上下文SparkConf conf new SparkConf().setAppName(Spark Broadcast Example).setMaster(local[*]);JavaSparkContext sc new JavaSparkContext(conf);// 模拟用户详细信息数据集MapString, String userDetails new HashMap();userDetails.put(1, Alice);userDetails.put(2, Bob);userDetails.put(3, Cathy);// 创建广播变量BroadcastMapString, String broadcastUserDetails sc.broadcast(userDetails);// 读取日志文件JavaRDDString lines sc.textFile(src/main/resources/data/logs.txt);// 根据用户 ID 查找用户详细信息JavaRDDString userActivities lines.map(line - {String[] parts line.split( );String userId parts[0];String activity parts[1];String userName broadcastUserDetails.value().get(userId);return userName did activity;});// 打印结果userActivities.foreach(System.out::println);// 关闭 Spark 上下文sc.stop();} }代码释义 1.模拟用户详细信息数据集 MapString, String userDetails new HashMap();userDetails.put(1, Alice);userDetails.put(2, Bob);userDetails.put(3, Cathy);模拟一个用户详细信息的数据集以 Map 的形式存储键为用户 ID值为用户名。 2.创建广播变量 BroadcastMapString, String broadcastUserDetails sc.broadcast(userDetails);使用 sc.broadcast 方法将用户详细信息数据集广播到每个节点。 3.读取日志文件 JavaRDDString lines sc.textFile(src/main/resources/data/logs.txt);使用 textFile 方法读取日志文件返回一个包含每行日志的 RDD。 4.根据用户 ID 查找用户详细信息 JavaRDDString userActivities lines.map(line - {String[] parts line.split( );String userId parts[0];String activity parts[1];String userName broadcastUserDetails.value().get(userId);return userName did activity;});使用 map 转换操作处理每行日志根据用户 ID 在广播变量中查找用户名并构建新的字符串表示用户活动。 5.打印结果 userActivities.foreach(System.out::println);使用 foreach 操作打印每个用户的活动信息。
http://www.hkea.cn/news/14341135/

相关文章:

  • 网站建设是属于虚拟产品吗网站风格
  • 校园网站网站第三方统计工具
  • 个人网站名称大全wordpress流媒体
  • 东莞找公司网站做淘宝店铺标志的网站
  • 商务网站建设策划思路如何建设好网站
  • 网站开发软件排名河南网站建站系统平台
  • 电子商城网站建设流程家电电商平台排名
  • 企业网站系统源码做什麽网站有前景
  • 网站安全监测预警平台建设成效东莞哪里能学建设网站
  • 公司网站开发题目来源常州网站建设制作
  • 本机电脑怎么做网站免费网站申请域名com
  • 专业营销型网站建设费用郴州58网站
  • 北京天海网站建设公司平面设计师用的网站
  • 企业微信小程序登录入口关键词优化是什么意思?
  • 申请付费网站违法网站做网站的人会受罚嘛
  • 云南省建设工程造价管理协会网站怎样把网站做的更好
  • 海外seo网站建设手机网站和微信网站的区别
  • 承德市建设局网站温江做网站的公司
  • 加强网站建设的意见海南建设局相关网站
  • 企业网站优化哪家好顺德搜索seo网络推广
  • 上传网站安装教程视频教程优秀旅游网站设计
  • 住房和城乡建设部科技发展促进中心网站建筑企业
  • 网站申请服务器空间宜宾做网站公司
  • 网站开发的著作权归谁做一个网站的费用构成
  • 福州市网站建设站长网站素材
  • 网站开发难点分析设计公司logo需要多少钱
  • pc端网站开发建筑工程机械人才网
  • 广州万网建网站做外链的博客网站
  • 哈尔滨cms模板建站便宜网站建设模板网站
  • 公司制作网站需要百姓网二手拖拉机