nacos服务注册

fxz大约 42 分钟

nacos服务注册

基于nacos2.x版本分析

总结

nacos2.0以后,临时实例的注册方式采用grpc。

客户端注册时会使用redo机制进行注册、注销操作。

服务端一个client只能注册一个服务实例。保存了服务和命名空间、服务和实例、客户端和服务相关的映射关系。

注册过程会触发nacos集群数据同步、订阅客户端推送等动作。

nacos架构概述

数据模型

服务和服务实例

在服务发现领域中,服务指的是由应用程序提供的一个或一组软件功能的一种抽象概念。它和应用有所不同,应用的范围更广,和服务属于包含关系,即一个应用可能会提供多个服务。为了能够更细粒度地区分和控制服务,Nacos 选择服务作为注册中心的最基本概念。

而服务实例(以下简称实例)是某个服务的具体提供能力的节点,一个实例仅从属于一个服务,而一个服务可以包含一个或多个实例。在许多场景下,实例又被称为服务提供者(Provider),而使用该服务的实例被称为服务消费者(Consumer)。

定义服务

在 Nacos 中,服务的定义包括以下几个内容:

  • 命名空间(Namespace):Nacos 数据模型中最顶层、也是包含范围最广的概念,用于在类似环境或租户等需要强制隔离的场景中定义。Nacos 的服务也需要使用命名空间来进行隔离。
  • 分组(Group):Nacos 数据模型中次于命名空间的一种隔离概念,区别于命名空间的强制隔离属性,分组属于一个弱隔离概念,主要用于逻辑区分一些服务使用场景或不同应用的同名服务,最常用的情况主要是同一个服务的测试分组和生产分组、或者将应用名作为分组以防止不同应用提供的服务重名。
  • 服务名(Name):该服务实际的名字,一般用于描述该服务提供了某种功能或能力。
image.png
image.png

之所以 Nacos 将服务的定义拆分为命名空间分组服务名,除了方便隔离使用场景外,还有方便用户发现唯一服务的优点。在注册中心的实际使用场景上,同个公司的不同开发者可能会开发出类似作用的服务,如果仅仅使用服务名来做服务的定义和表示,容易在一些通用服务上出现冲突,比如登陆服务等。

通常推荐使用由运行环境作为命名空间、应用名作为分组和服务功能作为服务名的组合来确保该服务的天然唯一性,当然使用者可以忽略命名空间和分组,仅使用服务名作为服务唯一标示,这就需要使用者在定义服务名时额外增加自己的规则来确保在使用中能够唯一定位到该服务而不会发现到错误的服务上。

服务元数据

服务的定义只是为服务设置了一些基本的信息,用于描述服务以及方便快速的找到服务,而服务的元数据是进一步定义了 Nacos 中服务的细节属性和描述信息。主要包含:

  • 健康保护阈值(ProtectThreshold):为了防止因过多实例故障,导致所有流量全部流入剩余实例,继而造成流量压力将剩余实例被压垮形成的雪崩效应。应将健康保护阈值定义为一个 0 到 1 之间的浮点数。当域名健康实例数占总服务实例数的比例小于该值时,无论实例是否健康,都会将这个实例返回给客户端。这样做虽然损失了一部分流量,但是保证了集群中剩余健康实例能正常工作。
  • 实例选择器(Selector):用于在获取服务下的实例列表时,过滤和筛选实例。该选择器也被称为路由器,目前Nacos支持通过将实例的部分信息存储在外部元数据管理 CMDB 中,并在发现服务时使用 CMDB 中存储的元数据标签来进行筛选的能力。
  • 拓展数据(extendData):用于用户在注册实例时自定义扩展的元数据内容,形式为 K-V 。可以在服务中拓展服务的元数据信息,方便用户实现自己的自定义逻辑。
image.png
image.png

定义实例

由于服务实例是具体提供服务的节点,因此 Nacos 在设计实例的定义时,主要需要存储该实例的一些网络相关的基础信息,主要包含以下内容:

  • 网络IP地址:该实例的 IP 地址,在 Nacos2.0 版本后支持设置为域名。
  • 网络端口:该实例的端口信息。
  • 健康状态(Healthy):用于表示该实例是否为健康状态,会在 Nacos 中通过健康检查的手段进行维护,具体内容将在Nacos 健康检查机制章节中详细说明。
  • 集群(Cluster):用于标示该实例归属于哪个逻辑集群。
  • 拓展数据(extendData):用于用户自定义扩展的元数据内容,形式为K-V。可以在实例中拓展该实例的元数据信息,方便用户实现自己的自定义逻辑和标示该实例。

