package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.RunnableWithException;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/slotmanager/TaskManagerCheckInSlotManagerTest.class */
public class TaskManagerCheckInSlotManagerTest extends TestLogger {
    private static final ResourceID resourceID = ResourceID.generate();
    private static final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
    private static final SlotID slotId = new SlotID(resourceID, 0);
    private static final ResourceProfile resourceProfile = ResourceProfile.fromResources(1.0d, 1);
    private static final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
    private static final SlotReport slotReport = new SlotReport(slotStatus);
    private final AtomicReference<CompletableFuture<Boolean>> canBeReleasedFuture = new AtomicReference<>();
    private final TaskExecutorGateway taskExecutorGateway;
    private final TaskExecutorConnection taskManagerConnection;
    private CompletableFuture<InstanceID> releaseFuture;
    private ResourceActions resourceManagerActions;
    private ManuallyTriggeredScheduledExecutor mainThreadExecutor;
    private final AtomicInteger allocateResourceCalls;
    private final AtomicInteger releaseResourceCalls;

    public TaskManagerCheckInSlotManagerTest() {
        TestingTaskExecutorGatewayBuilder testingTaskExecutorGatewayBuilder = new TestingTaskExecutorGatewayBuilder();
        AtomicReference<CompletableFuture<Boolean>> atomicReference = this.canBeReleasedFuture;
        atomicReference.getClass();
        this.taskExecutorGateway = testingTaskExecutorGatewayBuilder.setCanBeReleasedSupplier(atomicReference::get).createTestingTaskExecutorGateway();
        this.taskManagerConnection = new TaskExecutorConnection(resourceID, this.taskExecutorGateway);
        this.allocateResourceCalls = new AtomicInteger(0);
        this.releaseResourceCalls = new AtomicInteger(0);
    }

