当前位置: 首页 > news >正文

《网站建设与管理》试卷(b)答案网页设计图

《网站建设与管理》试卷(b)答案,网页设计图,wordpress弃用react,如何做微信小程序?通过Spark读取Parquet文件的基本流程 SQL > Spark解析SQL生成逻辑计划树 LogicalPlan > Spark创建扫描表/读取数据的逻辑计划结点 DataSourceV2ScanRelation > Spark优化逻辑计划树,生成物理计划树 SparkPlan > Spark根据不同的属性,将逻辑…

通过Spark读取Parquet文件的基本流程

SQL
==> Spark解析SQL生成逻辑计划树
LogicalPlan
==> Spark创建扫描表/读取数据的逻辑计划结点
DataSourceV2ScanRelation
==> Spark优化逻辑计划树,生成物理计划树
SparkPlan
==> Spark根据不同的属性,将逻辑计划结点DataSourceV2ScanRelation转换成物理计划结点BatchScanExec
BatchScanExec
==> BatchScanExec::inputRDD属性的延迟生成DataSourceRDD实例
DataSourceRDD
==> DataSourceRDD::compute方法创建PartitionReader实例
PartitionReader
==> Iceberg中实现了Spark中的BatchDataReader接口
BatchDataReader
==> BatchDataReader::open方法会创建Parquet文件上的迭代器(Spark中遍历数据的过程都是基于迭代器)
VectorizedParquetReader
==> VectorizedParquetReader::next方法,读取Parquet文件中的内容,并封装成Spark中的ColumnarBatch对象
ColumnarBatch

两种BaseBatchReader的实现类

BaseBatchReader支持以Batch + Vectorized的特性,读取底层的文件。

ColumnarBatchReader

通过VectorizedSparkParquetReaders::build Reader()静态方法创建的读取器,关键特性如下:

  1. 支持读取Delete File
  2. 以Arrow的格式直接读取Parquet文件
  3. 最终返回的数据集的类型为Spark.ColumnarBatch,是Spark中的实现类
  public static ColumnarBatchReader buildReader(Schema expectedSchema,MessageType fileSchema,Map<Integer, ?> idToConstant,DeleteFilter<InternalRow> deleteFilter) {return (ColumnarBatchReader)TypeWithSchemaVisitor.visit(expectedSchema.asStruct(),fileSchema,new ReaderBuilder(expectedSchema,fileSchema,NullCheckingForGet.NULL_CHECKING_ENABLED,idToConstant,ColumnarBatchReader::new,deleteFilter));

ArrowBatchReader

通过ArrowReader::buildReader()静态方法创建的读取器,关键特性如下:

  1. 不支持读取Delete File
  2. 以Arrow的格式直接读取Parquet文件
  3. 返回的最终结果为ColumnarBatch类型,是Iceberg内置的实现类

在Iceberg 1.2.x的版本中,只在测试用例中使用到,因此在这里不再讨论,它的实现比ColumnarBatchReader更简单。

ColumnarBatchReader的创建

DataSourceRDD::compute方法中创建PartitionReader实例

// 在计算RDD数据的过程中,会通过如下的方法创建一个实现了PartitionReader接口的具体类的实例,
// 这里partitionReaderFactory的类型为SparkColumnarReaderFactory,
// SparkColumnarReaderFactory类是Iceberg中的实现,它重写了createColumnarReader(InputPartition)接口
// 以返回一个PartitionReader<ColumnarBatch>的实例。
val batchReader = partitionReaderFactory.createColumnarReader(inputPartition)

PartitionReaderFactory.createColumnarReader方法创建BatchDataReader实例

class SparkColumnarReaderFactory implements PartitionReaderFactory {public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition inputPartition) {SparkInputPartition partition = (SparkInputPartition) inputPartition;if (partition.allTasksOfType(FileScanTask.class)) {return new BatchDataReader(partition, batchSize);} else {throw new UnsupportedOperationException("Unsupported task group for columnar reads: " + partition.taskGroup());}}
}

BatchDataReader::open方法创建VectorizedParquetReader迭代器

BatchDataReader::open

