package org.apache.sling.distribution.queue.impl.resource;

import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.scheduler.ScheduleOptions;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.queue.DistributionQueueType;
import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
import org.apache.sling.distribution.queue.impl.simple.SimpleDistributionQueueProcessor;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.jetbrains.annotations.NotNull;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lib/slingcms.far:org/apache/sling/org.apache.sling.distribution.core/0.5.0/org.apache.sling.distribution.core-0.5.0.jar:org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.class */
/**
 * {@link DistributionQueueProvider} that hands out {@link ResourceQueue} instances stored below
 * {@value #QUEUES_ROOT}{@code <agentName>}.
 *
 * <p>A provider is either <em>active</em> or <em>passive</em>, fixed at construction time:
 * <ul>
 *   <li>active providers create {@link ActiveResourceQueue}s and allow queue processing to be
 *       scheduled through the Sling {@link Scheduler};</li>
 *   <li>passive providers create plain {@link ResourceQueue}s and reject
 *       {@link #enableQueueProcessing} / {@link #disableQueueProcessing()}.</li>
 * </ul>
 *
 * <p>On construction a {@link ResourceQueueCleanupTask} is registered as a {@link Runnable} service
 * with scheduler properties; {@link #close()} unregisters it again.
 */
public class ResourceQueueProvider implements DistributionQueueProvider {
    /** Type identifier of this queue provider. */
    public static final String TYPE = "resource";

    /** Repository path prefix below which the queues of every agent are rooted. */
    public static final String QUEUES_ROOT = "/var/sling/distribution/queues/";

    private final ResourceResolverFactory resolverFactory;
    private final String serviceName;
    private final String agentRootPath;
    private final Scheduler scheduler;
    private final String agentName;
    private final boolean isActive;

    /** Registration of the periodic cleanup task; {@code null} once {@link #close()} has run. */
    private ServiceRegistration<Runnable> cleanupTask;

    private final Logger log = LoggerFactory.getLogger(getClass());

    /** Queues created so far, keyed by queue name. */
    private final Map<String, ResourceQueue> queueMap = new ConcurrentHashMap<>();

    /**
     * Creates the provider and registers its cleanup task as an OSGi service.
     *
     * @param bundleContext           context used to register the cleanup task; required
     * @param resourceResolverFactory factory handed to every queue and to the cleanup task; required
     * @param str                     service name handed to every queue and to the cleanup task; required
     * @param str2                    agent name, used for the queue root path and for job names; required
     * @param scheduler               scheduler used for queue processing; required only when {@code z} is {@code true}
     * @param z                       {@code true} for an active provider, {@code false} for a passive one
     * @throws IllegalArgumentException if a required argument is {@code null}
     */
    public ResourceQueueProvider(BundleContext bundleContext, ResourceResolverFactory resourceResolverFactory, String str, String str2, Scheduler scheduler, boolean z) {
        boolean schedulerMissing = z && scheduler == null;
        if (bundleContext == null || resourceResolverFactory == null || str == null || str2 == null || schedulerMissing) {
            throw new IllegalArgumentException("all arguments are required");
        }
        this.resolverFactory = resourceResolverFactory;
        this.serviceName = str;
        this.agentName = str2;
        this.agentRootPath = QUEUES_ROOT + str2;
        this.scheduler = scheduler;
        this.isActive = z;
        register(bundleContext);
    }

    /**
     * Returns the queue with the given name, creating and caching it on first access.
     *
     * @param str the queue name
     * @return the cached or newly created queue
     */
    @Override // org.apache.sling.distribution.queue.impl.DistributionQueueProvider
    @NotNull
    public DistributionQueue getQueue(@NotNull String str) throws DistributionException {
        return this.queueMap.computeIfAbsent(str, this::createQueue);
    }

    /**
     * Returns the queue with the given name; the requested queue type is ignored by this provider.
     *
     * @param str                   the queue name
     * @param distributionQueueType ignored
     * @return the cached or newly created queue
     * @throws RuntimeException if {@link #getQueue(String)} fails with a {@link DistributionException}
     */
    @Override // org.apache.sling.distribution.queue.impl.DistributionQueueProvider
    @NotNull
    public DistributionQueue getQueue(@NotNull String str, @NotNull DistributionQueueType distributionQueueType) {
        try {
            return getQueue(str);
        } catch (DistributionException e) {
            throw new RuntimeException("could not create config for queue " + str, e);
        }
    }

