自定义InputFormat InputFormat主要包括: ? ? ? ? ? InputSplit和RecordReader ? ? InputSplit用于定义Map的数目和确定最合适的执行节点(位置) ? ?RecordReader负责从输入文件里读取数据记录,并把数据提交给Mapper处理。 ? 一个自定义分片实现要继承与抽
? ? ? ? ? inputsplit和recordreader
? ?InputSplit用于定义Map的数目和确定最合适的执行节点(位置)
? ?RecordReader负责从输入文件里读取数据记录,并把数据提交给Mapper处理。
? 一个自定义分片实现要继承与抽象类InputSplit,通过定义输入的长度和位置。分片的位置暗示调度器如何是放置一个分片的执行器(即,选择一个合适的TaskTracker)
? JobTracker处理分片的算法大致是:
? 基于存储机制和执行策略,分片的大小和位置是有不同的意思。例如在HDFS上,一个分片和一个物理数据块是一致的,分片的位置是这个数据块的物理存放位置的一个集合。
? 下面是FileInputFormat工作的机制:
下面是FileInputFormat创建分片的代码:
/**
* Generate the list of files and make them into FileSplits.
* @param job the job context
* @throws IOException
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
Stopwatch sw = new Stopwatch().start();
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
// generate splits
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
for (FileStatus file: files) {
Path path = file.getPath();
long length = file.getLen();
if (length != 0) {
BlockLocation[] blkLocations;
if (file instanceof LocatedFileStatus) {
blkLocations = ((LocatedFileStatus) file).getBlockLocations();
} else {
FileSystem fs = path.getFileSystem(job.getConfiguration());
blkLocations = fs.getFileBlockLocations(file, 0, length);
}
if (isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
if (bytesRemaining != 0) {
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
}
} else { // not splitable
splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts()));
}
} else {
//Create empty hosts array for zero length files
splits.add(makeSplit(path, 0, length, new String[0]));
}
}
// Save the number of input files for metrics/loadgen
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
sw.stop();
if (LOG.isDebugEnabled()) {
LOG.debug("Total # of splits generated by getSplits: " + splits.size()
+ ", TimeTaken: " + sw.elapsedMillis());
}
return splits;
}
方法创建分片过程主要做了下面几件事:1、首先从Job对象中获取输入文件的状态信息FileStatus。2.然后针对每个文件获取块信息。3.根据文件是否可分割按照分片大小进行切割,如果不能则不分割。
有一种比较常见的MapReduce程序:计算密集型程序。
计算密集型MR程序即指:对于与每一个输入的键值对需要复杂的计算算法去处理,主要特征是每一个map处理函数需要比获取该处理数据更长的时间,至少一个量级。比如人脸识别程序。
Delphi 初级教程步步精通 pdf,简要概括一下内容:Delphi概述、Object Pascal语言基储三种结构的程序设计、数组、过程与函数、自定义类型、Delphi常用组件、多媒体应用编程、DLL的应用、数据库应用基储SQL数据库程序设计等。
0
如果使用默认的FileInputFormat去处理该类型应用的话,很大情况下会出现部门机器cpu负载过高,而其他的则比较闲。(可以通过ganglia监控分析)
默认情况下由于FileInputFormat的实现会根据数据的本地性来创建分片数据。然而对于计算密集型的程序数据的本地性可能不适合了。那我们应该如何做出改变呢?我们获取所有可用服务器各自计算能力的情况,根据服务器来分配创建分片。
因此我们需要重载分片函数。
下面我们重载SequenceFileInputFormat来演示如何实现上述要求:
ComputeIntensiveSequenceFileInputFormat继承SequenceFileInputFormat函数,重载gitSplits函数:
//重写分片函数
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
String[] servers = getActiveServersList(job);
if (servers == null)
return null;
List<InputSplit> splits = new ArrayList<InputSplit>();
List<FileStatus> files = listStatus(job);
int currentServer = 0;
for (FileStatus file : files) {
Path path = file.getPath();
long length = file.getLen();
if ((length != 0) && isSplitable(job, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
long bytesRemaining = length;
while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
splits.add(new FileSplit(path, length - bytesRemaining,
splitSize, new String[] { servers[currentServer] }));
currentServer = getNextServer(currentServer, servers.length);
bytesRemaining -= splitSize;
}
} else if (length != 0) {
splits.add(new FileSplit(path, 0, length,
new String[] { servers[currentServer] }));
currentServer = getNextServer(currentServer, servers.length);
}
}
// Save the number of input files in the job-conf
job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
return splits;
}
//获取服务器列表
private String[] getActiveServersList(JobContext context) {
String[] servers = null;
try {
JobClient jc = new JobClient((JobConf) context.getConfiguration());
ClusterStatus status = jc.getClusterStatus(true);
Collection<String> atc = status.getActiveTrackerNames();
servers = new String[atc.size()];
int s = 0;
for (String serverInfo : atc) {
StringTokenizer st = new StringTokenizer(serverInfo, ":");
String trackerName = st.nextToken();
StringTokenizer st1 = new StringTokenizer(trackerName, "_");
st1.nextToken();
servers[s++] = st1.nextToken();
}
} catch (IOException e) {
e.printStackTrace();
}
return servers;
}
//选择一个服务器
private static int getNextServer(int current, int max) {
current++;
if (current >= max)
current = 0;
return current;
}
这个类继承于SequenceFileInputFormat,重写了getSplits()函数。计算分片的和FileInputFormat一样。只是原来数据的物理本地性由可用的服务器资源代替。两个主要函数:
getActiveServersList() 查询集群状态,计算一个可用的服务器名字列表
getNextServer() 获取一个服务器
因此我们想到可以把两个策略综合起来。首先放置尽可能多的任务为本地,分发剩下的到其他节点。
下面是实现这个方案的程序ComputeIntensiveLocalizedSequenceFileInputFormat:
@Override
public List<InputSplit> getSplits(JobContext job) throws IOException {
List<InputSplit> originalSplits = super.getSplits(job);
String[] servers = getActiveServersList(job);
if (servers == null)
return null;
List<InputSplit> splits = new ArrayList<InputSplit>(
originalSplits.size());
int numSplits = originalSplits.size();
int currentServer = 0;
for (int i = 0; i < numSplits; i++, currentServer = getNextServer(
currentServer, servers.length)) {
String server = servers[currentServer]; // Current server
boolean replaced = false;
for (InputSplit split : originalSplits) {
FileSplit fs = (FileSplit) split;
for (String l : fs.getLocations()) {
if (l.equals(server)) {
splits.add(new FileSplit(fs.getPath(), fs.getStart(),
fs.getLength(), new String[] { server }));
originalSplits.remove(split);
replaced = true;
break;
}
}
if (replaced)
break;
}
if (!replaced) {
FileSplit fs = (FileSplit) splits.get(0);
splits.add(new FileSplit(fs.getPath(), fs.getStart(), fs
.getLength(), new String[] { server }));
originalSplits.remove(0);
}
}
return splits;
}
这里第一步利用父类(FileInputFormat))获取分片来确保数据本地性。对于每一个服务器,首先试着去指定本地的split给它。其他没有本地分片的则随机分配剩下的分片。
总结:
MapReduce的输入格式的重写主要要主要两个组件:InputFormat和Recordreader。本文主要讲述InputFormat的原理及怎么来重写InputFormat,根据业务的特点选择创建分片的策略。
原文地址:MR总结(三)-MapReduce组件自定义, 感谢原作者分享。
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号