
      Design Notes on Building an RPC Server with Netty

        Since publishing my two articles here on building RPC servers with Netty — "How to implement a high-performance RPC server with Netty" and "Netty high-performance RPC server optimization: message serialization" — I have received a lot of enthusiastic feedback and a number of optimization suggestions from peers and fellow bloggers. So, in my spare time, I decided to refactor the poorly structured modules of the original NettyRPC and strengthen a few features. The main optimization points are:

      1. Added protostuff to the existing codecs: JDK native object serialization, Kryo, and Hessian.
      2. Optimized the NettyRPC server-side thread pool model to support LinkedBlockingQueue, ArrayBlockingQueue, and SynchronousQueue, and extended it with several task-handling policies.
      3. Added RPC service startup, registration, and unloading, managed uniformly through a custom Spring nettyrpc tag.

        Here I want to organize and record the refactoring ideas and lessons learned. For the corresponding source code, see the NettyRPC 2.0 directory of my open-source GitHub project: https://github.com/tang-jie/NettyRPC.

        The earliest NettyRPC message codec plugins supported three serialization methods: JDK native object serialization (ObjectOutputStream/ObjectInputStream), Kryo, and Hessian. A fellow blogger later suggested adding Protostuff. From the material I found online, Protostuff is based on Google protobuf but offers more functionality and a simpler API. Native protobuf requires a precompilation step for the data structures: you write .proto configuration files and run them through the protobuf tooling to generate target-language code. Protostuff skips this precompilation step entirely. Below are performance test results for the mainstream Java serialization frameworks (image from the web):

        

        As the results show, Protostuff is indeed a highly efficient serialization framework; its serialization performance stands out even against the other mainstream frameworks, which makes it an excellent fit for encoding and decoding RPC messages.
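
        To make the "no precompilation" point concrete, here is a standalone round-trip sketch (my own illustration, not NettyRPC code; the Foo bean is hypothetical) built on the same com.dyuproject.protostuff API the codec below uses:

      import com.dyuproject.protostuff.LinkedBuffer;
      import com.dyuproject.protostuff.ProtostuffIOUtil;
      import com.dyuproject.protostuff.Schema;
      import com.dyuproject.protostuff.runtime.RuntimeSchema;
      
      public class ProtostuffRoundTrip {
          // Hypothetical bean; any POJO with a default constructor works.
          public static class Foo {
              private String name;
              public String getName() { return name; }
              public void setName(String name) { this.name = name; }
          }
      
          public static void main(String[] args) {
              Foo in = new Foo();
              in.setName("netty-rpc");
      
              // The schema is derived from the class at runtime: no .proto file,
              // no code-generation step.
              Schema<Foo> schema = RuntimeSchema.getSchema(Foo.class);
              LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
              byte[] bytes;
              try {
                  bytes = ProtostuffIOUtil.toByteArray(in, schema, buffer);
              } finally {
                  buffer.clear();
              }
      
              Foo out = schema.newMessage();
              ProtostuffIOUtil.mergeFrom(bytes, out, schema);
              System.out.println(out.getName()); // prints: netty-rpc
          }
      }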

        Now for the concrete Protostuff codec implementation. The first piece is the Schema definition: Protostuff-Runtime can serialize and deserialize plain Java beans as protobuf without any precompilation, and we cache the runtime Schemas to improve serialization performance. The SchemaCache implementation follows:

      package com.newlandframework.rpc.serialize.protostuff;
      
      import com.dyuproject.protostuff.Schema;
      import com.dyuproject.protostuff.runtime.RuntimeSchema;
      
      import com.google.common.cache.Cache;
      import com.google.common.cache.CacheBuilder;
      
      import java.util.concurrent.ExecutionException;
      import java.util.concurrent.Callable;
      import java.util.concurrent.TimeUnit;
      
      /**
       * @author tangjie<https://github.com/tang-jie>
       * @filename:SchemaCache.java
       * @description:SchemaCache module
       * @blogs http://www.rzrgm.cn/jietang/
       * @since 2016/10/7
       */
      public class SchemaCache {
          private static class SchemaCacheHolder {
              private static SchemaCache cache = new SchemaCache();
          }
      
          public static SchemaCache getInstance() {
              return SchemaCacheHolder.cache;
          }
      
          // Cache runtime schemas (up to 1024 classes, expiring an hour after
          // write) so reflective schema creation happens only once per class.
          private Cache<Class<?>, Schema<?>> cache = CacheBuilder.newBuilder()
                  .maximumSize(1024).expireAfterWrite(1, TimeUnit.HOURS)
                  .build();
      
          private Schema<?> get(final Class<?> cls, Cache<Class<?>, Schema<?>> cache) {
              try {
                  return cache.get(cls, new Callable<RuntimeSchema<?>>() {
                      public RuntimeSchema<?> call() throws Exception {
                          return RuntimeSchema.createFrom(cls);
                      }
                  });
              } catch (ExecutionException e) {
                  // Schema creation failed; callers will see null and fail fast.
                  return null;
              }
          }
      
          public Schema<?> get(final Class<?> cls) {
              return get(cls, cache);
          }
      }

        Next comes the actual Protostuff serialization/deserialization class, which implements the methods of the RpcSerialize interface:

      package com.newlandframework.rpc.serialize.protostuff;
      
      import com.dyuproject.protostuff.LinkedBuffer;
      import com.dyuproject.protostuff.ProtostuffIOUtil;
      import com.dyuproject.protostuff.Schema;
      
      import java.io.InputStream;
      import java.io.OutputStream;
      
      import com.newlandframework.rpc.model.MessageRequest;
      import com.newlandframework.rpc.model.MessageResponse;
      import com.newlandframework.rpc.serialize.RpcSerialize;
      
      import org.objenesis.Objenesis;
      import org.objenesis.ObjenesisStd;
      
      /**
       * @author tangjie<https://github.com/tang-jie>
       * @filename:ProtostuffSerialize.java
       * @description:ProtostuffSerialize module
       * @blogs http://www.rzrgm.cn/jietang/
       * @since 2016/10/7
       */
      public class ProtostuffSerialize implements RpcSerialize {
          private static SchemaCache cachedSchema = SchemaCache.getInstance();
          private static Objenesis objenesis = new ObjenesisStd(true);
          private boolean rpcDirect = false;
      
          public boolean isRpcDirect() {
              return rpcDirect;
          }
      
          public void setRpcDirect(boolean rpcDirect) {
              this.rpcDirect = rpcDirect;
          }
      
          private static <T> Schema<T> getSchema(Class<T> cls) {
              return (Schema<T>) cachedSchema.get(cls);
          }
      
          public Object deserialize(InputStream input) {
              try {
                  // rpcDirect selects which concrete message type to rebuild:
                  // a MessageRequest or a MessageResponse.
                  Class cls = isRpcDirect() ? MessageRequest.class : MessageResponse.class;
                  // Objenesis instantiates the message without invoking a constructor.
                  Object message = (Object) objenesis.newInstance(cls);
                  Schema<Object> schema = getSchema(cls);
                  ProtostuffIOUtil.mergeFrom(input, message, schema);
                  return message;
              } catch (Exception e) {
                  throw new IllegalStateException(e.getMessage(), e);
              }
          }
      
          public void serialize(OutputStream output, Object object) {
              Class cls = (Class) object.getClass();
              LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
              try {
                  Schema schema = getSchema(cls);
                  ProtostuffIOUtil.writeTo(output, object, schema, buffer);
              } catch (Exception e) {
                  throw new IllegalStateException(e.getMessage(), e);
              } finally {
                  // LinkedBuffer must be cleared before it can be reused.
                  buffer.clear();
              }
          }
      }

        Likewise, to use the Protostuff serializer efficiently, we can pool instances rather than creating and destroying objects frequently. Here are the pooling classes, ProtostuffSerializeFactory and ProtostuffSerializePool:

      package com.newlandframework.rpc.serialize.protostuff;
      
      import org.apache.commons.pool2.BasePooledObjectFactory;
      import org.apache.commons.pool2.PooledObject;
      import org.apache.commons.pool2.impl.DefaultPooledObject;
      
      /**
       * @author tangjie<https://github.com/tang-jie>
       * @filename:ProtostuffSerializeFactory.java
       * @description:ProtostuffSerializeFactory module
       * @blogs http://www.rzrgm.cn/jietang/
       * @since 2016/10/7
       */
      public class ProtostuffSerializeFactory extends BasePooledObjectFactory<ProtostuffSerialize> {
      
          public ProtostuffSerialize create() throws Exception {
              return createProtostuff();
          }
      
          public PooledObject<ProtostuffSerialize> wrap(ProtostuffSerialize protostuff) {
              return new DefaultPooledObject<ProtostuffSerialize>(protostuff);
          }
      
          private ProtostuffSerialize createProtostuff() {
              return new ProtostuffSerialize();
          }
      }

      package com.newlandframework.rpc.serialize.protostuff;
      
      import org.apache.commons.pool2.impl.GenericObjectPool;
      import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
      
      /**
       * @author tangjie<https://github.com/tang-jie>
       * @filename:ProtostuffSerializePool.java
       * @description:ProtostuffSerializePool module
       * @blogs http://www.rzrgm.cn/jietang/
       * @since 2016/10/7
       */
      public class ProtostuffSerializePool {
      
          private GenericObjectPool<ProtostuffSerialize> protostuffPool;
          private static volatile ProtostuffSerializePool poolFactory = null;
      
          private ProtostuffSerializePool() {
              protostuffPool = new GenericObjectPool<ProtostuffSerialize>(new ProtostuffSerializeFactory());
          }
      
          // Lazy singleton with double-checked locking; poolFactory is volatile so a
          // half-constructed instance is never visible to other threads.
          public static ProtostuffSerializePool getProtostuffPoolInstance() {
              if (poolFactory == null) {
                  synchronized (ProtostuffSerializePool.class) {
                      if (poolFactory == null) {
                          poolFactory = new ProtostuffSerializePool();
                      }
                  }
              }
              return poolFactory;
          }
      
          public ProtostuffSerializePool(final int maxTotal, final int minIdle, final long maxWaitMillis, final long minEvictableIdleTimeMillis) {
              protostuffPool = new GenericObjectPool<ProtostuffSerialize>(new ProtostuffSerializeFactory());
      
              GenericObjectPoolConfig config = new GenericObjectPoolConfig();
      
              config.setMaxTotal(maxTotal);
              config.setMinIdle(minIdle);
              config.setMaxWaitMillis(maxWaitMillis);
              config.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
      
              protostuffPool.setConfig(config);
          }
      
          public ProtostuffSerialize borrow() {
              try {
                  return getProtostuffPool().borrowObject();
              } catch (final Exception ex) {
                  // Pool exhausted or object creation failed.
                  ex.printStackTrace();
                  return null;
              }
          }
      
          public void restore(final ProtostuffSerialize object) {
              getProtostuffPool().returnObject(object);
          }
      
          public GenericObjectPool<ProtostuffSerialize> getProtostuffPool() {
              return protostuffPool;
          }
      }

        With the Protostuff pooling classes in place, we can use them to implement NettyRPC's encoding and decoding interfaces for RPC messages. First, the RPC decoder implemented the Protostuff way:

      package com.newlandframework.rpc.serialize.protostuff;
      
      import com.newlandframework.rpc.serialize.MessageCodecUtil;
      import com.newlandframework.rpc.serialize.MessageDecoder;
      
      /**
       * @author tangjie<https://github.com/tang-jie>
       * @filename:ProtostuffDecoder.java
       * @description:ProtostuffDecoder module
       * @blogs http://www.rzrgm.cn/jietang/
       * @since 2016/10/7
       */
      public class ProtostuffDecoder extends MessageDecoder {
      
          public ProtostuffDecoder(MessageCodecUtil util) {
              super(util);
          }
      }

        Then the RPC encoder implemented the Protostuff way:

      package com.newlandframework.rpc.serialize.protostuff;
      
      import com.newlandframework.rpc.serialize.MessageCodecUtil;
      import com.newlandframework.rpc.serialize.MessageEncoder;
      
      /**
       * @author tangjie<https://github.com/tang-jie>
       * @filename:ProtostuffEncoder.java
       * @description:ProtostuffEncoder module
       * @blogs http://www.rzrgm.cn/jietang/
       * @since 2016/10/7
       */
      public class ProtostuffEncoder extends MessageEncoder {
      
          public ProtostuffEncoder(MessageCodecUtil util) {
              super(util);
          }
      }

        Finally, the refactored Protostuff codec utility class, ProtostuffCodecUtil:

      package com.newlandframework.rpc.serialize.protostuff;
      
      import com.google.common.io.Closer;
      import com.newlandframework.rpc.serialize.MessageCodecUtil;
      import io.netty.buffer.ByteBuf;
      
      import java.io.ByteArrayInputStream;
      import java.io.ByteArrayOutputStream;
      import java.io.IOException;
      
      /**
       * @author tangjie<https://github.com/tang-jie>
       * @filename:ProtostuffCodecUtil.java
       * @description:ProtostuffCodecUtil module
       * @blogs http://www.rzrgm.cn/jietang/
       * @since 2016/10/7
       */
      public class ProtostuffCodecUtil implements MessageCodecUtil {
          private ProtostuffSerializePool pool = ProtostuffSerializePool.getProtostuffPoolInstance();
          private boolean rpcDirect = false;
      
          public boolean isRpcDirect() {
              return rpcDirect;
          }
      
          public void setRpcDirect(boolean rpcDirect) {
              this.rpcDirect = rpcDirect;
          }
      
          public void encode(final ByteBuf out, final Object message) throws IOException {
              // A Closer is single-use, so create one per call rather than sharing
              // a static instance across Netty's concurrent channel handlers.
              Closer closer = Closer.create();
              try {
                  ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                  closer.register(byteArrayOutputStream);
                  ProtostuffSerialize protostuffSerialization = pool.borrow();
                  try {
                      protostuffSerialization.serialize(byteArrayOutputStream, message);
                      byte[] body = byteArrayOutputStream.toByteArray();
                      // Length-prefixed frame: a 4-byte payload length, then the payload.
                      out.writeInt(body.length);
                      out.writeBytes(body);
                  } finally {
                      // Return the serializer to the pool even if serialization fails.
                      pool.restore(protostuffSerialization);
                  }
              } finally {
                  closer.close();
              }
          }
      
          public Object decode(byte[] body) throws IOException {
              Closer closer = Closer.create();
              try {
                  ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
                  closer.register(byteArrayInputStream);
                  ProtostuffSerialize protostuffSerialization = pool.borrow();
                  try {
                      // Propagate the request/response flag before deserializing.
                      protostuffSerialization.setRpcDirect(rpcDirect);
                      return protostuffSerialization.deserialize(byteArrayInputStream);
                  } finally {
                      pool.restore(protostuffSerialization);
                  }
              } finally {
                  closer.close();
              }
          }
      }

        This gives NettyRPC one more message serialization option, further strengthening its ability to move RPC messages across the network.

        The second optimization is the NettyRPC server-side threading model, which now supports a wider variety of task queue containers for the RPC message-processing pool. The asynchronous RPC thread pool, RpcThreadPool, looks like this:

      package com.newlandframework.rpc.parallel;
      
      import com.newlandframework.rpc.core.RpcSystemConfig;
      import com.newlandframework.rpc.parallel.policy.AbortPolicy;
      import com.newlandframework.rpc.parallel.policy.BlockingPolicy;
      import com.newlandframework.rpc.parallel.policy.CallerRunsPolicy;
      import com.newlandframework.rpc.parallel.policy.DiscardedPolicy;
      import com.newlandframework.rpc.parallel.policy.RejectedPolicy;
      import com.newlandframework.rpc.parallel.policy.RejectedPolicyType;
      
      import java.util.concurrent.Executor;
      import java.util.concurrent.BlockingQueue;
      import java.util.concurrent.ArrayBlockingQueue;
      import java.util.concurrent.LinkedBlockingQueue;
      import java.util.concurrent.SynchronousQueue;
      import java.util.concurrent.ThreadPoolExecutor;
      import java.util.concurrent.TimeUnit;
      import java.util.concurrent.RejectedExecutionHandler;
      
      /**
       * @author tangjie<https://github.com/tang-jie>
       * @filename:RpcThreadPool.java
       * @description:RpcThreadPool module
       * @blogs http://www.rzrgm.cn/jietang/
       * @since 2016/10/7
       */
      public class RpcThreadPool {
      
          private static RejectedExecutionHandler createPolicy() {
              RejectedPolicyType rejectedPolicyType = RejectedPolicyType.fromString(System.getProperty(RpcSystemConfig.SystemPropertyThreadPoolRejectedPolicyAttr, "AbortPolicy"));
      
              switch (rejectedPolicyType) {
                  case BLOCKING_POLICY:
                      return new BlockingPolicy();
                  case CALLER_RUNS_POLICY:
                      return new CallerRunsPolicy();
                  case ABORT_POLICY:
                      return new AbortPolicy();
                  case REJECTED_POLICY:
                      return new RejectedPolicy();
                  case DISCARDED_POLICY:
                      return new DiscardedPolicy();
              }
      
              return null;
          }
      
          private static BlockingQueue<Runnable> createBlockingQueue(int queues) {
              BlockingQueueType queueType = BlockingQueueType.fromString(System.getProperty(RpcSystemConfig.SystemPropertyThreadPoolQueueNameAttr, "LinkedBlockingQueue"));
      
              switch (queueType) {
                  case LINKED_BLOCKING_QUEUE:
                      return new LinkedBlockingQueue<Runnable>();
                  case ARRAY_BLOCKING_QUEUE:
                      return new ArrayBlockingQueue<Runnable>(RpcSystemConfig.PARALLEL * queues);
                  case SYNCHRONOUS_QUEUE:
                      return new SynchronousQueue<Runnable>();
              }
      
              return null;
          }
      
          public static Executor getExecutor(int threads, int queues) {
              String name = "RpcThreadPool";
              // corePoolSize == maximumPoolSize, so the pool size is fixed; the
              // queue type and rejection policy are chosen via system properties.
              return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                      createBlockingQueue(queues),
                      new NamedThreadFactory(name, true), createPolicy());
          }
      }

        The getExecutor factory method builds directly on the JDK's own ThreadPoolExecutor. Consulting the JDK documentation, the constructor overload it invokes is shown below.
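
        For reference, this is the seven-parameter overload from the JDK's java.util.concurrent package; note that threadFactory, which getExecutor fills with a NamedThreadFactory, sits between workQueue and handler:

      public ThreadPoolExecutor(int corePoolSize,
                                int maximumPoolSize,
                                long keepAliveTime,
                                TimeUnit unit,
                                BlockingQueue<Runnable> workQueue,
                                ThreadFactory threadFactory,
                                RejectedExecutionHandler handler)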

        The parameters have the following meanings:

      • corePoolSize: the number of threads the pool keeps alive.
      • maximumPoolSize: the maximum number of threads the pool may grow to.
      • keepAliveTime: how long an idle thread may linger before it is terminated.
      • unit: the time unit of keepAliveTime (milliseconds, seconds, minutes, hours, days, and so on).
      • workQueue: the queue that holds tasks awaiting execution.
      • threadFactory: the factory used to create worker threads (NettyRPC passes a NamedThreadFactory so pool threads get recognizable names).
      • handler: the strategy applied when the task queue is full and the pool has already reached its maximum thread count.

        NettyRPC's thread pool supports three main task queue types (a sketch of the BlockingQueueType enum used to select among them follows the list):

      1. LinkedBlockingQueue: an unbounded task queue backed by a linked list (you can optionally give it a capacity to make it bounded).
      2. ArrayBlockingQueue: a bounded, array-backed task queue.
      3. SynchronousQueue: a handoff queue with no internal capacity; a submitted task blocks until a worker thread takes it directly, and the submitter stays blocked until then.
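
        The BlockingQueueType enum that createBlockingQueue selects on is not shown in this post; here is a minimal sketch of what it might look like, with the constant and string names inferred from the call sites (the actual NettyRPC code may differ):

      package com.newlandframework.rpc.parallel;
      
      public enum BlockingQueueType {
          LINKED_BLOCKING_QUEUE("LinkedBlockingQueue"),
          ARRAY_BLOCKING_QUEUE("ArrayBlockingQueue"),
          SYNCHRONOUS_QUEUE("SynchronousQueue");
      
          private final String value;
      
          BlockingQueueType(String value) {
              this.value = value;
          }
      
          // Maps the system-property string (e.g. "LinkedBlockingQueue") to a constant.
          public static BlockingQueueType fromString(String value) {
              for (BlockingQueueType type : values()) {
                  if (type.value.equalsIgnoreCase(value)) {
                      return type;
                  }
              }
              throw new IllegalArgumentException("unknown blocking queue type: " + value);
          }
      }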

        When even the thread pool cannot cope, NettyRPC's main fallback measures are the following rejection policies (a sketch of the custom DiscardedPolicy appears after the list):

      1. AbortPolicy: reject the task outright and throw RejectedExecutionException.
      2. DiscardedPolicy: discard half of the elements from the head of the task queue to "lighten the load" on the queue.
      3. CallerRunsPolicy: neither discards the task nor throws; the calling thread runs the task itself. The reasoning: too many parallel requests aggravate system load and force the operating system into frequent context switches between threads. When the pool is saturated, rather than keep switching and interrupting, it is better to serialize the overflow requests and keep processing latency as low as possible; that, I suspect, was Doug Lea's design intent.
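
        The custom policy classes themselves are not reproduced in this post. As an illustration of the DiscardedPolicy behavior described above, here is a minimal sketch of my own under the "drop half the queue, then retry" semantics; NettyRPC's actual class may differ:

      package com.newlandframework.rpc.parallel.policy;
      
      import java.util.concurrent.BlockingQueue;
      import java.util.concurrent.RejectedExecutionHandler;
      import java.util.concurrent.ThreadPoolExecutor;
      
      // Sketch of a "discard half the queue" rejection policy.
      public class DiscardedPolicy implements RejectedExecutionHandler {
      
          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
              if (!executor.isShutdown()) {
                  BlockingQueue<Runnable> queue = executor.getQueue();
                  // Drop elements from the head until half of the backlog is gone.
                  int discarded = queue.size() / 2;
                  for (int i = 0; i < discarded; i++) {
                      queue.poll();
                  }
                  // Re-submit the rejected task now that there is room again.
                  executor.execute(r);
              }
          }
      }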

        Having covered the thread pool parameters in detail, let me now walk through how NettyRPC's RpcThreadPool actually processes work:

        

      1. When the pool receives an RPC processing request and the number of active threads is below corePoolSize, it keeps creating new threads to execute tasks.
      2. Once the active thread count reaches corePoolSize, newly arriving tasks are placed into the task queue.
      3. If the task queue fills up while the active thread count is still below maximumPoolSize, the pool spins up additional "emergency" threads, which start executing immediately.
      4. If the maximumPoolSize ceiling is reached and even this relief crew cannot keep up, the pool falls back on the configured RejectedExecutionHandler as its last line of defense.

        NettyRPC's default configuration sets both corePoolSize and maximumPoolSize to 16 and uses an unbounded linked blocking queue as the task queue. (Note that with an unbounded queue, steps 3 and 4 above never trigger, because the queue never fills up.) In a real deployment you should plan the pool parameters against your actual load and throughput. NettyRPC currently exposes a JMX interface for this; JMX, short for Java Management Extensions, is a specification (akin to J2EE) that makes it easy to bolt monitoring and management onto a system, and here it provides near real-time visibility into the RPC server's thread pool. The key pool metrics are exposed through the following code:

      package com.newlandframework.rpc.parallel.jmx;
      
      import org.springframework.jmx.export.annotation.ManagedOperation;
      import org.springframework.jmx.export.annotation.ManagedResource;
      
      /**
       * @author tangjie<https://github.com/tang-jie>
       * @filename:ThreadPoolStatus.java
       * @description:ThreadPoolStatus module
       * @blogs http://www.rzrgm.cn/jietang/
       * @since 2016/10/13
       */
      
      @ManagedResource
      public class ThreadPoolStatus {
          private int poolSize;
          private int activeCount;
          private int corePoolSize;
          private int maximumPoolSize;
          private int largestPoolSize;
          private long taskCount;
          private long completedTaskCount;
      
          @ManagedOperation
          public int getPoolSize() {
              return poolSize;
          }
      
          @ManagedOperation
          public void setPoolSize(int poolSize) {
              this.poolSize = poolSize;
          }
      
          @ManagedOperation
          public int getActiveCount() {
              return activeCount;
          }
      
          @ManagedOperation
          public void setActiveCount(int activeCount) {
              this.activeCount = activeCount;
          }
      
          @ManagedOperation
          public int getCorePoolSize() {
              return corePoolSize;
          }
      
          @ManagedOperation
          public void setCorePoolSize(int corePoolSize) {
              this.corePoolSize = corePoolSize;
          }
      
          @ManagedOperation
          public int getMaximumPoolSize() {
              return maximumPoolSize;
          }
      
          @ManagedOperation
          public void setMaximumPoolSize(int maximumPoolSize) {
              this.maximumPoolSize = maximumPoolSize;
          }
      
          @ManagedOperation
          public int getLargestPoolSize() {
              return largestPoolSize;
          }
      
          @ManagedOperation
          public void setLargestPoolSize(int largestPoolSize) {
              this.largestPoolSize = largestPoolSize;
          }
      
          @ManagedOperation
          public long getTaskCount() {
              return taskCount;
          }
      
          @ManagedOperation
          public void setTaskCount(long taskCount) {
              this.taskCount = taskCount;
          }
      
          @ManagedOperation
          public long getCompletedTaskCount() {
              return completedTaskCount;
          }
      
          @ManagedOperation
          public void setCompletedTaskCount(long completedTaskCount) {
              this.completedTaskCount = completedTaskCount;
          }
      }

        The thread pool status monitoring class, ThreadPoolStatus, tracks the following metrics:

      • poolSize: the current number of threads in the pool
      • activeCount: the approximate number of threads actively executing tasks
      • corePoolSize: the core number of threads
      • maximumPoolSize: the maximum number of threads allowed
      • largestPoolSize: the largest number of threads that have ever been in the pool
      • taskCount: the approximate total number of tasks ever scheduled for execution
      • completedTaskCount: the approximate total number of tasks that have completed

        corePoolSize and maximumPoolSize were covered in detail above, so I won't expand on them here.

        The JMX monitoring endpoint for the NettyRPC thread pool is ThreadPoolMonitorProvider; JMX connects remotely via JNDI over RMI. The implementation:

      package com.newlandframework.rpc.parallel.jmx;
      
      import com.newlandframework.rpc.netty.MessageRecvExecutor;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.ComponentScan;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.context.annotation.DependsOn;
      import org.springframework.context.annotation.EnableMBeanExport;
      import org.springframework.jmx.support.ConnectorServerFactoryBean;
      import org.springframework.jmx.support.MBeanServerConnectionFactoryBean;
      import org.springframework.jmx.support.MBeanServerFactoryBean;
      import org.springframework.remoting.rmi.RmiRegistryFactoryBean;
      import org.apache.commons.lang3.StringUtils;
      
      import javax.management.MBeanServerConnection;
      import javax.management.MalformedObjectNameException;
      import javax.management.ObjectName;
      import javax.management.ReflectionException;
      import javax.management.MBeanException;
      import javax.management.InstanceNotFoundException;
      import java.io.IOException;
      
      /**
       * @author tangjie<https://github.com/tang-jie>
       * @filename:ThreadPoolMonitorProvider.java
       * @description:ThreadPoolMonitorProvider module
       * @blogs http://www.rzrgm.cn/jietang/
       * @since 2016/10/13
       */
      
      @Configuration
      @EnableMBeanExport
      @ComponentScan("com.newlandframework.rpc.parallel.jmx")
      public class ThreadPoolMonitorProvider {
          public final static String DELIMITER = ":";
          public static String url;
          public static String jmxPoolSizeMethod = "setPoolSize";
          public static String jmxActiveCountMethod = "setActiveCount";
          public static String jmxCorePoolSizeMethod = "setCorePoolSize";
          public static String jmxMaximumPoolSizeMethod = "setMaximumPoolSize";
          public static String jmxLargestPoolSizeMethod = "setLargestPoolSize";
          public static String jmxTaskCountMethod = "setTaskCount";
          public static String jmxCompletedTaskCountMethod = "setCompletedTaskCount";
      
          @Bean
          public ThreadPoolStatus threadPoolStatus() {
              return new ThreadPoolStatus();
          }
      
          @Bean
          public MBeanServerFactoryBean mbeanServer() {
              return new MBeanServerFactoryBean();
          }
      
          @Bean
          public RmiRegistryFactoryBean registry() {
              return new RmiRegistryFactoryBean();
          }
      
          @Bean
          @DependsOn("registry")
          public ConnectorServerFactoryBean connectorServer() throws MalformedObjectNameException {
              MessageRecvExecutor ref = MessageRecvExecutor.getInstance();
              String ipAddr = StringUtils.isNotEmpty(ref.getServerAddress()) ? StringUtils.substringBeforeLast(ref.getServerAddress(), DELIMITER) : "localhost";
              url = "service:jmx:rmi://" + ipAddr + "/jndi/rmi://" + ipAddr + ":1099/nettyrpcstatus";
              System.out.println("NettyRPC JMX MonitorURL : [" + url + "]");
              ConnectorServerFactoryBean connectorServerFactoryBean = new ConnectorServerFactoryBean();
              connectorServerFactoryBean.setObjectName("connector:name=rmi");
              connectorServerFactoryBean.setServiceUrl(url);
              return connectorServerFactoryBean;
          }
      
          public static void monitor(ThreadPoolStatus status) throws IOException, MalformedObjectNameException, ReflectionException, MBeanException, InstanceNotFoundException {
              MBeanServerConnectionFactoryBean mBeanServerConnectionFactoryBean = new MBeanServerConnectionFactoryBean();
              mBeanServerConnectionFactoryBean.setServiceUrl(url);
              mBeanServerConnectionFactoryBean.afterPropertiesSet();
              MBeanServerConnection connection = mBeanServerConnectionFactoryBean.getObject();
              ObjectName objectName = new ObjectName("com.newlandframework.rpc.parallel.jmx:name=threadPoolStatus,type=ThreadPoolStatus");
      
              connection.invoke(objectName, jmxPoolSizeMethod, new Object[]{status.getPoolSize()}, new String[]{int.class.getName()});
              connection.invoke(objectName, jmxActiveCountMethod, new Object[]{status.getActiveCount()}, new String[]{int.class.getName()});
              connection.invoke(objectName, jmxCorePoolSizeMethod, new Object[]{status.getCorePoolSize()}, new String[]{int.class.getName()});
              connection.invoke(objectName, jmxMaximumPoolSizeMethod, new Object[]{status.getMaximumPoolSize()}, new String[]{int.class.getName()});
              connection.invoke(objectName, jmxLargestPoolSizeMethod, new Object[]{status.getLargestPoolSize()}, new String[]{int.class.getName()});
              connection.invoke(objectName, jmxTaskCountMethod, new Object[]{status.getTaskCount()}, new String[]{long.class.getName()});
              connection.invoke(objectName, jmxCompletedTaskCountMethod, new Object[]{status.getCompletedTaskCount()}, new String[]{long.class.getName()});
          }
      }

        Once the NettyRPC server starts successfully, it can be monitored over JMX: open jconsole, enter the URL service:jmx:rmi://127.0.0.1/jndi/rmi://127.0.0.1:1099/nettyrpcstatus (username and password default to empty), and click Connect.

          

        當(dāng)有客戶端進(jìn)行RPC請求的時(shí)候,通過JMX可以看到如下的監(jiān)控界面:

           

        Clicking the buttons for the individual pool metrics gives a direct, real-time view of the key thread pool indicators while NettyRPC runs. For example, click getCompletedTaskCount to check the total number of completed tasks, as shown below:

           

        As you can see, 40280 RPC requests had been processed at that point. This makes it possible to monitor, in near real time, whether the pool's parameter settings and capacity planning are reasonable, and to adjust promptly so that hardware and software resources are used to the fullest.
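
        The publishing side that fills ThreadPoolStatus is not shown in this post; here is a sketch of how a reporting loop might snapshot the live executor and push the numbers through ThreadPoolMonitorProvider.monitor (my own illustration against the classes above; NettyRPC's actual wiring may differ):

      import java.util.concurrent.Executors;
      import java.util.concurrent.ScheduledExecutorService;
      import java.util.concurrent.ThreadPoolExecutor;
      import java.util.concurrent.TimeUnit;
      
      import com.newlandframework.rpc.parallel.jmx.ThreadPoolMonitorProvider;
      import com.newlandframework.rpc.parallel.jmx.ThreadPoolStatus;
      
      public class ThreadPoolReporter {
          // Snapshot the executor every five seconds and push it to the JMX MBean.
          public static void startReporting(final ThreadPoolExecutor executor) {
              ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
              scheduler.scheduleAtFixedRate(new Runnable() {
                  public void run() {
                      try {
                          ThreadPoolStatus status = new ThreadPoolStatus();
                          status.setPoolSize(executor.getPoolSize());
                          status.setActiveCount(executor.getActiveCount());
                          status.setCorePoolSize(executor.getCorePoolSize());
                          status.setMaximumPoolSize(executor.getMaximumPoolSize());
                          status.setLargestPoolSize(executor.getLargestPoolSize());
                          status.setTaskCount(executor.getTaskCount());
                          status.setCompletedTaskCount(executor.getCompletedTaskCount());
                          ThreadPoolMonitorProvider.monitor(status);
                      } catch (Exception e) {
                          e.printStackTrace();
                      }
                  }
              }, 0, 5, TimeUnit.SECONDS);
          }
      }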

        Finally, after the refactoring, the NettyRPC server-side Spring configuration (NettyRPC/NettyRPC 2.0/main/resources/rpc-invoke-config-server.xml) looks like this:

      <?xml version="1.0" encoding="UTF-8"?>
      <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:context="http://www.springframework.org/schema/context"
             xmlns:nettyrpc="http://www.newlandframework.com/nettyrpc" xsi:schemaLocation="
          http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
          http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
          http://www.newlandframework.com/nettyrpc http://www.newlandframework.com/nettyrpc/nettyrpc.xsd">
          <!--Load the RPC server's IP address and port-->
          <context:property-placeholder location="classpath:rpc-server.properties"/>
          <!--Declare the RPC service interfaces-->
          <nettyrpc:service id="demoAddService" interfaceName="com.newlandframework.rpc.services.AddCalculate"
                            ref="calcAddService"></nettyrpc:service>
          <nettyrpc:service id="demoMultiService" interfaceName="com.newlandframework.rpc.services.MultiCalculate"
                            ref="calcMultiService"></nettyrpc:service>
          <!--Register the RPC server; protocol selects the serialization protocol-->
          <nettyrpc:registry id="rpcRegistry" ipAddr="${rpc.server.addr}" protocol="PROTOSTUFFSERIALIZE"></nettyrpc:registry>
          <!--Beans implementing the RPC services-->
          <bean id="calcAddService" class="com.newlandframework.rpc.services.impl.AddCalculateImpl"></bean>
          <bean id="calcMultiService" class="com.newlandframework.rpc.services.impl.MultiCalculateImpl"></bean>
      </beans>

        The nettyrpc:service tags define the service interfaces this RPC server exposes; the sample declares two services, addition and multiplication, for clients to call. The underlying Spring custom-tag implementation is in NettyRPC/NettyRPC 2.0/main/java/com/newlandframework/rpc/spring on GitHub; the code makes heavy use of Spring framework features, which I hope readers will analyze and digest on their own.

        The bean tags then declare the implementation classes behind the addition and multiplication interfaces, all placed in the com.newlandframework.rpc.services package.

        Finally, nettyrpc:registry registers the RPC server itself: the ipAddr attribute defines the server's IP and port, and protocol specifies which message serialization protocol this server speaks.
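
        The ${rpc.server.addr} placeholder in this file (and in the client configuration below) is resolved from rpc-server.properties on the classpath. A minimal example of what that properties file might contain follows; the address and port shown are hypothetical:

      rpc.server.addr=127.0.0.1:18888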

        Four serialization types are implemented so far: JDK native object serialization (ObjectOutputStream/ObjectInputStream), Kryo, Hessian, and Protostuff.

        With rpc-invoke-config-server.xml configured, you can launch the RPC server through its main entry point, com.newlandframework.rpc.boot.RpcServerStarter. Package it with Maven and deploy it (here onto 64-bit Red Hat Enterprise Linux Server release 5.7 (Tikanga), kernel 2.6.18-274.el5 x86_64); if all goes well, the CRT terminal shows output like this:

        

        Next comes the client-side Spring configuration (NettyRPC/NettyRPC 2.0/test/resources/rpc-invoke-config-client.xml):

      <?xml version="1.0" encoding="UTF-8"?>
      <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:context="http://www.springframework.org/schema/context"
             xmlns:nettyrpc="http://www.newlandframework.com/nettyrpc" xsi:schemaLocation="
          http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
          http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
          http://www.newlandframework.com/nettyrpc http://www.newlandframework.com/nettyrpc/nettyrpc.xsd">
          <!--Load the RPC server's IP address and port-->
          <context:property-placeholder location="classpath:rpc-server.properties"/>
          <!--RPC services invoked by the client (addition and multiplication)-->
          <nettyrpc:reference id="addCalc" interfaceName="com.newlandframework.rpc.services.AddCalculate"
                              protocol="PROTOSTUFFSERIALIZE" ipAddr="${rpc.server.addr}"/>
          <nettyrpc:reference id="multiCalc" interfaceName="com.newlandframework.rpc.services.MultiCalculate"
                              protocol="PROTOSTUFFSERIALIZE" ipAddr="${rpc.server.addr}"/>
      </beans>

        The demo code for the addition and multiplication services:

      package com.newlandframework.rpc.services;
      
      /**
       * @author tangjie<https://github.com/tang-jie>
       * @filename:AddCalculate.java
       * @description:AddCalculate module
       * @blogs http://www.rzrgm.cn/jietang/
       * @since 2016/10/7
       */
      public interface AddCalculate {
          // add two numbers
          int add(int a, int b);
      }

      package com.newlandframework.rpc.services.impl;
      
      import com.newlandframework.rpc.services.AddCalculate;
      
      /**
       * @author tangjie<https://github.com/tang-jie>
       * @filename:AddCalculateImpl.java
       * @description:AddCalculateImpl module
       * @blogs http://www.rzrgm.cn/jietang/
       * @since 2016/10/7
       */
      public class AddCalculateImpl implements AddCalculate {
          // add two numbers
          public int add(int a, int b) {
              return a + b;
          }
      }

      package com.newlandframework.rpc.services;
      
      /**
       * @author tangjie<https://github.com/tang-jie>
       * @filename:MultiCalculate.java
       * @description:MultiCalculate module
       * @blogs http://www.rzrgm.cn/jietang/
       * @since 2016/10/7
       */
      public interface MultiCalculate {
          // multiply two numbers
          int multi(int a, int b);
      }

      package com.newlandframework.rpc.services.impl;
      
      import com.newlandframework.rpc.services.MultiCalculate;
      
      /**
       * @author tangjie<https://github.com/tang-jie>
       * @filename:MultiCalculateImpl.java
       * @description:MultiCalculateImpl module
       * @blogs http://www.rzrgm.cn/jietang/
       * @since 2016/10/7
       */
      public class MultiCalculateImpl implements MultiCalculate {
          // multiply two numbers
          public int multi(int a, int b) {
              return a * b;
          }
      }

        Note that besides specifying which remote RPC services to call, the client's NettyRPC Spring configuration must also carry the server's IP address, port, and protocol type, and these must match the server side exactly; otherwise message encoding and decoding cannot work.

        Now let's simulate 10,000 concurrent addition requests and 10,000 concurrent multiplication requests — 20,000 calls in total — against the calculation modules on the remote RPC server, using the default protostuff serialization for message encoding and decoding. Note that the test fires 10,000 requests at the same instant rather than looping through 10,000 sequential calls; instantaneous concurrency is an important measure of an RPC server's throughput. The test is therefore built on CountDownLatch: java.util.concurrent.CountDownLatch is a synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. For both the addition and multiplication requests, the RPC client first starts 10,000 threads, parks them, and then releases them all at once on a start signal so the RPC requests fire simultaneously. The code follows.

        First, the concurrent addition request class, AddCalcParallelRequestThread:

      package com.newlandframework.test;
      
      import com.newlandframework.rpc.services.AddCalculate;
      
      import java.util.concurrent.CountDownLatch;
      import java.util.logging.Level;
      import java.util.logging.Logger;
      
      /**
       * @author tangjie<https://github.com/tang-jie>
       * @filename:AddCalcParallelRequestThread.java
       * @description:AddCalcParallelRequestThread module
       * @blogs http://www.rzrgm.cn/jietang/
       * @since 2016/10/7
       */
      public class AddCalcParallelRequestThread implements Runnable {
      
          private CountDownLatch signal;
          private CountDownLatch finish;
          private int taskNumber = 0;
          private AddCalculate calc;
      
          public AddCalcParallelRequestThread(AddCalculate calc, CountDownLatch signal, CountDownLatch finish, int taskNumber) {
              this.signal = signal;
              this.finish = finish;
              this.taskNumber = taskNumber;
              this.calc = calc;
          }
      
          public void run() {
              try {
                  // addition worker: park here until the start signal fires
                  signal.await();
      
                  // invoke the addition method on the remote RPC server
                  int add = calc.add(taskNumber, taskNumber);
                  System.out.println("calc add result:[" + add + "]");
      
                  finish.countDown();
              } catch (InterruptedException ex) {
                  Logger.getLogger(AddCalcParallelRequestThread.class.getName()).log(Level.SEVERE, null, ex);
              }
          }
      }

        Next, the concurrent multiplication request class, MultiCalcParallelRequestThread:

      package com.newlandframework.test;
      
      import com.newlandframework.rpc.services.MultiCalculate;
      
      import java.util.concurrent.CountDownLatch;
      import java.util.logging.Level;
      import java.util.logging.Logger;
      
      /**
       * @author tangjie<https://github.com/tang-jie>
       * @filename:MultiCalcParallelRequestThread.java
       * @description:MultiCalcParallelRequestThread module
       * @blogs http://www.rzrgm.cn/jietang/
       * @since 2016/10/7
       */
      public class MultiCalcParallelRequestThread implements Runnable {
      
          private CountDownLatch signal;
          private CountDownLatch finish;
          private int taskNumber = 0;
          private MultiCalculate calc;
      
          public MultiCalcParallelRequestThread(MultiCalculate calc, CountDownLatch signal, CountDownLatch finish, int taskNumber) {
              this.signal = signal;
              this.finish = finish;
              this.taskNumber = taskNumber;
              this.calc = calc;
          }
      
          public void run() {
              try {
                  // multiplication worker: park here until the start signal fires
                  signal.await();
      
                  // invoke the multiplication method on the remote RPC server
                  int multi = calc.multi(taskNumber, taskNumber);
                  System.out.println("calc multi result:[" + multi + "]");
      
                  finish.countDown();
              } catch (InterruptedException ex) {
                  Logger.getLogger(MultiCalcParallelRequestThread.class.getName()).log(Level.SEVERE, null, ex);
              }
          }
      }

        Now we write a test client, RpcParallelTest, to exercise the RPC server's performance and verify that the final results are computed correctly. The code:

      package com.newlandframework.test;
      
      import java.util.concurrent.CountDownLatch;
      import java.util.concurrent.TimeUnit;
      
      import com.newlandframework.rpc.services.AddCalculate;
      import com.newlandframework.rpc.services.MultiCalculate;
      import org.apache.commons.lang3.time.StopWatch;
      import org.springframework.context.support.ClassPathXmlApplicationContext;
      
      /**
       * @author tangjie<https://github.com/tang-jie>
       * @filename:RpcParallelTest.java
       * @description:RpcParallelTest module
       * @blogs http://www.rzrgm.cn/jietang/
       * @since 2016/10/7
       */
      public class RpcParallelTest {
      
          public static void parallelAddCalcTask(AddCalculate calc, int parallel) throws InterruptedException {
              // start timing
              StopWatch sw = new StopWatch();
              sw.start();
      
              CountDownLatch signal = new CountDownLatch(1);
              CountDownLatch finish = new CountDownLatch(parallel);
      
              for (int index = 0; index < parallel; index++) {
                  AddCalcParallelRequestThread client = new AddCalcParallelRequestThread(calc, signal, finish, index);
                  new Thread(client).start();
              }
      
              // release all workers at once, then wait for them to finish
              signal.countDown();
              finish.await();
              sw.stop();
      
              String tip = String.format("Addition RPC calls took a total of [%s] ms", sw.getTime());
              System.out.println(tip);
          }
      
          public static void parallelMultiCalcTask(MultiCalculate calc, int parallel) throws InterruptedException {
              // start timing
              StopWatch sw = new StopWatch();
              sw.start();
      
              CountDownLatch signal = new CountDownLatch(1);
              CountDownLatch finish = new CountDownLatch(parallel);
      
              for (int index = 0; index < parallel; index++) {
                  MultiCalcParallelRequestThread client = new MultiCalcParallelRequestThread(calc, signal, finish, index);
                  new Thread(client).start();
              }
      
              // release all workers at once, then wait for them to finish
              signal.countDown();
              finish.await();
              sw.stop();
      
              String tip = String.format("Multiplication RPC calls took a total of [%s] ms", sw.getTime());
              System.out.println(tip);
          }
      
          public static void addTask(AddCalculate calc, int parallel) throws InterruptedException {
              RpcParallelTest.parallelAddCalcTask(calc, parallel);
              TimeUnit.MILLISECONDS.sleep(30);
          }
      
          public static void multiTask(MultiCalculate calc, int parallel) throws InterruptedException {
              RpcParallelTest.parallelMultiCalcTask(calc, parallel);
              TimeUnit.MILLISECONDS.sleep(30);
          }
      
          public static void main(String[] args) throws Exception {
              // degree of parallelism: 10000
              int parallel = 10000;
              // load the Spring configuration
              ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:rpc-invoke-config-client.xml");
      
              // fire the concurrent addition and multiplication RPC requests
              addTask((AddCalculate) context.getBean("addCalc"), parallel);
              multiTask((MultiCalculate) context.getBean("multiCalc"), parallel);
              System.out.printf("[author tangjie] Netty RPC Server serialization concurrency test finished!\n\n");
      
              context.destroy();
          }
      }

        Here is the Netty RPC client run; the screenshot below shows the addition results starting to come back from the RPC server.

        Once the addition RPC requests complete, the console prints the total elapsed time.

        Then come the parallel multiplication RPC requests; again, the console prints the elapsed time.

        After the RPC client finishes and exits, here is the NettyRPC server side:

        As you can see, the NettyRPC server indeed received every RPC calculation request from the client, tagged each RPC message with a unique message ID, processed the calculations, and successfully sent the answers back to the client.

        After this series of module refactorings, NettyRPC has finally been upgraded, and the work has deepened my understanding of Netty, Spring, and Java's threading model. A journey of a thousand miles begins with a single step; it is exactly this kind of steady, drop-by-drop accumulation that lifts one's abilities to the next level.

        This is an original article, and with my limited learning and writing skill there are bound to be mistakes; I welcome corrections from fellow practitioners, and I hope readers will fill in whatever I have overlooked.

        Finally, the NettyRPC open-source project address: the NettyRPC 2.0 project at https://github.com/tang-jie/NettyRPC.

        Thank you for patiently reading the NettyRPC series; if this article helped you, please click "recommend"!

       

      posted @ 2016-10-20 23:35  Newland