当前位置: 首页 > news >正文

工商局网站建设查不到重庆网站建设cqsday

工商局网站建设查不到,重庆网站建设cqsday,网站浮动qq,想做个网站不知道怎么做下面是基于官方优化建议#xff0c;加上自己的一些理解整理。官方地址#xff1a;https://spark.apache.org/docs/2.4.8/tuning.html 任务并行度 Spark会根据每个文件的大小自动设置运行“map”任务的数量#xff0c;而对于分布式的“reduce”操作#xff0c;例如groupBy…下面是基于官方优化建议加上自己的一些理解整理。官方地址https://spark.apache.org/docs/2.4.8/tuning.html 任务并行度 Spark会根据每个文件的大小自动设置运行“map”任务的数量而对于分布式的“reduce”操作例如groupByKey和reduceByKey,它使用最大的父RDD分区数我们也可以为这些算子提供其分区数的参数值或者设置spark.default.parallelism参数推荐是CPU的2-3倍任务数。增加Reduce任务的并行度(sortByKey, groupByKey, reduceByKey, join, etc)减少每一个任务处理的数据集的规模Spark可以200ms内完成一个任务的计算因为spark支持重用executor的JVM并且每个任务的消耗也很低所以可以放心增加任务数。 默认的并行度spark.default.parallelism 在SQL中动态修改分区数SET spark.sql.shuffle.partitions 2; Map端过滤 利用广播变量把Driver的大对象广播到每一个executor端构造一个静态表实现map端join。使用Map端预处理的算子比如RDD的reduceByKey、aggregateByKey、foldByKey、combineByKey都是用了map side combine。而groupByKey却不能使用map side combine因为即使groupByKey实现了map side combine也不会减少shuffle的数据量最终还是需要将所有的map side数据插入到哈希表中从而导致老年代中有更多的对象甚至OutOfMemoryError。 当需要把结果收集到driver端时先filter多余的行-再去除不需要的列-如果有必要再distinct-再collect 缓存和缓存级别 whether to use memory, or ExternalBlockStore, whether to drop the RDD to disk if it falls out of memory or ExternalBlockStore, whether to keep the data in memory in a serialized format, and whether to replicate the RDD partitions on multiple nodes. class StorageLevel private(private var _useDisk: Boolean,private var _useMemory: Boolean,private var _useOffHeap: Boolean,private var _deserialized: Boolean,private var _replication: Int 1) extends Externalizableobject StorageLevel {val NONE new StorageLevel(false, false, false, false)val DISK_ONLY new StorageLevel(true, false, false, false)val DISK_ONLY_2 new StorageLevel(true, false, false, false, 2)val MEMORY_ONLY new StorageLevel(false, true, false, true)val MEMORY_ONLY_2 new StorageLevel(false, true, false, true, 2)val MEMORY_ONLY_SER new StorageLevel(false, true, false, false)val MEMORY_ONLY_SER_2 new StorageLevel(false, true, false, false, 2)val MEMORY_AND_DISK new StorageLevel(true, true, false, true)val MEMORY_AND_DISK_2 new StorageLevel(true, true, false, true, 2)val MEMORY_AND_DISK_SER new StorageLevel(true, true, false, false)val MEMORY_AND_DISK_SER_2 new StorageLevel(true, true, false, false, 2)val OFF_HEAP new StorageLevel(true, true, true, false, 1)普通的cache等同于persist(Storage.MEMORY_ONLY)在内存不足时数据会被驱逐下次使用时需要重算所以建议使用persist(Storage.MEMORY_AND_DISK_SER)。 缓存数据压缩 SparkSQL使用spark.catalog.cacheTable(“tableName”)或者dataFrame.cache()可以使用内存中的列存储格式。SparkSQL只扫描请求的列并自动调节最小压缩和GC压力之间的平衡。 spark.sql.inMemoryColumnarStorage.compressed默认trueWhen set to true Spark SQL will automatically select a compression codec for each column based on statistics of the data.spark.sql.inMemoryColumnarStorage.batchSize默认10000Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data. 使用checkpoint截断logical plan checkpoint两个应用 对RDD做checkpoint切断做checkpoint RDD的依赖关系在计划特别大的时候非常有用将RDD数据保存到可靠存储如HDFS以便数据恢复是应用在spark streaming中使用checkpoint用来保存DStreamGraph以及相关配置信息以便在Driver崩溃重启的时候能够接着之前进度继续进行处理如之前waiting batch的job会在重启后继续处理。 在RDD上做checkpoint和在DF或者DS上做checkpoint有些区别后两者会返回一个新的数据集。默认checkpoint是lazy的但我们默认在DF或者DS上使用的是checkpoint(eager true, reliableCheckpoint true)会立即执行(其内部是通过调用ds的rdd的count方法实现)。checkpoint的过程首先是拷贝现有的RDD对新的RDD进行checkpoint也就是存储到本地然后生成一个新的DS返回这样就切断了依赖。所以后续算子都会基于新的RDD计算那么还有必要先对原有RDD缓存吗另外注意checkpoint到本地的rdd文件只能用于spark恢复不能直接被后续的算子利用。 A RDD can be recovered from a checkpoint files using SparkContext.checkpointFile. You can use SparkSession.internalCreateDataFrame method to (re)create the DataFrame from the RDD of internal binary rows. 数据本地化级别数据和代码的位置关系 数据存储和计算分离可以方便系统的横向扩展但当计算数据的时候往往需要把数据网路传输到计算节点带来网络耗时。所以Spark更喜欢存算不分离的方式。这就是数据本地化有以下几个级别 PROCESS_LOCAL(相同JVM)NODE_LOCAL(同节点不同JVM)NO_PREF(没有偏好)RACK_LOCAL同机架ANY 网络上并且不同机架 通常为了高效需要将序列化代码从一个地方传送到另一个地方比将数据块传送到另一个地方因为代码的大小比数据小得多这都是spark的任务调度来完成的。当数据和代码在不同的节点时Spark通常所做的是等待一段时间希望数据所处的executor有任务结束腾出资源从而调度新的任务。一旦超时到期它就开始将数据从远处移动到空闲CPU比如在同节点的不同executor间移动数据。每个级别之间的回退等待超时可以单独配置。 堆内内存优化 在spark1.6版本之前就是用的静态内存模型。静态模型就是把一个Executor分成三个部分一部分是Storage内存区域一部分是Execution区域还有一部分是其他区域。在spark的configuration中默认的有以下参数控制。 旧版spark.storage.memoryFraction: 默认0.6用于缓存和广播变量。 旧版spark.shuffle.memoryFraction: 默认0.2用于Execution。 在spark2.0版本之后spark新增加一种模型就是统一动态模型。Spark内存结构分为默认是(JVM的堆空间 - 300MB)*60%作为Spark的内存40%用于存储Spark的用户数据结构和Spark的内部元数据。该比例由spark.memory.fraction参数控制。 Spark内存又默认五五分为execution内存和storage内存execution内存用于Shuffle\joins\sorts\aggregations算子使用而storage内存用于在集群间缓存和传播内部数据storage内存是不会被占用的通过spark.memory.storageFraction参数控制。 这种设计确保了几个理想的性能。 首先不使用缓存的应用程序可以使用整个执行空间从而避免不必要的磁盘溢出。 其次使用缓存的应用程序可以保留最小的存储空间®使其数据块不会被移除。 需要注意因为动态占用机制Spark UI上的storage memory是executionstorage的内存另外还包含了堆外内存即其值等于 (spark.executor.memory - 300M) * spark.memory.fraction 堆外内存 堆外内存优化 为了进一步优化内存的使用以及提高 Shuffle 时排序的效率Spark 1.6 引入了堆外Off-heap内存即JVM之外使之可以直接在工作节点的系统内存中开辟空间存储经过序列化的二进制数据。 这种模式不在 JVM 内申请内存而是调用 Java 的 unsafe 相关 API 进行诸如 C 语言里面的 malloc() 直接向操作系统申请内存由于这种方式不经过 JVM 内存管理所以可以避免频繁的 GC这种内存申请的缺点是必须自己编写内存申请和释放的逻辑。如果堆外内存被启动堆外内存也会存在execution和storage内容和On-heap中的execution和storage内存不同的是前者不会被JVM的GC回收。 spark.driver.memoryOverheaddriverMemory * 0.10, with minimum of 384The amount of off-heap memory to be allocated per driver in cluster mode, in MiB unless otherwise specified. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%). This option is currently supported on YARN and Kubernetes. 相当于spark.memory.offHeap.enabledspark.memory.offHeap.size只是该参数用于告知YARN和K8S来分配内存和spark.memory.offHeap.size同时使用时需要比其大。spark.executor.memoryOverheadexecutorMemory * 0.10, with minimum of 384The amount of off-heap memory to be allocated per executor, in MiB unless otherwise specified. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%). This option is currently supported on YARN and Kubernetes.spark.memory.offHeap.enabledfalseIf true, Spark will attempt to use off-heap memory for certain operations. If off-heap memory use is enabled, then spark.memory.offHeap.size must be positive.spark.memory.offHeap.size0The absolute amount of memory in bytes which can be used for off-heap allocation. This setting has no impact on heap memory usage, so if your executors’ total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly. This must be set to a positive value when spark.memory.offHeap.enabledtrue. Join Strategy Hints BROADCAST, MERGE SHUFFLE_HASH and SHUFFLE_REPLICATE_NL, 当使用BROADCAST hint在表t1上时即使t1表的大小超过了spark.sql.autoBroadcastJoinThresholdSpark也会也会优先考虑把t1作为build side。至于是broadcast hash join 或者broadcast nested loop join要取决于是否有等值连接条件。 当不同的join策略hint用于join两边时Spark使用hint的优先级是BROADCASTMERGESHUFFLE_HASHSHUFFLE_REPLICATE_NL。当两端都指定BROADCAST hint或者SHUFFLE_HASH hint时Spark将根据join的类型和表的大小来选择build side。注意Spark不保证一定会使用指定的hint因为某些join策略hint可能不支持所有join类型。 spark.table(src).join(spark.table(records).hint(broadcast), key).show()Join Hints也可以用于SparkSQL BROADCAST的别名为BROADCASTJOIN 、MAPJOIN.MERGESuggests that Spark use shuffle sort merge join. The aliases for MERGE are SHUFFLE_MERGE and MERGEJOIN.SHUFFLE_HASHSuggests that Spark use shuffle hash join. If both sides have the shuffle hash hints, Spark chooses the smaller side (based on stats) as the build side.Hash Join的第一步就是根据两表之中较小的那一个构建哈希表这个小表就叫做build table大表则称为probe table因为需要拿小表形成的哈希表来探测它 SHUFFLE_REPLICATE_NLSuggests that Spark use shuffle-and-replicate nested loop join.Broadcast Join大表和小表被广播的表需要小于 spark.sql.autoBroadcastJoinThreshold 所配置的值默认是10M 或者加了broadcast join的hint基表不能被广播比如 left outer join 时只能广播右表。因为被广播的表首先被collect到driver段然后被冗余分发到每个executor上所以当表比较大时采用broadcast join会对driver端和executor端造成较大的压力。Sort Merge Join大表对大表将两张表按照join keys进行了重新shuffle保证join keys值相同的记录会被分在相应的分区。分区后对每个分区内的数据进行排序排序后再对相应的分区内的记录进行连接。因为两个序列都是有序的从头遍历碰到key相同的就输出如果不同左边小就继续取左边反之取右边(即用即取即丢) Partitioning Hints COALESCE, REPARTITION, and REPARTITION_BY_RANGE SELECT /* COALESCE(3) */ * FROM t; SELECT /* REPARTITION(3) */ * FROM t; SELECT /* REPARTITION(c) */ * FROM t; SELECT /* REPARTITION(3, c) */ * FROM t; SELECT /* REPARTITION_BY_RANGE(c) */ * FROM t; SELECT /* REPARTITION_BY_RANGE(3, c) */ * FROM t; EXPLAIN EXTENDED SELECT /* REPARTITION(100), COALESCE(500), REPARTITION_BY_RANGE(3, c) */ * FROM t;GC优化 堆和栈都是Java用来在RAM中存放数据的地方。 堆 Java的堆是一个运行时数据区类的对象从堆中分配空间。这些对象通过new等指令建立通过垃圾回收器来销毁。堆的优势是可以动态地分配内存空间需要多少内存空间不必事先告诉编译器因为它是在运行时动态分配的。但缺点是由于需要在运行时动态分配内存所以存取速度较慢。 栈栈中主要存放一些基本数据类型的变量byteshortintlongfloatdoublebooleanchar和对象的引用。栈的优势是存取速度比堆快栈数据可以共享重用。但是缺点时栈空间中的数据大小和生存期必须是确定的缺乏灵活性。栈主要存放一些基本类型的变量int, short, long, byte, float, double, boolean, char和对象句柄。对于局部变量如果是基本类型会把值直接存储在栈如果是引用类型比如String s new String(“william”);会把其对象存储在堆而把这个对象的引用指针存储在栈。 堆和栈的区别最主要的区别就是堆内存用来存储Java中的对象无论是成员变量局部变量还是类变量它们指向的对象都存储在堆内存中。栈内存用来存储局部变量和方法调用。栈内存归属于单个线程每个线程都会有一个栈内存其存储的变量只能在其所属线程中可见即栈内存可以理解成线程的私有内存。而堆内存中的对象对所有线程可见和访问。如果是堆内存没有可用的空间存储生成的对象JVM会抛出java.lang.OutOfMemoryError。如果栈内存没有可用的空间存储方法调用和局部变量JVM会抛出java.lang.StackOverFlowError。堆内存远大于栈的内存如果你使用递归的话那么你的栈很快就会充满很可能发生StackOverFlowError问题。我们通过-Xms选项可以设置堆的初始时的大小-Xmx选项可以设置堆的最大值。通过-Xss选项设置栈内存的大小。 JVM的GC日志的主要参数包括如下几个 -XX:PrintGC 输出GC日志-XX:PrintGCDetails 输出GC的详细日志-XX:PrintGCTimeStamps 输出GC的时间戳以基准时间的形式-XX:PrintGCDateStamps 输出GC的时间戳以日期的形式如 2013-05-04T21:53:59.2340800-XX:PrintHeapAtGC 在进行GC的前后打印出堆的信息-Xloggc:…/logs/gc.log 日志文件的输出路径 在较高的层次上管理全GC发生的频率可以帮助减少开销可以通过在作业的配置中设置spark.executor.extraJavaOptions来指定执行器的GC调优标志。通过在Java选项中添加-verbose:gc -XX:PrintGCDetails -XX:PrintGCTimeStamps来实现Spark会在executor端控制台日志中打印的消息。当您的程序存储的rdd有很大的“变动”时JVM垃圾收集可能会成为一个问题。 (对于只读取一次RDD然后在其上运行多次操作的程序来说这通常不是问题。) 当Java需要清除旧对象来为新对象腾出空间时它将需要跟踪所有Java对象并找到未使用的对象。 这里需要记住的要点是垃圾收集的成本与Java对象的数量成比例因此使用对象较少的数据结构(例如使用int数组而不是LinkedList)会大大降低此成本。 一个更好的方法是以序列化的形式持久化对象如上所述:现在每个RDD分区只有一个对象(一个字节数组)。 在尝试其他技术之前如果GC存在问题首先要尝试的是使用序列化缓存。 JAVA堆内部划分为年轻代和老年代年轻代存储短生命周期的对象而老年代存储长生命周期的对象。年轻一代被进一步划分为三个区域: Eden, Survivor1, Survivor2。GC的过程当Eden满时会在Eden上运行一个minor GC并将来自Eden和Survivor1的活对象复制到Survivor2。交换Survivor区域。如果一个对象足够老或Survivor2已满则将其移动到老年代。 最后当老年代接近满时将调用一个完整的GC。 Runtime.getRuntime.maxMemory 是程序能够使用的最大内存其值会比实际配置的执行器内存的值小。这是因为内存分配池的堆部分分为 EdenSurvivor 和 Tenured 三部分空间而这里面一共包含了两个 Survivor 区域而这两个 Survivor 区域在任何时候我们只能用到其中一个所以我们可以使用下面的公式进行描述 ExecutorMemory Eden 2 * Survivor Tenured Runtime.getRuntime.maxMemory Eden Survivor Tenured Spark中GC调优的目标是确保只有长寿命的rdd存储在Old代中而Young代的大小足以存储短寿命的对象。 这将有助于避免完整的gc收集任务执行期间创建的临时对象。 一些可能有用的步骤是: 通过收集GC统计信息来检查垃圾收集是否过多。 如果在任务完成之前多次调用全GC则意味着没有足够的内存可用来执行任务。如果minor collections太多而major gc不多那么为Eden分配更多的内存会有所帮助。 您可以将Eden的大小设置为每个任务所需内存的高估值。 如果Eden的大小被确定为E那么您可以使用选项-Xmn4/3*E设置Young代的大小。 (扩大4/3也是为了考虑survivor 区域所使用的空间。-XX:SurvivorRatio的值默认为8代表Survivor和Eden的比例为18而两个SurvivorEden就是28所以Eden占整个年轻代的4/5。在打印的GC统计中如果OldGen接近满了可以通过降低spark.memory.fraction来减少用于缓存的内存数量因为降低数据缓存比降低任务执行速度要好。 或者考虑减少年轻代的大小。 这意味着如果您将-Xmn设置为上面的值则降低-Xmn如果没有请尝试更改JVM的NewRatio参数的值。-XX:NewRatio的值默认为2代表年轻代和老年代的比值是1:2即老年代占堆内存的2/3。它应该足够大以至于这个分数超过spark.memory.fraction。尝试使用-XX:UseG1GC的G1GC垃圾收集器。 在垃圾收集成为瓶颈的某些情况下它可以提高性能。 注意对于较大的执行器堆大小使用-XX:G1HeapRegionSize增加G1区域大小可能很重要例如如果您的任务正在从HDFS读取数据则可以通过从HDFS读取的数据块大小来估计任务占用的内存大小。 请注意解压后的块的大小通常是块大小的2到3倍。 因此如果我们希望有3或4个任务的工作空间而HDFS的块大小是128MB我们可以估计Eden的大小为43128MB。 mapPartition替换map 可以重复利用变量减少重复定义变量对资源的消耗
http://www.hkea.cn/news/14309690/

