/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.file.schema;

import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.file.schema.DataField;
import org.apache.flink.table.store.file.schema.DataType;
import org.apache.flink.table.store.file.schema.RowDataType;
import org.apache.flink.table.store.file.schema.SchemaChange;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.file.utils.AtomicFileWriter;
import org.apache.flink.table.store.file.utils.FileUtils;
import org.apache.flink.table.store.file.utils.JsonSerdeUtil;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

/**
 * Manages the versioned schema files of a table.
 *
 * <p>Each schema version is stored as a file named {@code schema-<id>} inside the
 * {@code <tableRoot>/schema} directory. New versions are published by writing the file for the
 * next id; when that file already exists (or the write reports failure) the commit is retried
 * against the then-latest schema.
 *
 * <p>The optional {@link Lock} is {@code transient}: it is not part of the serialized form and
 * has to be supplied again through {@link #withLock(Lock)} after deserialization.
 */
public class SchemaManager implements Serializable {
    private static final String SCHEMA_PREFIX = "schema-";

    private final Path tableRoot;

    @Nullable
    private transient Lock lock;

    /**
     * Creates a manager for the table rooted at {@code tableRoot}.
     *
     * @param tableRoot root directory of the table; schema files live in its {@code schema}
     *     sub-directory
     */
    public SchemaManager(Path tableRoot) {
        this.tableRoot = tableRoot;
    }

    /**
     * Sets the lock under which schema files are committed.
     *
     * @param lock lock to use, or {@code null} to commit without locking
     * @return this manager, for call chaining
     */
    public SchemaManager withLock(@Nullable Lock lock) {
        this.lock = lock;
        return this;
    }

