动漫风格网站,企业网站制作报价单,网站建设跟pc官网一样吗,美容视频视频网站建设个人博客地址#xff1a;Sqoop源码修改#xff1a;增加落地HDFS文件数与MapTask数量一致性检查 | 一张假钞的真实世界
本篇是对记录一次Sqoop从MySQL导入数据到Hive问题的排查经过的补充。
Sqoop 命令通过 bin 下面的脚本调用#xff0c;调用如下#xff1a;
exec ${HAD…个人博客地址Sqoop源码修改增加落地HDFS文件数与MapTask数量一致性检查 | 一张假钞的真实世界
本篇是对记录一次Sqoop从MySQL导入数据到Hive问题的排查经过的补充。
Sqoop 命令通过 bin 下面的脚本调用调用如下
exec ${HADOOP_COMMON_HOME}/bin/hadoop org.apache.sqoop.Sqoop $
org.apache.sqoop.Sqoop 是 Sqoop 的入口类在此主要是解析参数及初始化工具类然后通过 org.apache.hadoop.util.ToolRunner 类调用对应的工具完成操作。Sqoop 的 Import 操作对应的是 org.apache.sqoop.tool.ImportTool 类。
在 ImportTool 类的 return 代码前增加以下代码
int numMappers options.getNumMappers();String hDbName options.getHCatDatabaseName();
String hTableName options.getHCatTableName();
String hPartKeys options.getHCatalogPartitionKeys();
String hPartVals options.getHCatalogPartitionValues();if(isStringNotEmpty(hDbName) isStringNotEmpty(hTableName) isStringNotEmpty(hPartKeys) isStringNotEmpty(hPartVals)) {String[] partKeys hPartKeys.split(,);String[] partVals hPartVals.split(,);String partPathStr ;if(partKeys.length 0 partVals.length partKeys.length) {for(int i 0; i partKeys.length; i) {partPathStr partKeys[i] partVals[i] /;}}String targetDir /user/hive/warehouse/ hDbName .db/ hTableName / partPathStr;targetDir targetDir.toLowerCase();LOG.info(---------targetDir targetDir);try {FileSystem fs FileSystem.get(options.getConf());RemoteIteratorLocatedFileStatus rIter fs.listFiles(new Path(targetDir), false);int fileCount 0;while(rIter.hasNext()) {fileCount;rIter.next();}LOG.info(---------------fileCount fileCount);if(numMappers ! fileCount) {LOG.error(files number in hdfs not equals mapper task number !);return 2;}} catch (IOException e) {LOG.error(count files number from hdfs error !);e.printStackTrace();return 3;}
}
改动只针对 Sqoop 集成 HCatalog 方式导入 ORC 格式的情况。因为我们的数据仓库中都采用的是这种方式。 优化当 MySQL 中记录数特别少时如少于 4 条记录则默认 Sqoop 的 MapTask 数量为 4 但其实际执行时因为原始记录数不够则实际执行的 MapTask 数量会跟实际的记录数一致此时 split 数量跟落地 HDFS 的文件数量一致。所以可以根据 Sqoop 对应 MR 的实际 split 数量进行判断文件数量。