/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.file.operation;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.compact.CompactManager;
import org.apache.flink.table.store.file.compact.NoopCompactManager;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.io.KeyValueFileReaderFactory;
import org.apache.flink.table.store.file.io.KeyValueFileWriterFactory;
import org.apache.flink.table.store.file.mergetree.Levels;
import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
import org.apache.flink.table.store.file.mergetree.compact.FullChangelogMergeTreeCompactRewriter;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunctionFactory;
import org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManager;
import org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactRewriter;
import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
import org.apache.flink.table.store.file.operation.AbstractFileStoreWrite;
import org.apache.flink.table.store.file.operation.FileStoreScan;
import org.apache.flink.table.store.file.operation.MemoryFileStoreWrite;
import org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.types.logical.RowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KeyValueFileStoreWrite
extends MemoryFileStoreWrite<KeyValue> {
    private static final Logger LOG = LoggerFactory.getLogger(KeyValueFileStoreWrite.class);
    private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
    private final KeyValueFileWriterFactory.Builder writerFactoryBuilder;
    private final Supplier<Comparator<RowData>> keyComparatorSupplier;
    private final MergeFunctionFactory<KeyValue> mfFactory;
    private final CoreOptions options;
    private final FileStorePathFactory pathFactory;

    public KeyValueFileStoreWrite(SchemaManager schemaManager, long schemaId, String commitUser, RowType keyType, RowType valueType, Supplier<Comparator<RowData>> keyComparatorSupplier, MergeFunctionFactory<KeyValue> mfFactory, FileStorePathFactory pathFactory, SnapshotManager snapshotManager, FileStoreScan scan, CoreOptions options, KeyValueFieldsExtractor extractor) {
        super(commitUser, snapshotManager, scan, options);
        this.readerFactoryBuilder = KeyValueFileReaderFactory.builder(schemaManager, schemaId, keyType, valueType, options.fileFormat(), pathFactory, extractor);
        this.writerFactoryBuilder = KeyValueFileWriterFactory.builder(schemaId, keyType, valueType, options.fileFormat(), pathFactory, options.targetFileSize());
        this.keyComparatorSupplier = keyComparatorSupplier;
        this.mfFactory = mfFactory;
        this.options = options;
        this.pathFactory = pathFactory;
    }

    @Override
    public AbstractFileStoreWrite.WriterContainer<KeyValue> createWriterContainer(BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
        Long latestSnapshotId = this.snapshotManager.latestSnapshotId();
        MergeTreeWriter writer = this.createMergeTreeWriter(partition, bucket, this.scanExistingFileMetas(latestSnapshotId, partition, bucket), compactExecutor);
        return new AbstractFileStoreWrite.WriterContainer<KeyValue>(writer, latestSnapshotId);
    }

    @Override
    public AbstractFileStoreWrite.WriterContainer<KeyValue> createEmptyWriterContainer(BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
        Long latestSnapshotId = this.snapshotManager.latestSnapshotId();
        MergeTreeWriter writer = this.createMergeTreeWriter(partition, bucket, Collections.emptyList(), compactExecutor);
        return new AbstractFileStoreWrite.WriterContainer<KeyValue>(writer, latestSnapshotId);
    }

    private MergeTreeWriter createMergeTreeWriter(BinaryRowData partition, int bucket, List<DataFileMeta> restoreFiles, ExecutorService compactExecutor) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Creating merge tree writer for partition {} bucket {} from restored files {}", new Object[]{partition, bucket, restoreFiles});
        }
        KeyValueFileWriterFactory writerFactory = this.writerFactoryBuilder.build(partition, bucket);
        Comparator<RowData> keyComparator = this.keyComparatorSupplier.get();
        Levels levels = new Levels(keyComparator, restoreFiles, this.options.numLevels());
        CompactManager compactManager = this.createCompactManager(partition, bucket, new UniversalCompaction(this.options.maxSizeAmplificationPercent(), this.options.sortedRunSizeRatio(), this.options.numSortedRunCompactionTrigger(), this.options.maxSortedRunNum()), compactExecutor, levels);
        return new MergeTreeWriter(this.bufferSpillable(), this.options.localSortMaxNumFileHandles(), this.ioManager, compactManager, DataFileMeta.getMaxSequenceNumber(restoreFiles), keyComparator, this.mfFactory.create(), writerFactory, this.options.commitForceCompact(), this.options.changelogProducer());
    }

    private boolean bufferSpillable() {
        try {
            return this.options.writeBufferSpillable(this.pathFactory.root().getFileSystem().getKind() != FileSystemKind.FILE_SYSTEM);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private CompactManager createCompactManager(BinaryRowData partition, int bucket, CompactStrategy compactStrategy, ExecutorService compactExecutor, Levels levels) {
        if (this.options.writeOnly()) {
            return new NoopCompactManager();
        }
        Comparator<RowData> keyComparator = this.keyComparatorSupplier.get();
        MergeTreeCompactRewriter rewriter = this.createRewriter(partition, bucket, keyComparator);
        return new MergeTreeCompactManager(compactExecutor, levels, compactStrategy, keyComparator, this.options.targetFileSize(), this.options.numSortedRunStopTrigger(), rewriter);
    }

    private MergeTreeCompactRewriter createRewriter(BinaryRowData partition, int bucket, Comparator<RowData> keyComparator) {
        KeyValueFileReaderFactory readerFactory = this.readerFactoryBuilder.build(partition, bucket);
        KeyValueFileWriterFactory writerFactory = this.writerFactoryBuilder.build(partition, bucket);
        if (this.options.changelogProducer() == CoreOptions.ChangelogProducer.FULL_COMPACTION) {
            return new FullChangelogMergeTreeCompactRewriter(this.options.numLevels() - 1, readerFactory, writerFactory, keyComparator, this.mfFactory);
        }
        return new MergeTreeCompactRewriter(readerFactory, writerFactory, keyComparator, this.mfFactory);
    }
}

