package net.oneandone.troilus.example.utils.reactive.sse;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import javax.servlet.ServletOutputStream;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:WEB-INF/classes/net/oneandone/troilus/example/utils/reactive/sse/SEEEventSubscriber.class */
class SEEEventSubscriber implements Subscriber<SSEEvent> {
    private static final Logger LOG = Logger.getLogger(SEEEventSubscriber.class.getName());
    private final AtomicBoolean isOpen = new AtomicBoolean(true);
    private final AtomicReference<Subscription> subscriptionRef = new AtomicReference<>(new IllegalStateSubscription());
    private final SSEWriteableChannel channel;

    /* loaded from: input_file:WEB-INF/classes/net/oneandone/troilus/example/utils/reactive/sse/SEEEventSubscriber$IllegalStateSubscription.class */
    private static final class IllegalStateSubscription implements Subscription {
        private IllegalStateSubscription() {
        }

        @Override // org.reactivestreams.Subscription
        public void request(long j) {
            throw new IllegalStateException();
        }

        @Override // org.reactivestreams.Subscription
        public void cancel() {
            throw new IllegalStateException();
        }
    }

    public SEEEventSubscriber(ServletOutputStream servletOutputStream, ScheduledExecutorService scheduledExecutorService) {
        this.channel = new SSEWriteableChannel(servletOutputStream, th -> {
            onError(th);
        }, scheduledExecutorService);
    }

    @Override // org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        this.subscriptionRef.set(subscription);
        requestNext();
    }

    private void requestNext() {
        this.subscriptionRef.get().request(1L);
    }

    @Override // org.reactivestreams.Subscriber
    public void onNext(SSEEvent sSEEvent) {
        this.channel.writeEventAsync(sSEEvent).thenAccept(num -> {
            this.channel.whenWritePossibleAsync().thenAccept(sSEWriteableChannel -> {
                requestNext();
            });
        });
    }

    @Override // org.reactivestreams.Subscriber
    public void onError(Throwable th) {
        LOG.fine("error on source stream. stop streaming " + th.getMessage());
        close();
    }

    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
        close();
    }

    private void close() {
        if (this.isOpen.getAndSet(false)) {
            this.subscriptionRef.get().cancel();
            this.channel.close();
        }
    }
}