实例元数据

和服务元数据不同,实例的元数据主要作用于实例运维相关的数据信息。主要包含:

  • 权重(Weight):实例级别的配置。权重为浮点数,范围为 0-10000。权重越大,分配给该实例的流量越大。
  • 上线状态(Enabled):标记该实例是否接受流量,优先级大于权重和健康状态。用于运维人员在不变动实例本身的情况下,快速地手动将某个实例从服务中移除。
  • 拓展数据(extendData):不同于实例定义中的拓展数据,这个拓展数据是给予运维人员在不变动实例本身的情况下,快速地修改和新增实例的扩展数据,从而达到运维实例的作用。

在 Nacos2.0 版本中,实例数据被拆分为实例定义和实例元数据,主要是因为这两类数据其实是同一个实例的两种不同场景:开发运行场景及运维场景。对于上下线及权重这种属性,一般认为在实例已经在运行时,需要运维人员手动修改和维护的数据,而 IP,端口和集群等信息,一般情况下在实例启动并注册后,则不会在进行变更。将这两部分数据合并后,就能够得到实例的完整信息,也是 Nacos1.0 版本中的实例数据结构。

同时在 Nacos2.0 版本中,定义实例的这部分数据,会受到持久化属性的的影响,而实例元数据部分,则一定会进行持久化;这是因为运维操作需要保证操作的原子性,不能够因为外部环境的影响而导致操作被重置,例如在Nacos1.0 版本中,运维人员因为实例所处的网络存在问题,操作一个实例下线以此摘除流量,但是同样因为网络问题,该实例与 Nacos 的通信也受到影响,导致实例注销后重新注册,这可能导致上线状态被重新注册而覆盖,失去了运维人员操作的优先级。

当然,这部分元数据也不应该无限制的存储下去,如果实例确实已经移除,元数据也应该移除,为此,在 Nacos 2.0 版本后,通过该接口更新的元数据会在对应实例删除后,依旧存在一段时间,如果在此期间实例重新注册,该元数据依旧生效;您可以通过 nacos.naming.clean.expired-metadata.expired-time 及nacos.naming.clean.expired-metadata.interval 对记忆时间进行修改。

image.png
image.png

持久化属性

Nacos 提供两种类型的服务:持久化服务和非持久化服务分别给类DNS 的基础的服务组件场景和上层实际业务服务场景使用。为了标示该服务是哪种类型的服务,需要在创建服务时选择服务的持久化属性。考虑到目前大多数使用动态服务发现的场景为非持久化服务的类型(如Spring Cloud,Dubbo,Service Mesh等),Nacos 将缺省值设置为了非持久化服务。

在 Nacos2.0 版本后,持久化属性的定义被抽象到服务中,一个服务只能被定义成持久化服务或非持久化服务,一旦定义完成,在服务生命周期结束之前,无法更改其持久化属性。

持久化属性将会影响服务及实例的数据是否会被 Nacos 进行持久化存储,设置为持久化之后,实例将不会再被自动移除,需要使用者手动移除实例。

集群是 Nacos 中一组服务实例的一个逻辑抽象的概念,它介于服务和实例之间,是一部分服务属性的下沉和实例属性的抽象。

定义集群

在 Nacos 中,集群中主要保存了有关健康检查的一些信息和数据:

  • 健康检查类型(HealthCheckType):使用哪种类型的健康检查方式,目前支持:TCP,HTTP,MySQL;设置为NONE可以关闭健康检查。
  • 健康检查端口(HealthCheckPort):设置用于健康检查的端口。
  • 是否使用实例端口进行健康检查(UseInstancePort):如果使用实例端口进行健康检查,将会使用实例定义中的网络端口进行健康检查,而不再使用上述设置的健康检查端口进行。
  • 拓展数据(extendData):用于用户自定义扩展的元数据内容,形式为 K-V 。可以自定义扩展该集群的元数据信息,方便用户实现自己的自定义逻辑和标示该集群。
image.png
image.png

生命周期

