/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.table.sink;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

public abstract class AbstractTableWrite<T>
implements TableWrite {
    private final FileStoreWrite<T> write;
    private final SinkRecordConverter recordConverter;
    protected final Map<BinaryRowData, Map<Integer, RecordWriter<T>>> writers;
    private final ExecutorService compactExecutor;
    private boolean overwrite = false;

    protected AbstractTableWrite(FileStoreWrite<T> write, SinkRecordConverter recordConverter) {
        this.write = write;
        this.recordConverter = recordConverter;
        this.writers = new HashMap<BinaryRowData, Map<Integer, RecordWriter<T>>>();
        this.compactExecutor = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new ExecutorThreadFactory("compaction-thread"));
    }

    @Override
    public TableWrite withOverwrite(boolean overwrite) {
        this.overwrite = overwrite;
        return this;
    }

    @Override
    public SinkRecordConverter recordConverter() {
        return this.recordConverter;
    }

    @Override
    public SinkRecord write(RowData rowData) throws Exception {
        SinkRecord record = this.recordConverter.convert(rowData);
        RecordWriter<T> writer = this.getWriter(record.partition(), record.bucket());
        this.writeSinkRecord(record, writer);
        return record;
    }

    @Override
    public List<FileCommittable> prepareCommit(boolean endOfInput) throws Exception {
        ArrayList<FileCommittable> result = new ArrayList<FileCommittable>();
        Iterator<Map.Entry<BinaryRowData, Map<Integer, RecordWriter<T>>>> partIter = this.writers.entrySet().iterator();
        while (partIter.hasNext()) {
            Map.Entry<BinaryRowData, Map<Integer, RecordWriter<T>>> partEntry = partIter.next();
            BinaryRowData partition = partEntry.getKey();
            Iterator<Map.Entry<Integer, RecordWriter<T>>> bucketIter = partEntry.getValue().entrySet().iterator();
            while (bucketIter.hasNext()) {
                Map.Entry<Integer, RecordWriter<T>> entry = bucketIter.next();
                int bucket = entry.getKey();
                RecordWriter<T> writer = entry.getValue();
                FileCommittable committable = new FileCommittable(partition, bucket, writer.prepareCommit(endOfInput));
                result.add(committable);
                if (!committable.increment().newFiles().isEmpty()) continue;
                writer.close();
                bucketIter.remove();
            }
            if (!partEntry.getValue().isEmpty()) continue;
            partIter.remove();
        }
        return result;
    }

    @Override
    public void close() throws Exception {
        for (Map<Integer, RecordWriter<T>> bucketWriters : this.writers.values()) {
            for (RecordWriter<T> writer : bucketWriters.values()) {
                writer.close();
            }
        }
        this.writers.clear();
        this.compactExecutor.shutdownNow();
    }

    @VisibleForTesting
    public Map<BinaryRowData, Map<Integer, RecordWriter<T>>> writers() {
        return this.writers;
    }

    protected abstract void writeSinkRecord(SinkRecord var1, RecordWriter<T> var2) throws Exception;

    private RecordWriter<T> getWriter(BinaryRowData partition, int bucket) {
        Map<Integer, RecordWriter<T>> buckets = this.writers.get(partition);
        if (buckets == null) {
            buckets = new HashMap<Integer, RecordWriter<T>>();
            this.writers.put(partition.copy(), buckets);
        }
        return buckets.computeIfAbsent(bucket, k -> this.createWriter(partition.copy(), bucket));
    }

    private RecordWriter<T> createWriter(BinaryRowData partition, int bucket) {
        RecordWriter<T> writer = this.overwrite ? this.write.createEmptyWriter(partition.copy(), bucket, this.compactExecutor) : this.write.createWriter(partition.copy(), bucket, this.compactExecutor);
        this.notifyNewWriter(writer);
        return writer;
    }

    protected void notifyNewWriter(RecordWriter<T> writer) {
    }
}

