package org.apache.nifi.controller.scheduling;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.tasks.ConnectableTask;
import org.apache.nifi.controller.tasks.ReportingTaskWrapper;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.processor.exception.ProcessException;
import org.quartz.CronExpression;

/* loaded from: input_file:org/apache/nifi/controller/scheduling/QuartzSchedulingAgent.class */
/**
 * Scheduling agent that triggers components according to a Quartz cron expression.
 *
 * <p>Each scheduled component gets one one-shot task per concurrent slot submitted to the
 * {@code flowEngine}. After a task finishes it computes the next fire time from the cron
 * expression and re-submits itself, replacing the previous future both in this agent's
 * bookkeeping and in the component's {@link LifecycleState}.
 *
 * <p>The schedule/unschedule entry points are {@code synchronized} because they mutate the
 * plain {@link HashMap} held in {@link #quartzFutures}.
 */
public class QuartzSchedulingAgent extends AbstractTimeBasedSchedulingAgent {
    /** Futures currently registered for each scheduled component, keyed by task slot index. */
    private final Map<Object, Map<Integer, ScheduledFuture<?>>> quartzFutures;

    /**
     * Creates the agent and forwards its collaborators to the superclass.
     *
     * @param flowController controller passed to the superclass and to created tasks
     * @param flowEngine executor on which the cron-triggered tasks are submitted
     * @param repositoryContextFactory factory passed to the superclass
     */
    public QuartzSchedulingAgent(FlowController flowController, FlowEngine flowEngine, RepositoryContextFactory repositoryContextFactory) {
        super(flowEngine, flowController, repositoryContextFactory);
        this.quartzFutures = new HashMap<>();
    }

    /**
     * Cancels every registered future that is not already cancelled (interrupting running
     * tasks) and then shuts down the flow engine.
     */
    public void shutdown() {
        for (final Map<Integer, ScheduledFuture<?>> componentFutures : this.quartzFutures.values()) {
            cancelAll(componentFutures, true);
        }
        this.flowEngine.shutdown();
    }

    /**
     * Schedules a reporting task to run on the cron expression returned by
     * {@code getSchedulingPeriod()}.
     *
     * @param reportingTaskNode the reporting task to schedule
     * @param lifecycleState lifecycle state that is marked scheduled and receives the futures
     * @throws IllegalStateException if the task already has a registered future, if the
     *         scheduling period is not a valid cron expression, or if the expression has no
     *         future execution time
     */
    @Override // org.apache.nifi.controller.scheduling.AbstractSchedulingAgent
    public synchronized void doSchedule(final ReportingTaskNode reportingTaskNode, final LifecycleState lifecycleState) {
        final Map<Integer, ScheduledFuture<?>> componentFutures =
                this.quartzFutures.computeIfAbsent(reportingTaskNode, key -> new HashMap<>());
        if (!componentFutures.isEmpty()) {
            throw new IllegalStateException("Cannot schedule " + reportingTaskNode.getReportingTask().getIdentifier() + " because it is already scheduled to run");
        }

        final String schedulingPeriod = reportingTaskNode.getSchedulingPeriod();

        // Only the cron parsing is guarded, so unrelated failures are not misreported as an
        // invalid scheduling period; the parse failure is kept as the cause.
        final CronExpression cronExpression;
        try {
            cronExpression = new CronExpression(schedulingPeriod);
        } catch (final Exception e) {
            throw new IllegalStateException("Cannot schedule Reporting Task " + reportingTaskNode.getReportingTask().getIdentifier()
                    + " to run because its scheduling period is not valid", e);
        }

        final ReportingTaskWrapper reportingTaskWrapper =
                new ReportingTaskWrapper(reportingTaskNode, lifecycleState, this.flowController.getExtensionManager());

        // getTimeAfter yields null when the expression will never fire again.
        final Date firstFireTime = cronExpression.getTimeAfter(new Date());
        if (firstFireTime == null) {
            throw new IllegalStateException("Cannot schedule Reporting Task " + reportingTaskNode.getReportingTask().getIdentifier()
                    + " to run because its scheduling period " + schedulingPeriod + " has no future execution time");
        }

        final Runnable command = new Runnable() {
            private Date nextSchedule = firstFireTime;

            @Override
            public void run() {
                reportingTaskWrapper.run();

                final Date following = QuartzSchedulingAgent.getNextSchedule(this.nextSchedule, cronExpression);
                if (following == null) {
                    // Nothing left to wait for: stop re-submitting instead of failing with an NPE.
                    QuartzSchedulingAgent.this.logger.info("Reporting Task {} will not be triggered again because schedule {} has no future execution time",
                            reportingTaskNode, schedulingPeriod);
                    return;
                }
                this.nextSchedule = following;

                final long delay = QuartzSchedulingAgent.getDelay(this.nextSchedule);
                QuartzSchedulingAgent.this.logger.debug("Finished running Reporting Task {}; next scheduled time is at {} after a delay of {} milliseconds", new Object[]{reportingTaskNode, this.nextSchedule, Long.valueOf(delay)});

                final ScheduledFuture<?> newFuture = QuartzSchedulingAgent.this.flowEngine.schedule(this, delay, TimeUnit.MILLISECONDS);
                final ScheduledFuture<?> oldFuture = componentFutures.put(0, newFuture);
                lifecycleState.replaceFuture(oldFuture, newFuture);
            }
        };

        componentFutures.put(0, this.flowEngine.schedule(command, getDelay(firstFireTime), TimeUnit.MILLISECONDS));
        lifecycleState.setScheduled(true);
        lifecycleState.setFutures(componentFutures.values());
        this.logger.info("Scheduled Reporting Task {} to run threads on schedule {}", reportingTaskNode, schedulingPeriod);
    }

