/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tika.batch;

import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.tika.batch.FileConsumerFutureResult;
import org.apache.tika.batch.FileResource;
import org.apache.tika.batch.FileStarted;
import org.apache.tika.batch.IFileProcessorFutureResult;
import org.apache.tika.batch.PoisonFileResource;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.parser.Parser;
import org.apache.tika.sax.SafeContentHandler;
import org.apache.tika.sax.ToXMLContentHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.AttributesImpl;

public abstract class FileResourceConsumer
implements Callable<IFileProcessorFutureResult> {
    protected static final Logger LOG = LoggerFactory.getLogger(FileResourceConsumer.class);
    public static String TIMED_OUT = "timed_out";
    public static String OOM = "oom";
    public static String IO_IS = "io_on_inputstream";
    public static String IO_OS = "io_on_outputstream";
    public static String PARSE_ERR = "parse_err";
    public static String PARSE_EX = "parse_ex";
    public static String ELAPSED_MILLIS = "elapsedMS";
    private static AtomicInteger numConsumers = new AtomicInteger(-1);
    private long maxConsecWaitInMillis = 600000L;
    private final ArrayBlockingQueue<FileResource> fileQueue;
    private final int consumerId;
    private final Object lock = new Object();
    private FileStarted currentFile = null;
    private volatile int numResourcesConsumed = 0;
    private volatile int numHandledExceptions = 0;
    private volatile STATE currentState = STATE.NOT_YET_STARTED;

    public FileResourceConsumer(ArrayBlockingQueue<FileResource> fileQueue) {
        this.fileQueue = fileQueue;
        this.consumerId = numConsumers.incrementAndGet();
    }

    @Override
    public IFileProcessorFutureResult call() {
        this.currentState = STATE.ACTIVELY_CONSUMING;
        try {
            FileResource fileResource = this.getNextFileResource();
            while (fileResource != null) {
                LOG.trace("file consumer is about to process: {}", (Object)fileResource.getResourceId());
                boolean consumed = this._processFileResource(fileResource);
                LOG.trace("file consumer has finished processing: {}", (Object)fileResource.getResourceId());
                if (consumed) {
                    ++this.numResourcesConsumed;
                }
                fileResource = this.getNextFileResource();
            }
        }
        catch (InterruptedException e) {
            this.setEndedState(STATE.THREAD_INTERRUPTED);
        }
        this.setEndedState(STATE.COMPLETED);
        return new FileConsumerFutureResult(this.currentFile, this.numResourcesConsumed);
    }

    public abstract boolean processFileResource(FileResource var1);

    protected void incrementHandledExceptions() {
        ++this.numHandledExceptions;
    }

    public boolean isStillActive() {
        if (Thread.currentThread().isInterrupted()) {
            return false;
        }
        return this.currentState == STATE.NOT_YET_STARTED || this.currentState == STATE.ACTIVELY_CONSUMING || this.currentState == STATE.ASKED_TO_SHUTDOWN;
    }

    private boolean _processFileResource(FileResource fileResource) {
        this.currentFile = new FileStarted(fileResource.getResourceId());
        boolean consumed = false;
        try {
            consumed = this.processFileResource(fileResource);
        }
        catch (RuntimeException e) {
            this.setEndedState(STATE.CONSUMER_EXCEPTION);
            throw e;
        }
        catch (Error e) {
            this.setEndedState(STATE.CONSUMER_ERROR);
            throw e;
        }
        this.currentFile = null;
        return consumed;
    }

    public void pleaseShutdown() {
        this.setEndedState(STATE.ASKED_TO_SHUTDOWN);
    }

    public FileStarted getCurrentFile() {
        return this.currentFile;
    }

    public int getNumResourcesConsumed() {
        return this.numResourcesConsumed;
    }

    public int getNumHandledExceptions() {
        return this.numHandledExceptions;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public FileStarted checkForTimedOutMillis(long staleThresholdMillis) {
        if (this.currentFile == null) {
            return null;
        }
        if (staleThresholdMillis < 0L) {
            return null;
        }
        Object object = this.lock;
        synchronized (object) {
            if (this.currentState != STATE.ACTIVELY_CONSUMING && this.currentState != STATE.ASKED_TO_SHUTDOWN) {
                return null;
            }
            FileStarted tmp = this.currentFile;
            if (tmp == null) {
                return null;
            }
            if (tmp.getElapsedMillis() > staleThresholdMillis) {
                this.setEndedState(STATE.TIMED_OUT);
                LOG.error("{}", (Object)this.getXMLifiedLogMsg(TIMED_OUT, tmp.getResourceId(), ELAPSED_MILLIS, Long.toString(tmp.getElapsedMillis())));
                return tmp;
            }
        }
        return null;
    }

    protected String getXMLifiedLogMsg(String type, String resourceId, String ... attrs) {
        return this.getXMLifiedLogMsg(type, resourceId, (Throwable)null, attrs);
    }

    protected String getXMLifiedLogMsg(String type, String resourceId, Throwable t, String ... attrs) {
        ToXMLContentHandler toXML = new ToXMLContentHandler();
        SafeContentHandler handler = new SafeContentHandler(toXML);
        AttributesImpl attributes = new AttributesImpl();
        attributes.addAttribute("", "resourceId", "resourceId", "", resourceId);
        for (int i = 0; i < attrs.length - 1; ++i) {
            attributes.addAttribute("", attrs[i], attrs[i], "", attrs[i + 1]);
        }
        try {
            handler.startDocument();
            handler.startElement("", type, type, attributes);
            if (t != null) {
                StringWriter stackWriter = new StringWriter();
                PrintWriter printWriter = new PrintWriter(stackWriter);
                t.printStackTrace(printWriter);
                printWriter.flush();
                stackWriter.flush();
                char[] chars = stackWriter.toString().toCharArray();
                handler.characters(chars, 0, chars.length);
            }
            handler.endElement("", type, type);
            handler.endDocument();
        }
        catch (SAXException e) {
            LOG.warn("error writing xml stream for: {}", (Object)resourceId, (Object)t);
        }
        return handler.toString();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private FileResource getNextFileResource() throws InterruptedException {
        FileResource fileResource = null;
        long start = System.currentTimeMillis();
        while (fileResource == null) {
            if (Thread.currentThread().isInterrupted()) {
                this.setEndedState(STATE.THREAD_INTERRUPTED);
                LOG.debug("Consumer thread was interrupted.");
                break;
            }
            Object object = this.lock;
            synchronized (object) {
                if (this.currentState != STATE.ACTIVELY_CONSUMING) {
                    LOG.debug("Consumer already closed because of: {}", (Object)this.currentState);
                    break;
                }
            }
            fileResource = this.fileQueue.poll(1L, TimeUnit.SECONDS);
            if (fileResource != null) {
                if (!(fileResource instanceof PoisonFileResource)) break;
                this.setEndedState(STATE.SWALLOWED_POISON);
                fileResource = null;
                break;
            }
            LOG.debug("{} is waiting for file and the queue size is: {}", (Object)this.consumerId, (Object)this.fileQueue.size());
            long elapsed = System.currentTimeMillis() - start;
            if (this.maxConsecWaitInMillis <= 0L || elapsed <= this.maxConsecWaitInMillis) continue;
            this.setEndedState(STATE.EXCEEDED_MAX_CONSEC_WAIT_MILLIS);
            break;
        }
        return fileResource;
    }

    protected void close(Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            }
            catch (IOException e) {
                LOG.warn(e.getMessage(), e);
            }
        }
        closeable = null;
    }

    protected void flushAndClose(Closeable closeable) {
        if (closeable == null) {
            return;
        }
        if (closeable instanceof Flushable) {
            try {
                ((Flushable)((Object)closeable)).flush();
            }
            catch (IOException e) {
                LOG.warn(e.getMessage(), e);
            }
        }
        this.close(closeable);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void setEndedState(STATE cause) {
        Object object = this.lock;
        synchronized (object) {
            if (this.currentState == STATE.NOT_YET_STARTED || this.currentState == STATE.ACTIVELY_CONSUMING || this.currentState == STATE.ASKED_TO_SHUTDOWN) {
                this.currentState = cause;
            }
        }
    }

    protected void parse(String resourceId, Parser parser, InputStream is, ContentHandler handler, Metadata m, ParseContext parseContext) throws Throwable {
        try {
            parser.parse(is, handler, m, parseContext);
        }
        catch (Throwable t) {
            if (t instanceof OutOfMemoryError) {
                LOG.error(this.getXMLifiedLogMsg(OOM, resourceId, t, new String[0]));
            } else if (t instanceof Error) {
                LOG.error(this.getXMLifiedLogMsg(PARSE_ERR, resourceId, t, new String[0]));
            } else {
                LOG.warn(this.getXMLifiedLogMsg(PARSE_EX, resourceId, t, new String[0]));
                this.incrementHandledExceptions();
            }
            throw t;
        }
        finally {
            this.close(is);
        }
    }

    private static enum STATE {
        NOT_YET_STARTED,
        ACTIVELY_CONSUMING,
        SWALLOWED_POISON,
        THREAD_INTERRUPTED,
        EXCEEDED_MAX_CONSEC_WAIT_MILLIS,
        ASKED_TO_SHUTDOWN,
        TIMED_OUT,
        CONSUMER_EXCEPTION,
        CONSUMER_ERROR,
        COMPLETED;

    }
}

