package org.springframework.xd.dirt.server;

import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.utils.ThreadUtils;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.util.Assert;
import org.springframework.xd.dirt.cluster.ContainerMatcher;
import org.springframework.xd.dirt.container.store.ContainerRepository;
import org.springframework.xd.dirt.job.JobFactory;
import org.springframework.xd.dirt.module.ModuleDefinitionRepository;
import org.springframework.xd.dirt.stream.JobDefinitionRepository;
import org.springframework.xd.dirt.stream.StreamDefinitionRepository;
import org.springframework.xd.dirt.stream.StreamFactory;
import org.springframework.xd.dirt.zookeeper.Paths;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnection;
import org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener;
import org.springframework.xd.dirt.zookeeper.ZooKeeperUtils;
import org.springframework.xd.module.options.ModuleOptionsMetadataResolver;

/* loaded from: input_file:org/springframework/xd/dirt/server/DeploymentSupervisor.class */
public class DeploymentSupervisor implements ApplicationListener<ApplicationEvent>, DisposableBean {
    private static final Logger logger = LoggerFactory.getLogger(DeploymentSupervisor.class);
    private final ZooKeeperConnection zkConnection;
    private final ContainerRepository containerRepository;
    private final StreamDefinitionRepository streamDefinitionRepository;
    private final JobDefinitionRepository jobDefinitionRepository;
    private final ModuleDefinitionRepository moduleDefinitionRepository;
    private final ModuleOptionsMetadataResolver moduleOptionsMetadataResolver;
    private volatile ApplicationContext applicationContext;
    private final ContainerMatcher containerMatcher;
    private volatile LeaderSelector leaderSelector;
    private final LeaderSelectorListener leaderListener = new LeaderListener();
    private final ConnectionListener connectionListener = new ConnectionListener();
    private final ExecutorService executorService = Executors.newSingleThreadExecutor(ThreadUtils.newThreadFactory("DeploymentSupervisorCacheListener"));
    private final DeploymentUnitStateCalculator stateCalculator;

    /* loaded from: input_file:org/springframework/xd/dirt/server/DeploymentSupervisor$ConnectionListener.class */
    private class ConnectionListener implements ZooKeeperConnectionListener {
        private ConnectionListener() {
        }

        @Override // org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener
        public void onConnect(CuratorFramework curatorFramework) {
            DeploymentSupervisor.logger.info("Admin {} connection established", DeploymentSupervisor.this.getId());
            DeploymentSupervisor.this.requestLeadership(curatorFramework);
        }

        @Override // org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener
        public void onResume(CuratorFramework curatorFramework) {
            DeploymentSupervisor.logger.info("Admin {} connection resumed", DeploymentSupervisor.this.getId());
            DeploymentSupervisor.this.requestLeadership(curatorFramework);
        }

        @Override // org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener
        public void onDisconnect(CuratorFramework curatorFramework) {
            DeploymentSupervisor.logger.info("Admin {} connection terminated", DeploymentSupervisor.this.getId());
            try {
                DeploymentSupervisor.this.destroy();
            } catch (Exception e) {
                DeploymentSupervisor.logger.warn("exception occurred while closing leader selector", e);
            }
        }

        @Override // org.springframework.xd.dirt.zookeeper.ZooKeeperConnectionListener
        public void onSuspend(CuratorFramework curatorFramework) {
            DeploymentSupervisor.logger.info("Admin {} connection suspended", DeploymentSupervisor.this.getId());
            try {
                DeploymentSupervisor.this.destroy();
            } catch (Exception e) {
                DeploymentSupervisor.logger.warn("exception occurred while closing leader selector", e);
            }
        }
    }

    /* loaded from: input_file:org/springframework/xd/dirt/server/DeploymentSupervisor$LeaderListener.class */
    class LeaderListener extends LeaderSelectorListenerAdapter {
        LeaderListener() {
        }