    /**
     * Schedules a connectable to run on its (parameter-evaluated) cron scheduling period, using
     * one self-rescheduling task per {@code getMaxConcurrentTasks()} slot.
     *
     * @param connectable the component to schedule
     * @param lifecycleState lifecycle state that receives the created futures
     * @throws IllegalStateException if the component already has registered futures, if the
     *         scheduling period is not a valid cron expression, or if the expression has no
     *         future execution time
     */
    @Override // org.apache.nifi.controller.scheduling.AbstractSchedulingAgent
    public synchronized void doSchedule(final Connectable connectable, final LifecycleState lifecycleState) {
        final Map<Integer, ScheduledFuture<?>> componentFutures =
                this.quartzFutures.computeIfAbsent(connectable, key -> new HashMap<>());
        if (!componentFutures.isEmpty()) {
            throw new IllegalStateException("Cannot schedule " + connectable + " because it is already scheduled to run");
        }

        final String cronSchedule = connectable.evaluateParameters(connectable.getSchedulingPeriod());

        // Only the cron parsing is guarded, so unrelated failures are not misreported as an
        // invalid scheduling period; the parse failure is kept as the cause.
        final CronExpression cronExpression;
        try {
            cronExpression = new CronExpression(cronSchedule);
        } catch (final Exception e) {
            throw new IllegalStateException("Cannot schedule " + connectable + " to run because its scheduling period is not valid", e);
        }

        boolean allScheduled = false;
        try {
            for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) {
                final ConnectableTask connectableTask = new ConnectableTask(this, connectable, this.flowController, this.contextFactory, lifecycleState);
                final Integer taskIndex = i;

                // getTimeAfter yields null when the expression will never fire again.
                final Date firstFireTime = cronExpression.getTimeAfter(new Date());
                if (firstFireTime == null) {
                    throw new IllegalStateException("Cannot schedule " + connectable + " to run because its scheduling period "
                            + cronSchedule + " has no future execution time");
                }

                final Runnable command = new Runnable() {
                    private Date nextSchedule = firstFireTime;

                    @Override
                    public void run() {
                        try {
                            connectableTask.invoke();
                        } catch (final RuntimeException e) {
                            throw e;
                        } catch (final Exception e) {
                            throw new ProcessException(e);
                        }

                        final Date following = QuartzSchedulingAgent.getNextSchedule(this.nextSchedule, cronExpression);
                        if (following == null) {
                            // Nothing left to wait for: stop re-submitting instead of failing with an NPE.
                            QuartzSchedulingAgent.this.logger.info("{} will not be triggered again because schedule {} has no future execution time",
                                    connectable, cronSchedule);
                            return;
                        }
                        this.nextSchedule = following;

                        final long delay = QuartzSchedulingAgent.getDelay(this.nextSchedule);
                        QuartzSchedulingAgent.this.logger.debug("Finished task for {}; next scheduled time is at {} after a delay of {} milliseconds", new Object[]{connectable, this.nextSchedule, Long.valueOf(delay)});

                        final ScheduledFuture<?> newFuture = QuartzSchedulingAgent.this.flowEngine.schedule(this, delay, TimeUnit.MILLISECONDS);
                        final ScheduledFuture<?> oldFuture = componentFutures.put(taskIndex, newFuture);
                        lifecycleState.replaceFuture(oldFuture, newFuture);
                    }
                };

                componentFutures.put(taskIndex, this.flowEngine.schedule(command, getDelay(firstFireTime), TimeUnit.MILLISECONDS));
            }
            allScheduled = true;
        } finally {
            if (!allScheduled) {
                // Roll back slots submitted before the failure; they were never handed to
                // lifecycleState and would otherwise block later schedule attempts.
                cancelAll(componentFutures, false);
                componentFutures.clear();
            }
        }

