package net.oneandone.troilus.example.utils.reactive.sse;

import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Queues;
import java.io.FilterInputStream;
import java.io.IOException;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import javax.servlet.ReadListener;
import javax.servlet.ServletInputStream;

/* loaded from: input_file:WEB-INF/classes/net/oneandone/troilus/example/utils/reactive/sse/SSEReadableChannel.class */
/**
 * Asynchronous reader of Server-Sent Events on top of a servlet input stream.
 *
 * <p>Callers request events with {@link #readEventAsync()}. If the underlying
 * {@code SSEInputStream} can deliver an event right away, the returned future is
 * completed immediately; otherwise the future is queued and completed later, when the
 * servlet container signals through the registered {@link ReadListener} that more data
 * is available.
 *
 * <p>All access to the queue of pending reads and to the statistics counters is
 * guarded by the monitor of {@code pendingReads}.
 */
public class SSEReadableChannel {
    private final SSEInputStream serverSentEventsStream;
    private final Consumer<Throwable> errorConsumer;
    private final Consumer<Void> completionConsumer;
    // Reads that could not be served immediately, in request order. Guarded by its own monitor.
    private final Queue<CompletableFuture<SSEEvent>> pendingReads = Queues.newLinkedBlockingQueue();
    // Number of reads answered directly inside readEventAsync(). Guarded by pendingReads.
    private long numProcessedImmediate = 0;
    // Number of reads answered later from the pending queue. Guarded by pendingReads.
    private long numProcessedPendings = 0;

    /**
     * Input stream wrapper whose array-based reads consult
     * {@link ServletInputStream#isReady()} first and return {@code 0} instead of
     * delegating when the servlet stream reports that it is not ready.
     *
     * <p>NOTE(review): the single-byte {@code read()} is not overridden here, so it is
     * passed straight to the wrapped stream by {@link FilterInputStream} without the
     * readiness check.
     */
    private static final class NonBlockingInputStream extends FilterInputStream {
        private final ServletInputStream is;

        public NonBlockingInputStream(ServletInputStream servletInputStream) {
            super(servletInputStream);
            this.is = servletInputStream;
        }

        /**
         * Reads into {@code bArr} if the servlet stream is ready.
         *
         * @return the result of the delegated read, or {@code 0} if the stream is not ready
         */
        @Override // java.io.FilterInputStream, java.io.InputStream
        public int read(byte[] bArr) throws IOException {
            if (isNetworkdataAvailable()) {
                return super.read(bArr);
            }
            return 0;
        }

        /**
         * Reads up to {@code i2} bytes into {@code bArr} starting at offset {@code i} if the
         * servlet stream is ready.
         *
         * @return the result of the delegated read, or {@code 0} if the stream is not ready
         */
        @Override // java.io.FilterInputStream, java.io.InputStream
        public int read(byte[] bArr, int i, int i2) throws IOException {
            if (isNetworkdataAvailable()) {
                return super.read(bArr, i, i2);
            }
            return 0;
        }

        /**
         * Returns whether the servlet stream reports itself as ready; an
         * {@link IllegalStateException} from {@code isReady()} is treated as "not ready".
         */
        private boolean isNetworkdataAvailable() {
            try {
                return this.is.isReady();
            } catch (IllegalStateException ignored) {
                // Deliberately ignored and reported as "no data".
                // NOTE(review): presumably thrown by some containers when the stream is not
                // (or no longer) usable in non-blocking mode - confirm against the container.
                return false;
            }
        }
    }

    /** Forwards the servlet container's read callbacks to the enclosing channel. */
    private final class ServletReadListener implements ReadListener {
        private ServletReadListener() {
        }

        /** Notifies the completion consumer (with {@code null}) that all data has been read. */
        @Override // javax.servlet.ReadListener
        public void onAllDataRead() throws IOException {
            SSEReadableChannel.this.completionConsumer.accept(null);
        }

        /** Routes a container-reported read error through the channel's error handling. */
        @Override // javax.servlet.ReadListener
        public void onError(Throwable th) {
            SSEReadableChannel.this.onError(th);
        }

        /** Tries to serve queued reads now that the container reports available data. */
        @Override // javax.servlet.ReadListener
        public void onDataAvailable() throws IOException {
            SSEReadableChannel.this.proccessPendingReads();
        }
    }

