/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.PartitionRequestClient;
import org.apache.flink.runtime.io.network.netty.ClientOutboundMessage;
import org.apache.flink.runtime.io.network.netty.ConnectionErrorMessage;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
import org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link PartitionRequestClient} backed by a single Netty {@link Channel}.
 *
 * <p>Partition requests, task events and the final close request are written directly to the
 * channel. Credit announcements, buffer-size updates, resume-consumption and
 * all-records-processed acknowledgements are wrapped in {@link ClientOutboundMessage}s and fired
 * as user events on the channel pipeline from the channel's event loop (see
 * {@link #sendToChannel(Object)}).
 *
 * <p>A reference counter is incremented by {@link #validateClientAndIncrementReferenceCounter()}
 * and decremented by {@link #close(RemoteInputChannel)}. Once it drops to zero and the connection
 * is not eligible for reuse, the connection is closed and this client is removed from its
 * factory.
 */
public class NettyPartitionRequestClient implements PartitionRequestClient {
    private static final Logger LOG = LoggerFactory.getLogger(NettyPartitionRequestClient.class);

    /** Channel all outbound messages are written to. */
    private final Channel tcpChannel;

    /** Handler that tracks the input channels registered with this client. */
    private final NetworkClientHandler clientHandler;

    /** Identifies the connection; used in error messages and when unregistering from the factory. */
    private final ConnectionID connectionId;

    /** Factory this client is removed from when the connection is closed. */
    private final PartitionRequestClientFactory clientFactory;

    /** Number of outstanding users; never goes below zero (see {@link #close}). */
    private final AtomicInteger closeReferenceCounter = new AtomicInteger(0);

    /** Set once by {@link #closeConnection()} so the close request is sent at most once. */
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Creates a client for the given channel.
     *
     * @param tcpChannel channel to write to; must not be {@code null}
     * @param clientHandler handler managing the registered input channels; must not be {@code null}
     * @param connectionId id of the connection; must not be {@code null}
     * @param clientFactory factory owning this client; must not be {@code null}
     */
    NettyPartitionRequestClient(Channel tcpChannel, NetworkClientHandler clientHandler, ConnectionID connectionId, PartitionRequestClientFactory clientFactory) {
        this.tcpChannel = Preconditions.checkNotNull(tcpChannel);
        this.clientHandler = Preconditions.checkNotNull(clientHandler);
        this.connectionId = Preconditions.checkNotNull(connectionId);
        this.clientFactory = Preconditions.checkNotNull(clientFactory);
    }

    /**
     * Tells whether the connection may be torn down.
     *
     * @return {@code true} if the reference counter is zero and the connection cannot be reused
     */
    boolean canBeDisposed() {
        return closeReferenceCounter.get() == 0 && !canBeReused();
    }

    /**
     * Increments the reference counter unless the client handler reports a channel error.
     *
     * @return {@code false} (without touching the counter) if the handler has a channel error;
     *     otherwise whether the incremented counter is positive
     */
    boolean validateClientAndIncrementReferenceCounter() {
        if (clientHandler.hasChannelError()) {
            return false;
        }
        return closeReferenceCounter.incrementAndGet() > 0;
    }

    /**
     * Registers the input channel with the client handler and sends a partition request for it,
     * either immediately or after {@code delayMs} milliseconds on the channel's event loop.
     *
     * <p>If the write fails, the input channel is unregistered again, notified via
     * {@code onError}, and a {@link ConnectionErrorMessage} is fired on the pipeline.
     *
     * @param partitionId partition to request
     * @param subpartitionIndex index of the requested subpartition
     * @param inputChannel channel that consumes the subpartition
     * @param delayMs delay before sending; {@code 0} sends right away from the calling thread
     * @throws IOException declared for the closed-client case, in which a
     *     {@link LocalTransportException} is thrown
     */
    @Override
    public void requestSubpartition(ResultPartitionID partitionId, int subpartitionIndex, final RemoteInputChannel inputChannel, int delayMs) throws IOException {
        checkNotClosed();
        LOG.debug("Requesting subpartition {} of partition {} with {} ms delay.", subpartitionIndex, partitionId, delayMs);
        clientHandler.addInputChannel(inputChannel);

        final NettyMessage.PartitionRequest request = new NettyMessage.PartitionRequest(partitionId, subpartitionIndex, inputChannel.getInputChannelId(), inputChannel.getInitialCredit());

        final ChannelFutureListener listener = new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    return;
                }
                // The future is complete here, so its cause is stable and can be read once.
                Throwable cause = future.cause();
                clientHandler.removeInputChannel(inputChannel);
                inputChannel.onError(new LocalTransportException(String.format("Sending the partition request to '%s (#%d)' failed.", connectionId.getAddress(), connectionId.getConnectionIndex()), future.channel().localAddress(), cause));
                sendToChannel(new ConnectionErrorMessage(cause == null ? new RuntimeException("Cannot send partition request.") : cause));
            }
        };

        if (delayMs == 0) {
            tcpChannel.writeAndFlush(request).addListener(listener);
        } else {
            // Block-bodied lambda: void-compatible only, so this binds to schedule(Runnable, ...).
            tcpChannel.eventLoop().schedule(() -> {
                tcpChannel.writeAndFlush(request).addListener(listener);
            }, delayMs, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Sends a task event request for the given partition.
     *
     * <p>If the write fails, the input channel is notified via {@code onError} and a
     * {@link ConnectionErrorMessage} is fired on the pipeline.
     *
     * @param partitionId partition the event is addressed to
     * @param event event to send
     * @param inputChannel channel on whose behalf the event is sent
     * @throws IOException declared for the closed-client case, in which a
     *     {@link LocalTransportException} is thrown
     */
    @Override
    public void sendTaskEvent(ResultPartitionID partitionId, TaskEvent event, final RemoteInputChannel inputChannel) throws IOException {
        checkNotClosed();
        NettyMessage.TaskEventRequest request = new NettyMessage.TaskEventRequest(event, partitionId, inputChannel.getInputChannelId());
        tcpChannel.writeAndFlush(request).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    return;
                }
                Throwable cause = future.cause();
                inputChannel.onError(new LocalTransportException(String.format("Sending the task event to '%s (#%d)' failed.", connectionId.getAddress(), connectionId.getConnectionIndex()), future.channel().localAddress(), cause));
                sendToChannel(new ConnectionErrorMessage(cause == null ? new RuntimeException("Cannot send task event.") : cause));
            }
        });
    }

    /** Fires an {@link AddCreditMessage} for the given input channel on the pipeline. */
    @Override
    public void notifyCreditAvailable(RemoteInputChannel inputChannel) {
        sendToChannel(new AddCreditMessage(inputChannel));
    }

    /** Fires a {@link NewBufferSizeMessage} carrying {@code bufferSize} for the given input channel. */
    @Override
    public void notifyNewBufferSize(RemoteInputChannel inputChannel, int bufferSize) {
        sendToChannel(new NewBufferSizeMessage(inputChannel, bufferSize));
    }

    /** Fires a {@link ResumeConsumptionMessage} for the given input channel on the pipeline. */
    @Override
    public void resumeConsumption(RemoteInputChannel inputChannel) {
        sendToChannel(new ResumeConsumptionMessage(inputChannel));
    }

    /** Fires an {@link AcknowledgeAllRecordsProcessedMessage} for the given input channel. */
    @Override
    public void acknowledgeAllRecordsProcessed(RemoteInputChannel inputChannel) {
        sendToChannel(new AcknowledgeAllRecordsProcessedMessage(inputChannel));
    }

    /**
     * Hands {@code message} to the channel pipeline as a user event. The event is fired from a
     * task submitted to the channel's event loop rather than from the calling thread.
     */
    private void sendToChannel(Object message) {
        tcpChannel.eventLoop().execute(() -> tcpChannel.pipeline().fireUserEventTriggered(message));
    }

    /**
     * Releases this client on behalf of the given input channel.
     *
     * <p>The channel is removed from the client handler and the reference counter is decremented
     * (but never below zero). If the counter reaches zero and the connection cannot be reused,
     * the connection is closed; otherwise only the request for this input channel is cancelled.
     *
     * @param inputChannel the input channel that no longer uses this client
     * @throws IOException declared by the interface; not thrown directly by this method
     */
    @Override
    public void close(RemoteInputChannel inputChannel) throws IOException {
        clientHandler.removeInputChannel(inputChannel);

        int remaining = closeReferenceCounter.updateAndGet(count -> Math.max(count - 1, 0));
        if (remaining == 0 && !canBeReused()) {
            closeConnection();
        } else {
            clientHandler.cancelRequestFor(inputChannel.getInputChannelId());
        }
    }

    /**
     * Sends a close request over the channel and removes this client from its factory.
     *
     * <p>Only the first call has an effect; subsequent calls return right after the state check.
     * The channel is closed by the attached listener if writing the close request fails.
     *
     * @throws IllegalStateException if {@link #canBeDisposed()} is {@code false}
     */
    public void closeConnection() {
        Preconditions.checkState(canBeDisposed(), "The connection should not be closed before disposed.");
        if (closed.getAndSet(true)) {
            // Already closed by an earlier call: do not send the close request again.
            return;
        }
        tcpChannel.writeAndFlush(new NettyMessage.CloseRequest()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        clientFactory.destroyPartitionRequestClient(connectionId, this);
    }

    /** Returns whether the factory allows connection reuse and the handler has no channel error. */
    private boolean canBeReused() {
        return clientFactory.isConnectionReuseEnabled() && !clientHandler.hasChannelError();
    }

    /**
     * Fails with a {@link LocalTransportException} naming the remote address if
     * {@link #closeConnection()} has already marked this client as closed.
     */
    private void checkNotClosed() throws IOException {
        if (!closed.get()) {
            return;
        }
        SocketAddress localAddr = tcpChannel.localAddress();
        SocketAddress remoteAddr = tcpChannel.remoteAddress();
        throw new LocalTransportException(String.format("Channel to '%s' closed.", remoteAddr), localAddr);
    }

    /** Outbound message that builds an {@code AckAllUserRecordsProcessed} for its input channel. */
    private static class AcknowledgeAllRecordsProcessedMessage extends ClientOutboundMessage {
        private AcknowledgeAllRecordsProcessedMessage(RemoteInputChannel inputChannel) {
            super(Preconditions.checkNotNull(inputChannel));
        }

        @Override
        Object buildMessage() {
            return new NettyMessage.AckAllUserRecordsProcessed(inputChannel.getInputChannelId());
        }
    }

    /** Outbound message that builds a {@code ResumeConsumption} for its input channel. */
    private static class ResumeConsumptionMessage extends ClientOutboundMessage {
        private ResumeConsumptionMessage(RemoteInputChannel inputChannel) {
            super(Preconditions.checkNotNull(inputChannel));
        }

        @Override
        Object buildMessage() {
            return new NettyMessage.ResumeConsumption(inputChannel.getInputChannelId());
        }
    }

    /** Outbound message that builds a {@code NewBufferSize} with a fixed size for its input channel. */
    private static class NewBufferSizeMessage extends ClientOutboundMessage {
        private final int bufferSize;

        private NewBufferSizeMessage(RemoteInputChannel inputChannel, int bufferSize) {
            super(Preconditions.checkNotNull(inputChannel));
            this.bufferSize = bufferSize;
        }

        @Override
        Object buildMessage() {
            return new NettyMessage.NewBufferSize(bufferSize, inputChannel.getInputChannelId());
        }
    }

    /** Outbound message that announces the input channel's unannounced credit, if any. */
    private static class AddCreditMessage extends ClientOutboundMessage {
        private AddCreditMessage(RemoteInputChannel inputChannel) {
            super(Preconditions.checkNotNull(inputChannel));
        }

        /**
         * Reads and resets the channel's unannounced credit at build time.
         *
         * @return an {@code AddCredit} message, or {@code null} if there is no positive credit to
         *     announce
         */
        @Override
        Object buildMessage() {
            int credits = inputChannel.getAndResetUnannouncedCredit();
            if (credits <= 0) {
                return null;
            }
            return new NettyMessage.AddCredit(credits, inputChannel.getInputChannelId());
        }
    }
}