    @Before
    public void setup() {
        this.canBeReleasedFuture.set(new CompletableFuture<>());
        this.releaseFuture = new CompletableFuture<>();
        this.allocateResourceCalls.getAndSet(0);
        this.releaseResourceCalls.getAndSet(0);
        this.resourceManagerActions = new TestingResourceActionsBuilder().setReleaseResourceConsumer((instanceID, exc) -> {
            this.releaseFuture.complete(instanceID);
            this.releaseResourceCalls.incrementAndGet();
        }).setAllocateResourceConsumer(workerResourceSpec -> {
            this.allocateResourceCalls.incrementAndGet();
        }).build();
        this.mainThreadExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    @Test
    public void testTaskManagerTimeout() throws Exception {
        checkTaskManagerTimeout(0);
    }

    @Test
    public void testTaskManagerTimeoutWithRedundantTaskManager() throws Exception {
        checkTaskManagerTimeout(1);
    }

    @Test
    public void testTaskManagerTimeoutWithZeroRedundantTaskManager() throws Exception {
        registerAndCheckMultiTaskManagers(0);
        Assert.assertThat(Integer.valueOf(this.allocateResourceCalls.get()), Matchers.is(0));
        Assert.assertThat(Integer.valueOf(this.releaseResourceCalls.get()), Matchers.is(1));
    }

    @Test
    public void testTaskManagerTimeoutWithOneRedundantTaskManager() throws Exception {
        registerAndCheckMultiTaskManagers(1);
        Assert.assertThat(Integer.valueOf(this.allocateResourceCalls.get()), Matchers.is(0));
        Assert.assertThat(Integer.valueOf(this.releaseResourceCalls.get()), Matchers.is(1));
    }

    @Test
    public void testTaskManagerTimeoutWithTwoRedundantTaskManager() throws Exception {
        registerAndCheckMultiTaskManagers(2);
        Assert.assertThat(Integer.valueOf(this.allocateResourceCalls.get()), Matchers.is(0));
        Assert.assertThat(Integer.valueOf(this.releaseResourceCalls.get()), Matchers.is(0));
    }

    @Test
    public void testTaskManagerTimeoutWithThreeRedundantTaskManager() throws Exception {
        registerAndCheckMultiTaskManagers(3);
        Assert.assertThat(Integer.valueOf(this.allocateResourceCalls.get()), Matchers.is(1));
        Assert.assertThat(Integer.valueOf(this.releaseResourceCalls.get()), Matchers.is(0));
    }

    @Test
    public void testTaskManagerIsNotReleasedBeforeItCanBe() throws Exception {
        SlotManagerImpl createAndStartSlotManagerWithTM = createAndStartSlotManagerWithTM();
        Throwable th = null;
        try {
            checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(createAndStartSlotManagerWithTM, false);
            verifyTmReleased(false);
            checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(createAndStartSlotManagerWithTM, true);
            verifyTmReleased(true);
            if (createAndStartSlotManagerWithTM != null) {
                if (0 == 0) {
                    createAndStartSlotManagerWithTM.close();
                    return;
                }
                try {
                    createAndStartSlotManagerWithTM.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createAndStartSlotManagerWithTM != null) {
                if (0 != 0) {
                    try {
                        createAndStartSlotManagerWithTM.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createAndStartSlotManagerWithTM.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testTaskManagerIsNotReleasedInCaseOfConcurrentAllocation() throws Exception {
        SlotManagerImpl createAndStartSlotManagerWithTM = createAndStartSlotManagerWithTM();
        Throwable th = null;
        try {
            checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(createAndStartSlotManagerWithTM, true, () -> {
                AllocationID allocationID = new AllocationID();
                createAndStartSlotManagerWithTM.registerSlotRequest(new SlotRequest(new JobID(), allocationID, resourceProfile, "foobar"));
                this.mainThreadExecutor.triggerAll();
                createAndStartSlotManagerWithTM.freeSlot(slotId, allocationID);
            });
            verifyTmReleased(false);
            checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(createAndStartSlotManagerWithTM, true);
            verifyTmReleased(true);
            if (createAndStartSlotManagerWithTM != null) {
                if (0 == 0) {
                    createAndStartSlotManagerWithTM.close();
                    return;
                }
                try {
                    createAndStartSlotManagerWithTM.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createAndStartSlotManagerWithTM != null) {
                if (0 != 0) {
                    try {
                        createAndStartSlotManagerWithTM.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createAndStartSlotManagerWithTM.close();
                }
            }
            throw th3;
        }
    }

    private void checkTaskManagerTimeout(int i) throws Exception {
        this.canBeReleasedFuture.set(CompletableFuture.completedFuture(true));
        SlotManagerImpl buildAndStartWithDirectExec = SlotManagerBuilder.newBuilder().setTaskManagerTimeout(Time.milliseconds(10L)).setRedundantTaskManagerNum(i).buildAndStartWithDirectExec(resourceManagerId, this.resourceManagerActions);
        Throwable th = null;
        try {
            try {
                buildAndStartWithDirectExec.registerTaskManager(this.taskManagerConnection, slotReport);
                Assert.assertThat(this.releaseFuture.get(), Matchers.is(Matchers.equalTo(this.taskManagerConnection.getInstanceID())));
                if (buildAndStartWithDirectExec != null) {
                    if (0 == 0) {
                        buildAndStartWithDirectExec.close();
                        return;
                    }
                    try {
                        buildAndStartWithDirectExec.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (buildAndStartWithDirectExec != null) {
                if (th != null) {
                    try {
                        buildAndStartWithDirectExec.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    buildAndStartWithDirectExec.close();
                }
            }
            throw th4;
        }
    }

    private void registerAndCheckMultiTaskManagers(int i) throws Exception {
        SlotManagerImpl createAndStartSlotManager = createAndStartSlotManager(i, 2);
        registerTaskManagerWithTwoSlots(createAndStartSlotManager, true, true);
        registerTaskManagerWithTwoSlots(createAndStartSlotManager, false, false);
        registerTaskManagerWithTwoSlots(createAndStartSlotManager, false, true);
        registerTaskManagerWithTwoSlots(createAndStartSlotManager, true, false);
        checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(createAndStartSlotManager, true);
    }

    private void registerTaskManagerWithTwoSlots(SlotManagerImpl slotManagerImpl, boolean z, boolean z2) {
        this.canBeReleasedFuture.set(new CompletableFuture<>());
        ResourceID generate = ResourceID.generate();
        ResourceProfile fromResources = ResourceProfile.fromResources(1.0d, 1);
        JobID jobID = new JobID();
        SlotID slotID = new SlotID(generate, 0);
        SlotStatus slotStatus2 = z ? new SlotStatus(slotID, fromResources) : new SlotStatus(slotID, fromResources, jobID, new AllocationID());
        SlotID slotID2 = new SlotID(generate, 1);
        SlotReport slotReport2 = new SlotReport(Arrays.asList(slotStatus2, z2 ? new SlotStatus(slotID2, fromResources) : new SlotStatus(slotID2, fromResources, jobID, new AllocationID())));
        TestingTaskExecutorGatewayBuilder testingTaskExecutorGatewayBuilder = new TestingTaskExecutorGatewayBuilder();
        AtomicReference<CompletableFuture<Boolean>> atomicReference = this.canBeReleasedFuture;
        atomicReference.getClass();
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(generate, testingTaskExecutorGatewayBuilder.setCanBeReleasedSupplier(atomicReference::get).createTestingTaskExecutorGateway());
        this.mainThreadExecutor.execute(() -> {
            slotManagerImpl.registerTaskManager(taskExecutorConnection, slotReport2);
        });
    }

    private SlotManagerImpl createAndStartSlotManagerWithTM() {
        SlotManagerImpl createAndStartSlotManager = createAndStartSlotManager(0, 1);
        this.mainThreadExecutor.execute(() -> {
            createAndStartSlotManager.registerTaskManager(this.taskManagerConnection, slotReport);
        });
        return createAndStartSlotManager;
    }

    private SlotManagerImpl createAndStartSlotManager(int i, int i2) {
        SlotManagerImpl build = SlotManagerBuilder.newBuilder().setScheduledExecutor(this.mainThreadExecutor).setTaskManagerTimeout(Time.milliseconds(0L)).setRedundantTaskManagerNum(i).setNumSlotsPerWorker(i2).build();
        build.start(resourceManagerId, this.mainThreadExecutor, this.resourceManagerActions);
        return build;
    }

    private void checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(SlotManagerImpl slotManagerImpl, boolean z) throws Exception {
        checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(slotManagerImpl, z, () -> {
        });
    }

    private void checkTaskManagerTimeoutWithCustomCanBeReleasedResponse(SlotManagerImpl slotManagerImpl, boolean z, RunnableWithException runnableWithException) throws Exception {
        this.canBeReleasedFuture.set(new CompletableFuture<>());
        ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor = this.mainThreadExecutor;
        slotManagerImpl.getClass();
        manuallyTriggeredScheduledExecutor.execute(slotManagerImpl::checkTaskManagerTimeoutsAndRedundancy);
        this.mainThreadExecutor.triggerAll();
        runnableWithException.run();
        this.canBeReleasedFuture.get().complete(Boolean.valueOf(z));
        this.mainThreadExecutor.triggerAll();
    }

    private void verifyTmReleased(boolean z) {
        Assert.assertThat(Boolean.valueOf(this.releaseFuture.isDone()), Matchers.is(Boolean.valueOf(z)));
        if (z) {
            Assert.assertThat(this.releaseFuture.join(), Matchers.is(Matchers.equalTo(this.taskManagerConnection.getInstanceID())));
        }
    }
}
