/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.coordination;

import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.SubtaskAccess;
import org.apache.flink.runtime.operators.coordination.util.IncompleteFuturesTracker;
import org.apache.flink.runtime.util.Runnables;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;

/**
 * {@link OperatorCoordinator.SubtaskGateway} implementation that sends operator events to one
 * subtask through a {@link SubtaskAccess}.
 *
 * <p>The gateway can be "closed" for one or more checkpoints. While at least one checkpoint has
 * the gateway closed, events passed to {@link #sendEvent(OperatorEvent)} are not dispatched but
 * held back in {@link #blockedEventsMap}; they are dispatched once the gateway is reopened.
 *
 * <p>All methods that touch the checkpoint/blocking state assert that they run in the thread of
 * the {@link ComponentMainThreadExecutor} handed to the constructor. {@code sendEvent} itself may
 * be called from any thread, because it hands the actual dispatch over to that executor.
 */
class SubtaskGatewayImpl implements OperatorCoordinator.SubtaskGateway {

    private static final String EVENT_LOSS_ERROR_MESSAGE = "An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency. Event: '%s', targetTask: %s";

    /** Initial value of {@link #latestAttemptedCheckpointId}: no checkpoint has been marked yet. */
    private static final long NO_CHECKPOINT = Long.MIN_VALUE;

    private final SubtaskAccess subtaskAccess;

    private final ComponentMainThreadExecutor mainThreadExecutor;

    private final IncompleteFuturesTracker incompleteFuturesTracker;

    /**
     * Held-back events, keyed by the ID of the checkpoint for which the gateway was closed. New
     * events are always appended to the list of the highest key. The gateway counts as closed
     * exactly while this map is non-empty.
     */
    private final TreeMap<Long, List<BlockedEvent>> blockedEventsMap;

    /** IDs of the checkpoints that have been marked and not yet unmarked. */
    private final TreeSet<Long> currentMarkedCheckpointIds;

    /** Highest checkpoint ID ever passed to {@link #markForCheckpoint(long)}. */
    private long latestAttemptedCheckpointId;

    /**
     * Creates a gateway that starts out open, with no marked checkpoints.
     *
     * @param subtaskAccess access to the target subtask (event sending, status, failover)
     * @param mainThreadExecutor executor on which events are dispatched and state is mutated
     * @param incompleteFuturesTracker tracker that is handed every future returned by {@link
     *     #sendEvent(OperatorEvent)}
     */
    SubtaskGatewayImpl(SubtaskAccess subtaskAccess, ComponentMainThreadExecutor mainThreadExecutor, IncompleteFuturesTracker incompleteFuturesTracker) {
        this.subtaskAccess = subtaskAccess;
        this.mainThreadExecutor = mainThreadExecutor;
        this.incompleteFuturesTracker = incompleteFuturesTracker;
        this.blockedEventsMap = new TreeMap<>();
        this.currentMarkedCheckpointIds = new TreeSet<>();
        this.latestAttemptedCheckpointId = NO_CHECKPOINT;
    }

