【架構(gòu)師系列】Apollo配置中心之Client端(二)
原創(chuàng)文章,轉(zhuǎn)載請標注。http://www.rzrgm.cn/boycelee/p/17978027
聲明
原創(chuàng)文章,轉(zhuǎn)載請標注。http://www.rzrgm.cn/boycelee/p/17978027
《碼頭工人的一千零一夜》是一位專注于技術(shù)干貨分享的博主,追隨博主的文章,你將深入了解業(yè)界最新的技術(shù)趨勢,以及在Java開發(fā)和安全領(lǐng)域的實用經(jīng)驗分享。無論你是開發(fā)人員還是對逆向工程感興趣的愛好者,都能在《碼頭工人的一千零一夜》找到有價值的知識和見解。
配置中心系列文章
《【架構(gòu)師視角系列】風控場景下的配置中心設(shè)計思考》 http://www.rzrgm.cn/boycelee/p/18355942
《【架構(gòu)師視角系列】Apollo配置中心之架構(gòu)設(shè)計(一)》http://www.rzrgm.cn/boycelee/p/17967590
《【架構(gòu)師視角系列】Apollo配置中心之Client端(二)》http://www.rzrgm.cn/boycelee/p/17978027
《【架構(gòu)師視角系列】Apollo配置中心之Server端(ConfigSevice)(三)》http://www.rzrgm.cn/boycelee/p/18005318
《【架構(gòu)師視角系列】QConfig配置中心系列之架構(gòu)設(shè)計(一)》http://www.rzrgm.cn/boycelee/p/18013653
《【架構(gòu)師視角系列】QConfig配置中心系列之Client端(二)》http://www.rzrgm.cn/boycelee/p/18033286
一、客戶端架構(gòu)
架構(gòu)介紹會從分層、職責、關(guān)系以及運行負責四個維度進行描述。

