package org.osgi.util.pushstream;

import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
import org.osgi.util.promise.PromiseFactory;
import org.osgi.util.pushstream.AbstractPushStreamImpl;

/* loaded from: input_file:lib/slingcms.far:org/osgi/org.osgi.util.pushstream/1.1.0/org.osgi.util.pushstream-1.1.0.jar:org/osgi/util/pushstream/PushStreamProvider.class */
public final class PushStreamProvider {
    private final Lock lock = new ReentrantLock(true);
    private int schedulerReferences;
    private ScheduledExecutorService sharedScheduler;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/slingcms.far:org/osgi/org.osgi.util.pushstream/1.1.0/org.osgi.util.pushstream-1.1.0.jar:org/osgi/util/pushstream/PushStreamProvider$MultiplexingConsumer.class */
    public static class MultiplexingConsumer<T> implements PushEventConsumer<T> {
        private final AtomicReference<PushEvent<T>> terminalEventStore;
        private final CopyOnWriteArrayList<PushEventConsumer<? super T>> consumers;

        public MultiplexingConsumer(AtomicReference<PushEvent<T>> atomicReference, CopyOnWriteArrayList<PushEventConsumer<? super T>> copyOnWriteArrayList) {
            this.terminalEventStore = atomicReference;
            this.consumers = copyOnWriteArrayList;
        }

