package org.apache.flink.table.planner.runtime.utils;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/* loaded from: input_file:org/apache/flink/table/planner/runtime/utils/ParallelFiniteTestSource.class */
/**
 * Parallel test source that emits its elements in two rounds, pausing after each round until a
 * checkpoint with a sufficiently high id has been reported through
 * {@link #notifyCheckpointComplete(long)}.
 *
 * <p>Elements are partitioned across subtasks by iteration position: the element at position
 * {@code i} is emitted only by the subtask for which {@link #isTaskMessage(int)} returns
 * {@code true}.
 *
 * <p>The first round waits for checkpoint id 2, the second round for checkpoint id 4. After
 * {@link #cancel()} has been called the source stops emitting and stops waiting, so
 * {@link #run(SourceFunction.SourceContext)} returns promptly.
 *
 * <p>The {@code running} flag is only set by {@link #open(Configuration)}; a source that was
 * never opened behaves like a cancelled one.
 *
 * @param <T> type of the emitted elements
 */
public class ParallelFiniteTestSource<T> extends RichSourceFunction<T> implements CheckpointListener, ParallelSourceFunction<T> {
    /** Checkpoint id that has to be reported complete before the first round finishes. */
    private static final long FIRST_ROUND_CHECKPOINT_ID = 2L;

    /** Checkpoint id that has to be reported complete before the second round finishes. */
    private static final long SECOND_ROUND_CHECKPOINT_ID = 4L;

    /** Elements to emit; iterated once per round. */
    private final Iterable<T> elements;

    /** Cleared by {@link #cancel()}; volatile because it is written and read without a shared lock. */
    private volatile transient boolean running;

    /** Id passed to the most recent {@link #notifyCheckpointComplete(long)} call, 0 before the first. */
    private volatile transient long currentCheckpointId;

    /**
     * Creates a source over the given elements.
     *
     * @param iterable elements to emit in each round
     */
    public ParallelFiniteTestSource(Iterable<T> iterable) {
        this.elements = iterable;
    }

    /**
     * Resets the transient state: marks the source as running and clears the checkpoint id.
     *
     * @param configuration forwarded to the superclass
     * @throws Exception if the superclass {@code open} fails
     */
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.running = true;
        this.currentCheckpointId = 0L;
    }

    /**
     * Tells whether the element at the given iteration position belongs to this subtask.
     *
     * @param i zero-based position of the element in the iteration order
     * @return {@code true} if {@code i} modulo the number of parallel subtasks equals this
     *     subtask's index
     */
    public boolean isTaskMessage(int i) {
        return i % getRuntimeContext().getNumberOfParallelSubtasks() == getRuntimeContext().getIndexOfThisSubtask();
    }

    /**
     * Runs both emission rounds. If the source is cancelled, the remaining emission is skipped
     * and the waits end immediately.
     *
     * @param sourceContext context used to emit elements and to obtain the checkpoint lock
     * @throws Exception if the thread is interrupted while waiting for a checkpoint
     */
    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        emitElementsAndWaitForCheckpoints(sourceContext, FIRST_ROUND_CHECKPOINT_ID);
        emitElementsAndWaitForCheckpoints(sourceContext, SECOND_ROUND_CHECKPOINT_ID);
    }

    /**
     * Emits this subtask's elements while holding the checkpoint lock, then waits until the
     * notified checkpoint id reaches {@code checkpointIdToAwait} or the source is cancelled.
     *
     * @param sourceContext context used to emit elements and to obtain the checkpoint lock
     * @param checkpointIdToAwait smallest notified checkpoint id that ends the wait
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void emitElementsAndWaitForCheckpoints(SourceFunction.SourceContext<T> sourceContext, long checkpointIdToAwait) throws InterruptedException {
        final Object checkpointLock = sourceContext.getCheckpointLock();
        synchronized (checkpointLock) {
            emitRecords(sourceContext);
            // Nothing in this class calls notify on the lock, so the 1 ms timeout is what wakes
            // the loop to re-check the condition; wait() releases the lock while parked.
            while (this.running && this.currentCheckpointId < checkpointIdToAwait) {
                checkpointLock.wait(1L);
            }
        }
    }

    /**
     * Emits every element that belongs to this subtask, stopping early once the source has been
     * cancelled.
     *
     * @param sourceContext context used to emit elements
     */
    private void emitRecords(SourceFunction.SourceContext<T> sourceContext) {
        int position = 0;
        for (T element : this.elements) {
            // Honour cancel(): without this check a cancelled source would still emit the rest
            // of the current round and the whole following round.
            if (!this.running) {
                return;
            }
            if (isTaskMessage(position)) {
                sourceContext.collect(element);
            }
            position++;
        }
    }

    /** Requests the source to stop; emission and checkpoint waits end at their next check. */
    public void cancel() {
        this.running = false;
    }

    /**
     * Records the id of the completed checkpoint so that a pending wait can finish.
     *
     * @param j id of the checkpoint that completed
     * @throws Exception never thrown by this implementation
     */
    public void notifyCheckpointComplete(long j) throws Exception {
        this.currentCheckpointId = j;
    }
}
