package org.apache.flink.runtime.operators;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.runtime.operators.CoGroupTaskExternalITCase;
import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
import org.apache.flink.runtime.testutils.recordutils.RecordPairComparatorFactory;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/operators/CoGroupTaskTest.class */
public class CoGroupTaskTest extends DriverTestBase<CoGroupFunction<Record, Record, Record>> {
    private static final long SORT_MEM = 3145728;
    private final RecordComparator comparator1;
    private final RecordComparator comparator2;
    private final DriverTestBase.CountingOutputCollector output;

    /* loaded from: input_file:org/apache/flink/runtime/operators/CoGroupTaskTest$MockDelayingCoGroupStub.class */
    public static final class MockDelayingCoGroupStub extends RichCoGroupFunction<Record, Record, Record> {
        private static final long serialVersionUID = 1;

        public void coGroup(Iterable<Record> iterable, Iterable<Record> iterable2, Collector<Record> collector) {
            for (Record record : iterable) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                }
            }
            for (Record record2 : iterable2) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e2) {
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/CoGroupTaskTest$MockFailingCoGroupStub.class */
    public static class MockFailingCoGroupStub extends RichCoGroupFunction<Record, Record, Record> {
        private static final long serialVersionUID = 1;
        private int cnt = 0;

        public void coGroup(Iterable<Record> iterable, Iterable<Record> iterable2, Collector<Record> collector) {
            int i = 0;
            for (Record record : iterable) {
                i++;
            }
            for (Record record2 : iterable2) {
                if (i == 0) {
                    int i2 = this.cnt + 1;
                    this.cnt = i2;
                    if (i2 >= 10) {
                        throw new ExpectedTestException();
                    }
                    collector.collect(record2);
                } else {
                    for (int i3 = 0; i3 < i; i3++) {
                        int i4 = this.cnt + 1;
                        this.cnt = i4;
                        if (i4 >= 10) {
                            throw new ExpectedTestException();
                        }
                        collector.collect(record2);
                    }
                }
            }
        }
    }

    public CoGroupTaskTest(ExecutionConfig executionConfig) {
        super(executionConfig, 0L, 2, SORT_MEM);
        this.comparator1 = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});
        this.comparator2 = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});
        this.output = new DriverTestBase.CountingOutputCollector();
    }

    @Test
    public void testSortBoth1CoGroupTask() {
        int min = (2 * 1 * Math.min(100, 200)) + (100 > 200 ? (100 - 200) * 2 : (200 - 100) * 1);
        setOutput(this.output);
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
        CoGroupDriver coGroupDriver = new CoGroupDriver();
        try {
            addInputSorted(new UniformRecordGenerator(100, 2, false), this.comparator1.m534duplicate());
            addInputSorted(new UniformRecordGenerator(200, 1, false), this.comparator2.m534duplicate());
            testDriver(coGroupDriver, CoGroupTaskExternalITCase.MockCoGroupStub.class);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("The test caused an exception.");
        }
        Assert.assertEquals("Wrong result set size.", min, this.output.getNumberOfRecords());
    }

    @Test
    public void testSortBoth2CoGroupTask() {
        int min = (2 * 4 * Math.min(200, 200)) + (200 > 200 ? (200 - 200) * 2 : (200 - 200) * 4);
        setOutput(this.output);
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
        CoGroupDriver coGroupDriver = new CoGroupDriver();
        try {
            addInputSorted(new UniformRecordGenerator(200, 2, false), this.comparator1.m534duplicate());
            addInputSorted(new UniformRecordGenerator(200, 4, false), this.comparator2.m534duplicate());
            testDriver(coGroupDriver, CoGroupTaskExternalITCase.MockCoGroupStub.class);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("The test caused an exception.");
        }
        Assert.assertEquals("Wrong result set size.", min, this.output.getNumberOfRecords());
    }

    @Test
    public void testSortFirstCoGroupTask() {
        int min = (2 * 4 * Math.min(200, 200)) + (200 > 200 ? (200 - 200) * 2 : (200 - 200) * 4);
        setOutput(this.output);
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
        CoGroupDriver coGroupDriver = new CoGroupDriver();
        try {
            addInputSorted(new UniformRecordGenerator(200, 2, false), this.comparator1.m534duplicate());
            addInput(new UniformRecordGenerator(200, 4, true));
            testDriver(coGroupDriver, CoGroupTaskExternalITCase.MockCoGroupStub.class);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("The test caused an exception.");
        }
        Assert.assertEquals("Wrong result set size.", min, this.output.getNumberOfRecords());
    }

    @Test
    public void testSortSecondCoGroupTask() {
        int min = (2 * 4 * Math.min(200, 200)) + (200 > 200 ? (200 - 200) * 2 : (200 - 200) * 4);
        setOutput(this.output);
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
        CoGroupDriver coGroupDriver = new CoGroupDriver();
        try {
            addInput(new UniformRecordGenerator(200, 2, true));
            addInputSorted(new UniformRecordGenerator(200, 4, false), this.comparator2.m534duplicate());
            testDriver(coGroupDriver, CoGroupTaskExternalITCase.MockCoGroupStub.class);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("The test caused an exception.");
        }
        Assert.assertEquals("Wrong result set size.", min, this.output.getNumberOfRecords());
    }

    @Test
    public void testMergeCoGroupTask() {
        int min = (2 * 4 * Math.min(200, 200)) + (200 > 200 ? (200 - 200) * 2 : (200 - 200) * 4);
        setOutput(this.output);
        addInput(new UniformRecordGenerator(200, 2, true));
        addInput(new UniformRecordGenerator(200, 4, true));
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
        try {
            testDriver(new CoGroupDriver(), CoGroupTaskExternalITCase.MockCoGroupStub.class);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("The test caused an exception.");
        }
        Assert.assertEquals("Wrong result set size.", min, this.output.getNumberOfRecords());
    }

    @Test
    public void testFailingSortCoGroupTask() {
        setOutput(this.output);
        addInput(new UniformRecordGenerator(100, 2, true));
        addInput(new UniformRecordGenerator(200, 1, true));
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
        try {
            testDriver(new CoGroupDriver(), MockFailingCoGroupStub.class);
            Assert.fail("Function exception was not forwarded.");
        } catch (ExpectedTestException e) {
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail("The test caused an exception.");
        }
    }

    @Test
    public void testCancelCoGroupTaskWhileSorting1() {
        setOutput(this.output);
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
        final CoGroupDriver coGroupDriver = new CoGroupDriver();
        try {
            addInputSorted(new DelayingInfinitiveInputIterator(1000), this.comparator1.m534duplicate());
            addInput(new UniformRecordGenerator(10, 2, true));
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("The test caused an exception.");
        }
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Thread thread = new Thread() { // from class: org.apache.flink.runtime.operators.CoGroupTaskTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    CoGroupTaskTest.this.testDriver(coGroupDriver, CoGroupTaskExternalITCase.MockCoGroupStub.class);
                    atomicBoolean.set(true);
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
            }
        };
        thread.start();
        TaskCancelThread taskCancelThread = new TaskCancelThread(1, thread, this);
        taskCancelThread.start();
        try {
            taskCancelThread.join();
            thread.join();
        } catch (InterruptedException e2) {
            Assert.fail("Joining threads failed");
        }
        Assert.assertTrue("Test threw an exception even though it was properly canceled.", atomicBoolean.get());
    }

    @Test
    public void testCancelCoGroupTaskWhileSorting2() {
        setOutput(this.output);
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
        final CoGroupDriver coGroupDriver = new CoGroupDriver();
        try {
            addInput(new UniformRecordGenerator(10, 2, true));
            addInputSorted(new DelayingInfinitiveInputIterator(1000), this.comparator2.m534duplicate());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("The test caused an exception.");
        }
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Thread thread = new Thread() { // from class: org.apache.flink.runtime.operators.CoGroupTaskTest.2
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    CoGroupTaskTest.this.testDriver(coGroupDriver, CoGroupTaskExternalITCase.MockCoGroupStub.class);
                    atomicBoolean.set(true);
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
            }
        };
        thread.start();
        TaskCancelThread taskCancelThread = new TaskCancelThread(1, thread, this);
        taskCancelThread.start();
        try {
            taskCancelThread.join();
            thread.join();
        } catch (InterruptedException e2) {
            Assert.fail("Joining threads failed");
        }
        Assert.assertTrue("Test threw an exception even though it was properly canceled.", atomicBoolean.get());
    }

    @Test
    public void testCancelCoGroupTaskWhileCoGrouping() {
        setOutput(this.output);
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);
        final CoGroupDriver coGroupDriver = new CoGroupDriver();
        try {
            addInput(new UniformRecordGenerator(100, 5, true));
            addInput(new UniformRecordGenerator(100, 5, true));
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("The test caused an exception.");
        }
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Thread thread = new Thread() { // from class: org.apache.flink.runtime.operators.CoGroupTaskTest.3
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    CoGroupTaskTest.this.testDriver(coGroupDriver, MockDelayingCoGroupStub.class);
                    atomicBoolean.set(true);
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
            }
        };
        thread.start();
        TaskCancelThread taskCancelThread = new TaskCancelThread(1, thread, this);
        taskCancelThread.start();
        try {
            taskCancelThread.join();
            thread.join();
        } catch (InterruptedException e2) {
            Assert.fail("Joining threads failed");
        }
        Assert.assertTrue("Test threw an exception even though it was properly canceled.", atomicBoolean.get());
    }
}