相关文章:

  • 方案策划网站wordpress 购买按钮
  • 网站制作的重要性内页网站地图 权重
  • 南京建设工程招聘信息网站河北搜索引擎推广服务
  • 郑州网站开发汉狮保山网站开发服务
  • 建设网站培训如何自己做网站站长
  • es网站建设从网站栏目看网站功能
  • 云南网站建设快速优化开发app需要什么技术人才
  • 门户网站的设计网络营销渠道
  • 垂直行业门户网站建设方案wordpress get_template_part
  • 秦皇岛吧贴吧网站关键词优化代理
  • 哈尔滨网站建设设计公司怎么做网站内部链接的优化
  • 做的好点的外贸网站有哪些Ext做网站
  • 做网站国外网站淮安网站建设公司电话
  • 定制旅游网站建设方案万网域名管理控制台
  • 龙泉市旅游门户网站建设好用吗
  • 广州新业建设管理有限公司网站网站制作一键生成
  • 福建建设执业资格中心网站做自适应网站
  • 南山网站制作联系电话国内做的比较大的外贸电商网站
  • idea网站开发教程wordpress登录加验证码
  • 公司网站建设与维护工作计划win7 建网站
  • 网站建设找金手指排名一条龙网站建设价格
  • 网站刚做好怎么做优化企业运营策划公司
  • 门户网站管理流程写作网站后台账号密码忘了怎么办
  • 临西网站建设网站到期不想续费
  • 转包网站建设做非法事情婚庆公司logo
  • 做任务送科比网站开源多用户商城系统细节
  • 备案需要网站空间网站友情链接有什么用
  • 企业网站模板 下载 论坛软件界面设计app
  • wordpress 网站图标设置方法2022年全球3月解封
  • 做兼职上哪个网站西安市建设工程信息网平台