/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.control;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Dispatcher;
import reactor.core.support.NonBlocking;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.rx.Stream;
import reactor.rx.action.Action;
import reactor.rx.broadcast.Broadcaster;

/**
 * Pass-through {@link Action} that lets a user-supplied stream decide when (and how much)
 * demand is forwarded upstream.
 *
 * <p>Values received via {@link #doNext(Object)} are re-broadcast unchanged. Demand received
 * via {@link #requestMore(long)} is not sent upstream directly; it is published on an internal
 * {@code throttleStream}. The function passed to the constructor turns that stream into a
 * {@link Publisher} of {@code Long}s, and each positive value emitted by that publisher is
 * requested from the upstream subscription.
 *
 * @param <T> type of the values flowing through this action
 */
public class ThrottleRequestWhenAction<T>
extends Action<T, T> {
    /** Carries the demand signalled by downstream; the user function is applied to this stream. */
    private final Broadcaster<Long> throttleStream;

    /**
     * Creates the internal demand stream on the given dispatcher, applies {@code predicate} to
     * it and subscribes to the resulting publisher.
     *
     * @param dispatcher dispatcher handed to the internal {@link Broadcaster}; it is also the
     *                   dispatcher on which upstream requests are issued (see
     *                   {@link #doRequest(long)})
     * @param predicate  function mapping the stream of downstream demand to the publisher of
     *                   demand that should actually be requested upstream
     */
    public ThrottleRequestWhenAction(Dispatcher dispatcher, Function<? super Stream<? extends Long>, ? extends Publisher<? extends Long>> predicate) {
        this.throttleStream = Broadcaster.create(null, dispatcher);
        // Keep the generic types intact instead of falling back to raw Publisher/Subscriber casts.
        Publisher<? extends Long> afterRequestStream = predicate.apply(this.throttleStream);
        afterRequestStream.subscribe(new ThrottleSubscriber());
    }

    /**
     * Publishes the downstream demand on the throttle stream rather than requesting upstream.
     *
     * @param elements demand signalled by downstream
     */
    @Override
    public void requestMore(long elements) {
        this.throttleStream.onNext(elements);
    }

    /**
     * Re-broadcasts the received value unchanged.
     *
     * @param ev value received from upstream
     */
    @Override
    protected void doNext(T ev) {
        this.broadcastNext(ev);
    }

    /**
     * Completes the throttle stream and then shuts this action down. Any exception thrown by
     * either step is routed to {@code doError}.
     */
    @Override
    public void onComplete() {
        try {
            this.throttleStream.onComplete();
            this.doShutdown();
        }
        catch (Exception e) {
            this.doError(e);
        }
    }

    /**
     * @return always {@code true}, regardless of the arguments
     */
    @Override
    public boolean isReactivePull(Dispatcher dispatcher, long producerCapacity) {
        return true;
    }

    /**
     * Schedules an upstream request of {@code requested} elements on the throttle stream's
     * dispatcher. The request is dropped if there is no upstream subscription when the task runs.
     *
     * @param requested number of elements to request from upstream
     */
    protected void doRequest(long requested) {
        this.throttleStream.getDispatcher().dispatch(requested, new Consumer<Long>(){

            @Override
            public void accept(Long o) {
                // Read the field once: this task runs later on the dispatcher, so the field may
                // be cleared between a null check and the call if it were read twice.
                Subscription upstream = ThrottleRequestWhenAction.this.upstreamSubscription;
                if (upstream != null) {
                    upstream.request(o);
                }
            }
        }, null);
    }

    /**
     * Subscriber attached to the publisher returned by the user function. It pulls one value at
     * a time and turns every positive value into an upstream request.
     */
    private class ThrottleSubscriber
    implements Subscriber<Long>,
    NonBlocking {
        /** Subscription to the user-provided publisher; assigned in {@link #onSubscribe}. */
        private Subscription s;

        private ThrottleSubscriber() {
        }

        // Presumably declared by NonBlocking (interface not visible here) - TODO confirm.
        public boolean isReactivePull(Dispatcher dispatcher, long producerCapacity) {
            return false;
        }

        // Presumably declared by NonBlocking (interface not visible here) - TODO confirm.
        public long getCapacity() {
            return ThrottleRequestWhenAction.this.capacity;
        }

        /** Stores the subscription and asks for the first value. */
        @Override
        public void onSubscribe(Subscription s) {
            this.s = s;
            s.request(1L);
        }

        /**
         * Forwards a positive value as an upstream request, then asks for the next value.
         * Non-positive values are ignored but still trigger the request for the next value.
         */
        @Override
        public void onNext(Long o) {
            if (o > 0L) {
                ThrottleRequestWhenAction.this.doRequest(o);
            }
            this.s.request(1L);
        }

        /** Cancels the subscription and propagates the error to the enclosing action. */
        @Override
        public void onError(Throwable t) {
            // NOTE(review): Reactive Streams rule 2.3 discourages calling the Subscription from
            // a terminal signal; kept because changing it could alter existing behaviour.
            this.s.cancel();
            ThrottleRequestWhenAction.this.doError(t);
        }

        /** Cancels the subscription and completes the enclosing action. */
        @Override
        public void onComplete() {
            // NOTE(review): see onError regarding cancel() inside a terminal signal.
            this.s.cancel();
            ThrottleRequestWhenAction.this.doComplete();
        }
    }
}