在注册中心中,实例数据都和服务实例的状态绑定,因此服务实例的状态直接决定了注册中心中实例数据的生命周期。而服务作为实例的聚合抽象,生命周期也会由服务实例的状态来决定。

服务的生命周期

服务的生命周期相对比较简单,是从用户向注册中心发起服务注册的请求开始。在Nacos中,发起服务注册有两种方式,一种是直接创建服务,一种是注册实例时自动创建服务;前者可以让发起者在创建时期就制定一部分服务的元数据信息,而后者只会使用默认的元数据创建服务。

在生命周期期间,用户可以向服务中新增,删除服务实例,同时也能够对服务的元数据进行修改。

当用户主动发起删除服务的请求或一定时间内服务下没有实例(无论健康与否)后,服务才结束其生命周期,等待下一次的创建。

实例的生命周期

实例的生命周期开始于注册实例的请求。但是根据不同的持久化属性,实例后续的生命周期有一定的不同。

持久化的实例,会通过健康检查的状态维护健康状态,但是不会自动的终止该实例的生命周期;在生命周期结束之前,持久化实例均可以被修改数据,甚至主动修改其健康状态。唯一终止持久化实例生命周期的方式就是注销实例的请求。

而非持久化的实例,会根据版本的不同,采用不同的方式维持健康状态:如果是 Nacos1.0 的版本,会通过定时的心跳请求来进行续约,当超过一定时间内没有心跳进行续约时,该非持久化实例则终止生命周期;如果是Nacos2.0 的版本,会通过 gRPC 的长连接来维持状态,当连接发生中断时,该非持久化实例则终止生命周期。当然,非持久化实例也可以通过注销实例的请求,主动终止其生命周期,但是由于长连接和心跳续约的存在,可能导致前一个实例数据的生命周期刚被终止移除,立刻又因为心跳和长连接的补偿请求,再次开启实例的生命周期,给人一种注销失败的假象。

集群的生命周期

集群的生命周期则相对复杂,由于集群作为服务和实例的一个中间层,因此集群的生命周期与实例和服务的生命周期均有关。

集群的生命周期开始与该集群第一个实例的生命周期同时开始,因为一个实例必定归属于一个集群,哪怕是默认的集群,因此当第一个实例的生命周期开始时,也就是集群生命周期的开始;

当一个集群下不存在实例时,集群的生命周期也不会立刻结束,而是会等到这个服务的生命周期结束时,才会一起结束生命周期。

元数据的生命周期

由于元数据的其对应的数据模型是紧密关联的,所以元数据的生命周期基本和对应的数据模型保持一致。但是也如前文所说,元数据通常为运维人员的主动操作的数据,会被 Nacos 进行一段时间内的记忆,因此元数据的生命周期的终止相比对应的数据要滞后;若这滞后期间内,对应的数据又重新开始生命周期,则该元数据的生命周期将被立刻重置,不再终止。

image.png
image.png

客户端服务注册

nacos client提供了NacosNamingService进行服务注册,由spring cloud服务注册和发现open in new window 也可以发现,客户端进行服务注册使用的都是NacosNamingService。

NacosNamingService

public class NacosNamingService implements NamingService {
    
   	.................
    

  	// 一般使用这个构造 参数是注册中心等相关信息
    public NacosNamingService(Properties properties) throws NacosException {
        init(properties);
    }
    
