package com.gentics.mesh.verticle;

import com.gentics.mesh.core.data.job.Job;
import io.reactivex.Completable;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.shareddata.Lock;

/* loaded from: input_file:com/gentics/mesh/verticle/AbstractJobVerticle.class */
public abstract class AbstractJobVerticle extends AbstractVerticle {
    public static final Logger log = LoggerFactory.getLogger(AbstractJobVerticle.class);
    public static final String STATUS_ACCEPTED = "accepted";
    public static final String STATUS_REJECTED = "rejected";
    protected boolean stopped = false;
    protected MessageConsumer<Object> jobConsumer;

    public void start() throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("Starting verticle {" + getClass().getName() + "}");
        }
        this.stopped = false;
        registerJobHandler();
        super.start();
    }

    private void registerJobHandler() {
        this.jobConsumer = this.vertx.eventBus().consumer(getJobAdress(), message -> {
            invokeJobAction(message);
        });
    }

    public abstract String getJobAdress();

    public abstract String getLockName();

    public void invokeJobAction(Message<Object> message) {
        log.debug("Got job processing request. Getting lock to execute the request.");
        executeLocked(this.stopped ? Completable.error(new Throwable("Processing was stopped.")) : executeJob(message), message);
    }

    public abstract Completable executeJob(Message<Object> message);

    public void stop() throws Exception {
        this.stopped = true;
        if (this.jobConsumer != null) {
            this.jobConsumer.unregister();
        }
    }

    protected void executeLocked(Completable completable, Message<Object> message) {
        String lockName = getLockName();
        try {
            this.vertx.sharedData().getLockWithTimeout(lockName, 1000L, asyncResult -> {
                if (!asyncResult.failed()) {
                    Lock lock = (Lock) asyncResult.result();
                    if (message != null) {
                        message.reply(new JsonObject().put(Job.STATUS_PROPERTY_KEY, STATUS_ACCEPTED));
                    }
                    completable.doOnDispose(() -> {
                        log.debug("Releasing lock {" + lockName + "}");
                        lock.release();
                    }).doFinally(() -> {
                        log.debug("Releasing lock {" + lockName + "}");
                        lock.release();
                    }).subscribe(() -> {
                        log.debug("Action completed");
                    }, th -> {
                        log.error("Error while executing locked action", th);
                    });
                    return;
                }
                log.error("Error while acquiring global lock {" + lockName + "}", asyncResult.cause());
                if (message != null) {
                    message.reply(new JsonObject().put(Job.STATUS_PROPERTY_KEY, STATUS_REJECTED));
                }
            });
        } catch (Exception e) {
            log.error("Error while waiting for global lock {" + lockName + "}", e);
        }
    }
}
