有这么个需求:一个目录下的数据只能由一个map来处理。如果多个map处理了同一个目录下的数据会导致数据错乱。 刚开始google了下,以为网上都有现成的InputFormat,找到的答案类似我之前写的 mapreduce job让一个文件只由一个map来处理。 或者是把目录写在文
有这么个需求:一个目录下的数据只能由一个map来处理。如果多个map处理了同一个目录下的数据会导致数据错乱。
刚开始google了下,以为网上都有现成的InputFormat,找到的答案类似我之前写的 “mapreduce job让一个文件只由一个map来处理“。
或者是把目录写在文件里面,作为输入:
/path/to/directory1 /path/to/directory2 /path/to/directory3
代码里面按行读取:
所谓数组,就是相同数据类型的元素按一定顺序排列的集合,就是把有限个类型相同的变量用一个名字命名,然后用编号区分他们的变量的集合,这个名字称为数组名,编号称为下标。组成数组的各个变量称为数组的分量,也称为数组的元素,有时也称为下标变量。数组是在程序设计中,为了处理方便, 把具有相同类型的若干变量按有序的形式组织起来的一种形式。这些按序排列的同类数据元素的集合称为数组。 数组应用&二维数组目录 1. 数组的简单应用2. 数组排序3. 数组查找4. 数组的使用思想5. 查表法6. 二维数组7. 数组综合
0
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(context.getConfiguration());
for (FileStatus status : fs.listStatus(new Path(value.toString()))) {
// process file
}
}
都不能满足需求,还是自己实现一个 OneMapOneDirectoryInputFormat 吧,也很简单:
import java.io.IOException;
import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
/**
* 一个map处理一个目录的数据
*/
public abstract class OneMapOneDirectoryInputFormat extends CombineFileInputFormat {
private static final Log LOG = LogFactory.getLog(OneMapOneDirectoryInputFormat.class);
@Override
protected boolean isSplitable(JobContext context, Path file) {
return false;
}
@Override
public List getSplits(JobContext job) throws IOException {
// get all the files in input path
List stats = listStatus(job);
List splits = new ArrayList();
if (stats.size() == 0) {
return splits;
}
LOG.info("fileNums=" + stats.size());
Map> map = new HashMap>();
for (FileStatus stat : stats) {
String directory = stat.getPath().getParent().toString();
if (map.containsKey(directory)) {
map.get(directory).add(stat);
} else {
List fileList = new ArrayList();
fileList.add(stat);
map.put(directory, fileList);
}
}
// 设置inputSplit
long currentLen = 0;
List pathLst = new ArrayList();
List offsetLst = new ArrayList();
List lengthLst = new ArrayList();
Iterator itr = map.keySet().iterator();
while (itr.hasNext()) {
String dir = itr.next();
List fileList = map.get(dir);
for (int i = 0; i < fileList.size(); i++) {
FileStatus stat = fileList.get(i);
pathLst.add(stat.getPath());
offsetLst.add(0L);
lengthLst.add(stat.getLen());
currentLen += stat.getLen();
}
Path[] pathArray = new Path[pathLst.size()];
CombineFileSplit thissplit = new CombineFileSplit(pathLst.toArray(pathArray),
getLongArray(offsetLst), getLongArray(lengthLst), new String[0]);
LOG.info("combineFileSplit(" + splits.size() + ") fileNum(" + pathLst.size()
+ ") length(" + currentLen + ")");
for (int i = 0; i < pathArray.length; i++) {
LOG.info(" -> path[" + i + "]=" + pathArray[i].toString());
}
splits.add(thissplit);
pathLst.clear();
offsetLst.clear();
lengthLst.clear();
currentLen = 0;
}
return splits;
}
private long[] getLongArray(List lst) {
long[] rst = new long[lst.size()];
for (int i = 0; i < lst.size(); i++) {
rst[i] = lst.get(i);
}
return rst;
}
}
这个InputFormat的具体使用方法就不说了。其实与“一个Hadoop程序的优化过程 – 根据文件实际大小实现CombineFileInputFormat”中的MultiFileInputFormat比较类似。
原文地址:Hadoop : 一个目录下的数据只由一个map处理, 感谢原作者分享。
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号