
MapReduce: N keys, N files (Part 4): The Ultimate Solution

YG · 2018-12-21

The article MapReduce: N keys, N files (Part 2) mentioned disabling MR's speculative execution to avoid producing duplicate and incomplete ORC files on every run. In fact, the same situation still occurs when a reduce task fails and is retried.
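For reference, speculative execution can be turned off in the job driver. A minimal sketch, assuming the standard Hadoop 2.x property keys (the class and job names here are placeholders, not from the original post):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class DisableSpeculationDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Prevent duplicate concurrent attempts of the same map/reduce task
        conf.setBoolean("mapreduce.map.speculative", false);
        conf.setBoolean("mapreduce.reduce.speculative", false);
        Job job = Job.getInstance(conf, "n-keys-n-files"); // placeholder job name
        // ... mapper/reducer/output setup elided ...
    }
}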

【Solution】

FileOutputCommitter has abortJob and abortTask methods, which are called back when the job or a task fails. Our goal is therefore to override these two methods: a subclass that extends FileOutputCommitter and overrides abortJob and abortTask is all we need.

FileOutputCommitter is a field of FileOutputFormat, but FileOutputFormat provides no setter for it; the committer is initialized in getOutputCommitter:

  public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
    if (committer == null) {
      Path output = getOutputPath(context);
      committer = new FileOutputCommitter(output, context);
    }
    return committer;
  }

Note that the framework always obtains the committer through this method, so overriding it is all we need. The complete solution is as follows:

package is.split;

import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.orc.OrcFile;
import org.apache.orc.Writer;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcMapreduceRecordWriter;
import org.apache.orc.mapreduce.OrcOutputFormat;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class OrcReNameFileOutputFormat extends OrcOutputFormat {
    // One ORC writer and one output path per key, kept in static maps so the
    // committer can find and delete the files if the job or a task aborts
    static Map<String, OrcMapreduceRecordWriter> recordWriterMap = new HashMap<String, OrcMapreduceRecordWriter>();
    static Map<String, Path> pathMap = new HashMap<String, Path>();

    private OutputCommitter newCommitter;
    @Override
    public RecordWriter<Text, OrcStruct> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {
        return new OrcReNameMapreduceRecordWriter(taskAttemptContext);
    }

    @Override
    public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
        if(newCommitter == null){
            Path output = getOutputPath(context);
            this.newCommitter = new OrcReNameFileOutputCommitter(output, context);
        }
        return this.newCommitter;
    }

    private class OrcReNameMapreduceRecordWriter extends RecordWriter<Text, OrcStruct> {

        private TaskAttemptContext taskAttemptContext;
        private final String GroupName = "is";
        private final String CounterName = "is_output_record";
        private String curTime = "";
        public OrcReNameMapreduceRecordWriter(TaskAttemptContext taskAttemptContext){
            this.taskAttemptContext = taskAttemptContext;
            this.curTime = ISTool.getCurTime();
        }

        public void write(Text key, OrcStruct value) throws IOException, InterruptedException {
            OrcMapreduceRecordWriter realWrite = recordWriterMap.get(key.toString());
            if (realWrite == null) {
                // Lazily open one ORC file per key under <outdir>/<key>/;
                // timestamp + local IP + random suffix keeps names unique across attempts
                String outputDirPath = taskAttemptContext.getConfiguration().get(FileOutputFormat.OUTDIR) + "/" + key.toString();
                Path filename = new Path(new Path(outputDirPath), ISTool.getCurTime() + "_" + ISTool.getLocalIp() + "_" + RandomStringUtils.randomAlphanumeric(8));
                Writer writer = OrcFile.createWriter(filename, org.apache.orc.mapred.OrcOutputFormat.buildOptions(taskAttemptContext.getConfiguration()));
                realWrite = new OrcMapreduceRecordWriter<OrcStruct>(writer);
                recordWriterMap.put(key.toString(), realWrite);
                pathMap.put(key.toString(), filename);
            }
            realWrite.write(NullWritable.get(), value);
            this.taskAttemptContext.getCounter(GroupName, CounterName).increment(1);
        }

        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            for (Map.Entry<String, OrcMapreduceRecordWriter> entry : recordWriterMap.entrySet()) {
                if (entry.getValue() != null){
                    entry.getValue().close(context);
                }
            }
            recordWriterMap.clear();
        }
    }


    private class OrcReNameFileOutputCommitter extends FileOutputCommitter {

        public OrcReNameFileOutputCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
            super(outputPath, context);
        }

        // Delete every per-key file recorded in pathMap
        public void deleteFiles(JobContext context){
            for (Map.Entry<String, Path> entry : pathMap.entrySet()) {
                if (entry.getValue() != null){
                    FileSystem fs = null;
                    try {
                        fs = entry.getValue()
                                .getFileSystem(context.getConfiguration());
                        fs.delete(entry.getValue(), true);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }

                }
            }
            pathMap.clear();
        }

        @Override
        public void abortJob(JobContext context, JobStatus.State state)
                throws IOException {
            // delete the _temporary folder
            super.abortJob(context,state);
            if (state != JobStatus.State.SUCCEEDED){
                deleteFiles(context);
            }
        }

        @Override
        public void abortTask(TaskAttemptContext context) throws IOException{
            // delete the _temporary folder
            super.abortTask(context);
            deleteFiles(context);

        }
    }
}
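To put the pieces together, here is a sketch of a driver that wires in this output format. It is only an assumption about how the author's job is launched: the driver class, job name, and ORC schema are hypothetical, and the class is assumed to live in the same package as OrcReNameFileOutputFormat. The key orc.mapred.output.schema is the property the ORC MapReduce bindings read the output schema from.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SplitJobDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Speculative execution stays off, as in Part 2 of this series
        conf.setBoolean("mapreduce.reduce.speculative", false);
        // Hypothetical ORC schema for the records being written
        conf.set("orc.mapred.output.schema", "struct<name:string,cnt:int>");

        Job job = Job.getInstance(conf, "orc-n-keys-n-files");
        job.setJarByClass(SplitJobDriver.class);
        // The custom format supplies both the per-key record writer and
        // the committer that cleans up partial files on abort
        job.setOutputFormatClass(OrcReNameFileOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        // ... mapper/reducer and key/value class setup elided ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}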

【Questions】

1. When a reduce task fails, if the failure happens inside the reduce processing itself, abortTask is called back and the files get cleaned up. But what if the node running the reduce goes down?

Answer: if the node goes down, that is, the container executing the task dies, the task attempt will still end up calling back abortTask.

TaskAttemptImpl's StateMachineFactory registers transitions for the container-failed and container-killed events with a TaskCleanupTransition. That transition emits a TaskCleanupEvent, which CommitterEventHandler processes by calling back abortTask:

    // Transitions from FAIL_CONTAINER_CLEANUP state.
     .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP,
         TaskAttemptState.FAIL_TASK_CLEANUP,
         TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
      // Ignore-able events
     .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP,
         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
         EnumSet.of(TaskAttemptEventType.TA_KILL,
             TaskAttemptEventType.TA_CONTAINER_COMPLETED,
             TaskAttemptEventType.TA_UPDATE,
             TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
             TaskAttemptEventType.TA_COMMIT_PENDING,
             TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
             TaskAttemptEventType.TA_DONE,
             TaskAttemptEventType.TA_FAILMSG,
             TaskAttemptEventType.TA_TIMED_OUT))
      // Transitions from KILL_CONTAINER_CLEANUP
     .addTransition(TaskAttemptState.KILL_CONTAINER_CLEANUP,
         TaskAttemptState.KILL_TASK_CLEANUP,
         TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())


  private static class TaskCleanupTransition implements
      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
    @Override
    public void transition(TaskAttemptImpl taskAttempt, 
        TaskAttemptEvent event) {
      TaskAttemptContext taskContext =
        new TaskAttemptContextImpl(taskAttempt.conf,
            TypeConverter.fromYarn(taskAttempt.attemptId));
      taskAttempt.eventHandler.handle(new TaskCleanupEvent(
          taskAttempt.attemptId,
          taskAttempt.committer,
          taskContext));
    }
  }
  
    // The handler above is the CommitterEventHandler: it puts the received
    // events into a queue and a background thread processes them in order.
    
  public void run() {
      LOG.info("Processing the event " + event.toString());
      switch (event.getType()) {
      case JOB_SETUP:
        handleJobSetup((CommitterJobSetupEvent) event);
        break;
      case JOB_COMMIT:
        handleJobCommit((CommitterJobCommitEvent) event);
        break;
      case JOB_ABORT:
        handleJobAbort((CommitterJobAbortEvent) event);
        break;
      case TASK_ABORT:
        handleTaskAbort((CommitterTaskAbortEvent) event);
        break;
      default:
        throw new YarnRuntimeException("Unexpected committer event "
            + event.toString());
      }
    }
    
    // The handleTaskAbort flow
    protected void handleTaskAbort(CommitterTaskAbortEvent event) {
      try {
        committer.abortTask(event.getAttemptContext());
      } catch (Exception e) {
        LOG.warn("Task cleanup failed for attempt " + event.getAttemptID(), e);
      }
      context.getEventHandler().handle(
          new TaskAttemptEvent(event.getAttemptID(),
              TaskAttemptEventType.TA_CLEANUP_DONE));
    }

And with that, the problem is finally solved.