    /**
     * Schedules one processing job per given queue name.
     *
     * <p>Each job is scheduled with {@code NOW(-1, 1)}, may not run concurrently with itself, and is
     * flagged to run on a single instance only.
     *
     * @param distributionQueueProcessor processor invoked by the scheduled job
     * @param strArr                     names of the queues to process
     * @throws DistributionException wrapping an {@link UnsupportedOperationException} if this provider is passive
     */
    @Override // org.apache.sling.distribution.queue.impl.DistributionQueueProvider
    public void enableQueueProcessing(@NotNull DistributionQueueProcessor distributionQueueProcessor, String... strArr) throws DistributionException {
        if (!this.isActive) {
            throw new DistributionException(new UnsupportedOperationException("enable Processing not supported for Passive Queues"));
        }
        for (String queueName : strArr) {
            ScheduleOptions options = this.scheduler.NOW(-1, 1L)
                    .canRunConcurrently(false)
                    .onSingleInstanceOnly(true)
                    .name(getJobName(queueName));
            DistributionQueue queue = getQueue(queueName);
            // The provider is active here, so every queue it created is an ActiveResourceQueue.
            ActiveResourceQueue activeQueue = (ActiveResourceQueue) queue;
            // Passing the method reference directly lets the compiler infer the callback type
            // from the constructor instead of going through a raw Consumer local.
            this.scheduler.schedule(
                    new SimpleDistributionQueueProcessor(queue, distributionQueueProcessor, activeQueue::recordProcessingAttempt),
                    options);
        }
    }

    /**
     * Unschedules the processing job of every queue this provider has created so far.
     *
     * @throws DistributionException wrapping an {@link UnsupportedOperationException} if this provider is passive
     */
    @Override // org.apache.sling.distribution.queue.impl.DistributionQueueProvider
    public void disableQueueProcessing() throws DistributionException {
        if (!this.isActive) {
            throw new DistributionException(new UnsupportedOperationException("disable Processing not supported for Passive Queues"));
        }
        for (ResourceQueue resourceQueue : this.queueMap.values()) {
            boolean unscheduled = this.scheduler.unschedule(getJobName(resourceQueue.getName()));
            if (unscheduled) {
                this.log.debug("queue processing on {} stopped", resourceQueue);
            } else {
                this.log.warn("could not disable queue processing on {}", resourceQueue);
            }
        }
    }

    /** Unregisters the cleanup task; calling it again is a no-op. */
    public void close() {
        if (this.cleanupTask != null) {
            this.cleanupTask.unregister();
            this.cleanupTask = null;
        }
    }

    /** Builds the queue implementation matching this provider's active/passive mode. */
    private ResourceQueue createQueue(String queueName) {
        return this.isActive
                ? new ActiveResourceQueue(this.resolverFactory, this.serviceName, queueName, this.agentRootPath)
                : new ResourceQueue(this.resolverFactory, this.serviceName, queueName, this.agentRootPath);
    }

    /** Builds the scheduler job name used for the processing job of the given queue. */
    private String getJobName(String str) {
        return "resource-queueProcessor-" + this.agentName + "-" + str;
    }

    /** Registers the queue cleanup task as a non-concurrent {@link Runnable} service with a period of 300. */
    private void register(BundleContext bundleContext) {
        ResourceQueueCleanupTask task = new ResourceQueueCleanupTask(this.resolverFactory, this.serviceName, this.agentRootPath);
        Hashtable<String, Object> properties = new Hashtable<>();
        properties.put(Scheduler.PROPERTY_SCHEDULER_CONCURRENT, false);
        properties.put(Scheduler.PROPERTY_SCHEDULER_PERIOD, 300L);
        this.cleanupTask = bundleContext.registerService(Runnable.class, task, properties);
    }
}
