基于Netty打造RPC服務(wù)器設(shè)計(jì)經(jīng)驗(yàn)談
自從在園子里,發(fā)表了兩篇如何基于Netty構(gòu)建RPC服務(wù)器的文章:談?wù)勅绾问褂肗etty開發(fā)實(shí)現(xiàn)高性能的RPC服務(wù)器、Netty實(shí)現(xiàn)高性能RPC服務(wù)器優(yōu)化篇之消息序列化 之后,收到了很多同行、園友們熱情的反饋和若干個(gè)優(yōu)化建議,于是利用閑暇時(shí)間,打算對原來NettyRPC中不合理的模塊進(jìn)行重構(gòu),并且增強(qiáng)了一些特性,主要的優(yōu)化點(diǎn)如下:
- 在原來編碼解碼器:JDK原生的對象序列化方式、kryo、hessian,新增了:protostuff。
- 優(yōu)化了NettyRPC服務(wù)端的線程池模型,支持LinkedBlockingQueue、ArrayBlockingQueue、SynchronousQueue,并擴(kuò)展了多個(gè)線程池任務(wù)處理策略。
- RPC服務(wù)啟動、注冊、卸載支持,通過Spring中自定義的nettyrpc標(biāo)簽進(jìn)行統(tǒng)一管理。
現(xiàn)在重點(diǎn)整理一下重構(gòu)思路、經(jīng)驗(yàn),記錄下來。對應(yīng)源代碼代碼,大家可以查看我的開源github:https://github.com/tang-jie/NettyRPC 項(xiàng)目中的NettyRPC 2.0目錄。
在最早的NettyRPC消息編解碼插件中,我使用的是:JDK原生的對象序列化(ObjectOutputStream/ObjectInputStream)、Kryo、Hessian這三種方式,后續(xù)有園友向我提議,可以引入Protostuff序列化方式。經(jīng)過查閱網(wǎng)絡(luò)的相關(guān)資料,Protostuff基于Google protobuf,但是提供了更多的功能和更簡易的用法。原生的protobuff是需要數(shù)據(jù)結(jié)構(gòu)的預(yù)編譯過程,需要編寫.proto格式的配置文件,再通過protobuf提供的工具翻譯成目標(biāo)語言代碼,而Protostuff則省略了這個(gè)預(yù)編譯的過程。以下是Java主流序列化框架的性能測試結(jié)果(圖片來自網(wǎng)絡(luò)):

