/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.common.table.log;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.SchemeAwareFSDataInputStream;
import org.apache.hudi.common.fs.TimedFSDataInputStream;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormatVersion;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
import org.apache.hudi.common.table.log.block.HoodieCorruptBlock;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.CorruptedLogFileException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class HoodieLogFileReader
implements HoodieLogFormat.Reader {
    public static final int DEFAULT_BUFFER_SIZE = 0x1000000;
    private static final int BLOCK_SCAN_READ_BUFFER_SIZE = 0x100000;
    private static final Logger LOG = LogManager.getLogger(HoodieLogFileReader.class);
    private final FSDataInputStream inputStream;
    private final HoodieLogFile logFile;
    private final byte[] magicBuffer = new byte[6];
    private final Schema readerSchema;
    private final String keyField;
    private boolean readBlockLazily;
    private long reverseLogFilePosition;
    private long lastReverseLogFilePosition;
    private boolean reverseReader;
    private boolean enableInlineReading;
    private boolean closed = false;
    private transient Thread shutdownThread = null;

    public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean readBlockLazily) throws IOException {
        this(fs, logFile, readerSchema, bufferSize, readBlockLazily, false);
    }

    public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean readBlockLazily, boolean reverseReader) throws IOException {
        this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false, "_hoodie_record_key");
    }

    public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize, boolean readBlockLazily, boolean reverseReader, boolean enableInlineReading, String keyField) throws IOException {
        FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
        this.logFile = logFile;
        this.inputStream = this.getFSDataInputStream(fsDataInputStream, fs, bufferSize);
        this.readerSchema = readerSchema;
        this.readBlockLazily = readBlockLazily;
        this.reverseReader = reverseReader;
        this.enableInlineReading = enableInlineReading;
        this.keyField = keyField;
        if (this.reverseReader) {
            this.reverseLogFilePosition = this.lastReverseLogFilePosition = logFile.getFileSize();
        }
        this.addShutDownHook();
    }

    public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema) throws IOException {
        this(fs, logFile, readerSchema, 0x1000000, false, false);
    }

    private FSDataInputStream getFSDataInputStream(FSDataInputStream fsDataInputStream, FileSystem fs, int bufferSize) {
        if (FSUtils.isGCSFileSystem(fs)) {
            return new SchemeAwareFSDataInputStream((InputStream)this.getFSDataInputStreamForGCS(fsDataInputStream, bufferSize), true);
        }
        if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
            return new TimedFSDataInputStream(this.logFile.getPath(), new FSDataInputStream((InputStream)new BufferedFSInputStream((FSInputStream)fsDataInputStream.getWrappedStream(), bufferSize)));
        }
        return fsDataInputStream;
    }

    private FSDataInputStream getFSDataInputStreamForGCS(FSDataInputStream fsDataInputStream, int bufferSize) {
        if (fsDataInputStream.getWrappedStream() instanceof FSInputStream) {
            return new TimedFSDataInputStream(this.logFile.getPath(), new FSDataInputStream((InputStream)new BufferedFSInputStream((FSInputStream)fsDataInputStream.getWrappedStream(), bufferSize)));
        }
        if (fsDataInputStream.getWrappedStream() instanceof FSDataInputStream && ((FSDataInputStream)fsDataInputStream.getWrappedStream()).getWrappedStream() instanceof FSInputStream) {
            FSInputStream inputStream = (FSInputStream)((FSDataInputStream)fsDataInputStream.getWrappedStream()).getWrappedStream();
            return new TimedFSDataInputStream(this.logFile.getPath(), new FSDataInputStream((InputStream)new BufferedFSInputStream(inputStream, bufferSize)));
        }
        return fsDataInputStream;
    }

    @Override
    public HoodieLogFile getLogFile() {
        return this.logFile;
    }

    private void addShutDownHook() {
        this.shutdownThread = new Thread(() -> {
            try {
                this.close();
            }
            catch (Exception e) {
                LOG.warn((Object)("unable to close input stream for log file " + this.logFile), (Throwable)e);
            }
        });
        Runtime.getRuntime().addShutdownHook(this.shutdownThread);
    }

    private HoodieLogBlock readBlock() throws IOException {
        int blocksize;
        HoodieLogBlock.HoodieLogBlockType blockType = null;
        Map<HoodieLogBlock.HeaderMetadataType, String> header = null;
        try {
            blocksize = (int)this.inputStream.readLong();
        }
        catch (EOFException | CorruptedLogFileException e) {
            return this.createCorruptBlock();
        }
        boolean isCorrupted = this.isBlockCorrupt(blocksize);
        if (isCorrupted) {
            return this.createCorruptBlock();
        }
        HoodieLogFormat.LogFormatVersion nextBlockVersion = this.readVersion();
        if (nextBlockVersion.getVersion() != 0) {
            int type = this.inputStream.readInt();
            ValidationUtils.checkArgument(type < HoodieLogBlock.HoodieLogBlockType.values().length, "Invalid block byte type found " + type);
            blockType = HoodieLogBlock.HoodieLogBlockType.values()[type];
        }
        if (nextBlockVersion.hasHeader()) {
            header = HoodieLogBlock.getLogMetadata((DataInputStream)this.inputStream);
        }
        int contentLength = blocksize;
        if (nextBlockVersion.getVersion() != 0) {
            contentLength = (int)this.inputStream.readLong();
        }
        long contentPosition = this.inputStream.getPos();
        byte[] content = HoodieLogBlock.readOrSkipContent(this.inputStream, contentLength, this.readBlockLazily);
        Map<HoodieLogBlock.HeaderMetadataType, String> footer = null;
        if (nextBlockVersion.hasFooter()) {
            footer = HoodieLogBlock.getLogMetadata((DataInputStream)this.inputStream);
        }
        long logBlockLength = 0L;
        if (nextBlockVersion.hasLogBlockLength()) {
            logBlockLength = this.inputStream.readLong();
        }
        long blockEndPos = this.inputStream.getPos();
        switch (Objects.requireNonNull(blockType)) {
            case AVRO_DATA_BLOCK: {
                if (nextBlockVersion.getVersion() == 0) {
                    return HoodieAvroDataBlock.getBlock(content, this.readerSchema);
                }
                return new HoodieAvroDataBlock(this.logFile, this.inputStream, Option.ofNullable(content), this.readBlockLazily, contentPosition, contentLength, blockEndPos, this.readerSchema, header, footer, this.keyField);
            }
            case HFILE_DATA_BLOCK: {
                return new HoodieHFileDataBlock(this.logFile, this.inputStream, Option.ofNullable(content), this.readBlockLazily, contentPosition, contentLength, blockEndPos, this.readerSchema, header, footer, this.enableInlineReading, this.keyField);
            }
            case DELETE_BLOCK: {
                return HoodieDeleteBlock.getBlock(this.logFile, this.inputStream, Option.ofNullable(content), this.readBlockLazily, contentPosition, contentLength, blockEndPos, header, footer);
            }
            case COMMAND_BLOCK: {
                return HoodieCommandBlock.getBlock(this.logFile, this.inputStream, Option.ofNullable(content), this.readBlockLazily, contentPosition, contentLength, blockEndPos, header, footer);
            }
        }
        throw new HoodieNotSupportedException("Unsupported Block " + (Object)((Object)blockType));
    }

    private HoodieLogBlock createCorruptBlock() throws IOException {
        LOG.info((Object)("Log " + this.logFile + " has a corrupted block at " + this.inputStream.getPos()));
        long currentPos = this.inputStream.getPos();
        long nextBlockOffset = this.scanForNextAvailableBlockOffset();
        this.inputStream.seek(currentPos);
        LOG.info((Object)("Next available block in " + this.logFile + " starts at " + nextBlockOffset));
        int corruptedBlockSize = (int)(nextBlockOffset - currentPos);
        long contentPosition = this.inputStream.getPos();
        byte[] corruptedBytes = HoodieLogBlock.readOrSkipContent(this.inputStream, corruptedBlockSize, this.readBlockLazily);
        return HoodieCorruptBlock.getBlock(this.logFile, this.inputStream, Option.ofNullable(corruptedBytes), this.readBlockLazily, contentPosition, corruptedBlockSize, corruptedBlockSize, new HashMap<HoodieLogBlock.HeaderMetadataType, String>(), new HashMap<HoodieLogBlock.HeaderMetadataType, String>());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean isBlockCorrupt(int blocksize) throws IOException {
        long currentPos = this.inputStream.getPos();
        try {
            this.inputStream.seek(currentPos + (long)blocksize);
        }
        catch (EOFException e) {
            LOG.info((Object)("Found corrupted block in file " + this.logFile + " with block size(" + blocksize + ") running past EOF"));
            this.inputStream.seek(currentPos);
            return true;
        }
        this.inputStream.seek(this.inputStream.getPos() - 8L);
        long blockSizeFromFooter = this.inputStream.readLong() - (long)this.magicBuffer.length;
        if ((long)blocksize != blockSizeFromFooter) {
            LOG.info((Object)("Found corrupted block in file " + this.logFile + ". Header block size(" + blocksize + ") did not match the footer block size(" + blockSizeFromFooter + ")"));
            this.inputStream.seek(currentPos);
            return true;
        }
        try {
            this.readMagic();
            boolean bl = false;
            return bl;
        }
        catch (CorruptedLogFileException e) {
            LOG.info((Object)("Found corrupted block in file " + this.logFile + ". No magic hash found right after footer block size entry"));
            boolean bl = true;
            return bl;
        }
        finally {
            this.inputStream.seek(currentPos);
        }
    }

    private long scanForNextAvailableBlockOffset() throws IOException {
        byte[] dataBuf = new byte[0x100000];
        boolean eof = false;
        while (true) {
            long currentPos = this.inputStream.getPos();
            try {
                Arrays.fill(dataBuf, (byte)0);
                this.inputStream.readFully(dataBuf, 0, dataBuf.length);
            }
            catch (EOFException e) {
                eof = true;
            }
            long pos = Bytes.indexOf((byte[])dataBuf, (byte[])HoodieLogFormat.MAGIC);
            if (pos >= 0L) {
                return currentPos + pos;
            }
            if (eof) {
                return this.inputStream.getPos();
            }
            this.inputStream.seek(currentPos + (long)dataBuf.length - (long)HoodieLogFormat.MAGIC.length);
        }
    }

    @Override
    public void close() throws IOException {
        if (!this.closed) {
            this.inputStream.close();
            if (null != this.shutdownThread) {
                Runtime.getRuntime().removeShutdownHook(this.shutdownThread);
            }
            this.closed = true;
        }
    }

    @Override
    public boolean hasNext() {
        try {
            return this.readMagic();
        }
        catch (IOException e) {
            throw new HoodieIOException("IOException when reading logfile " + this.logFile, e);
        }
    }

    private HoodieLogFormat.LogFormatVersion readVersion() throws IOException {
        return new HoodieLogFormatVersion(this.inputStream.readInt());
    }

    private boolean readMagic() throws IOException {
        try {
            boolean hasMagic = this.hasNextMagic();
            if (!hasMagic) {
                throw new CorruptedLogFileException(this.logFile + " could not be read. Did not find the magic bytes at the start of the block");
            }
            return hasMagic;
        }
        catch (EOFException e) {
            return false;
        }
    }

    private boolean hasNextMagic() throws IOException {
        this.inputStream.readFully(this.magicBuffer, 0, 6);
        return Arrays.equals(this.magicBuffer, HoodieLogFormat.MAGIC);
    }

    @Override
    public HoodieLogBlock next() {
        try {
            return this.readBlock();
        }
        catch (IOException io) {
            throw new HoodieIOException("IOException when reading logblock from log file " + this.logFile, io);
        }
    }

    @Override
    public boolean hasPrev() {
        try {
            if (!this.reverseReader) {
                throw new HoodieNotSupportedException("Reverse log reader has not been enabled");
            }
            this.reverseLogFilePosition = this.lastReverseLogFilePosition;
            this.reverseLogFilePosition -= 8L;
            this.lastReverseLogFilePosition = this.reverseLogFilePosition;
            this.inputStream.seek(this.reverseLogFilePosition);
        }
        catch (Exception e) {
            return false;
        }
        return true;
    }

    @Override
    public HoodieLogBlock prev() throws IOException {
        if (!this.reverseReader) {
            throw new HoodieNotSupportedException("Reverse log reader has not been enabled");
        }
        long blockSize = this.inputStream.readLong();
        long blockEndPos = this.inputStream.getPos();
        try {
            this.inputStream.seek(this.reverseLogFilePosition - blockSize);
        }
        catch (Exception e) {
            this.inputStream.seek(blockEndPos);
            throw new CorruptedLogFileException("Found possible corrupted block, cannot read log file in reverse, fallback to forward reading of logfile");
        }
        boolean hasNext = this.hasNext();
        this.reverseLogFilePosition -= blockSize;
        this.lastReverseLogFilePosition = this.reverseLogFilePosition;
        return this.next();
    }

    public long moveToPrev() throws IOException {
        if (!this.reverseReader) {
            throw new HoodieNotSupportedException("Reverse log reader has not been enabled");
        }
        this.inputStream.seek(this.lastReverseLogFilePosition);
        long blockSize = this.inputStream.readLong();
        this.inputStream.seek(this.reverseLogFilePosition - blockSize);
        this.reverseLogFilePosition -= blockSize;
        this.lastReverseLogFilePosition = this.reverseLogFilePosition;
        return this.reverseLogFilePosition;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Remove not supported for HoodieLogFileReader");
    }
}