    /**
     * Serializes the event and schedules its dispatch on the main thread executor. If the gateway
     * is closed at the time the scheduled action runs, the event is buffered instead.
     *
     * <p>If sending fails while the subtask is still running, a failover of the task is
     * triggered with a {@link FlinkException} that wraps the failure.
     *
     * @param evt the event to send
     * @return a future that completes with the outcome of the send, after the failure handler
     *     has run on the main thread executor
     * @throws FlinkRuntimeException if the subtask has not switched to running yet, or if the
     *     event cannot be serialized
     */
    @Override
    public CompletableFuture<Acknowledge> sendEvent(OperatorEvent evt) {
        if (!isReady()) {
            throw new FlinkRuntimeException("SubtaskGateway is not ready, task not yet running.");
        }

        final SerializedValue<OperatorEvent> serializedEvent;
        try {
            serializedEvent = new SerializedValue<>(evt);
        } catch (IOException e) {
            // Rethrown unchecked: the interface method declares no checked exception.
            throw new FlinkRuntimeException("Cannot serialize operator event", e);
        }

        final Callable<CompletableFuture<Acknowledge>> sendAction = subtaskAccess.createEventSendAction(serializedEvent);

        // sendResult receives the raw outcome of the send; result is what callers observe and
        // completes only after the failure handler below has run.
        final CompletableFuture<Acknowledge> sendResult = new CompletableFuture<>();
        final CompletableFuture<Acknowledge> result = sendResult.whenCompleteAsync((success, failure) -> {
            if (failure != null && subtaskAccess.isStillRunning()) {
                String msg = String.format(EVENT_LOSS_ERROR_MESSAGE, evt, subtaskAccess.subtaskName());
                Runnables.assertNoException(() -> subtaskAccess.triggerTaskFailover(new FlinkException(msg, failure)));
            }
        }, mainThreadExecutor);

        mainThreadExecutor.execute(() -> {
            sendEventInternal(sendAction, sendResult);
            incompleteFuturesTracker.trackFutureWhileIncomplete(result);
        });

        return result;
    }

    /**
     * Dispatches the send action right away if the gateway is open; otherwise appends it to the
     * buffer of the most recently closed checkpoint. Must run in the main thread.
     */
    private void sendEventInternal(Callable<CompletableFuture<Acknowledge>> sendAction, CompletableFuture<Acknowledge> result) {
        checkRunsInMainThread();

        if (blockedEventsMap.isEmpty()) {
            callSendAction(sendAction, result);
        } else {
            blockedEventsMap.lastEntry().getValue().add(new BlockedEvent(sendAction, result));
        }
    }

    /**
     * Invokes the send action and forwards its outcome to {@code result}. A non-fatal throwable
     * raised by the action itself completes {@code result} exceptionally; fatal errors are
     * rethrown.
     */
    private void callSendAction(Callable<CompletableFuture<Acknowledge>> sendAction, CompletableFuture<Acknowledge> result) {
        try {
            final CompletableFuture<Acknowledge> sendResult = sendAction.call();
            FutureUtils.forward(sendResult, result);
        } catch (Throwable t) {
            ExceptionUtils.rethrowIfFatalError(t);
            result.completeExceptionally(t);
        }
    }

    /** Returns the current execution attempt as reported by the {@link SubtaskAccess}. */
    @Override
    public ExecutionAttemptID getExecution() {
        return subtaskAccess.currentAttempt();
    }

    /** Returns the subtask index as reported by the {@link SubtaskAccess}. */
    @Override
    public int getSubtask() {
        return subtaskAccess.getSubtaskIndex();
    }

    /** Returns whether the subtask's "switched to running" future has completed. */
    private boolean isReady() {
        return subtaskAccess.hasSwitchedToRunning().isDone();
    }

    /**
     * Marks the gateway for the given checkpoint, which is the precondition for closing it via
     * {@link #tryCloseGateway(long)}. Must run in the main thread.
     *
     * @param checkpointId the checkpoint to mark; must be strictly greater than every
     *     previously marked ID
     * @throws IllegalStateException if {@code checkpointId} is not greater than the latest
     *     marked checkpoint ID
     */
    void markForCheckpoint(long checkpointId) {
        checkRunsInMainThread();

        if (checkpointId <= latestAttemptedCheckpointId) {
            throw new IllegalStateException(String.format("Regressing checkpoint IDs. Previous checkpointId = %d, new checkpointId = %d", latestAttemptedCheckpointId, checkpointId));
        }

        currentMarkedCheckpointIds.add(checkpointId);
        latestAttemptedCheckpointId = checkpointId;
    }