可以發(fā)現(xiàn),Protostuff序列化確實(shí)是一種很高效的序列化框架,相比起其他主流的序列化、反序列化框架,其序列化性能可見一斑。如果用它來進(jìn)行RPC消息的編碼、解碼工作,再合適不過了。現(xiàn)在貼出具體的Protostuff序列化編解碼器的實(shí)現(xiàn)代碼。
首先是定義Schema,這個(gè)是因?yàn)镻rotostuff-Runtime實(shí)現(xiàn)了無需預(yù)編譯對java bean進(jìn)行protobuf序列化/反序列化的能力。我們可以把運(yùn)行時(shí)的Schema緩存起來,提高序列化性能。具體實(shí)現(xiàn)類SchemaCache代碼如下:
package com.newlandframework.rpc.serialize.protostuff; import com.dyuproject.protostuff.Schema; import com.dyuproject.protostuff.runtime.RuntimeSchema; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import java.util.concurrent.ExecutionException; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; /** * @author tangjie<https://github.com/tang-jie> * @filename:SchemaCache.java * @description:SchemaCache功能模塊 * @blogs http://www.rzrgm.cn/jietang/ * @since 2016/10/7 */ public class SchemaCache { private static class SchemaCacheHolder { private static SchemaCache cache = new SchemaCache(); } public static SchemaCache getInstance() { return SchemaCacheHolder.cache; } private Cache<Class<?>, Schema<?>> cache = CacheBuilder.newBuilder() .maximumSize(1024).expireAfterWrite(1, TimeUnit.HOURS) .build(); private Schema<?> get(final Class<?> cls, Cache<Class<?>, Schema<?>> cache) { try { return cache.get(cls, new Callable<RuntimeSchema<?>>() { public RuntimeSchema<?> call() throws Exception { return RuntimeSchema.createFrom(cls); } }); } catch (ExecutionException e) { return null; } } public Schema<?> get(final Class<?> cls) { return get(cls, cache); } }
然后定義真正的Protostuff序列化、反序列化類,它實(shí)現(xiàn)了RpcSerialize接口的方法:
package com.newlandframework.rpc.serialize.protostuff; import com.dyuproject.protostuff.LinkedBuffer; import com.dyuproject.protostuff.ProtostuffIOUtil; import com.dyuproject.protostuff.Schema; import java.io.InputStream; import java.io.OutputStream; import com.newlandframework.rpc.model.MessageRequest; import com.newlandframework.rpc.model.MessageResponse; import com.newlandframework.rpc.serialize.RpcSerialize; import org.objenesis.Objenesis; import org.objenesis.ObjenesisStd; /** * @author tangjie<https://github.com/tang-jie> * @filename:ProtostuffSerialize.java * @description:ProtostuffSerialize功能模塊 * @blogs http://www.rzrgm.cn/jietang/ * @since 2016/10/7 */ public class ProtostuffSerialize implements RpcSerialize { private static SchemaCache cachedSchema = SchemaCache.getInstance(); private static Objenesis objenesis = new ObjenesisStd(true); private boolean rpcDirect = false; public boolean isRpcDirect() { return rpcDirect; } public void setRpcDirect(boolean rpcDirect) { this.rpcDirect = rpcDirect; } private static <T> Schema<T> getSchema(Class<T> cls) { return (Schema<T>) cachedSchema.get(cls); } public Object deserialize(InputStream input) { try { Class cls = isRpcDirect() ? MessageRequest.class : MessageResponse.class; Object message = (Object) objenesis.newInstance(cls); Schema<Object> schema = getSchema(cls); ProtostuffIOUtil.mergeFrom(input, message, schema); return message; } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } public void serialize(OutputStream output, Object object) { Class cls = (Class) object.getClass(); LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); try { Schema schema = getSchema(cls); ProtostuffIOUtil.writeTo(output, object, schema, buffer); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } finally { buffer.clear(); } } }
同樣為了提高Protostuff序列化/反序列化類的利用效率,我們可以對其進(jìn)行池化處理,而不要頻繁的創(chuàng)建、銷毀對象。現(xiàn)在給出Protostuff池化處理類:ProtostuffSerializeFactory、ProtostuffSerializePool的實(shí)現(xiàn)代碼:
package com.newlandframework.rpc.serialize.protostuff; import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; /** * @author tangjie<https://github.com/tang-jie> * @filename:ProtostuffSerializeFactory.java * @description:ProtostuffSerializeFactory功能模塊 * @blogs http://www.rzrgm.cn/jietang/ * @since 2016/10/7 */ public class ProtostuffSerializeFactory extends BasePooledObjectFactory<ProtostuffSerialize> { public ProtostuffSerialize create() throws Exception { return createProtostuff(); } public PooledObject<ProtostuffSerialize> wrap(ProtostuffSerialize hessian) { return new DefaultPooledObject<ProtostuffSerialize>(hessian); } private ProtostuffSerialize createProtostuff() { return new ProtostuffSerialize(); } }
package com.newlandframework.rpc.serialize.protostuff; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; /** * @author tangjie<https://github.com/tang-jie> * @filename:ProtostuffSerializePool.java * @description:ProtostuffSerializePool功能模塊 * @blogs http://www.rzrgm.cn/jietang/ * @since 2016/10/7 */ public class ProtostuffSerializePool { private GenericObjectPool<ProtostuffSerialize> ProtostuffPool; volatile private static ProtostuffSerializePool poolFactory = null; private ProtostuffSerializePool() { ProtostuffPool = new GenericObjectPool<ProtostuffSerialize>(new ProtostuffSerializeFactory()); } public static ProtostuffSerializePool getProtostuffPoolInstance() { if (poolFactory == null) { synchronized (ProtostuffSerializePool.class) { if (poolFactory == null) { poolFactory = new ProtostuffSerializePool(); } } } return poolFactory; } public ProtostuffSerializePool(final int maxTotal, final int minIdle, final long maxWaitMillis, final long minEvictableIdleTimeMillis) { ProtostuffPool = new GenericObjectPool<ProtostuffSerialize>(new ProtostuffSerializeFactory()); GenericObjectPoolConfig config = new GenericObjectPoolConfig(); config.setMaxTotal(maxTotal); config.setMinIdle(minIdle); config.setMaxWaitMillis(maxWaitMillis); config.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis); ProtostuffPool.setConfig(config); } public ProtostuffSerialize borrow() { try { return getProtostuffPool().borrowObject(); } catch (final Exception ex) { ex.printStackTrace(); return null; } } public void restore(final ProtostuffSerialize object) { getProtostuffPool().returnObject(object); } public GenericObjectPool<ProtostuffSerialize> getProtostuffPool() { return ProtostuffPool; } }
現(xiàn)在有了Protostuff池化處理類,我們就通過它來實(shí)現(xiàn)NettyRPC的編碼、解碼接口,達(dá)到對RPC消息編碼、解碼的目的。首先是Protostuff方式實(shí)現(xiàn)的RPC解碼器代碼:
package com.newlandframework.rpc.serialize.protostuff; import com.newlandframework.rpc.serialize.MessageCodecUtil; import com.newlandframework.rpc.serialize.MessageDecoder; /** * @author tangjie<https://github.com/tang-jie> * @filename:ProtostuffDecoder.java * @description:ProtostuffDecoder功能模塊 * @blogs http://www.rzrgm.cn/jietang/ * @since 2016/10/7 */ public class ProtostuffDecoder extends MessageDecoder { public ProtostuffDecoder(MessageCodecUtil util) { super(util); } }
然后是Protostuff方式實(shí)現(xiàn)的RPC編碼器代碼:
package com.newlandframework.rpc.serialize.protostuff; import com.newlandframework.rpc.serialize.MessageCodecUtil; import com.newlandframework.rpc.serialize.MessageEncoder; /** * @author tangjie<https://github.com/tang-jie> * @filename:ProtostuffEncoder.java * @description:ProtostuffEncoder功能模塊 * @blogs http://www.rzrgm.cn/jietang/ * @since 2016/10/7 */ public class ProtostuffEncoder extends MessageEncoder { public ProtostuffEncoder(MessageCodecUtil util) { super(util); } }
最后重構(gòu)出Protostuff方式的RPC編碼、解碼器工具類ProtostuffCodecUtil的實(shí)現(xiàn)代碼:
package com.newlandframework.rpc.serialize.protostuff; import com.google.common.io.Closer; import com.newlandframework.rpc.serialize.MessageCodecUtil; import io.netty.buffer.ByteBuf; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; /** * @author tangjie<https://github.com/tang-jie> * @filename:ProtostuffCodecUtil.java * @description:ProtostuffCodecUtil功能模塊 * @blogs http://www.rzrgm.cn/jietang/ * @since 2016/10/7 */ public class ProtostuffCodecUtil implements MessageCodecUtil { private static Closer closer = Closer.create(); private ProtostuffSerializePool pool = ProtostuffSerializePool.getProtostuffPoolInstance(); private boolean rpcDirect = false; public boolean isRpcDirect() { return rpcDirect; } public void setRpcDirect(boolean rpcDirect) { this.rpcDirect = rpcDirect; } public void encode(final ByteBuf out, final Object message) throws IOException { try { ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); closer.register(byteArrayOutputStream); ProtostuffSerialize protostuffSerialization = pool.borrow(); protostuffSerialization.serialize(byteArrayOutputStream, message); byte[] body = byteArrayOutputStream.toByteArray(); int dataLength = body.length; out.writeInt(dataLength); out.writeBytes(body); pool.restore(protostuffSerialization); } finally { closer.close(); } } public Object decode(byte[] body) throws IOException { try { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body); closer.register(byteArrayInputStream); ProtostuffSerialize protostuffSerialization = pool.borrow(); protostuffSerialization.setRpcDirect(rpcDirect); Object obj = protostuffSerialization.deserialize(byteArrayInputStream); pool.restore(protostuffSerialization); return obj; } finally { closer.close(); } } }
這樣就使得NettyRPC的消息序列化又多了一種方式,進(jìn)一步增強(qiáng)了其RPC消息網(wǎng)絡(luò)傳輸?shù)哪芰Α?/p>
其次是優(yōu)化了NettyRPC服務(wù)端的線程模型,使得RPC消息處理線程池對任務(wù)的隊(duì)列容器的支持更加多樣。具體RPC異步處理線程池RpcThreadPool的代碼如下:
package com.newlandframework.rpc.parallel; import com.newlandframework.rpc.core.RpcSystemConfig; import com.newlandframework.rpc.parallel.policy.AbortPolicy; import com.newlandframework.rpc.parallel.policy.BlockingPolicy; import com.newlandframework.rpc.parallel.policy.CallerRunsPolicy; import com.newlandframework.rpc.parallel.policy.DiscardedPolicy; import com.newlandframework.rpc.parallel.policy.RejectedPolicy; import com.newlandframework.rpc.parallel.policy.RejectedPolicyType; import java.util.concurrent.Executor; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.RejectedExecutionHandler; /** * @author tangjie<https://github.com/tang-jie> * @filename:RpcThreadPool.java * @description:RpcThreadPool功能模塊 * @blogs http://www.rzrgm.cn/jietang/ * @since 2016/10/7 */ public class RpcThreadPool { private static RejectedExecutionHandler createPolicy() { RejectedPolicyType rejectedPolicyType = RejectedPolicyType.fromString(System.getProperty(RpcSystemConfig.SystemPropertyThreadPoolRejectedPolicyAttr, "AbortPolicy")); switch (rejectedPolicyType) { case BLOCKING_POLICY: return new BlockingPolicy(); case CALLER_RUNS_POLICY: return new CallerRunsPolicy(); case ABORT_POLICY: return new AbortPolicy(); case REJECTED_POLICY: return new RejectedPolicy(); case DISCARDED_POLICY: return new DiscardedPolicy(); } return null; } private static BlockingQueue<Runnable> createBlockingQueue(int queues) { BlockingQueueType queueType = BlockingQueueType.fromString(System.getProperty(RpcSystemConfig.SystemPropertyThreadPoolQueueNameAttr, "LinkedBlockingQueue")); switch (queueType) { case LINKED_BLOCKING_QUEUE: return new LinkedBlockingQueue<Runnable>(); case ARRAY_BLOCKING_QUEUE: return new ArrayBlockingQueue<Runnable>(RpcSystemConfig.PARALLEL * queues); case SYNCHRONOUS_QUEUE: return new SynchronousQueue<Runnable>(); } return null; } public static Executor getExecutor(int threads, int queues) { String name = "RpcThreadPool"; return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, createBlockingQueue(queues), new NamedThreadFactory(name, true), createPolicy()); } }
其中創(chuàng)建線程池方法getExecutor是依賴JDK自帶的線程ThreadPoolExecutor的實(shí)現(xiàn),參考JDK的幫助文檔,可以發(fā)現(xiàn)其中的一種ThreadPoolExecutor構(gòu)造方法重載實(shí)現(xiàn)的版本:

