1. Why Choose Netty

In the previous article we looked at Socket communication (IO/NIO/AIO) and built a basic understanding of the communication models. What we learned there, however, was only a model: to use it in real work it still needs to be completed, extended, and optimized. Classic details such as TCP read/write framing, receive buffer sizes, and the actual request/response handling logic all need careful thought, and that takes a lot of time and hands-on experience. Mastering raw Socket programming is therefore not easy, so next we will learn a new technology: Netty. Why Netty? Because it is simple. With Netty we do not have to write complex plumbing code for communication, and we no longer need to worry about performance, encoding, or half-packet reads and writes; the framework already takes care of all of this, and we only need to use it.

Netty is the most popular NIO framework. Its robustness, feature set, performance, customizability, and extensibility are second to none among frameworks of its kind. It has been proven in hundreds of commercial projects, such as Avro (Hadoop's RPC framework), RocketMQ, and the widely used distributed service framework Dubbox.

2. Introduction to Netty

Netty is a client-server network application framework built on Java NIO. With Netty you can quickly develop network applications such as server and client protocol implementations. Netty offers a new way to develop network applications that is both easy to use and highly extensible. Netty's internals are complex, but it exposes a simple API that decouples business logic from the network-handling code. Netty is implemented entirely on top of NIO, so the whole framework is asynchronous.

Network applications usually need high scalability, and Netty, like other Java NIO based frameworks, provides a solution for it. A key part of Netty is its asynchronous nature. This article contrasts synchronous (blocking) and asynchronous (non-blocking) IO to explain why asynchronous code solves the scalability problem and how to use it.

3. Netty Architecture (the diagrams borrowed from the web in the original post are not reproduced here)

The article "Netty實現原理淺析" (a brief analysis of how Netty works) is well written; take a look if you are interested.

4. HelloWorld

Before learning Netty, let's review the steps of NIO communication (a minimal code sketch follows the list):

① Create a ServerSocketChannel and configure it in non-blocking mode.

② Bind and listen, and configure TCP parameters such as the backlog size.

③ Create a dedicated IO thread for polling the multiplexer (Selector).

④ Create a Selector, register the ServerSocketChannel created earlier on it, and set the interest flag SelectionKey.OP_ACCEPT.

⑤ Start the IO thread and call Selector.select() in a loop to poll for ready channels.

⑥ When a ready channel is polled, check its operation bits; if it is ACCEPT, a new client is connecting, so call accept to accept it.

⑦ Configure the newly accepted client (e.g. non-blocking mode), register it on the Selector as well, and set its interest flags.

⑧ If the polled channel's flag is READ, read the data and build Buffer objects, etc.

⑨ There are even finer details, such as continuing to send data that was not fully written...
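The sketch below strings those steps together as a single-threaded NIO echo server. It is only an illustration of the workflow above; the port 8379 and the echo-back behavior are placeholders and not part of the original article.

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NioEchoServer {
    public static void main(String[] args) throws Exception {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);                   // step ①: non-blocking mode
        serverChannel.bind(new InetSocketAddress(8379), 128);     // step ②: bind with a backlog of 128
        Selector selector = Selector.open();                      // step ④: create the Selector
        serverChannel.register(selector, SelectionKey.OP_ACCEPT); // register for ACCEPT events

        while (true) {                                            // steps ③/⑤: the polling loop
            selector.select();
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {                         // step ⑥: a new client connection
                    SocketChannel client = ((ServerSocketChannel) key.channel()).accept();
                    client.configureBlocking(false);              // step ⑦: configure and re-register
                    client.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {                    // step ⑧: read into a buffer
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int n = client.read(buffer);
                    if (n < 0) { client.close(); continue; }
                    buffer.flip();
                    client.write(buffer);                         // echo back; step ⑨: a real server must handle partial writes
                }
            }
        }
    }
}

Even this toy version shows how much bookkeeping plain NIO demands, which is exactly the boilerplate Netty hides.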

OK, time to start learning Netty. First download the Netty jars from http://netty.io/.

The steps of Netty communication:

① Create two NIO event loop groups: one dedicated to network event handling (accepting client connections), the other to the reads and writes of network communication.

② Create a ServerBootstrap object and configure a series of Netty parameters, such as the buffer sizes for incoming and outgoing data.

③ Create a ChannelInitializer that prepares the actual data handling, for example the character set and format of the data and the handler that processes it.

④ Bind the port and call the synchronous blocking method to wait for the server to start.

I strongly recommend reading the translated Netty official documentation.

Enough talk; let's write the HelloWorld example.

Server:

public class Server {

    private int port;

    public Server(int port) {
        this.port = port;
    }

    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();   // accepts incoming client connections
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // handles the network reads and writes
        try {
            ServerBootstrap bootstrap = new ServerBootstrap(); // helper class for configuring the server channel
            bootstrap.group(bossGroup, workerGroup)            // bind the two event loop groups
                    .channel(NioServerSocketChannel.class)     // use the NIO transport
                    .childHandler(new ChannelInitializer<SocketChannel>() { // configure how data is handled
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new ServerHandler());
                        }
                    })
                    /**
                     * About ChannelOption.SO_BACKLOG:
                     * The server-side TCP kernel maintains two queues, call them A and B. When a client connects,
                     * it sends a packet with the SYN flag (first handshake). When the server receives the SYN it
                     * replies with SYN ACK (second handshake), and the TCP kernel puts the connection into queue A.
                     * When the server then receives the client's ACK (third handshake), the kernel moves the
                     * connection from queue A to queue B, the connection is established, and the application's
                     * accept returns; in other words, accept takes connections that have completed the three-way
                     * handshake out of queue B.
                     * The combined length of queues A and B is the backlog. When their combined length exceeds
                     * ChannelOption.SO_BACKLOG, new connections are rejected by the TCP kernel. So if the backlog
                     * is too small and accept cannot keep up, queues A and B fill up and new clients cannot connect.
                     * Note that the backlog has no effect on how many connections the program can hold; it only
                     * affects connections that have not yet been taken out by accept.
                     */
                    .option(ChannelOption.SO_BACKLOG, 128)          // size of the pending-connection queue
                    .option(ChannelOption.SO_SNDBUF, 32 * 1024)     // send buffer size
                    .option(ChannelOption.SO_RCVBUF, 32 * 1024)     // receive buffer size
                    .childOption(ChannelOption.SO_KEEPALIVE, true); // keep the connection alive
            ChannelFuture future = bootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new Server(8379).run();
    }
}

The ServerHandler class:

public class ServerHandler extends ChannelHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // read the request sent by the client
        ByteBuf buf = (ByteBuf) msg;
        byte[] data = new byte[buf.readableBytes()];
        buf.readBytes(data);
        String request = new String(data, "utf-8");
        System.out.println("Server: " + request);
        // write a response back to the client
        ctx.writeAndFlush(Unpooled.copiedBuffer("888".getBytes()));
        //.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

Client:

public class Client {
 
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(workerGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ClientHandler());
                    }
                });
        ChannelFuture future = bootstrap.connect("127.0.0.1", 8379).sync();
        future.channel().writeAndFlush(Unpooled.copiedBuffer("777".getBytes()));
        future.channel().closeFuture().sync();
        workerGroup.shutdownGracefully();
    }
 
}

The ClientHandler class:

public class ClientHandler extends ChannelHandlerAdapter {
 
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            ByteBuf buf = (ByteBuf) msg;
            byte[] data = new byte[buf.readableBytes()];
            buf.readBytes(data);
            System.out.println("Client:" + new String(data).trim());
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
 
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
 
}

 

Run result: (console screenshots from the original post omitted)


5. TCP Packet Sticking and Splitting

Anyone familiar with TCP programming knows that, whether on the server or the client, we must account for TCP's underlying packet sticking/splitting behavior when reading or sending data.

TCP is a "stream" protocol: a stream is data with no boundaries. Imagine the data as river water, all connected with no dividing lines. The TCP layer knows nothing about the meaning of the upper-layer business data; it splits packets according to the actual state of the TCP buffers. This means a single complete business message may be split by TCP into several packets for sending, and several small messages may also be bundled into one larger packet, which is the so-called sticky/split packet problem.

Solutions:

① Fixed-length messages, e.g. every message is exactly 200 bytes; if it is shorter, pad with spaces.

② Append a special delimiter to the end of each packet, e.g. a carriage return.

③ Split the message into a header and a body, with a field in the header that carries the total message length, then handle the business logic accordingly.

Netty's built-in handlers for TCP sticky/split packets (a minimal pipeline sketch follows the list):

① Delimiter-based: DelimiterBasedFrameDecoder (custom delimiter)

② Fixed-length: FixedLengthFrameDecoder

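The original post demonstrates both decoders with screenshots that are not reproduced here. As a minimal sketch, the decoders are simply added to the pipeline in front of the business handler of the HelloWorld server above; the delimiter "$_", the 1024-byte frame limit, and the 200-byte fixed length are placeholder values, not Netty defaults.

.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // ① delimiter-based framing: split incoming bytes on "$_" and pass each frame on as a String
        socketChannel.pipeline().addLast(
                new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer("$_".getBytes())));
        socketChannel.pipeline().addLast(new StringDecoder());

        // ② alternatively, fixed-length framing: every frame is exactly 200 bytes
        // socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(200));

        // for solution ③ (a length field in the header) Netty provides LengthFieldBasedFrameDecoder

        socketChannel.pipeline().addLast(new ServerHandler());
    }
})

With a StringDecoder after the frame decoder, the business handler receives complete String messages instead of raw ByteBuf fragments; the sender must end every message with the same "$_" delimiter so the decoder knows where one frame stops and the next begins.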

6. Netty Codec (Encoding/Decoding) Techniques

Encoding (encode) is also commonly called serialization: it serializes data into a byte array for network transmission, persistence, or other uses. Conversely, decoding (decode), or deserialization, restores a byte array read from the network or disk into the original object (usually a copy of it), so that the subsequent business logic can work with it. For remote cross-process service calls (e.g. RPC), a specific codec technology is needed to encode and decode the objects being transmitted over the network so the remote call can be completed.

Mainstream codec frameworks (a Protobuf pipeline sketch follows the list):

① JBoss Marshalling

② Google Protobuf

③ Kryo

④ MessagePack
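The rest of this section uses JBoss Marshalling, but the other codecs plug into a pipeline in the same way. As a rough sketch (not code from the original article), Netty's bundled Protobuf handlers would be wired up like this, assuming a hypothetical Protobuf-generated message class SubscribeReqProto.SubscribeReq:

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // strips the varint length prefix and reassembles one complete Protobuf message per frame
        ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
        // decodes each frame into the generated message type (SubscribeReqProto is hypothetical)
        ch.pipeline().addLast(new ProtobufDecoder(SubscribeReqProto.SubscribeReq.getDefaultInstance()));
        // outbound side: prepend the varint length and encode the message to bytes
        ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
        ch.pipeline().addLast(new ProtobufEncoder());
        ch.pipeline().addLast(new ServerHandler());
    }
});

On the inbound side the frame decoder must come before ProtobufDecoder so that each decode call sees exactly one complete message.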

On to the code; it reads easily. Pay attention to the Marshalling encoder/decoder lines added to the pipeline (marked in red in the original post).

Server:

public class Server {

    public Server(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            socketChannel.pipeline().addLast(new ServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .option(ChannelOption.SO_RCVBUF, 32 * 1024)
                    .option(ChannelOption.SO_SNDBUF, 32 * 1024)
                    .option(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture future = bootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new Server(8765);
    }

}

The ServerHandler class:

public class ServerHandler extends ChannelHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request request = (Request) msg;
        System.out.println("Server: " + request.getId() + "," + request.getName() + "," + request.getReqeustMessag());
        Response response = new Response();
        response.setId(request.getId());
        response.setName("response " + request.getId());
        response.setResponseMessage("Response content: " + request.getReqeustMessag());
        // decompress the attached image and write it to disk
        byte[] unGzipData = GzipUtils.unGzip(request.getAttachment());
        char separator = File.separatorChar;
        FileOutputStream outputStream = new FileOutputStream(
                System.getProperty("user.dir") + separator + "recieve" + separator + "1.png");
        outputStream.write(unGzipData);
        outputStream.flush();
        outputStream.close();
        ctx.writeAndFlush(response);
    }
}

Client:

public class Client {

    public static void main(String[] args) {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            socketChannel.pipeline().addLast(new ClientHandler());
                        }
                    });

            ChannelFuture future = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8765)).sync();

            for (int i = 1; i <= 5; i++) {
                Request request = new Request();
                request.setId(i);
                request.setName("pro" + i);
                request.setReqeustMessag("data message " + i);
                // attach an image, compressed with gzip
                char separator = File.separatorChar;
                File file = new File(System.getProperty("user.dir") + separator + "source" + separator + "2.jpg");
                FileInputStream inputStream = new FileInputStream(file);
                byte[] data = new byte[inputStream.available()];
                inputStream.read(data);
                inputStream.close();
                byte[] gzipData = GzipUtils.gzip(data);
                request.setAttachment(gzipData);
                future.channel().writeAndFlush(request);
            }
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

}


The ClientHandler class:


public class ClientHandler extends ChannelHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Response response = (Response) msg;
        System.out.println("Client: " + response.getId() + "," + response.getName() + "," + response.getResponseMessage());
    }

}


The Marshalling utility class:


public final class MarshallingCodeCFactory {

    /**
     * Create the JBoss Marshalling decoder (MarshallingDecoder).
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        // obtain a MarshallerFactory through the static factory method of the Marshalling class;
        // the "serial" parameter means a Java-serialization style marshaller factory is created
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        // create a MarshallingConfiguration and set its version to 5
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        // build the provider from the marshallerFactory and the configuration
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        // build Netty's MarshallingDecoder; the two parameters are the provider and the
        // maximum length of a single serialized message
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024);
        return decoder;
    }

    /**
     * Create the JBoss Marshalling encoder (MarshallingEncoder).
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        // build Netty's MarshallingEncoder, which serializes POJOs implementing Serializable into byte arrays
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }

}


The Gzip compression/decompression utility class:


public class GzipUtils {

    public static byte[] gzip(byte[] val) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream(val.length);
        GZIPOutputStream gos = null;
        try {
            gos = new GZIPOutputStream(bos);
            gos.write(val, 0, val.length);
            gos.finish();
            gos.flush();
            bos.flush();
            val = bos.toByteArray();
        } finally {
            if (gos != null)
                gos.close();
            if (bos != null)
                bos.close();
        }
        return val;
    }

    public static byte[] unGzip(byte[] buf) throws IOException {
        GZIPInputStream gzi = null;
        ByteArrayOutputStream bos = null;
        try {
            gzi = new GZIPInputStream(new ByteArrayInputStream(buf));
            bos = new ByteArrayOutputStream(buf.length);
            int count = 0;
            byte[] tmp = new byte[2048];
            while ((count = gzi.read(tmp)) != -1) {
                bos.write(tmp, 0, count);
            }
            buf = bos.toByteArray();
        } finally {
            if (bos != null) {
                bos.flush();
                bos.close();
            }
            if (gzi != null)
                gzi.close();
        }
        return buf;
    }
}

 

 

7. Best Practices

(1) Data Communication

We need to understand how Netty is used in a real project. Broadly speaking, most parameter settings depend on the capacity of the servers. The question to consider is how two (or more) machines communicate with each other using Netty.

There are roughly three approaches:

① Use a persistent connection whose channel is never closed, i.e. the channel between server and client stays open. If the server is powerful enough and the number of clients is fairly large, this approach is recommended.

② Submit data in one-off batches over short-lived connections: data is first saved to a local temporary buffer or temporary table, and when a threshold is reached it is submitted in one batch, or it is submitted on a schedule by a polling task. The drawback is that transmission is not real-time, so this is recommended for applications with low real-time requirements.

③ Use a special kind of long connection: if the server and a given client have had no communication within a specified period, the connection is closed; the next time the client sends a request to the server, the connection is established again.

Here we will implement the third approach with Netty, but two factors need to be considered:

① How do we close the channel after a timeout (i.e. no communication between server and client), and how do we re-establish the connection afterwards?

② If the client goes down, nothing special is needed: the next time it restarts it can connect to the server again. But if the server goes down, how does the client communicate with it?

Server side: the original post marks the added code with a red box (the screenshot is not reproduced); a minimal sketch of the likely change follows.


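Assuming the change mirrors the client code below (an inference, since the screenshot is not shown), the server's child pipeline gains a ReadTimeoutHandler so that a channel with no reads for 5 seconds is closed:

.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
        socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
        // close the channel if nothing has been read from this client for 5 seconds
        socketChannel.pipeline().addLast(new ReadTimeoutHandler(5));
        socketChannel.pipeline().addLast(new ServerHandler());
    }
})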
Client (note the ReadTimeoutHandler line, marked in red in the original post):

public class Client {

    private static class SingletonHolder {
        static final Client client = new Client();
    }

    public static Client getInstance() {
        return SingletonHolder.client;
    }

    private EventLoopGroup workerGroup;
    private Bootstrap bootstrap;
    private ChannelFuture future;

    private Client() {
        workerGroup = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(workerGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        // disconnect if there has been no communication with the server for 5 seconds
                        socketChannel.pipeline().addLast(new ReadTimeoutHandler(5));
                        socketChannel.pipeline().addLast(new ClientHandler());
                    }
                });
    }

    public void connect() {
        try {
            future = bootstrap.connect("127.0.0.1", 8765).sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public ChannelFuture getFuture() {
        // reconnect if there is no channel yet or the previous one has been closed
        if (future == null || !future.channel().isActive()) {
            this.connect();
        }
        return future;
    }

    public static void main(String[] args) throws InterruptedException {
        Client client = getInstance();
        ChannelFuture future = client.getFuture();

        for (int i = 1; i <= 3; i++) {
            Message message = new Message(i, "pro" + i, "data message " + i);
            future.channel().writeAndFlush(message);
            Thread.sleep(4000); // sleep 4 seconds before sending the next message
        }

        future.channel().closeFuture().sync();

        new Thread(() -> {
            try {
                System.out.println("child thread started....");
                // the old channel has timed out and closed, so getFuture() reconnects
                ChannelFuture f = client.getFuture();
                Message message = new Message(4, "pro" + 4, "data message " + 4);
                f.channel().writeAndFlush(message);
                f.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        System.out.println("main thread exits......");
    }

}

 

The other classes are the same as before.

Run result: (console screenshots from the original post omitted)


(2) Heartbeat Detection

When using Socket communication we often have to handle heartbeat checks between servers. Generally, to maintain a server cluster there will be one or more master hosts and N slaves, and the master needs to know the state of each of its slaves at all times so it can monitor them in real time. In a distributed architecture this is called heartbeat detection or heartbeat monitoring. The best approach is to implement it with a communication framework, and Netty can do exactly that.

This example uses Sigar to collect system metrics; if you are not familiar with it, see the article linked from the original post.

Server:

public class Server {

    public Server(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            sc.pipeline().addLast(new ServerHeartBeatHandler());
                        }
                    })
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .option(ChannelOption.SO_BACKLOG, 1024);
            ChannelFuture future = bootstrap.bind(new InetSocketAddress("127.0.0.1", port)).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new Server(8765);
    }

}

The ServerHeartBeatHandler class:

public class ServerHeartBeatHandler extends ChannelHandlerAdapter {

    // whitelist of client IPs and their auth keys
    private static Map<String, String> AUTH_IP_MAP = new HashMap<>();
    private static final String SUCCESS_KEY = "auth_success_key";

    static {
        AUTH_IP_MAP.put("192.168.3.176", "1234");
    }

    private boolean auth(ChannelHandlerContext ctx, Object msg) {
        String[] rets = ((String) msg).split(",");
        String auth = AUTH_IP_MAP.get(rets[0]);
        if (auth != null && auth.equals(rets[1])) {
            ctx.writeAndFlush(SUCCESS_KEY);
            return true;
        } else {
            ctx.writeAndFlush("auth failure!").addListener(ChannelFutureListener.CLOSE);
            return false;
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof String) {
            auth(ctx, msg);
        } else if (msg instanceof RequestInfo) {
            RequestInfo info = (RequestInfo) msg;
            System.out.println("----------------------------------------------");
            System.out.println("Host IP: " + info.getIp());
            System.out.println("Host CPU usage:");
            Map<String, Object> cpuMap = info.getCpuPercMap();
            System.out.println("  combined: " + cpuMap.get("combined"));
            System.out.println("  user: " + cpuMap.get("user"));
            System.out.println("  sys: " + cpuMap.get("sys"));
            System.out.println("  wait: " + cpuMap.get("wait"));
            System.out.println("  idle: " + cpuMap.get("idle"));
            System.out.println("Host memory usage:");
            Map<String, Object> memMap = info.getMemoryMap();
            System.out.println("  total: " + memMap.get("total"));
            System.out.println("  used: " + memMap.get("used"));
            System.out.println("  free: " + memMap.get("free"));
            System.out.println("-----------------------------------------------");
            ctx.writeAndFlush("info received!");
        } else {
            ctx.writeAndFlush("connect failure").addListener(ChannelFutureListener.CLOSE);
        }
    }

}

The Client class:

public class Client {

    public static void main(String[] args) {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                            sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                            sc.pipeline().addLast(new ClientHeartBeatHandler());
                        }
                    });
            ChannelFuture future = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8765)).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }

}

The ClientHeartBeatHandler class:

public class ClientHeartBeatHandler extends ChannelHandlerAdapter {

    private ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);
    private ScheduledFuture<?> heartBeat;
    private InetAddress address;
    private static final String SUCCESS_KEY = "auth_success_key";

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // when the channel becomes active, send "ip,key" to the server for authentication
        address = InetAddress.getLocalHost();
        String ip = address.getHostAddress();
        String key = "1234";
        String auth = ip + "," + key;
        ctx.writeAndFlush(auth);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        if (heartBeat != null) {
            heartBeat.cancel(true);
            heartBeat = null;
        }
        ctx.fireExceptionCaught(cause);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            if (msg instanceof String) {
                String data = (String) msg;
                if (SUCCESS_KEY.equals(data)) {
                    // authentication succeeded: start sending a heartbeat every 5 seconds
                    heartBeat = scheduled.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 5, TimeUnit.SECONDS);
                    System.out.println(msg);
                } else {
                    System.out.println(msg);
                }
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    private class HeartBeatTask implements Runnable {

        private final ChannelHandlerContext ctx;

        public HeartBeatTask(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void run() {
            try {
                RequestInfo requestInfo = new RequestInfo();
                requestInfo.setIp(address.getHostAddress());
                // collect CPU usage with Sigar
                Sigar sigar = new Sigar();
                CpuPerc cpuPerc = sigar.getCpuPerc();
                Map<String, Object> cpuPercMap = new HashMap<>();
                cpuPercMap.put("combined", cpuPerc.getCombined());
                cpuPercMap.put("user", cpuPerc.getUser());
                cpuPercMap.put("sys", cpuPerc.getSys());
                cpuPercMap.put("wait", cpuPerc.getWait());
                cpuPercMap.put("idle", cpuPerc.getIdle());

                // collect memory usage (in MB)
                Mem mem = sigar.getMem();
                Map<String, Object> memoryMap = new HashMap<>();
                memoryMap.put("total", mem.getTotal() / (1024 * 1024));
                memoryMap.put("used", mem.getUsed() / (1024 * 1024));
                memoryMap.put("free", mem.getFree() / (1024 * 1024));

                requestInfo.setCpuPercMap(cpuPercMap);
                requestInfo.setMemoryMap(memoryMap);

                ctx.writeAndFlush(requestInfo);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

The RequestInfo class:

public class RequestInfo implements Serializable {

    private String ip;
    private Map<String, Object> cpuPercMap;
    private Map<String, Object> memoryMap;
    //.. other fields

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public Map<String, Object> getCpuPercMap() {
        return cpuPercMap;
    }

    public void setCpuPercMap(Map<String, Object> cpuPercMap) {
        this.cpuPercMap = cpuPercMap;
    }

    public Map<String, Object> getMemoryMap() {
        return memoryMap;
    }

    public void setMemoryMap(Map<String, Object> memoryMap) {
        this.memoryMap = memoryMap;
    }

}

 

The MarshallingCodeCFactory class is not shown here; it is the same as before.

Run result: (console screenshots from the original post omitted)


Every 5 seconds the client sends its metrics to the server, so the master server always knows the status of each of its slaves. Of course, this is only a simple example; a real environment would certainly need stricter validation.

 


Original article: https://blog.csdn.net/haoyuyang/article/details/53243785