Apollo 配置中心源码分析

​ Apollo是携程开源的一款分布式配置管理中心,能够集中化管理应用不同环境、不同集群的配置,配置修改后能够实时推送到应用端,并且具备规范的权限、流程治理等特性,适用于微服务配置管理场景。

Apollo配置发布和通知的过程

  1. 用户在配置中心对配置进行修改并发布

  2. 配置中心通知Apollo客户端有配置更新

  3. Apollo客户端从配置中心拉取最新的配置、更新本地配置并通知到应用

从Apollo模块看配置发布流程

Apollo四个核心模块及其主要功能

  1. ConfigService

    • 提供配置获取接口
    • 提供配置推送接口
    • 服务于Apollo客户端
  2. AdminService

    • 提供配置管理接口
    • 提供配置修改发布接口
    • 服务于管理界面Portal
  3. Client

    • 为应用获取配置,支持实时更新
    • 通过MetaServer获取ConfigService的服务列表
    • 使用客户端软负载SLB方式调用ConfigService
  4. Portal

    • 配置管理界面
    • 通过MetaServer获取AdminService的服务列表
    • 使用客户端软负载SLB方式调用AdminService

先对整体流程进行一个梳理:

* 用户修改和发布配置是通过portal调用AdminService,把配置变更保存在数据库中。

* 客户端通过长轮询访问ConfigService实时监听配置变更。默认超时时间是90秒。如果在超时前有配置变更,就会立即返回给客户端。客户端获取变化的配置,根据进行实时更新。如果超时也没有数据变更,就放304.客户端重新发起新的请求。

* 配置服务ConfigService有一个定时任务,每秒去扫描数据库,查看是否有新变更的数据。如果有数据变更就通知客户端。

下面打算对Apollo在页面修改配置后如何通知到客户端过程的源码进行分析。

说明:

  • Apollo版本为1.9.1.
  • 测试用的应用appid=apollo-demo,namespace=default,env=DEV,cluster=default

主要分为一下几个部分

  1. 页面发布配置(新增,修改和删除)
  2. configService获取到新发布的配置信息
  3. configService通知客户端最新的配置变更
  4. 客户端的同步更新Spring容器中注入的@Value的值
  5. Apollo 如何实现让自己的配置优先级最高
一、 Apollo修改配置与发布配置
1.1页面修改配置

修改name 旧值:张三 新值:张三1

URL: http://localhost:8070/apps/apollo-demo/envs/DEV/clusters/default/namespaces/application/item

参数:

{"id":1,"namespaceId":1,"key":"name","value":"张三1","lineNum":1,"dataChangeCreatedBy":"apollo","dataChangeLastModifiedBy":"apollo","dataChangeCreatedByDisplayName":"apollo","dataChangeLastModifiedByDisplayName":"apollo","dataChangeCreatedTime":"2022-02-26T12:26:12.000+0800","dataChangeLastModifiedTime":"2022-02-26T12:26:12.000+0800","tableViewOperType":"update","comment":"修改姓名"}

根据上面的分析在页面修改配置是portal调用AdminService保存到数据库。所以我们到Apollo的portal模块去查找请求。Apollo使用的是restful的请求方式,它的请求格式都是/参数名1/{参数值1}/参数名2/{参数值2}/……。所以我们就去portal查询"/apps/{appId}/envs/{env}/clusters/{clusterName}/namespaces/{namespaceName}/item")

@PutMapping("/apps/{appId}/envs/{env}/clusters/{clusterName}/namespaces/{namespaceName}/item")public void updateItem(@PathVariable String appId, @PathVariable String env,@PathVariable String clusterName, @PathVariable String namespaceName,@RequestBody ItemDTO item) {checkModel(isValidItem(item));String username = userInfoHolder.getUser().getUserId();item.setDataChangeLastModifiedBy(username);configService.updateItem(appId, Env.valueOf(env), clusterName, namespaceName, item);}

单个更新配置时portal通过configService.updateItem()保存数据中

public void updateItem(String appId, Env env, String clusterName, String namespace, long itemId, ItemDTO item) {restTemplate.put(env, "apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/items/{itemId}",
item, appId, clusterName, namespace, itemId);}

这里就是portal通过restTemplate调用AdminService保存配置到数据库。

