package net.oneandone.troilus.example.utils.reactive.stream;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:WEB-INF/classes/net/oneandone/troilus/example/utils/reactive/stream/PublisherBasedReadableStream.class */
/**
 * {@link ReadableStream} that is fed by a Reactive Streams {@link Publisher}.
 *
 * <p>The stream subscribes to the publisher in its constructor and keeps the
 * received {@link Subscription}. Elements are handed to the consumer registered
 * through one of the {@code consume} methods. When consuming with plain
 * {@link Consumer}s the stream requests one element up front and one more after
 * each delivered element ("auto request"); when consuming with a
 * {@link Subscriber} auto request is switched off and the subscriber drives
 * demand through this object, which is itself the {@link Subscription} handed
 * to it.
 *
 * @param <T> the element type
 */
public class PublisherBasedReadableStream<T> implements ReadableStream<T>, Subscription {
    /** {@code true} while this stream itself requests the next element after each delivery. */
    private final AtomicBoolean isAutoRequest = new AtomicBoolean(true);
    /** Upstream subscription; a throwing placeholder until the publisher calls {@code onSubscribe}. */
    private final AtomicReference<Subscription> subscriptionRef = new AtomicReference<>(new UnsetSubscription());
    /** {@code true} until {@link #close()} has been called for the first time. */
    private final AtomicBoolean isOpen = new AtomicBoolean(true);
    /** Element consumer; a throwing placeholder until a {@code consume} method is called. */
    private final AtomicReference<Consumer<? super T>> consumerRef = new AtomicReference<>(new UnsetConsumer<T>());
    /** Error consumer; defaults to closing the stream. */
    private final AtomicReference<Consumer<? super Throwable>> errorConsumerRef = new AtomicReference<>(new DefaultErrorConsumer());

    /**
     * Error consumer used when the caller does not supply one: it closes the
     * enclosing stream and otherwise discards the error.
     */
    private class DefaultErrorConsumer implements Consumer<Throwable> {
        private DefaultErrorConsumer() {
        }

        @Override
        public void accept(Throwable th) {
            PublisherBasedReadableStream.this.close();
        }
    }

    /**
     * Stream view that applies a mapping function to every element of an
     * underlying stream.
     *
     * <p>The underlying stream is installed as this stream's subscription, so
     * {@link #request(long)} and {@link #cancel()} are forwarded to it. Elements
     * are obtained by consuming the underlying stream once consumption of this
     * stream starts.
     *
     * @param <T> element type of the underlying stream
     * @param <V> element type produced by the mapping function
     */
    public static final class MappingReadableStream<T, V> extends PublisherBasedReadableStream<V> {
        private final ReadableStream<T> underlyingStream;
        private final Function<? super T, ? extends V> fn;

        /**
         * @param publisherBasedReadableStream the stream whose elements are mapped
         * @param function                     the mapping applied to each element
         */
        MappingReadableStream(PublisherBasedReadableStream<T> publisherBasedReadableStream, Function<? super T, ? extends V> function) {
            super(new SingleSubscribePublisherAdapter<V>(publisherBasedReadableStream));
            this.underlyingStream = publisherBasedReadableStream;
            this.fn = function;
        }

        /**
         * Starts consuming the underlying stream, forwarding mapped elements and
         * errors to whatever consumers are registered on this stream at the time
         * of each delivery.
         */
        @Override
        protected void onConsumingStarted() {
            // The consumers are looked up per call (not captured) so that the
            // currently registered ones are always used.
            this.underlyingStream.consume(
                    obj -> getConsumer().accept(this.fn.apply(obj)),
                    th -> getErrorConsumer().accept(th));
        }
    }

    /**
     * Subscriber registered with the publisher; bridges the publisher's signals
     * to the enclosing stream.
     */
    private final class Reader implements Subscriber<T> {
        private Reader() {
        }

        /** Stores the upstream subscription so that request/cancel can be forwarded to it. */
        @Override
        public void onSubscribe(Subscription subscription) {
            PublisherBasedReadableStream.this.subscriptionRef.set(subscription);
        }

        /** Closes the enclosing stream when the publisher signals completion. */
        @Override
        public void onComplete() {
            PublisherBasedReadableStream.this.close();
        }

        /** Passes the error to the currently registered error consumer. */
        @Override
        public void onError(Throwable th) {
            PublisherBasedReadableStream.this.getErrorConsumer().accept(th);
        }

        /**
         * Passes the element to the currently registered consumer and, in auto
         * request mode, asks for the next element even if the consumer threw.
         */
        @Override
        public void onNext(T t) {
            try {
                PublisherBasedReadableStream.this.getConsumer().accept(t);
            } finally {
                if (PublisherBasedReadableStream.this.isAutoRequest.get()) {
                    PublisherBasedReadableStream.this.request(1L);
                }
            }
        }
    }