1、Config Service職責
(1)配置管理
Config Service 是Apollo配置中心的服務(wù)端組件,負責管理應(yīng)用程序的配置信息。它存儲和維護應(yīng)用程序的各種配置項。
(2)配置發(fā)布
Config Service 負責將最新的配置發(fā)布給注冊在它上面的Apollo Client。當配置發(fā)生變更時,Config Service 負責通知所有訂閱了相應(yīng)配置的客戶端。
(3)配置讀取
Apollo Client 向 Config Service 發(fā)送請求,獲取應(yīng)用程序的配置信息。
2、Apollo Client 職責
(1)配置拉取
Apollo Client 負責向 Config Service 發(fā)送配置拉取請求,獲取三方應(yīng)用程序的配置。
(2)配置注入
Apollo Client 將從 Config Service 獲取到的配置注入到三方應(yīng)用程序中。
(3)配置變更監(jiān)聽
Apollo Client 可以注冊對配置變更的監(jiān)聽器。當 Config Service 發(fā)布新的配置時,Apollo Client 能夠感知到配置的變更,并觸發(fā)相應(yīng)的操作。
3、基本交互流程
(1)應(yīng)用啟動
Apollo Client 在應(yīng)用啟動時向 Config Service 發(fā)送配置拉取請求,獲取初始的配置。
(2)配置變更通知
Config Service 在配置發(fā)生變更時,通知所有注冊的 Apollo Client。
(3)配置更新
Apollo Client 接收到配置變更通知后,向 Config Service 發(fā)送請求,獲取最新的配置。
(4)配置注入
Apollo Client 將獲取到的最新配置注入到應(yīng)用程序中,以便使用最新的配置信息。
通過以上交互流程達到應(yīng)用不需要重啟,動態(tài)配置變更的目的。
二、架構(gòu)思考
架構(gòu)師視角系列,在分析一款組件的源碼時,需要深入思考其設(shè)計背后的動機。以下是讀者在閱讀本篇文章時應(yīng)思考的問題:
(1)配置拉取的設(shè)計
- 思考點: 設(shè)計中采用的配置拉取方式是如何選擇的?背后的動機是什么?可能的考慮包括系統(tǒng)性能、可維護性和安全性。
(2)配置的注入方式
- 思考點: 配置是如何被注入到組件中的?這種注入方式有何優(yōu)勢?設(shè)計選擇的原因可能涉及松耦合、動態(tài)變化和代碼可維護性等方面。
(3)配置變更的通知機制
- 思考點: 配置變更是如何通知其他組件的?為什么選擇當前的通知機制?可能的考慮包括實時性、效率以及系統(tǒng)整體的架構(gòu)要求。
(4)為什么配置拉取拆分為兩個請求?
- 思考點: 配置拉取為何拆分為兩個獨立的請求?這個設(shè)計決策的目的是什么?可能涉及到性能優(yōu)化、可伸縮性以及減輕服務(wù)器負擔的考慮。
(5)長輪詢的概念
- 思考點: 什么是長輪詢?為何在配置方案中選擇使用它?長輪詢的優(yōu)勢在哪里?可能涉及到減少輪詢頻率、降低網(wǎng)絡(luò)開銷以及更及時的配置變更通知。
(6)為什么需要做本地文件緩存?
- 思考點: 為什么在組件中引入了本地文件緩存的機制?這樣的設(shè)計有哪些優(yōu)點?可能牽涉到性能優(yōu)化、離線支持以及用戶體驗的方面。
在深入研究源碼時,理解這些設(shè)計決策背后的原因,有助于更全面地理解系統(tǒng)架構(gòu),并為自己的設(shè)計提供有價值的啟示。
三、源碼剖析
1、初始化
(1)邏輯描述
通過實現(xiàn)Spring框架提供的BeanPostProcessor接口,并完成postProcessBeforeInitialization函數(shù)的實現(xiàn),我們能夠在Bean初始化之前執(zhí)行自定義的操作。BeanPostProcessor是Spring框架提供的一個擴展點,允許我們在Bean初始化前后插入自定義邏輯。在postProcessBeforeInitialization函數(shù)中,我們有機會遍歷Bean的成員變量和函數(shù),實現(xiàn)在初始化之前對它們進行定制化處理的需求。
(2)時序圖
(3)代碼位置
ApolloProcessor#postProcessBeforeInitialization
為了講解更加順暢,會沿著Method上的注解@ApolloConfigChangeListener實現(xiàn)邏輯進行講解。
public abstract class ApolloProcessor implements BeanPostProcessor, PriorityOrdered {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException {
Class clazz = bean.getClass();
// 遍歷Bean中的成員變量
for (Field field : findAllField(clazz)) {
processField(bean, beanName, field);
}
// 遍歷Bean中的所有函數(shù)(根據(jù)這條邏輯進行講解)
for (Method method : findAllMethod(clazz)) {
processMethod(bean, beanName, method);
}
return bean;
}
...
}
2、查找注解
(1)邏輯描述
創(chuàng)建配置變化的監(jiān)聽器,并創(chuàng)建namespace對應(yīng)的config實例,將監(jiān)聽器注冊到config實例中。當發(fā)生配置變更時,會調(diào)用監(jiān)聽器的onChange函數(shù),并利用反射機制通知對應(yīng)的函數(shù)(使用@ApolloConfigChange)。
(2)時序圖