AdminService 中代码如下

 @PutMapping("/apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/items/{itemId}")public ItemDTO update(@PathVariable("appId") String appId,@PathVariable("clusterName") String clusterName,@PathVariable("namespaceName") String namespaceName,@PathVariable("itemId") long itemId,@RequestBody ItemDTO itemDTO) {Item managedEntity = itemService.findOne(itemId);if (managedEntity == null) {throw new NotFoundException("item not found for itemId " + itemId);}Namespace namespace = namespaceService.findOne(appId, clusterName, namespaceName);// In case someone constructs an attack scenarioif (namespace == null || namespace.getId() != managedEntity.getNamespaceId()) {throw new BadRequestException("Invalid request, item and namespace do not match!");}Item entity = BeanUtils.transform(Item.class, itemDTO);ConfigChangeContentBuilder builder = new ConfigChangeContentBuilder();Item beforeUpdateItem = BeanUtils.transform(Item.class, managedEntity);//protect. only value,comment,lastModifiedBy can be modifiedmanagedEntity.setValue(entity.getValue());managedEntity.setComment(entity.getComment());managedEntity.setDataChangeLastModifiedBy(entity.getDataChangeLastModifiedBy());// 保存配置到 Item表中entity = itemService.update(managedEntity);builder.updateItem(beforeUpdateItem, entity);itemDTO = BeanUtils.transform(ItemDTO.class, entity);if (builder.hasContent()) {Commit commit = new Commit();commit.setAppId(appId);commit.setClusterName(clusterName);commit.setNamespaceName(namespaceName);commit.setChangeSets(builder.build());commit.setDataChangeCreatedBy(itemDTO.getDataChangeLastModifiedBy());commit.setDataChangeLastModifiedBy(itemDTO.getDataChangeLastModifiedBy());// 保存发布信息到 commit 表中commitService.save(commit);}return itemDTO;}

我们看下数据item表中的配置信息。里面记录namespaceid,key,value,comment(配置的备注信息),可以根据上面信息查询到配置信息。

commit表中的信息。

每次修改配置都会新插入一条记录。其中changSets记录了这次变更的类型和内容。

每个changeSets中会按照createItemsupdateItemsdeleteItems分别记录了新增,修改和删除的配置项。每个分类里面又会记录具体的新增,修改和删除的具体配置信息。

1.2 查询配置列表

url:http://localhost:8070/apps/apollo-demo/envs/DEV/clusters/default/namespaces

列表分别显示了有两条配置修改了,但是没有发布。在上面标记了未发布的标签。这个是怎么判断的呢?

我们一起看下源码吧。根据上面的地址,我们去portal中查询 /apps/{appId}/envs/{env}/clusters/{clusterName}/namespaces

@GetMapping("/apps/{appId}/envs/{env}/clusters/{clusterName}/namespaces")
public List<NamespaceBO> findNamespaces(@PathVariable String appId, @PathVariable String env,@PathVariable String clusterName) {// 根据应用名,环境和集群查询配置列表,根据namespece返回配置列表。List<NamespaceBO> namespaceBOs = namespaceService.findNamespaceBOs(appId, Env.valueOf(env), clusterName);for (NamespaceBO namespaceBO : namespaceBOs) {if (permissionValidator.shouldHideConfigToCurrentUser(appId, env, namespaceBO.getBaseInfo().getNamespaceName())) {namespaceBO.hideItems();}}return namespaceBOs;}

NamespaceBO中的内容。里面包含基本信息,以及namespace内的配置列表。item中的isModified表示配置是否修改,但是没有发布。如果修改了,里面还会包含修改前后的值。

namespaceService.findNamespaceBOs()是查询该集群下所有namespaces和配置信息。现在看下namespaceService.findNamespaceBOs()的具体实现。

public List<NamespaceBO> findNamespaceBOs(String appId, Env env, String clusterName) {// 根据查询应用,环境和集群查询当前的namespaces列表,
// 查询的表 namespace jpa语句 namespaceRepository.findByAppIdAndClusterNameAndNamespaceName(appId, clusterName,namespaceName);List<NamespaceDTO> namespaces = namespaceAPI.findNamespaceByCluster(appId, env, clusterName);if (namespaces == null || namespaces.size() == 0) {throw new BadRequestException("namespaces not exist");}List<NamespaceBO> namespaceBOs = new LinkedList<>();for (NamespaceDTO namespace : namespaces) {NamespaceBO namespaceBO;try {//根据环境查询得到NamespaceBOnamespaceBO = transformNamespace2BO(env, namespace);namespaceBOs.add(namespaceBO);} catch (Exception e) {logger.error("parse namespace error. app id:{}, env:{}, clusterName:{}, namespace:{}",appId, env, clusterName, namespace.getNamespaceName(), e);throw e;}}return namespaceBOs;}

transformNamespace2BO作用就是查询出namespace中哪些是修改的,哪些是删除的。看下面代码前的前置内容

  • apollo 对数据库的操作都是使用JPA,查询时@Where(clause = “isDeleted = 0”) 默认排除了已删除的

  • -对涉及到的几张表的说明

​ release:每次发布生效的配置记录。里面的Configurations 是对当前生效的配置列表的JSON串。已删除的配置不会保存在里面。

​ item:保存配置的表。adminService中新增,修改和删除配置都是更新这张表。里面是配置的最新值,但是配置的状态可能是已发布的,也可能是已修改但未发布的。

​ commit:保存每次配置修改的记录,里面记录每次修改配置提交时的新增,修改和删除的配置列表。

{“createItems”:[],“updateItems”:[{“oldItem”:{“namespaceId”:1,“key”:“age”,“value”:“21”,“comment”:“年龄修改”,“lineNum”:2,“id”:2,“isDeleted”:false,“dataChangeCreatedBy”:“apollo”,“dataChangeCreatedTime”:“2022-02-26 12:26:23”,“dataChangeLastModifiedBy”:“apollo”,“dataChangeLastModifiedTime”:“2022-03-05 09:56:27”},“newItem”:{“namespaceId”:1,“key”:“age”,“value”:“22”,“comment”:“年龄修改2”,“lineNum”:2,“id”:2,“isDeleted”:false,“dataChangeCreatedBy”:“apollo”,“dataChangeCreatedTime”:“2022-02-26 12:26:23”,“dataChangeLastModifiedBy”:“apollo”,“dataChangeLastModifiedTime”:“2022-03-05 21:35:48”}}],“deleteItems”:[]}

  • 如何判断配置是否发布

如果在item表中存在值跟最新发布生效的配置值不一样,则可能是新增或者修改的值但是为发布

  • 如何判断配置已删除

​ 查询最后一次发布记录,获取最后一次发布配置的时间。然后查询commit表中在最后一次发布配置后,所有的commit记录。然后从里面取出所有的删除配置列表。就得到的删除但没有发布的配置列表

private NamespaceBO transformNamespace2BO(Env env, NamespaceDTO namespace) {NamespaceBO namespaceBO = new NamespaceBO();namespaceBO.setBaseInfo(namespace);String appId = namespace.getAppId();String clusterName = namespace.getClusterName();String namespaceName = namespace.getNamespaceName();fillAppNamespaceProperties(namespaceBO);List<ItemBO> itemBOs = new LinkedList<>();namespaceBO.setItems(itemBOs);//latest ReleaseReleaseDTO latestRelease;Map<String, String> releaseItems = new HashMap<>();Map<String, ItemDTO> deletedItemDTOs = new HashMap<>();// 查询最后一次发布记录,里面保存了最新发布的,已经生效的的所有配置信息,不包括只删除的配置,json串保存。而items中的是最新的值,但可能是已发布的页可能是未发布的配置。// 查询的表 Release  jpa语句  releaseRepository.findFirstByAppIdAndClusterNameAndNamespaceNameAndIsAbandonedFalseOrderByIdDesc(appId,clusterName,namespaceName);latestRelease = releaseService.loadLatestRelease(appId, env, clusterName, namespaceName);if (latestRelease != null) {releaseItems = GSON.fromJson(latestRelease.getConfigurations(), GsonType.CONFIG);}//not Release config items 开始处理未发布的配置// 查询namespace下未删除的配置列表。列表中的内容可能有未发布的配置// 查询的表 Item List<Item> items = itemRepository.findByNamespaceIdOrderByLineNumAsc(namespaceId);List<ItemDTO> items = itemService.findItems(appId, env, clusterName, namespaceName);additionalUserInfoEnrichService.enrichAdditionalUserInfo(items, BaseDtoUserInfoEnrichedAdapter::new);int modifiedItemCnt = 0;for (ItemDTO itemDTO : items) {// 判断内容是否更改,并设置修改前和修改后的值。通过对比最后一次发布记录中的值与当前最新的值是否一致,如果不一致说明是修改后没有发布。ItemBO itemBO = transformItem2BO(itemDTO, releaseItems);if (itemBO.isModified()) {modifiedItemCnt++;}itemBOs.add(itemBO);}//deleted items 开始处理已删除的配置// 调用adminService 获取最后一次发布后的已删除的配置列表itemService.findDeletedItems(appId, env, clusterName, namespaceName).forEach(item -> {deletedItemDTOs.put(item.getKey(),item);});List<ItemBO> deletedItems = parseDeletedItems(items, releaseItems, deletedItemDTOs);itemBOs.addAll(deletedItems);modifiedItemCnt += deletedItems.size();namespaceBO.setItemModifiedCnt(modifiedItemCnt);return namespaceBO;}
1.3 发布配置

url:http://localhost:8070/apps/apollo-demo/envs/DEV/clusters/default/namespaces/application/releases

参数:{“releaseTitle”:“20220305225621-release”,“releaseComment”:“发布删除的111”,“isEmergencyPublish”:false}

  public ReleaseDTO createRelease(@PathVariable String appId,@PathVariable String env, @PathVariable String clusterName,@PathVariable String namespaceName, @RequestBody NamespaceReleaseModel model) {model.setAppId(appId);model.setEnv(env);model.setClusterName(clusterName);model.setNamespaceName(namespaceName);if (model.isEmergencyPublish() && !portalConfig.isEmergencyPublishAllowed(Env.valueOf(env))) {throw new BadRequestException(String.format("Env: %s is not supported emergency publish now", env));}
// 插入release记录ReleaseDTO createdRelease = releaseService.publish(model);ConfigPublishEvent event = ConfigPublishEvent.instance();event.withAppId(appId).withCluster(clusterName).withNamespace(namespaceName).withReleaseId(createdRelease.getId()).setNormalPublishEvent(true).setEnv(Env.valueOf(env));
// 发出发布eventpublisher.publishEvent(event);return createdRelease;}

releaseService.publish(model) 调用adminService 中的插入release记录。adminService代码如下:

  @PostMapping("/apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/releases")public ReleaseDTO publish(@PathVariable("appId") String appId,@PathVariable("clusterName") String clusterName,@PathVariable("namespaceName") String namespaceName,@RequestParam("name") String releaseName,@RequestParam(name = "comment", required = false) String releaseComment,@RequestParam("operator") String operator,@RequestParam(name = "isEmergencyPublish", defaultValue = "false") boolean isEmergencyPublish) {Namespace namespace = namespaceService.findOne(appId, clusterName, namespaceName);if (namespace == null) {throw new NotFoundException(String.format("Could not find namespace for %s %s %s", appId,clusterName, namespaceName));}// 保存发布记录Release release = releaseService.publish(namespace, releaseName, releaseComment, operator, isEmergencyPublish);//send release message 发送发布消息Namespace parentNamespace = namespaceService.findParentNamespace(namespace);String messageCluster;if (parentNamespace != null) {messageCluster = parentNamespace.getClusterName();} else {messageCluster = clusterName;}// 实际发布信息messageSender.sendMessage(ReleaseMessageKeyGenerator.generate(appId, messageCluster, namespaceName),Topics.APOLLO_RELEASE_TOPIC);return BeanUtils.transform(ReleaseDTO.class, release);}

保存发布记录 releaseService.publish(namespace, releaseName, releaseComment, operator, isEmergencyPublish);

*查询namespace下所有未删除的配置列表

*先查询最新的发布记录,获取上次最新发布记录的id。

*组装release信息,插入到数据库中

发送消息通知ConfigService有新配置发布

Admin Service在配置发布后会往ReleaseMessage表插入一条消息记录,消息内容就是配置发布的AppId+Cluster+Namespace。这里发布后需要通知ConfigService有新的配置发布。configService获取新发布的配置信息,推送给client。

二、配置发布后的实时推送设计

上图简要描述了配置发布的大致过程:

  1. 用户在Portal操作配置发布
  2. Portal调用Admin Service的接口操作发布
  3. Admin Service发布配置后,发送ReleaseMessage给各个Config Service
  4. Config Service收到ReleaseMessage后,通知对应的客户端

####### 2.1 发送ReleaseMessage的实现方式

Admin Service在配置发布后,需要通知所有的Config Service有配置发布,从而Config Service可以通知对应的客户端来拉取最新的配置。

从概念上来看,这是一个典型的消息使用场景,Admin Service作为producer发出消息,各个Config Service作为consumer消费消息。通过一个消息组件(Message Queue)就能很好的实现Admin Service和Config Service的解耦。

在实现上,考虑到Apollo的实际使用场景,以及为了尽可能减少外部依赖,我们没有采用外部的消息中间件,而是通过数据库实现了一个简单的消息队列。

实现方式如下:

  1. Admin Service在配置发布后会往ReleaseMessage表插入一条消息记录,消息内容就是配置发布的AppId+Cluster+Namespace,参见DatabaseMessageSender

  2. Config Service有一个线程会每秒扫描一次ReleaseMessage表,看看是否有新的消息记录,参见ReleaseMessageScanner

  3. Config Service如果发现有新的消息记录,那么就会通知到所有的消息监听器(ReleaseMessageListener),如NotificationControllerV2,消息监听器的注册过程参见ConfigServiceAutoConfiguration

  4. NotificationControllerV2得到配置发布的AppId+Cluster+Namespace后,会通知对应的客户端

示意图如下:

现在看下Apollo源码中的具体实现过程

2.1 configService 定时扫描 releaseMessage

在Java配置类 ConfigServiceAutoConfiguration 中 配置了注册ReleaseMessageScanner到spring容器中。

  @Beanpublic ReleaseMessageScanner releaseMessageScanner() {// ReleaseMessageScanner 构造方法中初始化一个定时执行的线程池ReleaseMessageScanner releaseMessageScanner = new ReleaseMessageScanner();// releaseMessageScanner注册监听器列表,获取到信息发布的消息,会调用监听器列表//0. handle release message cachereleaseMessageScanner.addMessageListener(releaseMessageServiceWithCache);//1. handle gray release rulereleaseMessageScanner.addMessageListener(grayReleaseRulesHolder);//2. handle server cachereleaseMessageScanner.addMessageListener(configService);releaseMessageScanner.addMessageListener(configFileController);//3. notify clients 通知客户端releaseMessageScanner.addMessageListener(notificationControllerV2);releaseMessageScanner.addMessageListener(notificationController);return releaseMessageScanner;}

看下ReleaseMessageScanner构造方法

public ReleaseMessageScanner() {listeners = Lists.newCopyOnWriteArrayList();// 初始化一个定时每秒执行的线程池 executorServiceexecutorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory.create("ReleaseMessageScanner", true));missingReleaseMessages = Maps.newHashMap();}

同时,我们注意到ReleaseMessageScanner实现了InitializingBean接口,通过afterPropertiesSet方法对spring进行扩展。

public class ReleaseMessageScanner implements InitializingBean {@Overridepublic void afterPropertiesSet() throws Exception {// 获取配置扫描的时间间隔databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();// 初始化属性maxIdScanned,从ReleaseMessage表中获取最大IDmaxIdScanned = loadLargestMessageId();// 定时线程池执行定时扫描ReleaseMessage的任务executorService.scheduleWithFixedDelay(() -> {try {// 扫描ReleaseMessage最新发布,如果有新的发布配置就通知监听器scanMessages();} catch (Throwable ex) {logger.error("Scan and send message failed", ex);} }, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);}
}

查看scanMessages()的内容

 private void scanMessages() {boolean hasMoreMessages = true;// 一致扫描,直到没有新的发布消息while (hasMoreMessages && !Thread.currentThread().isInterrupted()) {hasMoreMessages = scanAndSendMessages();}}private boolean scanAndSendMessages() {//current batch is 500,每次查询比maxIdScanned大的前500条发布记录信息List<ReleaseMessage> releaseMessages =releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);if (CollectionUtils.isEmpty(releaseMessages)) {return false;}// 通知消息监听器对发布的消息进行处理,这里主要是NotificationControllerV2通知客户端新发布的配置fireMessageScanned(releaseMessages);// 获取当前扫描到的最大IDlong newMaxIdScanned = releaseMessages.get(messageScanned - 1).getId();// 更新当前的扫描到的最大IDmaxIdScanned = newMaxIdScanned;// 是否继续循扫描return messageScanned == 500;}private void fireMessageScanned(Iterable<ReleaseMessage> messages) {for (ReleaseMessage message : messages) {for (ReleaseMessageListener listener : listeners) {try {listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);} catch (Throwable ex) {logger.error("Failed to invoke message listener {}", listener.getClass(), ex);}}}}
2.2 NotificationControllerV2通知客户端配置更新

实现方式如下:

  1. 客户端会发起一个Http请求到Config Service的notifications/v2接口,也就是NotificationControllerV2,参见RemoteConfigLongPollService

  2. NotificationControllerV2不会立即返回结果,而是通过Spring DeferredResult把请求挂起

  3. 如果在60秒内没有该客户端关心的配置发布,那么会返回Http状态码304给客户端

  4. 如果有该客户端关心的配置发布,NotificationControllerV2会调用DeferredResult的setResult方法,传入有配置变化的namespace信息,同时该请求会立即返回。客户端从返回的结果中获取到配置变化的namespace后,会立即请求Config Service获取该namespace的最新配置。

public class NotificationControllerV2 implements ReleaseMessageListener {@Overridepublic void handleMessage(ReleaseMessage message, String channel) {String content = message.getMessage();// 只处理发布消息的消息if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {return;}// 获取发布信息的namespace。比如content内容为 apollo-demo+default+application,namespace为applicationString changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);if (Strings.isNullOrEmpty(changedNamespace)) {logger.error("message format invalid - {}", content);return;}// deferredResults 就是客户端请求到ConfigService请求挂起的集合。相当于客户端的集合if (!deferredResults.containsKey(content)) {return;}//create a new list to avoid ConcurrentModificationExceptionList<DeferredResultWrapper> results = Lists.newArrayList(deferredResults.get(content));// 创建配置变更的通知对象ApolloConfigNotification configNotification = new ApolloConfigNotification(changedNamespace, message.getId());configNotification.addMessage(content, message.getId());//do async notification if too many clients //通知所有请求过来的客户端有新的配置发布。如果客户端太多就执行异步操作if (results.size() > bizConfig.releaseMessageNotificationBatch()) {largeNotificationBatchExecutorService.submit(() -> {for (int i = 0; i < results.size(); i++) {if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {try {TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());} catch (InterruptedException e) {//ignore}}logger.debug("Async notify {}", results.get(i));results.get(i).setResult(configNotification);}});return;}logger.debug("Notify {} clients for key {}", results.size(), content);for (DeferredResultWrapper result : results) {// result.setResult()后,客户端就获得了新发布的namespace信息,挂起的请求就立即返回了。result.setResult(configNotification);}logger.debug("Notification completed");}
}

通过上面的操作,在用户发布新的配置后,adminService就新增一个releaseMessage到数据库中。configService定时扫描获取新增的发布消息,调用NotificationControllerV2后通知所有的客户端哪个namespace有新的配置发布。这是客户端就获取到了有发布配置的namespace,可以请求configService拉取这个namespace的最新配置到本地。

三、客户端实时通知和定时拉取配置

上图简要描述了Apollo客户端的实现原理:

  1. 客户端和服务端保持了一个长连接,从而能第一时间获得配置更新的推送。(通过Http Long Polling实现)
  2. 客户端还会定时从Apollo配置中心服务端拉取应用的最新配置。
    • 这是一个fallback机制,为了防止推送机制失效导致配置不更新
    • 客户端定时拉取会上报本地版本,所以一般情况下,对于定时拉取的操作,服务端都会返回304 - Not Modified
    • 定时频率默认为每5分钟拉取一次,客户端也可以通过在运行时指定System Property: apollo.refreshInterval来覆盖,单位为分钟。
  3. 客户端从Apollo配置中心服务端获取到应用的最新配置后,会保存在内存中
  4. 客户端会把从服务端获取到的配置在本地文件系统缓存一份
    • 在遇到服务不可用,或网络不通的时候,依然能从本地恢复配置
  5. 应用程序可以从Apollo客户端获取最新的配置、订阅配置更新通知
3.1 客户端初始化过程

Apollo是在PropertySourcesProcessor中实现了在spring初始化时获取配置和设置自动更新配置的以及实现将Apollo配置的优先级设置为最高的。BeanFactoryPostProcessor 是spring对bean实例化前的扩展接口,可以对beanDefine进行修改。

public class PropertySourcesProcessor implements BeanFactoryPostProcessor, EnvironmentAware, PriorityOrdered {@Overridepublic void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {this.configUtil = ApolloInjector.getInstance(ConfigUtil.class);// 初始化和拉取配置,并这是Apollo的配置是最先生效的。同时设置实时和定时从configService获取配置initializePropertySources();// 运行时自动更新配置initializeAutoUpdatePropertiesFeature(beanFactory);}
}

我们先看initializePropertySources方法。该方法就是获取应用中设置的所有namespace从Apollo获取配置信息,然后设置Apollo的配置的优先级。

Apollo是如何设置自己配置优先于别的配置文件呢?

Spring从3.1版本开始增加了ConfigurableEnvironmentPropertySource

  • ConfigurableEnvironment

    • Spring的ApplicationContext会包含一个Environment(实现ConfigurableEnvironment接口)
    • ConfigurableEnvironment自身包含了很多个PropertySource
  • PropertySource
    • 属性源
    • 可以理解为很多个Key - Value的属性配置

在运行时的结构形如:

需要注意的是,PropertySource之间是有优先级顺序的,如果有一个Key在多个property source中都存在,那么在前面的property source优先。

所以对上图的例子:

  • env.getProperty(“key1”) -> value1
  • env.getProperty(“key2”) -> value2
  • env.getProperty(“key3”) -> value4

在理解了上述原理后,Apollo和Spring/Spring Boot集成的手段就呼之欲出了:在应用启动阶段,Apollo从远端获取配置,然后组装成PropertySource并插入到第一个即可,如下图所示:

 private void initializePropertySources() {if (environment.getPropertySources().contains(PropertySourcesConstants.APOLLO_PROPERTY_SOURCE_NAME)) {//already initializedreturn;}CompositePropertySource composite;if (configUtil.isPropertyNamesCacheEnabled()) {composite = new CachedCompositePropertySource(PropertySourcesConstants.APOLLO_PROPERTY_SOURCE_NAME);} else {composite = new CompositePropertySource(PropertySourcesConstants.APOLLO_PROPERTY_SOURCE_NAME);}//sort by order ascImmutableSortedSet<Integer> orders = ImmutableSortedSet.copyOf(NAMESPACE_NAMES.keySet());Iterator<Integer> iterator = orders.iterator();while (iterator.hasNext()) {int order = iterator.next();// 按照配置的Apollo的各个nameSpace的生效顺序,获取namespacefor (String namespace : NAMESPACE_NAMES.get(order)) {// 重点 从Apollo的configservice获取nameSpace的配置的信息Config config = ConfigService.getConfig(namespace);composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config));}}// clean upNAMESPACE_NAMES.clear();// add after the bootstrap property source or to the first// 设置Apollo配置的优先级最高,在environment.getPropertySources()位置为0if (environment.getPropertySources().contains(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME)) {// ensure ApolloBootstrapPropertySources is still the firstensureBootstrapPropertyPrecedence(environment);environment.getPropertySources().addAfter(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME, composite);} else {environment.getPropertySources().addFirst(composite);}}

我们看下应用启动是Apollo初始化获取配置的过程 ConfigService.getConfig(namespace);

  public static Config getConfig(String namespace) {// 入口 ,重点看 getConfigreturn s_instance.getManager().getConfig(namespace);}

看下com.ctrip.framework.apollo.internals.DefaultConfigManager#getConfig()方法

  @Overridepublic Config getConfig(String namespace) {// 从内存中获取配置Config config = m_configs.get(namespace);// 双重检验if (config == null) {synchronized (this) {config = m_configs.get(namespace);if (config == null) {ConfigFactory factory = m_factoryManager.getFactory(namespace);// 创建配置config = factory.create(namespace);// 设置nameSpace的配置m_configs.put(namespace, config);}}}return config;}com.ctrip.framework.apollo.spi.DefaultConfigFactory#createpublic Config create(String namespace) {// 获取namespace的格式,一般是propertiesConfigFileFormat format = determineFileFormat(namespace);// YML和yaml文件if (ConfigFileFormat.isPropertiesCompatible(format)) {return this.createRepositoryConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));}//重点 createLocalConfigRepository(namespace) 开启定时任务定时从远端拉取进行同步和长轮询return this.createRepositoryConfig(namespace, createLocalConfigRepository(namespace));}protected Config createRepositoryConfig(String namespace, ConfigRepository configRepository) {return new DefaultConfig(namespace, configRepository);}

看下Apollo创建配置过程 com.ctrip.framework.apollo.spi.DefaultConfigFactory#create

public Config create(String namespace) {// 获取namespace的格式,一般是propertiesConfigFileFormat format = determineFileFormat(namespace);// YML和yaml文件if (ConfigFileFormat.isPropertiesCompatible(format)) {return this.createRepositoryConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));}// 重点 createLocalConfigRepository(namespace) 开启定时任务定时从远端拉取进行同步和和长轮询return this.createRepositoryConfig(namespace, createLocalConfigRepository(namespace));}LocalFileConfigRepository createLocalConfigRepository(String namespace) {// createRemoteConfigRepository 客户端连接远端,本地配置与远端配置进行同步,启动定时拉起配置和发起对ConfigService请求获取实时的配置变更return new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));}

我们看下RemoteConfigRepository的构造方法

public RemoteConfigRepository(String namespace) {m_namespace = namespace;m_configCache = new AtomicReference<>();m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);m_httpClient = ApolloInjector.getInstance(HttpClient.class);m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);m_longPollServiceDto = new AtomicReference<>();m_remoteMessages = new AtomicReference<>();m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());m_configNeedForceRefresh = new AtomicBoolean(true);m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),m_configUtil.getOnErrorRetryInterval() * 8);// 从远端拉取和本地配置进行同步,超时时间为5秒this.trySync();// 定时周期拉取最新(主动取configService拉取配置到本地,每5秒拉取一次,通过调用trySync(),)this.schedulePeriodicRefresh();// 长轮询实时刷新(远端推送)this.scheduleLongPollingRefresh();
}
3.2 拉取远端配置同步到本地
 protected boolean trySync() {try {sync();return true;} catch (Throwable ex) {logger.warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil.getDetailMessage(ex));}return false;}@Overrideprotected synchronized void sync() {Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");try {// 获取本地内存当前的配置ApolloConfig previous = m_configCache.get();// 获取远端的当前配置ApolloConfig current = loadApolloConfig();//reference equals means HTTP 304if (previous != current) {logger.debug("Remote Config refreshed!");// 远端配置覆盖本地配置m_configCache.set(current);// 远端配置有更新时,通知本地配置变更。this.fireRepositoryChange(m_namespace, this.getConfig());}} catch (Throwable ex) {throw ex;}}for (RepositoryChangeListener listener : m_listeners) {try {// 调用listener更新配置listener.onRepositoryChange(namespace, newProperties);} catch (Throwable ex) {logger.error("Failed to invoke repository change listener {}", listener.getClass(), ex);}}}

远端配置同步后,如果有更新就会更新本地缓存的配置文件。实现就在 com.ctrip.framework.apollo.internals.LocalFileConfigRepository#onRepositoryChange

  @Overridepublic void onRepositoryChange(String namespace, Properties newProperties) {if (newProperties.equals(m_fileProperties)) {return;}Properties newFileProperties = propertiesFactory.getPropertiesInstance();newFileProperties.putAll(newProperties);// 更新本地缓存updateFileProperties(newFileProperties, m_upstream.getSourceType());this.fireRepositoryChange(namespace, newProperties);}private synchronized void updateFileProperties(Properties newProperties, ConfigSourceType sourceType) {this.m_sourceType = sourceType;// 判断本地配置列表跟远端配置列表是否一致。Hashtable比较里面key的值是否一致if (newProperties.equals(m_fileProperties)) {return;}// 否则远端的配置覆盖本地配置this.m_fileProperties = newProperties;// 写入到本地的配置文件中persistLocalCacheFile(m_baseDir, m_namespace);}

现在我们一起看下client如何拉取从configService拉取配置的。主要流程如下

** 获取远端configService地址列表,负载均衡选择一个去拉取配置

** 拼接url并请求configService。url结构如下:http://192.168.100.2:8080/configs/apollo-demo/default/application?ip=192.168.100.2。com.ctrip.framework.apollo.configservice.controller.ConfigController#queryConfig

代码如下

private ApolloConfig loadApolloConfig() {// 本地拉取远端配置限流每5秒一次if (!m_loadConfigRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {//wait at most 5 secondstry {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {}}String appId = m_configUtil.getAppId();String cluster = m_configUtil.getCluster();String dataCenter = m_configUtil.getDataCenter();String secret = m_configUtil.getAccessKeySecret();Tracer.logEvent("Apollo.Client.ConfigMeta", STRING_JOINER.join(appId, cluster, m_namespace));int maxRetries = m_configNeedForceRefresh.get() ? 2 : 1;long onErrorSleepTime = 0; // 0 means no sleepThrowable exception = null;// 获取远端的配置服务configServie地址列表List<ServiceDTO> configServices = getConfigServices();String url = null;retryLoopLabel:for (int i = 0; i < maxRetries; i++) {List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices);// 对configServie地址列表进行乱序,相当于随机选取一个服务进行调用。负载均衡Collections.shuffle(randomConfigServices);//Access the server which notifies the client first// 优先访问通知过客户端的服务,放在列表的第一位if (m_longPollServiceDto.get() != null) {randomConfigServices.add(0, m_longPollServiceDto.getAndSet(null));}for (ServiceDTO configService : randomConfigServices) {if (onErrorSleepTime > 0) {logger.warn("Load config failed, will retry in {} {}. appId: {}, cluster: {}, namespaces: {}",onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster, m_namespace);try {m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(onErrorSleepTime);} catch (InterruptedException e) {//ignore}}// 组装url http://192.168.100.2:8080/configs/apollo-demo/default/application?ip=192.168.100.2 请求的是configService的 ConfigController 的/{appId}/{clusterName}/{namespace:.+}url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,dataCenter, m_remoteMessages.get(), m_configCache.get());logger.debug("Loading config from {}", url);HttpRequest request = new HttpRequest(url);if (!StringUtils.isBlank(secret)) {Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);request.setHeaders(headers);}try {// 从远端获取配置信息,HttpResponse<ApolloConfig> response = m_httpClient.doGet(request, ApolloConfig.class);m_configNeedForceRefresh.set(false);m_loadConfigFailSchedulePolicy.success();// 如果请求返回304,说明配置没变化,返回本地缓存的配置对象if (response.getStatusCode() == 304) {logger.debug("Config server responds with 304 HTTP status code.");return m_configCache.get();}// 否则返回实际返回的配置信息ApolloConfig result = response.getBody();logger.debug("Loaded config for {}: {}", m_namespace, result);return result;} catch (ApolloConfigStatusCodeException ex) {ApolloConfigStatusCodeException statusCodeException = ex;//config not foundif (ex.getStatusCode() == 404) {String message = String.format("Could not find config for namespace - appId: %s, cluster: %s, namespace: %s, " +"please check whether the configs are released in Apollo!",appId, cluster, m_namespace);statusCodeException = new ApolloConfigStatusCodeException(ex.getStatusCode(),message);}exception = statusCodeException;if(ex.getStatusCode() == 404) {break retryLoopLabel;}} catch (Throwable ex) {exception = ex;} // if force refresh, do normal sleep, if normal config load, do exponential sleeponErrorSleepTime = m_configNeedForceRefresh.get() ? m_configUtil.getOnErrorRetryInterval() :m_loadConfigFailSchedulePolicy.fail();}}String message = String.format("Load Apollo Config failed - appId: %s, cluster: %s, namespace: %s, url: %s",appId, cluster, m_namespace, url);throw new ApolloConfigException(message, exception);}

接下来看下configService中如何获取配置的。主要逻辑就是configservice获取release表中最新的发布记录,得到最新发布记录的releaseKey,与客户端带来的clientSideReleaseKey比较是否一致。如果一致,说明没有配置变化,返回304.否则就把最新发布的配置返回。

 @GetMapping(value = "/{appId}/{clusterName}/{namespace:.+}")public ApolloConfig queryConfig(@PathVariable String appId,@PathVariable String clusterName,@PathVariable String namespace,@RequestParam(value = "dataCenter", required = false) String dataCenter,@RequestParam(value = "releaseKey", defaultValue = "-1") String clientSideReleaseKey,@RequestParam(value = "ip", required = false) String clientIp,@RequestParam(value = "messages", required = false) String messagesAsString,HttpServletRequest request, HttpServletResponse response) throws IOException {String originalNamespace = namespace;//strip out .properties suffixnamespace = namespaceUtil.filterNamespaceName(namespace);//fix the character case issue, such as FX.apollo <-> fx.apollonamespace = namespaceUtil.normalizeNamespace(appId, namespace);if (Strings.isNullOrEmpty(clientIp)) {clientIp = tryToGetClientIp(request);}ApolloNotificationMessages clientMessages = transformMessages(messagesAsString);List<Release> releases = Lists.newLinkedList();String appClusterNameLoaded = clusterName;if (!ConfigConsts.NO_APPID_PLACEHOLDER.equalsIgnoreCase(appId)) {// 查找Release表中最新的发布记录Release currentAppRelease = configService.loadConfig(appId, clientIp, appId, clusterName, namespace,dataCenter, clientMessages);if (currentAppRelease != null) {releases.add(currentAppRelease);//we have cluster search process, so the cluster name might be overriddenappClusterNameLoaded = currentAppRelease.getClusterName();}}//if namespace does not belong to this appId, should check if there is a public configurationif (!namespaceBelongsToAppId(appId, namespace)) {Release publicRelease = this.findPublicConfig(appId, clientIp, clusterName, namespace,dataCenter, clientMessages);if (Objects.nonNull(publicRelease)) {releases.add(publicRelease);}}if (releases.isEmpty()) {response.sendError(HttpServletResponse.SC_NOT_FOUND,String.format("Could not load configurations with appId: %s, clusterName: %s, namespace: %s",appId, clusterName, originalNamespace));Tracer.logEvent("Apollo.Config.NotFound",assembleKey(appId, clusterName, originalNamespace, dataCenter));return null;}auditReleases(appId, clusterName, dataCenter, clientIp, releases);String mergedReleaseKey = releases.stream().map(Release::getReleaseKey).collect(Collectors.joining(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR));
// 比较客户端请求带来的clientSideReleaseKey与服务端是否一致,如果一致说明没有新的配置发布,返回304if (mergedReleaseKey.equals(clientSideReleaseKey)) {// Client side configuration is the same with server side, return 304response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);Tracer.logEvent("Apollo.Config.NotModified",assembleKey(appId, appClusterNameLoaded, originalNamespace, dataCenter));return null;}ApolloConfig apolloConfig = new ApolloConfig(appId, appClusterNameLoaded, originalNamespace,mergedReleaseKey);apolloConfig.setConfigurations(mergeReleaseConfigurations(releases));Tracer.logEvent("Apollo.Config.Found", assembleKey(appId, appClusterNameLoaded,originalNamespace, dataCenter));return apolloConfig;}

######3.3 客户端定时拉取同步远端配置

讲完了客户端同步远端配置,我们重新回到RemoteConfigRepository的构造方法。

 public RemoteConfigRepository(String namespace) {m_namespace = namespace;m_configCache = new AtomicReference<>();m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);m_httpClient = ApolloInjector.getInstance(HttpClient.class);m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);m_longPollServiceDto = new AtomicReference<>();m_remoteMessages = new AtomicReference<>();m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());m_configNeedForceRefresh = new AtomicBoolean(true);m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),m_configUtil.getOnErrorRetryInterval() * 8);// 从远端拉取和本地配置进行同步,超时时间为5秒this.trySync();// 定时周期拉取最新(主动取configService拉取配置到本地,每5秒拉取一次,通过调用trySync(),)this.schedulePeriodicRefresh();// 长轮询实时刷新(远端推送)this.scheduleLongPollingRefresh();}

接下来查看定时周期拉取配置,同步本地配置,避免长轮询失败导致本地与远端配置不一致。是一种兜底的操作。

代码很简单就是启动一个定时线程池,定时调用同步配置的trySync()方法。每5分钟定时拉取一次。

  private void schedulePeriodicRefresh() {logger.debug("Schedule periodic refresh with interval: {} {}",m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());m_executorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {logger.debug("refresh config for namespace: {}", m_namespace);trySync();}}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),m_configUtil.getRefreshIntervalTimeUnit());}
3.3 客户端长轮询实时同步配置
  private void scheduleLongPollingRefresh() {// submit 方法中进行长轮询remoteConfigLongPollService.submit(m_namespace, this);}public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);if (!m_longPollStarted.get()) {startLongPolling();}return added;}private void startLongPolling() {if (!m_longPollStarted.compareAndSet(false, true)) {//already startedreturn;}try {final String appId = m_configUtil.getAppId();final String cluster = m_configUtil.getCluster();final String dataCenter = m_configUtil.getDataCenter();final String secret = m_configUtil.getAccessKeySecret();final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();m_longPollingService.submit(new Runnable() {@Overridepublic void run() {// 初始化延迟2秒if (longPollingInitialDelayInMills > 0) {try {logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills);TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);} catch (InterruptedException e) {//ignore}}// 定时任务长轮询查询是否存在新的变更doLongPollingRefresh(appId, cluster, dataCenter, secret);}});} catch (Throwable ex) {m_longPollStarted.set(false);ApolloConfigException exception =new ApolloConfigException("Schedule long polling refresh failed", ex);logger.warn(ExceptionUtil.getDetailMessage(exception));}}
private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {final Random random = new Random();ServiceDTO lastServiceDto = null;while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {//wait at most 5 secondstry {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {}}Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");String url = null;try {// 如果是第一次长轮询if (lastServiceDto == null) {// 获取远端的配置服务列表List<ServiceDTO> configServices = getConfigServices();// 随机取一个configServcie进行长轮询lastServiceDto = configServices.get(random.nextInt(configServices.size()));}// m_notifications 存储每个namespace对应的releaseMessageIdurl = assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter, m_notifications);// 长轮询url=http://192.168.100.2:8080/notifications/v2?cluster=default&appId=apollo-demo&ip=192.168.100.2&notifications=[{"namespaceName":"application","notificationId":-1}]logger.debug("Long polling from {}", url);HttpRequest request = new HttpRequest(url);//90 seconds, should be longer than server side's long polling timeout, which is now 60 seconds// 客户端请求超时时间为90 秒,应该比服务器端的长轮询超时时间长,服务端现在是 60 秒request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);if (!StringUtils.isBlank(secret)) {Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);request.setHeaders(headers);}final HttpResponse<List<ApolloConfigNotification>> response =m_httpClient.doGet(request, m_responseType);logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
//        客户端从Apollo配置中心服务端获取到应用的最新配置后,会保存在内存中
//        客户端会把从服务端获取到的配置在本地文件系统缓存一份if (response.getStatusCode() == 200 && response.getBody() != null) {//      response.getBody()    ApolloConfigNotification{namespaceName='application', notificationId=971}// 根据namespace 和 notificationId 进行更新,已经更新的记录updateNotifications(response.getBody());updateRemoteNotifications(response.getBody());// 重点 通知本地拉取配置notify(lastServiceDto, response.getBody());}//try to load balance 如果配置没有变化,就设置lastServiceDto为空,下次随机获取一个服务地址列表if (response.getStatusCode() == 304 && random.nextBoolean()) {lastServiceDto = null;}m_longPollFailSchedulePolicyInSecond.success();} catch (Throwable ex) {lastServiceDto = null;long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();logger.warn("Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));try {TimeUnit.SECONDS.sleep(sleepTimeInSecond);} catch (InterruptedException ie) {//ignore}} }}

长轮询的状态码如果是200,会根据返回的结果然本地重新拉取配置。返回结果就是namespace和releaseMessageId。

我们看下configService长轮询返回结果的代码。主要就是把请求转换为deferredResultWrapper 存储起来,对应的key是releaseMessage中的message(比如 apollo-demo+default+application),value是所有请求到该configService的请求转换的deferredResultWrapper的集合,如果configService扫描到有新的releaseMessage,就会查找根据message查询出所有的client,设置deferredResultWrapper的结果,让请求提前结束。否则请求会一直等待直到超时返回。

@GetMappingpublic DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(@RequestParam(value = "appId") String appId,@RequestParam(value = "cluster") String cluster,@RequestParam(value = "notifications") String notificationsAsString,@RequestParam(value = "dataCenter", required = false) String dataCenter,@RequestParam(value = "ip", required = false) String clientIp) {List<ApolloConfigNotification> notifications = null;try {notifications =gson.fromJson(notificationsAsString, notificationsTypeReference);} catch (Throwable ex) {Tracer.logError(ex);}if (CollectionUtils.isEmpty(notifications)) {throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);}Map<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);if (CollectionUtils.isEmpty(filteredNotifications)) {throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);}//这里是构建DeferredResult对象,一会存储起来。NotificationControllerV2不会立即返回结果,而是通过Spring DeferredResult 把请求挂起。//如果在60秒内没有该客户端关心的配置发布,那么会返回Http状态码304给客户端DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper(bizConfig.longPollingTimeoutInMilli());Set<String> namespaces = Sets.newHashSetWithExpectedSize(filteredNotifications.size());Map<String, Long> clientSideNotifications = Maps.newHashMapWithExpectedSize(filteredNotifications.size());for (Map.Entry<String, ApolloConfigNotification> notificationEntry : filteredNotifications.entrySet()) {String normalizedNamespace = notificationEntry.getKey();ApolloConfigNotification notification = notificationEntry.getValue();namespaces.add(normalizedNamespace);clientSideNotifications.put(normalizedNamespace, notification.getNotificationId());if (!Objects.equals(notification.getNamespaceName(), normalizedNamespace)) {deferredResultWrapper.recordNamespaceNameNormalizedResult(notification.getNamespaceName(), normalizedNamespace);}}Multimap<String, String> watchedKeysMap =watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());/*** 1、set deferredResult before the check, for avoid more waiting* If the check before setting deferredResult,it may receive a notification the next time* when method handleMessage is executed between check and set deferredResult.* 在check之前设置deferredResult,避免更多的等待。如果在设置deferredResult之前进行check,* 。下次在check和set deferredResult之间执行handleMessage方法时可能会收到通知。*/deferredResultWrapper.onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));deferredResultWrapper.onCompletion(() -> {//unregister all keysfor (String key : watchedKeys) {deferredResults.remove(key, deferredResultWrapper);}logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");});//register all keys 存储请求对应的deferredResultWrapper映射关系for (String key : watchedKeys) {this.deferredResults.put(key, deferredResultWrapper);}logWatchedKeys(watchedKeys, "Apollo.LongPoll.RegisteredKeys");logger.debug("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}",watchedKeys, appId, cluster, namespaces, dataCenter);/*** 2、check new release* 查询最新的releaseMessage中的最新消息记录,从缓存中获取的*/List<ReleaseMessage> latestReleaseMessages =releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);/*** Manually close the entity manager.* Since for async request, Spring won't do so until the request is finished,* which is unacceptable since we are doing long polling - means the db connection would be hold* for a very long time* 手动关闭实体管理器。由于对于异步请求,Spring 在请求完成之前不会这样做,这是不可接受的,因为我们正在进行长时间轮询 - 意味着数据库连接将保持很长时间*/entityManagerUtil.closeEntityManager();
/*** 如果有该客户端关心的配置发布,NotificationControllerV2会调用DeferredResult的setResult方法,传入有配置变化的namespace信息,同时该请求会立即返回。* 客户端从返回的结果中获取到配置变化的namespace后,会立即请求Config Service获取该namespace的最新配置。*/
// 查询当前是否有未同步的已发布的releaseMessage,如果有就直接返回List<ApolloConfigNotification> newNotifications =getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap,latestReleaseMessages);if (!CollectionUtils.isEmpty(newNotifications)) {deferredResultWrapper.setResult(newNotifications);}// 等待直到超时才返回304或者listener监听到发布配置调用 handleMessage() deferredResultWrapper.setResult()提前返回。return deferredResultWrapper.getResult();}

如果有配置变更,长轮询返回namespace和releaseMessageId。客户端会对该namespace进行拉取配置。每个namespace会对应一个单门同步远端配置的对象。

private void notify(ServiceDTO lastServiceDto, List<ApolloConfigNotification> notifications) {if (notifications == null || notifications.isEmpty()) {return;}for (ApolloConfigNotification notification : notifications) {String namespaceName = notification.getNamespaceName();//create a new list to avoid ConcurrentModificationExceptionList<RemoteConfigRepository> toBeNotified =Lists.newArrayList(m_longPollNamespaces.get(namespaceName));ApolloNotificationMessages originalMessages = m_remoteNotificationMessages.get(namespaceName);ApolloNotificationMessages remoteMessages = originalMessages == null ? null : originalMessages.clone();//since .properties are filtered out by default, so we need to check if there is any listener for ittoBeNotified.addAll(m_longPollNamespaces.get(String.format("%s.%s", namespaceName, ConfigFileFormat.Properties.getValue())));for (RemoteConfigRepository remoteConfigRepository : toBeNotified) {try {// 从远端拉取配置同步到本地remoteConfigRepository.onLongPollNotified(lastServiceDto, remoteMessages);} catch (Throwable ex) {Tracer.logError(ex);}}}
}可以看到还是通过trySync()进行同步本地配置public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) {m_longPollServiceDto.set(longPollNotifiedServiceDto);m_remoteMessages.set(remoteMessages);m_executorService.submit(new Runnable() {@Overridepublic void run() {m_configNeedForceRefresh.set(true);trySync();}});}
3.4 Apollo配置在spring中placeholder在运行时自动更新

https://github.com/apolloconfig/apollo/pull/972

回到Apollo客户端启动的代码

public class PropertySourcesProcessor implements BeanFactoryPostProcessor, EnvironmentAware, PriorityOrdered {@Overridepublic void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {this.configUtil = ApolloInjector.getInstance(ConfigUtil.class);// 初始化和拉取配置,并设置Apollo的配置是最先生效的。同时设置实时和定时从configService获取配置initializePropertySources();// 运行时自动更新配置initializeAutoUpdatePropertiesFeature(beanFactory);}
}

上面已经讲完了客户端初始化和同步配置的功能。下面继续讲运行时占位符自动更新的功能。

如果要实现Spring占位符值在Apollo配置变更后自动更新,需要满足下面的前提:

1,能监听Apollo的配置变化(Apollo可以为配置添加监听器实现)

2,在Spring容器初始化时把xml和Bean中的占位符与对应bean的映射关系保存起来。在配置变化时根据Apollo变更的key,查找出哪些bean中的属性需要自动更新,并进行更新操作。(Apollo中新增了ApolloProcessor,SpringValueProcessor,ApolloAnnotationProcessor 进行占位符的注册)

spring占位符自动更新的逻辑就是在spring容器实初始化时新增了ApolloProcessor扩展了BeanPostProcessor,并新增子类子类 SpringValueProcessor(处理Spring中作用在field和method上的@Value和xml文件中的占位符)与ApolloAnnotationProcessor(Apollo自定义注解ApolloJsonValue)中的占位符组装成SpringValue对象,并注册在springValueRegistry中。key是占位符名称,value是SpringValue列表(因为一个占位符可能用在多个Bean对象中)。然后为所有的Apollo的Namespace对象添加监听器,一旦配置变化。就到springValueRegistry根据变更的key产找SpringValue,如果存在就更新对应Bean的属性值。

  private void initializeAutoUpdatePropertiesFeature(ConfigurableListableBeanFactory beanFactory) {if (!configUtil.isAutoUpdateInjectedSpringPropertiesEnabled() ||!AUTO_UPDATE_INITIALIZED_BEAN_FACTORIES.add(beanFactory)) {return;}// 创建一个Apollo配置变更的监听器,用于监听配置变化,查看AutoUpdateConfigChangeListener 的onChange方法AutoUpdateConfigChangeListener autoUpdateConfigChangeListener = new AutoUpdateConfigChangeListener(environment, beanFactory);// 对所有的配置文件添加自动更新的监听器,List<ConfigPropertySource> configPropertySources = configPropertySourceFactory.getAllConfigPropertySources();for (ConfigPropertySource configPropertySource : configPropertySources) {configPropertySource.addChangeListener(autoUpdateConfigChangeListener);}}

重点关注 AutoUpdateConfigChangeListener中的onChange方法。

  @Overridepublic void onChange(ConfigChangeEvent changeEvent) {Set<String> keys = changeEvent.changedKeys();if (CollectionUtils.isEmpty(keys)) {return;}for (String key : keys) {// 1. check whether the changed key is relevant。查找变更的key关联的bean列表信息。每注入一个bean中就多一个SpringValue对象Collection<SpringValue> targetValues = springValueRegistry.get(beanFactory, key);if (targetValues == null || targetValues.isEmpty()) {continue;}// 2. update the valuefor (SpringValue val : targetValues) {updateSpringValue(val);}}}private void updateSpringValue(SpringValue springValue) {try {Object value = resolvePropertyValue(springValue);springValue.update(value);logger.info("Auto update apollo changed value successfully, new value: {}, {}", value,springValue);} catch (Throwable ex) {logger.error("Auto update apollo changed value failed, {}", springValue.toString(), ex);}}public void update(Object newVal) throws IllegalAccessException, InvocationTargetException {if (isField()) {injectField(newVal);} else {injectMethod(newVal);}}private void injectField(Object newVal) throws IllegalAccessException {Object bean = beanRef.get();if (bean == null) {return;}boolean accessible = field.isAccessible();field.setAccessible(true);field.set(bean, newVal);field.setAccessible(accessible);}private void injectMethod(Object newVal)throws InvocationTargetException, IllegalAccessException {Object bean = beanRef.get();if (bean == null) {return;}methodParameter.getMethod().invoke(bean, newVal);}

那么Apollo怎样将占位符的和bean信息都保存到springValueRegistry中的呢?

其实是调用了com.ctrip.framework.apollo.spring.annotation.ApolloProcessor#postProcessBeforeInitialization

public abstract class ApolloProcessor implements BeanPostProcessor, PriorityOrdered {@Overridepublic Object postProcessBeforeInitialization(Object bean, String beanName)throws BeansException {Class clazz = bean.getClass();for (Field field : findAllField(clazz)) {processField(bean, beanName, field);}for (Method method : findAllMethod(clazz)) {processMethod(bean, beanName, method);}return bean;}/*** subclass should implement this method to process field*/protected abstract void processField(Object bean, String beanName, Field field);/*** subclass should implement this method to process method*/protected abstract void processMethod(Object bean, String beanName, Method method);

com.ctrip.framework.apollo.spring.annotation.SpringValueProcessor#processField
@Overrideprotected void processField(Object bean, String beanName, Field field) {// register @Value on fieldValue value = field.getAnnotation(Value.class);if (value == null) {return;}Set<String> keys = placeholderHelper.extractPlaceholderKeys(value.value());if (keys.isEmpty()) {return;}for (String key : keys) {SpringValue springValue = new SpringValue(key, value.value(), bean, beanName, field, false);springValueRegistry.register(beanFactory, key, springValue);logger.debug("Monitoring {}", springValue);}}

处理@ApolloJsonValue

 @Overrideprotected void processField(Object bean, String beanName, Field field) {this.processApolloConfig(bean, field);this.processApolloJsonValue(bean, beanName, field);}private void processApolloJsonValue(Object bean, String beanName, Field field) {ApolloJsonValue apolloJsonValue = AnnotationUtils.getAnnotation(field, ApolloJsonValue.class);if (apolloJsonValue == null) {return;}String placeholder = apolloJsonValue.value();Object propertyValue = placeholderHelper.resolvePropertyValue(this.configurableBeanFactory, beanName, placeholder);// propertyValue will never be null, as @ApolloJsonValue will not allow thatif (!(propertyValue instanceof String)) {return;}boolean accessible = field.isAccessible();field.setAccessible(true);ReflectionUtils.setField(field, bean, parseJsonValue((String) propertyValue, field.getGenericType()));field.setAccessible(accessible);if (configUtil.isAutoUpdateInjectedSpringPropertiesEnabled()) {Set<String> keys = placeholderHelper.extractPlaceholderKeys(placeholder);for (String key : keys) {SpringValue springValue = new SpringValue(key, placeholder, bean, beanName, field, true);springValueRegistry.register(this.configurableBeanFactory, key, springValue);logger.debug("Monitoring {}", springValue);}}}
参考文档
  • 微服务架构~携程Apollo配置中心架构剖析 https://mp.weixin.qq.com/s/-hUaQPzfsl9Lm3IqQW3VDQ
  • Apollo官方文档 https://www.apolloconfig.com/#/zh/

Apollo 配置中心源码分析相关推荐

  1. Apollo配置中心源码解析

    1.Apollo配置中心总体设计 一.主流程-流程图 简要流程说明,后面源码分析 备注:一个 Namespace 对应一个 RemoteConfigRepository .多个 RemoteConfi ...

  2. apollo配置中心源码全解析

    文章目录 前言 源码解析 与springboot集成 远程配置的加载 长轮询监听配置更改 服务端长轮询机制 通过客户端发布配置 总结 前言 紧接前文nacos配置中心,本文继续讲目前比较火热的动态配置 ...

  3. spring cloud config配置中心源码分析之注解@EnableConfigServer

    spring cloud config的主函数是ConfigServerApplication,其定义如下: @Configuration @EnableAutoConfiguration @Enab ...

  4. 知其所以然之Nacos配置中心源码浅析

    文章目录 引例 NacosConfigService的初始化 ServerHttpAgent的构造函数解析 ServerListManager的构造函数解析 ConfigFilterChainMana ...

  5. Dubbo2.6.x—注册中心源码分析 dubbo-registry模块 (api and zookeeper)

    文章有点长,亲,要慢慢看! 1. 概述 1.1 注册中心作用 在Dubbo中,注册中心为核心模块,Dubbo通过注册中心实现各个服务之间的注册与发现等功能,而本次源码的分析为registry模块的ap ...

  6. nacos配置刷新失败导致的cpu上升和频繁重启,nacos配置中心源码解析

    大家好,我是烤鸭: nacos 版本 1.3.2,先说下结论,频繁重启的原因确实没有找到,跟nacos有关,日志没有保留多少,只能从源码找下头绪(出问题的版本 server用的是 nacos 1.1, ...

  7. halfstone 原理_HashMap的结构以及核心源码分析

    摘要 对于Java开发人员来说,能够熟练地掌握java的集合类是必须的,本节想要跟大家共同学习一下JDK1.8中HashMap的底层实现与源码分析.HashMap是开发中使用频率最高的用于映射(键值对 ...

  8. docker container DNS配置介绍和源码分析

    2019独角兽企业重金招聘Python工程师标准>>> 本文主要介绍了docker容器的DNS配置及其注意点,重点对docker 1.10发布的embedded DNS server ...

  9. Mybatis 核心源码分析

    一.Mybatis 整体执行流程 二.Mybatis 具体流程源码分析 三.源码分析 写一个测试类,来具体分析Mybatis 的执行流程: public class MybatisTest {publ ...

最新文章

  1. 2018目标检测最新算法+经典目标检测算法
  2. 如何定制一款12306抢票浏览器——处理预订页面和验证码自动识别功能
  3. OpenResty 最佳实践
  4. Linux运维之道之ENGINEER1.1(配置邮件服务器,数据库管理基础,表数据管理)
  5. 临床基因组学数据分析实战开课啦!!!
  6. mysql数据库约束无符号_mysql 数据类型 约束条件
  7. GDCM:gdcm::Preamble的测试程序
  8. 为窗体添加 最大化,最小化,还原等 事件
  9. 年入10亿,“山寨”耳机芯片凶猛
  10. 苹果发布iOS 13.1.1更新 修复第三方键盘APP安全等问题
  11. 映月城与电子姬服务器维护,映月城与电子姬11月16日更新公告 加强玩家作弊检测增加举报功能...
  12. Android应用程序组件Content Provider的启动过程源代码分析(5)
  13. C语言程序设计(第五版)-谭浩强著-课后习题
  14. Python Cartopy地图投影【2】
  15. 有效软件测试 - 50条建议 - 需求阶段
  16. ec6108v9a精简刷机包_华为悦盒无安装限制固件下载|华为悦盒EC6108V9A第三方精简流畅无安装限制固件 下载 - 巴士下载站...
  17. 基于腾讯云的物联网云端数据传输-STM32F103C8T6(微信小程序显示数据).一
  18. Win10+yolov5 踩坑记录
  19. 【蓝桥真题】三羊献瑞,祥瑞生辉+三羊献瑞=三羊生瑞气(暴力破解)
  20. Linux mount 命令

热门文章

  1. JavaSE_强化篇_kuang
  2. MYSQL-kuang
  3. HTML 前端学习(3)—— CSS 选择器
  4. varchar mysql,VARCHAR主键 – MySQL
  5. python小技巧:使用HTMLTestReport模板生成html报告
  6. 关于oracle使用客户端无法卸载的问题
  7. 美国20家最具创新性的初创公司
  8. 单片机应用系统设计技术——独立式键盘及其工作原理
  9. Text Data for Trading—Sentiment Analysis 代码复现(二)
  10. oracle的nvl函数返回的类型,oracle的nvl函数的用法-查询结果若是NULL则用指定值代(转) sober...