最新文章专题视频专题问答1问答10问答100问答1000问答2000关键字专题1关键字专题50关键字专题500关键字专题1500TAG最新视频文章推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37视频文章20视频文章30视频文章40视频文章50视频文章60 视频文章70视频文章80视频文章90视频文章100视频文章120视频文章140 视频2关键字专题关键字专题tag2tag3文章专题文章专题2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章专题3
当前位置: 首页 - 科技 - 知识百科 - 正文

MR总结(三)-MapReduce组件自定义

来源:懂视网 责编:小采 时间:2020-11-09 12:58:50
文档

MR总结(三)-MapReduce组件自定义

MR总结(三)-MapReduce组件自定义:自定义InputFormat InputFormat主要包括: InputSplit和RecordReader InputSplit用于定义Map的数目和确定最合适的执行节点(位置) RecordReader负责从输入文件里读取数据记录,并把数据提交给Mapper处理。 一个自定义分片实现
推荐度:
导读MR总结(三)-MapReduce组件自定义:自定义InputFormat InputFormat主要包括: InputSplit和RecordReader InputSplit用于定义Map的数目和确定最合适的执行节点(位置) RecordReader负责从输入文件里读取数据记录,并把数据提交给Mapper处理。 一个自定义分片实现

自定义InputFormat InputFormat主要包括: ? ? ? ? ? InputSplit和RecordReader ? ? InputSplit用于定义Map的数目和确定最合适的执行节点(位置) ? ?RecordReader负责从输入文件里读取数据记录,并把数据提交给Mapper处理。 ? 一个自定义分片实现要继承与抽

自定义InputFormat

InputFormat主要包括:

? ? ? ? ? InputSplit和RecordReader

? ?InputSplit用于定义Map的数目和确定最合适的执行节点(位置)

? ?RecordReader负责从输入文件里读取数据记录,并把数据提交给Mapper处理。

? 一个自定义分片实现要继承与抽象类InputSplit,通过定义输入的长度和位置。分片的位置暗示调度器如何是放置一个分片的执行器(即,选择一个合适的TaskTracker)

? JobTracker处理分片的算法大致是:

  1. 通过TaskTracker节点的心跳获取可用的map槽资源
  2. 从排队等候的分片中找出那些可用的节点是本地的
  3. 向TaskTracker提交分片

? 基于存储机制和执行策略,分片的大小和位置是有不同的意思。例如在HDFS上,一个分片和一个物理数据块是一致的,分片的位置是这个数据块的物理存放位置的一个集合。

? 下面是FileInputFormat工作的机制:

  1. 继承InputSplit,计算文件的信息。如文件中数据块的开始位置和块的长度
  2. 获取文件的数据块的一个集合
  3. 创建一个数据分片,这个分片的长度和块大小一样,分片位置为这个快的位置。此外还含有文件位置,块位移,长度等信息。

下面是FileInputFormat创建分片的代码:


/**
 * Generate the list of files and make them into FileSplits.
 * @param job the job context
 * @throws IOException
 */
 public List getSplits(JobContext job) throws IOException {
 Stopwatch sw = new Stopwatch().start();
 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
 long maxSize = getMaxSplitSize(job);

 // generate splits
 List splits = new ArrayList();
 List files = listStatus(job);
 for (FileStatus file: files) {
 Path path = file.getPath();
 long length = file.getLen();
 if (length != 0) {
 BlockLocation[] blkLocations;
 if (file instanceof LocatedFileStatus) {
 blkLocations = ((LocatedFileStatus) file).getBlockLocations();
 } else {
 FileSystem fs = path.getFileSystem(job.getConfiguration());
 blkLocations = fs.getFileBlockLocations(file, 0, length);
 }

 if (isSplitable(job, path)) {
 long blockSize = file.getBlockSize();
 long splitSize = computeSplitSize(blockSize, minSize, maxSize);

 long bytesRemaining = length;
 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
 splits.add(makeSplit(path, length-bytesRemaining, splitSize,
 blkLocations[blkIndex].getHosts(),
 blkLocations[blkIndex].getCachedHosts()));
 bytesRemaining -= splitSize;
 }

 if (bytesRemaining != 0) {
 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
 splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
 blkLocations[blkIndex].getHosts(),
 blkLocations[blkIndex].getCachedHosts()));
 }
 } else { // not splitable
 splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
 blkLocations[0].getCachedHosts()));
 }
 } else {
 //Create empty hosts array for zero length files
 splits.add(makeSplit(path, 0, length, new String[0]));
 }
 }
 // Save the number of input files for metrics/loadgen
 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
 sw.stop();
 if (LOG.isDebugEnabled()) {
 LOG.debug("Total # of splits generated by getSplits: " + splits.size()
 + ", TimeTaken: " + sw.elapsedMillis());
 }
 return splits;
 }

方法创建分片过程主要做了下面几件事:1、首先从Job对象中获取输入文件的状态信息FileStatus。2.然后针对每个文件获取块信息。3.根据文件是否可分割按照分片大小进行切割,如果不能则不分割。

?案例:实现计算密集型InputFormat

有一种比较常见的MapReduce程序:计算密集型程序。

计算密集型MR程序即指:对于与每一个输入的键值对需要复杂的计算算法去处理,主要特征是每一个map处理函数需要比获取该处理数据更长的时间,至少一个量级。比如人脸识别程序。