        lifecycleState.setFutures(componentFutures.values());
        this.logger.info("Scheduled {} to run with {} threads on schedule {}", new Object[]{connectable, Integer.valueOf(connectable.getMaxConcurrentTasks()), cronSchedule});
    }

    /**
     * Stops scheduling the given connectable.
     *
     * @param connectable the component to unschedule
     * @param lifecycleState lifecycle state whose futures are cancelled
     */
    @Override // org.apache.nifi.controller.scheduling.AbstractSchedulingAgent
    public synchronized void doUnschedule(Connectable connectable, LifecycleState lifecycleState) {
        unschedule((Object) connectable, lifecycleState);
    }

    /**
     * Stops scheduling the given reporting task.
     *
     * @param reportingTaskNode the reporting task to unschedule
     * @param lifecycleState lifecycle state whose futures are cancelled
     */
    @Override // org.apache.nifi.controller.scheduling.AbstractSchedulingAgent
    public synchronized void doUnschedule(ReportingTaskNode reportingTaskNode, LifecycleState lifecycleState) {
        unschedule((Object) reportingTaskNode, lifecycleState);
    }

    /**
     * Drops the component's bookkeeping entry, cancels (without interrupting) every future held
     * by the lifecycle state that is not already cancelled, and marks the state unscheduled.
     */
    private void unschedule(Object component, LifecycleState lifecycleState) {
        this.quartzFutures.remove(component);
        lifecycleState.getFutures().forEach(future -> {
            if (!future.isCancelled()) {
                future.cancel(false);
            }
        });
        lifecycleState.setScheduled(false);
        this.logger.info("Stopped scheduling {} to run", component);
    }

    /** No-op in this agent. */
    public void onEvent(Connectable connectable) {
    }

    /** No-op in this agent. */
    public void setMaxThreadCount(int i) {
    }

    /**
     * Computes the fire time that follows the later of "now" and the given reference time, so a
     * task that overran its slot does not schedule a time that is already in the past.
     *
     * @param date the previously scheduled fire time; must not be {@code null}
     * @param cronExpression the expression to evaluate
     * @return the result of {@code cronExpression.getTimeAfter(...)} for the later of the two
     *         instants, which may be {@code null} if the expression has no further fire time
     */
    public static Date getNextSchedule(Date date, CronExpression cronExpression) {
        final Date now = new Date();
        final Date reference = now.after(date) ? now : date;
        return cronExpression.getTimeAfter(reference);
    }

    /**
     * Returns the number of milliseconds from now until the given time, never negative.
     *
     * @param date the target time; must not be {@code null}
     * @return the non-negative delay in milliseconds
     */
    public static long getDelay(Date date) {
        return Math.max(date.getTime() - System.currentTimeMillis(), 0L);
    }

    /** Cancels every future in the map that is not already cancelled. */
    private static void cancelAll(Map<Integer, ScheduledFuture<?>> futures, boolean mayInterruptIfRunning) {
        for (final ScheduledFuture<?> future : futures.values()) {
            if (!future.isCancelled()) {
                future.cancel(mayInterruptIfRunning);
            }
        }
    }
}
