/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.file.operation;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.manifest.FileKind;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.manifest.ManifestEntry;
import org.apache.flink.table.store.file.manifest.ManifestFile;
import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.operation.FileStoreCommit;
import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.predicate.PredicateConverter;
import org.apache.flink.table.store.file.utils.AtomicFileWriter;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.utils.RowDataToObjectArrayConverter;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileStoreCommitImpl
implements FileStoreCommit {
    private static final Logger LOG = LoggerFactory.getLogger(FileStoreCommitImpl.class);
    private final long schemaId;
    private final String commitUser;
    private final RowType partitionType;
    private final RowDataToObjectArrayConverter partitionObjectConverter;
    private final FileStorePathFactory pathFactory;
    private final SnapshotManager snapshotManager;
    private final ManifestFile manifestFile;
    private final ManifestList manifestList;
    private final FileStoreScan scan;
    private final int numBucket;
    private final MemorySize manifestTargetSize;
    private final int manifestMergeMinCount;
    @Nullable
    private final Comparator<RowData> keyComparator;
    @Nullable
    private Lock lock;
    private boolean createEmptyCommit;

    public FileStoreCommitImpl(long schemaId, String commitUser, RowType partitionType, FileStorePathFactory pathFactory, SnapshotManager snapshotManager, ManifestFile.Factory manifestFileFactory, ManifestList.Factory manifestListFactory, FileStoreScan scan, int numBucket, MemorySize manifestTargetSize, int manifestMergeMinCount, @Nullable Comparator<RowData> keyComparator) {
        this.schemaId = schemaId;
        this.commitUser = commitUser;
        this.partitionType = partitionType;
        this.partitionObjectConverter = new RowDataToObjectArrayConverter(partitionType);
        this.pathFactory = pathFactory;
        this.snapshotManager = snapshotManager;
        this.manifestFile = manifestFileFactory.create();
        this.manifestList = manifestListFactory.create();
        this.scan = scan;
        this.numBucket = numBucket;
        this.manifestTargetSize = manifestTargetSize;
        this.manifestMergeMinCount = manifestMergeMinCount;
        this.keyComparator = keyComparator;
        this.lock = null;
        this.createEmptyCommit = false;
    }

    @Override
    public FileStoreCommit withLock(Lock lock) {
        this.lock = lock;
        return this;
    }

    @Override
    public FileStoreCommit withCreateEmptyCommit(boolean createEmptyCommit) {
        this.createEmptyCommit = createEmptyCommit;
        return this;
    }

    @Override
    public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> committableList) {
        if (committableList.isEmpty()) {
            return committableList;
        }
        Optional<Snapshot> latestSnapshot = this.snapshotManager.latestSnapshotOfUser(this.commitUser);
        if (latestSnapshot.isPresent()) {
            ArrayList<ManifestCommittable> result = new ArrayList<ManifestCommittable>();
            for (ManifestCommittable committable : committableList) {
                if (committable.identifier() <= latestSnapshot.get().commitIdentifier()) continue;
                result.add(committable);
            }
            return result;
        }
        return committableList;
    }

    @Override
    public void commit(ManifestCommittable committable, Map<String, String> properties) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Ready to commit\n" + committable.toString());
        }
        Long safeLatestSnapshotId = null;
        ArrayList<ManifestEntry> baseEntries = new ArrayList<ManifestEntry>();
        ArrayList<ManifestEntry> appendTableFiles = new ArrayList<ManifestEntry>();
        ArrayList<ManifestEntry> appendChangelog = new ArrayList<ManifestEntry>();
        ArrayList<ManifestEntry> compactTableFiles = new ArrayList<ManifestEntry>();
        ArrayList<ManifestEntry> compactChangelog = new ArrayList<ManifestEntry>();
        this.collectChanges(committable.fileCommittables(), appendTableFiles, appendChangelog, compactTableFiles, compactChangelog);
        if (this.createEmptyCommit || !appendTableFiles.isEmpty() || !appendChangelog.isEmpty()) {
            Long latestSnapshotId = this.snapshotManager.latestSnapshotId();
            if (latestSnapshotId != null) {
                baseEntries.addAll(this.readAllEntriesFromChangedPartitions(latestSnapshotId, appendTableFiles, compactTableFiles));
                this.noConflictsOrFail(baseEntries, appendTableFiles);
                safeLatestSnapshotId = latestSnapshotId;
            }
            this.tryCommit(appendTableFiles, appendChangelog, committable.identifier(), committable.logOffsets(), Snapshot.CommitKind.APPEND, safeLatestSnapshotId);
        }
        if (!compactTableFiles.isEmpty() || !compactChangelog.isEmpty()) {
            if (safeLatestSnapshotId != null) {
                baseEntries.addAll(appendTableFiles);
                this.noConflictsOrFail(baseEntries, compactTableFiles);
                safeLatestSnapshotId = safeLatestSnapshotId + 1L;
            }
            this.tryCommit(compactTableFiles, compactChangelog, committable.identifier(), committable.logOffsets(), Snapshot.CommitKind.COMPACT, safeLatestSnapshotId);
        }
    }

    @Override
    public void overwrite(Map<String, String> partition, ManifestCommittable committable, Map<String, String> properties) {
        Predicate partitionFilter;
        if (LOG.isDebugEnabled()) {
            LOG.debug("Ready to overwrite partition " + partition.toString() + "\n" + committable.toString());
        }
        ArrayList<ManifestEntry> appendTableFiles = new ArrayList<ManifestEntry>();
        ArrayList<ManifestEntry> appendChangelog = new ArrayList<ManifestEntry>();
        ArrayList<ManifestEntry> compactTableFiles = new ArrayList<ManifestEntry>();
        ArrayList<ManifestEntry> compactChangelog = new ArrayList<ManifestEntry>();
        this.collectChanges(committable.fileCommittables(), appendTableFiles, appendChangelog, compactTableFiles, compactChangelog);
        if (!appendChangelog.isEmpty() || !compactChangelog.isEmpty()) {
            StringBuilder warnMessage = new StringBuilder("Overwrite mode currently does not commit any changelog.\nPlease make sure that the partition you're overwriting is not being consumed by a streaming reader.\nIgnored changelog files are:\n");
            for (ManifestEntry entry : appendChangelog) {
                warnMessage.append("  * ").append(entry.toString()).append("\n");
            }
            for (ManifestEntry entry : compactChangelog) {
                warnMessage.append("  * ").append(entry.toString()).append("\n");
            }
            LOG.warn(warnMessage.toString());
        }
        if ((partitionFilter = PredicateConverter.fromMap(partition, this.partitionType)) != null) {
            for (ManifestEntry entry : appendTableFiles) {
                if (partitionFilter.test(this.partitionObjectConverter.convert((RowData)entry.partition()))) continue;
                throw new IllegalArgumentException("Trying to overwrite partition " + partition + ", but the changes in " + this.pathFactory.getPartitionString(entry.partition()) + " does not belong to this partition");
            }
        }
        this.tryOverwrite(partitionFilter, appendTableFiles, committable.identifier(), committable.logOffsets());
        if (!compactTableFiles.isEmpty()) {
            this.tryCommit(compactTableFiles, Collections.emptyList(), committable.identifier(), committable.logOffsets(), Snapshot.CommitKind.COMPACT, null);
        }
    }

    private void collectChanges(List<FileCommittable> fileCommittables, List<ManifestEntry> appendTableFiles, List<ManifestEntry> appendChangelog, List<ManifestEntry> compactTableFiles, List<ManifestEntry> compactChangelog) {
        for (FileCommittable fileCommittable : fileCommittables) {
            fileCommittable.newFilesIncrement().newFiles().forEach(m -> appendTableFiles.add(this.makeEntry(FileKind.ADD, fileCommittable, (DataFileMeta)m)));
            fileCommittable.newFilesIncrement().changelogFiles().forEach(m -> appendChangelog.add(this.makeEntry(FileKind.ADD, fileCommittable, (DataFileMeta)m)));
            fileCommittable.compactIncrement().compactBefore().forEach(m -> compactTableFiles.add(this.makeEntry(FileKind.DELETE, fileCommittable, (DataFileMeta)m)));
            fileCommittable.compactIncrement().compactAfter().forEach(m -> compactTableFiles.add(this.makeEntry(FileKind.ADD, fileCommittable, (DataFileMeta)m)));
            fileCommittable.compactIncrement().changelogFiles().forEach(m -> compactChangelog.add(this.makeEntry(FileKind.ADD, fileCommittable, (DataFileMeta)m)));
        }
    }

    private ManifestEntry makeEntry(FileKind kind, FileCommittable fileCommittable, DataFileMeta file) {
        return new ManifestEntry(kind, fileCommittable.partition(), fileCommittable.bucket(), this.numBucket, file);
    }

    private void tryCommit(List<ManifestEntry> tableFiles, List<ManifestEntry> changelogFiles, long identifier, Map<Integer, Long> logOffsets, Snapshot.CommitKind commitKind, Long safeLatestSnapshotId) {
        Long latestSnapshotId;
        while (!this.tryCommitOnce(tableFiles, changelogFiles, identifier, logOffsets, commitKind, latestSnapshotId = this.snapshotManager.latestSnapshotId(), safeLatestSnapshotId)) {
        }
    }

    private void tryOverwrite(Predicate partitionFilter, List<ManifestEntry> changes, long identifier, Map<Integer, Long> logOffsets) {
        Long latestSnapshotId;
        ArrayList<ManifestEntry> changesWithOverwrite;
        do {
            latestSnapshotId = this.snapshotManager.latestSnapshotId();
            changesWithOverwrite = new ArrayList<ManifestEntry>();
            if (latestSnapshotId != null) {
                List<ManifestEntry> currentEntries = this.scan.withSnapshot(latestSnapshotId).withPartitionFilter(partitionFilter).plan().files();
                for (ManifestEntry entry : currentEntries) {
                    changesWithOverwrite.add(new ManifestEntry(FileKind.DELETE, entry.partition(), entry.bucket(), entry.totalBuckets(), entry.file()));
                }
            }
            changesWithOverwrite.addAll(changes);
        } while (!this.tryCommitOnce(changesWithOverwrite, Collections.emptyList(), identifier, logOffsets, Snapshot.CommitKind.OVERWRITE, latestSnapshotId, null));
    }

    private boolean tryCommitOnce(List<ManifestEntry> tableFiles, List<ManifestEntry> changelogFiles, long identifier, Map<Integer, Long> logOffsets, Snapshot.CommitKind commitKind, Long latestSnapshotId, Long safeLatestSnapshotId) {
        boolean success;
        Snapshot newSnapshot;
        long newSnapshotId = latestSnapshotId == null ? 1L : latestSnapshotId + 1L;
        Path newSnapshotPath = this.snapshotManager.snapshotPath(newSnapshotId);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId);
            for (ManifestEntry entry : tableFiles) {
                LOG.debug("  * " + entry.toString());
            }
            LOG.debug("Ready to commit changelog to snapshot #" + newSnapshotId);
            for (ManifestEntry entry : changelogFiles) {
                LOG.debug("  * " + entry.toString());
            }
        }
        Snapshot latestSnapshot = null;
        if (latestSnapshotId != null) {
            if (!latestSnapshotId.equals(safeLatestSnapshotId)) {
                this.noConflictsOrFail(latestSnapshotId, tableFiles);
            }
            latestSnapshot = this.snapshotManager.snapshot(latestSnapshotId);
        }
        String previousChangesListName = null;
        String newChangesListName = null;
        String changelogListName = null;
        ArrayList<ManifestFileMeta> oldMetas = new ArrayList<ManifestFileMeta>();
        ArrayList<ManifestFileMeta> newMetas = new ArrayList<ManifestFileMeta>();
        ArrayList<ManifestFileMeta> changelogMetas = new ArrayList<ManifestFileMeta>();
        try {
            if (latestSnapshot != null) {
                oldMetas.addAll(latestSnapshot.readAllDataManifests(this.manifestList));
                latestSnapshot.getLogOffsets().forEach(logOffsets::putIfAbsent);
            }
            newMetas.addAll(ManifestFileMeta.merge(oldMetas, this.manifestFile, this.manifestTargetSize.getBytes(), this.manifestMergeMinCount));
            previousChangesListName = this.manifestList.write(newMetas);
            List<ManifestFileMeta> newChangesManifests = this.manifestFile.write(tableFiles);
            newMetas.addAll(newChangesManifests);
            newChangesListName = this.manifestList.write(newChangesManifests);
            if (!changelogFiles.isEmpty()) {
                changelogMetas.addAll(this.manifestFile.write(changelogFiles));
                changelogListName = this.manifestList.write(changelogMetas);
            }
            newSnapshot = new Snapshot(newSnapshotId, this.schemaId, previousChangesListName, newChangesListName, changelogListName, this.commitUser, identifier, commitKind, System.currentTimeMillis(), logOffsets);
        }
        catch (Throwable e) {
            this.cleanUpTmpManifests(previousChangesListName, newChangesListName, changelogListName, oldMetas, newMetas, changelogMetas);
            throw new RuntimeException(String.format("Exception occurs when preparing snapshot #%d (path %s) by user %s with hash %s and kind %s. Clean up.", newSnapshotId, newSnapshotPath.toString(), this.commitUser, identifier, commitKind.name()), e);
        }
        try {
            FileSystem fs = newSnapshotPath.getFileSystem();
            Callable<Boolean> callable = () -> {
                if (fs.exists(newSnapshotPath)) {
                    return false;
                }
                boolean committed = AtomicFileWriter.writeFileUtf8(newSnapshotPath, newSnapshot.toJson());
                if (committed) {
                    this.snapshotManager.commitLatestHint(newSnapshotId);
                }
                return committed;
            };
            success = this.lock != null ? this.lock.runWithLock(() -> !fs.exists(newSnapshotPath) && (Boolean)callable.call() != false).booleanValue() : callable.call().booleanValue();
        }
        catch (Throwable e) {
            throw new RuntimeException(String.format("Exception occurs when committing snapshot #%d (path %s) by user %s with identifier %s and kind %s. Cannot clean up because we can't determine the success.", newSnapshotId, newSnapshotPath, this.commitUser, identifier, commitKind.name()), e);
        }
        if (success) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Successfully commit snapshot #%d (path %s) by user %s with identifier %s and kind %s.", newSnapshotId, newSnapshotPath, this.commitUser, identifier, commitKind.name()));
            }
            return true;
        }
        LOG.warn(String.format("Atomic commit failed for snapshot #%d (path %s) by user %s with identifier %s and kind %s. Clean up and try again.", newSnapshotId, newSnapshotPath, this.commitUser, identifier, commitKind.name()));
        this.cleanUpTmpManifests(previousChangesListName, newChangesListName, changelogListName, oldMetas, newMetas, changelogMetas);
        return false;
    }

    @SafeVarargs
    private final List<ManifestEntry> readAllEntriesFromChangedPartitions(long snapshotId, List<ManifestEntry> ... changes) {
        List<BinaryRowData> changedPartitions = Arrays.stream(changes).flatMap(Collection::stream).map(ManifestEntry::partition).distinct().collect(Collectors.toList());
        try {
            return this.scan.withSnapshot(snapshotId).withPartitionFilter(changedPartitions).plan().files();
        }
        catch (Throwable e) {
            throw new RuntimeException("Cannot read manifest entries from changed partitions.", e);
        }
    }

    private void noConflictsOrFail(long snapshotId, List<ManifestEntry> changes) {
        this.noConflictsOrFail(this.readAllEntriesFromChangedPartitions(snapshotId, changes), changes);
    }

    private void noConflictsOrFail(List<ManifestEntry> baseEntries, List<ManifestEntry> changes) {
        Collection<ManifestEntry> mergedEntries;
        ArrayList<ManifestEntry> allEntries = new ArrayList<ManifestEntry>(baseEntries);
        allEntries.addAll(changes);
        try {
            mergedEntries = ManifestEntry.mergeEntries(allEntries);
            ManifestEntry.assertNoDelete(mergedEntries);
        }
        catch (Throwable e) {
            LOG.warn("File deletion conflicts detected! Give up committing.", e);
            throw this.createConflictException("File deletion conflicts detected! Give up committing.", baseEntries, changes);
        }
        if (this.keyComparator == null) {
            return;
        }
        HashMap<LevelIdentifier, List> levels = new HashMap<LevelIdentifier, List>();
        for (ManifestEntry entry : mergedEntries) {
            int level = entry.file().level();
            if (level < 1) continue;
            levels.computeIfAbsent(new LevelIdentifier(entry.partition(), entry.bucket(), level), lv -> new ArrayList()).add(entry);
        }
        for (List entries : levels.values()) {
            entries.sort((a, b) -> this.keyComparator.compare((RowData)a.file().minKey(), (RowData)b.file().minKey()));
            int i = 0;
            while (i + 1 < entries.size()) {
                ManifestEntry a2 = (ManifestEntry)entries.get(i);
                ManifestEntry b2 = (ManifestEntry)entries.get(i + 1);
                if (this.keyComparator.compare((RowData)a2.file().maxKey(), (RowData)b2.file().minKey()) >= 0) {
                    throw this.createConflictException("LSM conflicts detected! Give up committing. Conflict files are:\n" + a2.identifier().toString(this.pathFactory) + "\n" + b2.identifier().toString(this.pathFactory), baseEntries, changes);
                }
                ++i;
            }
        }
    }

    private RuntimeException createConflictException(String message, List<ManifestEntry> baseEntries, List<ManifestEntry> changes) {
        String possibleCauses = String.join((CharSequence)"\n", "Conflicts during commits are normal and this failure is intended to resolve the conflicts.", "Conflicts are mainly caused by the following scenarios:", "1. Multiple jobs are writing into the same partition at the same time.", "2. You're recovering from an old savepoint, or you're creating multiple jobs from a savepoint.", "   The job will fail continuously in this scenario to protect metadata from corruption.", "   You can either recover from the latest savepoint, or you can revert the table to the snapshot corresponding to the old savepoint.");
        String baseEntriesString = "Base entries are:\n" + baseEntries.stream().map(ManifestEntry::toString).collect(Collectors.joining("\n"));
        String changesString = "Changes are:\n" + changes.stream().map(ManifestEntry::toString).collect(Collectors.joining("\n"));
        return new RuntimeException(message + "\n\n" + possibleCauses + "\n\n" + baseEntriesString + "\n\n" + changesString);
    }

    private void cleanUpTmpManifests(String previousChangesListName, String newChangesListName, String changelogListName, List<ManifestFileMeta> oldMetas, List<ManifestFileMeta> newMetas, List<ManifestFileMeta> changelogMetas) {
        if (previousChangesListName != null) {
            this.manifestList.delete(previousChangesListName);
        }
        if (newChangesListName != null) {
            this.manifestList.delete(newChangesListName);
        }
        if (changelogListName != null) {
            this.manifestList.delete(changelogListName);
        }
        HashSet<ManifestFileMeta> oldMetaSet = new HashSet<ManifestFileMeta>(oldMetas);
        for (ManifestFileMeta suspect : newMetas) {
            if (oldMetaSet.contains(suspect)) continue;
            this.manifestList.delete(suspect.fileName());
        }
        for (ManifestFileMeta meta : changelogMetas) {
            this.manifestList.delete(meta.fileName());
        }
    }

    private static class LevelIdentifier {
        private final BinaryRowData partition;
        private final int bucket;
        private final int level;

        private LevelIdentifier(BinaryRowData partition, int bucket, int level) {
            this.partition = partition;
            this.bucket = bucket;
            this.level = level;
        }

        public boolean equals(Object o) {
            if (!(o instanceof LevelIdentifier)) {
                return false;
            }
            LevelIdentifier that = (LevelIdentifier)o;
            return Objects.equals(this.partition, that.partition) && this.bucket == that.bucket && this.level == that.level;
        }

        public int hashCode() {
            return Objects.hash(this.partition, this.bucket, this.level);
        }
    }
}