    /**
     * Creates the channel and registers a {@link ReadListener} on the given stream.
     *
     * @param servletInputStream the servlet input stream to read events from
     * @param consumer           called with the cause when reading fails
     * @param consumer2          called with {@code null} once the container reports that
     *                           all data has been read
     */
    public SSEReadableChannel(ServletInputStream servletInputStream, Consumer<Throwable> consumer, Consumer<Void> consumer2) {
        this.errorConsumer = consumer;
        this.completionConsumer = consumer2;
        this.serverSentEventsStream = new SSEInputStream(new NonBlockingInputStream(servletInputStream));
        servletInputStream.setReadListener(new ServletReadListener());
    }

    /**
     * Requests the next event.
     *
     * <p>The returned future is completed with the event as soon as one can be obtained
     * from the underlying stream. It is completed exceptionally if reading fails or if
     * the channel is closed while the read is still pending, so callers are never left
     * waiting on a future that cannot complete anymore.
     *
     * @return a future for the next event
     */
    public CompletableFuture<SSEEvent> readEventAsync() {
        CompletableFuture<SSEEvent> completableFuture = new CompletableFuture<>();
        synchronized (this.pendingReads) {
            if (!this.pendingReads.isEmpty()) {
                // Earlier reads are still waiting: queue behind them and drain in order, so
                // this read cannot take an event that belongs to an older request.
                this.pendingReads.add(completableFuture);
                proccessPendingReads();
                return completableFuture;
            }
            try {
                Optional<SSEEvent> next = this.serverSentEventsStream.next();
                if (next.isPresent()) {
                    this.numProcessedImmediate++;
                    completableFuture.complete(next.get());
                } else {
                    this.pendingReads.add(completableFuture);
                }
            } catch (IOException | RuntimeException e) {
                // This future is not in the queue, so it has to be failed explicitly;
                // otherwise the caller would hold a future that never completes.
                completableFuture.completeExceptionally(e);
                onError(e);
            }
        }
        return completableFuture;
    }

    /**
     * Completes queued reads, oldest first, for as long as the underlying stream yields
     * events. Stops when the queue is empty, when no further event is available, or when
     * reading fails (in which case the error handling closes the channel).
     */
    public void proccessPendingReads() {
        synchronized (this.pendingReads) {
            try {
                while (!this.pendingReads.isEmpty()) {
                    Optional<SSEEvent> next = this.serverSentEventsStream.next();
                    if (!next.isPresent()) {
                        return;
                    }
                    this.numProcessedPendings++;
                    this.pendingReads.poll().complete(next.get());
                }
            } catch (IOException | RuntimeException e) {
                onError(e);
            }
        }
    }

    /**
     * Reports the error to the error consumer, then closes the underlying stream and
     * fails every pending read with the same cause. The cleanup runs even if the error
     * consumer itself throws.
     *
     * @param th the cause of the failure
     */
    public void onError(Throwable th) {
        try {
            this.errorConsumer.accept(th);
        } finally {
            closeAndFailPending(th);
        }
    }

    /**
     * Closes the underlying event stream and fails every read that is still pending
     * with an {@link IOException}.
     */
    public void close() {
        closeAndFailPending(new IOException("channel is closed"));
    }

    /**
     * Closes the underlying stream and completes all queued reads exceptionally.
     *
     * @param cause the failure handed to the pending futures; a generic
     *              {@link IOException} is used if it is {@code null}
     */
    private void closeAndFailPending(Throwable cause) {
        // completeExceptionally rejects null, so substitute a generic failure.
        Throwable failure = (cause != null) ? cause : new IOException("channel is closed");
        try {
            this.serverSentEventsStream.close();
        } finally {
            synchronized (this.pendingReads) {
                CompletableFuture<SSEEvent> pending;
                while ((pending = this.pendingReads.poll()) != null) {
                    pending.completeExceptionally(failure);
                }
            }
        }
    }

    /**
     * Returns a diagnostic description containing the consumers, the pending reads and
     * the processing counters.
     */
    @Override
    public String toString() {
        final String pending;
        final long immediate;
        final long fromPending;
        // Snapshot under the lock that guards the counters and the queue.
        synchronized (this.pendingReads) {
            pending = Joiner.on(", ").join(this.pendingReads);
            immediate = this.numProcessedImmediate;
            fromPending = this.numProcessedPendings;
        }
        return MoreObjects.toStringHelper(this)
                .add("completionConsumer", this.completionConsumer)
                .add("errorConsumer", this.errorConsumer)
                .add("pendingReads", pending)
                .add("numProcessedTotal", immediate + fromPending)
                .add("numProcessedImmediate", immediate)
                .add("numProcessedPendings", fromPending)
                .toString();
    }
}