(3)代碼位置
ApolloAnnotationProcessor#processMethod
public class ApolloAnnotationProcessor extends ApolloProcessor implements BeanFactoryAware,
EnvironmentAware {
...
@Override
protected void processMethod(final Object bean, String beanName, final Method method) {
// 處理函數(shù)上的注解(@ApolloConfigChange)(關(guān)注這里)
this.processApolloConfigChangeListener(bean, method);
this.processApolloJsonValue(bean, beanName, method);
}
private void processApolloConfigChangeListener(final Object bean, final Method method) {
ApolloConfigChangeListener annotation = AnnotationUtils
.findAnnotation(method, ApolloConfigChangeListener.class);
if (annotation == null) {
return;
}
Class<?>[] parameterTypes = method.getParameterTypes();
Preconditions.checkArgument(parameterTypes.length == 1,
"Invalid number of parameters: %s for method: %s, should be 1", parameterTypes.length,
method);
Preconditions.checkArgument(ConfigChangeEvent.class.isAssignableFrom(parameterTypes[0]),
"Invalid parameter type: %s for method: %s, should be ConfigChangeEvent", parameterTypes[0],
method);
ReflectionUtils.makeAccessible(method);
String[] namespaces = annotation.value();
String[] annotatedInterestedKeys = annotation.interestedKeys();
String[] annotatedInterestedKeyPrefixes = annotation.interestedKeyPrefixes();
// 創(chuàng)建配置變化監(jiān)聽器。當配置發(fā)生變化時,會調(diào)用onChange函數(shù)并使用反射觸發(fā)標識@ApolloConfigChange的Method
ConfigChangeListener configChangeListener = new ConfigChangeListener() {
@Override
public void onChange(ConfigChangeEvent changeEvent) {
ReflectionUtils.invokeMethod(method, bean, changeEvent);
}
};
Set<String> interestedKeys =
annotatedInterestedKeys.length > 0 ? Sets.newHashSet(annotatedInterestedKeys) : null;
Set<String> interestedKeyPrefixes =
annotatedInterestedKeyPrefixes.length > 0 ? Sets.newHashSet(annotatedInterestedKeyPrefixes)
: null;
// 遍歷namespace
for (String namespace : namespaces) {
final String resolvedNamespace = this.environment.resolveRequiredPlaceholders(namespace);
// 創(chuàng)建(獲取)Config實例(關(guān)注這里)
Config config = ConfigService.getConfig(resolvedNamespace);
// 注冊監(jiān)聽器
if (interestedKeys == null && interestedKeyPrefixes == null) {
// 將創(chuàng)建的監(jiān)聽器注冊到namespace對應(yīng)的config實例中(關(guān)注這里)
config.addChangeListener(configChangeListener);
} else {
config.addChangeListener(configChangeListener, interestedKeys, interestedKeyPrefixes);
}
}
}
...
}
3、建立連接
(1)邏輯描述
為注解(@ApolloConfigChange)綁定的namespace創(chuàng)建Config實例,Config實例中會會為namespace創(chuàng)建本地配置倉庫(createLocalConfigRepository處理本地配置存儲)和遠程配置倉庫(createRemoteConfigRepository處理遠程ConfigService配置拉取)。
(2)時序圖

(3)具體函數(shù)
ConfigService#getConfig
public class ConfigService {
// 創(chuàng)建一個ConfigService單例
private static final ConfigService s_instance = new ConfigService();
// 獲取 m_configManager 與 m_configRegistry單例
private volatile ConfigManager m_configManager;
private volatile ConfigRegistry m_configRegistry;
// 獲取nanespae對應(yīng)的config實例(關(guān)注這里)
public static Config getConfig(String namespace) {
return s_instance.getManager().getConfig(namespace);
}
(2)具體函數(shù):DefaultConfigManager#getConfig
public class DefaultConfigManager implements ConfigManager {
@Override
public Config getConfig(String namespace) {
Config config = m_configs.get(namespace);
// 每個namespace創(chuàng)建一個Config對象
if (config == null) {
synchronized (this) {
config = m_configs.get(namespace);
if (config == null) {
ConfigFactory factory = m_factoryManager.getFactory(namespace);
//config對象中有,拉取遠程和本地倉庫(Repository)
config = factory.create(namespace);
m_configs.put(namespace, config);
}
}
}
return config;
}
}
(3)具體函數(shù):DefaultConfigFactory#create
創(chuàng)建順序是:1)創(chuàng)建遠端存儲倉庫,從Config Service中拉取配置數(shù)據(jù);2)創(chuàng)建本地存儲倉庫,將遠端拉取到的配置文件存儲到本地文件中;3)實例化Config,以供后續(xù)獲取配置信息使用。
public class DefaultConfigFactory implements ConfigFactory {
...
@Override
public Config create(String namespace) {
ConfigFileFormat format = determineFileFormat(namespace);
if (ConfigFileFormat.isPropertiesCompatible(format)) {
return this.createRepositoryConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));
}
// (關(guān)注這里)。調(diào)用createLocalConfigRepository函數(shù),創(chuàng)建LocalConfigRepository,建立本地存儲倉庫
return this.createRepositoryConfig(namespace, createLocalConfigRepository(namespace));
}
LocalFileConfigRepository createLocalConfigRepository(String namespace) {
if (m_configUtil.isInLocalMode()) {
logger.warn(
"==== Apollo is in local mode! Won't pull configs from remote server for namespace {} ! ====",
namespace);
return new LocalFileConfigRepository(namespace);
}
// (關(guān)注這里)。調(diào)用createRemoteConfigRepository函數(shù),創(chuàng)建RemoteConfigRepository,建立遠程存儲倉庫
return new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));
}
RemoteConfigRepository createRemoteConfigRepository(String namespace) {
return new RemoteConfigRepository(namespace);
}
...
}
4、拉取配置
(1)邏輯描述
配置拉取主要分為三個關(guān)鍵步驟:1)初始化加載配置;2)定期拉取配置;3)通過長輪詢進行刷新。在這其中,長輪詢刷新階段又分為兩個請求:1)配置更新通知(通過長輪詢實現(xiàn));2)詳細配置拉取。通過這個流程,系統(tǒng)能夠?qū)崿F(xiàn)配置的及時更新,確保應(yīng)用程序始終使用最新的配置信息。
(2)時序圖