  	// 初始化一些必要的参数
    private void init(Properties properties) throws NacosException {
      	// 先异步加载一些耗时组件
        PreInitUtils.asyncPreLoadCostComponent();

        final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(properties);
        ValidatorUtils.checkInitParam(nacosClientProperties);
        this.namespace = InitUtils.initNamespaceForNaming(nacosClientProperties);
        InitUtils.initSerialization();
        InitUtils.initWebRootContext(nacosClientProperties);
        initLogName(nacosClientProperties);
        
        this.notifierEventScope = UUID.randomUUID().toString();
        this.changeNotifier = new InstancesChangeNotifier(this.notifierEventScope);
				
      	// 会注册一个事件类型和事件发布者的映射
        NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
      	// 会注册一个事件订阅者
        NotifyCenter.registerSubscriber(changeNotifier);

      	// 本地服务信息的缓存
        this.serviceInfoHolder = new ServiceInfoHolder(namespace, this.notifierEventScope, nacosClientProperties);
      	// clientProxy
        this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties,
                changeNotifier);
    }
    
 
    // 注册一个实例
    public void registerInstance(......) throws NacosException {
			......
    }
    
    // 批量注册服务实例
    public void batchRegisterInstance(.......)
            throws NacosException {
      .......
    }
    
  	// 批量注销服务实例
    public void batchDeregisterInstance(......)
            throws NacosException {
     	......
    }
    
    // 注销服务实例
    public void deregisterInstance(......) throws NacosException {
      ......
    }
    
    // 获取全部符合条件的服务实例
    public List<Instance> getAllInstances(......) throws NacosException {
     	.......
    }
    
   	// 查询健康实例
    public List<Instance> selectInstances(......) throws NacosException {
        ......
    }

   	// 选择一个健康的实例
    public Instance selectOneHealthyInstance(......) throws NacosException {
        ......
    }
    
    // 订阅
    public void subscribe(......) throws NacosException {
        ......
    }
     
    // 取消订阅
    public void unsubscribe(......) throws NacosException {
        ......
    }
    
    
    //获取所有(或指定)服务名称
    public ListView<String> getServicesOfServer(......) throws NacosException {
      ......
    }
    
    // 获取所有订阅的服务 
    public List<ServiceInfo> getSubscribeServices() {
        ......
    }
    
}

可以看到NacosNamingService中重载了很多方法,其中注册都会走到下面的逻辑:

public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    // 检查一些参数是否合法
    NamingUtils.checkInstanceIsLegal(instance);
    checkAndStripGroupNamePrefix(instance, groupName);
    
    // 注册服务
    clientProxy.registerService(serviceName, groupName, instance);
}

clientProxy是在init方法中初始化的,类型为NamingClientProxyDelegate。

NamingClientProxy

NamingClientProxy的作用就是向服务端进行请求。NamingClientProxyDelegate会根据实例的类型选择clientProxy执行注册。对于实例来讲,默认都是临时实例,nacos2.0以后临时实例使用grpc的通信方式。

NamingClientProxyDelegate:

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    // 根据实例的类型选择clientProxy执行注册
  	getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
}

 private NamingClientProxy getExecuteClientProxy(Instance instance) {
   			// nacos2.0以后临时实例使用grpc的通信方式
        if (instance.isEphemeral() || grpcClientProxy.isAbilitySupportedByServer(AbilityKey.SERVER_SUPPORT_PERSISTENT_INSTANCE_BY_GRPC)) {
            return grpcClientProxy;
        }
   
        return httpClientProxy;
}

我们看一下grpc的实现:

NamingGrpcClientProxy:

public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
            instance);
    if (instance.isEphemeral()) {
      	// 注册临时实例
        registerServiceForEphemeral(serviceName, groupName, instance);
    } else {
      	// 注册持久化实例
        doRegisterServiceForPersistent(serviceName, groupName, instance);
    }
}

     // 注册临时实例
    private void registerServiceForEphemeral(String serviceName, String groupName, Instance instance) throws NacosException {
        // 缓存redo信息
        redoService.cacheInstanceForRedo(serviceName, groupName, instance);
        
        // 执行注册
        doRegisterService(serviceName, groupName, instance);
    }

	 // 执行注册请求
   public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
        // 执行注册请求
        InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
                NamingRemoteConstants.REGISTER_INSTANCE, instance);
        requestToServer(request, Response.class);
        
        // 记录实例已经注册到redo信息中
        redoService.instanceRegistered(serviceName, groupName);
    }

	// 注册持久化实例
  public void doRegisterServiceForPersistent(String serviceName, String groupName, Instance instance) throws NacosException {
        PersistentInstanceRequest request = new PersistentInstanceRequest(namespaceId, serviceName, groupName,
                NamingRemoteConstants.REGISTER_INSTANCE, instance);
        requestToServer(request, Response.class);
    }

可以看到根据实例是否是临时实例走了两个方法,其实这两个方法最大的区别就是就是持久化实例没有使用redo机制。

redo机制

NamingGrpcRedoService封装了重做逻辑,但是它首先是一个监听器,他实现了nacos的ConnectionEventListener,会分别在链接连接和取消连接时回调相关方法来设置链接的状态。其次,NamingGrpcRedoService中构造了一个线程池,会定时执行redo相关逻辑,也就是链接存在,没有注册成功会重新注册,或者取消注册的逻辑。

