<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      【架構(gòu)師系列】Apollo配置中心之Client端(二)

      原創(chuàng)文章,轉(zhuǎn)載請標注。http://www.rzrgm.cn/boycelee/p/17978027

      聲明

      原創(chuàng)文章,轉(zhuǎn)載請標注。http://www.rzrgm.cn/boycelee/p/17978027
      《碼頭工人的一千零一夜》是一位專注于技術(shù)干貨分享的博主,追隨博主的文章,你將深入了解業(yè)界最新的技術(shù)趨勢,以及在Java開發(fā)和安全領(lǐng)域的實用經(jīng)驗分享。無論你是開發(fā)人員還是對逆向工程感興趣的愛好者,都能在《碼頭工人的一千零一夜》找到有價值的知識和見解。

      配置中心系列文章

      《【架構(gòu)師視角系列】風控場景下的配置中心設(shè)計思考》 http://www.rzrgm.cn/boycelee/p/18355942
      《【架構(gòu)師視角系列】Apollo配置中心之架構(gòu)設(shè)計(一)》http://www.rzrgm.cn/boycelee/p/17967590
      《【架構(gòu)師視角系列】Apollo配置中心之Client端(二)》http://www.rzrgm.cn/boycelee/p/17978027
      《【架構(gòu)師視角系列】Apollo配置中心之Server端(ConfigSevice)(三)》http://www.rzrgm.cn/boycelee/p/18005318
      《【架構(gòu)師視角系列】QConfig配置中心系列之架構(gòu)設(shè)計(一)》http://www.rzrgm.cn/boycelee/p/18013653
      《【架構(gòu)師視角系列】QConfig配置中心系列之Client端(二)》http://www.rzrgm.cn/boycelee/p/18033286

      一、客戶端架構(gòu)

      架構(gòu)介紹會從分層、職責、關(guān)系以及運行負責四個維度進行描述。

      1、Config Service職責

      (1)配置管理

      Config Service 是Apollo配置中心的服務(wù)端組件,負責管理應(yīng)用程序的配置信息。它存儲和維護應(yīng)用程序的各種配置項。

      (2)配置發(fā)布

      Config Service 負責將最新的配置發(fā)布給注冊在它上面的Apollo Client。當配置發(fā)生變更時,Config Service 負責通知所有訂閱了相應(yīng)配置的客戶端。

      (3)配置讀取

      Apollo Client 向 Config Service 發(fā)送請求,獲取應(yīng)用程序的配置信息。

      2、Apollo Client 職責

      (1)配置拉取

      Apollo Client 負責向 Config Service 發(fā)送配置拉取請求,獲取三方應(yīng)用程序的配置。

      (2)配置注入

      Apollo Client 將從 Config Service 獲取到的配置注入到三方應(yīng)用程序中。

      (3)配置變更監(jiān)聽

      Apollo Client 可以注冊對配置變更的監(jiān)聽器。當 Config Service 發(fā)布新的配置時,Apollo Client 能夠感知到配置的變更,并觸發(fā)相應(yīng)的操作。

      3、基本交互流程

      (1)應(yīng)用啟動

      Apollo Client 在應(yīng)用啟動時向 Config Service 發(fā)送配置拉取請求,獲取初始的配置。

      (2)配置變更通知

      Config Service 在配置發(fā)生變更時,通知所有注冊的 Apollo Client。

      (3)配置更新

      Apollo Client 接收到配置變更通知后,向 Config Service 發(fā)送請求,獲取最新的配置。

      (4)配置注入

      Apollo Client 將獲取到的最新配置注入到應(yīng)用程序中,以便使用最新的配置信息。

      通過以上交互流程達到應(yīng)用不需要重啟,動態(tài)配置變更的目的。

      二、架構(gòu)思考

      架構(gòu)師視角系列,在分析一款組件的源碼時,需要深入思考其設(shè)計背后的動機。以下是讀者在閱讀本篇文章時應(yīng)思考的問題:

      (1)配置拉取的設(shè)計

      • 思考點: 設(shè)計中采用的配置拉取方式是如何選擇的?背后的動機是什么?可能的考慮包括系統(tǒng)性能、可維護性和安全性。

      (2)配置的注入方式

      • 思考點: 配置是如何被注入到組件中的?這種注入方式有何優(yōu)勢?設(shè)計選擇的原因可能涉及松耦合、動態(tài)變化和代碼可維護性等方面。

      (3)配置變更的通知機制

      • 思考點: 配置變更是如何通知其他組件的?為什么選擇當前的通知機制?可能的考慮包括實時性、效率以及系統(tǒng)整體的架構(gòu)要求。

      (4)為什么配置拉取拆分為兩個請求?

      • 思考點: 配置拉取為何拆分為兩個獨立的請求?這個設(shè)計決策的目的是什么?可能涉及到性能優(yōu)化、可伸縮性以及減輕服務(wù)器負擔的考慮。

      (5)長輪詢的概念

      • 思考點: 什么是長輪詢?為何在配置方案中選擇使用它?長輪詢的優(yōu)勢在哪里?可能涉及到減少輪詢頻率、降低網(wǎng)絡(luò)開銷以及更及時的配置變更通知。

      (6)為什么需要做本地文件緩存?

      • 思考點: 為什么在組件中引入了本地文件緩存的機制?這樣的設(shè)計有哪些優(yōu)點?可能牽涉到性能優(yōu)化、離線支持以及用戶體驗的方面。

      在深入研究源碼時,理解這些設(shè)計決策背后的原因,有助于更全面地理解系統(tǒng)架構(gòu),并為自己的設(shè)計提供有價值的啟示。

      三、源碼剖析

      1、初始化

      (1)邏輯描述

      通過實現(xiàn)Spring框架提供的BeanPostProcessor接口,并完成postProcessBeforeInitialization函數(shù)的實現(xiàn),我們能夠在Bean初始化之前執(zhí)行自定義的操作。BeanPostProcessor是Spring框架提供的一個擴展點,允許我們在Bean初始化前后插入自定義邏輯。在postProcessBeforeInitialization函數(shù)中,我們有機會遍歷Bean的成員變量和函數(shù),實現(xiàn)在初始化之前對它們進行定制化處理的需求。

      (2)時序圖

      (3)代碼位置

      ApolloProcessor#postProcessBeforeInitialization

      為了講解更加順暢,會沿著Method上的注解@ApolloConfigChangeListener實現(xiàn)邏輯進行講解。

      public abstract class ApolloProcessor implements BeanPostProcessor, PriorityOrdered {
      
          @Override
          public Object postProcessBeforeInitialization(Object bean, String beanName)
          throws BeansException {
              Class clazz = bean.getClass();
              // 遍歷Bean中的成員變量
              for (Field field : findAllField(clazz)) {
                  processField(bean, beanName, field);
              }
              // 遍歷Bean中的所有函數(shù)(根據(jù)這條邏輯進行講解)
              for (Method method : findAllMethod(clazz)) {
                  processMethod(bean, beanName, method);
              }
              return bean;
          }
          ...
      }
      

      2、查找注解

      (1)邏輯描述

      創(chuàng)建配置變化的監(jiān)聽器,并創(chuàng)建namespace對應(yīng)的config實例,將監(jiān)聽器注冊到config實例中。當發(fā)生配置變更時,會調(diào)用監(jiān)聽器的onChange函數(shù),并利用反射機制通知對應(yīng)的函數(shù)(使用@ApolloConfigChange)。

      (2)時序圖

      (3)代碼位置

      ApolloAnnotationProcessor#processMethod

      public class ApolloAnnotationProcessor extends ApolloProcessor implements BeanFactoryAware,
          EnvironmentAware {
        ...
        @Override
        protected void processMethod(final Object bean, String beanName, final Method method) {
          // 處理函數(shù)上的注解(@ApolloConfigChange)(關(guān)注這里)
          this.processApolloConfigChangeListener(bean, method);
          this.processApolloJsonValue(bean, beanName, method);
        }
      
        private void processApolloConfigChangeListener(final Object bean, final Method method) {
          ApolloConfigChangeListener annotation = AnnotationUtils
              .findAnnotation(method, ApolloConfigChangeListener.class);
          if (annotation == null) {
            return;
          }
          Class<?>[] parameterTypes = method.getParameterTypes();
          Preconditions.checkArgument(parameterTypes.length == 1,
              "Invalid number of parameters: %s for method: %s, should be 1", parameterTypes.length,
              method);
          Preconditions.checkArgument(ConfigChangeEvent.class.isAssignableFrom(parameterTypes[0]),
              "Invalid parameter type: %s for method: %s, should be ConfigChangeEvent", parameterTypes[0],
              method);
      
          ReflectionUtils.makeAccessible(method);
          String[] namespaces = annotation.value();
          String[] annotatedInterestedKeys = annotation.interestedKeys();
          String[] annotatedInterestedKeyPrefixes = annotation.interestedKeyPrefixes();
          // 創(chuàng)建配置變化監(jiān)聽器。當配置發(fā)生變化時,會調(diào)用onChange函數(shù)并使用反射觸發(fā)標識@ApolloConfigChange的Method
          ConfigChangeListener configChangeListener = new ConfigChangeListener() {
            @Override
            public void onChange(ConfigChangeEvent changeEvent) {
              ReflectionUtils.invokeMethod(method, bean, changeEvent);
            }
          };
      
          Set<String> interestedKeys =
              annotatedInterestedKeys.length > 0 ? Sets.newHashSet(annotatedInterestedKeys) : null;
          Set<String> interestedKeyPrefixes =
              annotatedInterestedKeyPrefixes.length > 0 ? Sets.newHashSet(annotatedInterestedKeyPrefixes)
                  : null;
      
          // 遍歷namespace
          for (String namespace : namespaces) {
            final String resolvedNamespace = this.environment.resolveRequiredPlaceholders(namespace);
            // 創(chuàng)建(獲取)Config實例(關(guān)注這里)
            Config config = ConfigService.getConfig(resolvedNamespace);
      
            // 注冊監(jiān)聽器
            if (interestedKeys == null && interestedKeyPrefixes == null) {
              // 將創(chuàng)建的監(jiān)聽器注冊到namespace對應(yīng)的config實例中(關(guān)注這里)
              config.addChangeListener(configChangeListener);
            } else {
              config.addChangeListener(configChangeListener, interestedKeys, interestedKeyPrefixes);
            }
          }
        }
        ...
      }
      

      3、建立連接

      (1)邏輯描述

      為注解(@ApolloConfigChange)綁定的namespace創(chuàng)建Config實例,Config實例中會會為namespace創(chuàng)建本地配置倉庫(createLocalConfigRepository處理本地配置存儲)和遠程配置倉庫(createRemoteConfigRepository處理遠程ConfigService配置拉取)。

      (2)時序圖

      (3)具體函數(shù)

      ConfigService#getConfig

      public class ConfigService {
      
        // 創(chuàng)建一個ConfigService單例
        private static final ConfigService s_instance = new ConfigService();
      
        // 獲取 m_configManager 與 m_configRegistry單例
        private volatile ConfigManager m_configManager;
        private volatile ConfigRegistry m_configRegistry;
          
        // 獲取nanespae對應(yīng)的config實例(關(guān)注這里)
        public static Config getConfig(String namespace) {
          return s_instance.getManager().getConfig(namespace);
        }
      

      (2)具體函數(shù):DefaultConfigManager#getConfig

      public class DefaultConfigManager implements ConfigManager {
      
        @Override
        public Config getConfig(String namespace) {
          Config config = m_configs.get(namespace);
          // 每個namespace創(chuàng)建一個Config對象
          if (config == null) {
            synchronized (this) {
              config = m_configs.get(namespace);
              if (config == null) {
                ConfigFactory factory = m_factoryManager.getFactory(namespace);
                //config對象中有,拉取遠程和本地倉庫(Repository)
                config = factory.create(namespace);
                m_configs.put(namespace, config);
              }
            }
          }
          return config;
        }
      }
      

      (3)具體函數(shù):DefaultConfigFactory#create

      創(chuàng)建順序是:1)創(chuàng)建遠端存儲倉庫,從Config Service中拉取配置數(shù)據(jù);2)創(chuàng)建本地存儲倉庫,將遠端拉取到的配置文件存儲到本地文件中;3)實例化Config,以供后續(xù)獲取配置信息使用。

      public class DefaultConfigFactory implements ConfigFactory {
        ...
        @Override
        public Config create(String namespace) {
          ConfigFileFormat format = determineFileFormat(namespace);
          if (ConfigFileFormat.isPropertiesCompatible(format)) {
            return this.createRepositoryConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));
          }
          // (關(guān)注這里)。調(diào)用createLocalConfigRepository函數(shù),創(chuàng)建LocalConfigRepository,建立本地存儲倉庫
          return this.createRepositoryConfig(namespace, createLocalConfigRepository(namespace));
        }
      
        LocalFileConfigRepository createLocalConfigRepository(String namespace) {
          if (m_configUtil.isInLocalMode()) {
            logger.warn(
                "==== Apollo is in local mode! Won't pull configs from remote server for namespace {} ! ====",
                namespace);
            return new LocalFileConfigRepository(namespace);
          }
          // (關(guān)注這里)。調(diào)用createRemoteConfigRepository函數(shù),創(chuàng)建RemoteConfigRepository,建立遠程存儲倉庫
          return new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));
        }
          
        RemoteConfigRepository createRemoteConfigRepository(String namespace) {
          return new RemoteConfigRepository(namespace);
        }
        ...
      }
      

      4、拉取配置

      (1)邏輯描述

      配置拉取主要分為三個關(guān)鍵步驟:1)初始化加載配置;2)定期拉取配置;3)通過長輪詢進行刷新。在這其中,長輪詢刷新階段又分為兩個請求:1)配置更新通知(通過長輪詢實現(xiàn));2)詳細配置拉取。通過這個流程,系統(tǒng)能夠?qū)崿F(xiàn)配置的及時更新,確保應(yīng)用程序始終使用最新的配置信息。

      (2)時序圖

      (3)代碼實現(xiàn)

      具體函數(shù):RemoteConfigRepository#RemoteConfigRepository

      public class RemoteConfigRepository extends AbstractConfigRepository {
        ...
        public RemoteConfigRepository(String namespace) {
          m_namespace = namespace;
          m_configCache = new AtomicReference<>();
          m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
          m_httpClient = ApolloInjector.getInstance(HttpClient.class);
          m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
          remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
          m_longPollServiceDto = new AtomicReference<>();
          m_remoteMessages = new AtomicReference<>();
          m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
          m_configNeedForceRefresh = new AtomicBoolean(true);
          m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
              m_configUtil.getOnErrorRetryInterval() * 8);
          // 初始化加載配置
          this.trySync();
          // 周期拉取配置(apollo定時兜底)
          this.schedulePeriodicRefresh();
          // 長輪詢
          this.scheduleLongPollingRefresh();
        }
          
      }
      
      a)配置初始化加載(trySync)

      具體函數(shù):AbstractConfigRepository#trySync

      時序圖:

      邏輯描述:namespace初始化加載配置

      public abstract class AbstractConfigRepository implements ConfigRepository {  
        ...
        // 獲取配置內(nèi)容
        protected boolean trySync() {
          try {
            // (關(guān)注這里)獲取配置
            sync();
            return true;
          } catch (Throwable ex) {
            Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
            logger
                .warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil
                    .getDetailMessage(ex));
          }
          return false;
        }
        ...
      }
      

      具體函數(shù):RemoteConfigRepository#sync()

      public class RemoteConfigRepository extends AbstractConfigRepository {
        ...
        @Override
        protected synchronized void sync() {
          Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
      
          try {
            ApolloConfig previous = m_configCache.get();
            // 加載配置(關(guān)注這里)
            ApolloConfig current = loadApolloConfig();
      
            //reference equals means HTTP 304
            if (previous != current) {
              logger.debug("Remote Config refreshed!");
              m_configCache.set(current);
              // 通知Repository監(jiān)聽器,配置發(fā)生變化(關(guān)注這里)
              this.fireRepositoryChange(m_namespace, this.getConfig());
            }
      
            if (current != null) {
              Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
                  current.getReleaseKey());
            }
      
            transaction.setStatus(Transaction.SUCCESS);
          } catch (Throwable ex) {
            transaction.setStatus(ex);
            throw ex;
          } finally {
            transaction.complete();
          }
        }
      
        // 加載配置
        private ApolloConfig loadApolloConfig() {
          // 限流,避免創(chuàng)建過多連接。同一個namespace會有多種觸發(fā)loadApolloConfig函數(shù)的方式
          if (!m_loadConfigRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
            //wait at most 5 seconds
            try {
              TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
            }
          }
          String appId = m_configUtil.getAppId();
          String cluster = m_configUtil.getCluster();
          String dataCenter = m_configUtil.getDataCenter();
          String secret = m_configUtil.getAccessKeySecret();
          Tracer.logEvent("Apollo.Client.ConfigMeta", STRING_JOINER.join(appId, cluster, m_namespace));
          int maxRetries = m_configNeedForceRefresh.get() ? 2 : 1;
          long onErrorSleepTime = 0; // 0 means no sleep
          Throwable exception = null;
      
          // 從meta server中獲取注冊到eureka的config service
          List<ServiceDTO> configServices = getConfigServices();
          String url = null;
          retryLoopLabel:
          for (int i = 0; i < maxRetries; i++) {
            List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices);
            Collections.shuffle(randomConfigServices);
            if (m_longPollServiceDto.get() != null) {
              randomConfigServices.add(0, m_longPollServiceDto.getAndSet(null));
            }
      
            for (ServiceDTO configService : randomConfigServices) {
              if (onErrorSleepTime > 0) {
                logger.warn(
                    "Load config failed, will retry in {} {}. appId: {}, cluster: {}, namespaces: {}",
                    onErrorSleepTime, m_configUtil.getOnErrorRetryIntervalTimeUnit(), appId, cluster, m_namespace);
      
                try {
                  m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(onErrorSleepTime);
                } catch (InterruptedException e) {
                  //ignore
                }
              }
              // 拼接請求config service獲取配置的url
              url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,
                      dataCenter, m_remoteMessages.get(), m_configCache.get());
      
              logger.debug("Loading config from {}", url);
      
              HttpRequest request = new HttpRequest(url);
              if (!StringUtils.isBlank(secret)) {
                Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
                request.setHeaders(headers);
              }
      
              Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "queryConfig");
              transaction.addData("Url", url);
              try {
      
                // 發(fā)送請求
                HttpResponse<ApolloConfig> response = m_httpClient.doGet(request, ApolloConfig.class);
                m_configNeedForceRefresh.set(false);
                m_loadConfigFailSchedulePolicy.success();
      
                transaction.addData("StatusCode", response.getStatusCode());
                transaction.setStatus(Transaction.SUCCESS);
      
                // 如果配置沒有變更,config service會返回304狀態(tài)碼
                if (response.getStatusCode() == 304) {
                  logger.debug("Config server responds with 304 HTTP status code.");
                  // 緩存中拉取歷史配置
                  return m_configCache.get();
                }
      
                ApolloConfig result = response.getBody();
      
                logger.debug("Loaded config for {}: {}", m_namespace, result);
                // 如果配置變更,這會直接返回
                return result;
              } catch (ApolloConfigStatusCodeException ex) {
                ApolloConfigStatusCodeException statusCodeException = ex;
                //config not found
                if (ex.getStatusCode() == 404) {
                  String message = String.format(
                      "Could not find config for namespace - appId: %s, cluster: %s, namespace: %s, " +
                          "please check whether the configs are released in Apollo!",
                      appId, cluster, m_namespace);
                  statusCodeException = new ApolloConfigStatusCodeException(ex.getStatusCode(),
                      message);
                }
                Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(statusCodeException));
                transaction.setStatus(statusCodeException);
                exception = statusCodeException;
                if(ex.getStatusCode() == 404) {
                  break retryLoopLabel;
                }
              } catch (Throwable ex) {
                Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
                transaction.setStatus(ex);
                exception = ex;
              } finally {
                transaction.complete();
              }
      
              // if force refresh, do normal sleep, if normal config load, do exponential sleep
              onErrorSleepTime = m_configNeedForceRefresh.get() ? m_configUtil.getOnErrorRetryInterval() :
                  m_loadConfigFailSchedulePolicy.fail();
            }
          }
        ...
      }
      
      b)周期配置拉取(schedulePeriodicRefresh)

      具體函數(shù):RemoteConfigRepository#schedulePeriodicRefresh

      時序圖:

      image-20230416170722704

      邏輯描述:周期拉取配置(apollo定時兜底)

      public class RemoteConfigRepository extends AbstractConfigRepository {
        ...
        private final static ScheduledExecutorService m_executorService;
          
        static {
          m_executorService = Executors.newScheduledThreadPool(1,
              ApolloThreadFactory.create("RemoteConfigRepository", true));
        }
      
        // 定時拉取配置
        private void schedulePeriodicRefresh() {
          logger.debug("Schedule periodic refresh with interval: {} {}",
              m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
            // 固定時間間隔執(zhí)行任務(wù)
            m_executorService.scheduleAtFixedRate(
              new Runnable() {
                @Override
                public void run() {
                  Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));
                  logger.debug("refresh config for namespace: {}", m_namespace);
                  trySync();
                  Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION);
                }
              }, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
              m_configUtil.getRefreshIntervalTimeUnit());
        }
        ...
      }
      
      c)長輪詢監(jiān)聽與最新配置拉取(scheduleLongPollingRefresh)

      具體函數(shù):RemoteConfigRepository#scheduleLongPollingRefresh()

      時序圖:

      image-20230416170722704

      邏輯描述:建立長輪詢,監(jiān)聽配置變更通知通知后加載最新配置

      public class RemoteConfigRepository extends AbstractConfigRepository {
      
        private void scheduleLongPollingRefresh() {
          remoteConfigLongPollService.submit(m_namespace, this);
        }
      }
      

      (8)具體函數(shù):RemoteConfigLongPollService#submit()

      public class RemoteConfigLongPollService {
      
        private final ExecutorService m_longPollingService;
      
        public RemoteConfigLongPollService() {
          m_longPollFailSchedulePolicyInSecond = new ExponentialSchedulePolicy(1, 120); //in second
          m_longPollingStopped = new AtomicBoolean(false);
          m_longPollingService = Executors.newSingleThreadExecutor(
              ApolloThreadFactory.create("RemoteConfigLongPollService", true));
          m_longPollStarted = new AtomicBoolean(false);
          m_longPollNamespaces =
              Multimaps.synchronizedSetMultimap(HashMultimap.<String, RemoteConfigRepository>create());
          m_notifications = Maps.newConcurrentMap();
          m_remoteNotificationMessages = Maps.newConcurrentMap();
          m_responseType = new TypeToken<List<ApolloConfigNotification>>() {
          }.getType();
          m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
          m_httpClient = ApolloInjector.getInstance(HttpClient.class);
          m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
          m_longPollRateLimiter = RateLimiter.create(m_configUtil.getLongPollQPS());
        }
      
        public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {
          // 如果長輪詢已經(jīng)啟動,就不會再往線程池里添加runnable(通過m_longPollStarted判斷是否啟動),但是會往m_longPollNamespaces中添加需要被通知變更的namespace對應(yīng)的remoteConfigRepository
          boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);
          m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);
          if (!m_longPollStarted.get()) {
            startLongPolling();
          }
          return added;
        }
      
        // 多個namespace,也只有一個長輪詢
        private void startLongPolling() {
          if (!m_longPollStarted.compareAndSet(false, true)) {
            //already started
            return;
          }
          try {
            final String appId = m_configUtil.getAppId();
            final String cluster = m_configUtil.getCluster();
            final String dataCenter = m_configUtil.getDataCenter();
            final String secret = m_configUtil.getAccessKeySecret();
            final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();
            // 單線程連接池
            m_longPollingService.submit(new Runnable() {
              @Override
              public void run() {
                if (longPollingInitialDelayInMills > 0) {
                  try {
                    logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills);
                    TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);
                  } catch (InterruptedException e) {
                    //ignore
                  }
                }
                doLongPollingRefresh(appId, cluster, dataCenter, secret);
              }
            });
          } catch (Throwable ex) {
            m_longPollStarted.set(false);
            ApolloConfigException exception =
                new ApolloConfigException("Schedule long polling refresh failed", ex);
            Tracer.logError(exception);
            logger.warn(ExceptionUtil.getDetailMessage(exception));
          }
        }
      
        private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {
          final Random random = new Random();
          ServiceDTO lastServiceDto = null;
          // 只要不中斷就循環(huán)
          while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
            // limiter令牌桶限流為2qps, 5秒之內(nèi)存在沒有獲取到1個令牌的情況,則休眠5秒
            if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
              //wait at most 5 seconds
              try {
                TimeUnit.SECONDS.sleep(5);
              } catch (InterruptedException e) {
              }
            }
            Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");
            String url = null;
            try {
              if (lastServiceDto == null) {
                List<ServiceDTO> configServices = getConfigServices();
                lastServiceDto = configServices.get(random.nextInt(configServices.size()));
              }
      
              url =
                  assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
                      m_notifications);
      
              logger.debug("Long polling from {}", url);
      
              HttpRequest request = new HttpRequest(url);
              request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);
              if (!StringUtils.isBlank(secret)) {
                Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
                request.setHeaders(headers);
              }
      
              transaction.addData("Url", url);
      
              final HttpResponse<List<ApolloConfigNotification>> response =
                  m_httpClient.doGet(request, m_responseType);
      
              logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
              if (response.getStatusCode() == 200 && response.getBody() != null) {
                updateNotifications(response.getBody());
                updateRemoteNotifications(response.getBody());
                transaction.addData("Result", response.getBody().toString());
                // 此處通知,執(zhí)行notify之后加載數(shù)據(jù)
                notify(lastServiceDto, response.getBody());
              }
      
              //try to load balance
              if (response.getStatusCode() == 304 && random.nextBoolean()) {
                lastServiceDto = null;
              }
      
              m_longPollFailSchedulePolicyInSecond.success();
              transaction.addData("StatusCode", response.getStatusCode());
              transaction.setStatus(Transaction.SUCCESS);
            } catch (Throwable ex) {
              lastServiceDto = null;
              Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
              transaction.setStatus(ex);
              long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
              logger.warn(
                  "Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",
                  sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));
              try {
                TimeUnit.SECONDS.sleep(sleepTimeInSecond);
              } catch (InterruptedException ie) {
                //ignore
              }
            } finally {
              transaction.complete();
            }
          }
        }
      
        private void notify(ServiceDTO lastServiceDto, List<ApolloConfigNotification> notifications) {
          if (notifications == null || notifications.isEmpty()) {
            return;
          }
          for (ApolloConfigNotification notification : notifications) {
            String namespaceName = notification.getNamespaceName();
            //create a new list to avoid ConcurrentModificationException
            List<RemoteConfigRepository> toBeNotified =
                Lists.newArrayList(m_longPollNamespaces.get(namespaceName));
            ApolloNotificationMessages originalMessages = m_remoteNotificationMessages.get(namespaceName);
            ApolloNotificationMessages remoteMessages = originalMessages == null ? null : originalMessages.clone();
            //since .properties are filtered out by default, so we need to check if there is any listener for it
            toBeNotified.addAll(m_longPollNamespaces
                .get(String.format("%s.%s", namespaceName, ConfigFileFormat.Properties.getValue())));
            for (RemoteConfigRepository remoteConfigRepository : toBeNotified) {
              try {
                remoteConfigRepository.onLongPollNotified(lastServiceDto, remoteMessages);
              } catch (Throwable ex) {
                Tracer.logError(ex);
              }
            }
          }
        }
      
        public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto, ApolloNotificationMessages remoteMessages) {
          m_longPollServiceDto.set(longPollNotifiedServiceDto);
          m_remoteMessages.set(remoteMessages);
          m_executorService.submit(new Runnable() {
            @Override
            public void run() {
              m_configNeedForceRefresh.set(true);
              trySync();
            }
          });
        }
        
      }
      

      (9)AbstractConfigRepository#trySync()

      public abstract class AbstractConfigRepository implements ConfigRepository {
          
        // 拉配置信息,不是notificationID,而是配置內(nèi)容
        protected boolean trySync() {
          try {
            sync();
            return true;
          } catch (Throwable ex) {
            Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
            logger
                .warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil
                    .getDetailMessage(ex));
          }
          return false;
        }
      
        // 子類RemoteConfigRepository中實現(xiàn)
        protected synchronized void sync() {
          Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
      
          try {
            ApolloConfig previous = m_configCache.get();
            ApolloConfig current = loadApolloConfig();
      
            //reference equals means HTTP 304
            if (previous != current) {
              logger.debug("Remote Config refreshed!");
              m_configCache.set(current);
              this.fireRepositoryChange(m_namespace, this.getConfig());
            }
      
            if (current != null) {
              Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),
                  current.getReleaseKey());
            }
      
            transaction.setStatus(Transaction.SUCCESS);
          } catch (Throwable ex) {
            transaction.setStatus(ex);
            throw ex;
          } finally {
            transaction.complete();
          }
        }
      
        protected void fireRepositoryChange(String namespace, Properties newProperties) {
          for (RepositoryChangeListener listener : m_listeners) {
            try {
              listener.onRepositoryChange(namespace, newProperties);
            } catch (Throwable ex) {
              Tracer.logError(ex);
              logger.error("Failed to invoke repository change listener {}", listener.getClass(), ex);
            }
          }
        }
      }
      

      5、變更通知

      (1)邏輯描述

      在namespace數(shù)據(jù)發(fā)生變更時,系統(tǒng)將通知所有監(jiān)聽該namespace的監(jiān)聽器。系統(tǒng)會比較新老配置,將差異配置存儲在ConfigChange中,并隨后通知各個監(jiān)聽器。

      (2)時序圖

      image-20230416170722704

      (2)代碼實現(xiàn)

      DefaultConfig#onRepositoryChange()

      public class DefaultConfig extends AbstractConfig implements RepositoryChangeListener {
        ...
        /**
         * Repository通知變更,
         * @param namespace the namespace of this repository change
         * @param newProperties the properties after change
         */
        @Override
        public synchronized void onRepositoryChange(String namespace, Properties newProperties) {
          if (newProperties.equals(m_configProperties.get())) {
            return;
          }
      
          ConfigSourceType sourceType = m_configRepository.getSourceType();
          Properties newConfigProperties = propertiesFactory.getPropertiesInstance();
          newConfigProperties.putAll(newProperties);
      
          // 計算配置變更情況,對新老配置進行比較,將差異配置存儲在ConfigChange中Map的格式為{namspace:{key:value}}
          Map<String, ConfigChange> actualChanges = updateAndCalcConfigChanges(newConfigProperties,
              sourceType);
      
          //check double checked result
          if (actualChanges.isEmpty()) {
            return;
          }
      
          // 將具體變更通知各監(jiān)聽器
          this.fireConfigChange(m_namespace, actualChanges);
      
          Tracer.logEvent("Apollo.Client.ConfigChanges", m_namespace);
        }
      
        // 構(gòu)建Method的變更事件ConfigChange參數(shù)
        private Map<String, ConfigChange> updateAndCalcConfigChanges(Properties newConfigProperties,
            ConfigSourceType sourceType) {
          List<ConfigChange> configChanges = calcPropertyChanges(m_namespace, m_configProperties.get(), newConfigProperties);
      
          ImmutableMap.Builder<String, ConfigChange> actualChanges =
              new ImmutableMap.Builder<>();
      
          /** === Double check since DefaultConfig has multiple config sources ==== **/
      
          //1. use getProperty to update configChanges's old value
          for (ConfigChange change : configChanges) {
            change.setOldValue(this.getProperty(change.getPropertyName(), change.getOldValue()));
          }
      
          //2. update m_configProperties
          updateConfig(newConfigProperties, sourceType);
          clearConfigCache();
      
          //3. use getProperty to update configChange's new value and calc the final changes
          for (ConfigChange change : configChanges) {
            change.setNewValue(this.getProperty(change.getPropertyName(), change.getNewValue()));
            switch (change.getChangeType()) {
              case ADDED:
                if (Objects.equals(change.getOldValue(), change.getNewValue())) {
                  break;
                }
                if (change.getOldValue() != null) {
                  change.setChangeType(PropertyChangeType.MODIFIED);
                }
                actualChanges.put(change.getPropertyName(), change);
                break;
              case MODIFIED:
                if (!Objects.equals(change.getOldValue(), change.getNewValue())) {
                  actualChanges.put(change.getPropertyName(), change);
                }
                break;
              case DELETED:
                if (Objects.equals(change.getOldValue(), change.getNewValue())) {
                  break;
                }
                if (change.getNewValue() != null) {
                  change.setChangeType(PropertyChangeType.MODIFIED);
                }
                actualChanges.put(change.getPropertyName(), change);
                break;
              default:
                //do nothing
                break;
            }
          }
          return actualChanges.build();
        }
      
        /**
         * 父類中實現(xiàn) 
         * 配置變更通知,通知監(jiān)聽器
         * @param changes map's key is config property's key
         */
        protected void fireConfigChange(String namespace, Map<String, ConfigChange> changes) {
          final Set<String> changedKeys = changes.keySet();
          final List<ConfigChangeListener> listeners = this.findMatchedConfigChangeListeners(changedKeys);
      
          // notify those listeners
          for (ConfigChangeListener listener : listeners) {
            Set<String> interestedChangedKeys = resolveInterestedChangedKeys(listener, changedKeys);
            InterestedConfigChangeEvent interestedConfigChangeEvent = new InterestedConfigChangeEvent(
                namespace, changes, interestedChangedKeys);
            this.notifyAsync(listener, interestedConfigChangeEvent);
          }
        }
      
        /**
         * 異步通知
         * @param listener
         * @param changeEvent
         */
        private void notifyAsync(final ConfigChangeListener listener, final ConfigChangeEvent changeEvent) {
          m_executorService.submit(new Runnable() {
            @Override
            public void run() {
              String listenerName = listener.getClass().getName();
              Transaction transaction = Tracer.newTransaction("Apollo.ConfigChangeListener", listenerName);
              try {
                listener.onChange(changeEvent);
                transaction.setStatus(Transaction.SUCCESS);
              } catch (Throwable ex) {
                transaction.setStatus(ex);
                Tracer.logError(ex);
                logger.error("Failed to invoke config change listener {}", listenerName, ex);
              } finally {
                transaction.complete();
              }
            }
          });
        }
        ...
      }
      

      6、配置注入

      (1)邏輯描述

      在配置發(fā)生變更后,系統(tǒng)會通知在Bean初始化時創(chuàng)建的與namespace對應(yīng)的監(jiān)聽器。接著,系統(tǒng)通過反射的方式觸發(fā)相應(yīng)的函數(shù)(使用@ApolloConfigChange注解)。

      (2)代碼位置

      ConfigChangeListener#onChange()

      public class ApolloAnnotationProcessor extends ApolloProcessor implements BeanFactoryAware,
          EnvironmentAware {
        ...
        private void processApolloConfigChangeListener(final Object bean, final Method method) {
          ApolloConfigChangeListener annotation = AnnotationUtils
              .findAnnotation(method, ApolloConfigChangeListener.class);
          if (annotation == null) {
            return;
          }
          Class<?>[] parameterTypes = method.getParameterTypes();
          Preconditions.checkArgument(parameterTypes.length == 1,
              "Invalid number of parameters: %s for method: %s, should be 1", parameterTypes.length,
              method);
          Preconditions.checkArgument(ConfigChangeEvent.class.isAssignableFrom(parameterTypes[0]),
              "Invalid parameter type: %s for method: %s, should be ConfigChangeEvent", parameterTypes[0],
              method);
      
          ReflectionUtils.makeAccessible(method);
          // value 是 namespace
          String[] namespaces = annotation.value();
          String[] annotatedInterestedKeys = annotation.interestedKeys();
          String[] annotatedInterestedKeyPrefixes = annotation.interestedKeyPrefixes();
          // 創(chuàng)建配置變化監(jiān)聽器
          ConfigChangeListener configChangeListener = new ConfigChangeListener() {
            @Override
            public void onChange(ConfigChangeEvent changeEvent) {
              ReflectionUtils.invokeMethod(method, bean, changeEvent);
            }
          };
        }
        ...
      }
      

      四、最后

      《碼頭工人的一千零一夜》是一位專注于技術(shù)干貨分享的博主,追隨博主的文章,你將深入了解業(yè)界最新的技術(shù)趨勢,以及在Java開發(fā)和安全領(lǐng)域的實用經(jīng)驗分享。無論你是開發(fā)人員還是對逆向工程感興趣的愛好者,都能在《碼頭工人的一千零一夜》找到有價值的知識和見解。

      懂得不多,做得太少。歡迎批評、指正。

      posted @ 2024-01-21 17:22  碼頭工人  閱讀(834)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 亚洲一品道一区二区三区| 国内精品一区二区在线观看| 东北妇女精品bbwbbw| 亚洲av永久无码精品天堂久久| 国产精品三级黄色小视频| 国产在线观看免费人成视频| 午夜片神马影院福利| 国产精品自拍自在线播放| 午夜福利看片在线观看| 精品 日韩 国产 欧美 视频| 国产精品一区二区久久精品 | av天堂亚洲天堂亚洲天堂| 国产一级精品毛片基地| 国产成人午夜福利在线播放| www插插插无码免费视频网站| 成人网站免费观看永久视频下载| 国产亚洲婷婷香蕉久久精品| 日本一区二区精品色超碰| 人妻中文字幕精品一页| 性无码一区二区三区在线观看| 夜夜夜高潮夜夜爽夜夜爰爰| 婷婷五月综合丁香在线| 成人影片一区免费观看| 国产一级区二级区三级区| 国产精品无码a∨麻豆| 欧美浓毛大泬视频| 国产精品国产三级在线专区| 亚洲免费成人av一区| 亚洲性无码av在线| 国产精品伦人一久二久三久| 国产超碰无码最新上传| 午夜羞羞影院男女爽爽爽| 无码av片在线观看免费| 色爱综合激情五月激情| 沧州市| 久热这里有精品免费视频| 美女爽到高潮嗷嗷嗷叫免费网站| 本道久久综合无码中文字幕| 国内自拍视频在线一区| 精品亚洲女同一区二区| 正在播放酒店约少妇高潮|