/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.file.mergetree;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.table.store.codegen.CodeGenUtils;
import org.apache.flink.table.store.codegen.NormalizedKeyComputer;
import org.apache.flink.table.store.codegen.RecordComparator;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueSerializer;
import org.apache.flink.table.store.file.mergetree.WriteBuffer;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.ReducerMergeFunctionWrapper;
import org.apache.flink.table.store.file.sort.BinaryExternalSortBuffer;
import org.apache.flink.table.store.file.sort.BinaryInMemorySortBuffer;
import org.apache.flink.table.store.file.sort.SortBuffer;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.MutableObjectIterator;

public class SortBufferWriteBuffer
implements WriteBuffer {
    private final RowType keyType;
    private final RowType valueType;
    private final KeyValueSerializer serializer;
    private final SortBuffer buffer;

    public SortBufferWriteBuffer(RowType keyType, RowType valueType, MemorySegmentPool memoryPool, boolean spillable, int sortMaxFan, IOManager ioManager) {
        this.keyType = keyType;
        this.valueType = valueType;
        this.serializer = new KeyValueSerializer(keyType, valueType);
        ArrayList<LogicalType> sortKeyTypes = new ArrayList<LogicalType>(keyType.getChildren());
        sortKeyTypes.add((LogicalType)new BigIntType(false));
        NormalizedKeyComputer normalizedKeyComputer = CodeGenUtils.newNormalizedKeyComputer(sortKeyTypes, "MemTableKeyComputer");
        RecordComparator keyComparator = CodeGenUtils.newRecordComparator(sortKeyTypes, "MemTableComparator");
        if (memoryPool.freePages() < 3) {
            throw new IllegalArgumentException("Write buffer requires a minimum of 3 page memory, please increase write buffer memory size.");
        }
        RowDataSerializer serializer = InternalSerializers.create((RowType)KeyValue.schema(keyType, valueType));
        BinaryInMemorySortBuffer inMemorySortBuffer = BinaryInMemorySortBuffer.createBuffer(normalizedKeyComputer, (AbstractRowDataSerializer<RowData>)serializer, keyComparator, memoryPool);
        this.buffer = ioManager != null && spillable ? new BinaryExternalSortBuffer(new BinaryRowDataSerializer(serializer.getArity()), keyComparator, memoryPool.pageSize(), inMemorySortBuffer, ioManager, sortMaxFan) : inMemorySortBuffer;
    }

    @Override
    public boolean put(long sequenceNumber, RowKind valueKind, RowData key, RowData value) throws IOException {
        return this.buffer.write(this.serializer.toRow(key, sequenceNumber, valueKind, value));
    }

    @Override
    public int size() {
        return this.buffer.size();
    }

    @Override
    public long memoryOccupancy() {
        return this.buffer.getOccupancy();
    }

    @Override
    public boolean flushMemory() throws IOException {
        return this.buffer.flushMemory();
    }

    @Override
    public void forEach(Comparator<RowData> keyComparator, MergeFunction<KeyValue> mergeFunction, @Nullable WriteBuffer.KvConsumer rawConsumer, WriteBuffer.KvConsumer mergedConsumer) throws IOException {
        MergeIterator mergeIterator = new MergeIterator(rawConsumer, this.buffer.sortedIterator(), keyComparator, mergeFunction);
        while (mergeIterator.hasNext()) {
            mergedConsumer.accept(mergeIterator.next());
        }
    }

    @Override
    public void clear() {
        this.buffer.clear();
    }

    @VisibleForTesting
    SortBuffer buffer() {
        return this.buffer;
    }

    private class RawIterator
    implements Iterator<KeyValue> {
        private final MutableObjectIterator<BinaryRowData> kvIter;
        private final KeyValueSerializer current;
        private BinaryRowData currentRow;
        private boolean advanced;

        private RawIterator(MutableObjectIterator<BinaryRowData> kvIter) {
            this.kvIter = kvIter;
            this.current = new KeyValueSerializer(SortBufferWriteBuffer.this.keyType, SortBufferWriteBuffer.this.valueType);
            this.currentRow = new BinaryRowData(SortBufferWriteBuffer.this.keyType.getFieldCount() + 2 + SortBufferWriteBuffer.this.valueType.getFieldCount());
            this.advanced = false;
        }

        @Override
        public boolean hasNext() {
            if (!this.advanced) {
                this.advanceNext();
            }
            return this.currentRow != null;
        }

        @Override
        public KeyValue next() {
            if (!this.hasNext()) {
                return null;
            }
            this.advanced = false;
            return this.current.getReusedKv();
        }

        private void advanceNext() {
            try {
                this.currentRow = (BinaryRowData)this.kvIter.next((Object)this.currentRow);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            if (this.currentRow != null) {
                this.current.fromRow((RowData)this.currentRow);
            }
            this.advanced = true;
        }
    }

    private class MergeIterator {
        @Nullable
        private final WriteBuffer.KvConsumer rawConsumer;
        private final MutableObjectIterator<BinaryRowData> kvIter;
        private final Comparator<RowData> keyComparator;
        private final ReducerMergeFunctionWrapper mergeFunctionWrapper;
        private KeyValueSerializer previous;
        private BinaryRowData previousRow;
        private KeyValueSerializer current;
        private BinaryRowData currentRow;
        private KeyValue result;
        private boolean advanced;

        private MergeIterator(WriteBuffer.KvConsumer rawConsumer, MutableObjectIterator<BinaryRowData> kvIter, Comparator<RowData> keyComparator, MergeFunction<KeyValue> mergeFunction) throws IOException {
            this.rawConsumer = rawConsumer;
            this.kvIter = kvIter;
            this.keyComparator = keyComparator;
            this.mergeFunctionWrapper = new ReducerMergeFunctionWrapper(mergeFunction);
            int totalFieldCount = SortBufferWriteBuffer.this.keyType.getFieldCount() + 2 + SortBufferWriteBuffer.this.valueType.getFieldCount();
            this.previous = new KeyValueSerializer(SortBufferWriteBuffer.this.keyType, SortBufferWriteBuffer.this.valueType);
            this.previousRow = new BinaryRowData(totalFieldCount);
            this.current = new KeyValueSerializer(SortBufferWriteBuffer.this.keyType, SortBufferWriteBuffer.this.valueType);
            this.currentRow = new BinaryRowData(totalFieldCount);
            this.readOnce();
            this.advanced = false;
        }

        public boolean hasNext() throws IOException {
            this.advanceIfNeeded();
            return this.previousRow != null;
        }

        public KeyValue next() throws IOException {
            this.advanceIfNeeded();
            if (this.previousRow == null) {
                return null;
            }
            this.advanced = false;
            return this.result;
        }

        private void advanceIfNeeded() throws IOException {
            if (this.advanced) {
                return;
            }
            this.advanced = true;
            do {
                this.swapSerializers();
                if (this.previousRow == null) {
                    return;
                }
                this.mergeFunctionWrapper.reset();
                this.mergeFunctionWrapper.add(this.previous.getReusedKv());
                while (this.readOnce() && this.keyComparator.compare(this.previous.getReusedKv().key(), this.current.getReusedKv().key()) == 0) {
                    this.mergeFunctionWrapper.add(this.current.getReusedKv());
                    this.swapSerializers();
                }
                this.result = this.mergeFunctionWrapper.getResult();
            } while (this.result == null);
        }

        private boolean readOnce() throws IOException {
            try {
                this.currentRow = (BinaryRowData)this.kvIter.next((Object)this.currentRow);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            if (this.currentRow != null) {
                this.current.fromRow((RowData)this.currentRow);
                if (this.rawConsumer != null) {
                    this.rawConsumer.accept(this.current.getReusedKv());
                }
            }
            return this.currentRow != null;
        }

        private void swapSerializers() {
            KeyValueSerializer tmp = this.previous;
            BinaryRowData tmpRow = this.previousRow;
            this.previous = this.current;
            this.previousRow = this.currentRow;
            this.current = tmp;
            this.currentRow = tmpRow;
        }
    }
}