public NamingGrpcRedoService(NamingGrpcClientProxy clientProxy, NacosClientProperties properties) {
    setProperties(properties);
    this.redoExecutor = new ScheduledThreadPoolExecutor(redoThreadCount, new NameThreadFactory(REDO_THREAD_NAME));
  	// 执行RedoScheduledTask的逻辑进行重做
    this.redoExecutor.scheduleWithFixedDelay(new RedoScheduledTask(clientProxy, this), redoDelayTime, redoDelayTime,
            TimeUnit.MILLISECONDS);
}

RedoScheduledTask中封装了redo的逻辑,会根据NamingGrpcRedoService获取需要制定重做的服务,然后执行相关逻辑。

RedoScheduledTaskpublic void run() {
  	// 链接断开了 不执行下面逻辑
    if (!redoService.isConnected()) {
        LogUtils.NAMING_LOGGER.warn("Grpc Connection is disconnect, skip current redo task");
        return;
    }
    try {
        redoForInstances();
        redoForSubscribes();
    } catch (Exception e) {
        LogUtils.NAMING_LOGGER.warn("Redo task run with unexpected exception: ", e);
    }
}

		// 实例重做
    private void redoForInstances() {
        // 遍历需要重试的实例
        for (InstanceRedoData each : redoService.findInstanceRedoData()) {
            try {
                // 重试实例
                redoForInstance(each);
            } catch (NacosException e) {
                LogUtils.NAMING_LOGGER.error("Redo instance operation {} for {}@@{} failed. ", each.getRedoType(),
                        each.getGroupName(), each.getServiceName(), e);
            }
        }
    }

private void redoForInstance(InstanceRedoData redoData) throws NacosException {
        // 获取重试类型
        RedoData.RedoType redoType = redoData.getRedoType();
        // 获取服务名
        String serviceName = redoData.getServiceName();
        // 获取分组名
        String groupName = redoData.getGroupName();
        LogUtils.NAMING_LOGGER.info("Redo instance operation {} for {}@@{}", redoType, groupName, serviceName);

        switch (redoType) {
            case REGISTER:
                if (isClientDisabled()) {
                    return;
                }
                // 执行注册
                processRegisterRedoType(redoData, serviceName, groupName);
                break;
            case UNREGISTER:
                if (isClientDisabled()) {
                    return;
                }
                // 执行注销
                clientProxy.doDeregisterService(serviceName, groupName, redoData.get());
                break;
            case REMOVE:
                // 移除重试实例
                redoService.removeInstanceForRedo(serviceName, groupName);
                break;
            default:
        }
        
    }

还有个问题,NamingGrpcRedoService是事件监听器,那么是哪里回调的呢?

在上面NamingGrpcClientProxy创建的时候,会在构造中调用到rpcClient的start方法,其中执行链接相关的回调逻辑:

public final void start() throws NacosException {
    // cas设置客户端状态为: 初始化 -> 启动中
    boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
    if (!success) {
        return;
    }

    // 客户端事件执行器,周期调度线程池
    clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
        Thread t = new Thread(r);
        t.setName("com.alibaba.nacos.client.remote.worker");
        t.setDaemon(true);
        return t;
    });
    
    // 链接、断开链接事件处理
    clientEventExecutor.submit(() -> {
        while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
            ConnectionEvent take;
            try {
                take = eventLinkedBlockingQueue.take();
                if (take.isConnected()) {
                  	// 通知链接事件
                    notifyConnected(take.connection);
                } else if (take.isDisConnected()) {
                  	// 通知链接关闭事件
                    notifyDisConnected(take.connection);
                }
            } catch (Throwable e) {
                // Do nothing
            }
        }
    });
  
  ........
}

		// 回调所有的监听器
    protected void notifyConnected(Connection connection) {
        if (connectionEventListeners.isEmpty()) {
            return;
        }
        LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Notify connected event to listeners.", rpcClientConfig.name());
        for (ConnectionEventListener connectionEventListener : connectionEventListeners) {
            try {
                connectionEventListener.onConnected(connection);
            } catch (Throwable throwable) {
                LoggerUtils.printIfErrorEnabled(LOGGER, "[{}] Notify connect listener error, listener = {}",
                        rpcClientConfig.name(), connectionEventListener.getClass().getName());
            }
        }
    }
    

实例注册

