/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.wmassigners;

import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;

public class ProcTimeMiniBatchAssignerOperator
extends AbstractStreamOperator<RowData>
implements OneInputStreamOperator<RowData, RowData>,
ProcessingTimeService.ProcessingTimeCallback {
    private static final long serialVersionUID = 1L;
    private final long intervalMs;
    private transient long currentWatermark;

    public ProcTimeMiniBatchAssignerOperator(long intervalMs) {
        this.intervalMs = intervalMs;
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();
        this.currentWatermark = 0L;
        long now = this.getProcessingTimeService().getCurrentProcessingTime();
        this.getProcessingTimeService().registerTimer(now + this.intervalMs, this);
        this.getRuntimeContext().getMetricGroup().gauge("currentBatch", () -> this.currentWatermark);
    }

    @Override
    public void processElement(StreamRecord<RowData> element) throws Exception {
        long now = this.getProcessingTimeService().getCurrentProcessingTime();
        long currentBatch = now - now % this.intervalMs;
        if (currentBatch > this.currentWatermark) {
            this.currentWatermark = currentBatch;
            this.output.emitWatermark(new Watermark(currentBatch));
        }
        this.output.collect(element);
    }

    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        long now = this.getProcessingTimeService().getCurrentProcessingTime();
        long currentBatch = now - now % this.intervalMs;
        if (currentBatch > this.currentWatermark) {
            this.currentWatermark = currentBatch;
            this.output.emitWatermark(new Watermark(currentBatch));
        }
        this.getProcessingTimeService().registerTimer(currentBatch + this.intervalMs, this);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        if (mark.getTimestamp() == Long.MAX_VALUE && this.currentWatermark != Long.MAX_VALUE) {
            this.currentWatermark = Long.MAX_VALUE;
            this.output.emitWatermark(mark);
        }
    }
}