    /**
     * Closes the gateway for the given checkpoint if that checkpoint is currently marked. Closing
     * an already closed checkpoint keeps its existing buffer. Must run in the main thread.
     *
     * @param checkpointId the checkpoint to close the gateway for
     * @return {@code true} if the checkpoint is marked (the gateway is now closed for it),
     *     {@code false} otherwise
     */
    boolean tryCloseGateway(long checkpointId) {
        checkRunsInMainThread();

        if (!currentMarkedCheckpointIds.contains(checkpointId)) {
            return false;
        }

        blockedEventsMap.computeIfAbsent(checkpointId, ignored -> new LinkedList<>());
        return true;
    }

    /**
     * Reopens the gateway for the given checkpoint and unmarks it. A call for a checkpoint that
     * is not (or no longer) marked is ignored. Must run in the main thread.
     *
     * <p>If the checkpoint is the oldest one the gateway is closed for, its buffered events are
     * dispatched. Otherwise they are appended to the buffer of the next older closed checkpoint,
     * so that they stay held back, in order, until that one is reopened.
     *
     * @param checkpointId the checkpoint to reopen the gateway for
     * @throws IllegalStateException if {@code checkpointId} is greater than the latest marked
     *     checkpoint ID
     */
    void openGatewayAndUnmarkCheckpoint(long checkpointId) {
        checkRunsInMainThread();

        if (latestAttemptedCheckpointId < checkpointId) {
            throw new IllegalStateException(String.format("Trying to open gateway for unseen checkpoint: latest known checkpoint = %d, incoming checkpoint = %d", latestAttemptedCheckpointId, checkpointId));
        }

        if (!currentMarkedCheckpointIds.contains(checkpointId)) {
            return;
        }

        final List<BlockedEvent> eventsOfCheckpoint = blockedEventsMap.get(checkpointId);
        if (eventsOfCheckpoint != null) {
            // Long == long compares numerically (the key is unboxed).
            if (blockedEventsMap.firstKey() == checkpointId) {
                for (BlockedEvent blockedEvent : eventsOfCheckpoint) {
                    callSendAction(blockedEvent.sendAction, blockedEvent.future);
                }
            } else {
                // checkpointId is a key but not the first one, so a strictly lower key exists.
                blockedEventsMap.lowerEntry(checkpointId).getValue().addAll(eventsOfCheckpoint);
            }
            blockedEventsMap.remove(checkpointId);
        }

        currentMarkedCheckpointIds.remove(checkpointId);
    }

    /**
     * Dispatches every buffered event, in ascending order of the checkpoint the gateway was
     * closed for, then reopens the gateway and unmarks all checkpoints. Must run in the main
     * thread.
     */
    void openGatewayAndUnmarkAllCheckpoint() {
        checkRunsInMainThread();

        for (List<BlockedEvent> blockedEvents : blockedEventsMap.values()) {
            for (BlockedEvent blockedEvent : blockedEvents) {
                callSendAction(blockedEvent.sendAction, blockedEvent.future);
            }
        }

        blockedEventsMap.clear();
        currentMarkedCheckpointIds.clear();
    }

    /**
     * Reopens the gateway for the highest currently marked checkpoint, if there is one; see
     * {@link #openGatewayAndUnmarkCheckpoint(long)}. Does nothing when no checkpoint is marked.
     */
    void openGatewayAndUnmarkLastCheckpointIfAny() {
        if (!currentMarkedCheckpointIds.isEmpty()) {
            openGatewayAndUnmarkCheckpoint(currentMarkedCheckpointIds.last());
        }
    }

    private void checkRunsInMainThread() {
        mainThreadExecutor.assertRunningInMainThread();
    }

    /** A held-back send action together with the future that receives its outcome. */
    private static final class BlockedEvent {

        private final Callable<CompletableFuture<Acknowledge>> sendAction;

        private final CompletableFuture<Acknowledge> future;

        BlockedEvent(Callable<CompletableFuture<Acknowledge>> sendAction, CompletableFuture<Acknowledge> future) {
            this.sendAction = sendAction;
            this.future = future;
        }
    }
}