class BatchDataReader extends BaseBatchReader<FileScanTask>implements PartitionReader<ColumnarBatch> {@Overrideprotected CloseableIterator<ColumnarBatch> open(FileScanTask task) {// 获取Data File的路径String filePath = task.file().path().toString();LOG.debug("Opening data file {}", filePath);// update the current file for Spark's filename() functionInputFileBlockHolder.set(filePath, task.start(), task.length());Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());// 获取底层文件的句柄InputFile inputFile = getInputFile(filePath);Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask");// 获取数据文件对应的Delete FilesSparkDeleteFilter deleteFilter =task.deletes().isEmpty()? null: new SparkDeleteFilter(filePath, task.deletes(), counter());// 返回一个数据文件上的迭代器return newBatchIterable(inputFile,task.file().format(),task.start(),task.length(),task.residual(),idToConstant,deleteFilter).iterator();}
}

BaseBatchReader::newBatchIterable方法创建VectorizedParquetReader实例

VectorizedParquetReader类是最上层的类,它提供了对遍历文件内容的入口。

abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {protected CloseableIterable<ColumnarBatch> newBatchIterable(InputFile inputFile,FileFormat format,long start,long length,Expression residual,Map<Integer, ?> idToConstant,SparkDeleteFilter deleteFilter) {switch (format) {case PARQUET:// 如果文件的格式是PARQUET,则创建一个Parquet上的迭代器return newParquetIterable(inputFile, start, length, residual, idToConstant, deleteFilter);case ORC:// 忽略,不讨论return newOrcIterable(inputFile, start, length, residual, idToConstant);default:throw new UnsupportedOperationException("Format: " + format + " not supported for batched reads");}}private CloseableIterable<ColumnarBatch> newParquetIterable(InputFile inputFile,long start,long length,Expression residual,Map<Integer, ?> idToConstant,SparkDeleteFilter deleteFilter) {// get required schema if there are deletesSchema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema();return Parquet.read(inputFile).project(requiredSchema).split(start, length)// 指定可以创建BaseBatchReader的实现类的实例的方法.createBatchedReaderFunc(fileSchema ->VectorizedSparkParquetReaders.buildReader(requiredSchema, fileSchema, idToConstant, deleteFilter)).recordsPerBatch(batchSize).filter(residual).caseSensitive(caseSensitive())// Spark eagerly consumes the batches. So the underlying memory allocated could be reused// without worrying about subsequent reads clobbering over each other. This improves// read performance as every batch read doesn't have to pay the cost of allocating memory..reuseContainers().withNameMapping(nameMapping()).build();}
}

ColumnarBatchReader::new方法创建ColumnarBatchReader实例

VectorizedSparkParquetReaders.buildReader()方法见第一大章节的简述。

public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {private final boolean hasIsDeletedColumn;private DeleteFilter<InternalRow> deletes = null;private long rowStartPosInBatch = 0;// 只有一个构造器,readers是保存了读取文件中每一个列(字段)的Reader,它们都是实现了VectorizedReader<T>接口的// VectorizedArrowReader<T>的实例public ColumnarBatchReader(List<VectorizedReader<?>> readers) {super(readers);// 遍历每一个字段的Reader类型,看看当前文件中是不是存在内置的列_deleted,它标识着当前当前行是不是被删除了。this.hasIsDeletedColumn =readers.stream().anyMatch(reader -> reader instanceof DeletedVectorReader);}
}

Parquet文件读取

通过前面的分析,知道对上层(Spark RDD)可见的接口,是由VectorizedParquetReader(一个Iterator的实现类)提供的,
它内部封装了对ColumnarBatchReader的操作。

VectorizedParquetReader::iterator方法,返回Parquet文件上的迭代器

public class VectorizedParquetReader<T> extends CloseableGroup implements CloseableIterable<T> {@Overridepublic CloseableIterator<T> iterator() {FileIterator<T> iter = new FileIterator<>(init());addCloseable(iter);return iter;}
}

FileIterator::next方法,读取数据

由于FilterIterator实现了JAVA中的Iterator接口,因此可以在compute Spark RDD时,通过这个迭代器,获取到文件中的内容,
也就是next()方法返回的ColumnarBatch对象。

  /*** 这里T的类型为ColumnarBatch。*/private static class FileIterator<T> implements CloseableIterator<T> {public T next() {if (!hasNext()) {throw new NoSuchElementException();}if (valuesRead >= nextRowGroupStart) {// 第一次执行时,valuesRead == nextRowGroupStart,表示开始读取一个新的RowGroup// 这里调用advance()后,nextRowGroupStart指向了下一个要读取的RowGroup的起始位置,// 但当前的RowGroup是还没有被读取的,被延迟到了后面的过程。advance();}// batchSize is an integer, so casting to integer is safe// 读取当前RowGroup的数据,其中://   nextRowGroupStart指向的是下一个RowGroup的起始位置,//   valuesRead的值表示一共读取了多少行// 这里必须有nextRowGroupStart >= nextRowGroupStart,而它们的差值就是当前RowGroup剩余的没有被读取的行int numValuesToRead = (int) Math.min(nextRowGroupStart - valuesRead, batchSize);// 读取指定数量的行,这里的model就是前面提到的ColumnarBatchReader的实例对象。if (reuseContainers) {this.last = model.read(last, numValuesToRead);} else {this.last = model.read(null, numValuesToRead);}// 累加读取的行数valuesRead += numValuesToRead;return last;}/*** 移动读取指针到下一个RowGroup的起始位置。*/private void advance() {while (shouldSkip[nextRowGroup]) {nextRowGroup += 1;reader.skipNextRowGroup();}PageReadStore pages;try {pages = reader.readNextRowGroup();} catch (IOException e) {throw new RuntimeIOException(e);}// 从绑定的RowGroups信息中,计算下一个RowGroup的起始位置long rowPosition = rowGroupsStartRowPos[nextRowGroup];model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup), rowPosition);nextRowGroupStart += pages.getRowCount();nextRowGroup += 1;}}

ColumnarBatchReader::read

public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {protected final VectorHolder[] vectorHolders;@Overridepublic final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {if (reuse == null) {// 如果指定了不复用当前的VectorHolder来存储数据时,就关闭它们closeVectors();}// 由内部类ColumnBatchLoader负责代理进行真正的读取操作。ColumnarBatch columnarBatch = new ColumnBatchLoader(numRowsToRead).loadDataToColumnBatch();rowStartPosInBatch += numRowsToRead;return columnarBatch;}
}

ColumnBatchLoader::loadDataToColumnBatch读取数据,封装成ColumnarBatch对象

  private class ColumnBatchLoader {// 读取的数据记录总数private final int numRowsToRead;// the rowId mapping to skip deleted rows for all column vectors inside a batch, it is null when// there is no deletesprivate int[] rowIdMapping;// the array to indicate if a row is deleted or not, it is null when there is no "_deleted"// metadata columnprivate boolean[] isDeleted;ColumnBatchLoader(int numRowsToRead) {Preconditions.checkArgument(numRowsToRead > 0, "Invalid number of rows to read: %s", numRowsToRead);this.numRowsToRead = numRowsToRead;if (hasIsDeletedColumn) {isDeleted = new boolean[numRowsToRead];}}ColumnarBatch loadDataToColumnBatch() {// 对读取的数据记录进行过滤,得到未删除的数据记录总数int numRowsUndeleted = initRowIdMapping();// 以Arrows格式,读取每一列的数据,表示为Spark.ColumnVector类型ColumnVector[] arrowColumnVectors = readDataToColumnVectors();// 创建一个ColumnarBatch实例,包含所有存活的数据ColumnarBatch newColumnarBatch = new ColumnarBatch(arrowColumnVectors);newColumnarBatch.setNumRows(numRowsUndeleted);if (hasEqDeletes()) {// 如果有等值删除的文件存在,则还需要按值来过滤掉被删除的数据行// 由于基于等值删除的文件过滤数据时,需要知道每一行的实际值,因此只有将数据读取到内存中才知道哪一行要被删除掉applyEqDelete(newColumnarBatch);}if (hasIsDeletedColumn && rowIdMapping != null) {// 如果存在被删除的数据行,则需要重新分配行号,从0开始自然递增// reset the row id mapping array, so that it doesn't filter out the deleted rowsfor (int i = 0; i < numRowsToRead; i++) {rowIdMapping[i] = i;}newColumnarBatch.setNumRows(numRowsToRead);}// 返回return newColumnarBatch;}ColumnVector[] readDataToColumnVectors() {ColumnVector[] arrowColumnVectors = new ColumnVector[readers.length];ColumnVectorBuilder columnVectorBuilder = new ColumnVectorBuilder();for (int i = 0; i < readers.length; i += 1) {vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);int numRowsInVector = vectorHolders[i].numValues();Preconditions.checkState(numRowsInVector == numRowsToRead,"Number of rows in the vector %s didn't match expected %s ",numRowsInVector,numRowsToRead);arrowColumnVectors[i] =columnVectorBuilder.withDeletedRows(rowIdMapping, isDeleted).build(vectorHolders[i], numRowsInVector);}return arrowColumnVectors;}boolean hasEqDeletes() {return deletes != null && deletes.hasEqDeletes();}int initRowIdMapping() {Pair<int[], Integer> posDeleteRowIdMapping = posDelRowIdMapping();if (posDeleteRowIdMapping != null) {rowIdMapping = posDeleteRowIdMapping.first();return posDeleteRowIdMapping.second();} else {rowIdMapping = initEqDeleteRowIdMapping();return numRowsToRead;}}/*** 如果当前文件包含 positions delete files,那么需要建立索引数据结构*/Pair<int[], Integer> posDelRowIdMapping() {if (deletes != null && deletes.hasPosDeletes()) {return buildPosDelRowIdMapping(deletes.deletedRowPositions());} else {return null;}}/*** Build a row id mapping inside a batch, which skips deleted rows. Here is an example of how we* delete 2 rows in a batch with 8 rows in total. [0,1,2,3,4,5,6,7] -- Original status of the* row id mapping array [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array Position* delete 2, 6 [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]* [F,F,T,F,F,F,T,F] -- After applying position deletes** @param deletedRowPositions a set of deleted row positions* @return the mapping array and the new num of rows in a batch, null if no row is deleted*/Pair<int[], Integer> buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions) {if (deletedRowPositions == null) {return null;}// 为新读取的数据记录,创建一个数组,保存所有没有被删除的行号,从0开始// 基本算法:使用双指针,将所有未删除的行放到队列一端,且有序int[] posDelRowIdMapping = new int[numRowsToRead];int originalRowId = 0; // 指向待判定的行的下标int currentRowId = 0; // 存活行的下标while (originalRowId < numRowsToRead) {if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) {// 如果当前行没有被删除,则将其添加到currentRowId指向的位置posDelRowIdMapping[currentRowId] = originalRowId;// currentRowId指向下一个待插入的位置  currentRowId++;} else {if (hasIsDeletedColumn) {isDeleted[originalRowId] = true;}deletes.incrementDeleteCount();}originalRowId++;}if (currentRowId == numRowsToRead) {// there is no delete in this batchreturn null;} else {return Pair.of(posDelRowIdMapping, currentRowId);}}int[] initEqDeleteRowIdMapping() {int[] eqDeleteRowIdMapping = null;if (hasEqDeletes()) {eqDeleteRowIdMapping = new int[numRowsToRead];for (int i = 0; i < numRowsToRead; i++) {eqDeleteRowIdMapping[i] = i;}}return eqDeleteRowIdMapping;}/*** Filter out the equality deleted rows. Here is an example, [0,1,2,3,4,5,6,7] -- Original* status of the row id mapping array [F,F,F,F,F,F,F,F] -- Original status of the isDeleted* array Position delete 2, 6 [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num* records to 6] [F,F,T,F,F,F,T,F] -- After applying position deletes Equality delete 1 <= x <=* 3 [0,4,5,7,-,-,-,-] -- After applying equality deletes [Set Num records to 4]* [F,T,T,T,F,F,T,F] -- After applying equality deletes** @param columnarBatch the {@link ColumnarBatch} to apply the equality delete*/void applyEqDelete(ColumnarBatch columnarBatch) {// 对经过position deletes 过滤的数据行,进行按值删除Iterator<InternalRow> it = columnarBatch.rowIterator();int rowId = 0;int currentRowId = 0;while (it.hasNext()) { // 行式遍历InternalRow row = it.next();if (deletes.eqDeletedRowFilter().test(row)) {// the row is NOT deleted// skip deleted rows by pointing to the next undeleted row Id// 更新成员变量rowIdMappingrowIdMapping[currentRowId] = rowIdMapping[rowId];currentRowId++;} else {if (hasIsDeletedColumn) {isDeleted[rowIdMapping[rowId]] = true;}deletes.incrementDeleteCount();}rowId++;}// 更新最新的存活记录数columnarBatch.setNumRows(currentRowId);}}
http://www.hkea.cn/news/978724/

相关文章:

  • 对一个网站做性能测试谷歌paypal官网入口
  • 北京住房投资建设中心网站首页快速排名怎么做
  • 中国网站制作 第一个佛山网站优化
  • thinkphp做的教育网站微商引流推广
  • 做特卖网站手机版电商最好卖的十大产品
  • 怎样做网站平叿trinseo公司
  • 北京大兴最专业的网站建设公司如何推广一个项目
  • 网页设计最牛的网站建设宁波网站优化公司哪家好
  • 建设通查询如何做网站推广及优化
  • 城乡建设网站首页百度seo收录软件
  • 永久免费建个人网站培训网站建设
  • 如何使用jq做弹幕网站好用的磁力搜索引擎
  • 南充营销型网站建设高端品牌网站建设
  • 制作小程序和网站的公司搜狗收录提交入口网址
  • 手机站电影基础建站如何提升和优化
  • 江苏 网站备案百度贴吧官网app下载
  • 网站制作三站湖南网站seo公司
  • 简单做任务赚钱网站企业管理培训课程报名
  • 零点研究咨询集团官方网站建设相似图片在线查找
  • 网站开发需要什么软件关键词app
  • 360全景网站建设做了5天游戏推广被抓了
  • 政府网站建设经验典型材料河源今日头条新闻最新
  • 为什么要进行网站备案佛山市人民政府门户网站
  • 摄影网站开发背景百度app交易平台
  • 吉林网站建设石家庄百度快照优化排名
  • 大学生网站开发总结报告app推广接单发布平台
  • 自己做的网站怎么推广seo顾问培训
  • 怎么做业务网站百度搜索提交入口
  • 网页设计网站图片西安百度推广运营公司
  • 济南网站开发推广网络服务包括