    /**
     * Minimal publisher that only hands a fixed {@link Subscription} to each
     * subscriber; it never emits elements, completion or errors itself.
     *
     * @param <T> the nominal element type
     */
    private static final class SingleSubscribePublisherAdapter<T> implements Publisher<T> {
        private final Subscription subscription;

        public SingleSubscribePublisherAdapter(Subscription subscription) {
            this.subscription = subscription;
        }

        @Override
        public void subscribe(Subscriber<? super T> subscriber) {
            subscriber.onSubscribe(this.subscription);
        }
    }

    /**
     * Placeholder consumer used before a real consumer is registered; receiving
     * an element in that state is treated as an illegal state.
     *
     * @param <T> the element type
     */
    private static class UnsetConsumer<T> implements Consumer<T> {
        private UnsetConsumer() {
        }

        @Override
        public void accept(T t) {
            throw new IllegalStateException();
        }
    }

    /**
     * Placeholder subscription used before the publisher has called
     * {@code onSubscribe}; every operation throws.
     */
    private static class UnsetSubscription implements Subscription {
        private UnsetSubscription() {
        }

        @Override
        public void request(long j) {
            throw new IllegalStateException();
        }

        @Override
        public void cancel() {
            throw new IllegalStateException();
        }
    }

    /**
     * Creates the stream and immediately subscribes to the given publisher.
     *
     * @param publisher the source of the elements
     */
    public PublisherBasedReadableStream(Publisher<T> publisher) {
        publisher.subscribe(new Reader());
    }

    /**
     * Closes the stream by cancelling the upstream subscription. Only the first
     * call has an effect; later calls return without cancelling again.
     */
    @Override
    public void close() {
        // getAndSet makes the open -> closed transition happen exactly once.
        if (this.isOpen.getAndSet(false)) {
            cancel();
        }
    }

    /**
     * Forwards the demand to the upstream subscription.
     *
     * @param j the number of elements requested
     * @throws IllegalStateException if no upstream subscription has been received yet
     */
    @Override
    public void request(long j) {
        this.subscriptionRef.get().request(j);
    }

    /**
     * Forwards the cancellation to the upstream subscription.
     *
     * @throws IllegalStateException if no upstream subscription has been received yet
     */
    @Override
    public void cancel() {
        this.subscriptionRef.get().cancel();
    }

    /**
     * Returns a stream that applies {@code function} to each element of this stream.
     *
     * @param function the mapping applied to each element
     * @param <V>      the element type of the returned stream
     * @return the mapping stream backed by this stream
     */
    @Override
    public <V> PublisherBasedReadableStream<V> map(Function<? super T, ? extends V> function) {
        return new MappingReadableStream<>(this, function);
    }

    /**
     * Consumes the stream with a Reactive Streams subscriber. Auto request is
     * switched off; the subscriber receives this stream as its subscription and
     * is expected to signal demand itself.
     *
     * <p>NOTE(review): completion of the upstream is not forwarded to
     * {@code subscriber.onComplete()} by this class.
     *
     * @param subscriber the subscriber receiving elements and errors
     */
    @Override
    public void consume(Subscriber<? super T> subscriber) {
        this.isAutoRequest.set(false);
        consume(obj -> subscriber.onNext(obj), th -> subscriber.onError(th));
        subscriber.onSubscribe(this);
    }

    /**
     * Consumes the stream with the given element consumer, keeping the
     * currently registered error consumer.
     *
     * @param consumer the consumer receiving the elements
     */
    @Override
    public void consume(Consumer<? super T> consumer) {
        consume(consumer, this.errorConsumerRef.get());
    }

    /**
     * Registers the consumers, notifies subclasses through
     * {@link #onConsumingStarted()} and, in auto request mode, requests the
     * first element.
     *
     * @param consumer  the consumer receiving the elements
     * @param consumer2 the consumer receiving an error
     */
    @Override
    public void consume(Consumer<? super T> consumer, Consumer<? super Throwable> consumer2) {
        this.consumerRef.set(consumer);
        this.errorConsumerRef.set(consumer2);
        // Run the hook before signalling demand: a subclass such as
        // MappingReadableStream registers its consumer on the upstream stream
        // here, and requesting first could deliver an element while the
        // upstream still has its throwing placeholder consumer installed.
        onConsumingStarted();
        if (this.isAutoRequest.get()) {
            request(1L);
        }
    }

    /**
     * Hook invoked from {@link #consume(Consumer, Consumer)} after the consumers
     * have been registered and before the first element is requested. The
     * default implementation does nothing.
     */
    protected void onConsumingStarted() {
    }

    /**
     * @return the currently registered element consumer
     */
    protected Consumer<? super T> getConsumer() {
        return this.consumerRef.get();
    }

    /**
     * @return the currently registered error consumer
     */
    protected Consumer<? super Throwable> getErrorConsumer() {
        return this.errorConsumerRef.get();
    }
}