參數(shù)的具體含義如下:
- corePoolSize是線程池保留大小。
- maximumPoolSize是線程池最大線程大小。
- keepAliveTime是指空閑(idle)線程結(jié)束的超時(shí)時(shí)間。
- unit用來指定keepAliveTime對應(yīng)的時(shí)間單位,諸如:毫秒、秒、分鐘、小時(shí)、天 等等。
- workQueue用來存放待處理的任務(wù)隊(duì)列。
- handler用來具體指定,當(dāng)任務(wù)隊(duì)列填滿、并且線程池最大線程大小也達(dá)到的情形下,線程池的一些應(yīng)對措施策略。
NettyRPC的線程池支持的任務(wù)隊(duì)列類型主要有以下三種:
- LinkedBlockingQueue:采用鏈表方式實(shí)現(xiàn)的無界任務(wù)隊(duì)列,當(dāng)然你可以額外指定其容量,使其有界。
- ArrayBlockingQueue:有界的的數(shù)組任務(wù)隊(duì)列。
- SynchronousQueue:任務(wù)隊(duì)列的容量固定為1,當(dāng)客戶端提交執(zhí)行任務(wù)過來的時(shí)候,有進(jìn)行阻塞。直到有個(gè)處理線程取走這個(gè)待執(zhí)行的任務(wù),否則會一直阻塞下去。
NettyRPC的線程池模型,當(dāng)遇到線程池也無法處理的情形的時(shí)候,具體的應(yīng)對措施策略主要有:
- AbortPolicy:直接拒絕執(zhí)行,拋出rejectedExecution異常。
- DiscardedPolicy:從任務(wù)隊(duì)列的頭部開始直接丟棄一半的隊(duì)列元素,為任務(wù)隊(duì)列“減負(fù)”。
- CallerRunsPolicy:不拋棄任務(wù),也不拋出異常,而是調(diào)用者自己來運(yùn)行。這個(gè)是主要是因?yàn)檫^多的并行請求會加劇系統(tǒng)的負(fù)載,線程之間調(diào)度操作系統(tǒng)會頻繁的進(jìn)行上下文切換。當(dāng)遇到線程池滿的情況,與其頻繁的切換、中斷。不如把并行的請求,全部串行化處理,保證盡量少的處理延時(shí),大概是我能想到的Doug Lea的設(shè)計(jì)初衷吧。
經(jīng)過詳細(xì)的介紹了線程池參數(shù)的具體內(nèi)容之后,下面我就詳細(xì)說一下,NettyRPC的線程池RpcThreadPool的工作流程:

- NettyRPC的線程池收到RPC數(shù)據(jù)處理請求之后,判斷當(dāng)前活動的線程數(shù)小于線程池設(shè)置的corePoolSize的大小的時(shí)候,會繼續(xù)生成執(zhí)行任務(wù)。
- 而當(dāng)達(dá)到corePoolSize的大小的時(shí)候的時(shí)候,這個(gè)時(shí)候,線程池會把待執(zhí)行的任務(wù)放入任務(wù)隊(duì)列之中。
- 當(dāng)任務(wù)隊(duì)列也被存滿了之后,如果當(dāng)前活動的線程個(gè)數(shù)還是小于線程池中maximumPoolSize參數(shù)的設(shè)置,線程池還會繼續(xù)分配出任務(wù)線程進(jìn)行救急處理,并且會立馬執(zhí)行。
- 如果達(dá)到線程池中maximumPoolSize參數(shù)的設(shè)置的線程上限,線程池分派出來的救火隊(duì)也無法處理的時(shí)候,線程池就會調(diào)用拒絕自保策略RejectedExecutionHandler進(jìn)行處理。
NettyRPC中默認(rèn)的線程池設(shè)置是把corePoolSize、maximumPoolSize都設(shè)置成16,任務(wù)隊(duì)列設(shè)置成無界鏈表構(gòu)成的阻塞隊(duì)列。在應(yīng)用中要根據(jù)實(shí)際的壓力、吞吐量對NettyRPC的線程池參數(shù)進(jìn)行合理的規(guī)劃。目前NettyRPC暴露了一個(gè)JMX接口,JMX是“Java管理擴(kuò)展的(Java Management Extensions)”的縮寫,是一種類似J2EE的規(guī)范,這樣就可以靈活的擴(kuò)展系統(tǒng)的監(jiān)控、管理功能。實(shí)時(shí)監(jiān)控RPC服務(wù)器線程池任務(wù)的執(zhí)行情況,具體JMX監(jiān)控度量線程池關(guān)鍵指標(biāo)代碼實(shí)現(xiàn)如下:
package com.newlandframework.rpc.parallel.jmx; import org.springframework.jmx.export.annotation.ManagedOperation; import org.springframework.jmx.export.annotation.ManagedResource; /** * @author tangjie<https://github.com/tang-jie> * @filename:ThreadPoolStatus.java * @description:ThreadPoolStatus功能模塊 * @blogs http://www.rzrgm.cn/jietang/ * @since 2016/10/13 */ @ManagedResource public class ThreadPoolStatus { private int poolSize; private int activeCount; private int corePoolSize; private int maximumPoolSize; private int largestPoolSize; private long taskCount; private long completedTaskCount; @ManagedOperation public int getPoolSize() { return poolSize; } @ManagedOperation public void setPoolSize(int poolSize) { this.poolSize = poolSize; } @ManagedOperation public int getActiveCount() { return activeCount; } @ManagedOperation public void setActiveCount(int activeCount) { this.activeCount = activeCount; } @ManagedOperation public int getCorePoolSize() { return corePoolSize; } @ManagedOperation public void setCorePoolSize(int corePoolSize) { this.corePoolSize = corePoolSize; } @ManagedOperation public int getMaximumPoolSize() { return maximumPoolSize; } @ManagedOperation public void setMaximumPoolSize(int maximumPoolSize) { this.maximumPoolSize = maximumPoolSize; } @ManagedOperation public int getLargestPoolSize() { return largestPoolSize; } @ManagedOperation public void setLargestPoolSize(int largestPoolSize) { this.largestPoolSize = largestPoolSize; } @ManagedOperation public long getTaskCount() { return taskCount; } @ManagedOperation public void setTaskCount(long taskCount) { this.taskCount = taskCount; } @ManagedOperation public long getCompletedTaskCount() { return completedTaskCount; } @ManagedOperation public void setCompletedTaskCount(long completedTaskCount) { this.completedTaskCount = completedTaskCount; } }
線程池狀態(tài)監(jiān)控類:ThreadPoolStatus,具體監(jiān)控的指標(biāo)如下:
- poolSize:池中的當(dāng)前線程數(shù)
- activeCount:主動執(zhí)行任務(wù)的近似線程數(shù)
- corePoolSize:核心線程數(shù)
- maximumPoolSize:允許的最大線程數(shù)
- largestPoolSize:歷史最大的線程數(shù)
- taskCount:曾計(jì)劃執(zhí)行的近似任務(wù)總數(shù)
- completedTaskCount:已完成執(zhí)行的近似任務(wù)總數(shù)
其中corePoolSize、maximumPoolSize具體含義上文已經(jīng)詳細(xì)講述,這里就不具體展開。
NettyRPC線程池監(jiān)控JMX接口:ThreadPoolMonitorProvider,JMX通過JNDI-RMI的方式進(jìn)行遠(yuǎn)程連接通訊,具體實(shí)現(xiàn)方式如下:
package com.newlandframework.rpc.parallel.jmx; import com.newlandframework.rpc.netty.MessageRecvExecutor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.EnableMBeanExport; import org.springframework.jmx.support.ConnectorServerFactoryBean; import org.springframework.jmx.support.MBeanServerConnectionFactoryBean; import org.springframework.jmx.support.MBeanServerFactoryBean; import org.springframework.remoting.rmi.RmiRegistryFactoryBean; import org.apache.commons.lang3.StringUtils; import javax.management.MBeanServerConnection; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import javax.management.ReflectionException; import javax.management.MBeanException; import javax.management.InstanceNotFoundException; import java.io.IOException; /** * @author tangjie<https://github.com/tang-jie> * @filename:ThreadPoolMonitorProvider.java * @description:ThreadPoolMonitorProvider功能模塊 * @blogs http://www.rzrgm.cn/jietang/ * @since 2016/10/13 */ @Configuration @EnableMBeanExport @ComponentScan("com.newlandframework.rpc.parallel.jmx") public class ThreadPoolMonitorProvider { public final static String DELIMITER = ":"; public static String url; public static String jmxPoolSizeMethod = "setPoolSize"; public static String jmxActiveCountMethod = "setActiveCount"; public static String jmxCorePoolSizeMethod = "setCorePoolSize"; public static String jmxMaximumPoolSizeMethod = "setMaximumPoolSize"; public static String jmxLargestPoolSizeMethod = "setLargestPoolSize"; public static String jmxTaskCountMethod = "setTaskCount"; public static String jmxCompletedTaskCountMethod = "setCompletedTaskCount"; @Bean public ThreadPoolStatus threadPoolStatus() { return new ThreadPoolStatus(); } @Bean public MBeanServerFactoryBean mbeanServer() { return new MBeanServerFactoryBean(); } @Bean public RmiRegistryFactoryBean registry() { return new RmiRegistryFactoryBean(); } @Bean @DependsOn("registry") public ConnectorServerFactoryBean connectorServer() throws MalformedObjectNameException { MessageRecvExecutor ref = MessageRecvExecutor.getInstance(); String ipAddr = StringUtils.isNotEmpty(ref.getServerAddress()) ? StringUtils.substringBeforeLast(ref.getServerAddress(), DELIMITER) : "localhost"; url = "service:jmx:rmi://" + ipAddr + "/jndi/rmi://" + ipAddr + ":1099/nettyrpcstatus"; System.out.println("NettyRPC JMX MonitorURL : [" + url + "]"); ConnectorServerFactoryBean connectorServerFactoryBean = new ConnectorServerFactoryBean(); connectorServerFactoryBean.setObjectName("connector:name=rmi"); connectorServerFactoryBean.setServiceUrl(url); return connectorServerFactoryBean; } public static void monitor(ThreadPoolStatus status) throws IOException, MalformedObjectNameException, ReflectionException, MBeanException, InstanceNotFoundException { MBeanServerConnectionFactoryBean mBeanServerConnectionFactoryBean = new MBeanServerConnectionFactoryBean(); mBeanServerConnectionFactoryBean.setServiceUrl(url); mBeanServerConnectionFactoryBean.afterPropertiesSet(); MBeanServerConnection connection = mBeanServerConnectionFactoryBean.getObject(); ObjectName objectName = new ObjectName("com.newlandframework.rpc.parallel.jmx:name=threadPoolStatus,type=ThreadPoolStatus"); connection.invoke(objectName, jmxPoolSizeMethod, new Object[]{status.getPoolSize()}, new String[]{int.class.getName()}); connection.invoke(objectName, jmxActiveCountMethod, new Object[]{status.getActiveCount()}, new String[]{int.class.getName()}); connection.invoke(objectName, jmxCorePoolSizeMethod, new Object[]{status.getCorePoolSize()}, new String[]{int.class.getName()}); connection.invoke(objectName, jmxMaximumPoolSizeMethod, new Object[]{status.getMaximumPoolSize()}, new String[]{int.class.getName()}); connection.invoke(objectName, jmxLargestPoolSizeMethod, new Object[]{status.getLargestPoolSize()}, new String[]{int.class.getName()}); connection.invoke(objectName, jmxTaskCountMethod, new Object[]{status.getTaskCount()}, new String[]{long.class.getName()}); connection.invoke(objectName, jmxCompletedTaskCountMethod, new Object[]{status.getCompletedTaskCount()}, new String[]{long.class.getName()}); } }
NettyRPC服務(wù)器啟動成功之后,就可以通過JMX接口進(jìn)行監(jiān)控:可以打開jconsole,然后輸入U(xiǎn)RL:service:jmx:rmi://127.0.0.1/jndi/rmi://127.0.0.1:1099/nettyrpcstatus,用戶名、密碼默認(rèn)為空,點(diǎn)擊連接按鈕。

