/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.net.URI;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.MapTask;
import org.apache.hadoop.mapred.ReduceTask;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapred.YarnChild;
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;

public class LocalContainerLauncher
extends AbstractService
implements ContainerLauncher {
    private static final File curDir = new File(".");
    private static final Log LOG = LogFactory.getLog(LocalContainerLauncher.class);
    private FileContext curFC = null;
    private Set<File> localizedFiles = new HashSet<File>();
    private final AppContext context;
    private final TaskUmbilicalProtocol umbilical;
    private final ClassLoader jobClassLoader;
    private ExecutorService taskRunner;
    private Thread eventHandler;
    private byte[] encryptedSpillKey = new byte[]{0};
    private BlockingQueue<ContainerLauncherEvent> eventQueue = new LinkedBlockingQueue<ContainerLauncherEvent>();

    public LocalContainerLauncher(AppContext context, TaskUmbilicalProtocol umbilical) {
        this(context, umbilical, null);
    }

    public LocalContainerLauncher(AppContext context, TaskUmbilicalProtocol umbilical, ClassLoader jobClassLoader) {
        super(LocalContainerLauncher.class.getName());
        this.context = context;
        this.umbilical = umbilical;
        this.jobClassLoader = jobClassLoader;
        try {
            this.curFC = FileContext.getFileContext((URI)curDir.toURI());
        }
        catch (UnsupportedFileSystemException ufse) {
            LOG.error("Local filesystem " + curDir.toURI().toString() + " is unsupported?? (should never happen)");
        }
        File[] curLocalFiles = curDir.listFiles();
        if (curLocalFiles != null) {
            HashSet<File> lf = new HashSet<File>(curLocalFiles.length);
            for (int j = 0; j < curLocalFiles.length; ++j) {
                lf.add(curLocalFiles[j]);
            }
            this.localizedFiles = Collections.unmodifiableSet(lf);
        }
    }

    public void serviceStart() throws Exception {
        this.taskRunner = HadoopExecutors.newSingleThreadExecutor((ThreadFactory)new ThreadFactoryBuilder().setDaemon(true).setNameFormat("uber-SubtaskRunner").build());
        this.eventHandler = new Thread((Runnable)new EventHandler(), "uber-EventHandler");
        if (this.jobClassLoader != null) {
            LOG.info("Setting " + this.jobClassLoader + " as the context classloader of thread " + this.eventHandler.getName());
            this.eventHandler.setContextClassLoader(this.jobClassLoader);
        } else {
            LOG.info("Context classloader of thread " + this.eventHandler.getName() + ": " + this.eventHandler.getContextClassLoader());
        }
        this.eventHandler.start();
        super.serviceStart();
    }

    public void serviceStop() throws Exception {
        if (this.eventHandler != null) {
            this.eventHandler.interrupt();
        }
        if (this.taskRunner != null) {
            this.taskRunner.shutdownNow();
        }
        super.serviceStop();
    }

    @Override
    public void handle(ContainerLauncherEvent event) {
        try {
            this.eventQueue.put(event);
        }
        catch (InterruptedException e) {
            throw new YarnRuntimeException(e);
        }
    }

    public void setEncryptedSpillKey(byte[] encryptedSpillKey) {
        if (encryptedSpillKey != null) {
            this.encryptedSpillKey = encryptedSpillKey;
        }
    }

    @VisibleForTesting
    protected static MapOutputFile renameMapOutputForReduce(JobConf conf, TaskAttemptId mapId, MapOutputFile subMapOutputFile) throws IOException {
        LocalFileSystem localFs = FileSystem.getLocal((Configuration)conf);
        Path mapOut = subMapOutputFile.getOutputFile();
        FileStatus mStatus = localFs.getFileStatus(mapOut);
        Path reduceIn = subMapOutputFile.getInputFileForWrite(TypeConverter.fromYarn(mapId).getTaskID(), mStatus.getLen());
        Path mapOutIndex = subMapOutputFile.getOutputIndexFile();
        Path reduceInIndex = new Path(reduceIn.toString() + ".index");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Renaming map output file for task attempt " + mapId.toString() + " from original location " + mapOut.toString() + " to destination " + reduceIn.toString());
        }
        if (!localFs.mkdirs(reduceIn.getParent())) {
            throw new IOException("Mkdirs failed to create " + reduceIn.getParent().toString());
        }
        if (!localFs.rename(mapOut, reduceIn)) {
            throw new IOException("Couldn't rename " + mapOut);
        }
        if (!localFs.rename(mapOutIndex, reduceInIndex)) {
            throw new IOException("Couldn't rename " + mapOutIndex);
        }
        return new RenamedMapOutputFile(reduceIn);
    }

    private static class RenamedMapOutputFile
    extends MapOutputFile {
        private Path path;

        public RenamedMapOutputFile(Path path) {
            this.path = path;
        }

        @Override
        public Path getOutputFile() throws IOException {
            return this.path;
        }

        @Override
        public Path getOutputFileForWrite(long size) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public Path getOutputFileForWriteInVolume(Path existing) {
            throw new UnsupportedOperationException();
        }

        @Override
        public Path getOutputIndexFile() throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public Path getOutputIndexFileForWrite(long size) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public Path getOutputIndexFileForWriteInVolume(Path existing) {
            throw new UnsupportedOperationException();
        }

        @Override
        public Path getSpillFile(int spillNumber) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public Path getSpillFileForWrite(int spillNumber, long size) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public Path getSpillIndexFile(int spillNumber) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public Path getSpillIndexFileForWrite(int spillNumber, long size) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public Path getInputFile(int mapId) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public Path getInputFileForWrite(TaskID mapId, long size) throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public void removeAll() throws IOException {
            throw new UnsupportedOperationException();
        }
    }

    private class EventHandler
    implements Runnable {
        private boolean doneWithMaps = false;
        private int finishedSubMaps = 0;
        private final Map<TaskAttemptId, Future<?>> futures = new ConcurrentHashMap();

        EventHandler() {
        }

        @Override
        public void run() {
            ContainerLauncherEvent event = null;
            final HashMap localMapFiles = new HashMap();
            while (!Thread.currentThread().isInterrupted()) {
                Future<?> future;
                try {
                    event = (ContainerLauncherEvent)LocalContainerLauncher.this.eventQueue.take();
                }
                catch (InterruptedException e) {
                    LOG.warn("Returning, interrupted : " + e);
                    break;
                }
                LOG.info("Processing the event " + event.toString());
                if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH) {
                    final ContainerRemoteLaunchEvent launchEv = (ContainerRemoteLaunchEvent)event;
                    future = LocalContainerLauncher.this.taskRunner.submit(new Runnable(){

                        @Override
                        public void run() {
                            EventHandler.this.runTask(launchEv, localMapFiles);
                        }
                    });
                    this.futures.put(event.getTaskAttemptID(), future);
                    continue;
                }
                if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP) {
                    TaskAttemptId taId;
                    if (event.getDumpContainerThreads()) {
                        try {
                            ThreadInfo[] tInfos;
                            System.out.println(new Date());
                            RuntimeMXBean rtBean = ManagementFactory.getRuntimeMXBean();
                            System.out.println("Full thread dump " + rtBean.getVmName() + " (" + rtBean.getVmVersion() + " " + rtBean.getSystemProperties().get("java.vm.info") + "):\n");
                            ThreadMXBean tmxBean = ManagementFactory.getThreadMXBean();
                            for (ThreadInfo ti : tInfos = tmxBean.dumpAllThreads(tmxBean.isObjectMonitorUsageSupported(), tmxBean.isSynchronizerUsageSupported())) {
                                System.out.println(ti.toString());
                            }
                        }
                        catch (Throwable t) {
                            System.out.println("Could not create full thread dump: " + t.getMessage());
                        }
                    }
                    if ((future = this.futures.remove(taId = event.getTaskAttemptID())) != null) {
                        LOG.info("canceling the task attempt " + taId);
                        future.cancel(true);
                    }
                    LocalContainerLauncher.this.context.getEventHandler().handle(new TaskAttemptEvent(taId, TaskAttemptEventType.TA_CONTAINER_CLEANED));
                    continue;
                }
                if (event.getType() == ContainerLauncher.EventType.CONTAINER_COMPLETED) {
                    LOG.debug("Container completed " + event.toString());
                    continue;
                }
                LOG.warn("Ignoring unexpected event " + event.toString());
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void runTask(ContainerRemoteLaunchEvent launchEv, Map<TaskAttemptID, MapOutputFile> localMapFiles) {
            TaskAttemptId attemptID = launchEv.getTaskAttemptID();
            Job job = LocalContainerLauncher.this.context.getAllJobs().get(attemptID.getTaskId().getJobId());
            int numMapTasks = job.getTotalMaps();
            int numReduceTasks = job.getTotalReduces();
            org.apache.hadoop.mapreduce.v2.app.job.Task ytask = job.getTask(attemptID.getTaskId());
            Task remoteTask = launchEv.getRemoteTask();
            LocalContainerLauncher.this.context.getEventHandler().handle(new TaskAttemptContainerLaunchedEvent(attemptID, -1));
            if (numMapTasks == 0) {
                this.doneWithMaps = true;
            }
            try {
                if (remoteTask.isMapOrReduce()) {
                    JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
                    jce.addCounterUpdate(JobCounter.TOTAL_LAUNCHED_UBERTASKS, 1L);
                    if (remoteTask.isMapTask()) {
                        jce.addCounterUpdate(JobCounter.NUM_UBER_SUBMAPS, 1L);
                    } else {
                        jce.addCounterUpdate(JobCounter.NUM_UBER_SUBREDUCES, 1L);
                    }
                    LocalContainerLauncher.this.context.getEventHandler().handle(jce);
                }
                this.runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks, numReduceTasks > 0, localMapFiles);
                LocalContainerLauncher.this.context.getEventHandler().handle(new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_CONTAINER_COMPLETED));
            }
            catch (RuntimeException re) {
                JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
                jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1L);
                LocalContainerLauncher.this.context.getEventHandler().handle(jce);
                LocalContainerLauncher.this.context.getEventHandler().handle(new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_CONTAINER_COMPLETED));
            }
            catch (IOException ioe) {
                LOG.fatal("oopsie...  this can never happen: " + StringUtils.stringifyException((Throwable)ioe));
                ExitUtil.terminate((int)-1);
            }
            finally {
                if (this.futures.remove(attemptID) != null) {
                    LOG.info("removed attempt " + attemptID + " from the futures to keep track of");
                }
            }
        }

        private void runSubtask(Task task, TaskType taskType, TaskAttemptId attemptID, int numMapTasks, boolean renameOutputs, Map<TaskAttemptID, MapOutputFile> localMapFiles) throws RuntimeException, IOException {
            TaskAttemptID classicAttemptID = TypeConverter.fromYarn(attemptID);
            try {
                JobConf conf = new JobConf(LocalContainerLauncher.this.getConfig());
                conf.set("mapreduce.task.id", task.getTaskID().toString());
                conf.set("mapreduce.task.attempt.id", classicAttemptID.toString());
                conf.setBoolean("mapreduce.task.ismap", taskType == TaskType.MAP);
                conf.setInt("mapreduce.task.partition", task.getPartition());
                conf.set("mapreduce.job.id", task.getJobID().toString());
                String[] localSysDirs = StringUtils.getTrimmedStrings((String)System.getenv(ApplicationConstants.Environment.LOCAL_DIRS.name()));
                conf.setStrings("mapreduce.cluster.local.dir", localSysDirs);
                LOG.info("mapreduce.cluster.local.dir for uber task: " + conf.get("mapreduce.cluster.local.dir"));
                conf.setBoolean("mapreduce.task.uberized", true);
                task.setEncryptedSpillKey(LocalContainerLauncher.this.encryptedSpillKey);
                YarnChild.setEncryptedSpillKeyIfRequired(task);
                if (taskType == TaskType.MAP) {
                    if (this.doneWithMaps) {
                        LOG.error("CONTAINER_REMOTE_LAUNCH contains a map task (" + attemptID + "), but should be finished with maps");
                        throw new RuntimeException();
                    }
                    MapTask map = (MapTask)task;
                    map.setConf(conf);
                    map.run(conf, LocalContainerLauncher.this.umbilical);
                    if (renameOutputs) {
                        MapOutputFile renamed = LocalContainerLauncher.renameMapOutputForReduce(conf, attemptID, map.getMapOutputFile());
                        localMapFiles.put(classicAttemptID, renamed);
                    }
                    this.relocalize();
                    if (++this.finishedSubMaps == numMapTasks) {
                        this.doneWithMaps = true;
                    }
                } else {
                    if (!this.doneWithMaps) {
                        LOG.error("CONTAINER_REMOTE_LAUNCH contains a reduce task (" + attemptID + "), but not yet finished with maps");
                        throw new RuntimeException();
                    }
                    conf.set("mapreduce.framework.name", "local");
                    conf.set("mapreduce.jobtracker.address", "local");
                    ReduceTask reduce = (ReduceTask)task;
                    reduce.setLocalMapFiles(localMapFiles);
                    reduce.setConf(conf);
                    reduce.run(conf, LocalContainerLauncher.this.umbilical);
                    this.relocalize();
                }
            }
            catch (FSError e) {
                LOG.fatal("FSError from child", e);
                if (!ShutdownHookManager.get().isShutdownInProgress()) {
                    LocalContainerLauncher.this.umbilical.fsError(classicAttemptID, e.getMessage());
                }
                throw new RuntimeException();
            }
            catch (Exception exception) {
                LOG.warn("Exception running local (uberized) 'child' : " + StringUtils.stringifyException((Throwable)exception));
                try {
                    if (task != null) {
                        task.taskCleanup(LocalContainerLauncher.this.umbilical);
                    }
                }
                catch (Exception e) {
                    LOG.info("Exception cleaning up: " + StringUtils.stringifyException((Throwable)e));
                }
                LocalContainerLauncher.this.umbilical.reportDiagnosticInfo(classicAttemptID, StringUtils.stringifyException((Throwable)exception));
                throw new RuntimeException();
            }
            catch (Throwable throwable) {
                LOG.fatal("Error running local (uberized) 'child' : " + StringUtils.stringifyException((Throwable)throwable));
                if (!ShutdownHookManager.get().isShutdownInProgress()) {
                    Throwable tCause = throwable.getCause();
                    String cause = tCause == null ? throwable.getMessage() : StringUtils.stringifyException((Throwable)tCause);
                    LocalContainerLauncher.this.umbilical.fatalError(classicAttemptID, cause);
                }
                throw new RuntimeException();
            }
        }

        private void relocalize() {
            File[] curLocalFiles = curDir.listFiles();
            if (curLocalFiles != null) {
                for (int j = 0; j < curLocalFiles.length; ++j) {
                    if (LocalContainerLauncher.this.localizedFiles.contains(curLocalFiles[j])) continue;
                    boolean deleted = false;
                    try {
                        if (LocalContainerLauncher.this.curFC != null) {
                            deleted = LocalContainerLauncher.this.curFC.delete(new Path(curLocalFiles[j].getName()), true);
                        }
                    }
                    catch (IOException e) {
                        deleted = false;
                    }
                    if (deleted) continue;
                    LOG.warn("Unable to delete unexpected local file/dir " + curLocalFiles[j].getName() + ": insufficient permissions?");
                }
            }
        }
    }
}

