/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.connector.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.table.store.connector.source.FileStoreSourceSplit;
import org.apache.flink.table.store.connector.source.FileStoreSourceSplitGenerator;
import org.apache.flink.table.store.connector.source.PendingSplitsCheckpoint;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.source.TableScan;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ContinuousFileSplitEnumerator
implements SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> {
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileSplitEnumerator.class);
    private final SplitEnumeratorContext<FileStoreSourceSplit> context;
    private final TableScan scan;
    private final SnapshotManager snapshotManager;
    private final Map<Integer, Queue<FileStoreSourceSplit>> bucketSplits;
    private final long discoveryInterval;
    private final Set<Integer> readersAwaitingSplit;
    private final FileStoreSourceSplitGenerator splitGenerator;
    private final SnapshotEnumerator snapshotEnumerator;
    private Long currentSnapshotId;

    public ContinuousFileSplitEnumerator(SplitEnumeratorContext<FileStoreSourceSplit> context, TableScan scan, SnapshotManager snapshotManager, Collection<FileStoreSourceSplit> remainSplits, long currentSnapshotId, long discoveryInterval) {
        Preconditions.checkArgument((discoveryInterval > 0L ? 1 : 0) != 0);
        this.context = (SplitEnumeratorContext)Preconditions.checkNotNull(context);
        this.scan = (TableScan)Preconditions.checkNotNull((Object)scan);
        this.snapshotManager = snapshotManager;
        this.bucketSplits = new HashMap<Integer, Queue<FileStoreSourceSplit>>();
        this.addSplits(remainSplits);
        this.currentSnapshotId = currentSnapshotId;
        this.discoveryInterval = discoveryInterval;
        this.readersAwaitingSplit = new HashSet<Integer>();
        this.splitGenerator = new FileStoreSourceSplitGenerator();
        this.snapshotEnumerator = new SnapshotEnumerator(currentSnapshotId);
    }

    private void addSplits(Collection<FileStoreSourceSplit> splits) {
        splits.forEach(this::addSplit);
    }

    private void addSplit(FileStoreSourceSplit split) {
        this.bucketSplits.computeIfAbsent(split.split().bucket(), i -> new LinkedList()).add(split);
    }

    public void start() {
        this.context.callAsync((Callable)this.snapshotEnumerator, this::processDiscoveredSplits, this.discoveryInterval, this.discoveryInterval);
    }

    public void close() throws IOException {
    }

    public void addReader(int subtaskId) {
    }

    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        this.readersAwaitingSplit.add(subtaskId);
        this.assignSplits();
    }

    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        LOG.error("Received unrecognized event: {}", (Object)sourceEvent);
    }

    public void addSplitsBack(List<FileStoreSourceSplit> splits, int subtaskId) {
        LOG.debug("File Source Enumerator adds splits back: {}", splits);
        this.addSplits(splits);
    }

    public PendingSplitsCheckpoint snapshotState(long checkpointId) {
        ArrayList<FileStoreSourceSplit> splits = new ArrayList<FileStoreSourceSplit>();
        this.bucketSplits.values().forEach(splits::addAll);
        PendingSplitsCheckpoint checkpoint = new PendingSplitsCheckpoint(splits, this.currentSnapshotId == null ? -1L : this.currentSnapshotId);
        LOG.debug("Source Checkpoint is {}", (Object)checkpoint);
        return checkpoint;
    }

    private void processDiscoveredSplits(@Nullable EnumeratorResult result, Throwable error) {
        if (error != null) {
            LOG.error("Failed to enumerate files", error);
            return;
        }
        if (result == null) {
            return;
        }
        this.currentSnapshotId = result.snapshotId;
        this.addSplits(result.splits);
        this.assignSplits();
    }

    private void assignSplits() {
        this.bucketSplits.forEach((bucket, splits) -> {
            int task;
            if (splits.size() > 0 && this.readersAwaitingSplit.remove(task = bucket % this.context.currentParallelism())) {
                if (!this.context.registeredReaders().containsKey(task)) {
                    return;
                }
                this.context.assignSplit((SourceSplit)splits.poll(), task);
            }
        });
    }

    private static class EnumeratorResult {
        private final long snapshotId;
        private final List<FileStoreSourceSplit> splits;

        private EnumeratorResult(long snapshotId, List<FileStoreSourceSplit> splits) {
            this.snapshotId = snapshotId;
            this.splits = splits;
        }
    }

    private class SnapshotEnumerator
    implements Callable<EnumeratorResult> {
        private long nextSnapshotId;

        private SnapshotEnumerator(long currentSnapshot) {
            this.nextSnapshotId = currentSnapshot + 1L;
        }

        @Override
        @Nullable
        public EnumeratorResult call() {
            while (true) {
                if (!ContinuousFileSplitEnumerator.this.snapshotManager.snapshotExists(this.nextSnapshotId)) {
                    LOG.debug("Next snapshot id {} not exists, wait for it to be generated.", (Object)this.nextSnapshotId);
                    return null;
                }
                Snapshot snapshot = ContinuousFileSplitEnumerator.this.snapshotManager.snapshot(this.nextSnapshotId);
                if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) break;
                if (snapshot.commitKind() == Snapshot.CommitKind.OVERWRITE) {
                    LOG.warn("Ignore overwrite snapshot id {}.", (Object)this.nextSnapshotId);
                }
                ++this.nextSnapshotId;
                LOG.debug("Next snapshot id {} is not append, but is {}, check next one.", (Object)this.nextSnapshotId, (Object)snapshot.commitKind());
            }
            List<FileStoreSourceSplit> splits = ContinuousFileSplitEnumerator.this.splitGenerator.createSplits(ContinuousFileSplitEnumerator.this.scan.withSnapshot(this.nextSnapshotId).plan());
            EnumeratorResult result = new EnumeratorResult(this.nextSnapshotId, splits);
            LOG.debug("Find snapshot id {}, it has {} splits.", (Object)this.nextSnapshotId, (Object)splits.size());
            ++this.nextSnapshotId;
            return result;
        }
    }
}