當(dāng)有客戶端進(jìn)行RPC請求的時(shí)候,通過JMX可以看到如下的監(jiān)控界面:

這個(gè)時(shí)候點(diǎn)擊NettyRPC線程池各個(gè)監(jiān)控指標(biāo)的按鈕,就可以直觀的看到NettyRPC實(shí)際運(yùn)行中,線程池的主要參數(shù)指標(biāo)的實(shí)時(shí)監(jiān)控。比如點(diǎn)擊:getCompletedTaskCount,想查看一下目前已經(jīng)完成的線程任務(wù)總數(shù)指標(biāo)。具體情況如下圖所示:

可以看到,目前已經(jīng)處理了40280筆RPC請求。這樣,我們就可以準(zhǔn)實(shí)時(shí)監(jiān)控NettyRPC線程池參數(shù)設(shè)置、容量規(guī)劃是否合理,以便及時(shí)作出調(diào)整,合理的最大程度利用軟硬件資源。
最后經(jīng)過重構(gòu)之后,NettyRPC服務(wù)端的Spring配置(NettyRPC/NettyRPC 2.0/main/resources/rpc-invoke-config-server.xml)如下:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:nettyrpc="http://www.newlandframework.com/nettyrpc" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.newlandframework.com/nettyrpc http://www.newlandframework.com/nettyrpc/nettyrpc.xsd"> <!--加載rpc服務(wù)器的ip地址、端口信息--> <context:property-placeholder location="classpath:rpc-server.properties"/> <!--定義rpc服務(wù)接口--> <nettyrpc:service id="demoAddService" interfaceName="com.newlandframework.rpc.services.AddCalculate" ref="calcAddService"></nettyrpc:service> <nettyrpc:service id="demoMultiService" interfaceName="com.newlandframework.rpc.services.MultiCalculate" ref="calcMultiService"></nettyrpc:service> <!--注冊rpc服務(wù)器,并通過protocol指定序列化協(xié)議--> <nettyrpc:registry id="rpcRegistry" ipAddr="${rpc.server.addr}" protocol="PROTOSTUFFSERIALIZE"></nettyrpc:registry> <!--rpc服務(wù)實(shí)現(xiàn)類聲明--> <bean id="calcAddService" class="com.newlandframework.rpc.services.impl.AddCalculateImpl"></bean> <bean id="calcMultiService" class="com.newlandframework.rpc.services.impl.MultiCalculateImpl"></bean> </beans>
通過nettyrpc:service標(biāo)簽定義rpc服務(wù)器支持的服務(wù)接口,這里的樣例聲明了當(dāng)前的rpc服務(wù)器提供了加法計(jì)算、乘法計(jì)算兩種服務(wù)給客戶端進(jìn)行調(diào)用。具體通過Spring自定義標(biāo)簽的實(shí)現(xiàn),大家可以自行參考github:NettyRPC/NettyRPC 2.0/main/java/com/newlandframework/rpc/spring(路徑/包)中的實(shí)現(xiàn)代碼,代碼比較多得利用到了Spring框架的特性,希望大家能自行理解和分析。
然后通過bean標(biāo)簽聲明了具體加法計(jì)算、乘法計(jì)算接口對應(yīng)的實(shí)現(xiàn)類,都統(tǒng)一放在com.newlandframework.rpc.services包之中。
最后通過nettyrpc:registry注冊了rpc服務(wù)器,ipAddr屬性定義了該rpc服務(wù)器對應(yīng)的ip/端口信息。protocol用來指定,當(dāng)前rpc服務(wù)器支持的消息序列化協(xié)議類型。
目前已經(jīng)實(shí)現(xiàn)的類型有:JDK原生的對象序列化(ObjectOutputStream/ObjectInputStream)、Kryo、Hessian、Protostuff一共四種序列化方式。
配置完成rpc-invoke-config-server.xml之后,就可以啟動RPC服務(wù)器Main函數(shù)入口:com.newlandframework.rpc.boot.RpcServerStarter。通過Maven打包、部署在(Red Hat Enterprise Linux Server release 5.7 (Tikanga) 64位系統(tǒng),其內(nèi)核版本號:Kernel 2.6.18-274.el5 on an x86_64),可以啟動NettyRPC,如果一切正常的話,在CRT終端上會顯示如下輸出:

