最新文章专题视频专题问答1问答10问答100问答1000问答2000关键字专题1关键字专题50关键字专题500关键字专题1500TAG最新视频文章推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37视频文章20视频文章30视频文章40视频文章50视频文章60 视频文章70视频文章80视频文章90视频文章100视频文章120视频文章140 视频2关键字专题关键字专题tag2tag3文章专题文章专题2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章专题3
当前位置: 首页 - 科技 - 知识百科 - 正文

Hadoop:一个目录下的数据只由一个map处理

来源:动视网 责编:小采 时间:2020-11-09 13:05:20
文档

Hadoop:一个目录下的数据只由一个map处理

Hadoop:一个目录下的数据只由一个map处理:有这么个需求:一个目录下的数据只能由一个map来处理。如果多个map处理了同一个目录下的数据会导致数据错乱。 刚开始google了下,以为网上都有现成的InputFormat,找到的答案类似我之前写的 mapreduce job让一个文件只由一个map来处理。 或者是把目录写在
推荐度:
导读Hadoop:一个目录下的数据只由一个map处理:有这么个需求:一个目录下的数据只能由一个map来处理。如果多个map处理了同一个目录下的数据会导致数据错乱。 刚开始google了下,以为网上都有现成的InputFormat,找到的答案类似我之前写的 mapreduce job让一个文件只由一个map来处理。 或者是把目录写在


代码里面按行读取:

 @Override
 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 FileSystem fs = FileSystem.get(context.getConfiguration());
 for (FileStatus status : fs.listStatus(new Path(value.toString()))) {
 // process file
 }
 }

都不能满足需求,还是自己实现一个 OneMapOneDirectoryInputFormat 吧,也很简单:

import java.io.IOException;
import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
/**
 * 一个map处理一个目录的数据
 */
public abstract class OneMapOneDirectoryInputFormat extends CombineFileInputFormat {
 private static final Log LOG = LogFactory.getLog(OneMapOneDirectoryInputFormat.class);
 @Override
 protected boolean isSplitable(JobContext context, Path file) {
 return false;
 }
 @Override
 public List getSplits(JobContext job) throws IOException {
 // get all the files in input path
 List stats = listStatus(job);
 List splits = new ArrayList();
 if (stats.size() == 0) {
 return splits;
 }
 LOG.info("fileNums=" + stats.size());
 Map> map = new HashMap>();
 for (FileStatus stat : stats) {
 String directory = stat.getPath().getParent().toString();
 if (map.containsKey(directory)) {
 map.get(directory).add(stat);
 } else {
 List fileList = new ArrayList();
 fileList.add(stat);
 map.put(directory, fileList);
 }
 }
 // 设置inputSplit
 long currentLen = 0;
 List pathLst = new ArrayList();
 List offsetLst = new ArrayList();
 List lengthLst = new ArrayList();
 Iterator itr = map.keySet().iterator();
 while (itr.hasNext()) {
 String dir = itr.next();
 List fileList = map.get(dir);
 for (int i = 0; i < fileList.size(); i++) {
 FileStatus stat = fileList.get(i);
 pathLst.add(stat.getPath());
 offsetLst.add(0L);
 lengthLst.add(stat.getLen());
 currentLen += stat.getLen();
 }
 Path[] pathArray = new Path[pathLst.size()];
 CombineFileSplit thissplit = new CombineFileSplit(pathLst.toArray(pathArray),
 getLongArray(offsetLst), getLongArray(lengthLst), new String[0]);
 LOG.info("combineFileSplit(" + splits.size() + ") fileNum(" + pathLst.size()
 + ") length(" + currentLen + ")");
 for (int i = 0; i < pathArray.length; i++) {
 LOG.info(" -> path[" + i + "]=" + pathArray[i].toString());
 }
 splits.add(thissplit);
 pathLst.clear();
 offsetLst.clear();
 lengthLst.clear();
 currentLen = 0;
 }
 return splits;
 }
 private long[] getLongArray(List lst) {
 long[] rst = new long[lst.size()];
 for (int i = 0; i < lst.size(); i++) {
 rst[i] = lst.get(i);
 }
 return rst;
 }
}

这个InputFormat的具体使用方法就不说了。其实与“一个Hadoop程序的优化过程 – 根据文件实际大小实现CombineFileInputFormat”中的MultiFileInputFormat比较类似。

文档

Hadoop:一个目录下的数据只由一个map处理

Hadoop:一个目录下的数据只由一个map处理:有这么个需求:一个目录下的数据只能由一个map来处理。如果多个map处理了同一个目录下的数据会导致数据错乱。 刚开始google了下,以为网上都有现成的InputFormat,找到的答案类似我之前写的 mapreduce job让一个文件只由一个map来处理。 或者是把目录写在
推荐度:
标签: 一个 的数据 处理
  • 热门焦点

最新推荐

猜你喜欢

热门推荐

专题
Top