package net.oneandone.troilus.example.utils.reactive.sse;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletInputStream;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:WEB-INF/classes/net/oneandone/troilus/example/utils/reactive/sse/SEEEventPublisher.class */
class SEEEventPublisher implements Publisher<SSEEvent> {
    private final AtomicReference<Optional<Subscriber<? super SSEEvent>>> subscriberRef = new AtomicReference<>(Optional.empty());
    private final ServletInputStream in;

    /* loaded from: input_file:WEB-INF/classes/net/oneandone/troilus/example/utils/reactive/sse/SEEEventPublisher$SEEEventReaderSubscription.class */
    private static final class SEEEventReaderSubscription implements Subscription {
        private final SSEReadableChannel channel;
        private final Subscriber<? super SSEEvent> subscriber;

        public SEEEventReaderSubscription(ServletInputStream servletInputStream, Subscriber<? super SSEEvent> subscriber) {
            this.subscriber = subscriber;
            this.channel = new SSEReadableChannel(servletInputStream, th -> {
                subscriber.onError(th);
            }, r3 -> {
                subscriber.onComplete();
            });
        }

        @Override // org.reactivestreams.Subscription
        public void cancel() {
            this.channel.close();
        }

        @Override // org.reactivestreams.Subscription
        public void request(long j) {
            this.channel.readEventAsync().thenAccept(sSEEvent -> {
                this.subscriber.onNext(sSEEvent);
            });
        }
    }

    public SEEEventPublisher(ServletInputStream servletInputStream) {
        this.in = servletInputStream;
    }

    @Override // org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super SSEEvent> subscriber) {
        synchronized (this.subscriberRef) {
            if (this.subscriberRef.get().isPresent()) {
                throw new IllegalStateException("subscriber is already registered (publisher does not support multi-subscriptions)");
            }
            this.subscriberRef.set(Optional.of(subscriber));
            subscriber.onSubscribe(new SEEEventReaderSubscription(this.in, subscriber));
        }
    }
}