public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
    // 执行注册请求
    InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
            NamingRemoteConstants.REGISTER_INSTANCE, instance);
    requestToServer(request, Response.class);

    // 记录实例已经注册到redo信息中 逻辑很简单 就是标识之前缓存的信息为已注册 这些task就不执行redo
    redoService.instanceRegistered(serviceName, groupName);
}

下面会向服务端发送注册请求进行注册。关于grpc封装的详细细节在后面的文章分析,本文仅分析服务注册相关内容。

private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
        throws NacosException {
    Response response = null;
    try {
        request.putAllHeader(
                getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
        response = requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
        if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
            throw new NacosException(response.getErrorCode(), response.getMessage());
        }
        if (responseClass.isAssignableFrom(response.getClass())) {
            return (T) response;
        }
        NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
                response.getClass().getName(), responseClass.getName());
        throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
    } catch (NacosException e) {
        recordRequestFailedMetrics(request, e, response);
        throw e;
    } catch (Exception e) {
        recordRequestFailedMetrics(request, e, response);
        throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
    }
}

服务端服务注册

上述我们知道,nacos2.0以后对于临时实例的注册,采用的是grpc。那么服务端注册的入口在哪里?

相关代码在InstanceRequestHandler:

@Component
public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {
    
    private final EphemeralClientOperationServiceImpl clientOperationService;
    
    public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
        this.clientOperationService = clientOperationService;
    }
    
    @Override
    @TpsControl(pointName = "RemoteNamingInstanceRegisterDeregister", name = "RemoteNamingInstanceRegisterDeregister")
    @Secured(action = ActionTypes.WRITE)
    @ExtractorManager.Extractor(rpcExtractor = InstanceRequestParamExtractor.class)
    public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
        Service service = Service
                .newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
        InstanceUtil.setInstanceIdIfEmpty(request.getInstance(), service.getGroupedServiceName());
        switch (request.getType()) {
            case NamingRemoteConstants.REGISTER_INSTANCE:
                // 注册实例
                return registerInstance(service, request, meta);
            case NamingRemoteConstants.DE_REGISTER_INSTANCE:
                // 注销实例
                return deregisterInstance(service, request, meta);
            default:
                throw new NacosException(NacosException.INVALID_PARAM,
                        String.format("Unsupported request type %s", request.getType()));
        }
    }
    
    private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta)
            throws NacosException {
        // 注册实例
        clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
        
        // 发布RegisterInstanceTraceEvent
        NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(),
                meta.getClientIp(), true, service.getNamespace(), service.getGroup(), service.getName(),
                request.getInstance().getIp(), request.getInstance().getPort()));
        
        return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
    }
    
    private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
        // 注销实例
        clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
        
        // 发布DeregisterInstanceTraceEvent
        NotifyCenter.publishEvent(new DeregisterInstanceTraceEvent(System.currentTimeMillis(),
                meta.getClientIp(), true, DeregisterInstanceReason.REQUEST, service.getNamespace(),
                service.getGroup(), service.getName(), request.getInstance().getIp(), request.getInstance().getPort()));
        return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
    }
    
}

ClientOperationService

public void registerInstance(Service service, Instance instance, String clientId) throws NacosException {
    // check if instance is legal
    NamingUtils.checkInstanceIsLegal(instance);

    // 获取单例的Service
    Service singleton = ServiceManager.getInstance().getSingleton(service);
    if (!singleton.isEphemeral()) {
        throw new NacosRuntimeException(NacosException.INVALID_PARAM,
                String.format("Current service %s is persistent service, can't register ephemeral instance.",
                        singleton.getGroupedServiceName()));
    }

    // 获取Client
    Client client = clientManager.getClient(clientId);
    if (!clientIsLegal(client, clientId)) {
        return;
    }

    // 获取InstancePublishInfo
    InstancePublishInfo instanceInfo = getPublishInfo(instance);
    // 保存实例和服务信息
    client.addServiceInstance(singleton, instanceInfo);
    client.setLastUpdatedTime();
    client.recalculateRevision();
    // 发布客户端注册事件 会触发客户端和服务索引的创建
    NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
    NotifyCenter
            .publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
}

ServiceManager

获取服务信息。


private final ConcurrentHashMap<Service, Service> singletonRepository;
    
private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
    
