package org.apache.nifi.controller.queue.clustered;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.queue.AbstractFlowFileQueue;
import org.apache.nifi.controller.queue.ConnectionEventListener;
import org.apache.nifi.controller.queue.DropFlowFileRequest;
import org.apache.nifi.controller.queue.DropFlowFileState;
import org.apache.nifi.controller.queue.FlowFileQueueContents;
import org.apache.nifi.controller.queue.IllegalClusterStateException;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
import org.apache.nifi.controller.queue.PollStrategy;
import org.apache.nifi.controller.queue.QueueDiagnostics;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.StandardQueueDiagnostics;
import org.apache.nifi.controller.queue.SwappablePriorityQueue;
import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
import org.apache.nifi.controller.queue.clustered.partition.CorrelationAttributePartitioner;
import org.apache.nifi.controller.queue.clustered.partition.FirstNodePartitioner;
import org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner;
import org.apache.nifi.controller.queue.clustered.partition.LocalPartitionPartitioner;
import org.apache.nifi.controller.queue.clustered.partition.LocalQueuePartition;
import org.apache.nifi.controller.queue.clustered.partition.NonLocalPartitionPartitioner;
import org.apache.nifi.controller.queue.clustered.partition.QueuePartition;
import org.apache.nifi.controller.queue.clustered.partition.RebalancingPartition;
import org.apache.nifi.controller.queue.clustered.partition.RemoteQueuePartition;
import org.apache.nifi.controller.queue.clustered.partition.RoundRobinPartitioner;
import org.apache.nifi.controller.queue.clustered.partition.StandardRebalancingPartition;
import org.apache.nifi.controller.queue.clustered.partition.SwappablePriorityQueueLocalPartition;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.status.FlowFileAvailability;
import org.apache.nifi.controller.swap.StandardSwapSummary;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.processor.FlowFileFilter;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.reporting.Severity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.class */
public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue implements LoadBalancedFlowFileQueue {
    private static final int NODE_SWAP_THRESHOLD = 1000;
    private final List<FlowFilePrioritizer> prioritizers;
    private final ConnectionEventListener eventListener;
    private final AtomicReference<QueueSize> totalSize;
    private final LocalQueuePartition localPartition;
    private final RebalancingPartition rebalancingPartition;
    private final FlowFileSwapManager swapManager;
    private final EventReporter eventReporter;
    private final ClusterCoordinator clusterCoordinator;
    private final AsyncLoadBalanceClientRegistry clientRegistry;
    private final FlowFileRepository flowFileRepo;
    private final ProvenanceEventRepository provRepo;
    private final ContentRepository contentRepo;
    private final Set<NodeIdentifier> nodeIdentifiers;
    private final ReadWriteLock partitionLock;
    private final Lock partitionReadLock;
    private final Lock partitionWriteLock;
    private QueuePartition[] queuePartitions;
    private volatile FlowFilePartitioner partitioner;
    private boolean stopped;
    private volatile boolean offloaded;
    private static final Logger logger = LoggerFactory.getLogger(SocketLoadBalancedFlowFileQueue.class);
    private static final Comparator<NodeIdentifier> loadBalanceEndpointComparator = Comparator.comparing((v0) -> {
        return v0.getLoadBalanceAddress();
    }).thenComparing((v0) -> {
        return v0.getLoadBalancePort();
    });

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.nifi.controller.queue.clustered.SocketLoadBalancedFlowFileQueue$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$nifi$controller$queue$LoadBalanceStrategy;
        static final /* synthetic */ int[] $SwitchMap$org$apache$nifi$cluster$coordination$node$NodeConnectionState = new int[NodeConnectionState.values().length];

        static {
            try {
                $SwitchMap$org$apache$nifi$cluster$coordination$node$NodeConnectionState[NodeConnectionState.OFFLOADING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$nifi$cluster$coordination$node$NodeConnectionState[NodeConnectionState.CONNECTED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$nifi$cluster$coordination$node$NodeConnectionState[NodeConnectionState.OFFLOADED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$nifi$cluster$coordination$node$NodeConnectionState[NodeConnectionState.DISCONNECTED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$nifi$cluster$coordination$node$NodeConnectionState[NodeConnectionState.DISCONNECTING.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            $SwitchMap$org$apache$nifi$controller$queue$LoadBalanceStrategy = new int[LoadBalanceStrategy.values().length];
            try {
                $SwitchMap$org$apache$nifi$controller$queue$LoadBalanceStrategy[LoadBalanceStrategy.DO_NOT_LOAD_BALANCE.ordinal()] = 1;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$nifi$controller$queue$LoadBalanceStrategy[LoadBalanceStrategy.PARTITION_BY_ATTRIBUTE.ordinal()] = 2;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$nifi$controller$queue$LoadBalanceStrategy[LoadBalanceStrategy.ROUND_ROBIN.ordinal()] = 3;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$nifi$controller$queue$LoadBalanceStrategy[LoadBalanceStrategy.SINGLE_NODE.ordinal()] = 4;
            } catch (NoSuchFieldError e9) {
            }
        }
    }

    /* loaded from: input_file:org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue$ClusterEventListener.class */
    private class ClusterEventListener implements ClusterTopologyEventListener {
        private ClusterEventListener() {
        }

        public void onNodeAdded(NodeIdentifier nodeIdentifier) {
            SocketLoadBalancedFlowFileQueue.this.partitionWriteLock.lock();
            try {
                if (SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers.contains(nodeIdentifier)) {
                    SocketLoadBalancedFlowFileQueue.logger.debug("Node Identifier {} added to cluster but already known in set: {}", nodeIdentifier, SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers);
                    return;
                }
                HashSet hashSet = new HashSet(SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers);
                hashSet.removeIf(nodeIdentifier2 -> {
                    return nodeIdentifier2.getId().equals(nodeIdentifier.getId());
                });
                hashSet.add(nodeIdentifier);
                SocketLoadBalancedFlowFileQueue.logger.debug("Node Identifier {} added to cluster. Node ID's changing from {} to {}", new Object[]{nodeIdentifier, SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers, hashSet});
                SocketLoadBalancedFlowFileQueue.this.setNodeIdentifiers(hashSet, false);
            } finally {
                SocketLoadBalancedFlowFileQueue.this.partitionWriteLock.unlock();
            }
        }

        public void onNodeRemoved(NodeIdentifier nodeIdentifier) {
            SocketLoadBalancedFlowFileQueue.this.partitionWriteLock.lock();
            try {
                HashSet hashSet = new HashSet(SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers);
                if (hashSet.remove(nodeIdentifier)) {
                    SocketLoadBalancedFlowFileQueue.logger.debug("Node Identifier {} removed from cluster. Node ID's changing from {} to {}", new Object[]{nodeIdentifier, SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers, hashSet});
                    SocketLoadBalancedFlowFileQueue.this.setNodeIdentifiers(hashSet, false);
                    SocketLoadBalancedFlowFileQueue.this.partitionWriteLock.unlock();
                }
            } finally {
                SocketLoadBalancedFlowFileQueue.this.partitionWriteLock.unlock();
            }
        }

        public void onLocalNodeIdentifierSet(NodeIdentifier nodeIdentifier) {
            SocketLoadBalancedFlowFileQueue.this.partitionWriteLock.lock();
            if (nodeIdentifier == null) {
                return;
            }
            try {
                if (!SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers.contains(nodeIdentifier)) {
                    HashSet hashSet = new HashSet(SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers);
                    hashSet.add(nodeIdentifier);
                    SocketLoadBalancedFlowFileQueue.logger.debug("Local Node Identifier has now been determined to be {}. Adding to set of Node Identifiers for {}", nodeIdentifier, SocketLoadBalancedFlowFileQueue.this);
                    SocketLoadBalancedFlowFileQueue.this.setNodeIdentifiers(hashSet, false);
                }
                SocketLoadBalancedFlowFileQueue.logger.debug("Local Node Identifier set to {}; current partitions = {}", nodeIdentifier, SocketLoadBalancedFlowFileQueue.this.queuePartitions);
                QueuePartition[] queuePartitionArr = SocketLoadBalancedFlowFileQueue.this.queuePartitions;
                int length = queuePartitionArr.length;
                int i = 0;
                while (true) {
                    if (i >= length) {
                        break;
                    }
                    QueuePartition queuePartition = queuePartitionArr[i];
                    Optional<NodeIdentifier> nodeIdentifier2 = queuePartition.getNodeIdentifier();
                    if (nodeIdentifier2.isPresent() && nodeIdentifier2.get().equals(nodeIdentifier)) {
                        if (!(queuePartition instanceof LocalQueuePartition)) {
                            SocketLoadBalancedFlowFileQueue.logger.debug("{} Local Node Identifier set to {} and found Queue Partition {} with that Node Identifier. Will force update of partitions", new Object[]{SocketLoadBalancedFlowFileQueue.this, nodeIdentifier, queuePartition});
                            HashSet hashSet2 = new HashSet(SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers);
                            hashSet2.add(nodeIdentifier);
                            SocketLoadBalancedFlowFileQueue.this.setNodeIdentifiers(hashSet2, true);
                            SocketLoadBalancedFlowFileQueue.this.partitionWriteLock.unlock();
                            return;
                        }
                        SocketLoadBalancedFlowFileQueue.logger.debug("{} Local Node Identifier set to {} and QueuePartition with this identifier is already a Local Queue Partition", SocketLoadBalancedFlowFileQueue.this, nodeIdentifier);
                    }
                    i++;
                }
                SocketLoadBalancedFlowFileQueue.logger.debug("{} Local Node Identifier set to {} but found no Queue Partition with that Node Identifier.", SocketLoadBalancedFlowFileQueue.this, nodeIdentifier);
                SocketLoadBalancedFlowFileQueue.this.partitionWriteLock.unlock();
            } finally {
                SocketLoadBalancedFlowFileQueue.this.partitionWriteLock.unlock();
            }
        }

        public void onNodeStateChange(NodeIdentifier nodeIdentifier, NodeConnectionState nodeConnectionState) {
            SocketLoadBalancedFlowFileQueue.this.partitionWriteLock.lock();
            try {
                if (!SocketLoadBalancedFlowFileQueue.this.offloaded) {
                    switch (AnonymousClass2.$SwitchMap$org$apache$nifi$cluster$coordination$node$NodeConnectionState[nodeConnectionState.ordinal()]) {
                        case 1:
                            onNodeRemoved(nodeIdentifier);
                            break;
                        case 2:
                            onNodeAdded(nodeIdentifier);
                            break;
                    }
                } else {
                    switch (AnonymousClass2.$SwitchMap$org$apache$nifi$cluster$coordination$node$NodeConnectionState[nodeConnectionState.ordinal()]) {
                        case 1:
                        case 3:
                        case 4:
                        case 5:
                            onNodeRemoved(nodeIdentifier);
                            break;
                        case 2:
                            if (nodeIdentifier != null && nodeIdentifier.equals(SocketLoadBalancedFlowFileQueue.this.clusterCoordinator.getLocalNodeIdentifier())) {
                                SocketLoadBalancedFlowFileQueue.this.resetOffloadedQueue();
                                break;
                            }
                            break;
                    }
                }
            } finally {
                SocketLoadBalancedFlowFileQueue.this.partitionWriteLock.unlock();
            }
        }
    }

    public SocketLoadBalancedFlowFileQueue(String str, ConnectionEventListener connectionEventListener, ProcessScheduler processScheduler, FlowFileRepository flowFileRepository, ProvenanceEventRepository provenanceEventRepository, ContentRepository contentRepository, ResourceClaimManager resourceClaimManager, ClusterCoordinator clusterCoordinator, AsyncLoadBalanceClientRegistry asyncLoadBalanceClientRegistry, FlowFileSwapManager flowFileSwapManager, int i, EventReporter eventReporter) {
        super(str, processScheduler, flowFileRepository, provenanceEventRepository, resourceClaimManager);
        this.prioritizers = new ArrayList();
        this.totalSize = new AtomicReference<>(new QueueSize(0, 0L));
        this.partitionLock = new ReentrantReadWriteLock();
        this.partitionReadLock = this.partitionLock.readLock();
        this.partitionWriteLock = this.partitionLock.writeLock();
        this.stopped = true;
        this.offloaded = false;
        this.eventListener = connectionEventListener;
        this.eventReporter = eventReporter;
        this.swapManager = flowFileSwapManager;
        this.flowFileRepo = flowFileRepository;
        this.provRepo = provenanceEventRepository;
        this.contentRepo = contentRepository;
        this.clusterCoordinator = clusterCoordinator;
        this.clientRegistry = asyncLoadBalanceClientRegistry;
        this.localPartition = new SwappablePriorityQueueLocalPartition(flowFileSwapManager, i, eventReporter, this, this::drop);
        this.rebalancingPartition = new StandardRebalancingPartition(flowFileSwapManager, i, eventReporter, this, this::drop);
        this.nodeIdentifiers = clusterCoordinator == null ? Collections.emptySet() : new TreeSet<>(loadBalanceEndpointComparator);
        if (clusterCoordinator != null) {
            this.nodeIdentifiers.addAll(clusterCoordinator.getNodeIdentifiers(new NodeConnectionState[0]));
        }
        ArrayList<NodeIdentifier> arrayList = new ArrayList(this.nodeIdentifiers);
        arrayList.sort(Comparator.comparing((v0) -> {
            return v0.getApiAddress();
        }));
        if (arrayList.isEmpty()) {
            this.queuePartitions = new QueuePartition[]{this.localPartition};
        } else {
            ArrayList arrayList2 = new ArrayList();
            NodeIdentifier localNodeIdentifier = clusterCoordinator.getLocalNodeIdentifier();
            for (NodeIdentifier nodeIdentifier : arrayList) {
                if (nodeIdentifier.equals(localNodeIdentifier)) {
                    arrayList2.add(this.localPartition);
                } else {
                    arrayList2.add(createRemotePartition(nodeIdentifier));
                }
            }
            if (!arrayList2.contains(this.localPartition)) {
                arrayList2.add(this.localPartition);
            }
            this.queuePartitions = (QueuePartition[]) arrayList2.toArray(new QueuePartition[0]);
        }
        this.partitioner = new LocalPartitionPartitioner();
        if (clusterCoordinator != null) {
            clusterCoordinator.registerEventListener(new ClusterEventListener());
        }
        this.rebalancingPartition.start(this.partitioner);
    }

    @Override // org.apache.nifi.controller.queue.AbstractFlowFileQueue
    public synchronized void setLoadBalanceStrategy(LoadBalanceStrategy loadBalanceStrategy, String str) {
        LoadBalanceStrategy loadBalanceStrategy2 = getLoadBalanceStrategy();
        String partitioningAttribute = getPartitioningAttribute();
        super.setLoadBalanceStrategy(loadBalanceStrategy, str);
        if ((loadBalanceStrategy == loadBalanceStrategy2 && Objects.equals(str, partitioningAttribute)) || this.clusterCoordinator == null || this.offloaded) {
            return;
        }
        setFlowFilePartitioner(getPartitionerForLoadBalancingStrategy(loadBalanceStrategy, str));
    }

    private FlowFilePartitioner getPartitionerForLoadBalancingStrategy(LoadBalanceStrategy loadBalanceStrategy, String str) {
        FlowFilePartitioner firstNodePartitioner;
        switch (AnonymousClass2.$SwitchMap$org$apache$nifi$controller$queue$LoadBalanceStrategy[loadBalanceStrategy.ordinal()]) {
            case 1:
                firstNodePartitioner = new LocalPartitionPartitioner();
                break;
            case 2:
                firstNodePartitioner = new CorrelationAttributePartitioner(str);
                break;
            case 3:
                firstNodePartitioner = new RoundRobinPartitioner();
                break;
            case 4:
                firstNodePartitioner = new FirstNodePartitioner();
                break;
            default:
                throw new IllegalArgumentException();
        }
        return firstNodePartitioner;
    }

    public void offloadQueue() {
        NodeIdentifier nodeIdentifier;
        NodeConnectionStatus connectionStatus;
        if (this.clusterCoordinator == null) {
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Setting queue {} on node {} as offloaded. Current size: {}, Partition Sizes: {}", new Object[]{this, this.clusterCoordinator.getLocalNodeIdentifier(), size(), getPartitionSizes()});
        }
        this.offloaded = true;
        this.partitionWriteLock.lock();
        try {
            HashSet hashSet = new HashSet();
            for (QueuePartition queuePartition : this.queuePartitions) {
                Optional<NodeIdentifier> nodeIdentifier2 = queuePartition.getNodeIdentifier();
                if (nodeIdentifier2.isPresent() && (connectionStatus = this.clusterCoordinator.getConnectionStatus((nodeIdentifier = nodeIdentifier2.get()))) != null && connectionStatus.getState() == NodeConnectionState.CONNECTED) {
                    hashSet.add(nodeIdentifier);
                }
            }
            if (!hashSet.isEmpty()) {
                setNodeIdentifiers(hashSet, false);
            }
            setFlowFilePartitioner(new NonLocalPartitionPartitioner());
            if (logger.isDebugEnabled()) {
                logger.debug("Queue {} has now updated Partition on node {} for offload. Current size: {}, Partition Sizes: {}", new Object[]{this, this.clusterCoordinator.getLocalNodeIdentifier(), size(), getPartitionSizes()});
            }
        } finally {
            this.partitionWriteLock.unlock();
        }
    }

    private Map<QueuePartition, QueueSize> getPartitionSizes() {
        this.partitionReadLock.lock();
        try {
            HashMap hashMap = new HashMap();
            for (QueuePartition queuePartition : this.queuePartitions) {
                hashMap.put(queuePartition, queuePartition.size());
            }
            return hashMap;
        } finally {
            this.partitionReadLock.unlock();
        }
    }

    public void resetOffloadedQueue() {
        if (this.clusterCoordinator != null && this.offloaded) {
            this.offloaded = false;
            logger.debug("Queue {} on node {} was previously offloaded, resetting offloaded status to {}", new Object[]{this, this.clusterCoordinator.getLocalNodeIdentifier(), Boolean.valueOf(this.offloaded)});
            setFlowFilePartitioner(getPartitionerForLoadBalancingStrategy(getLoadBalanceStrategy(), getPartitioningAttribute()));
            logger.debug("Queue {} is no longer offloaded, restored load balance strategy to {} and partitioning attribute to \"{}\"", new Object[]{this, getLoadBalanceStrategy(), getPartitioningAttribute()});
        }
    }

    public synchronized void startLoadBalancing() {
        logger.debug("{} started. Will begin distributing FlowFiles across the cluster", this);
        if (this.stopped) {
            this.stopped = false;
            this.partitionReadLock.lock();
            try {
                this.rebalancingPartition.start(this.partitioner);
                for (QueuePartition queuePartition : this.queuePartitions) {
                    queuePartition.start(this.partitioner);
                }
            } finally {
                this.partitionReadLock.unlock();
            }
        }
    }

    public synchronized void stopLoadBalancing() {
        logger.debug("{} stopped. Will no longer distribute FlowFiles across the cluster", this);
        if (this.stopped) {
            return;
        }
        this.stopped = true;
        this.partitionReadLock.lock();
        try {
            this.rebalancingPartition.stop();
            for (QueuePartition queuePartition : this.queuePartitions) {
                queuePartition.stop();
            }
        } finally {
            this.partitionReadLock.unlock();
        }
    }

    public boolean isActivelyLoadBalancing() {
        QueueSize size = size();
        if (size.getObjectCount() == 0) {
            return false;
        }
        return size.getObjectCount() > this.localPartition.size().getObjectCount();
    }

    private QueuePartition createRemotePartition(final NodeIdentifier nodeIdentifier) {
        final SwappablePriorityQueue swappablePriorityQueue = new SwappablePriorityQueue(this.swapManager, NODE_SWAP_THRESHOLD, this.eventReporter, this, this::drop, nodeIdentifier.getId());
        RemoteQueuePartition remoteQueuePartition = new RemoteQueuePartition(nodeIdentifier, swappablePriorityQueue, new TransferFailureDestination() { // from class: org.apache.nifi.controller.queue.clustered.SocketLoadBalancedFlowFileQueue.1
            @Override // org.apache.nifi.controller.queue.clustered.TransferFailureDestination
            public void putAll(Collection<FlowFileRecord> collection, FlowFilePartitioner flowFilePartitioner) {
                if (collection.isEmpty()) {
                    return;
                }
                if (isRebalanceOnFailure(flowFilePartitioner)) {
                    SocketLoadBalancedFlowFileQueue.logger.debug("Transferring {} FlowFiles to Rebalancing Partition from node {}", Integer.valueOf(collection.size()), nodeIdentifier);
                    SocketLoadBalancedFlowFileQueue.this.rebalancingPartition.rebalance(collection);
                } else {
                    SocketLoadBalancedFlowFileQueue.logger.debug("Returning {} FlowFiles to their queue for node {} because Partitioner {} indicates that the FlowFiles should stay where they are", new Object[]{Integer.valueOf(collection.size()), nodeIdentifier, flowFilePartitioner});
                    swappablePriorityQueue.putAll(collection);
                }
            }

            @Override // org.apache.nifi.controller.queue.clustered.TransferFailureDestination
            public void putAll(Function<String, FlowFileQueueContents> function, FlowFilePartitioner flowFilePartitioner) {
                if (!isRebalanceOnFailure(flowFilePartitioner)) {
                    SocketLoadBalancedFlowFileQueue.logger.debug("Will not transfer FlowFiles queued for node {} to Rebalancing Partition because Partitioner {} indicates that the FlowFiles should stay where they are", nodeIdentifier, flowFilePartitioner);
                    return;
                }
                FlowFileQueueContents apply = function.apply(SocketLoadBalancedFlowFileQueue.this.rebalancingPartition.getSwapPartitionName());
                SocketLoadBalancedFlowFileQueue.this.rebalancingPartition.rebalance(apply);
                SocketLoadBalancedFlowFileQueue.logger.debug("Transferring all {} FlowFiles and {} Swap Files queued for node {} to Rebalancing Partition", new Object[]{Integer.valueOf(apply.getActiveFlowFiles().size()), Integer.valueOf(apply.getSwapLocations().size()), nodeIdentifier});
            }

            @Override // org.apache.nifi.controller.queue.clustered.TransferFailureDestination
            public boolean isRebalanceOnFailure(FlowFilePartitioner flowFilePartitioner) {
                return flowFilePartitioner.isRebalanceOnFailure() || !flowFilePartitioner.equals(SocketLoadBalancedFlowFileQueue.this.partitioner);
            }
        }, this.flowFileRepo, this.provRepo, this.contentRepo, this.clientRegistry, this);
        if (!this.stopped) {
            remoteQueuePartition.start(this.partitioner);
        }
        return remoteQueuePartition;
    }

    public synchronized List<FlowFilePrioritizer> getPriorities() {
        return new ArrayList(this.prioritizers);
    }

    public synchronized void setPriorities(List<FlowFilePrioritizer> list) {
        this.prioritizers.clear();
        this.prioritizers.addAll(list);
        this.partitionReadLock.lock();
        try {
            for (QueuePartition queuePartition : this.queuePartitions) {
                queuePartition.setPriorities(list);
            }
            this.rebalancingPartition.setPriorities(list);
            this.partitionReadLock.unlock();
        } catch (Throwable th) {
            this.partitionReadLock.unlock();
            throw th;
        }
    }

    public SwapSummary recoverSwappedFlowFiles() {
        Set<String> emptySet;
        this.partitionReadLock.lock();
        try {
            ArrayList<SwapSummary> arrayList = new ArrayList(this.queuePartitions.length);
            try {
                emptySet = this.swapManager.getSwappedPartitionNames(this);
                logger.debug("For {}, partition names to recover are {}", this, emptySet);
            } catch (IOException e) {
                logger.error("Failed to determine the names of the Partitions that have swapped FlowFiles for queue with ID {}.", getIdentifier(), e);
                if (this.eventReporter != null) {
                    this.eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to determine the names of Partitions that have swapped FlowFiles for queue with ID " + getIdentifier() + "; see logs for more detials");
                }
                emptySet = Collections.emptySet();
            }
            for (QueuePartition queuePartition : this.queuePartitions) {
                emptySet.remove(queuePartition.getSwapPartitionName());
                arrayList.add(queuePartition.recoverSwappedFlowFiles());
            }
            emptySet.remove(this.rebalancingPartition.getSwapPartitionName());
            arrayList.add(this.rebalancingPartition.recoverSwappedFlowFiles());
            for (String str : emptySet) {
                logger.info("Found Swap Files for FlowFile Queue with Identifier {} and Partition {} that has not been recovered yet. Will recover Swap Files for this Partition even though no partition exists with this name yet", getIdentifier(), str);
                try {
                    for (String str2 : this.swapManager.recoverSwapLocations(this, str)) {
                        SwapSummary swapSummary = this.swapManager.getSwapSummary(str2);
                        arrayList.add(swapSummary);
                        this.rebalancingPartition.rebalance(new FlowFileQueueContents(Collections.emptyList(), Collections.singletonList(this.swapManager.changePartitionName(str2, this.rebalancingPartition.getSwapPartitionName())), swapSummary.getQueueSize()));
                    }
                } catch (IOException e2) {
                    logger.error("Failed to determine whether or not any Swap Files exist for FlowFile Queue {} and Partition {}", new Object[]{getIdentifier(), str, e2});
                    if (this.eventReporter != null) {
                        this.eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to determine whether or not any Swap Files exist for FlowFile Queue " + getIdentifier() + "; see logs for more detials");
                    }
                }
            }
            Long l = null;
            QueueSize queueSize = new QueueSize(0, 0L);
            ArrayList arrayList2 = new ArrayList();
            Long l2 = null;
            long j = 0;
            for (SwapSummary swapSummary2 : arrayList) {
                Long maxFlowFileId = swapSummary2.getMaxFlowFileId();
                if (maxFlowFileId != null && (l == null || maxFlowFileId.longValue() > l.longValue())) {
                    l = maxFlowFileId;
                }
                queueSize = queueSize.add(swapSummary2.getQueueSize());
                arrayList2.addAll(swapSummary2.getResourceClaims());
                if (l2 == null) {
                    l2 = swapSummary2.getMinLastQueueDate();
                } else if (swapSummary2.getMinLastQueueDate() != null) {
                    l2 = Long.valueOf(Long.min(l2.longValue(), swapSummary2.getMinLastQueueDate().longValue()));
                }
                j += swapSummary2.getTotalLastQueueDate().longValue();
            }
            adjustSize(queueSize.getObjectCount(), queueSize.getByteCount());
            StandardSwapSummary standardSwapSummary = new StandardSwapSummary(queueSize, l, arrayList2, l2, Long.valueOf(j));
            this.partitionReadLock.unlock();
            return standardSwapSummary;
        } catch (Throwable th) {
            this.partitionReadLock.unlock();
            throw th;
        }
    }

    public void purgeSwapFiles() {
        this.swapManager.purge();
    }

    public QueueSize size() {
        return this.totalSize.get();
    }

    public long getTotalQueuedDuration(long j) {
        long j2 = 0;
        for (QueuePartition queuePartition : this.queuePartitions) {
            j2 += queuePartition.getTotalActiveQueuedDuration(j);
        }
        return j2;
    }

    public long getMinLastQueueDate() {
        long j = 0;
        for (QueuePartition queuePartition : this.queuePartitions) {
            j = j == 0 ? queuePartition.getMinLastQueueDate() : Long.min(j, queuePartition.getMinLastQueueDate());
        }
        return j;
    }

    public boolean isEmpty() {
        return size().getObjectCount() == 0;
    }

    public FlowFileAvailability getFlowFileAvailability() {
        return this.localPartition.getFlowFileAvailability();
    }

    public boolean isActiveQueueEmpty() {
        return this.localPartition.isActiveQueueEmpty();
    }

    public QueueDiagnostics getQueueDiagnostics() {
        this.partitionReadLock.lock();
        try {
            LocalQueuePartitionDiagnostics queueDiagnostics = this.localPartition.getQueueDiagnostics();
            ArrayList arrayList = new ArrayList(this.queuePartitions.length - 1);
            for (QueuePartition queuePartition : this.queuePartitions) {
                if (queuePartition instanceof RemoteQueuePartition) {
                    arrayList.add(((RemoteQueuePartition) queuePartition).getDiagnostics());
                }
            }
            StandardQueueDiagnostics standardQueueDiagnostics = new StandardQueueDiagnostics(queueDiagnostics, arrayList);
            this.partitionReadLock.unlock();
            return standardQueueDiagnostics;
        } catch (Throwable th) {
            this.partitionReadLock.unlock();
            throw th;
        }
    }

    protected LocalQueuePartition getLocalPartition() {
        return this.localPartition;
    }

    protected int getPartitionCount() {
        this.partitionReadLock.lock();
        try {
            return this.queuePartitions.length;
        } finally {
            this.partitionReadLock.unlock();
        }
    }

    protected QueuePartition getPartition(int i) {
        this.partitionReadLock.lock();
        if (i >= 0) {
            try {
                if (i < this.queuePartitions.length) {
                    return this.queuePartitions[i];
                }
            } finally {
                this.partitionReadLock.unlock();
            }
        }
        throw new IndexOutOfBoundsException();
    }

    private void adjustSize(int i, long j) {
        boolean z = false;
        while (!z) {
            QueueSize queueSize = this.totalSize.get();
            z = this.totalSize.compareAndSet(queueSize, queueSize.add(i, j));
        }
    }

    public void onTransfer(Collection<FlowFileRecord> collection) {
        adjustSize(-collection.size(), -collection.stream().mapToLong((v0) -> {
            return v0.getSize();
        }).sum());
    }

    public void onAbort(Collection<FlowFileRecord> collection) {
        if (collection == null || collection.isEmpty()) {
            return;
        }
        adjustSize(-collection.size(), -collection.stream().mapToLong((v0) -> {
            return v0.getSize();
        }).sum());
    }

    public boolean isLocalPartitionFull() {
        return isFull(this.localPartition.size());
    }

    private QueuePartition getPartition(FlowFileRecord flowFileRecord) {
        QueuePartition partition = this.partitioner.getPartition(flowFileRecord, this.queuePartitions, this.localPartition);
        logger.debug("{} Assigning {} to Partition: {}", new Object[]{this, flowFileRecord, partition});
        return partition;
    }

    public void setNodeIdentifiers(Set<NodeIdentifier> set, boolean z) {
        this.partitionWriteLock.lock();
        if (!z) {
            try {
                if (this.nodeIdentifiers.equals(set)) {
                    logger.debug("{} Not going to rebalance Queue even though setNodeIdentifiers was called, because the new set of Node Identifiers is the same as the existing set", this);
                    this.partitionWriteLock.unlock();
                    return;
                }
            } finally {
                this.partitionWriteLock.unlock();
            }
        }
        logger.debug("{} Stopping the {} queue partitions in order to change node identifiers from {} to {}", new Object[]{this, Integer.valueOf(this.queuePartitions.length), this.nodeIdentifiers, set});
        for (QueuePartition queuePartition : this.queuePartitions) {
            queuePartition.stop();
        }
        TreeSet treeSet = new TreeSet(loadBalanceEndpointComparator);
        treeSet.addAll(this.nodeIdentifiers);
        treeSet.removeAll(set);
        logger.debug("{} The following Node Identifiers were removed from the cluster: {}", this, treeSet);
        Function function = nodeIdentifier -> {
            return nodeIdentifier.getLoadBalanceAddress() + ":" + nodeIdentifier.getLoadBalancePort();
        };
        HashMap hashMap = new HashMap();
        for (QueuePartition queuePartition2 : this.queuePartitions) {
            queuePartition2.getNodeIdentifier().ifPresent(nodeIdentifier2 -> {
            });
        }
        ArrayList arrayList = new ArrayList(set);
        arrayList.sort(Comparator.comparing(nodeIdentifier3 -> {
            return nodeIdentifier3.getApiAddress() + ":" + nodeIdentifier3.getApiPort();
        }));
        QueuePartition[] queuePartitionArr = arrayList.isEmpty() ? new QueuePartition[]{this.localPartition} : new QueuePartition[arrayList.size()];
        boolean z2 = false;
        for (int i = 0; i < arrayList.size(); i++) {
            NodeIdentifier nodeIdentifier4 = (NodeIdentifier) arrayList.get(i);
            String str = (String) function.apply(nodeIdentifier4);
            if (nodeIdentifier4.equals(this.clusterCoordinator.getLocalNodeIdentifier())) {
                queuePartitionArr[i] = this.localPartition;
                z2 = true;
                QueuePartition queuePartition3 = (QueuePartition) hashMap.get(str);
                if (queuePartition3 != null && queuePartition3 != this.localPartition) {
                    FlowFileQueueContents packageForRebalance = queuePartition3.packageForRebalance(this.localPartition.getSwapPartitionName());
                    logger.debug("Transferred data from {} to {}", queuePartition3, this.localPartition);
                    this.localPartition.inheritQueueContents(packageForRebalance);
                }
            } else {
                QueuePartition queuePartition4 = (QueuePartition) hashMap.get(str);
                queuePartitionArr[i] = queuePartition4 == null ? createRemotePartition(nodeIdentifier4) : queuePartition4;
            }
        }
        if (!z2) {
            QueuePartition[] queuePartitionArr2 = new QueuePartition[queuePartitionArr.length + 1];
            System.arraycopy(queuePartitionArr, 0, queuePartitionArr2, 0, queuePartitionArr.length);
            queuePartitionArr2[queuePartitionArr2.length - 1] = this.localPartition;
            queuePartitionArr = queuePartitionArr2;
        }
        if (this.partitioner.isRebalanceOnClusterResize()) {
            for (QueuePartition queuePartition5 : this.queuePartitions) {
                logger.debug("Rebalancing {}", queuePartition5);
                rebalance(queuePartition5);
            }
        } else {
            Iterator it = treeSet.iterator();
            while (it.hasNext()) {
                QueuePartition queuePartition6 = (QueuePartition) hashMap.get((String) function.apply((NodeIdentifier) it.next()));
                if (queuePartition6 != null) {
                    logger.debug("Rebalancing {}", queuePartition6);
                    rebalance(queuePartition6);
                }
            }
        }
        Iterator it2 = treeSet.iterator();
        while (it2.hasNext()) {
            QueuePartition queuePartition7 = (QueuePartition) hashMap.get((String) function.apply((NodeIdentifier) it2.next()));
            if (queuePartition7 instanceof RemoteQueuePartition) {
                ((RemoteQueuePartition) queuePartition7).onRemoved();
            }
        }
        this.nodeIdentifiers.clear();
        this.nodeIdentifiers.addAll(set);
        this.queuePartitions = queuePartitionArr;
        logger.debug("{} Restarting the {} queue partitions now that node identifiers have been updated", this, Integer.valueOf(this.queuePartitions.length));
        if (!this.stopped) {
            for (QueuePartition queuePartition8 : queuePartitionArr) {
                queuePartition8.start(this.partitioner);
            }
        }
    }

    protected void rebalance(QueuePartition queuePartition) {
        logger.debug("Rebalancing Partition {}", queuePartition);
        this.rebalancingPartition.rebalance(queuePartition.packageForRebalance(this.rebalancingPartition.getSwapPartitionName()));
    }

    public void put(FlowFileRecord flowFileRecord) {
        putAndGetPartition(flowFileRecord);
    }

    protected QueuePartition putAndGetPartition(FlowFileRecord flowFileRecord) {
        this.partitionReadLock.lock();
        try {
            adjustSize(1, flowFileRecord.getSize());
            QueuePartition partition = getPartition(flowFileRecord);
            partition.put(flowFileRecord);
            this.eventListener.triggerDestinationEvent();
            return partition;
        } finally {
            this.partitionReadLock.unlock();
        }
    }

    public void receiveFromPeer(Collection<FlowFileRecord> collection) throws IllegalClusterStateException {
        this.partitionReadLock.lock();
        try {
            if (this.offloaded) {
                throw new IllegalClusterStateException("Node cannot accept data from load-balanced connection because it is in the process of offloading");
            }
            if (!this.clusterCoordinator.isConnected()) {
                throw new IllegalClusterStateException("Node cannot accept data from load-balanced connection because it is not connected to cluster");
            }
            if (this.partitioner.isRebalanceOnClusterResize()) {
                logger.debug("Received the following FlowFiles from Peer: {}. Will re-partition FlowFiles to ensure proper balancing across the cluster.", collection);
                putAll(collection);
            } else {
                logger.debug("Received the following FlowFiles from Peer: {}. Will accept FlowFiles to the local partition", collection);
                adjustSize(collection.size(), collection.stream().mapToLong((v0) -> {
                    return v0.getSize();
                }).sum());
                this.localPartition.putAll(collection);
            }
        } finally {
            this.partitionReadLock.unlock();
        }
    }

    public void putAll(Collection<FlowFileRecord> collection) {
        putAllAndGetPartitions(collection);
    }

    protected Map<QueuePartition, List<FlowFileRecord>> putAllAndGetPartitions(Collection<FlowFileRecord> collection) {
        this.partitionReadLock.lock();
        try {
            adjustSize(collection.size(), collection.stream().mapToLong((v0) -> {
                return v0.getSize();
            }).sum());
            Map<QueuePartition, List<FlowFileRecord>> distributeToPartitionsAndGet = distributeToPartitionsAndGet(collection);
            this.partitionReadLock.unlock();
            this.eventListener.triggerDestinationEvent();
            return distributeToPartitionsAndGet;
        } catch (Throwable th) {
            this.partitionReadLock.unlock();
            this.eventListener.triggerDestinationEvent();
            throw th;
        }
    }

    public void distributeToPartitions(Collection<FlowFileRecord> collection) {
        distributeToPartitionsAndGet(collection);
    }

    public Map<QueuePartition, List<FlowFileRecord>> distributeToPartitionsAndGet(Collection<FlowFileRecord> collection) {
        if (collection == null || collection.isEmpty()) {
            return Collections.emptyMap();
        }
        this.partitionReadLock.lock();
        try {
            if (this.partitioner.isPartitionStatic()) {
                QueuePartition partition = getPartition(collection.iterator().next());
                partition.putAll(collection);
                Map<QueuePartition, List<FlowFileRecord>> singletonMap = Collections.singletonMap(partition, collection instanceof List ? (List) collection : new ArrayList(collection));
                logger.debug("Partitioner {} is static so Partitioned FlowFiles as: {}", this.partitioner, singletonMap);
                this.partitionReadLock.unlock();
                return singletonMap;
            }
            Map<QueuePartition, List<FlowFileRecord>> map = (Map) collection.stream().collect(Collectors.groupingBy(this::getPartition));
            logger.debug("Partitioned FlowFiles as: {}", map);
            for (Map.Entry<QueuePartition, List<FlowFileRecord>> entry : map.entrySet()) {
                entry.getKey().putAll(entry.getValue());
            }
            return map;
        } finally {
            this.partitionReadLock.unlock();
        }
    }

    protected void setFlowFilePartitioner(FlowFilePartitioner flowFilePartitioner) {
        this.partitionWriteLock.lock();
        try {
            if (this.partitioner.equals(flowFilePartitioner)) {
                return;
            }
            this.partitioner = flowFilePartitioner;
            for (QueuePartition queuePartition : this.queuePartitions) {
                rebalance(queuePartition);
            }
            this.partitionWriteLock.unlock();
        } finally {
            this.partitionWriteLock.unlock();
        }
    }

    public FlowFileRecord poll(Set<FlowFileRecord> set, PollStrategy pollStrategy) {
        FlowFileRecord poll = this.localPartition.poll(set, pollStrategy);
        onAbort(set);
        return poll;
    }

    public List<FlowFileRecord> poll(int i, Set<FlowFileRecord> set, PollStrategy pollStrategy) {
        List<FlowFileRecord> poll = this.localPartition.poll(i, set, pollStrategy);
        onAbort(set);
        return poll;
    }

    public List<FlowFileRecord> poll(FlowFileFilter flowFileFilter, Set<FlowFileRecord> set, PollStrategy pollStrategy) {
        List<FlowFileRecord> poll = this.localPartition.poll(flowFileFilter, set, pollStrategy);
        onAbort(set);
        return poll;
    }

    public void acknowledge(FlowFileRecord flowFileRecord) {
        this.localPartition.acknowledge(flowFileRecord);
        adjustSize(-1, -flowFileRecord.getSize());
        this.eventListener.triggerSourceEvent();
    }

    public void acknowledge(Collection<FlowFileRecord> collection) {
        this.localPartition.acknowledge(collection);
        if (!collection.isEmpty()) {
            adjustSize(-collection.size(), -collection.stream().mapToLong((v0) -> {
                return v0.getSize();
            }).sum());
        }
        this.eventListener.triggerSourceEvent();
    }

    public boolean isUnacknowledgedFlowFile() {
        return this.localPartition.isUnacknowledgedFlowFile();
    }

    public FlowFileRecord getFlowFile(String str) throws IOException {
        return this.localPartition.getFlowFile(str);
    }

    public boolean isPropagateBackpressureAcrossNodes() {
        return !this.offloaded;
    }

    public void handleExpiredRecords(Collection<FlowFileRecord> collection) {
        if (collection == null || collection.isEmpty()) {
            return;
        }
        logger.info("{} {} FlowFiles have expired and will be removed", new Object[]{this, Integer.valueOf(collection.size())});
        ArrayList arrayList = new ArrayList(collection.size());
        ArrayList arrayList2 = new ArrayList(collection.size());
        for (FlowFileRecord flowFileRecord : collection) {
            StandardRepositoryRecord standardRepositoryRecord = new StandardRepositoryRecord(this, flowFileRecord);
            standardRepositoryRecord.markForDelete();
            arrayList.add(standardRepositoryRecord);
            ProvenanceEventBuilder eventTime = new StandardProvenanceEventRecord.Builder().fromFlowFile(flowFileRecord).setEventType(ProvenanceEventType.EXPIRE).setDetails("Expiration Threshold = " + getFlowFileExpiration()).setComponentType("Load-Balanced Connection").setComponentId(getIdentifier()).setEventTime(System.currentTimeMillis());
            ContentClaim contentClaim = flowFileRecord.getContentClaim();
            if (contentClaim != null) {
                ResourceClaim resourceClaim = contentClaim.getResourceClaim();
                eventTime.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), Long.valueOf(contentClaim.getOffset() + flowFileRecord.getContentClaimOffset()), flowFileRecord.getSize());
                eventTime.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), Long.valueOf(contentClaim.getOffset() + flowFileRecord.getContentClaimOffset()), flowFileRecord.getSize());
            }
            arrayList2.add(eventTime.build());
            logger.debug("{} terminated due to FlowFile expiration; life of FlowFile = {} ms", new Object[]{flowFileRecord, Long.valueOf(System.currentTimeMillis() - flowFileRecord.getEntryDate())});
        }
        try {
            this.flowFileRepo.updateRepository(arrayList);
            this.provRepo.registerEvents(arrayList2);
            adjustSize(-collection.size(), -collection.stream().mapToLong((v0) -> {
                return v0.getSize();
            }).sum());
        } catch (IOException e) {
            logger.warn("Encountered {} expired FlowFiles but failed to update FlowFile Repository. This FlowFiles may re-appear in the queue after NiFi is restarted and will be expired again at that point.", Integer.valueOf(arrayList.size()), e);
        }
    }

    @Override // org.apache.nifi.controller.queue.AbstractFlowFileQueue
    protected List<FlowFileRecord> getListableFlowFiles() {
        return this.localPartition.getListableFlowFiles();
    }

    @Override // org.apache.nifi.controller.queue.AbstractFlowFileQueue
    protected void dropFlowFiles(DropFlowFileRequest dropFlowFileRequest, String str) {
        this.partitionReadLock.lock();
        try {
            dropFlowFileRequest.setOriginalSize(size());
            dropFlowFileRequest.setState(DropFlowFileState.DROPPING_FLOWFILES);
            int i = 0;
            long j = 0;
            try {
                QueuePartition[] queuePartitionArr = this.queuePartitions;
                int length = queuePartitionArr.length;
                int i2 = 0;
                while (true) {
                    if (i2 >= length) {
                        break;
                    }
                    QueuePartition queuePartition = queuePartitionArr[i2];
                    DropFlowFileRequest dropFlowFileRequest2 = new DropFlowFileRequest(dropFlowFileRequest.getRequestIdentifier() + "-" + this.localPartition.getNodeIdentifier());
                    queuePartition.dropFlowFiles(dropFlowFileRequest2, str);
                    adjustSize(-dropFlowFileRequest2.getDroppedSize().getObjectCount(), -dropFlowFileRequest2.getDroppedSize().getByteCount());
                    dropFlowFileRequest.setDroppedSize(new QueueSize(dropFlowFileRequest.getDroppedSize().getObjectCount() + dropFlowFileRequest2.getDroppedSize().getObjectCount(), dropFlowFileRequest.getDroppedSize().getByteCount() + dropFlowFileRequest2.getDroppedSize().getByteCount()));
                    i += dropFlowFileRequest2.getDroppedSize().getObjectCount();
                    j += dropFlowFileRequest2.getDroppedSize().getByteCount();
                    dropFlowFileRequest.setDroppedSize(new QueueSize(i, j));
                    dropFlowFileRequest.setCurrentSize(size());
                    if (dropFlowFileRequest2.getState() == DropFlowFileState.CANCELED) {
                        dropFlowFileRequest.cancel();
                        break;
                    } else {
                        if (dropFlowFileRequest2.getState() == DropFlowFileState.FAILURE) {
                            dropFlowFileRequest.setState(DropFlowFileState.FAILURE, dropFlowFileRequest2.getFailureReason());
                            break;
                        }
                        i2++;
                    }
                }
                if (dropFlowFileRequest.getState() == DropFlowFileState.DROPPING_FLOWFILES) {
                    dropFlowFileRequest.setState(DropFlowFileState.COMPLETE);
                }
            } catch (Exception e) {
                logger.error("Failed to drop FlowFiles for {}", this, e);
                dropFlowFileRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + e.getMessage() + ". See log for more details.");
            }
        } finally {
            this.partitionReadLock.unlock();
        }
    }

    public void lock() {
        this.partitionReadLock.lock();
    }

    public void unlock() {
        this.partitionReadLock.unlock();
    }

    public String toString() {
        return "FlowFileQueue[id=" + getIdentifier() + ", Load Balance Strategy=" + getLoadBalanceStrategy() + ", size=" + size() + "]";
    }
}
