package io.vertx.core.eventbus.impl.clustered;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBusOptions;
import io.vertx.core.eventbus.impl.CodecManager;
import io.vertx.core.eventbus.impl.EventBusImpl;
import io.vertx.core.eventbus.impl.HandlerHolder;
import io.vertx.core.eventbus.impl.MessageImpl;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.impl.HAManager;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.JksOptions;
import io.vertx.core.net.KeyCertOptions;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.core.net.PemTrustOptions;
import io.vertx.core.net.PfxOptions;
import io.vertx.core.net.TCPSSLOptions;
import io.vertx.core.net.TrustOptions;
import io.vertx.core.net.impl.ServerID;
import io.vertx.core.parsetools.RecordParser;
import io.vertx.core.spi.cluster.AsyncMultiMap;
import io.vertx.core.spi.cluster.ChoosableIterable;
import io.vertx.core.spi.cluster.ClusterManager;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/* loaded from: input_file:io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.class */
public class ClusteredEventBus extends EventBusImpl {
    /** System property consulted by {@code getClusterPublicHost} before falling back to the event bus options. */
    public static final String CLUSTER_PUBLIC_HOST_PROP_NAME = "vertx.cluster.public.host";
    /** System property consulted by {@code getClusterPublicPort} before falling back to the event bus options. */
    public static final String CLUSTER_PUBLIC_PORT_PROP_NAME = "vertx.cluster.public.port";
    /** Key under which {@code start} hands this node's host/port JSON to the HA manager. */
    private static final String SERVER_ID_HA_KEY = "server_id";
    /** Name of the async multimap requested from the cluster manager in {@code start}. */
    private static final String SUBS_MAP_NAME = "__vertx.subs";
    /** Supplies the subscription multimap and this node's id. */
    private final ClusterManager clusterManager;
    /** Receives the server id data and the cluster-view-changed callback. */
    private final HAManager haManager;
    /** One holder per remote server id; filled lazily by {@code sendRemote}, closed in {@code close}. */
    private final ConcurrentMap<ServerID, ConnectionHolder> connections;
    /** Context used by {@code sendOrPub} for the subscriber lookup when the caller has no current Vert.x context. */
    private final Context sendNoContext;
    /** Event bus options taken from the {@code VertxOptions} passed to the constructor. */
    private EventBusOptions options;
    /** Address to node-info multimap; stays {@code null} until {@code start} has obtained it. */
    private AsyncMultiMap<String, ClusterNodeInfo> subs;
    /** Addresses this instance has added to {@link #subs}; re-added when the cluster view changes. */
    private Set<String> ownSubs;
    /** Public port/host of this node's server, computed in {@code start} once the server is listening. */
    private ServerID serverID;
    /** Pairing of the cluster node id with {@link #serverID}; the value stored in {@link #subs}. */
    private ClusterNodeInfo nodeInfo;
    /** Server accepting inbound connections; created in {@code start}. */
    private NetServer server;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ClusteredEventBus.class);
    /** Single-byte buffer written back on the socket when a ping message is received. */
    private static final Buffer PONG = Buffer.buffer(new byte[]{1});

    /**
     * Creates the clustered event bus. Nothing is opened here: the subscription map and the
     * server are only set up by {@link #start(Handler)}.
     *
     * @param vertxInternal  the owning Vert.x instance; also used to obtain the context stored in
     *                       {@code sendNoContext}
     * @param vertxOptions   source of the {@link EventBusOptions}
     * @param clusterManager cluster manager used later to obtain the subscription multimap
     * @param hAManager      HA manager; a cluster-view-changed callback is installed on it immediately
     */
    public ClusteredEventBus(VertxInternal vertxInternal, VertxOptions vertxOptions, ClusterManager clusterManager, HAManager hAManager) {
        super(vertxInternal);
        // Diamond operator instead of raw types, so the element types are checked against the
        // declared field types.
        this.connections = new ConcurrentHashMap<>();
        this.ownSubs = new ConcurrentHashSet<>();
        this.options = vertxOptions.getEventBusOptions();
        this.clusterManager = clusterManager;
        this.haManager = hAManager;
        this.sendNoContext = vertxInternal.getOrCreateContext();
        setClusterViewChangedHandler(hAManager);
    }

    /**
     * Builds the options for the inbound server from the event bus options: the JSON form seeds
     * the {@link NetServerOptions}, then the key/cert and trust options are copied across.
     *
     * @return a freshly created {@link NetServerOptions}
     */
    private NetServerOptions getServerOptions() {
        final EventBusOptions busOptions = this.options;
        final NetServerOptions serverOptions = new NetServerOptions(busOptions.toJson());
        setCertOptions(serverOptions, busOptions.getKeyCertOptions());
        setTrustOptions(serverOptions, busOptions.getTrustOptions());
        return serverOptions;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Copies key/certificate options onto {@code tCPSSLOptions} using the setter that matches
     * their concrete type.
     *
     * @param tCPSSLOptions  the options object to configure
     * @param keyCertOptions the key/cert options to apply; {@code null} leaves the target untouched
     * @throws ClassCastException if {@code keyCertOptions} is neither JKS, PFX nor PEM, because the
     *                            fallback casts to {@link PemKeyCertOptions} unconditionally
     */
    public static void setCertOptions(TCPSSLOptions tCPSSLOptions, KeyCertOptions keyCertOptions) {
        if (keyCertOptions instanceof JksOptions) {
            tCPSSLOptions.setKeyStoreOptions((JksOptions) keyCertOptions);
            return;
        }
        if (keyCertOptions instanceof PfxOptions) {
            tCPSSLOptions.setPfxKeyCertOptions((PfxOptions) keyCertOptions);
            return;
        }
        // null fails both instanceof checks above and is skipped here as well.
        if (keyCertOptions != null) {
            tCPSSLOptions.setPemKeyCertOptions((PemKeyCertOptions) keyCertOptions);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Copies trust options onto {@code tCPSSLOptions} using the setter that matches their
     * concrete type.
     *
     * @param tCPSSLOptions the options object to configure
     * @param trustOptions  the trust options to apply; {@code null} leaves the target untouched
     * @throws ClassCastException if {@code trustOptions} is neither JKS, PFX nor PEM, because the
     *                            fallback casts to {@link PemTrustOptions} unconditionally
     */
    public static void setTrustOptions(TCPSSLOptions tCPSSLOptions, TrustOptions trustOptions) {
        if (trustOptions instanceof JksOptions) {
            tCPSSLOptions.setTrustStoreOptions((JksOptions) trustOptions);
            return;
        }
        if (trustOptions instanceof PfxOptions) {
            tCPSSLOptions.setPfxTrustOptions((PfxOptions) trustOptions);
            return;
        }
        // null fails both instanceof checks above and is skipped here as well.
        if (trustOptions != null) {
            tCPSSLOptions.setPemTrustOptions((PemTrustOptions) trustOptions);
        }
    }

    /**
     * Starts the clustered bus: obtains the subscription multimap from the cluster manager,
     * creates the inbound server, starts listening, then records this node's public
     * {@link ServerID} and hands it to the HA manager.
     *
     * <p>The {@code started} flag is set as soon as the server is listening, whether or not a
     * completion handler was supplied. It used to be set only when {@code handler} was non-null,
     * so a bus started without a handler never counted as started.
     *
     * @param handler notified with success once listening, or with the failure cause; may be
     *                {@code null}, in which case failures are only logged
     */
    @Override // io.vertx.core.eventbus.impl.EventBusImpl, io.vertx.core.eventbus.EventBus
    public void start(Handler<AsyncResult<Void>> handler) {
        // The explicit type witness gives the result its real element types, so no raw cast is needed.
        this.clusterManager.<String, ClusterNodeInfo>getAsyncMultiMap(SUBS_MAP_NAME, mapResult -> {
            if (!mapResult.succeeded()) {
                if (handler != null) {
                    handler.handle(Future.failedFuture(mapResult.cause()));
                } else {
                    log.error(mapResult.cause());
                }
                return;
            }
            this.subs = mapResult.result();
            this.server = this.vertx.createNetServer(getServerOptions());
            this.server.connectHandler(getServerHandler());
            // The inner lambda parameter has its own name: a lambda parameter may not redeclare
            // a parameter of an enclosing lambda.
            this.server.listen(listenResult -> {
                if (!listenResult.succeeded()) {
                    if (handler != null) {
                        handler.handle(Future.failedFuture(listenResult.cause()));
                    } else {
                        log.error(listenResult.cause());
                    }
                    return;
                }
                this.serverID = new ServerID(getClusterPublicPort(this.options, this.server.actualPort()), getClusterPublicHost(this.options));
                this.nodeInfo = new ClusterNodeInfo(this.clusterManager.getNodeID(), this.serverID);
                this.haManager.addDataToAHAInfo(SERVER_ID_HA_KEY, new JsonObject().put("host", this.serverID.host).put("port", Integer.valueOf(this.serverID.port)));
                // Mark the bus as started before notifying, independent of the handler.
                this.started = true;
                if (handler != null) {
                    handler.handle(Future.succeededFuture());
                }
            });
        });
    }

    /**
     * Closes the bus: first the superclass, then (if it was created) the inbound server, and
     * finally every outbound connection holder.
     *
     * <p>The two nested callbacks now use distinct parameter names. Both were called
     * {@code asyncResult}, which Java rejects and which hid which result is forwarded: when a
     * server exists the handler receives the server-close result, otherwise the superclass-close
     * result.
     *
     * @param handler notified once closing has finished; may be {@code null}
     */
    @Override // io.vertx.core.eventbus.impl.EventBusImpl, io.vertx.core.eventbus.EventBus
    public void close(Handler<AsyncResult<Void>> handler) {
        super.close(busClosed -> {
            if (this.server == null) {
                // start() never got far enough to create a server; nothing else to tear down.
                if (handler != null) {
                    handler.handle(busClosed);
                }
                return;
            }
            this.server.close(serverClosed -> {
                if (serverClosed.failed()) {
                    log.error("Failed to close server", serverClosed.cause());
                }
                // Outbound connections are closed even when closing the server failed.
                for (ConnectionHolder holder : this.connections.values()) {
                    holder.close();
                }
                if (handler != null) {
                    handler.handle(serverClosed);
                }
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a {@link ClusteredMessage} whose sender is this node's server id.
     *
     * @param z        forwarded as the message's boolean flag (presumably "send" as opposed to
     *                 "publish" - confirm against {@code ClusteredMessage})
     * @param str      destination address; must not be {@code null}
     * @param multiMap message headers
     * @param obj      message body
     * @param str2     codec name passed to the codec lookup together with the body
     * @return the new message, with a {@code null} reply address
     * @throws NullPointerException if {@code str} is {@code null}
     */
    @Override // io.vertx.core.eventbus.impl.EventBusImpl
    public MessageImpl createMessage(boolean z, String str, MultiMap multiMap, Object obj, String str2) {
        Objects.requireNonNull(str, "no null address accepted");
        return new ClusteredMessage(
            this.serverID,
            str,
            null,
            multiMap,
            obj,
            this.codecManager.lookupCodec(obj, str2),
            z,
            this);
    }

    /**
     * Publishes a handler registration to the cluster-wide subscription map when required.
     *
     * <p>The address is added to the map (and remembered in {@code ownSubs}) only when {@code z}
     * is true, the map is available, and both {@code z2} and {@code z3} are false. Otherwise the
     * handler is completed successfully straight away.
     *
     * @param z       must be true for the registration to be propagated (presumably "first handler
     *                for this address" - confirm against the superclass)
     * @param str     the address being registered
     * @param z2      suppresses propagation when true (presumably "reply handler" - confirm)
     * @param z3      suppresses propagation when true (presumably "local only" - confirm)
     * @param handler completion handler; it is invoked unconditionally on the non-propagating path,
     *                so it must not be {@code null}
     */
    @Override // io.vertx.core.eventbus.impl.EventBusImpl
    protected <T> void addRegistration(boolean z, String str, boolean z2, boolean z3, Handler<AsyncResult<Void>> handler) {
        final boolean propagate = z && this.subs != null && !z2 && !z3;
        if (propagate) {
            this.subs.add(str, this.nodeInfo, handler);
            this.ownSubs.add(str);
            return;
        }
        handler.handle(Future.succeededFuture());
    }

    /**
     * Withdraws a handler registration from the cluster-wide subscription map when required.
     *
     * <p>If there is no holder, no subscription map, or the holder is local-only, nothing is
     * stored in the cluster and the handler is completed asynchronously. Otherwise the address is
     * forgotten locally and removed from the map.
     *
     * @param handlerHolder the holder being unregistered; may be {@code null}
     * @param str           the address being unregistered
     * @param handler       completion handler
     */
    @Override // io.vertx.core.eventbus.impl.EventBusImpl
    protected <T> void removeRegistration(HandlerHolder handlerHolder, String str, Handler<AsyncResult<Void>> handler) {
        final boolean clustered = handlerHolder != null && this.subs != null && !handlerHolder.isLocalOnly();
        if (clustered) {
            this.ownSubs.remove(str);
            removeSub(str, this.nodeInfo, handler);
        } else {
            callCompletionHandlerAsync(handler);
        }
    }

    /**
     * Sends a reply back to the server that sent the message being replied to.
     *
     * @param sendContextImpl the send context carrying the reply
     * @param messageImpl     the message being replied to; must be a {@link ClusteredMessage}
     */
    @Override // io.vertx.core.eventbus.impl.EventBusImpl
    protected <T> void sendReply(EventBusImpl.SendContextImpl<T> sendContextImpl, MessageImpl messageImpl) {
        final ClusteredMessage repliedTo = (ClusteredMessage) messageImpl;
        clusteredSendReply(repliedTo.getSender(), sendContextImpl);
    }

    /**
     * Looks up the subscribers of the message's address in the cluster map and routes the
     * message: to the subscribers when there are any, otherwise to local delivery.
     *
     * <p>A failed lookup is only logged; the message is not delivered in that case.
     *
     * @param sendContextImpl the send context carrying the message
     */
    @Override // io.vertx.core.eventbus.impl.EventBusImpl
    protected <T> void sendOrPub(EventBusImpl.SendContextImpl<T> sendContextImpl) {
        final String address = sendContextImpl.message.address();
        final Handler<AsyncResult<ChoosableIterable<ClusterNodeInfo>>> onSubscribers = lookup -> {
            if (lookup.succeeded()) {
                final ChoosableIterable<ClusterNodeInfo> subscribers = lookup.result();
                final boolean noSubscribers = subscribers == null || subscribers.isEmpty();
                if (noSubscribers) {
                    this.metrics.messageSent(address, !sendContextImpl.message.isSend(), true, false);
                    deliverMessageLocally(sendContextImpl);
                } else {
                    sendToSubs(subscribers, sendContextImpl);
                }
            } else {
                log.error("Failed to send message", lookup.cause());
            }
        };
        if (Vertx.currentContext() != null) {
            this.subs.get(address, onSubscribers);
            return;
        }
        // No current context on this thread: run the lookup on the dedicated context instead.
        this.sendNoContext.runOnContext(ignored -> this.subs.get(address, onSubscribers));
    }

    /**
     * Generates a reply address from a random UUID.
     *
     * @return the string form of a freshly generated random {@link UUID}
     */
    @Override // io.vertx.core.eventbus.impl.EventBusImpl
    protected String generateReplyAddress() {
        final UUID replyId = UUID.randomUUID();
        return replyId.toString();
    }

    /**
     * Tells whether a message originated on this node.
     *
     * @param messageImpl the message to inspect; must be a {@link ClusteredMessage}
     * @return {@code true} when the message was not read from the wire
     */
    @Override // io.vertx.core.eventbus.impl.EventBusImpl
    protected boolean isMessageLocal(MessageImpl messageImpl) {
        final ClusteredMessage clustered = (ClusteredMessage) messageImpl;
        final boolean receivedFromWire = clustered.isFromWire();
        return !receivedFromWire;
    }

    /**
     * Installs the callback that repairs the subscription map when the cluster membership
     * changes. On each change it re-adds every address in {@code ownSubs} for this node, then
     * removes all entries whose node id is not in the new member set.
     *
     * <p>The removal predicate is cast to {@code Predicate & Serializable}. A bare
     * {@code (Serializable)} cast gives the lambda no functional-interface target and does not
     * compile. NOTE(review): the intersection cast suggests the predicate is serialized by the
     * cluster manager - confirm.
     *
     * <p>The callback is installed from the constructor, before {@code start} assigns
     * {@code subs}, so it does nothing while the map is still {@code null}.
     *
     * @param hAManager the HA manager on which the callback is registered
     */
    private void setClusterViewChangedHandler(HAManager hAManager) {
        hAManager.setClusterViewChangedHandler(liveNodeIds -> {
            if (this.subs == null) {
                // Not started yet: there is no subscription map to repair.
                return;
            }
            this.ownSubs.forEach(address -> {
                this.subs.add(address, this.nodeInfo, addResult -> {
                    if (addResult.failed()) {
                        log.warn("Failed to update subs map with self", addResult.cause());
                    }
                });
            });
            this.subs.removeAllMatching(
                (java.util.function.Predicate<ClusterNodeInfo> & Serializable) info -> !liveNodeIds.contains(info.nodeId),
                removeResult -> {
                    if (removeResult.failed()) {
                        log.warn("Error removing subs", removeResult.cause());
                    }
                });
        });
    }

    /**
     * Resolves the port advertised to other nodes. The {@code vertx.cluster.public.port} system
     * property wins over the option value; {@code -1} means "not configured" and selects the
     * supplied port instead.
     *
     * @param eventBusOptions options providing the default public port
     * @param i               port to use when the configured value is {@code -1}
     * @return the public port to advertise
     */
    private int getClusterPublicPort(EventBusOptions eventBusOptions, int i) {
        final int configured = Integer.getInteger(CLUSTER_PUBLIC_PORT_PROP_NAME, eventBusOptions.getClusterPublicPort());
        return configured == -1 ? i : configured;
    }

    /**
     * Resolves the host advertised to other nodes. The {@code vertx.cluster.public.host} system
     * property wins over the option value; when neither is set the options' own host is used.
     *
     * @param eventBusOptions options providing the public host and the fallback host
     * @return the public host to advertise
     */
    private String getClusterPublicHost(EventBusOptions eventBusOptions) {
        final String publicHost = System.getProperty(CLUSTER_PUBLIC_HOST_PROP_NAME, eventBusOptions.getClusterPublicHost());
        return publicHost != null ? publicHost : eventBusOptions.getHost();
    }

    /**
     * Creates the connect handler for the inbound server. Each accepted socket gets its own
     * {@link RecordParser} that alternates between two states: a 4-byte length prefix, then a
     * body of exactly that many bytes holding one {@link ClusteredMessage}.
     *
     * <p>Ping messages are answered with {@code PONG} on the same socket; every other message is
     * delivered locally.
     *
     * <p>The parser is attached with {@code NetSocket.handler(...)}. The previous
     * {@code handler2} call was a decompiler-renamed bridge method and is not part of the
     * {@code NetSocket} API.
     *
     * @return the handler to install via {@code NetServer.connectHandler}
     */
    private Handler<NetSocket> getServerHandler() {
        return netSocket -> {
            final RecordParser parser = RecordParser.newFixed(4, null);
            parser.setOutput(new Handler<Buffer>() {
                // -1 while waiting for the length prefix, otherwise the announced body size.
                int size = -1;

                @Override // io.vertx.core.Handler
                public void handle(Buffer buffer) {
                    if (this.size == -1) {
                        // Length prefix received: switch the parser to deliver the body next.
                        this.size = buffer.getInt(0);
                        parser.fixedSizeMode(this.size);
                        return;
                    }
                    ClusteredMessage received = new ClusteredMessage();
                    received.readFromWire(buffer, ClusteredEventBus.this.codecManager);
                    ClusteredEventBus.this.metrics.messageRead(received.address(), buffer.length());
                    // Body consumed: go back to expecting the next 4-byte length prefix.
                    parser.fixedSizeMode(4);
                    this.size = -1;
                    if (received.codec() == CodecManager.PING_MESSAGE_CODEC) {
                        netSocket.write(ClusteredEventBus.PONG);
                    } else {
                        ClusteredEventBus.this.deliverMessageLocally(received);
                    }
                }
            });
            netSocket.handler(parser);
        };
    }

    /**
     * Routes a message to the given subscribers.
     *
     * <p>For a send, one subscriber is chosen: if none is returned or it is this node the message
     * is delivered locally, otherwise it is sent to that remote server. For a publish, the message
     * is sent to every remote subscriber and delivered locally once if this node is among them.
     * Metrics are reported once per call.
     *
     * @param choosableIterable the subscribers registered for the message's address
     * @param sendContextImpl   the send context carrying the message
     */
    private <T> void sendToSubs(ChoosableIterable<ClusterNodeInfo> choosableIterable, EventBusImpl.SendContextImpl<T> sendContextImpl) {
        final String address = sendContextImpl.message.address();

        if (!sendContextImpl.message.isSend()) {
            // Publish: fan out to every subscriber.
            boolean deliverHere = false;
            boolean sentElsewhere = false;
            for (ClusterNodeInfo subscriber : choosableIterable) {
                if (!subscriber.serverID.equals(this.serverID)) {
                    sentElsewhere = true;
                    sendRemote(subscriber.serverID, sendContextImpl.message);
                } else {
                    deliverHere = true;
                }
            }
            this.metrics.messageSent(address, true, deliverHere, sentElsewhere);
            if (deliverHere) {
                deliverMessageLocally(sendContextImpl);
            }
            return;
        }

        // Send: exactly one recipient.
        final ClusterNodeInfo chosen = choosableIterable.choose();
        ServerID target = null;
        if (chosen != null) {
            target = chosen.serverID;
        }
        final boolean local = target == null || target.equals(this.serverID);
        if (local) {
            this.metrics.messageSent(address, false, true, false);
            deliverMessageLocally(sendContextImpl);
        } else {
            this.metrics.messageSent(address, false, false, true);
            sendRemote(target, sendContextImpl.message);
        }
    }

    /**
     * Delivers a reply either locally or to a remote server, depending on where the original
     * sender lives, and records the matching metric.
     *
     * @param serverID        server id of the node the reply is addressed to
     * @param sendContextImpl the send context carrying the reply
     */
    private <T> void clusteredSendReply(ServerID serverID, EventBusImpl.SendContextImpl<T> sendContextImpl) {
        final MessageImpl reply = sendContextImpl.message;
        final String address = reply.address();
        final boolean remote = !serverID.equals(this.serverID);
        if (remote) {
            this.metrics.messageSent(address, false, false, true);
            sendRemote(serverID, reply);
            return;
        }
        this.metrics.messageSent(address, false, true, false);
        deliverMessageLocally(sendContextImpl);
    }

    /**
     * Writes a message to the connection for the given remote server, creating and connecting a
     * new {@link ConnectionHolder} on first use.
     *
     * <p>When two callers create a holder for the same server at once, only the one whose
     * {@code putIfAbsent} wins calls {@code connect()}; the other uses the winner's holder.
     *
     * @param serverID    the remote server to send to
     * @param messageImpl the message to write; must be a {@link ClusteredMessage}
     */
    private void sendRemote(ServerID serverID, MessageImpl messageImpl) {
        ConnectionHolder holder = this.connections.get(serverID);
        if (holder == null) {
            final ConnectionHolder created = new ConnectionHolder(this, serverID, this.options);
            final ConnectionHolder existing = this.connections.putIfAbsent(serverID, created);
            if (existing == null) {
                created.connect();
                holder = created;
            } else {
                holder = existing;
            }
        }
        holder.writeMessage((ClusteredMessage) messageImpl);
    }

    /**
     * Removes one (address, node) entry from the cluster-wide subscription map and reports the
     * outcome to the completion handler.
     *
     * <p>The handler is now always notified. A failed map operation is logged and passed on as a
     * failed result instead of being logged only, which left the caller waiting forever. A removal
     * that finds no entry, including a {@code null} result, fails with {@code "sub not found"}.
     *
     * @param str             the address to remove
     * @param clusterNodeInfo the node whose subscription is removed
     * @param handler         completion handler; may be {@code null}
     */
    private void removeSub(String str, ClusterNodeInfo clusterNodeInfo, Handler<AsyncResult<Void>> handler) {
        this.subs.remove(str, clusterNodeInfo, removal -> {
            if (!removal.succeeded()) {
                log.error("Failed to remove sub", removal.cause());
                if (handler != null) {
                    handler.handle(Future.failedFuture(removal.cause()));
                }
                return;
            }
            if (handler == null) {
                return;
            }
            if (Boolean.TRUE.equals(removal.result())) {
                handler.handle(Future.succeededFuture());
            } else {
                handler.handle(Future.failedFuture("sub not found"));
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Exposes the live map of outbound connection holders, keyed by remote server id.
     *
     * @return the internal map itself, not a copy
     */
    public ConcurrentMap<ServerID, ConnectionHolder> connections() {
        return connections;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Exposes the Vert.x instance this event bus belongs to.
     *
     * @return the {@link VertxInternal} held by the superclass
     */
    public VertxInternal vertx() {
        return vertx;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Exposes the event bus options this instance was configured with.
     *
     * @return the {@link EventBusOptions} taken from the constructor's {@code VertxOptions}
     */
    public EventBusOptions options() {
        return options;
    }

    /*
     * Compiler-synthesised hook for serializable lambdas, shown here as the decompiler rendered it.
     * It recognises exactly one lambda - a java.util.function.Predicate implemented by
     * "lambda$null$51267305$1" in this class, capturing a Set - and rebuilds it as the predicate
     * that is true when the set does NOT contain the ClusterNodeInfo's nodeId. That is the same
     * predicate setClusterViewChangedHandler passes to removeAllMatching.
     *
     * NOTE(review): this is decompiler output, not hand-written source. "boolean z = -1" and the
     * switch over a boolean are not valid Java. javac presumably generates this method itself for
     * a serializable lambda, so it likely has to be deleted rather than maintained when the class
     * is rebuilt from source - confirm before recompiling.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First switch: map the implementation method name (by hash, then equals) to an index.
        switch (implMethodName.hashCode()) {
            case 1610436718:
                if (implMethodName.equals("lambda$null$51267305$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: verify the full lambda shape before re-creating it from the captured argument.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("java/util/function/Predicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("test") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("io/vertx/core/eventbus/impl/clustered/ClusteredEventBus") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Set;Lio/vertx/core/eventbus/impl/clustered/ClusterNodeInfo;)Z")) {
                    Set set = (Set) serializedLambda.getCapturedArg(0);
                    return clusterNodeInfo -> {
                        return !set.contains(clusterNodeInfo.nodeId);
                    };
                }
                break;
        }
        // Anything that does not match the single known lambda is rejected.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