        @Override // org.osgi.util.pushstream.PushEventConsumer
        public long accept(PushEvent<? extends T> pushEvent) throws Exception {
            if (pushEvent.isTerminal()) {
                if (!this.terminalEventStore.compareAndSet(null, pushEvent.nodata())) {
                    return -1L;
                }
                Iterator<PushEventConsumer<? super T>> it = this.consumers.iterator();
                while (it.hasNext()) {
                    PushEventConsumer<? super T> next = it.next();
                    if (this.consumers.remove(next)) {
                        try {
                            next.accept(pushEvent);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
                return -1L;
            }
            long j = 0;
            Iterator<PushEventConsumer<? super T>> it2 = this.consumers.iterator();
            while (it2.hasNext()) {
                PushEventConsumer<? super T> next2 = it2.next();
                try {
                    long accept = next2.accept(pushEvent);
                    if (accept < 0 && this.consumers.remove(next2)) {
                        try {
                            next2.accept(PushEvent.close());
                        } catch (Exception e2) {
                            e2.printStackTrace();
                        }
                    } else if (accept > j) {
                        j = accept;
                    }
                } catch (Exception e3) {
                    if (this.consumers.remove(next2)) {
                        try {
                            next2.accept(PushEvent.error(e3));
                        } catch (Exception e4) {
                            e4.printStackTrace();
                        }
                    }
                }
            }
            return j;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:lib/slingcms.far:org/osgi/org.osgi.util.pushstream/1.1.0/org.osgi.util.pushstream-1.1.0.jar:org/osgi/util/pushstream/PushStreamProvider$PushEventPipe.class */
    /**
     * Adapter that is both an event source and an event consumer: events
     * handed to {@link #accept(PushEvent)} are forwarded to the consumer that
     * was connected through {@link #open(PushEventConsumer)}.
     *
     * @param <T> the payload type of the forwarded events
     */
    public static final class PushEventPipe<T> implements PushEventConsumer<T>, PushEventSource<T> {
        /** The consumer connected via {@code open}; {@code null} until then. */
        volatile PushEventConsumer<? super T> delegate;

        PushEventPipe() {
        }

        /**
         * Connects the downstream consumer that will receive every event
         * passed to {@link #accept(PushEvent)}.
         *
         * @param pushEventConsumer the consumer to forward events to
         * @return a handle whose {@code close()} does nothing
         */
        @Override // org.osgi.util.pushstream.PushEventSource
        public AutoCloseable open(PushEventConsumer<? super T> pushEventConsumer) throws Exception {
            // Without this assignment the pipe has no target and accept() can never succeed.
            this.delegate = pushEventConsumer;
            return () -> {
            };
        }

        /**
         * Forwards the event to the connected consumer.
         *
         * @param pushEvent the event to forward
         * @return the back pressure reported by the connected consumer
         * @throws IllegalStateException if no consumer has been connected yet
         */
        @Override // org.osgi.util.pushstream.PushEventConsumer
        public long accept(PushEvent<? extends T> pushEvent) throws Exception {
            // Read the volatile field once so the null check and the call see the same value.
            PushEventConsumer<? super T> target = this.delegate;
            if (target == null) {
                throw new IllegalStateException("The pipe has not been connected to a consumer");
            }
            return target.accept(pushEvent);
        }
    }

    /**
     * Takes a reference on the shared scheduler, creating it when this is the
     * first outstanding reference.
     *
     * @return the shared single-threaded scheduler
     * @throws IllegalStateException if the calling thread is interrupted while
     *         waiting for the lock; the thread's interrupt flag is restored
     */
    private ScheduledExecutorService acquireScheduler() {
        try {
            this.lock.lockInterruptibly();
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller's own interrupt handling.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Unable to acquire the Scheduler", e);
        }
        try {
            if (this.schedulerReferences == 0) {
                this.sharedScheduler = Executors.newSingleThreadScheduledExecutor();
            }
            // Count the reference only once a scheduler exists, so a failed
            // creation does not leave a non-zero count with a null scheduler.
            this.schedulerReferences++;
            return this.sharedScheduler;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Drops one reference on the shared scheduler and shuts it down when the
     * last reference is released.
     *
     * <p>The lock is taken uninterruptibly: a release that is skipped because
     * the calling thread was interrupted would leave the reference count too
     * high and the scheduler would never be shut down.
     */
    private void releaseScheduler() {
        this.lock.lock();
        try {
            this.schedulerReferences--;
            if (this.schedulerReferences == 0) {
                // Clear the field first so it is reset even if shutdown() throws.
                ScheduledExecutorService toShutDown = this.sharedScheduler;
                this.sharedScheduler = null;
                toShutDown.shutdown();
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Creates a buffered stream over the given source using this provider's
     * defaults: parallelism 1, provider-managed executor and scheduler, a
     * 32-slot {@link ArrayBlockingQueue}, the {@code FAIL} queue policy and the
     * {@code LINEAR} pushback policy with an argument of 1000.
     *
     * @param pushEventSource the source of events
     * @return the new stream
     */
    public <T> PushStream<T> createStream(PushEventSource<T> pushEventSource) {
        BlockingQueue<PushEvent<? extends T>> defaultBuffer = new ArrayBlockingQueue<>(32);
        QueuePolicy<T, BlockingQueue<PushEvent<? extends T>>> defaultQueuePolicy = QueuePolicyOption.FAIL.getPolicy();
        PushbackPolicy<T, BlockingQueue<PushEvent<? extends T>>> defaultPushback = PushbackPolicyOption.LINEAR.getPolicy(1000L);
        return createStream(pushEventSource, 1, null, null, defaultBuffer, defaultQueuePolicy, defaultPushback);
    }

    /**
     * Starts a builder for a stream over the given source. No executor or
     * scheduler is preset on the builder.
     *
     * @param pushEventSource the source of events
     * @return a builder bound to this provider
     */
    public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushStreamBuilder<T, U> buildStream(PushEventSource<T> pushEventSource) {
        PushStreamBuilder<T, U> builder = new PushStreamBuilderImpl<>(this, null, null, pushEventSource);
        return builder;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a buffered stream over the given source, filling in defaults for
     * every argument that is {@code null} (or {@code 0} for the parallelism).
     *
     * <p>An executor or scheduler supplied by this provider is released again
     * when the returned stream is closed; caller-supplied ones are left alone.
     *
     * @param pushEventSource the source of events; must not be {@code null}
     * @param i the parallelism; {@code 0} is treated as {@code 1}
     * @param executor the worker executor, or {@code null} for a new fixed pool
     * @param scheduledExecutorService the scheduler, or {@code null} for the shared one
     * @param u the buffer, or {@code null} for a 32-slot {@link ArrayBlockingQueue}
     * @param queuePolicy the queue policy, or {@code null} for {@code FAIL}
     * @param pushbackPolicy the pushback policy, or {@code null} for {@code LINEAR} with 1000
     * @return the new stream
     * @throws NullPointerException if {@code pushEventSource} is {@code null}
     * @throws IllegalArgumentException if {@code i} is negative
     */
    public <T, U extends BlockingQueue<PushEvent<? extends T>>> PushStream<T> createStream(PushEventSource<T> pushEventSource, int i, Executor executor, ScheduledExecutorService scheduledExecutorService, U u, QueuePolicy<T, U> queuePolicy, PushbackPolicy<T, U> pushbackPolicy) {
        if (pushEventSource == null) {
            throw new NullPointerException("There is no source of events");
        }
        if (i < 0) {
            throw new IllegalArgumentException("The supplied parallelism cannot be less than zero. It was " + i);
        }
        int parallelism = (i == 0) ? 1 : i;

        // Remember which resources were created here so only those are released on close.
        boolean ownsWorker = (executor == null);
        Executor worker = ownsWorker ? Executors.newFixedThreadPool(parallelism) : executor;
        boolean ownsTimer = (scheduledExecutorService == null);
        ScheduledExecutorService timer = ownsTimer ? acquireScheduler() : scheduledExecutorService;

        if (u == null) {
            u = new ArrayBlockingQueue(32);
        }
        if (queuePolicy == null) {
            queuePolicy = QueuePolicyOption.FAIL.getPolicy();
        }
        if (pushbackPolicy == null) {
            pushbackPolicy = PushbackPolicyOption.LINEAR.getPolicy(1000L);
        }

        PromiseFactory promiseFactory = new PromiseFactory(worker, timer);
        PushStream<T> buffered = new BufferedPushStreamImpl<>(this, promiseFactory, u, parallelism, queuePolicy, pushbackPolicy, consumer -> {
            try {
                return pushEventSource.open(consumer);
            } catch (Exception connectFailure) {
                throw new RuntimeException("Unable to connect to event source", connectFailure);
            }
        });
        return cleanupThreads(ownsWorker, worker, ownsTimer, buffered);
    }

    /**
     * Attaches a close handler that releases provider-owned threading
     * resources, when there are any to release.
     *
     * @param z whether {@code executor} was created by this provider and must be shut down
     * @param executor the worker executor used by the stream
     * @param z2 whether the shared scheduler was acquired and must be released
     * @param pushStream the stream to decorate
     * @return {@code pushStream} itself when nothing is owned, otherwise the
     *         stream returned by chaining {@code onClose} and an identity {@code map}
     */
    private <T> PushStream<T> cleanupThreads(boolean z, Executor executor, boolean z2, PushStream<T> pushStream) {
        if (!z && !z2) {
            return pushStream;
        }
        Runnable releaseOwnedResources = () -> {
            if (z) {
                ((ExecutorService) executor).shutdown();
            }
            if (z2) {
                releaseScheduler();
            }
        };
        return pushStream.onClose(releaseOwnedResources).map(value -> value);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates an unbuffered stream over the given source.
     *
     * <p>A {@code null} executor is replaced by a new two-thread fixed pool and
     * a {@code null} scheduler by the shared scheduler; whatever this provider
     * supplied is released when the returned stream is closed.
     *
     * @param pushEventSource the source of events
     * @param executor the worker executor, or {@code null}
     * @param scheduledExecutorService the scheduler, or {@code null}
     * @return the new stream
     */
    public <T> PushStream<T> createUnbufferedStream(PushEventSource<T> pushEventSource, Executor executor, ScheduledExecutorService scheduledExecutorService) {
        boolean ownsWorker = (executor == null);
        Executor worker = ownsWorker ? Executors.newFixedThreadPool(2) : executor;
        boolean ownsTimer = (scheduledExecutorService == null);
        ScheduledExecutorService timer = ownsTimer ? acquireScheduler() : scheduledExecutorService;

        PromiseFactory promiseFactory = new PromiseFactory(worker, timer);
        PushStream<T> unbuffered = new UnbufferedPushStreamImpl<>(this, promiseFactory, consumer -> {
            try {
                return pushEventSource.open(consumer);
            } catch (Exception connectFailure) {
                throw new RuntimeException("Unable to connect to event source", connectFailure);
            }
        });
        return cleanupThreads(ownsWorker, worker, ownsTimer, unbuffered);
    }

    /**
     * Turns a stream into an event source using the default settings of
     * {@link #buildEventSourceFromStream(PushStream)}.
     *
     * @param pushStream the stream to expose as a source
     * @return the event source produced by the builder
     */
    public <T> PushEventSource<T> createEventSourceFromStream(PushStream<T> pushStream) {
        BufferBuilder<PushEventSource<T>, T, BlockingQueue<PushEvent<? extends T>>> builder = buildEventSourceFromStream(pushStream);
        return builder.build();
    }

    /**
     * Returns a builder that turns the given stream into a
     * {@link PushEventSource} which several consumers can open.
     *
     * <p>Every {@code withXxx} call is forwarded to the buffer builder obtained
     * from {@code pushStream.buildBuffer()}; {@code build()} produces the event
     * source.
     *
     * @param pushStream the stream whose events are to be republished
     * @return a builder for the event source
     */
    public <T, U extends BlockingQueue<PushEvent<? extends T>>> BufferBuilder<PushEventSource<T>, T, U> buildEventSourceFromStream(PushStream<T> pushStream) {
        // All configuration is delegated to the stream's own buffer builder.
        final PushStreamBuilder<T, U> buildBuffer = pushStream.buildBuffer();
        return (BufferBuilder<PushEventSource<T>, T, U>) new BufferBuilder<PushEventSource<T>, T, U>() { // from class: org.osgi.util.pushstream.PushStreamProvider.1
            /* JADX WARN: Incorrect types in method signature: (TU;)Lorg/osgi/util/pushstream/BufferBuilder<Lorg/osgi/util/pushstream/PushEventSource<TT;>;TT;TU;>; */
            @Override // org.osgi.util.pushstream.BufferBuilder
            public BufferBuilder withBuffer(BlockingQueue blockingQueue) {
                buildBuffer.withBuffer(blockingQueue);
                return this;
            }

            @Override // org.osgi.util.pushstream.BufferBuilder
            public BufferBuilder<PushEventSource<T>, T, U> withQueuePolicy(QueuePolicy<T, U> queuePolicy) {
                buildBuffer.withQueuePolicy(queuePolicy);
                return this;
            }

            @Override // org.osgi.util.pushstream.BufferBuilder
            public BufferBuilder<PushEventSource<T>, T, U> withQueuePolicy(QueuePolicyOption queuePolicyOption) {
                buildBuffer.withQueuePolicy(queuePolicyOption);
                return this;
            }

            @Override // org.osgi.util.pushstream.BufferBuilder
            public BufferBuilder<PushEventSource<T>, T, U> withPushbackPolicy(PushbackPolicy<T, U> pushbackPolicy) {
                buildBuffer.withPushbackPolicy(pushbackPolicy);
                return this;
            }

            @Override // org.osgi.util.pushstream.BufferBuilder
            public BufferBuilder<PushEventSource<T>, T, U> withPushbackPolicy(PushbackPolicyOption pushbackPolicyOption, long j) {
                buildBuffer.withPushbackPolicy(pushbackPolicyOption, j);
                return this;
            }

            @Override // org.osgi.util.pushstream.BufferBuilder
            public BufferBuilder<PushEventSource<T>, T, U> withParallelism(int i) {
                buildBuffer.withParallelism(i);
                return this;
            }

            @Override // org.osgi.util.pushstream.BufferBuilder
            public BufferBuilder<PushEventSource<T>, T, U> withExecutor(Executor executor) {
                buildBuffer.withExecutor(executor);
                return this;
            }

            @Override // org.osgi.util.pushstream.BufferBuilder
            public BufferBuilder<PushEventSource<T>, T, U> withScheduler(ScheduledExecutorService scheduledExecutorService) {
                buildBuffer.withScheduler(scheduledExecutorService);
                return this;
            }

            /**
             * Creates the event source. The state below is shared by every
             * consumer that opens the returned source.
             */
            @Override // org.osgi.util.pushstream.BufferBuilder
            public PushEventSource<T> build() {
                // Flipped to true by the first consumer to open the source; guards the one-time stream start.
                AtomicBoolean atomicBoolean = new AtomicBoolean();
                // Holds the terminal event once the MultiplexingConsumer has seen one.
                AtomicReference atomicReference = new AtomicReference();
                // The currently connected consumers.
                CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
                BufferBuilder bufferBuilder = buildBuffer;
                return pushEventConsumer -> {
                    // Register first, then check for a terminal event, so a terminal event
                    // arriving in between is still delivered to this consumer exactly once
                    // (whoever wins the remove() delivers it).
                    copyOnWriteArrayList.add(pushEventConsumer);
                    PushEvent<? extends T> pushEvent = (PushEvent) atomicReference.get();
                    if (pushEvent != null) {
                        // The stream has already terminated: replay the stored terminal event.
                        if (copyOnWriteArrayList.remove(pushEventConsumer)) {
                            pushEventConsumer.accept(pushEvent);
                        }
                        return () -> {
                        };
                    }
                    // Only the first opener builds the buffered stream and starts consuming it.
                    if (!atomicBoolean.getAndSet(true)) {
                        ((PushStream) bufferBuilder.build()).forEachEvent(new MultiplexingConsumer(atomicReference, copyOnWriteArrayList));
                    }
                    // Closing detaches this consumer and sends it a close event; a failure
                    // while delivering that event is only printed.
                    return () -> {
                        if (copyOnWriteArrayList.remove(pushEventConsumer)) {
                            try {
                                pushEventConsumer.accept(PushEvent.close());
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    };
                };
            }
        };
    }

    /**
     * Creates a simple event source with this provider's defaults:
     * parallelism 1, a provider-created executor, a 32-slot
     * {@link ArrayBlockingQueue}, the {@code FAIL} queue policy and no extra
     * close callback.
     *
     * @param cls the event payload type (used only for type inference)
     * @return the new event source
     */
    public <T> SimplePushEventSource<T> createSimpleEventSource(Class<T> cls) {
        BlockingQueue<PushEvent<? extends T>> defaultBuffer = new ArrayBlockingQueue<>(32);
        QueuePolicy<T, BlockingQueue<PushEvent<? extends T>>> defaultQueuePolicy = QueuePolicyOption.FAIL.getPolicy();
        Runnable nothingExtraOnClose = () -> {
        };
        return createSimplePushEventSource(1, null, defaultBuffer, defaultQueuePolicy, nothingExtraOnClose);
    }

    /**
     * Returns a builder for a simple event source. The builder's
     * {@code build()} passes its configured concurrency, worker, buffer and
     * buffering policy to
     * {@code createSimplePushEventSource} together with an empty close callback.
     *
     * @param cls the event payload type (used only for type inference)
     * @return a builder for the event source
     */
    public <T, U extends BlockingQueue<PushEvent<? extends T>>> BufferBuilder<SimplePushEventSource<T>, T, U> buildSimpleEventSource(Class<T> cls) {
        return new AbstractBufferBuilder<SimplePushEventSource<T>, T, U>() { // from class: org.osgi.util.pushstream.PushStreamProvider.2
            @Override // org.osgi.util.pushstream.BufferBuilder
            public SimplePushEventSource<T> build() {
                Runnable nothingExtraOnClose = () -> {
                };
                return PushStreamProvider.this.createSimplePushEventSource(concurrency, worker, buffer, bufferingPolicy, nothingExtraOnClose);
            }
        };
    }

    /**
     * Creates a simple event source, filling in defaults for arguments that
     * are {@code null} (or {@code 0} for the parallelism).
     *
     * <p>The shared scheduler is always acquired here and released again by
     * the close callback handed to the event source; an executor created here
     * is shut down by the same callback.
     *
     * @param i the parallelism; {@code 0} is treated as {@code 1}
     * @param executor the worker executor, or {@code null} for a new fixed pool
     * @param u the buffer, or {@code null} for a 32-slot {@link ArrayBlockingQueue}
     * @param queuePolicy the queue policy, or {@code null} for {@code FAIL}
     * @param runnable extra work to run first when the source is closed; may be {@code null}
     * @return the new event source
     * @throws IllegalArgumentException if {@code i} is negative
     */
    <T, U extends BlockingQueue<PushEvent<? extends T>>> SimplePushEventSource<T> createSimplePushEventSource(int i, Executor executor, U u, QueuePolicy<T, U> queuePolicy, Runnable runnable) {
        if (i < 0) {
            throw new IllegalArgumentException("The supplied parallelism cannot be less than zero. It was " + i);
        }
        int parallelism = (i == 0) ? 1 : i;

        // Remember whether the executor was created here so only then is it shut down on close.
        boolean ownsWorker = (executor == null);
        Executor worker = ownsWorker ? Executors.newFixedThreadPool(parallelism) : executor;

        if (u == null) {
            u = new ArrayBlockingQueue(32);
        }
        if (queuePolicy == null) {
            queuePolicy = QueuePolicyOption.FAIL.getPolicy();
        }
        return new SimplePushEventSourceImpl(new PromiseFactory(worker, acquireScheduler()), queuePolicy, u, parallelism, () -> {
            try {
                if (runnable != null) {
                    runnable.run();
                }
            } catch (Exception e) {
                // A failing user callback must not prevent the cleanup below, but it
                // should not vanish silently either; report it like the rest of this class does.
                e.printStackTrace();
            }
            if (ownsWorker) {
                ((ExecutorService) worker).shutdown();
            }
            releaseScheduler();
        });
    }

    /**
     * Wraps a consumer using the default settings of
     * {@link #buildBufferedConsumer(PushEventConsumer)}.
     *
     * @param pushEventConsumer the consumer that should receive the events
     * @return the consumer produced by the builder
     */
    public <T> PushEventConsumer<T> createBufferedConsumer(PushEventConsumer<T> pushEventConsumer) {
        BufferBuilder<PushEventConsumer<T>, T, BlockingQueue<PushEvent<? extends T>>> builder = buildBufferedConsumer(pushEventConsumer);
        return builder.build();
    }

    /**
     * Returns a builder for a consumer that sits in front of
     * {@code pushEventConsumer}.
     *
     * <p>{@code build()} creates a {@link PushEventPipe}, opens a stream over
     * it with the builder's settings, subscribes {@code pushEventConsumer} to
     * that stream and hands back the pipe as the consumer to publish into.
     *
     * @param pushEventConsumer the consumer that should receive the events
     * @return a builder for the wrapping consumer
     */
    public <T, U extends BlockingQueue<PushEvent<? extends T>>> BufferBuilder<PushEventConsumer<T>, T, U> buildBufferedConsumer(final PushEventConsumer<T> pushEventConsumer) {
        return new AbstractBufferBuilder<PushEventConsumer<T>, T, U>() { // from class: org.osgi.util.pushstream.PushStreamProvider.3
            @Override // org.osgi.util.pushstream.BufferBuilder
            public PushEventConsumer<T> build() {
                PushEventPipe<T> pipe = new PushEventPipe<>();
                PushStream<T> pipedStream = PushStreamProvider.this.createStream(pipe, concurrency, worker, timer, buffer, bufferingPolicy, backPressure);
                pipedStream.forEachEvent(pushEventConsumer);
                return pipe;
            }
        };
    }

    /**
     * Creates an unbuffered stream that publishes the elements of a
     * {@link Stream}, using a provider-managed executor and scheduler.
     *
     * <p>When the source is opened, the elements are pushed to the consumer
     * one by one. Pushing stops at the first negative result; a close event is
     * sent when the consumer returns a negative value or when all elements
     * were delivered, and an error event when delivery throws. Closing the
     * returned handle sets a flag that makes any later element count as
     * refused.
     *
     * @param stream the elements to publish
     * @return the new stream
     */
    public <T> PushStream<T> streamOf(Stream<T> stream) {
        PushEventSource<T> source = consumer -> {
            AtomicBoolean closed = new AtomicBoolean(false);
            stream.mapToLong(item -> {
                try {
                    long backPressure;
                    if (closed.get()) {
                        backPressure = -1L;
                    } else {
                        backPressure = consumer.accept(PushEvent.data(item));
                    }
                    if (backPressure < 0) {
                        consumer.accept(PushEvent.close());
                    }
                    return backPressure;
                } catch (Exception deliveryFailure) {
                    try {
                        consumer.accept(PushEvent.error(deliveryFailure));
                    } catch (Exception ignored) {
                        // The consumer already failed once; nothing more can be reported to it.
                    }
                    return -1L;
                }
            }).filter(result -> result < 0).findFirst().orElseGet(() -> {
                // No element produced a negative result: the stream ended normally.
                try {
                    return consumer.accept(PushEvent.close());
                } catch (Exception closeFailure) {
                    return -1L;
                }
            });
            return () -> closed.set(true);
        };
        return createUnbufferedStream(source, null, null);
    }

    /**
     * Creates an unbuffered stream that publishes the elements of a
     * {@link Stream} asynchronously on the given executor.
     *
     * <p>A {@code null} executor is replaced by a new two-thread fixed pool and
     * a {@code null} scheduler by the shared scheduler; whatever this provider
     * supplied is released when the returned stream is closed.
     *
     * @param executor the worker executor, or {@code null}
     * @param scheduledExecutorService the scheduler used to delay delivery
     *        when back pressure is requested, or {@code null}
     * @param stream the elements to publish
     * @return the new stream
     */
    public <T> PushStream<T> streamOf(Executor executor, ScheduledExecutorService scheduledExecutorService, final Stream<T> stream) {
        // Remember which resources were created here so only those are released on close.
        boolean ownsWorker = (executor == null);
        Executor worker = ownsWorker ? Executors.newFixedThreadPool(2) : executor;
        boolean ownsTimer = (scheduledExecutorService == null);
        ScheduledExecutorService timer = ownsTimer ? acquireScheduler() : scheduledExecutorService;

        return cleanupThreads(ownsWorker, worker, ownsTimer, new UnbufferedPushStreamImpl<T, BlockingQueue<PushEvent<? extends T>>>(this, new PromiseFactory(worker, timer), pushEventConsumer -> {
            return () -> {
            };
        }) { // from class: org.osgi.util.pushstream.PushStreamProvider.4
            /**
             * Starts the stream and, if the superclass reports that it began,
             * submits the first delivery task to the executor.
             */
            @Override // org.osgi.util.pushstream.UnbufferedPushStreamImpl, org.osgi.util.pushstream.AbstractPushStreamImpl
            protected boolean begin() {
                if (!super.begin()) {
                    return false;
                }
                Iterator<T> it = stream.iterator();
                this.promiseFactory.executor().execute(() -> {
                    pushData(it);
                });
                return true;
            }

            /**
             * Delivers elements until the iterator is exhausted, the stream
             * is closed, or the pipeline asks for a pause. A positive result
             * from {@code handleEvent} reschedules the remaining work after
             * that many milliseconds.
             */
            private void pushData(Iterator<T> it) {
                while (it.hasNext()) {
                    try {
                        long backPressure = this.closed.get() == AbstractPushStreamImpl.State.CLOSED ? -1L : handleEvent(PushEvent.data(it.next()));
                        // The decision is taken inside the try block so it only ever
                        // runs on a value that was actually produced for this element.
                        if (backPressure < 0) {
                            close();
                            return;
                        }
                        if (backPressure > 0) {
                            this.promiseFactory.scheduledExecutor().schedule(() -> {
                                this.promiseFactory.executor().execute(() -> {
                                    pushData(it);
                                });
                            }, backPressure, TimeUnit.MILLISECONDS);
                            return;
                        }
                    } catch (Exception e) {
                        // Report the failure downstream; the next pass of the loop (or the
                        // close() after it) then finishes the stream.
                        // NOTE(review): relies on close(error) marking the stream CLOSED - confirm.
                        close(PushEvent.error(e));
                    }
                }
                close();
            }
        });
    }
}
