package org.apache.nifi.controller.queue.clustered.partition;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.queue.BlockingSwappablePriorityQueue;
import org.apache.nifi.controller.queue.DropFlowFileAction;
import org.apache.nifi.controller.queue.DropFlowFileRequest;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.class */
/**
 * A {@link RebalancingPartition} that buffers FlowFiles on an internal
 * {@link BlockingSwappablePriorityQueue} and hands them back to the owning
 * {@link LoadBalancedFlowFileQueue} for redistribution.
 *
 * <p>FlowFiles enter through {@code put}/{@code putAll}/{@code rebalance}. The {@code rebalance}
 * methods additionally start a background thread (unless one is already registered or the
 * partition is stopped) that drains the internal queue in batches and passes each batch to
 * {@link LoadBalancedFlowFileQueue#distributeToPartitions}.</p>
 *
 * <p>The lifecycle methods ({@link #start}, {@link #stop}), the task bookkeeping and
 * {@link #isComplete()} are all {@code synchronized} on this instance.</p>
 */
public class StandardRebalancingPartition implements RebalancingPartition {
    private static final Logger logger = LoggerFactory.getLogger(StandardRebalancingPartition.class);
    private static final String SWAP_PARTITION_NAME = "rebalance";

    /** How long, in milliseconds, the rebalance task waits on the queue for a first FlowFile per iteration. */
    private static final long POLL_WAIT_MILLIS = 100L;

    /** Upper bound on the FlowFiles pulled in addition to the first one, i.e. a batch holds at most 1000. */
    private static final int MAX_ADDITIONAL_FLOWFILES_PER_BATCH = 999;

    private final String queueIdentifier;
    private final BlockingSwappablePriorityQueue queue;
    private final LoadBalancedFlowFileQueue flowFileQueue;
    private final String description;

    /** {@code true} until {@link #start} is called and again after {@link #stop}; blocks creation of new tasks. */
    private volatile boolean stopped = true;

    /** The currently registered background task, or {@code null} if none; guarded by {@code synchronized(this)}. */
    private RebalanceTask rebalanceTask;

    /**
     * Background job that repeatedly drains the enclosing partition's queue and hands the
     * polled FlowFiles to the owning queue for distribution. It ends when {@link #stop()} is
     * called or when a poll yields nothing and the enclosing partition reports completion.
     */
    public class RebalanceTask implements Runnable {
        /** Set by {@link #stop()}; checked at the top of every loop iteration. */
        private volatile boolean stopped = false;

        /** Scratch set that receives expired records from each poll; cleared at the start of every iteration. */
        private final Set<FlowFileRecord> expiredRecords = new HashSet<>();

        private RebalanceTask() {
        }

        /** Requests that the run loop exit before its next iteration. */
        public void stop() {
            this.stopped = true;
        }