這個(gè)時(shí)候再進(jìn)行客戶端的Spring配置(NettyRPC/NettyRPC 2.0/test/resources/rpc-invoke-config-client.xml)。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:nettyrpc="http://www.newlandframework.com/nettyrpc" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.newlandframework.com/nettyrpc http://www.newlandframework.com/nettyrpc/nettyrpc.xsd"> <!--加載RPC服務(wù)端對應(yīng)的ip地址、端口信息--> <context:property-placeholder location="classpath:rpc-server.properties"/> <!--客戶端調(diào)用的RPC服務(wù)信息(加法計(jì)算、乘法計(jì)算服務(wù))--> <nettyrpc:reference id="addCalc" interfaceName="com.newlandframework.rpc.services.AddCalculate" protocol="PROTOSTUFFSERIALIZE" ipAddr="${rpc.server.addr}"/> <nettyrpc:reference id="multiCalc" interfaceName="com.newlandframework.rpc.services.MultiCalculate" protocol="PROTOSTUFFSERIALIZE" ipAddr="${rpc.server.addr}"/> </beans>
其中加法計(jì)算、乘法計(jì)算的demo代碼如下:
package com.newlandframework.rpc.services; /** * @author tangjie<https://github.com/tang-jie> * @filename:Calculate.java * @description:Calculate功能模塊 * @blogs http://www.rzrgm.cn/jietang/ * @since 2016/10/7 */ public interface AddCalculate { //兩數(shù)相加 int add(int a, int b); }
package com.newlandframework.rpc.services.impl; import com.newlandframework.rpc.services.AddCalculate; /** * @author tangjie<https://github.com/tang-jie> * @filename:CalculateImpl.java * @description:CalculateImpl功能模塊 * @blogs http://www.rzrgm.cn/jietang/ * @since 2016/10/7 */ public class AddCalculateImpl implements AddCalculate { //兩數(shù)相加 public int add(int a, int b) { return a + b; } }
package com.newlandframework.rpc.services; /** * @author tangjie<https://github.com/tang-jie> * @filename:Calculate.java * @description:Calculate功能模塊 * @blogs http://www.rzrgm.cn/jietang/ * @since 2016/10/7 */ public interface MultiCalculate { //兩數(shù)相乘 int multi(int a, int b); }
package com.newlandframework.rpc.services.impl; import com.newlandframework.rpc.services.MultiCalculate; /** * @author tangjie<https://github.com/tang-jie> * @filename:CalculateImpl.java * @description:CalculateImpl功能模塊 * @blogs http://www.rzrgm.cn/jietang/ * @since 2016/10/7 */ public class MultiCalculateImpl implements MultiCalculate { //兩數(shù)相乘 public int multi(int a, int b) { return a * b; } }
值得注意的是客戶端NettyRPC的Spring配置除了指定調(diào)用遠(yuǎn)程RPC的服務(wù)服務(wù)信息之外,還必須配置遠(yuǎn)程RPC服務(wù)端對應(yīng)的ip地址、端口信息、協(xié)議類型這些要素,而且必須和RPC服務(wù)端保持一致,這樣才能正常的進(jìn)行消息的編碼、解碼工作。
現(xiàn)在我們就模擬1W個(gè)瞬時(shí)并發(fā)的加法、乘法計(jì)算請求,一共2W筆請求操作,調(diào)用遠(yuǎn)程RPC服務(wù)器上的計(jì)算模塊,我們默認(rèn)采用protostuff序列化方式進(jìn)行RPC消息的編碼、解碼。注意,測試代碼的樣例基于1W筆瞬時(shí)并發(fā)計(jì)算請求,不是1W筆循環(huán)進(jìn)行計(jì)算請求,這個(gè)是衡量RPC服務(wù)器吞吐量的一個(gè)重要指標(biāo),因此這里的測試樣例是基于CountDownLatch進(jìn)行編寫的,類java.util.concurrent.CountDownLatch是一個(gè)同步輔助類,在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個(gè)或多個(gè)線程一直等待。這里是加法計(jì)算RPC請求、乘法計(jì)算RPC請求,在RPC客戶端分別先啟動1W個(gè)線程,這個(gè)時(shí)候先掛起,然后等待請求信號,瞬時(shí)發(fā)起RPC請求。具體代碼如下:
首先是加法計(jì)算并發(fā)請求類AddCalcParallelRequestThread:
package com.newlandframework.test; import com.newlandframework.rpc.services.AddCalculate; import java.util.concurrent.CountDownLatch; import java.util.logging.Level; import java.util.logging.Logger; /** * @author tangjie<https://github.com/tang-jie> * @filename:AddCalcParallelRequestThread.java * @description:AddCalcParallelRequestThread功能模塊 * @blogs http://www.rzrgm.cn/jietang/ * @since 2016/10/7 */ public class AddCalcParallelRequestThread implements Runnable { private CountDownLatch signal; private CountDownLatch finish; private int taskNumber = 0; private AddCalculate calc; public AddCalcParallelRequestThread(AddCalculate calc, CountDownLatch signal, CountDownLatch finish, int taskNumber) { this.signal = signal; this.finish = finish; this.taskNumber = taskNumber; this.calc = calc; } public void run() { try { //加法計(jì)算線程,先掛起,等待請求信號 signal.await(); //調(diào)用遠(yuǎn)程RPC服務(wù)器的加法計(jì)算方法模塊 int add = calc.add(taskNumber, taskNumber); System.out.println("calc add result:[" + add + "]"); finish.countDown(); } catch (InterruptedException ex) { Logger.getLogger(AddCalcParallelRequestThread.class.getName()).log(Level.SEVERE, null, ex); } } }
其次是乘法計(jì)算并發(fā)請求類MultiCalcParallelRequestThread:
package com.newlandframework.test; import com.newlandframework.rpc.services.MultiCalculate; import java.util.concurrent.CountDownLatch; import java.util.logging.Level; import java.util.logging.Logger; /** * @author tangjie<https://github.com/tang-jie> * @filename:MultiCalcParallelRequestThread.java * @description:MultiCalcParallelRequestThread功能模塊 * @blogs http://www.rzrgm.cn/jietang/ * @since 2016/10/7 */ public class MultiCalcParallelRequestThread implements Runnable { private CountDownLatch signal; private CountDownLatch finish; private int taskNumber = 0; private MultiCalculate calc; public MultiCalcParallelRequestThread(MultiCalculate calc, CountDownLatch signal, CountDownLatch finish, int taskNumber) { this.signal = signal; this.finish = finish; this.taskNumber = taskNumber; this.calc = calc; } public void run() { try { //乘法計(jì)算線程,先掛起,等待請求信號 signal.await(); //調(diào)用遠(yuǎn)程RPC服務(wù)器的乘法計(jì)算方法模塊 int multi = calc.multi(taskNumber, taskNumber); System.out.println("calc multi result:[" + multi + "]"); finish.countDown(); } catch (InterruptedException ex) { Logger.getLogger(MultiCalcParallelRequestThread.class.getName()).log(Level.SEVERE, null, ex); } } }
現(xiàn)在寫出一個(gè)調(diào)用的測試客戶端RpcParallelTest,測試RPC服務(wù)器的性能,以及是否正確計(jì)算出最終的結(jié)果。測試客戶端RpcParallelTest的具體代碼如下:
package com.newlandframework.test; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import com.newlandframework.rpc.services.AddCalculate; import com.newlandframework.rpc.services.MultiCalculate; import org.apache.commons.lang3.time.StopWatch; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * @author tangjie<https://github.com/tang-jie> * @filename:RpcParallelTest.java * @description:RpcParallelTest功能模塊 * @blogs http://www.rzrgm.cn/jietang/ * @since 2016/10/7 */ public class RpcParallelTest { public static void parallelAddCalcTask(AddCalculate calc, int parallel) throws InterruptedException { //開始計(jì)時(shí) StopWatch sw = new StopWatch(); sw.start(); CountDownLatch signal = new CountDownLatch(1); CountDownLatch finish = new CountDownLatch(parallel); for (int index = 0; index < parallel; index++) { AddCalcParallelRequestThread client = new AddCalcParallelRequestThread(calc, signal, finish, index); new Thread(client).start(); } signal.countDown(); finish.await(); sw.stop(); String tip = String.format("加法計(jì)算RPC調(diào)用總共耗時(shí): [%s] 毫秒", sw.getTime()); System.out.println(tip); } public static void parallelMultiCalcTask(MultiCalculate calc, int parallel) throws InterruptedException { //開始計(jì)時(shí) StopWatch sw = new StopWatch(); sw.start(); CountDownLatch signal = new CountDownLatch(1); CountDownLatch finish = new CountDownLatch(parallel); for (int index = 0; index < parallel; index++) { MultiCalcParallelRequestThread client = new MultiCalcParallelRequestThread(calc, signal, finish, index); new Thread(client).start(); } signal.countDown(); finish.await(); sw.stop(); String tip = String.format("乘法計(jì)算RPC調(diào)用總共耗時(shí): [%s] 毫秒", sw.getTime()); System.out.println(tip); } public static void addTask(AddCalculate calc, int parallel) throws InterruptedException { RpcParallelTest.parallelAddCalcTask(calc, parallel); TimeUnit.MILLISECONDS.sleep(30); } public static void multiTask(MultiCalculate calc, int parallel) throws InterruptedException { RpcParallelTest.parallelMultiCalcTask(calc, parallel); TimeUnit.MILLISECONDS.sleep(30); } public static void main(String[] args) throws Exception { //并行度10000 int parallel = 10000; //加載Spring配置信息 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:rpc-invoke-config-client.xml"); //并發(fā)進(jìn)行RPC加法計(jì)算、乘法計(jì)算請求 addTask((AddCalculate) context.getBean("addCalc"), parallel); multiTask((MultiCalculate) context.getBean("multiCalc"), parallel); System.out.printf("[author tangjie] Netty RPC Server 消息協(xié)議序列化并發(fā)驗(yàn)證結(jié)束!\n\n"); context.destroy(); } }
Netty RPC客戶端運(yùn)行情況,具體截圖如下:下面是開始收到RPC服務(wù)器加法計(jì)算的結(jié)果截圖。

好了,加法RPC請求計(jì)算完畢,控制臺打印出請求耗時(shí)。

接著是調(diào)用RPC并行乘法計(jì)算請求,同樣,控制臺上也打印出請求耗時(shí)。

接著RPC的客戶端運(yùn)行完畢、退出,我們繼續(xù)看下NettyRPC服務(wù)端的運(yùn)行截圖:

可以發(fā)現(xiàn),NettyRPC的服務(wù)端確實(shí)都收到了來自客戶端發(fā)起的RPC計(jì)算請求,給每個(gè)RPC消息標(biāo)識出了唯一的消息編碼,并進(jìn)行了RPC計(jì)算處理之后,成功的把消息應(yīng)答給了客戶端。
經(jīng)過一系列的模塊重構(gòu),終于將NettyRPC重新升級了一下,經(jīng)過這次重構(gòu)工作,感覺自己對Netty、Spring、Java線程模型的了解更加深入了,不積跬步無以至千里,千里之行始于足下。學(xué)習(xí)靠的就是這樣一點(diǎn)一滴的重復(fù)積累,才能將自己的能力提升一個(gè)臺階。
原創(chuàng)文章,加上本人才疏學(xué)淺,文筆有限,本文中有說得不對的地方,望各位同行不吝賜教。文中有忽略的地方希望讀者可以補(bǔ)充,錯(cuò)誤的地方還望斧正。
最后附上NettyRPC的開源項(xiàng)目地址:https://github.com/tang-jie/NettyRPC 中的NettyRPC 2.0項(xiàng)目。
感謝大家耐心閱讀NettyRPC系列文章,如果本文對你有幫助,請點(diǎn)下推薦吧!

浙公網(wǎng)安備 33010602011771號