    /**
     * Returns the schema with the highest id found in the schema directory.
     *
     * @return the latest schema, or an empty {@link Optional} when no schema file is listed
     * @throws UncheckedIOException if listing the schema directory fails
     */
    public Optional<TableSchema> latest() {
        try {
            return FileUtils.listVersionedFiles(this.schemaDirectory(), SCHEMA_PREFIX)
                    .reduce(Math::max)
                    .map(this::schema);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Reads every schema version listed in the schema directory.
     *
     * @return the schemas, in the order produced by {@link #listAllIds()}
     * @throws UncheckedIOException if listing or reading a schema file fails
     */
    public List<TableSchema> listAll() {
        return this.listAllIds().stream().map(this::schema).collect(Collectors.toList());
    }

    /**
     * Lists the ids of all schema versions found in the schema directory.
     *
     * @return the ids, in the order returned by {@code FileUtils.listVersionedFiles}
     * @throws UncheckedIOException if listing the schema directory fails
     */
    public List<Long> listAllIds() {
        try {
            return FileUtils.listVersionedFiles(this.schemaDirectory(), SCHEMA_PREFIX)
                    .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Commits a new schema version built from {@code updateSchema}.
     *
     * <p>When no schema exists yet, version {@code 0} is created with fields derived from the
     * row type. When a schema already exists, the primary keys, row fields and partition keys of
     * {@code updateSchema} must equal those of the latest schema; the existing fields and highest
     * field id are reused and only options and comment are taken from {@code updateSchema}.
     * The commit is retried against the newest schema until it succeeds.
     *
     * @param updateSchema requested schema definition
     * @return the schema that was committed
     * @throws IllegalArgumentException if the primary keys differ from the latest schema
     *     (raised by {@code Preconditions.checkArgument})
     * @throws UnsupportedOperationException if the row fields or partition keys differ from the
     *     latest schema
     * @throws Exception if committing the schema file fails
     */
    public TableSchema commitNewVersion(UpdateSchema updateSchema) throws Exception {
        RowType rowType = updateSchema.rowType();
        List<String> partitionKeys = updateSchema.partitionKeys();
        List<String> primaryKeys = updateSchema.primaryKeys();
        Map<String, String> options = updateSchema.options();

        while (true) {
            long id;
            List<DataField> fields;
            int highestFieldId;

            // Re-read the latest schema on every attempt: a failed commit means another
            // version may have been published in the meantime.
            Optional<TableSchema> latest = this.latest();
            if (latest.isPresent()) {
                TableSchema oldTableSchema = latest.get();
                Preconditions.checkArgument(oldTableSchema.primaryKeys().equals(primaryKeys), "Primary key modification is not supported, old primaryKeys is %s, new primaryKeys is %s", oldTableSchema.primaryKeys(), primaryKeys);
                boolean sameFields = rowType.getFields().equals(oldTableSchema.logicalRowType().getFields());
                boolean samePartitionKeys = partitionKeys.equals(oldTableSchema.partitionKeys());
                if (!sameFields || !samePartitionKeys) {
                    throw new UnsupportedOperationException("TODO: support update field types and partition keys. ");
                }
                fields = oldTableSchema.fields();
                id = oldTableSchema.id() + 1L;
                highestFieldId = oldTableSchema.highestFieldId();
            } else {
                fields = TableSchema.newFields(rowType);
                highestFieldId = TableSchema.currentHighestFieldId(fields);
                id = 0L;
            }

            TableSchema newSchema = new TableSchema(id, fields, highestFieldId, partitionKeys, primaryKeys, options, updateSchema.comment());
            if (this.commit(newSchema)) {
                return newSchema;
            }
        }
    }

    /**
     * Applies {@code changes}, in order, to the latest schema and commits the result as the
     * next schema version. The whole sequence is re-applied to the newest schema whenever the
     * commit does not succeed.
     *
     * @param changes schema changes to apply
     * @return the schema that was committed
     * @throws RuntimeException if no schema exists yet, or a referenced column cannot be found
     * @throws UnsupportedOperationException if a change kind or a changed option is not supported
     * @throws Exception if committing the schema file fails
     */
    public TableSchema commitChanges(List<SchemaChange> changes) throws Exception {
        while (true) {
            TableSchema schema = this.latest().orElseThrow(() -> new RuntimeException("Table not exists: " + this.tableRoot));
            Map<String, String> newOptions = new HashMap<>(schema.options());
            List<DataField> newFields = new ArrayList<>(schema.fields());
            AtomicInteger highestFieldId = new AtomicInteger(schema.highestFieldId());

            for (SchemaChange change : changes) {
                this.applyChange(schema, change, newOptions, newFields, highestFieldId);
            }

            TableSchema newSchema = new TableSchema(schema.id() + 1L, newFields, highestFieldId.get(), schema.partitionKeys(), schema.primaryKeys(), newOptions, schema.comment());
            if (this.commit(newSchema)) {
                return newSchema;
            }
        }
    }

    /**
     * Applies a single change to the working copies of the options and fields.
     *
     * @param schema schema the change is based on; consulted for its primary keys
     * @param change change to apply
     * @param newOptions mutable options of the schema being built
     * @param newFields mutable top-level field list of the schema being built
     * @param highestFieldId counter from which ids for newly added fields are drawn
     */
    private void applyChange(TableSchema schema, SchemaChange change, Map<String, String> newOptions, List<DataField> newFields, AtomicInteger highestFieldId) {
        if (change instanceof SchemaChange.SetOption) {
            SchemaChange.SetOption setOption = (SchemaChange.SetOption) change;
            this.checkAlterTableOption(setOption.key());
            newOptions.put(setOption.key(), setOption.value());
            return;
        }
        if (change instanceof SchemaChange.RemoveOption) {
            SchemaChange.RemoveOption removeOption = (SchemaChange.RemoveOption) change;
            this.checkAlterTableOption(removeOption.key());
            newOptions.remove(removeOption.key());
            return;
        }
        if (change instanceof SchemaChange.AddColumn) {
            SchemaChange.AddColumn addColumn = (SchemaChange.AddColumn) change;
            // Reserve the column's own id first; toDataType receives the same counter so that
            // it can draw further ids from it for the new type.
            int id = highestFieldId.incrementAndGet();
            DataType dataType = TableSchema.toDataType(addColumn.logicalType(), highestFieldId);
            newFields.add(new DataField(id, addColumn.fieldName(), dataType, addColumn.description()));
            return;
        }
        if (change instanceof SchemaChange.UpdateColumnType) {
            SchemaChange.UpdateColumnType update = (SchemaChange.UpdateColumnType) change;
            this.updateColumn(newFields, update.fieldName(), field -> SchemaManager.withNewType(update, field));
            return;
        }
        if (change instanceof SchemaChange.UpdateColumnNullability) {
            SchemaChange.UpdateColumnNullability update = (SchemaChange.UpdateColumnNullability) change;
            String[] fieldNames = update.fieldNames();
            // Only a single-name path can address a top-level (primary key) column.
            if (fieldNames.length == 1 && update.newNullability() && schema.primaryKeys().contains(fieldNames[0])) {
                throw new UnsupportedOperationException("Cannot change nullability of primary key");
            }
            this.updateNestedColumn(newFields, fieldNames, 0, field -> SchemaManager.withNewNullability(update, field));
            return;
        }
        if (change instanceof SchemaChange.UpdateColumnComment) {
            SchemaChange.UpdateColumnComment update = (SchemaChange.UpdateColumnComment) change;
            this.updateNestedColumn(newFields, update.fieldNames(), 0, field -> SchemaManager.withNewDescription(update, field));
            return;
        }
        throw new UnsupportedOperationException("Unsupported change: " + change.getClass());
    }

    /**
     * Replaces the field addressed by the path {@code updateFieldNames} with the result of
     * {@code updateFunc}. Path element {@code index} is looked up in {@code newFields}; for
     * every element but the last the matched field must be a row type, whose field list is
     * searched next. Processing stops at the first field whose name matches.
     *
     * @param newFields field list searched at this level; the replacement is written into it
     * @param updateFieldNames names leading from the top-level column to the field to update
     * @param index position in {@code updateFieldNames} handled by this call
     * @param updateFunc produces the replacement for the addressed field
     * @throws RuntimeException if no field at this level has the requested name
     * @throws IllegalArgumentException if the path continues below a field that is not a row type
     */
    private void updateNestedColumn(List<DataField> newFields, String[] updateFieldNames, int index, Function<DataField, DataField> updateFunc) {
        String targetName = updateFieldNames[index];
        boolean lastPathElement = index == updateFieldNames.length - 1;

        for (int i = 0; i < newFields.size(); ++i) {
            DataField field = newFields.get(i);
            if (!field.name().equals(targetName)) {
                continue;
            }
            if (lastPathElement) {
                newFields.set(i, updateFunc.apply(field));
                return;
            }
            // Explicit check instead of an assert: assertions are disabled by default, which
            // would otherwise surface as a bare ClassCastException on the cast below.
            if (!(field.type() instanceof RowDataType)) {
                throw new IllegalArgumentException(String.format("Column '%s' is not a row type, can not update nested column: %s", field.name(), Arrays.asList(updateFieldNames)));
            }
            // NOTE(review): the nested field list is obtained from the existing field and
            // updated in place - confirm RowDataType.fields() is mutable and that sharing it
            // with the previously read schema object is acceptable.
            this.updateNestedColumn(((RowDataType) field.type()).fields(), updateFieldNames, index + 1, updateFunc);
            return;
        }
        throw new RuntimeException("Can not find column: " + Arrays.asList(updateFieldNames));
    }

    /**
     * Replaces the top-level field named {@code updateFieldName} with the result of
     * {@code updateFunc}.
     *
     * @throws RuntimeException if no top-level field has that name
     */
    private void updateColumn(List<DataField> newFields, String updateFieldName, Function<DataField, DataField> updateFunc) {
        this.updateNestedColumn(newFields, new String[]{updateFieldName}, 0, updateFunc);
    }

    /**
     * Writes {@code newSchema} to the file for its id, under the lock when one is configured.
     *
     * @param newSchema schema to publish; its {@code toString()} is the file content
     *     (presumably the JSON form that {@link #schema(long)} parses - TODO confirm)
     * @return {@code false} if the file for this id already exists, otherwise the result
     *     reported by {@code AtomicFileWriter.writeFileUtf8}
     */
    private boolean commit(TableSchema newSchema) throws Exception {
        Path schemaPath = this.toSchemaPath(newSchema.id());
        FileSystem fs = schemaPath.getFileSystem();
        Callable<Boolean> callable = () -> {
            // Never overwrite an already published version.
            if (fs.exists(schemaPath)) {
                return false;
            }
            return AtomicFileWriter.writeFileUtf8(schemaPath, newSchema.toString());
        };
        if (this.lock == null) {
            return callable.call();
        }
        return this.lock.runWithLock(callable);
    }

    /**
     * Reads the schema version with the given id.
     *
     * @param id schema id
     * @return the schema deserialized from the JSON content of the {@code schema-<id>} file
     * @throws UncheckedIOException if reading the file fails
     */
    public TableSchema schema(long id) {
        try {
            return JsonSerdeUtil.fromJson(FileUtils.readFileUtf8(this.toSchemaPath(id)), TableSchema.class);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the directory that holds all schema files: {@code <tableRoot>/schema}. */
    private Path schemaDirectory() {
        return new Path(this.tableRoot + "/schema");
    }

    /** Returns the path of the schema file for {@code id}: {@code <tableRoot>/schema/schema-<id>}. */
    private Path toSchemaPath(long id) {
        return new Path(this.tableRoot + "/schema/" + SCHEMA_PREFIX + id);
    }

    /**
     * Rejects changes to options that must not be altered on an existing table.
     *
     * @throws UnsupportedOperationException if {@code key} is the bucket-key or write-mode option
     */
    private void checkAlterTableOption(String key) {
        if (CoreOptions.BUCKET_KEY.key().equals(key) || CoreOptions.WRITE_MODE.key().equals(key)) {
            throw new UnsupportedOperationException(String.format("Change %s is not supported yet.", key));
        }
    }

    /** Returns a copy of {@code field} whose description is replaced by the one in {@code update}. */
    private static DataField withNewDescription(SchemaChange.UpdateColumnComment update, DataField field) {
        return new DataField(field.id(), field.name(), field.type(), update.newDescription());
    }

    /** Returns a copy of {@code field} whose type carries the nullability requested by {@code update}. */
    private static DataField withNewNullability(SchemaChange.UpdateColumnNullability update, DataField field) {
        return new DataField(field.id(), field.name(), field.type().copy(update.newNullability()), field.description());
    }

    /**
     * Returns a copy of {@code field} with the type requested by {@code update}, keeping its id,
     * name and description.
     *
     * @throws RuntimeException if converting the new type consumed field ids, which is taken to
     *     mean that the type contains nested row fields
     */
    private static DataField withNewType(SchemaChange.UpdateColumnType update, DataField field) {
        // The counter handed to toDataType must be the one inspected afterwards; otherwise the
        // nested-row guard below can never trigger.
        AtomicInteger dummyId = new AtomicInteger(0);
        DataType newType = TableSchema.toDataType(update.newLogicalType(), dummyId);
        if (dummyId.get() != 0) {
            throw new RuntimeException(String.format("Update column to nested row type '%s' is not supported.", update.newLogicalType()));
        }
        return new DataField(field.id(), field.name(), newType, field.description());
    }
}