(3)代碼實現(xiàn)
具體函數(shù):RemoteConfigRepository#RemoteConfigRepository
public class RemoteConfigRepository extends AbstractConfigRepository {
...
public RemoteConfigRepository(String namespace) {
m_namespace = namespace;
m_configCache = new AtomicReference<>();
m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
m_httpClient = ApolloInjector.getInstance(HttpClient.class);
m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
m_longPollServiceDto = new AtomicReference<>();
m_remoteMessages = new AtomicReference<>();
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
m_configNeedForceRefresh = new AtomicBoolean(true);
m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
m_configUtil.getOnErrorRetryInterval() * 8);
// 初始化加載配置
this.trySync();
// 周期拉取配置(apollo定時兜底)
this.schedulePeriodicRefresh();
// 長輪詢
this.scheduleLongPollingRefresh();
}
}
a)配置初始化加載(trySync)
具體函數(shù):AbstractConfigRepository#trySync
時序圖:

邏輯描述:namespace初始化加載配置
public abstract class AbstractConfigRepository implements ConfigRepository {
...
// 獲取配置內(nèi)容
protected boolean trySync() {
try {
// (關(guān)注這里)獲取配置
sync();
return true;
} catch (Throwable ex) {
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
logger
.warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil
.getDetailMessage(ex));
}
return false;
}
...
}
具體函數(shù):RemoteConfigRepository#sync()
public class RemoteConfigRepository extends AbstractConfigRepository {
...
@Override
protected synchronized void sync() {
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
try {
ApolloConfig previous = m_configCache.get();
// 加載配置(關(guān)注這里)
ApolloConfig current = loadApolloConfig();
//reference equals means HTTP 304
if (previous != current) {
logger.debug("Remote Config refreshed!");
m_configCache.set(current);
// 通知Repository監(jiān)聽器,配置發(fā)生變化(關(guān)注這里)
this.fireRepositoryChange(m_namespace, this.getConfig());
}
if (current != null) {
Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
current.getReleaseKey());
}
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
throw ex;
} finally {
transaction.complete();
}
}
// 加載配置
private ApolloConfig loadApolloConfig() {
// 限流,避免創(chuàng)建過多連接。同一個namespace會有多種觸發(fā)loadApolloConfig函數(shù)的方式
if (!m_loadConfigRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
//wait at most 5 seconds
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
}
String appId = m_configUtil.getAppId();
String cluster = m_configUtil.getCluster();
String dataCenter = m_configUtil.getDataCenter();
String secret = m_configUtil.getAccessKeySecret();
Tracer.logEvent("Apollo.Client.ConfigMeta", STRING_JOINER.join(appId, cluster, m_namespace));
int maxRetries = m_configNeedForceRefresh.get() ? 2 : 1;
long onErrorSleepTime = 0; // 0 means no sleep
Throwable exception = null;
// 從meta server中獲取注冊到eureka的config service
List<ServiceDTO> configServices = getConfigServices();
String url = null;
retryLoopLabel:
for (int i = 0; i < maxRetries; i++) {
List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices);
Collections.shuffle(randomConfigServices);
if (m_longPollServiceDto.get() != null) {
randomConfigServices.add(0, m_longPollServiceDto.getAndSet(null));
}
for (ServiceDTO configService : randomConfigServices) {
if (onErrorSleepTime > 0) {
logger.warn(
"Load config failed, will retry in {} {}. appId: {}, cluster: {}, namespaces: {}",
onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster, m_namespace);
try {
m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(onErrorSleepTime);
} catch (InterruptedException e) {
//ignore
}
}
// 拼接請求config service獲取配置的url
url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
dataCenter, m_remoteMessages.get(), m_configCache.get());
logger.debug("Loading config from {}", url);
HttpRequest request = new HttpRequest(url);
if (!StringUtils.isBlank(secret)) {
Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
request.setHeaders(headers);
}
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "queryConfig");
transaction.addData("Url", url);
try {
// 發(fā)送請求
HttpResponse<ApolloConfig> response = m_httpClient.doGet(request, ApolloConfig.class);
m_configNeedForceRefresh.set(false);
m_loadConfigFailSchedulePolicy.success();
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Transaction.SUCCESS);
// 如果配置沒有變更,config service會返回304狀態(tài)碼
if (response.getStatusCode() == 304) {
logger.debug("Config server responds with 304 HTTP status code.");
// 緩存中拉取歷史配置
return m_configCache.get();
}
ApolloConfig result = response.getBody();
logger.debug("Loaded config for {}: {}", m_namespace, result);
// 如果配置變更,這會直接返回
return result;
} catch (ApolloConfigStatusCodeException ex) {
ApolloConfigStatusCodeException statusCodeException = ex;
//config not found
if (ex.getStatusCode() == 404) {
String message = String.format(
"Could not find config for namespace - appId: %s, cluster: %s, namespace: %s, " +
"please check whether the configs are released in Apollo!",
appId, cluster, m_namespace);
statusCodeException = new ApolloConfigStatusCodeException(ex.getStatusCode(),
message);
}
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(statusCodeException));
transaction.setStatus(statusCodeException);
exception = statusCodeException;
if(ex.getStatusCode() == 404) {
break retryLoopLabel;
}
} catch (Throwable ex) {
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
transaction.setStatus(ex);
exception = ex;
} finally {
transaction.complete();
}
// if force refresh, do normal sleep, if normal config load, do exponential sleep
onErrorSleepTime = m_configNeedForceRefresh.get() ? m_configUtil.getOnErrorRetryInterval() :
m_loadConfigFailSchedulePolicy.fail();
}
}
...
}
b)周期配置拉取(schedulePeriodicRefresh)
具體函數(shù):RemoteConfigRepository#schedulePeriodicRefresh
時序圖:
邏輯描述:周期拉取配置(apollo定時兜底)
public class RemoteConfigRepository extends AbstractConfigRepository {
...
private final static ScheduledExecutorService m_executorService;
static {
m_executorService = Executors.newScheduledThreadPool(1,
ApolloThreadFactory.create("RemoteConfigRepository", true));
}
// 定時拉取配置
private void schedulePeriodicRefresh() {
logger.debug("Schedule periodic refresh with interval: {} {}",
m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
// 固定時間間隔執(zhí)行任務(wù)
m_executorService.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));
logger.debug("refresh config for namespace: {}", m_namespace);
trySync();
Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION);
}
}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
m_configUtil.getRefreshIntervalTimeUnit());
}
...
}
c)長輪詢監(jiān)聽與最新配置拉取(scheduleLongPollingRefresh)
具體函數(shù):RemoteConfigRepository#scheduleLongPollingRefresh()
時序圖:
邏輯描述:建立長輪詢,監(jiān)聽配置變更通知通知后加載最新配置
public class RemoteConfigRepository extends AbstractConfigRepository {
private void scheduleLongPollingRefresh() {
remoteConfigLongPollService.submit(m_namespace, this);
}
}
(8)具體函數(shù):RemoteConfigLongPollService#submit()
public class RemoteConfigLongPollService {
private final ExecutorService m_longPollingService;
public RemoteConfigLongPollService() {
m_longPollFailSchedulePolicyInSecond = new ExponentialSchedulePolicy(1, 120); //in second
m_longPollingStopped = new AtomicBoolean(false);
m_longPollingService = Executors.newSingleThreadExecutor(
ApolloThreadFactory.create("RemoteConfigLongPollService", true));
m_longPollStarted = new AtomicBoolean(false);
m_longPollNamespaces =
Multimaps.synchronizedSetMultimap(HashMultimap.<String, RemoteConfigRepository>create());
m_notifications = Maps.newConcurrentMap();
m_remoteNotificationMessages = Maps.newConcurrentMap();
m_responseType = new TypeToken<List<ApolloConfigNotification>>() {
}.getType();
m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
m_httpClient = ApolloInjector.getInstance(HttpClient.class);
m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
m_longPollRateLimiter = RateLimiter.create(m_configUtil.getLongPollQPS());
}
public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {
// 如果長輪詢已經(jīng)啟動,就不會再往線程池里添加runnable(通過m_longPollStarted判斷是否啟動),但是會往m_longPollNamespaces中添加需要被通知變更的namespace對應(yīng)的remoteConfigRepository
boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);
m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);
if (!m_longPollStarted.get()) {
startLongPolling();
}
return added;
}
// 多個namespace,也只有一個長輪詢
private void startLongPolling() {
if (!m_longPollStarted.compareAndSet(false, true)) {
//already started
return;
}
try {
final String appId = m_configUtil.getAppId();
final String cluster = m_configUtil.getCluster();
final String dataCenter = m_configUtil.getDataCenter();
final String secret = m_configUtil.getAccessKeySecret();
final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();
// 單線程連接池
m_longPollingService.submit(new Runnable() {
@Override
public void run() {
if (longPollingInitialDelayInMills > 0) {
try {
logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills);
TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);
} catch (InterruptedException e) {
//ignore
}
}
doLongPollingRefresh(appId, cluster, dataCenter, secret);
}
});
} catch (Throwable ex) {
m_longPollStarted.set(false);
ApolloConfigException exception =
new ApolloConfigException("Schedule long polling refresh failed", ex);
Tracer.logError(exception);
logger.warn(ExceptionUtil.getDetailMessage(exception));
}
}
private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {
final Random random = new Random();
ServiceDTO lastServiceDto = null;
// 只要不中斷就循環(huán)
while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
// limiter令牌桶限流為2qps, 5秒之內(nèi)存在沒有獲取到1個令牌的情況,則休眠5秒
if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
//wait at most 5 seconds
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
}
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");
String url = null;
try {
if (lastServiceDto == null) {
List<ServiceDTO> configServices = getConfigServices();
lastServiceDto = configServices.get(random.nextInt(configServices.size()));
}
url =
assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
m_notifications);
logger.debug("Long polling from {}", url);
HttpRequest request = new HttpRequest(url);
request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
if (!StringUtils.isBlank(secret)) {
Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
request.setHeaders(headers);
}
transaction.addData("Url", url);
final HttpResponse<List<ApolloConfigNotification>> response =
m_httpClient.doGet(request, m_responseType);
logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
if (response.getStatusCode() == 200 && response.getBody() != null) {
updateNotifications(response.getBody());
updateRemoteNotifications(response.getBody());
transaction.addData("Result", response.getBody().toString());
// 此處通知,執(zhí)行notify之后加載數(shù)據(jù)
notify(lastServiceDto, response.getBody());
}
//try to load balance
if (response.getStatusCode() == 304 && random.nextBoolean()) {
lastServiceDto = null;
}
m_longPollFailSchedulePolicyInSecond.success();
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
lastServiceDto = null;
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
transaction.setStatus(ex);
long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
logger.warn(
"Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",
sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));
try {
TimeUnit.SECONDS.sleep(sleepTimeInSecond);
} catch (InterruptedException ie) {
//ignore
}
} finally {
transaction.complete();
}
}
}
private void notify(ServiceDTO lastServiceDto, List<ApolloConfigNotification> notifications) {
if (notifications == null || notifications.isEmpty()) {
return;
}
for (ApolloConfigNotification notification : notifications) {
String namespaceName = notification.getNamespaceName();
//create a new list to avoid ConcurrentModificationException
List<RemoteConfigRepository> toBeNotified =
Lists.newArrayList(m_longPollNamespaces.get(namespaceName));
ApolloNotificationMessages originalMessages = m_remoteNotificationMessages.get(namespaceName);
ApolloNotificationMessages remoteMessages = originalMessages == null ? null : originalMessages.clone();
//since .properties are filtered out by default, so we need to check if there is any listener for it
toBeNotified.addAll(m_longPollNamespaces
.get(String.format("%s.%s", namespaceName, ConfigFileFormat.Properties.getValue())));
for (RemoteConfigRepository remoteConfigRepository : toBeNotified) {
try {
remoteConfigRepository.onLongPollNotified(lastServiceDto, remoteMessages);
} catch (Throwable ex) {
Tracer.logError(ex);
}
}
}
}
public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) {
m_longPollServiceDto.set(longPollNotifiedServiceDto);
m_remoteMessages.set(remoteMessages);
m_executorService.submit(new Runnable() {
@Override
public void run() {
m_configNeedForceRefresh.set(true);
trySync();
}
});
}
}
(9)AbstractConfigRepository#trySync()
public abstract class AbstractConfigRepository implements ConfigRepository {
// 拉配置信息,不是notificationID,而是配置內(nèi)容
protected boolean trySync() {
try {
sync();
return true;
} catch (Throwable ex) {
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
logger
.warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil
.getDetailMessage(ex));
}
return false;
}
// 子類RemoteConfigRepository中實現(xiàn)
protected synchronized void sync() {
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
try {
ApolloConfig previous = m_configCache.get();
ApolloConfig current = loadApolloConfig();
//reference equals means HTTP 304
if (previous != current) {
logger.debug("Remote Config refreshed!");
m_configCache.set(current);
this.fireRepositoryChange(m_namespace, this.getConfig());
}
if (current != null) {
Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
current.getReleaseKey());
}
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
throw ex;
} finally {
transaction.complete();
}
}
protected void fireRepositoryChange(String namespace, Properties newProperties) {
for (RepositoryChangeListener listener : m_listeners) {
try {
listener.onRepositoryChange(namespace, newProperties);
} catch (Throwable ex) {
Tracer.logError(ex);
logger.error("Failed to invoke repository change listener {}", listener.getClass(), ex);
}
}
}
}
5、變更通知
(1)邏輯描述
在namespace數(shù)據(jù)發(fā)生變更時,系統(tǒng)將通知所有監(jiān)聽該namespace的監(jiān)聽器。系統(tǒng)會比較新老配置,將差異配置存儲在ConfigChange中,并隨后通知各個監(jiān)聽器。
(2)時序圖
(2)代碼實現(xiàn)
DefaultConfig#onRepositoryChange()
public class DefaultConfig extends AbstractConfig implements RepositoryChangeListener {
...
/**
* Repository通知變更,
* @param namespace the namespace of this repository change
* @param newProperties the properties after change
*/
@Override
public synchronized void onRepositoryChange(String namespace, Properties newProperties) {
if (newProperties.equals(m_configProperties.get())) {
return;
}
ConfigSourceType sourceType = m_configRepository.getSourceType();
Properties newConfigProperties = propertiesFactory.getPropertiesInstance();
newConfigProperties.putAll(newProperties);
// 計算配置變更情況,對新老配置進行比較,將差異配置存儲在ConfigChange中Map的格式為{namspace:{key:value}}
Map<String, ConfigChange> actualChanges = updateAndCalcConfigChanges(newConfigProperties,
sourceType);
//check double checked result
if (actualChanges.isEmpty()) {
return;
}
// 將具體變更通知各監(jiān)聽器
this.fireConfigChange(m_namespace, actualChanges);
Tracer.logEvent("Apollo.Client.ConfigChanges", m_namespace);
}
// 構(gòu)建Method的變更事件ConfigChange參數(shù)
private Map<String, ConfigChange> updateAndCalcConfigChanges(Properties newConfigProperties,
ConfigSourceType sourceType) {
List<ConfigChange> configChanges = calcPropertyChanges(m_namespace, m_configProperties.get(), newConfigProperties);
ImmutableMap.Builder<String, ConfigChange> actualChanges =
new ImmutableMap.Builder<>();
/** === Double check since DefaultConfig has multiple config sources ==== **/
//1. use getProperty to update configChanges's old value
for (ConfigChange change : configChanges) {
change.setOldValue(this.getProperty(change.getPropertyName(), change.getOldValue()));
}
//2. update m_configProperties
updateConfig(newConfigProperties, sourceType);
clearConfigCache();
//3. use getProperty to update configChange's new value and calc the final changes
for (ConfigChange change : configChanges) {
change.setNewValue(this.getProperty(change.getPropertyName(), change.getNewValue()));
switch (change.getChangeType()) {
case ADDED:
if (Objects.equals(change.getOldValue(), change.getNewValue())) {
break;
}
if (change.getOldValue() != null) {
change.setChangeType(PropertyChangeType.MODIFIED);
}
actualChanges.put(change.getPropertyName(), change);
break;
case MODIFIED:
if (!Objects.equals(change.getOldValue(), change.getNewValue())) {
actualChanges.put(change.getPropertyName(), change);
}
break;
case DELETED:
if (Objects.equals(change.getOldValue(), change.getNewValue())) {
break;
}
if (change.getNewValue() != null) {
change.setChangeType(PropertyChangeType.MODIFIED);
}
actualChanges.put(change.getPropertyName(), change);
break;
default:
//do nothing
break;
}
}
return actualChanges.build();
}
/**
* 父類中實現(xiàn)
* 配置變更通知,通知監(jiān)聽器
* @param changes map's key is config property's key
*/
protected void fireConfigChange(String namespace, Map<String, ConfigChange> changes) {
final Set<String> changedKeys = changes.keySet();
final List<ConfigChangeListener> listeners = this.findMatchedConfigChangeListeners(changedKeys);
// notify those listeners
for (ConfigChangeListener listener : listeners) {
Set<String> interestedChangedKeys = resolveInterestedChangedKeys(listener, changedKeys);
InterestedConfigChangeEvent interestedConfigChangeEvent = new InterestedConfigChangeEvent(
namespace, changes, interestedChangedKeys);
this.notifyAsync(listener, interestedConfigChangeEvent);
}
}
/**
* 異步通知
* @param listener
* @param changeEvent
*/
private void notifyAsync(final ConfigChangeListener listener, final ConfigChangeEvent changeEvent) {
m_executorService.submit(new Runnable() {
@Override
public void run() {
String listenerName = listener.getClass().getName();
Transaction transaction = Tracer.newTransaction("Apollo.ConfigChangeListener", listenerName);
try {
listener.onChange(changeEvent);
transaction.setStatus(Transaction.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
Tracer.logError(ex);
logger.error("Failed to invoke config change listener {}", listenerName, ex);
} finally {
transaction.complete();
}
}
});
}
...
}
6、配置注入
(1)邏輯描述
在配置發(fā)生變更后,系統(tǒng)會通知在Bean初始化時創(chuàng)建的與namespace對應(yīng)的監(jiān)聽器。接著,系統(tǒng)通過反射的方式觸發(fā)相應(yīng)的函數(shù)(使用@ApolloConfigChange注解)。
(2)代碼位置
ConfigChangeListener#onChange()
public class ApolloAnnotationProcessor extends ApolloProcessor implements BeanFactoryAware,
EnvironmentAware {
...
private void processApolloConfigChangeListener(final Object bean, final Method method) {
ApolloConfigChangeListener annotation = AnnotationUtils
.findAnnotation(method, ApolloConfigChangeListener.class);
if (annotation == null) {
return;
}
Class<?>[] parameterTypes = method.getParameterTypes();
Preconditions.checkArgument(parameterTypes.length == 1,
"Invalid number of parameters: %s for method: %s, should be 1", parameterTypes.length,
method);
Preconditions.checkArgument(ConfigChangeEvent.class.isAssignableFrom(parameterTypes[0]),
"Invalid parameter type: %s for method: %s, should be ConfigChangeEvent", parameterTypes[0],
method);
ReflectionUtils.makeAccessible(method);
// value 是 namespace
String[] namespaces = annotation.value();
String[] annotatedInterestedKeys = annotation.interestedKeys();
String[] annotatedInterestedKeyPrefixes = annotation.interestedKeyPrefixes();
// 創(chuàng)建配置變化監(jiān)聽器
ConfigChangeListener configChangeListener = new ConfigChangeListener() {
@Override
public void onChange(ConfigChangeEvent changeEvent) {
ReflectionUtils.invokeMethod(method, bean, changeEvent);
}
};
}
...
}
四、最后
《碼頭工人的一千零一夜》是一位專注于技術(shù)干貨分享的博主,追隨博主的文章,你將深入了解業(yè)界最新的技術(shù)趨勢,以及在Java開發(fā)和安全領(lǐng)域的實用經(jīng)驗分享。無論你是開發(fā)人員還是對逆向工程感興趣的愛好者,都能在《碼頭工人的一千零一夜》找到有價值的知識和見解。
懂得不多,做得太少。歡迎批評、指正。

浙公網(wǎng)安備 33010602011771號