<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      分布式系統:dubbo的連接機制

      研究這個問題的起因

      起因是一次面試,一次面試某電商網站,前面問到緩存,分布式,業務這些,還相談甚歡。然后面試官突然甩出一句:“了解dubbo嗎?dubbo是長連接還是短連接?”。當時我主要接觸了解學習的還是spring cloud,dubbo作為知名的分布式rpc框架,只是有一定了解,并且連接這一塊并沒有很深入去了解,但是基于對分布式系統的了解,我不假思索的回答了:“長連接啊!”。其實分布式系統接觸多了就知道了,分布式系統為了應對高并發,減少在高并發時的線程上下文切換損失以及重新建立連接的損失,往往都是采用長連接的。所以我當時我是這么想的:“dubbo作為處理小數據高并發分布式RPC框架,如果采用短連接,應該不可能達到那么高的吞吐吧。”。所以果斷回答了長連接。可是沒想到面試官微微一笑,帶著幾分不屑的說道:“短連接”。當時就給我整懵逼了,無法想象短連接如何處理高并發下重復建立連接以及線程上下文切換的問題。導致我回家的地鐵上一直都處在懷疑人生的狀態,回家后立馬各種百度Google(甚至還懷疑查到的結果)。

      dubbo的連接機制

      這里直接上結論了,dubbo默認是使用單一長連接,即消費者與每個服務提供者建立一個單一長連接,即如果有消費者soa-user1,soa-user2,提供者soa-account三臺,則每臺消費者user都會與3臺account建立一個連接,結果是每臺消費者user有3個長連接到分別到3臺提供者,每臺提供者account維持到soa-user1和soa-user2的2個長連接。

      為什么這么做

      dubbo這么設計的原因是,一般情況下因為消費者是在請求鏈路的上游,消費者承擔的連接數以及并發量都是最高的,他需要承擔更多其他的連接請求,而對提供者來說,承擔的連接只來于消費者,所以每臺提供者只需要承接消費者數量的連接就可以了,dubbo面向的就是消費者數量遠大于服務提供者的情況。所以說,現在很多項目使用的都是消費者和提供者不分的情況,這種情況并沒有很好的利用這個機制。

      dubbo同步轉異步

      dubbo的底層是使用netty,netty之前介紹過是非阻塞的,但是dubbo調用我們大多數時候都是使用的同步調用,那么這里是怎么異步轉同步的呢?這里其實延伸下,不只是dubbo,大多數在web場景下,還是同步請求為主,那么netty中要如何將異步轉同步?我這邊描述一下關鍵步驟。

      dubbo的實現

       //DubboInvoker
      protected Result doInvoke(final Invocation invocation) throws Throwable {
            
                //...
                  if (isOneway) {//2.異步沒返回值
                      boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                      currentClient.send(inv, isSent);
                      RpcContext.getContext().setFuture(null);
                      return new RpcResult();
                  } else if (isAsync) {
                      //1.異步有返回值,異步的直接返回帶future的result就完事了
                      ResponseFuture future = currentClient.request(inv, timeout);
                      FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
                      RpcContext.getContext().setFuture(futureAdapter);
                      Result result;
                      if (isAsyncFuture) {
                          result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                      } else {
                          result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
                      }
                      return result;
                  } else {//3.異步變同步,這里是同步的返回,主要阻塞的原因在于.get(),實際上就是HeaderExchangeChannel里返回的DefaultFuture的.get()方法
                      RpcContext.getContext().setFuture(null);
                      return (Result) currentClient.request(inv, timeout)//返回下面的future
                                                  .get();//進入get()方法,是當前線程阻塞。那么當有結果返回時,喚醒這個線程
                  }
              }
      
      //--HeaderExchangeChannel
       public ResponseFuture request(Object request, int timeout) throws RemotingException {
              Request req = new Request();
              req.setVersion(Version.getProtocolVersion());
              req.setTwoWay(true);
              req.setData(request);
              //這里在發送前,構建自定義的future,用來讓調用線程等待,注意這里的future和netty的channelFuture不同。
              DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
              try {
                  channel.send(req);//使用實際的channel,里面封裝了netty的channel,最后是調用到了nettyChannel的send的。
              } catch (RemotingException e) {
                  future.cancel();
                  throw e;
              }
              return future;
          }
      
      //--DefaultFuture--實現阻塞調用線程的邏輯,接收到結果
      //阻塞的邏輯
       public Object get(int timeout) throws RemotingException {
              if (timeout <= 0) {
                  timeout = 1000;
              }
              //done是自身對象的一個可重入鎖成員變量的一個condition,這里的邏輯就是:
              //如果獲取到了鎖,并且條件不滿足,則await線程等到下面receive方法喚醒。
              //其實我想吐槽下,這個condition命名為done,又有一個方法叫isDone,但是isDone又是判斷response!=null的和done沒有任何關系,這個命名不是很科學。
              if (!this.isDone()) {
                  long start = System.currentTimeMillis();
                  this.lock.lock();
      
                  try {
                      while(!this.isDone()) {
                          this.done.await((long)timeout, TimeUnit.MILLISECONDS);
                          if (this.isDone() || System.currentTimeMillis() - start > (long)timeout) {
                              break;
                          }
                      }
                  } catch (InterruptedException var8) {
                      throw new RuntimeException(var8);
                  } finally {
                      this.lock.unlock();
                  }
      
                  if (!this.isDone()) {
                      throw new TimeoutException(this.sent > 0L, this.channel, this.getTimeoutMessage(false));
                  }
              }
      
              return this.returnFromResponse();
          }
      //收到結果時喚醒的邏輯
        public static void received(Channel channel, Response response) {
              try {
                  DefaultFuture future = FUTURES.remove(response.getId());
                  if (future != null) {
                      future.doReceived(response);
                  } else {
                  }
              } finally {
                  CHANNELS.remove(response.getId());
              }
          }
        private void doReceived(Response res) {
              lock.lock();
              try {
                  response = res;//拿到了響應
                  if (done != null) {
                      done.signal();//喚醒線程
                  }
              } finally {
                  lock.unlock();
              }
              if (callback != null) {
                  invokeCallback(callback);
              }
          }
      
      //那么我們知道DefaultFuture被調用received方法時會被喚醒,那么是什么時候被調用的呢?
      //--HeaderExchangeHandler-- netty中處理的流就是handler流,之前有篇文章講到過,這里也是在handler中給處理的,其實上面還有ExchangeHandlerDispatcher這類dispatcher預處理,將返回
      //分給具體的channelHandler處理,但是結果到了這里
      public class HeaderExchangeHandler implements ChannelHandlerDelegate {
              public void received(Channel channel, Object message) throws RemotingException {
                      channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
                      HeaderExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
      
                      try {
                      if (message instanceof Request) {
                              Request request = (Request)message;
                              if (request.isEvent()) {
                              this.handlerEvent(channel, request);
                              } else if (request.isTwoWay()) {
                              Response response = this.handleRequest(exchangeChannel, request);
                              channel.send(response);
                              } else {
                              this.handler.received(exchangeChannel, request.getData());
                              }
                      } else if (message instanceof Response) {
                              //主要是這里
                              handleResponse(channel, (Response)message);
                      } else if (message instanceof String) {
                              if (isClientSide(channel)) {
                              Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                              logger.error(e.getMessage(), e);
                              } else {
                              String echo = this.handler.telnet(channel, (String)message);
                              if (echo != null && echo.length() > 0) {
                                      channel.send(echo);
                              }
                              }
                      } else {
                              this.handler.received(exchangeChannel, message);
                      }
                      } finally {
                      HeaderExchangeChannel.removeChannelIfDisconnected(channel);
                      }
      
          }
          //handleResponse,到這里直接調用靜態方法,回到了上面接受結果那步。
          static void handleResponse(Channel channel, Response response) throws RemotingException {
              if (response != null && !response.isHeartbeat()) {
                  DefaultFuture.received(channel, response);
              }
      
          }
      }
      

      純netty的簡單實現

      純netty的簡單實現,其實也很簡單,在創建handler時,構造時將外部的FutureTask對象構造到hanlder中,外面使用FutureTask對象get方法阻塞,handler中在最后有結果時,將FutureTask的結果set一下,外部就取消了阻塞。

          public SettableTask<String> sendAndSync(FullHttpRequest httpContent){
              //創建一個futureStask
              final SettableTask<String> responseFuture = new SettableTask<>();
              ChannelFutureListener connectionListener = future -> {
                  if (future.isSuccess()) {
                      Channel channel = future.channel();
                      //創建一個listener,在連接后將新的futureTask構造到一個handler中
                      channel.pipeline().addLast(new SpidersResultHandler(responseFuture));
                  } else {
                      responseFuture.setExceptionResult(future.cause());
                  }
              };
              try {
                  Channel channel = channelPool.acquire().syncUninterruptibly().getNow();
                  log.info("channel status:{}",channel.isActive());
                  channel.writeAndFlush(httpContent).addListener(connectionListener);
              } catch (Exception e) {
                  log.error("netty寫入異常!", e);
              }
              return responseFuture;
          }
          //重寫一個可以手動set的futureTask
          public class SettableTask<T> extends FutureTask<T> {
              public SettableTask() {
                  super(() -> {
                      throw new IllegalStateException("Should never be called");
                  });
              }
      
              public void setResultValue(T value) {
                  this.set(value);
      
              }
      
              public void setExceptionResult(Throwable exception) {
                  this.setException(exception);
              }
      
              @Override
              protected void done() {
                  super.done();
              }
          }
          //resultHandler
          public class SpidersResultHandler extends SimpleChannelInboundHandler<String> {
              private SettableTask<String> future;
              public SpidersResultHandler(SettableTask<String> future){
                  this.future=future;
              }
              @Override
              protected void channelRead0(ChannelHandlerContext channelHandlerContext, String httpContent) throws Exception {
                  log.info("result={}",httpContent);
                  future.setResultValue(httpContent);
              }
      
              @Override
              public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
                  log.error("{}異常", this.getClass().getName(), throwable);
      
              }
      }
      

      總結

      dubbo的高性能,也源于他對每個點不斷的優化,最早的時候我記得看到一篇文章寫到:dubbo的異步轉同步機制,是使用的CountDownLatch實現的。現在想來,可能是在亂說。一些框架的原理,還是要自己多思考多翻看,才能掌握。

      posted @ 2020-10-14 16:02  IntoTw  閱讀(4037)  評論(5)    收藏  舉報
      主站蜘蛛池模板: 一边吃奶一边做动态图| 日本免费一区二区三区日本| 亚洲国产午夜精品福利| 综合色一色综合久久网| 亚洲熟妇无码八av在线播放| 高清不卡一区二区三区| 亚洲精品一区二区三区在| 人人澡人人透人人爽| 成人亚洲精品一区二区三区| 一区二区中文字幕av| 欧美视频网站www色| 国产真人无码作爱免费视频app| 国产亚洲AV电影院之毛片| 国产午夜福利视频在线观看| 精品亚洲国产成人性色av| 在线看无码的免费网站| 国产精品入口中文字幕| 久久亚洲精品中文字幕无| 国产成人最新三级在线视频| 蜜桃无码一区二区三区| 四虎成人在线观看免费| 亚洲国产v高清在线观看| 久久国产成人av蜜臀| 久久96热人妻偷产精品| 亚洲人成18在线看久| 亚洲一区二区av免费| 国产一级小视频| 狠狠躁夜夜躁无码中文字幕| 亚洲一区二区三区自拍天堂| 亚洲欧美色综合影院| 亚洲最大激情中文字幕| 国产精品久久久久av福利动漫| 亚洲熟女国产熟女二区三区| 色噜噜亚洲男人的天堂| 国产精品日日摸夜夜添夜夜添无码 | 制服丝袜美腿一区二区| 男女猛烈激情xx00免费视频| 看亚洲黄色不在线网占| 亚洲综合色区另类av| 免费人妻无码不卡中文18禁| 国产成人精品无码播放|