/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
import org.apache.flink.runtime.source.event.RequestSplitEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.LatencyMarkerEmitter;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.function.FunctionWithException;

@Internal
public class SourceOperator<OUT, SplitT extends SourceSplit>
extends AbstractStreamOperator<OUT>
implements OperatorEventHandler,
PushingAsyncDataInput<OUT> {
    private static final long serialVersionUID = 1405537676017904695L;

    /** Descriptor of the raw {@code byte[]} list state that backs {@link #readerState}. */
    static final ListStateDescriptor<byte[]> SPLITS_STATE_DESC = new ListStateDescriptor<byte[]>("SourceReaderState", BytePrimitiveArraySerializer.INSTANCE);

    /** Factory applied in {@code initReader()} to create the source reader. */
    private final FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception> readerFactory;

    /** Serializer used for the split state and for splits carried by {@code AddSplitEvent}s. */
    private final SimpleVersionedSerializer<SplitT> splitSerializer;

    /** Gateway through which this operator sends events to its coordinator. */
    private final OperatorEventGateway operatorEventGateway;

    /** Strategy handed to the event-time logic created in {@code open()}. */
    private final WatermarkStrategy<OUT> watermarkStrategy;

    /** Alignment parameters obtained from {@link #watermarkStrategy} in the constructor. */
    private final WatermarkAlignmentParams watermarkAlignmentParams;

    /** Configuration exposed to the reader through its {@link SourceReaderContext}. */
    private final Configuration configuration;

    /** Host name reported in reader registration and exposed through the reader context. */
    private final String localHostname;

    /** Selects between the progressive and the no-op event-time logic in {@code open()}. */
    private final boolean emitProgressiveWatermarks;

    /** The reader; {@code null} until {@code initReader()} has run. */
    private SourceReader<OUT, SplitT> sourceReader;

    /** Output passed to the reader's {@code pollNext}; created when the main output is initialized. */
    private ReaderOutput<OUT> currentMainOutput;

    /** The {@code DataOutput} the main output was initialized with; checked by the assertion in {@code emitNext}. */
    private PushingAsyncDataInput.DataOutput<OUT> lastInvokedOutput;

    /** Timestamp most recently passed to {@code onWatermarkEmitted}; reported to the coordinator. */
    private long lastEmittedWatermark = Watermark.UNINITIALIZED.getTimestamp();

    /** Typed view of the split state, set up in {@code initializeState}. */
    private ListState<SplitT> readerState;

    /** Event-time logic created in {@code open()}. */
    private TimestampsAndWatermarks<OUT> eventTimeLogic;

    /** Current mode of the operator's small state machine. */
    private OperatingMode operatingMode;

    /** Completed by {@code finish()} and by a non-draining {@code stop}. */
    private final CompletableFuture<Void> finished = new CompletableFuture<>();

    /** Combines the reader's (or alignment) availability with a forced-stop signal. */
    private final SourceOperatorAvailabilityHelper availabilityHelper = new SourceOperatorAvailabilityHelper();

    /** Splits received before the main output existed; their outputs are created once it does. */
    private final List<SplitT> outputPendingSplits = new ArrayList<>();

    /** Metric group wrapping this operator's metric group; see {@code initSourceMetricGroup()}. */
    private InternalSourceReaderMetricGroup sourceMetricGroup;

    /** Maximum watermark taken from the latest {@code WatermarkAlignmentEvent}. */
    private long currentMaxDesiredWatermark = Watermark.MAX_WATERMARK.getTimestamp();

    /** Already completed unless the operator is waiting for alignment, in which case it is pending. */
    private CompletableFuture<Void> waitingForAlignmentFuture = CompletableFuture.completedFuture(null);

    /** Only created when the latency tracking interval is positive. */
    @Nullable
    private LatencyMarkerEmitter<OUT> latencyMarkerEmitter;

    /**
     * Creates the operator in the {@code OUTPUT_NOT_INITIALIZED} mode.
     *
     * <p>All reference arguments except {@code timeService} are null-checked. The watermark
     * alignment parameters are read from the given strategy.
     *
     * @param readerFactory factory that later creates the source reader
     * @param operatorEventGateway gateway for events sent to the coordinator
     * @param splitSerializer serializer for the splits
     * @param watermarkStrategy strategy used to build the event-time logic
     * @param timeService stored in the {@code processingTimeService} field (not declared in this
     *     class; presumably inherited)
     * @param configuration configuration exposed to the reader
     * @param localHostname host name reported to the coordinator
     * @param emitProgressiveWatermarks whether the progressive event-time logic is used
     */
    public SourceOperator(
            FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception> readerFactory,
            OperatorEventGateway operatorEventGateway,
            SimpleVersionedSerializer<SplitT> splitSerializer,
            WatermarkStrategy<OUT> watermarkStrategy,
            ProcessingTimeService timeService,
            Configuration configuration,
            String localHostname,
            boolean emitProgressiveWatermarks) {
        // Plain assignments that cannot fail come first.
        this.operatingMode = OperatingMode.OUTPUT_NOT_INITIALIZED;
        this.emitProgressiveWatermarks = emitProgressiveWatermarks;

        // Null checks happen in the same relative order as before.
        this.readerFactory = Preconditions.checkNotNull(readerFactory);
        this.operatorEventGateway = Preconditions.checkNotNull(operatorEventGateway);
        this.splitSerializer = Preconditions.checkNotNull(splitSerializer);
        this.watermarkStrategy = Preconditions.checkNotNull(watermarkStrategy);
        this.processingTimeService = timeService;
        this.configuration = Preconditions.checkNotNull(configuration);
        this.localHostname = Preconditions.checkNotNull(localHostname);

        // Queried last, after every argument has been validated.
        this.watermarkAlignmentParams = watermarkStrategy.getAlignmentParameters();
    }

    /**
     * Delegates to the superclass setup and then creates the source metric group.
     *
     * @param containingTask the task that contains this operator
     * @param config the stream configuration
     * @param output the operator output
     */
    @Override
    public void setup(
            StreamTask<?, ?> containingTask,
            StreamConfig config,
            Output<StreamRecord<OUT>> output) {
        super.setup(containingTask, config, output);
        // The metric group is derived from getMetricGroup(), so it is created after super.setup().
        initSourceMetricGroup();
    }

    /** Wraps this operator's metric group into the source reader metric group. */
    @VisibleForTesting
    protected void initSourceMetricGroup() {
        sourceMetricGroup = InternalSourceReaderMetricGroup.wrap(getMetricGroup());
    }

    /**
     * Creates the source reader through the reader factory, unless a reader already exists.
     *
     * <p>The reader receives a {@link SourceReaderContext} that exposes this operator's metric
     * group, configuration, host name and subtask index, and that forwards split requests and
     * source events to the coordinator.
     *
     * @throws Exception if the reader factory fails
     */
    public void initReader() throws Exception {
        if (sourceReader != null) {
            // A reader was created earlier; keep it.
            return;
        }

        // Resolved once, up front, and captured by the context below.
        final int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();

        final SourceReaderContext readerContext =
                new SourceReaderContext() {

                    @Override
                    public SourceReaderMetricGroup metricGroup() {
                        return SourceOperator.this.sourceMetricGroup;
                    }

                    @Override
                    public Configuration getConfiguration() {
                        return SourceOperator.this.configuration;
                    }

                    @Override
                    public String getLocalHostName() {
                        return SourceOperator.this.localHostname;
                    }

                    @Override
                    public int getIndexOfSubtask() {
                        return indexOfThisSubtask;
                    }

                    @Override
                    public void sendSplitRequest() {
                        final RequestSplitEvent request = new RequestSplitEvent(getLocalHostName());
                        SourceOperator.this.operatorEventGateway.sendEventToCoordinator(request);
                    }

                    @Override
                    public void sendSourceEventToCoordinator(SourceEvent event) {
                        final SourceEventWrapper wrapped = new SourceEventWrapper(event);
                        SourceOperator.this.operatorEventGateway.sendEventToCoordinator(wrapped);
                    }

                    @Override
                    public UserCodeClassLoader getUserCodeClassLoader() {
                        // A fresh adapter over the runtime context is handed out on every call.
                        return new UserCodeClassLoader() {

                            @Override
                            public ClassLoader asClassLoader() {
                                return SourceOperator.this.getRuntimeContext().getUserCodeClassLoader();
                            }

                            @Override
                            public void registerReleaseHookIfAbsent(String releaseHookName, Runnable releaseHook) {
                                SourceOperator.this
                                        .getRuntimeContext()
                                        .registerUserCodeClassLoaderReleaseHookIfAbsent(releaseHookName, releaseHook);
                            }
                        };
                    }
                };

        sourceReader = readerFactory.apply(readerContext);
    }

    /**
     * Returns the source metric group.
     *
     * @return the metric group created by {@code initSourceMetricGroup()}, or {@code null} if it
     *     has not been created yet
     */
    public InternalSourceReaderMetricGroup getSourceMetricGroup() {
        return sourceMetricGroup;
    }

    /**
     * Opens the operator: creates the reader if necessary, builds the event-time logic, hands
     * the splits found in state back to the reader, registers the reader with the coordinator,
     * and starts the reader and the periodic watermark emission.
     *
     * @throws Exception if the reader cannot be created or the state cannot be read
     */
    @Override
    public void open() throws Exception {
        initReader();

        if (emitProgressiveWatermarks) {
            eventTimeLogic =
                    TimestampsAndWatermarks.createProgressiveEventTimeLogic(
                            watermarkStrategy,
                            sourceMetricGroup,
                            getProcessingTimeService(),
                            getExecutionConfig().getAutoWatermarkInterval());
        } else {
            eventTimeLogic =
                    TimestampsAndWatermarks.createNoOpEventTimeLogic(watermarkStrategy, sourceMetricGroup);
        }

        // Typed list (no raw List / Iterable cast), so addSplits is checked by the compiler.
        final List<SplitT> restoredSplits = CollectionUtil.iterableToList(readerState.get());
        if (!restoredSplits.isEmpty()) {
            sourceReader.addSplits(restoredSplits);
        }

        // Registration happens before the reader is started.
        registerReader();

        sourceMetricGroup.idlingStarted();
        sourceReader.start();

        eventTimeLogic.startPeriodicWatermarkEmits();
    }

    /**
     * Stops the internal services, finishes the superclass and then completes the
     * {@code finished} future.
     *
     * @throws Exception if the superclass fails to finish
     */
    @Override
    public void finish() throws Exception {
        stopInternalServices();
        super.finish();
        // Signal completion only after the superclass finished without throwing.
        finished.complete(null);
    }

    /** Stops periodic watermark emission and closes the latency marker emitter, where present. */
    private void stopInternalServices() {
        // Both collaborators may be absent: the event-time logic is created in open(),
        // the latency marker emitter only when latency tracking is enabled.
        if (null != eventTimeLogic) {
            eventTimeLogic.stopPeriodicWatermarkEmits();
        }
        if (null != latencyMarkerEmitter) {
            latencyMarkerEmitter.close();
        }
    }

    /**
     * Requests the source to stop.
     *
     * <p>Only has an effect while the operator is reading, waiting for alignment or has not yet
     * initialized its output. With {@link StopMode#DRAIN} the mode becomes
     * {@code SOURCE_DRAINED}; with any other mode it becomes {@code SOURCE_STOPPED}, the internal
     * services are stopped and the returned future is completed right away. In both cases the
     * forced-stop future of the availability helper is completed.
     *
     * @param mode how the source should stop
     * @return the future that is completed once this operator has finished
     */
    public CompletableFuture<Void> stop(StopMode mode) {
        switch (operatingMode) {
            case READING:
            case OUTPUT_NOT_INITIALIZED:
            case WAITING_FOR_ALIGNMENT:
                final boolean drain = mode == StopMode.DRAIN;
                if (drain) {
                    operatingMode = OperatingMode.SOURCE_DRAINED;
                } else {
                    operatingMode = OperatingMode.SOURCE_STOPPED;
                }
                availabilityHelper.forceStop();
                if (!drain) {
                    stopInternalServices();
                    finished.complete(null);
                }
                break;
            default:
                // Already stopping or finished: nothing to change.
                break;
        }
        return finished;
    }

    /**
     * Closes the source reader (if one was created) and then the superclass.
     *
     * <p>The superclass is closed even when closing the reader throws an exception, so that its
     * cleanup is not skipped. If both steps fail, the reader's exception is thrown and the
     * superclass exception is attached to it as a suppressed exception.
     *
     * @throws Exception the first failure encountered while closing
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;

        if (this.sourceReader != null) {
            try {
                this.sourceReader.close();
            } catch (Exception e) {
                // Remember the failure, but still give the superclass a chance to clean up.
                failure = e;
            }
        }

        try {
            super.close();
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Emits the next element(s) to the given output.
     *
     * <p>In the {@code READING} mode the source reader is polled directly; every other mode is
     * handled by {@code emitNextNotReading}.
     *
     * @param output the output to emit to
     * @return the resulting input status
     * @throws Exception if polling the reader fails
     */
    @Override
    public DataInputStatus emitNext(PushingAsyncDataInput.DataOutput<OUT> output) throws Exception {
        // The output is expected to stay the same between calls until all data is finished.
        assert (lastInvokedOutput == output
                || lastInvokedOutput == null
                || operatingMode == OperatingMode.DATA_FINISHED);

        if (operatingMode != OperatingMode.READING) {
            return emitNextNotReading(output);
        }
        final InputStatus readerStatus = sourceReader.pollNext(currentMainOutput);
        return convertToInternalStatus(readerStatus);
    }

    /**
     * Handles {@code emitNext} for every mode other than {@code READING}.
     *
     * @param output the output to emit to
     * @return the resulting input status
     * @throws Exception if polling the reader fails
     * @throws IllegalStateException if called in a mode that is not handled here
     */
    private DataInputStatus emitNextNotReading(PushingAsyncDataInput.DataOutput<OUT> output) throws Exception {
        switch (operatingMode) {
            case OUTPUT_NOT_INITIALIZED:
                if (watermarkAlignmentParams.isEnabled()) {
                    // Periodically report the latest watermark; the update interval is used both
                    // as the initial delay and as the period.
                    final long updateInterval = watermarkAlignmentParams.getUpdateInterval();
                    processingTimeService.scheduleWithFixedDelay(
                            this::emitLatestWatermark, updateInterval, updateInterval);
                }
                initializeMainOutput(output);
                return convertToInternalStatus(sourceReader.pollNext(currentMainOutput));

            case SOURCE_STOPPED:
                operatingMode = OperatingMode.DATA_FINISHED;
                sourceMetricGroup.idlingStarted();
                return DataInputStatus.STOPPED;

            case SOURCE_DRAINED:
                operatingMode = OperatingMode.DATA_FINISHED;
                sourceMetricGroup.idlingStarted();
                return DataInputStatus.END_OF_DATA;

            case DATA_FINISHED:
                sourceMetricGroup.idlingStarted();
                return DataInputStatus.END_OF_INPUT;

            case WAITING_FOR_ALIGNMENT:
                // Sanity checks: while waiting, the future is pending and alignment is still required.
                Preconditions.checkState(!waitingForAlignmentFuture.isDone());
                Preconditions.checkState(shouldWaitForAlignment());
                return convertToInternalStatus(InputStatus.NOTHING_AVAILABLE);

            case READING:
            default:
                throw new IllegalStateException("Unknown operating mode: " + operatingMode);
        }
    }

    /**
     * Creates the main reader output on top of the given data output and switches the operator
     * to the {@code READING} mode.
     *
     * <p>The statement order matters: the main output must exist before the per-split outputs
     * are created for the pending splits.
     *
     * @param output the data output that the main output is created from
     */
    private void initializeMainOutput(PushingAsyncDataInput.DataOutput<OUT> output) {
        // Emitted watermarks are reported back to this operator through onWatermarkEmitted.
        this.currentMainOutput = this.eventTimeLogic.createMainOutput(output, this::onWatermarkEmitted);
        this.initializeLatencyMarkerEmitter(output);
        // Remembered for the output-consistency assertion in emitNext.
        this.lastInvokedOutput = output;
        // Create outputs for the splits that arrived before the main output existed.
        this.createOutputForSplits(this.outputPendingSplits);
        this.operatingMode = OperatingMode.READING;
    }

    /**
     * Creates the latency marker emitter when latency tracking is enabled.
     *
     * <p>The interval is taken from the execution config if latency tracking is configured
     * there; otherwise from the task manager configuration
     * ({@link MetricOptions#LATENCY_INTERVAL}). No emitter is created for a non-positive interval.
     *
     * @param output the output that receives the latency markers
     */
    private void initializeLatencyMarkerEmitter(PushingAsyncDataInput.DataOutput<OUT> output) {
        final long latencyTrackingInterval =
                getExecutionConfig().isLatencyTrackingConfigured()
                        ? getExecutionConfig().getLatencyTrackingInterval()
                        : getContainingTask()
                                .getEnvironment()
                                .getTaskManagerInfo()
                                .getConfiguration()
                                .getLong(MetricOptions.LATENCY_INTERVAL);

        if (latencyTrackingInterval > 0L) {
            latencyMarkerEmitter =
                    new LatencyMarkerEmitter<>(
                            getProcessingTimeService(),
                            output::emitLatencyMarker,
                            latencyTrackingInterval,
                            getOperatorID(),
                            getRuntimeContext().getIndexOfThisSubtask());
        }
    }

    /**
     * Translates the reader's {@link InputStatus} into a {@link DataInputStatus}.
     *
     * <p>Besides the translation, {@code NOTHING_AVAILABLE} and {@code END_OF_INPUT} mark the
     * source as idling in the metric group, and {@code END_OF_INPUT} also moves the operator to
     * the {@code DATA_FINISHED} mode.
     *
     * @param inputStatus the status returned by the reader
     * @return the corresponding internal status
     * @throws IllegalArgumentException if the status is not one of the handled values
     */
    private DataInputStatus convertToInternalStatus(InputStatus inputStatus) {
        switch (inputStatus) {
            case MORE_AVAILABLE:
                return DataInputStatus.MORE_AVAILABLE;

            case NOTHING_AVAILABLE:
                sourceMetricGroup.idlingStarted();
                return DataInputStatus.NOTHING_AVAILABLE;

            case END_OF_INPUT:
                operatingMode = OperatingMode.DATA_FINISHED;
                sourceMetricGroup.idlingStarted();
                return DataInputStatus.END_OF_DATA;

            default:
                throw new IllegalArgumentException("Unknown input status: " + inputStatus);
        }
    }

    /**
     * Reports the last emitted watermark to the coordinator.
     *
     * @param time the timestamp supplied by the timer service; not used
     * @throws IllegalStateException if the main output has not been initialized yet
     */
    private void emitLatestWatermark(long time) {
        Preconditions.checkState(currentMainOutput != null);
        final ReportedWatermarkEvent report = new ReportedWatermarkEvent(lastEmittedWatermark);
        operatorEventGateway.sendEventToCoordinator(report);
    }

    /**
     * Replaces the contents of the reader state with the reader's snapshot for this checkpoint.
     *
     * @param context provides the checkpoint id
     * @throws Exception if the state cannot be updated
     */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        final long checkpointId = context.getCheckpointId();
        LOG.debug("Taking a snapshot for checkpoint {}", checkpointId);

        final List<SplitT> splitsToStore = sourceReader.snapshotState(checkpointId);
        readerState.update(splitsToStore);
    }

    /**
     * Returns the future that signals when this operator may have something to do.
     *
     * <p>While waiting for alignment the alignment future is used; while reading (or before the
     * output is initialized) the reader's availability is used. Both are combined with the
     * forced-stop signal by the availability helper. In the stopped, drained and finished modes
     * the operator is always available.
     *
     * @return the availability future for the current mode
     * @throws IllegalStateException if the operating mode is not one of the known values
     */
    @Override
    public CompletableFuture<?> getAvailableFuture() {
        switch (operatingMode) {
            case WAITING_FOR_ALIGNMENT:
                return availabilityHelper.update(waitingForAlignmentFuture);

            case READING:
            case OUTPUT_NOT_INITIALIZED:
                return availabilityHelper.update(sourceReader.isAvailable());

            case DATA_FINISHED:
            case SOURCE_DRAINED:
            case SOURCE_STOPPED:
                return AvailabilityProvider.AVAILABLE;

            default:
                throw new IllegalStateException("Unknown operating mode: " + operatingMode);
        }
    }

    /**
     * Initializes the superclass state and sets up the reader state as a typed view over the raw
     * {@code byte[]} list state described by {@code SPLITS_STATE_DESC}.
     *
     * @param context gives access to the operator state store
     * @throws Exception if the state cannot be obtained
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);

        final ListState<byte[]> serializedSplits =
                context.getOperatorStateStore().getListState(SPLITS_STATE_DESC);
        // Splits are (de)serialized with the split serializer on top of the raw bytes.
        readerState = new SimpleVersionedListState<SplitT>(serializedSplits, splitSerializer);
    }

    /**
     * Notifies first the superclass and then the source reader that a checkpoint completed.
     *
     * @param checkpointId id of the completed checkpoint
     * @throws Exception if either notification fails
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        sourceReader.notifyCheckpointComplete(checkpointId);
    }

    /**
     * Notifies first the superclass and then the source reader that a checkpoint was aborted.
     *
     * @param checkpointId id of the aborted checkpoint
     * @throws Exception if either notification fails
     */
    @Override
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        super.notifyCheckpointAborted(checkpointId);
        sourceReader.notifyCheckpointAborted(checkpointId);
    }

    /**
     * Dispatches an event received from the coordinator.
     *
     * <p>Handled events: watermark alignment updates, split assignments, wrapped source events
     * (forwarded to the reader) and the "no more splits" notification.
     *
     * @param event the event to handle
     * @throws IllegalStateException if the event type is not one of the handled types
     */
    @Override
    public void handleOperatorEvent(OperatorEvent event) {
        if (event instanceof WatermarkAlignmentEvent) {
            updateMaxDesiredWatermark((WatermarkAlignmentEvent) event);
            checkWatermarkAlignment();
            return;
        }
        if (event instanceof AddSplitEvent) {
            // The split type of the event cannot be checked at runtime.
            @SuppressWarnings("unchecked")
            final AddSplitEvent<SplitT> addSplitEvent = (AddSplitEvent<SplitT>) event;
            handleAddSplitsEvent(addSplitEvent);
            return;
        }
        if (event instanceof SourceEventWrapper) {
            final SourceEventWrapper wrapper = (SourceEventWrapper) event;
            sourceReader.handleSourceEvents(wrapper.getSourceEvent());
            return;
        }
        if (event instanceof NoMoreSplitsEvent) {
            sourceReader.notifyNoMoreSplits();
            return;
        }
        throw new IllegalStateException("Received unexpected operator event " + event);
    }

    /**
     * Deserializes the splits of the event and hands them to the reader.
     *
     * <p>If the main output already exists, per-split outputs are created immediately;
     * otherwise the splits are remembered so that their outputs can be created later.
     *
     * @param event the split assignment
     * @throws FlinkRuntimeException if the splits cannot be deserialized
     */
    private void handleAddSplitsEvent(AddSplitEvent<SplitT> event) {
        try {
            final List<SplitT> assignedSplits = event.splits(splitSerializer);
            if (operatingMode != OperatingMode.OUTPUT_NOT_INITIALIZED) {
                createOutputForSplits(assignedSplits);
            } else {
                // No main output yet: defer creating the per-split outputs.
                outputPendingSplits.addAll(assignedSplits);
            }
            sourceReader.addSplits(assignedSplits);
        } catch (IOException e) {
            throw new FlinkRuntimeException("Failed to deserialize the splits.", e);
        }
    }

    /**
     * Creates an output on the main output for each of the given splits, keyed by split id.
     *
     * @param newSplits the splits to create outputs for
     */
    private void createOutputForSplits(List<SplitT> newSplits) {
        for (SplitT newSplit : newSplits) {
            final String splitId = newSplit.splitId();
            currentMainOutput.createOutputForSplit(splitId);
        }
    }

    /**
     * Stores the maximum watermark carried by the event and publishes it to the metric group.
     *
     * @param event the alignment event received from the coordinator
     */
    private void updateMaxDesiredWatermark(WatermarkAlignmentEvent event) {
        final long maxWatermark = event.getMaxWatermark();
        currentMaxDesiredWatermark = maxWatermark;
        sourceMetricGroup.updateMaxDesiredWatermark(maxWatermark);
    }

    /**
     * Callback invoked with each emitted watermark: records it and re-evaluates the alignment.
     *
     * @param emittedWatermark timestamp of the watermark that was emitted
     */
    private void onWatermarkEmitted(long emittedWatermark) {
        lastEmittedWatermark = emittedWatermark;
        checkWatermarkAlignment();
    }

    /**
     * Switches between {@code READING} and {@code WAITING_FOR_ALIGNMENT} depending on whether the
     * last emitted watermark exceeds the maximum desired watermark.
     *
     * <p>Entering the waiting mode installs a new, pending alignment future; leaving it completes
     * that future. Nothing happens in any other mode.
     */
    private void checkWatermarkAlignment() {
        if (operatingMode == OperatingMode.READING) {
            // While reading, no alignment wait may be pending.
            Preconditions.checkState(waitingForAlignmentFuture.isDone());
            if (!shouldWaitForAlignment()) {
                return;
            }
            operatingMode = OperatingMode.WAITING_FOR_ALIGNMENT;
            waitingForAlignmentFuture = new CompletableFuture<>();
            return;
        }

        if (operatingMode == OperatingMode.WAITING_FOR_ALIGNMENT) {
            // While waiting, the alignment future must still be pending.
            Preconditions.checkState(!waitingForAlignmentFuture.isDone());
            if (shouldWaitForAlignment()) {
                return;
            }
            // The mode is switched before the future is completed.
            operatingMode = OperatingMode.READING;
            waitingForAlignmentFuture.complete(null);
        }
    }

    /**
     * Tells whether this source is ahead of the maximum desired watermark.
     *
     * @return {@code true} if the last emitted watermark is strictly greater than the maximum
     *     desired watermark
     */
    private boolean shouldWaitForAlignment() {
        return lastEmittedWatermark > currentMaxDesiredWatermark;
    }

    /** Sends a registration event with this subtask's index and host name to the coordinator. */
    private void registerReader() {
        final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        final ReaderRegistrationEvent registration = new ReaderRegistrationEvent(subtaskIndex, localHostname);
        operatorEventGateway.sendEventToCoordinator(registration);
    }

    /**
     * Returns the source reader.
     *
     * @return the reader, or {@code null} if it has not been created yet
     */
    @VisibleForTesting
    public SourceReader<OUT, SplitT> getSourceReader() {
        return sourceReader;
    }

    /**
     * Returns the state that holds the reader's splits.
     *
     * @return the reader state, or {@code null} if the state has not been initialized yet
     */
    @VisibleForTesting
    ListState<SplitT> getReaderState() {
        return readerState;
    }

    /**
     * Combines an availability future with a forced-stop future through a two-slot
     * {@link MultipleFuturesAvailabilityHelper}: slot 0 holds the forced-stop future, slot 1 the
     * future passed to {@link #update(CompletableFuture)}.
     */
    private static class SourceOperatorAvailabilityHelper {
        private final CompletableFuture<Void> forcedStopFuture = new CompletableFuture<>();
        private final MultipleFuturesAvailabilityHelper availabilityHelper = new MultipleFuturesAvailabilityHelper(2);

        private SourceOperatorAvailabilityHelper() {
            availabilityHelper.anyOf(0, forcedStopFuture);
        }

        /**
         * Returns the availability for the given future.
         *
         * @param sourceReaderFuture the future to combine with the forced-stop future
         * @return {@link AvailabilityProvider#AVAILABLE} if the given future is already
         *     available or done; otherwise the combined future of the underlying helper
         */
        public CompletableFuture<?> update(CompletableFuture<Void> sourceReaderFuture) {
            final boolean stillPending =
                    sourceReaderFuture != AvailabilityProvider.AVAILABLE && !sourceReaderFuture.isDone();
            if (stillPending) {
                // Start from an unavailable state and register both futures again.
                availabilityHelper.resetToUnAvailable();
                availabilityHelper.anyOf(0, forcedStopFuture);
                availabilityHelper.anyOf(1, sourceReaderFuture);
                return availabilityHelper.getAvailableFuture();
            }
            return AvailabilityProvider.AVAILABLE;
        }

        /** Completes the forced-stop future. */
        public void forceStop() {
            forcedStopFuture.complete(null);
        }
    }

    private static enum OperatingMode {
        READING,
        WAITING_FOR_ALIGNMENT,
        OUTPUT_NOT_INITIALIZED,
        SOURCE_DRAINED,
        SOURCE_STOPPED,
        DATA_FINISHED;

    }
}