如果使用默认的FileInputFormat去处理该类型应用的话,很大情况下会出现部门机器cpu负载过高,而其他的则比较闲。(可以通过ganglia监控分析)

默认情况下由于FileInputFormat的实现会根据数据的本地性来创建分片数据。然而对于计算密集型的程序数据的本地性可能不适合了。那我们应该如何做出改变呢?我们获取所有可用服务器各自计算能力的情况,根据服务器来分配创建分片。

因此我们需要重载分片函数。

下面我们重载SequenceFileInputFormat来演示如何实现上述要求:

ComputeIntensiveSequenceFileInputFormat继承SequenceFileInputFormat函数,重载gitSplits函数:

//重写分片函数
 @Override
 public List getSplits(JobContext job) throws IOException {
 String[] servers = getActiveServersList(job);
 if (servers == null)
 return null;
 List splits = new ArrayList();
 List files = listStatus(job);
 int currentServer = 0;
 for (FileStatus file : files) {
 Path path = file.getPath();
 long length = file.getLen();
 if ((length != 0) && isSplitable(job, path)) {
 long blockSize = file.getBlockSize();
 long splitSize = computeSplitSize(blockSize, minSize, maxSize);
 long bytesRemaining = length;
 while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
 splits.add(new FileSplit(path, length - bytesRemaining,
 splitSize, new String[] { servers[currentServer] }));
 currentServer = getNextServer(currentServer, servers.length);
 bytesRemaining -= splitSize;
 }
 } else if (length != 0) {
 splits.add(new FileSplit(path, 0, length,
 new String[] { servers[currentServer] }));
 currentServer = getNextServer(currentServer, servers.length);
 }
 }
 // Save the number of input files in the job-conf
 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
 return splits;
 }
 //获取服务器列表
 private String[] getActiveServersList(JobContext context) {
 String[] servers = null;
 try {
 JobClient jc = new JobClient((JobConf) context.getConfiguration());
 ClusterStatus status = jc.getClusterStatus(true);
 Collection atc = status.getActiveTrackerNames();
 servers = new String[atc.size()];
 int s = 0;
 for (String serverInfo : atc) {
 StringTokenizer st = new StringTokenizer(serverInfo, ":");
 String trackerName = st.nextToken();
 StringTokenizer st1 = new StringTokenizer(trackerName, "_");
 st1.nextToken();
 servers[s++] = st1.nextToken();
 }
 } catch (IOException e) {
 e.printStackTrace();
 }
 return servers;
 }
 //选择一个服务器
 private static int getNextServer(int current, int max) {
 current++;
 if (current >= max)
 current = 0;
 return current;
 }

这个类继承于SequenceFileInputFormat,重写了getSplits()函数。计算分片的和FileInputFormat一样。只是原来数据的物理本地性由可用的服务器资源代替。两个主要函数:
getActiveServersList() 查询集群状态,计算一个可用的服务器名字列表
getNextServer() 获取一个服务器

优化:
上面方案是否就已经很完美,没有问题了呢?答案是否定的。
在上面我们替换了数据物理本地性这个特性,那么会导致更多的数据传输的问题,会给网络io带来压力,影响io性能。

因此我们想到可以把两个策略综合起来。首先放置尽可能多的任务为本地,分发剩下的到其他节点。
下面是实现这个方案的程序ComputeIntensiveLocalizedSequenceFileInputFormat:

@Override
 public List getSplits(JobContext job) throws IOException {
 List originalSplits = super.getSplits(job);
 String[] servers = getActiveServersList(job);
 if (servers == null)
 return null;
 List splits = new ArrayList(
 originalSplits.size());
 int numSplits = originalSplits.size();
 int currentServer = 0;
 for (int i = 0; i < numSplits; i++, currentServer = getNextServer(
 currentServer, servers.length)) {
 String server = servers[currentServer]; // Current server
 boolean replaced = false;
 for (InputSplit split : originalSplits) {
 FileSplit fs = (FileSplit) split;
 for (String l : fs.getLocations()) {
 if (l.equals(server)) {
 splits.add(new FileSplit(fs.getPath(), fs.getStart(),
 fs.getLength(), new String[] { server }));
 originalSplits.remove(split);
 replaced = true;
 break;
 }
 }
 if (replaced)
 break;
 }
 if (!replaced) {
 FileSplit fs = (FileSplit) splits.get(0);
 splits.add(new FileSplit(fs.getPath(), fs.getStart(), fs
 .getLength(), new String[] { server }));
 originalSplits.remove(0);
 }
 }
 return splits;
 }

这里第一步利用父类(FileInputFormat))获取分片来确保数据本地性。对于每一个服务器,首先试着去指定本地的split给它。其他没有本地分片的则随机分配剩下的分片。

总结:

MapReduce的输入格式的重写主要要主要两个组件:InputFormat和Recordreader。本文主要讲述InputFormat的原理及怎么来重写InputFormat,根据业务的特点选择创建分片的策略。

文档

MR总结(三)-MapReduce组件自定义

MR总结(三)-MapReduce组件自定义:自定义InputFormat InputFormat主要包括: InputSplit和RecordReader InputSplit用于定义Map的数目和确定最合适的执行节点(位置) RecordReader负责从输入文件里读取数据记录,并把数据提交给Mapper处理。 一个自定义分片实现
推荐度:
标签: 自定义 自定 map
  • 热门焦点

最新推荐

猜你喜欢

热门推荐

专题
Top