public Service getSingleton(Service service) {
    // 保存服务信息
    singletonRepository.computeIfAbsent(service, key -> {
        NotifyCenter.publishEvent(new MetadataEvent.ServiceMetadataEvent(service, false));
        return service;
    });
    Service result = singletonRepository.get(service);

    // 保存服务和命名空间信息
    namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), namespace -> new ConcurrentHashSet<>());
    namespaceSingletonMaps.get(result.getNamespace()).add(result);
    return result;
}

EphemeralIpPortClientManager

获取客户端信息。

private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();

	 // 客户端连接
   public boolean clientConnected(final Client client) {
        clients.computeIfAbsent(client.getClientId(), s -> {
            Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
            IpPortBasedClient ipPortBasedClient = (IpPortBasedClient) client;
            ipPortBasedClient.init();
            return ipPortBasedClient;
        });
        return true;
    }

		// 客户端取消连接
    public boolean clientDisconnected(String clientId) {
        Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
        IpPortBasedClient client = clients.remove(clientId);
        if (null == client) {
            return true;
        }
        boolean isResponsible = isResponsibleClient(client);
        NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client, isResponsible));
        client.release();
        NotifyCenter.publishEvent(new ClientOperationEvent.ClientReleaseEvent(client, isResponsible));
        return true;
    }

		// 获取客户端
  	public Client getClient(String clientId) {
        return clients.get(clientId);
    }

AbstractClient

Nacos2.0以后新增Client模型一个客户端gRPC长连接对应一个Client,每个Client有自己唯一的id(clientId)。Client负责管理一个客户端的服务实例注册Publish和服务订阅Subscribe。

    protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);

// 保存服务实例
public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
    if (instancePublishInfo instanceof BatchInstancePublishInfo) {
        InstancePublishInfo old = publishers.put(service, instancePublishInfo);
        MetricsMonitor.incrementIpCountWithBatchRegister(old, (BatchInstancePublishInfo) instancePublishInfo);
    } else {
        if (null == publishers.put(service, instancePublishInfo)) {
            MetricsMonitor.incrementInstanceCount();
        }
    }
    NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
    Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
    return true;
}

可以看到,一个客户端仅能注册服务对应的一个实例。同时客户端注册发布的ClientChangedEvent会触发集群同步。集群同步相关内容在后续数据一致性相关文章分析。

ClientServiceIndexesManager

上述保存了服务信息、服务和命名空间的关系映射,服务和实例的关系映射。那么如何通过客户端快速检索服务呢?在上面发布的ClientRegisterServiceEvent会触发客户端金额服务索引的创建。

 	 ClientServiceIndexesManager:

    private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();

		// 监听的事件类型:
		@Override
    public List<Class<? extends Event>> subscribeTypes() {
        List<Class<? extends Event>> result = new LinkedList<>();
        result.add(ClientOperationEvent.ClientRegisterServiceEvent.class);
        result.add(ClientOperationEvent.ClientDeregisterServiceEvent.class);
        result.add(ClientOperationEvent.ClientSubscribeServiceEvent.class);
        result.add(ClientOperationEvent.ClientUnsubscribeServiceEvent.class);
        result.add(ClientOperationEvent.ClientReleaseEvent.class);
        return result;
    }
    

		// 回调方法
    @Override
    public void onEvent(Event event) {
        if (event instanceof ClientOperationEvent.ClientReleaseEvent) {
            handleClientDisconnect((ClientOperationEvent.ClientReleaseEvent) event);
        } else if (event instanceof ClientOperationEvent) {
            handleClientOperation((ClientOperationEvent) event);
        }
    }

		// 处理客户端操作事件
    private void handleClientOperation(ClientOperationEvent event) {
        Service service = event.getService();
        String clientId = event.getClientId();
      	// 处理客户端注册事件
        if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
            addPublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
            removePublisherIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
            addSubscriberIndexes(service, clientId);
        } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
            removeSubscriberIndexes(service, clientId);
        }
    }
    
		// 添加客户端和服务的索引
    private void addPublisherIndexes(Service service, String clientId) {
      
        publisherIndexes.computeIfAbsent(service, key -> new ConcurrentHashSet<>());
        publisherIndexes.get(service).add(clientId);
      
      	// ServiceChangedEvent会触发订阅的推送 相关内容在服务发现和订阅文章中分析。
        NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
    }