        /**
         * Polls the queue until stopped. An empty poll triggers a completion check; a successful
         * poll is topped up with up to {@value #MAX_ADDITIONAL_FLOWFILES_PER_BATCH} more FlowFiles,
         * and the batch is distributed and then acknowledged on the internal queue.
         */
        @Override // java.lang.Runnable
        public void run() {
            while (!this.stopped) {
                this.expiredRecords.clear();
                try {
                    // -1L presumably disables expiration for this poll; confirm against BlockingSwappablePriorityQueue.
                    final FlowFileRecord first = StandardRebalancingPartition.this.queue.poll(
                            this.expiredRecords, -1L, POLL_WAIT_MILLIS, PollStrategy.ALL_FLOWFILES);

                    if (first == null) {
                        StandardRebalancingPartition.this.flowFileQueue.handleExpiredRecords(this.expiredRecords);
                        if (StandardRebalancingPartition.this.isComplete()) {
                            StandardRebalancingPartition.logger.debug("Rebalance Task completed for {}", this);
                            return;
                        }
                        continue;
                    }

                    final List<FlowFileRecord> batch = new ArrayList<>();
                    batch.add(first);
                    batch.addAll(StandardRebalancingPartition.this.queue.poll(
                            MAX_ADDITIONAL_FLOWFILES_PER_BATCH, this.expiredRecords, -1L, PollStrategy.ALL_FLOWFILES));

                    StandardRebalancingPartition.this.flowFileQueue.handleExpiredRecords(this.expiredRecords);
                    StandardRebalancingPartition.logger.debug("{} Rebalancing {}", this, batch);
                    StandardRebalancingPartition.this.flowFileQueue.distributeToPartitions(batch);
                    // Acknowledge only after the batch has been handed off for distribution.
                    StandardRebalancingPartition.this.queue.acknowledge(batch);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag for whoever owns this thread.
                    // NOTE(review): the loop keeps running after an interrupt; if the blocking poll
                    // throws again immediately this may spin until stop() is called - confirm intent.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Creates the partition and its backing queue.
     *
     * @param flowFileSwapManager       passed through to the backing {@link BlockingSwappablePriorityQueue}
     * @param i                         passed through to the backing queue (presumably the swap threshold; confirm)
     * @param eventReporter             passed through to the backing queue
     * @param loadBalancedFlowFileQueue owning queue; supplies the identifier and receives rebalanced FlowFiles
     * @param dropFlowFileAction        passed through to the backing queue
     */
    public StandardRebalancingPartition(FlowFileSwapManager flowFileSwapManager, int i, EventReporter eventReporter, LoadBalancedFlowFileQueue loadBalancedFlowFileQueue, DropFlowFileAction dropFlowFileAction) {
        this.queue = new BlockingSwappablePriorityQueue(flowFileSwapManager, i, eventReporter, loadBalancedFlowFileQueue, dropFlowFileAction, SWAP_PARTITION_NAME);
        this.queueIdentifier = loadBalancedFlowFileQueue.getIdentifier();
        this.flowFileQueue = loadBalancedFlowFileQueue;
        this.description = "RebalancingPartition[queueId=" + this.queueIdentifier + "]";
    }

    /**
     * @return always {@link Optional#empty()}
     */
    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public Optional<NodeIdentifier> getNodeIdentifier() {
        return Optional.empty();
    }

    /**
     * @return the size reported by the backing queue
     */
    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public QueueSize size() {
        return this.queue.size();
    }

    /**
     * @param j forwarded unchanged to the backing queue's {@code getTotalQueuedDuration}
     * @return the total queued duration reported by the backing queue
     */
    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public long getTotalActiveQueuedDuration(long j) {
        return this.queue.getTotalQueuedDuration(j);
    }

    /**
     * @return the minimum last-queue date reported by the backing queue
     */
    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public long getMinLastQueueDate() {
        return this.queue.getMinLastQueueDate();
    }

    /**
     * Delegates swap recovery to the backing queue.
     *
     * @return the summary produced by the backing queue
     */
    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public SwapSummary recoverSwappedFlowFiles() {
        return this.queue.recoverSwappedFlowFiles();
    }

    /**
     * @return the fixed swap partition name, {@code "rebalance"}
     */
    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public String getSwapPartitionName() {
        return SWAP_PARTITION_NAME;
    }

    /**
     * Adds a single FlowFile to the backing queue. Unlike {@link #rebalance(Collection)},
     * this does not attempt to start a rebalance task.
     *
     * @param flowFileRecord the FlowFile to enqueue
     */
    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public void put(FlowFileRecord flowFileRecord) {
        this.queue.put(flowFileRecord);
    }

    /**
     * Adds FlowFiles to the backing queue. Unlike {@link #rebalance(Collection)},
     * this does not attempt to start a rebalance task.
     *
     * @param collection the FlowFiles to enqueue
     */
    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public void putAll(Collection<FlowFileRecord> collection) {
        this.queue.putAll(collection);
    }

    /**
     * Delegates the drop request to the backing queue.
     *
     * @param dropFlowFileRequest the request to process
     * @param str                 forwarded unchanged to the backing queue (presumably the requestor; confirm)
     */
    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public void dropFlowFiles(DropFlowFileRequest dropFlowFileRequest, String str) {
        this.queue.dropFlowFiles(dropFlowFileRequest, str);
    }

    /**
     * Applies the given prioritizers to the backing queue.
     *
     * @param list the prioritizers to apply
     */
    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public void setPriorities(List<FlowFilePrioritizer> list) {
        this.queue.setPriorities(list);
    }

    /**
     * Marks the partition as started and launches a rebalance task if none is registered.
     *
     * @param flowFilePartitioner not used by this implementation
     */
    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public synchronized void start(FlowFilePartitioner flowFilePartitioner) {
        this.stopped = false;
        rebalanceFromQueue();
    }

    /**
     * Marks the partition as stopped, signals the registered task (if any) to stop, and
     * deregisters it. This does not wait for the task's thread to finish.
     */
    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public synchronized void stop() {
        this.stopped = true;
        if (this.rebalanceTask != null) {
            this.rebalanceTask.stop();
        }
        this.rebalanceTask = null;
    }

    /**
     * Starts a new rebalance thread unless the partition is stopped or a task is already registered.
     */
    private synchronized void rebalanceFromQueue() {
        if (this.stopped) {
            logger.debug("Will not rebalance from queue because {} is stopped", this);
            return;
        }
        if (this.rebalanceTask != null) {
            logger.debug("Rebalance Task already exists for {}", this);
            return;
        }
        this.rebalanceTask = new RebalanceTask();
        final Thread rebalanceThread = new Thread(this.rebalanceTask);
        rebalanceThread.setName("Rebalance queued data for Connection " + this.queueIdentifier);
        rebalanceThread.start();
        logger.debug("No Rebalance Task currently exists for {}. Starting new Rebalance Thread {}", this, rebalanceThread);
    }

    /**
     * Takes over the given queue contents and triggers rebalancing. Does nothing when the
     * contents hold neither active FlowFiles nor swap locations.
     *
     * @param flowFileQueueContents the contents to inherit into the backing queue
     */
    @Override // org.apache.nifi.controller.queue.clustered.partition.RebalancingPartition
    public void rebalance(FlowFileQueueContents flowFileQueueContents) {
        final boolean nothingToRebalance = flowFileQueueContents.getActiveFlowFiles().isEmpty()
                && flowFileQueueContents.getSwapLocations().isEmpty();
        if (nothingToRebalance) {
            return;
        }
        logger.debug("Adding {} to Rebalance queue for {}", flowFileQueueContents, this);
        this.queue.inheritQueueContents(flowFileQueueContents);
        rebalanceFromQueue();
    }

    /**
     * Enqueues the given FlowFiles and triggers rebalancing.
     *
     * @param collection the FlowFiles to rebalance
     */
    @Override // org.apache.nifi.controller.queue.clustered.partition.RebalancingPartition
    public void rebalance(Collection<FlowFileRecord> collection) {
        logger.debug("Adding {} to Rebalance queue for {}", collection, this);
        this.queue.putAll(collection);
        rebalanceFromQueue();
    }

    /**
     * Delegates packaging of the queue contents to the backing queue.
     *
     * @param str forwarded unchanged to the backing queue's {@code packageForRebalance}
     * @return the packaged contents produced by the backing queue
     */
    @Override // org.apache.nifi.controller.queue.clustered.partition.QueuePartition
    public FlowFileQueueContents packageForRebalance(String str) {
        return this.queue.packageForRebalance(str);
    }

    /**
     * Reports whether the backing queue is empty and, if so, deregisters the current
     * rebalance task so that a later rebalance request can start a fresh one.
     *
     * @return {@code true} if the backing queue is empty, {@code false} otherwise
     */
    public synchronized boolean isComplete() {
        if (!this.queue.isEmpty()) {
            return false;
        }
        this.rebalanceTask = null;
        return true;
    }

    /**
     * @return a description of the form {@code RebalancingPartition[queueId=<identifier>]}
     */
    @Override
    public String toString() {
        return this.description;
    }
}
