/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.table;

import java.util.Collections;
import java.util.List;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueFileStore;
import org.apache.flink.table.store.file.mergetree.compact.ValueCountMergeFunction;
import org.apache.flink.table.store.file.operation.KeyValueFileStoreScan;
import org.apache.flink.table.store.file.predicate.Predicate;
import org.apache.flink.table.store.file.schema.AtomicDataType;
import org.apache.flink.table.store.file.schema.DataField;
import org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor;
import org.apache.flink.table.store.file.schema.RowDataType;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReader;
import org.apache.flink.table.store.table.AbstractFileStoreTable;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.sink.TableWriteImpl;
import org.apache.flink.table.store.table.source.AbstractDataTableScan;
import org.apache.flink.table.store.table.source.KeyValueTableRead;
import org.apache.flink.table.store.table.source.MergeTreeSplitGenerator;
import org.apache.flink.table.store.table.source.SplitGenerator;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.ValueCountRowDataRecordIterator;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

/**
 * {@link FileStoreTable} whose underlying {@link KeyValueFileStore} uses every table column as
 * the key and a single {@code _VALUE_COUNT} BIGINT column as the value.
 *
 * <p>Writes translate each incoming row into a count of {@code +1} or {@code -1} depending on
 * its {@link RowKind}; the store is configured with {@link ValueCountMergeFunction}.
 */
public class ChangelogValueCountFileStoreTable extends AbstractFileStoreTable {

    private static final long serialVersionUID = 1L;

    /** Cached store; transient, and (re)created by {@link #store()} whenever it is null. */
    private transient KeyValueFileStore lazyStore;

    /**
     * Creates the table for the given location and schema.
     *
     * @param path table path, passed to the superclass
     * @param tableSchema table schema, passed to the superclass
     */
    ChangelogValueCountFileStoreTable(Path path, TableSchema tableSchema) {
        super(path, tableSchema);
    }

    /**
     * Returns a new table of this type at the same path with a different schema.
     *
     * @param newTableSchema schema for the copy
     * @return a fresh {@code ChangelogValueCountFileStoreTable}
     */
    @Override
    protected FileStoreTable copy(TableSchema newTableSchema) {
        return new ChangelogValueCountFileStoreTable(this.path, newTableSchema);
    }

    /**
     * Returns the key-value store behind this table, building it on first use and caching it.
     *
     * <p>No synchronization is performed around the lazy initialization.
     *
     * @return the cached {@link KeyValueFileStore}
     */
    public KeyValueFileStore store() {
        if (this.lazyStore != null) {
            return this.lazyStore;
        }

        final ValueCountTableKeyValueFieldsExtractor fieldsExtractor =
                ValueCountTableKeyValueFieldsExtractor.EXTRACTOR;
        final RowType valueType =
                RowDataType.toRowType(false, fieldsExtractor.valueFields(this.tableSchema));

        // Arguments are kept inline so they are evaluated in the constructor's parameter order.
        this.lazyStore =
                new KeyValueFileStore(
                        this.schemaManager(),
                        this.tableSchema.id(),
                        new CoreOptions(this.tableSchema.options()),
                        this.tableSchema.logicalPartitionType(),
                        this.tableSchema.logicalBucketKeyType(),
                        RowDataType.toRowType(false, fieldsExtractor.keyFields(this.tableSchema)),
                        valueType,
                        fieldsExtractor,
                        ValueCountMergeFunction.factory());
        return this.lazyStore;
    }

    /**
     * Creates a table scan on top of a new store scan.
     *
     * <p>Splits are produced by a {@link MergeTreeSplitGenerator} configured from the store's key
     * comparator and split options. Non-partition predicates are applied to the store scan as key
     * filters.
     *
     * @return a new {@link AbstractDataTableScan}
     */
    @Override
    public AbstractDataTableScan newScan() {
        final KeyValueFileStore fileStore = this.store();
        final KeyValueFileStoreScan keyValueScan = fileStore.newScan();

        return new AbstractDataTableScan(
                keyValueScan, this.tableSchema, fileStore.pathFactory(), this.options()) {

            @Override
            protected SplitGenerator splitGenerator(FileStorePathFactory pathFactory) {
                final KeyValueFileStore outerStore =
                        ChangelogValueCountFileStoreTable.this.store();
                return new MergeTreeSplitGenerator(
                        outerStore.newKeyComparator(),
                        outerStore.options().splitTargetSize(),
                        outerStore.options().splitOpenFileCost());
            }

            @Override
            protected void withNonPartitionFilter(Predicate predicate) {
                // The key holds all table columns, so row predicates are key predicates here.
                keyValueScan.withKeyFilter(predicate);
            }
        };
    }