        public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
            DeploymentSupervisor.logger.info("Leader Admin {} is watching for stream/job deployment requests.", DeploymentSupervisor.this.getId());
            cleanupDeployments(curatorFramework);
            PathChildrenCache pathChildrenCache = null;
            PathChildrenCache pathChildrenCache2 = null;
            PathChildrenCache pathChildrenCache3 = null;
            PathChildrenCache pathChildrenCache4 = null;
            try {
                try {
                    StreamFactory streamFactory = new StreamFactory(DeploymentSupervisor.this.streamDefinitionRepository, DeploymentSupervisor.this.moduleDefinitionRepository, DeploymentSupervisor.this.moduleOptionsMetadataResolver);
                    JobFactory jobFactory = new JobFactory(DeploymentSupervisor.this.jobDefinitionRepository, DeploymentSupervisor.this.moduleDefinitionRepository, DeploymentSupervisor.this.moduleOptionsMetadataResolver);
                    String build = Paths.build(Paths.MODULE_DEPLOYMENTS, Paths.REQUESTED);
                    Paths.ensurePath(curatorFramework, build);
                    Paths.ensurePath(curatorFramework, Paths.build(Paths.MODULE_DEPLOYMENTS, Paths.ALLOCATED));
                    pathChildrenCache4 = DeploymentSupervisor.this.instantiatePathChildrenCache(curatorFramework, build);
                    pathChildrenCache4.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
                    StreamDeploymentListener streamDeploymentListener = new StreamDeploymentListener(DeploymentSupervisor.this.zkConnection, pathChildrenCache4, DeploymentSupervisor.this.containerRepository, streamFactory, DeploymentSupervisor.this.containerMatcher, DeploymentSupervisor.this.stateCalculator);
                    pathChildrenCache2 = DeploymentSupervisor.this.instantiatePathChildrenCache(curatorFramework, Paths.STREAM_DEPLOYMENTS);
                    pathChildrenCache2.getListenable().addListener(streamDeploymentListener);
                    pathChildrenCache2.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
                    streamDeploymentListener.recalculateStreamStates(curatorFramework, pathChildrenCache2);
                    JobDeploymentListener jobDeploymentListener = new JobDeploymentListener(DeploymentSupervisor.this.zkConnection, pathChildrenCache4, DeploymentSupervisor.this.containerRepository, jobFactory, DeploymentSupervisor.this.containerMatcher, DeploymentSupervisor.this.stateCalculator);
                    pathChildrenCache3 = DeploymentSupervisor.this.instantiatePathChildrenCache(curatorFramework, Paths.JOB_DEPLOYMENTS);
                    pathChildrenCache3.getListenable().addListener(jobDeploymentListener);
                    pathChildrenCache3.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
                    jobDeploymentListener.recalculateJobStates(curatorFramework, pathChildrenCache3);
                    ContainerListener containerListener = new ContainerListener(DeploymentSupervisor.this.zkConnection, DeploymentSupervisor.this.containerRepository, streamFactory, jobFactory, pathChildrenCache2, pathChildrenCache3, pathChildrenCache4, DeploymentSupervisor.this.containerMatcher, DeploymentSupervisor.this.stateCalculator);
                    pathChildrenCache = DeploymentSupervisor.this.instantiatePathChildrenCache(curatorFramework, Paths.CONTAINERS);
                    pathChildrenCache.getListenable().addListener(containerListener);
                    pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
                    Thread.sleep(Long.MAX_VALUE);
                    if (pathChildrenCache != null) {
                        pathChildrenCache.close();
                    }
                    if (pathChildrenCache2 != null) {
                        pathChildrenCache2.close();
                    }
                    if (pathChildrenCache3 != null) {
                        pathChildrenCache3.close();
                    }
                    if (pathChildrenCache4 != null) {
                        pathChildrenCache4.close();
                    }
                } catch (InterruptedException e) {
                    DeploymentSupervisor.logger.info("Leadership canceled due to thread interrupt");
                    Thread.currentThread().interrupt();
                    if (pathChildrenCache != null) {
                        pathChildrenCache.close();
                    }
                    if (pathChildrenCache2 != null) {
                        pathChildrenCache2.close();
                    }
                    if (pathChildrenCache3 != null) {
                        pathChildrenCache3.close();
                    }
                    if (pathChildrenCache4 != null) {
                        pathChildrenCache4.close();
                    }
                }
            } catch (Throwable th) {
                if (pathChildrenCache != null) {
                    pathChildrenCache.close();
                }
                if (pathChildrenCache2 != null) {
                    pathChildrenCache2.close();
                }
                if (pathChildrenCache3 != null) {
                    pathChildrenCache3.close();
                }
                if (pathChildrenCache4 != null) {
                    pathChildrenCache4.close();
                }
                throw th;
            }
        }

