/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.obs;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.obs.services.exception.ObsException;
import com.obs.services.model.CompleteMultipartUploadResult;
import com.obs.services.model.PartEtag;
import com.obs.services.model.PutObjectRequest;
import com.obs.services.model.PutObjectResult;
import com.obs.services.model.UploadPartRequest;
import com.obs.services.model.UploadPartResult;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.obs.OBSDataBlocks;
import org.apache.hadoop.fs.obs.OBSFileSystem;
import org.apache.hadoop.fs.obs.OBSInstrumentation;
import org.apache.hadoop.fs.obs.OBSUtils;
import org.apache.hadoop.fs.obs.Statistic;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Output stream that buffers written bytes into {@link OBSDataBlocks.DataBlock}s
 * and uploads them to OBS.
 *
 * <p>Bytes are appended to a single "active" block. When a block cannot take
 * any more data it is handed to the executor as one part of a multipart
 * upload and a new block is created on demand. On {@link #close()} the stream
 * either issues one regular PUT (no multipart upload was ever started) or
 * uploads the last block, waits for every outstanding part and completes the
 * multipart upload.
 *
 * <p>{@code write}, {@code flush} and the active-block accessors synchronize on
 * the stream instance; {@code close()} is made idempotent by an
 * {@link AtomicBoolean}.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
class OBSBlockOutputStream
extends OutputStream {
    private static final Logger LOG = LoggerFactory.getLogger(OBSBlockOutputStream.class);
    /** Smallest block size accepted by the constructor (0x500000 bytes = 5 MiB). */
    private static final long MIN_BLOCK_SIZE = 0x500000L;
    /** Block count at which a "too many partitions" error is logged. */
    private static final long MAX_BLOCK_COUNT = 10000L;
    private final OBSFileSystem fs;
    private final String key;
    private final int blockSize;
    private final ListeningExecutorService executorService;
    /** Retry policy used when completing or aborting a multipart upload: 5 attempts, proportional sleep from 2000 ms. */
    private final RetryPolicy retryPolicy = RetryPolicies.retryUpToMaximumCountWithProportionalSleep(5, 2000L, TimeUnit.MILLISECONDS);
    private final OBSDataBlocks.BlockFactory blockFactory;
    /** Scratch buffer so that {@link #write(int)} can delegate to the array variant. */
    private final byte[] singleCharWrite = new byte[1];
    /** Lazily created the first time a block has to be uploaded before close(). */
    private MultiPartUpload multiPartUpload;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private OBSDataBlocks.DataBlock activeBlock;
    private long blockCount = 0L;
    private final OBSInstrumentation.OutputStreamStatistics statistics;
    private final OBSFileSystem.OBSWriteOperationHelper writeOperationHelper;

    /**
     * Creates the stream and eagerly allocates the first data block.
     *
     * @param fs owning filesystem
     * @param key object key being written
     * @param executorService executor that runs the block uploads
     * @param progress progress callback (not used by this implementation)
     * @param blockSize size of each block in bytes; must be at least 5 MiB and fit in an {@code int}
     * @param blockFactory factory for the data blocks
     * @param statistics per-stream statistics sink
     * @param writeOperationHelper helper that performs the actual OBS write calls
     * @throws IOException if the first block cannot be created
     * @throws IllegalArgumentException if {@code blockSize} is out of range
     */
    OBSBlockOutputStream(OBSFileSystem fs, String key, ExecutorService executorService, Progressable progress, long blockSize, OBSDataBlocks.BlockFactory blockFactory, OBSInstrumentation.OutputStreamStatistics statistics, OBSFileSystem.OBSWriteOperationHelper writeOperationHelper) throws IOException {
        // Guava's Preconditions only substitutes %s placeholders, so %d would print literally.
        Preconditions.checkArgument(blockSize >= MIN_BLOCK_SIZE, "Block size is too small: %s", blockSize);
        // The block size is stored as an int; reject values the narrowing cast would silently truncate.
        Preconditions.checkArgument(blockSize <= Integer.MAX_VALUE, "Block size is too large: %s", blockSize);
        this.fs = fs;
        this.key = key;
        this.blockFactory = blockFactory;
        this.blockSize = (int) blockSize;
        this.statistics = statistics;
        this.writeOperationHelper = writeOperationHelper;
        this.executorService = MoreExecutors.listeningDecorator(executorService);
        this.multiPartUpload = null;
        this.createBlockIfNeeded();
        LOG.debug("Initialized OBSBlockOutputStream for {} output to {}", writeOperationHelper, this.activeBlock);
    }

    /**
     * Returns the active block, creating a new one through the block factory if there is none.
     *
     * @return the (possibly new) active block
     * @throws IOException if the factory fails to create a block
     */
    private synchronized OBSDataBlocks.DataBlock createBlockIfNeeded() throws IOException {
        if (this.activeBlock == null) {
            ++this.blockCount;
            if (this.blockCount >= MAX_BLOCK_COUNT) {
                LOG.error("Number of partitions in stream exceeds limit for OBS: 10000 write may fail.");
            }
            this.activeBlock = this.blockFactory.create(this.blockCount, this.blockSize, this.statistics);
        }
        return this.activeBlock;
    }

    /** @return the active block, or {@code null} if there is none */
    private synchronized OBSDataBlocks.DataBlock getActiveBlock() {
        return this.activeBlock;
    }

    /** @return {@code true} if an active block exists */
    private synchronized boolean hasActiveBlock() {
        return this.activeBlock != null;
    }

    /** Drops the reference to the active block; the block itself is not closed here. */
    private synchronized void clearActiveBlock() {
        if (this.activeBlock != null) {
            LOG.debug("Clearing active block");
        }
        this.activeBlock = null;
    }

    /**
     * Fails if the stream has already been closed.
     *
     * @throws IOException if {@link #close()} has been called
     */
    void checkOpen() throws IOException {
        if (this.closed.get()) {
            throw new IOException("Filesystem " + this.writeOperationHelper + " closed");
        }
    }

    /**
     * Flushes the active block, if any. This does not trigger an upload.
     *
     * @throws IOException if the stream is closed or the block flush fails
     */
    @Override
    public synchronized void flush() throws IOException {
        this.checkOpen();
        OBSDataBlocks.DataBlock dataBlock = this.getActiveBlock();
        if (dataBlock != null) {
            dataBlock.flush();
        }
    }

    /**
     * Writes a single byte by delegating to {@link #write(byte[], int, int)}.
     *
     * @param b byte to write (low 8 bits are used)
     * @throws IOException on any failure of the array write
     */
    @Override
    public synchronized void write(int b) throws IOException {
        this.singleCharWrite[0] = (byte) b;
        this.write(this.singleCharWrite, 0, 1);
    }

    /**
     * Writes a range of bytes into the active block, uploading the block and
     * starting a new one each time the block fills up.
     *
     * @param source source array
     * @param offset first byte to write
     * @param len number of bytes to write
     * @throws IOException if the stream is closed, or a block write/upload fails
     */
    @Override
    public synchronized void write(byte[] source, int offset, int len) throws IOException {
        OBSDataBlocks.validateWriteArgs(source, offset, len);
        this.checkOpen();
        int position = offset;
        int remaining = len;
        // Iterate instead of recursing: each pass fills (at most) one block.
        while (remaining > 0) {
            OBSDataBlocks.DataBlock block = this.createBlockIfNeeded();
            int written = block.write(source, position, remaining);
            int remainingCapacity = block.remainingCapacity();
            position += written;
            remaining -= written;
            if (remaining > 0) {
                // The block could not take everything: ship it and continue with a fresh one.
                LOG.debug("writing more data than block has capacity -triggering upload");
                this.uploadCurrentBlock();
            } else if (remainingCapacity == 0) {
                // Exactly full: upload now rather than on the next write.
                this.uploadCurrentBlock();
            }
        }
    }

    /**
     * Queues the active block as the next part of the multipart upload,
     * initiating the multipart upload on first use. The active block reference
     * is always cleared afterwards, even if queueing fails.
     *
     * @throws IOException if the multipart upload cannot be initiated or the block cannot be queued
     * @throws IllegalStateException if there is no active block
     */
    private synchronized void uploadCurrentBlock() throws IOException {
        Preconditions.checkState(this.hasActiveBlock(), "No active block");
        LOG.debug("Writing block # {}", this.blockCount);
        if (this.multiPartUpload == null) {
            LOG.debug("Initiating Multipart upload");
            this.multiPartUpload = new MultiPartUpload();
        }
        try {
            this.multiPartUpload.uploadBlockAsync(this.getActiveBlock());
        } finally {
            this.clearActiveBlock();
        }
    }

    /**
     * Closes the stream and blocks until all data has been uploaded.
     *
     * <p>If no multipart upload was started, the active block (if any) is sent
     * with a single PUT. Otherwise the last block is queued when it holds
     * data, all parts are awaited and the multipart upload is completed.
     * Calling this method more than once is a no-op.
     *
     * @throws IOException if any upload fails or the wait is interrupted; the
     *         write helper is notified through {@code writeFailed} before the
     *         exception propagates
     */
    @Override
    public void close() throws IOException {
        if (this.closed.getAndSet(true)) {
            LOG.debug("Ignoring close() as stream is already closed");
            return;
        }
        OBSDataBlocks.DataBlock block = this.getActiveBlock();
        boolean hasBlock = this.hasActiveBlock();
        LOG.debug("{}: Closing block #{}: current block= {}", this, this.blockCount, hasBlock ? block : "(none)");
        try {
            if (this.multiPartUpload == null) {
                if (hasBlock) {
                    this.putObject();
                }
            } else {
                if (hasBlock && block.hasData()) {
                    this.uploadCurrentBlock();
                }
                List<PartEtag> partETags = this.multiPartUpload.waitForAllPartUploads();
                this.multiPartUpload.complete(partETags);
            }
            LOG.debug("Upload complete for {}", this.writeOperationHelper);
        } catch (IOException ioe) {
            this.writeOperationHelper.writeFailed(ioe);
            throw ioe;
        } finally {
            // Release resources on every exit path, including unchecked exceptions.
            OBSUtils.closeAll(LOG, block, this.blockFactory);
            LOG.debug("Statistics: {}", this.statistics);
            OBSUtils.closeAll(LOG, this.statistics);
            this.clearActiveBlock();
        }
        // Only reached when the upload finished without an exception.
        this.writeOperationHelper.writeSuccessful();
    }

    /**
     * Uploads the active block with one regular PUT request and waits for it to finish.
     *
     * @throws IOException if the upload fails
     * @throws InterruptedIOException if the calling thread is interrupted while
     *         waiting; the interrupt flag is restored first
     */
    private void putObject() throws IOException {
        LOG.debug("Executing regular upload for {}", this.writeOperationHelper);
        final OBSDataBlocks.DataBlock block = this.getActiveBlock();
        final int size = block.dataSize();
        final OBSDataBlocks.BlockUploadData uploadData = block.startUpload();
        final PutObjectRequest putObjectRequest = uploadData.hasFile() ? this.writeOperationHelper.newPutRequest(uploadData.getFile()) : this.writeOperationHelper.newPutRequest(uploadData.getUploadStream(), size);
        this.statistics.blockUploadQueued(size);
        this.incrementWriteOperations();
        final long uploadQueuedTime = this.now();
        ListenableFuture<PutObjectResult> putObjectResult = this.executorService.submit(new Callable<PutObjectResult>() {

            @Override
            public PutObjectResult call() throws Exception {
                long uploadStartTime = OBSBlockOutputStream.this.now();
                OBSBlockOutputStream.this.statistics.blockUploadStarted(uploadStartTime - uploadQueuedTime, size);
                try {
                    PutObjectResult result = OBSBlockOutputStream.this.writeOperationHelper.putObject(putObjectRequest);
                    long uploadCompletedTime = OBSBlockOutputStream.this.now();
                    OBSBlockOutputStream.this.statistics.blockUploadCompleted(uploadCompletedTime - uploadStartTime, size);
                    return result;
                } catch (Exception e) {
                    long uploadFailedTime = OBSBlockOutputStream.this.now();
                    OBSBlockOutputStream.this.statistics.blockUploadFailed(uploadFailedTime - uploadStartTime, size);
                    throw e;
                } finally {
                    OBSUtils.closeAll(LOG, uploadData, block);
                }
            }
        });
        this.clearActiveBlock();
        try {
            putObjectResult.get();
        } catch (InterruptedException ie) {
            LOG.warn("Interrupted object upload", ie);
            Thread.currentThread().interrupt();
            // Do not return normally: close() would otherwise report the write as successful.
            throw this.interruptedUpload("regular upload", ie);
        } catch (ExecutionException ee) {
            throw OBSUtils.extractException("regular upload", this.key, ee);
        }
    }

    /** Builds the exception thrown when a wait for an upload is interrupted. */
    private InterruptedIOException interruptedUpload(String operation, InterruptedException cause) {
        InterruptedIOException ioe = new InterruptedIOException("Interrupted while waiting for " + operation + " of " + this.key);
        ioe.initCause(cause);
        return ioe;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("OBSBlockOutputStream{");
        sb.append(this.writeOperationHelper.toString());
        sb.append(", blockSize=").append(this.blockSize);
        OBSDataBlocks.DataBlock block = this.activeBlock;
        if (block != null) {
            sb.append(", activeBlock=").append(block);
        }
        sb.append('}');
        return sb.toString();
    }

    /** Forwards a write-operation count increment to the filesystem. */
    private void incrementWriteOperations() {
        this.fs.incrementWriteOperations();
    }

    /** @return current wall-clock time in milliseconds */
    private long now() {
        return System.currentTimeMillis();
    }

    /** @return the statistics object this stream reports to */
    OBSInstrumentation.OutputStreamStatistics getStatistics() {
        return this.statistics;
    }

    /**
     * State of one multipart upload: its upload id and the futures of every
     * part queued so far. Part numbers are assigned in queueing order,
     * starting at 1.
     */
    private class MultiPartUpload {
        private final String uploadId;
        private final List<ListenableFuture<PartEtag>> partETagsFutures;

        /**
         * Initiates the multipart upload through the write helper.
         *
         * @throws IOException if initiation fails
         */
        MultiPartUpload() throws IOException {
            this.uploadId = OBSBlockOutputStream.this.writeOperationHelper.initiateMultiPartUpload();
            this.partETagsFutures = new ArrayList<ListenableFuture<PartEtag>>(2);
            LOG.debug("Initiated multi-part upload for {} with id '{}'", OBSBlockOutputStream.this.writeOperationHelper, this.uploadId);
        }

        /**
         * Submits the block to the executor as the next part. The block and
         * its upload data are closed by the task when it finishes, whether it
         * succeeds or fails.
         *
         * @param block block to upload
         * @throws IOException if the block cannot be switched into upload state
         */
        private void uploadBlockAsync(final OBSDataBlocks.DataBlock block) throws IOException {
            LOG.debug("Queueing upload of {}", block);
            int size = block.dataSize();
            final OBSDataBlocks.BlockUploadData uploadData = block.startUpload();
            final int currentPartNumber = this.partETagsFutures.size() + 1;
            final UploadPartRequest request = OBSBlockOutputStream.this.writeOperationHelper.newUploadPartRequest(this.uploadId, currentPartNumber, size, uploadData.getUploadStream(), uploadData.getFile());
            OBSBlockOutputStream.this.statistics.blockUploadQueued(block.dataSize());
            ListenableFuture<PartEtag> partETagFuture = OBSBlockOutputStream.this.executorService.submit(new Callable<PartEtag>() {

                @Override
                public PartEtag call() throws Exception {
                    LOG.debug("Uploading part {} for id '{}'", currentPartNumber, MultiPartUpload.this.uploadId);
                    try {
                        UploadPartResult uploadPartResult = OBSBlockOutputStream.this.fs.uploadPart(request);
                        PartEtag partETag = new PartEtag(uploadPartResult.getEtag(), Integer.valueOf(uploadPartResult.getPartNumber()));
                        LOG.debug("Completed upload of {} to part {}", block, partETag);
                        LOG.debug("Stream statistics of {}", OBSBlockOutputStream.this.statistics);
                        return partETag;
                    } finally {
                        OBSUtils.closeAll(LOG, uploadData, block);
                    }
                }
            });
            this.partETagsFutures.add(partETagFuture);
        }

        /**
         * Blocks until every queued part has been uploaded.
         *
         * <p>If any part fails, or the wait is interrupted, the remaining
         * futures are cancelled and the multipart upload is aborted before the
         * exception is thrown.
         *
         * @return the part ETags, in part order; never {@code null}
         * @throws IOException if a part upload failed
         * @throws InterruptedIOException if the calling thread was interrupted;
         *         the interrupt flag is restored
         */
        private List<PartEtag> waitForAllPartUploads() throws IOException {
            LOG.debug("Waiting for {} uploads to complete", this.partETagsFutures.size());
            try {
                return Futures.allAsList(this.partETagsFutures).get();
            } catch (InterruptedException ie) {
                LOG.warn("Interrupted partUpload", ie);
                // Returning null here used to make complete() fail with a NullPointerException.
                this.cancelAllFutures();
                this.abort();
                // Restore the flag only after abort() so the abort call gets a chance to run.
                Thread.currentThread().interrupt();
                throw OBSBlockOutputStream.this.interruptedUpload("multi-part upload with id '" + this.uploadId + "'", ie);
            } catch (ExecutionException ee) {
                LOG.debug("While waiting for upload completion", ee);
                this.cancelAllFutures();
                this.abort();
                throw OBSUtils.extractException("Multi-part upload with id '" + this.uploadId + "' to " + OBSBlockOutputStream.this.key, OBSBlockOutputStream.this.key, ee);
            }
        }

        /** Cancels every part upload that has been queued, interrupting running ones. */
        private void cancelAllFutures() {
            LOG.debug("Cancelling futures");
            for (ListenableFuture<PartEtag> future : this.partETagsFutures) {
                future.cancel(true);
            }
        }

        /**
         * Completes the multipart upload, retrying on {@link ObsException}
         * according to the stream's retry policy.
         *
         * @param partETags ETags of all uploaded parts
         * @return the result of the completion call
         * @throws IOException the translated failure once retries are exhausted
         */
        private CompleteMultipartUploadResult complete(List<PartEtag> partETags) throws IOException {
            int retryCount = 0;
            String operation = String.format("Completing multi-part upload for key '%s', id '%s' with %s partitions ", OBSBlockOutputStream.this.key, this.uploadId, partETags.size());
            while (true) {
                try {
                    LOG.debug(operation);
                    return OBSBlockOutputStream.this.writeOperationHelper.completeMultipartUpload(this.uploadId, partETags);
                } catch (ObsException e) {
                    OBSBlockOutputStream.this.statistics.exceptionInMultipartComplete();
                    if (!this.shouldRetry(operation, e, retryCount++)) {
                        throw OBSUtils.translateException(operation, OBSBlockOutputStream.this.key, e);
                    }
                }
            }
        }

        /**
         * Aborts the multipart upload on a best-effort basis. An
         * {@link ObsException} is retried per the retry policy and, once
         * retries are exhausted, logged rather than thrown.
         */
        public void abort() {
            int retryCount = 0;
            OBSBlockOutputStream.this.fs.incrementStatistic(Statistic.OBJECT_MULTIPART_UPLOAD_ABORTED);
            String operation = String.format("Aborting multi-part upload for '%s', id '%s", OBSBlockOutputStream.this.writeOperationHelper, this.uploadId);
            while (true) {
                try {
                    LOG.debug(operation);
                    OBSBlockOutputStream.this.writeOperationHelper.abortMultipartUpload(this.uploadId);
                    return;
                } catch (ObsException e) {
                    OBSBlockOutputStream.this.statistics.exceptionInMultipartAbort();
                    if (!this.shouldRetry(operation, e, retryCount++)) {
                        LOG.warn("Unable to abort multipart upload, you may need to purge  uploaded parts", e);
                        return;
                    }
                }
            }
        }

        /**
         * Asks the retry policy whether the failed operation should be retried
         * and, if so, sleeps for the delay the policy requests.
         *
         * @param operation description used in log messages
         * @param e the failure
         * @param retryCount number of retries already attempted
         * @return {@code true} if the caller should retry
         */
        private boolean shouldRetry(String operation, ObsException e, int retryCount) {
            try {
                RetryPolicy.RetryAction retryAction = OBSBlockOutputStream.this.retryPolicy.shouldRetry(e, retryCount, 0, true);
                // Compare the decision, not the instance: policies that carry a delay return a
                // fresh RetryAction, so an identity check against RetryAction.RETRY never matches.
                boolean retry = retryAction.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
                if (retry) {
                    OBSBlockOutputStream.this.fs.incrementStatistic(Statistic.IGNORED_ERRORS);
                    LOG.info("Retrying {} after exception ", operation, e);
                    Thread.sleep(retryAction.delayMillis);
                }
                return retry;
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                return false;
            } catch (Exception ex) {
                // The policy itself failed; treat that as "do not retry" but leave a trace.
                LOG.debug("Retry policy failed while evaluating {}", operation, ex);
                return false;
            }
        }
    }
}

