package com.gentics.mesh.search.verticle;

import com.gentics.mesh.core.data.search.request.BulkRequest;
import com.gentics.mesh.core.data.search.request.Bulkable;
import com.gentics.mesh.core.data.search.request.SearchRequest;
import io.reactivex.ObservableOperator;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.vertx.core.Vertx;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/* loaded from: input_file:com/gentics/mesh/search/verticle/BulkOperator.class */
/**
 * Observable operator that buffers consecutive {@link Bulkable} search requests and forwards
 * them downstream as a single {@link BulkRequest}.
 *
 * <p>Buffered requests are flushed when any of the following happens:
 * <ul>
 * <li>a request that is not {@link Bulkable} arrives (the buffer is flushed first, then the
 * request itself is forwarded unchanged),</li>
 * <li>the number of buffered requests reaches the configured request limit,</li>
 * <li>the bulk timer expires (a timer is only started when the configured bulk time is greater
 * than zero),</li>
 * <li>the upstream completes, or</li>
 * <li>{@link #flush()} is called manually.</li>
 * </ul>
 *
 * <p>Only the subscriber created by the most recent {@link #apply(Observer)} call is reachable
 * through {@link #flush()} and {@link #bulking()}.
 */
public class BulkOperator implements ObservableOperator<SearchRequest, SearchRequest> {
    private static final Logger log = LoggerFactory.getLogger(BulkOperator.class);

    private final Vertx vertx;
    /** Maximum time in milliseconds a request stays buffered; values &lt;= 0 disable the timer. */
    private final long bulkTime;
    /** Number of buffered requests at which a flush is forced. */
    private final int requestLimit;
    /** Subscriber created by the latest {@link #apply(Observer)} call, or {@code null} if none yet. */
    private FlushSubscriber<SearchRequest> subscriber;

    /**
     * Observer that additionally exposes manual flushing and its bulking state.
     *
     * @param <T> type of the observed items
     */
    interface FlushSubscriber<T> extends Observer<T> {
        /** Emits all currently buffered requests downstream as one bulk request. */
        void flush();

        /**
         * @return {@code true} while a flush timer is pending
         */
        boolean bulking();
    }

    /**
     * Creates a new operator.
     *
     * @param vertx    Vert.x instance used to schedule and cancel the flush timer
     * @param duration maximum time requests are buffered before being flushed; converted to
     *                 milliseconds, a non-positive value disables timer based flushing
     * @param i        number of buffered requests at which a flush is forced
     */
    public BulkOperator(Vertx vertx, Duration duration, int i) {
        this.vertx = vertx;
        this.bulkTime = duration.toMillis();
        this.requestLimit = i;
    }

    /**
     * Wraps the downstream observer in a buffering subscriber and remembers that subscriber so
     * that {@link #flush()} and {@link #bulking()} can delegate to it.
     *
     * @param observer downstream observer receiving bulk requests and non-bulkable requests
     * @return the buffering observer to be subscribed to the upstream
     */
    public Observer<? super SearchRequest> apply(final Observer<? super SearchRequest> observer) {
        if (this.subscriber != null) {
            log.warn("More than one observer for the same operator detected. Flush will only work for the newest observer.");
        }
        this.subscriber = new FlushSubscriber<SearchRequest>() {
            /** Id of the pending Vert.x timer, or {@code null} when no timer is scheduled. */
            private Long timer;
            /** Upstream disposable, handed in through {@link #onSubscribe(Disposable)}. */
            Disposable sub;
            /** Requests collected since the last flush. */
            Queue<Bulkable> bulkableRequests = new ConcurrentLinkedQueue<>();

            public void onSubscribe(Disposable disposable) {
                this.sub = disposable;
                observer.onSubscribe(disposable);
            }

            public void onNext(SearchRequest searchRequest) {
                if (this.sub.isDisposed()) {
                    cleanup();
                    return;
                }
                if (!(searchRequest instanceof Bulkable)) {
                    BulkOperator.log.trace("Flushing {} requests because non-bulkable request of class {{}} has been received.",
                        this.bulkableRequests.size(), searchRequest.getClass().getSimpleName());
                    // Flush first so the non-bulkable request is delivered after everything buffered before it.
                    flush();
                    observer.onNext(searchRequest);
                    return;
                }
                // The timer is started by the first request of a batch, not restarted by later ones.
                if (this.bulkableRequests.isEmpty()) {
                    resetTimer();
                }
                this.bulkableRequests.add((Bulkable) searchRequest);
                if (this.bulkableRequests.size() >= BulkOperator.this.requestLimit) {
                    BulkOperator.log.trace("Flushing {} requests because size limit of {} has been reached.",
                        this.bulkableRequests.size(), BulkOperator.this.requestLimit);
                    flush();
                }
            }

            /** Cancels any pending timer and schedules a new one, unless timer based flushing is disabled. */
            private void resetTimer() {
                if (BulkOperator.this.bulkTime > 0) {
                    cancelTimer();
                    this.timer = BulkOperator.this.vertx.setTimer(BulkOperator.this.bulkTime, l -> {
                        BulkOperator.log.trace("Flushing {} requests because time limit of {}ms has been reached.",
                            this.bulkableRequests.size(), BulkOperator.this.bulkTime);
                        flush();
                    });
                }
            }

            /** Cancels the pending timer, if any, and forgets its id. */
            private void cancelTimer() {
                if (this.timer != null) {
                    BulkOperator.this.vertx.cancelTimer(this.timer);
                    this.timer = null;
                }
            }

            @Override // com.gentics.mesh.search.verticle.BulkOperator.FlushSubscriber
            public void flush() {
                cancelTimer();
                if (this.bulkableRequests.isEmpty() || this.sub.isDisposed()) {
                    return;
                }
                // Drain element by element instead of "copy, then clear": a request added between
                // a snapshot and clear() would be removed from the queue without ever being emitted.
                ArrayList<Bulkable> drained = new ArrayList<>();
                Bulkable next;
                while ((next = this.bulkableRequests.poll()) != null) {
                    drained.add(next);
                }
                if (drained.isEmpty()) {
                    return;
                }
                observer.onNext(new BulkRequest(drained));
            }

            /**
             * Reports whether a flush timer is pending. When the bulk time is not positive no
             * timer is ever started, so this returns {@code false} even if requests are buffered.
             */
            @Override // com.gentics.mesh.search.verticle.BulkOperator.FlushSubscriber
            public boolean bulking() {
                return this.timer != null;
            }

            public void onError(Throwable th) {
                // Always stop the timer: otherwise it could fire later and emit a bulk request
                // after the terminal error, and bulking() would keep reporting true.
                cleanup();
                if (!this.sub.isDisposed()) {
                    observer.onError(th);
                }
            }

            public void onComplete() {
                if (this.sub.isDisposed()) {
                    cleanup();
                } else {
                    // Deliver what is still buffered before signalling completion.
                    flush();
                    observer.onComplete();
                }
            }

            /** Releases the timer held by this subscriber. */
            private void cleanup() {
                cancelTimer();
            }
        };
        return this.subscriber;
    }

    /**
     * Manually flushes the requests buffered by the most recently created subscriber.
     * Does nothing besides logging when no subscriber has been created yet.
     */
    public void flush() {
        log.info("Manually flushing bulked requests");
        if (this.subscriber != null) {
            this.subscriber.flush();
        }
    }

    /**
     * @return {@code true} if the most recently created subscriber has a pending flush timer,
     *         {@code false} otherwise or when no subscriber has been created yet
     */
    public boolean bulking() {
        if (this.subscriber != null) {
            return this.subscriber.bulking();
        }
        return false;
    }
}