    /**
     * Creates a table read on top of a new store read.
     *
     * <p>Filters are forwarded to the store read unchanged, projections are applied as key
     * projections, and key-value records are exposed as rows through a
     * {@link ValueCountRowDataRecordIterator}.
     *
     * @return a new {@link TableRead}
     */
    @Override
    public TableRead newRead() {
        return new KeyValueTableRead(this.store().newRead()) {

            @Override
            public TableRead withFilter(Predicate predicate) {
                read.withFilter(predicate);
                return this;
            }

            @Override
            public TableRead withProjection(int[][] projection) {
                read.withKeyProjection(projection);
                return this;
            }

            @Override
            protected RecordReader.RecordIterator<RowData> rowDataRecordIteratorFromKv(
                    RecordReader.RecordIterator<KeyValue> kvRecordIterator) {
                return new ValueCountRowDataRecordIterator(kvRecordIterator);
            }
        };
    }

    /**
     * Creates a table write that converts each sink record into a value-count {@link KeyValue}.
     *
     * <p>{@code INSERT} and {@code UPDATE_AFTER} rows are written with count {@code 1};
     * {@code UPDATE_BEFORE} and {@code DELETE} rows with count {@code -1}. The key-value itself
     * is always written with {@link RowKind#INSERT}. One {@link KeyValue} instance is created per
     * call of this method and the same instance is refilled and returned for every record.
     *
     * @param commitUser commit user handed to the store's writer
     * @return a new {@link TableWrite}
     */
    @Override
    public TableWrite newWrite(String commitUser) {
        final KeyValue reusedKeyValue = new KeyValue();
        return new TableWriteImpl<KeyValue>(
                this.store().newWrite(commitUser),
                new SinkRecordConverter(this.tableSchema),
                sinkRecord -> {
                    final RowKind kind = sinkRecord.row().getRowKind();
                    final long countDelta;
                    switch (kind) {
                        case INSERT:
                        case UPDATE_AFTER:
                            countDelta = 1L;
                            break;
                        case UPDATE_BEFORE:
                        case DELETE:
                            countDelta = -1L;
                            break;
                        default:
                            throw new UnsupportedOperationException("Unknown row kind " + kind);
                    }
                    reusedKeyValue.replace(
                            sinkRecord.row(), RowKind.INSERT, GenericRowData.of(countDelta));
                    return reusedKeyValue;
                });
    }

    /**
     * {@link KeyValueFieldsExtractor} for value-count tables: the key is made of all schema
     * fields and the value is a single count field.
     */
    static class ValueCountTableKeyValueFieldsExtractor implements KeyValueFieldsExtractor {

        private static final long serialVersionUID = 1L;

        /** Shared instance; the constructor is private. */
        static final ValueCountTableKeyValueFieldsExtractor EXTRACTOR =
                new ValueCountTableKeyValueFieldsExtractor();

        private ValueCountTableKeyValueFieldsExtractor() {
        }

        /**
         * Returns the key fields, which are all fields of the schema.
         *
         * @param schema table schema
         * @return {@code schema.fields()}
         */
        @Override
        public List<DataField> keyFields(TableSchema schema) {
            return schema.fields();
        }

        /**
         * Returns the value fields: a one-element list holding the {@code _VALUE_COUNT} field
         * with id {@code 0} and a BIGINT type created via {@code new BigIntType(false)}.
         *
         * @param schema table schema (not consulted)
         * @return singleton list with the count field
         */
        @Override
        public List<DataField> valueFields(TableSchema schema) {
            final BigIntType countLogicalType = new BigIntType(false);
            final DataField countField =
                    new DataField(0, "_VALUE_COUNT", new AtomicDataType(countLogicalType));
            return Collections.singletonList(countField);
        }
    }
}