        private void cleanupDeployments(CuratorFramework curatorFramework) throws Exception {
            HashSet hashSet = new HashSet();
            try {
                hashSet.addAll((Collection) curatorFramework.getChildren().forPath(Paths.build(Paths.MODULE_DEPLOYMENTS, Paths.ALLOCATED)));
                hashSet.removeAll((Collection) curatorFramework.getChildren().forPath(Paths.build(Paths.CONTAINERS)));
            } catch (KeeperException.NoNodeException e) {
            }
            Iterator it = hashSet.iterator();
            while (it.hasNext()) {
                try {
                    curatorFramework.delete().deletingChildrenIfNeeded().forPath(Paths.build(Paths.MODULE_DEPLOYMENTS, Paths.ALLOCATED, (String) it.next()));
                } catch (KeeperException.NoNodeException e2) {
                }
            }
        }
    }

    public DeploymentSupervisor(ZooKeeperConnection zooKeeperConnection, ContainerRepository containerRepository, StreamDefinitionRepository streamDefinitionRepository, JobDefinitionRepository jobDefinitionRepository, ModuleDefinitionRepository moduleDefinitionRepository, ModuleOptionsMetadataResolver moduleOptionsMetadataResolver, ContainerMatcher containerMatcher, DeploymentUnitStateCalculator deploymentUnitStateCalculator) {
        Assert.notNull(zooKeeperConnection, "ZooKeeperConnection must not be null");
        Assert.notNull(containerRepository, "ContainerRepository must not be null");
        Assert.notNull(streamDefinitionRepository, "StreamDefinitionRepository must not be null");
        Assert.notNull(moduleDefinitionRepository, "ModuleDefinitionRepository must not be null");
        Assert.notNull(moduleOptionsMetadataResolver, "moduleOptionsMetadataResolver must not be null");
        Assert.notNull(containerMatcher, "containerMatcher must not be null");
        Assert.notNull(deploymentUnitStateCalculator, "stateCalculator must not be null");
        this.zkConnection = zooKeeperConnection;
        this.containerRepository = containerRepository;
        this.streamDefinitionRepository = streamDefinitionRepository;
        this.jobDefinitionRepository = jobDefinitionRepository;
        this.moduleDefinitionRepository = moduleDefinitionRepository;
        this.moduleOptionsMetadataResolver = moduleOptionsMetadataResolver;
        this.containerMatcher = containerMatcher;
        this.stateCalculator = deploymentUnitStateCalculator;
    }

    public void onApplicationEvent(ApplicationEvent applicationEvent) {
        if (applicationEvent instanceof ContextRefreshedEvent) {
            this.applicationContext = ((ContextRefreshedEvent) applicationEvent).getApplicationContext();
            if (this.zkConnection.isConnected()) {
                requestLeadership(this.zkConnection.getClient());
            }
            this.zkConnection.addListener(this.connectionListener);
            return;
        }
        if (!(applicationEvent instanceof ContextStoppedEvent) || this.leaderSelector == null) {
            return;
        }
        this.leaderSelector.close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getId() {
        return this.applicationContext.getId();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void requestLeadership(CuratorFramework curatorFramework) {
        try {
            Paths.ensurePath(curatorFramework, Paths.MODULE_DEPLOYMENTS);
            Paths.ensurePath(curatorFramework, Paths.STREAM_DEPLOYMENTS);
            Paths.ensurePath(curatorFramework, Paths.JOB_DEPLOYMENTS);
            Paths.ensurePath(curatorFramework, Paths.CONTAINERS);
            Paths.ensurePath(curatorFramework, Paths.STREAMS);
            Paths.ensurePath(curatorFramework, Paths.JOBS);
            if (this.leaderSelector == null) {
                this.leaderSelector = new LeaderSelector(curatorFramework, Paths.build(Paths.ADMINS), this.leaderListener);
                this.leaderSelector.setId(getId());
                this.leaderSelector.start();
            }
        } catch (Exception e) {
            throw ZooKeeperUtils.wrapThrowable(e);
        }
    }

    public void destroy() {
        if (this.leaderSelector != null) {
            this.leaderSelector.close();
            this.leaderSelector = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public PathChildrenCache instantiatePathChildrenCache(CuratorFramework curatorFramework, String str) {
        return new PathChildrenCache(curatorFramework, str, true, false, this.executorService);
    }
}
