/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.file.operation;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base implementation of {@link FileStoreWrite} that keeps one {@link RecordWriter} per
 * (partition, bucket) pair and creates writers lazily on first use.
 *
 * <p>Subclasses supply the concrete writers through {@link #createWriterContainer(BinaryRowData,
 * int, ExecutorService)} and {@link #createEmptyWriterContainer(BinaryRowData, int,
 * ExecutorService)}.
 *
 * @param <T> type of record to write
 */
public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreWrite.class);

    private final String commitUser;
    protected final SnapshotManager snapshotManager;
    private final FileStoreScan scan;

    @Nullable protected IOManager ioManager;

    /** Live writers, keyed by partition and then by bucket. */
    protected final Map<BinaryRowData, Map<Integer, WriterContainer<T>>> writers;

    /** Single-threaded executor handed to every writer container created by this instance. */
    private final ExecutorService compactExecutor;

    private boolean overwrite = false;

    /**
     * Creates the write operation together with its own compaction executor, whose thread name
     * is derived from the name of the constructing thread.
     *
     * @param commitUser user whose latest snapshot is consulted in {@link #prepareCommit}
     * @param snapshotManager used to look up the latest snapshot of {@code commitUser}
     * @param scan used by {@link #scanExistingFileMetas} to list existing files
     */
    protected AbstractFileStoreWrite(
            String commitUser, SnapshotManager snapshotManager, FileStoreScan scan) {
        this.commitUser = commitUser;
        this.snapshotManager = snapshotManager;
        this.scan = scan;

        this.writers = new HashMap<>();
        this.compactExecutor =
                Executors.newSingleThreadScheduledExecutor(
                        new ExecutorThreadFactory(
                                Thread.currentThread().getName() + "-compaction"));
    }

    @Override
    public FileStoreWrite<T> withIOManager(IOManager ioManager) {
        this.ioManager = ioManager;
        return this;
    }

    /**
     * Lists the data files of the given partition and bucket in the given snapshot.
     *
     * @param snapshotId snapshot to scan, or {@code null} to skip scanning
     * @param partition partition to scan
     * @param bucket bucket to scan
     * @return a new mutable list of file metas; empty when {@code snapshotId} is {@code null}
     */
    protected List<DataFileMeta> scanExistingFileMetas(
            @Nullable Long snapshotId, BinaryRowData partition, int bucket) {
        List<DataFileMeta> existingFileMetas = new ArrayList<>();
        if (snapshotId != null) {
            scan.withSnapshot(snapshotId)
                    .withPartitionFilter(Collections.singletonList(partition))
                    .withBucket(bucket)
                    .plan()
                    .files()
                    .stream()
                    .map(ManifestEntry::file)
                    .forEach(existingFileMetas::add);
        }
        return existingFileMetas;
    }

    /**
     * Sets the overwrite flag. Only writers created after this call are affected, because the
     * flag is read when a writer container is created.
     */
    @Override
    public void withOverwrite(boolean overwrite) {
        this.overwrite = overwrite;
    }

    /** Writes {@code data} through the writer of the given partition and bucket. */
    @Override
    public void write(BinaryRowData partition, int bucket, T data) throws Exception {
        RecordWriter<T> writer = getWriterWrapper(partition, bucket).writer;
        writer.write(data);
    }

    /** Asks the writer of the given partition and bucket to compact. */
    @Override
    public void compact(BinaryRowData partition, int bucket, boolean fullCompaction)
            throws Exception {
        getWriterWrapper(partition, bucket).writer.compact(fullCompaction);
    }

    /**
     * Hands files created by snapshot {@code snapshotId} to the writer of the given partition
     * and bucket, creating that writer if it does not exist yet. The files are only forwarded
     * when {@code snapshotId} is greater than the writer's base snapshot id.
     */
    @Override
    public void notifyNewFiles(
            long snapshotId, BinaryRowData partition, int bucket, List<DataFileMeta> files) {
        WriterContainer<T> writerContainer = getWriterWrapper(partition, bucket);
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Get extra compact files for partition {}, bucket {}. Extra snapshot {}, base snapshot {}.\nFiles: {}",
                    partition,
                    bucket,
                    snapshotId,
                    writerContainer.baseSnapshotId,
                    files);
        }
        if (snapshotId > writerContainer.baseSnapshotId) {
            writerContainer.writer.addNewFiles(files);
        }
    }

    /**
     * Prepares a commit on every live writer and returns one {@link FileCommittable} per writer
     * (empty committables included).
     *
     * <p>A writer whose committable is empty is closed and removed once its last modification is
     * not newer than the latest commit identifier of {@code commitUser}. A writer with a
     * non-empty committable has its last-modified identifier set to {@code commitIdentifier}.
     * Partitions left without writers are removed as well.
     *
     * @param blocking forwarded to {@link RecordWriter#prepareCommit}
     * @param commitIdentifier identifier of the commit being prepared
     * @return committables of all writers that were live when this method was called
     * @throws Exception if preparing the commit or closing a writer fails
     */
    @Override
    public List<FileCommittable> prepareCommit(boolean blocking, long commitIdentifier)
            throws Exception {
        long maxLastModifiedIdentifier =
                writers.values().stream()
                        .map(Map::values)
                        .flatMap(Collection::stream)
                        .mapToLong(w -> w.lastModifiedCommitIdentifier)
                        .max()
                        .orElse(Long.MIN_VALUE);

        long latestCommittedIdentifier;
        if (maxLastModifiedIdentifier == Long.MIN_VALUE) {
            // No writer has recorded a modification yet, so every comparison below gives the
            // same result for Long.MIN_VALUE; the snapshot lookup is skipped.
            latestCommittedIdentifier = Long.MIN_VALUE;
        } else {
            latestCommittedIdentifier =
                    snapshotManager
                            .latestSnapshotOfUser(commitUser)
                            .map(Snapshot::commitIdentifier)
                            .orElse(Long.MIN_VALUE);
        }

        List<FileCommittable> result = new ArrayList<>();

        // Explicit iterators are required because entries are removed while iterating.
        Iterator<Map.Entry<BinaryRowData, Map<Integer, WriterContainer<T>>>> partIter =
                writers.entrySet().iterator();
        while (partIter.hasNext()) {
            Map.Entry<BinaryRowData, Map<Integer, WriterContainer<T>>> partEntry = partIter.next();
            BinaryRowData partition = partEntry.getKey();

            Iterator<Map.Entry<Integer, WriterContainer<T>>> bucketIter =
                    partEntry.getValue().entrySet().iterator();
            while (bucketIter.hasNext()) {
                Map.Entry<Integer, WriterContainer<T>> entry = bucketIter.next();
                int bucket = entry.getKey();
                WriterContainer<T> writerContainer = entry.getValue();

                RecordWriter.CommitIncrement increment =
                        writerContainer.writer.prepareCommit(blocking);
                FileCommittable committable =
                        new FileCommittable(
                                partition,
                                bucket,
                                increment.newFilesIncrement(),
                                increment.compactIncrement());
                result.add(committable);

                if (!committable.isEmpty()) {
                    writerContainer.lastModifiedCommitIdentifier = commitIdentifier;
                } else if (writerContainer.lastModifiedCommitIdentifier
                        <= latestCommittedIdentifier) {
                    // Nothing new was produced and the writer's last modification is already
                    // committed, so the writer can be released.
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(
                                "Closing writer for partition {}, bucket {}. Writer's last modified identifier is {}, while latest committed identifier is {}",
                                partition,
                                bucket,
                                writerContainer.lastModifiedCommitIdentifier,
                                latestCommittedIdentifier);
                    }
                    writerContainer.writer.close();
                    bucketIter.remove();
                }
            }

            if (partEntry.getValue().isEmpty()) {
                partIter.remove();
            }
        }

        return result;
    }

    /**
     * Closes every writer, forgets them and shuts down the compaction executor.
     *
     * <p>All writers are attempted even if some of them fail to close, and the executor is shut
     * down in any case, so a failing writer cannot leak the compaction thread or the remaining
     * writers. The first failure is rethrown with later failures attached as suppressed
     * exceptions.
     *
     * @throws Exception the first exception raised while closing a writer
     */
    @Override
    public void close() throws Exception {
        Exception firstFailure = null;
        try {
            for (Map<Integer, WriterContainer<T>> bucketWriters : writers.values()) {
                for (WriterContainer<T> writerContainer : bucketWriters.values()) {
                    try {
                        writerContainer.writer.close();
                    } catch (Exception e) {
                        if (firstFailure == null) {
                            firstFailure = e;
                        } else if (e != firstFailure) {
                            // addSuppressed rejects self-suppression, hence the identity check.
                            firstFailure.addSuppressed(e);
                        }
                    }
                }
            }
        } finally {
            writers.clear();
            compactExecutor.shutdownNow();
        }

        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Returns the writer container of the given partition and bucket, creating it on first
     * access. The partition is copied before it is stored as a map key.
     */
    private WriterContainer<T> getWriterWrapper(BinaryRowData partition, int bucket) {
        Map<Integer, WriterContainer<T>> buckets = writers.get(partition);
        if (buckets == null) {
            buckets = new HashMap<>();
            writers.put(partition.copy(), buckets);
        }
        // createWriterContainer copies the partition itself, so no extra copy is made here.
        return buckets.computeIfAbsent(bucket, k -> createWriterContainer(partition, bucket));
    }

    /**
     * Creates a writer container for a copy of {@code partition}, choosing the "empty" variant
     * when overwrite is enabled, and announces the new writer via {@link #notifyNewWriter}.
     */
    private WriterContainer<T> createWriterContainer(BinaryRowData partition, int bucket) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Creating writer for partition {}, bucket {}", partition, bucket);
        }
        WriterContainer<T> writerContainer;
        if (overwrite) {
            writerContainer = createEmptyWriterContainer(partition.copy(), bucket, compactExecutor);
        } else {
            writerContainer = createWriterContainer(partition.copy(), bucket, compactExecutor);
        }
        notifyNewWriter(writerContainer.writer);
        return writerContainer;
    }

    /** Hook invoked for every newly created writer; does nothing by default. */
    protected void notifyNewWriter(RecordWriter<T> writer) {}

    /**
     * Creates the writer container used when overwrite is disabled.
     *
     * @param partition partition the writer belongs to
     * @param bucket bucket the writer belongs to
     * @param compactExecutor executor shared by all writers of this instance
     */
    @VisibleForTesting
    public abstract WriterContainer<T> createWriterContainer(
            BinaryRowData partition, int bucket, ExecutorService compactExecutor);

    /**
     * Creates the writer container used when overwrite is enabled.
     *
     * <p>NOTE(review): presumably the resulting writer starts without any existing files;
     * confirm against the implementations.
     *
     * @param partition partition the writer belongs to
     * @param bucket bucket the writer belongs to
     * @param compactExecutor executor shared by all writers of this instance
     */
    @VisibleForTesting
    public abstract WriterContainer<T> createEmptyWriterContainer(
            BinaryRowData partition, int bucket, ExecutorService compactExecutor);

    /**
     * A {@link RecordWriter} together with the bookkeeping needed to decide when extra files
     * apply to it and when it may be closed.
     *
     * @param <T> type of record to write
     */
    @VisibleForTesting
    public static class WriterContainer<T> {
        public final RecordWriter<T> writer;

        /** Snapshot id the writer was based on; {@code 0} when none was given. */
        private final long baseSnapshotId;

        /**
         * Identifier of the last commit for which this writer produced a non-empty committable;
         * {@link Long#MIN_VALUE} until that happens.
         */
        private long lastModifiedCommitIdentifier;

        /**
         * @param writer the wrapped writer
         * @param baseSnapshotId snapshot id the writer is based on, or {@code null} for none
         */
        protected WriterContainer(RecordWriter<T> writer, @Nullable Long baseSnapshotId) {
            this.writer = writer;
            this.baseSnapshotId = baseSnapshotId == null ? 0L : baseSnapshotId;
            this.lastModifiedCommitIdentifier = Long.MIN_VALUE;
        }
    }
}

