BIO
BIO
1BIO介紹
傳統(tǒng)阻塞Java IO編程,其相關(guān)的類和接口在Java.io 包中
BIO(blocking I/O)同步阻塞,服務(wù)器實(shí)現(xiàn)模式為一個(gè)連接一個(gè)線程,即客戶端有連接請(qǐng)求時(shí)服務(wù)器端就需要啟動(dòng)一個(gè)線程進(jìn)行處理,如果這個(gè)連接不做任何事情會(huì)造成不必要的線程開銷,可以通過線程池機(jī)制改善 (實(shí)現(xiàn)多個(gè)客戶連接服務(wù)器)
2工作機(jī)制
3BIO傳統(tǒng)實(shí)例
網(wǎng)絡(luò)編程的基本模型是Client/Server模型,也就是兩個(gè)進(jìn)程之間進(jìn)行相互通信,其中服務(wù)端提供位置信息(綁定IP地址和端口),客戶端通過連接操作向服務(wù)端監(jiān)聽的端口地址發(fā)起連接請(qǐng)求,基于TCP協(xié)議下進(jìn)行三次握手連接,連接成功后,雙方通過網(wǎng)絡(luò)套接字(Socket)進(jìn)行通信。
傳統(tǒng)的同步阻塞模型開發(fā)中,服務(wù)端ServerSocket負(fù)責(zé)綁定IP地址,啟動(dòng)監(jiān)聽端口;客戶端Socket負(fù)責(zé)發(fā)起 連接操作。連接成功后,雙方通過輸入和輸出流進(jìn)行同步阻塞式通信。
基于BIO模式下的通信,客戶端-服務(wù)端是完全同步,完全藕合的。
客戶端案例如下:
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;
/*** 客戶端 */
public class Client {
public static void main(String[] args) {
try {//1.創(chuàng)建Socket對(duì)象請(qǐng)求服務(wù)端的連接
Socket socket = new Socket("127.0.0.1", 9999);
//2.從Socket對(duì)象中獲取一個(gè)字節(jié)輸出流
OutputStream os = socket.getOutputStream();
//3.把字節(jié)輸出流包裝成一個(gè)打印流
PrintStream ps = new PrintStream(os);
// 單詞發(fā)送消息
// ps.print("hello World! 服務(wù)端,你好");
// ps.println("hello World! 服務(wù)端,你好");
// ps.flush();
// 發(fā)送多條消息
Scanner sc = new Scanner(System.in);
while (true) {
System.out.print("請(qǐng)說:");
String msg = sc.nextLine();
ps.println(msg);
ps.flush();
}
} catch (
IOException e) {
e.printStackTrace();
}
}
}
服務(wù)端案例如下:
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
/**
* TODO 服務(wù)端
* 服務(wù)端會(huì)一直等待客戶端的消息,如果客戶端沒有進(jìn)行消息的發(fā)送,服務(wù)端將一直進(jìn)入阻塞狀態(tài)
* @author ss_419
* @version 1.0
* @date 2023/8/23 11:09
*/
public class Server {
public static void main(String[] args) {
try{
System.out.println("===服務(wù)端啟動(dòng)===");
// 1、定義一個(gè)ServerSocket對(duì)象進(jìn)行服務(wù)端的端口注冊(cè)
ServerSocket ss = new ServerSocket(9999);
// 2、監(jiān)聽客戶端的Socket連接請(qǐng)求
Socket socket = ss.accept();
// 3、從socket管道中得到一個(gè)字節(jié)輸入流對(duì)象
InputStream is = socket.getInputStream();
// 4、把字節(jié)輸入流包裝成一個(gè)緩存字符輸入流
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg;
// 接收多條消息
while ((msg = br.readLine()) != null){
System.out.println("服務(wù)端收到:" + msg);
}
// 接收一條消息
// if ((msg = br.readLine()) != null){
// System.out.println("服務(wù)端收到:" + msg);
// }
}catch (Exception e){
e.printStackTrace();
}
}
}
小結(jié)
-
在以上通信中,服務(wù)端會(huì)一直等待客戶端的消息,如果客戶端沒有進(jìn)行消息的發(fā)送,服務(wù)端將一直進(jìn)入阻塞狀態(tài)
-
同時(shí)服務(wù)端是按照行獲取消息的,這意味育客戶端也必須按照行進(jìn)行消息的發(fā)送,否則服務(wù)端將進(jìn)入等待消息的阻塞狀態(tài)!
4BIO模式下接收多個(gè)客戶端
在上述的案例中,一個(gè)服務(wù)端只能接收一個(gè)客戶端的通信請(qǐng)求,那么如果服務(wù)端需要處理很多個(gè)客戶端的消息通信請(qǐng)求應(yīng)該如何處理呢,此時(shí)我們就需要在服務(wù)端引入線程了,也就是說客戶端每發(fā)起一個(gè)請(qǐng)求,服務(wù)端就創(chuàng)建一個(gè)新的線程來處理這個(gè)客戶端請(qǐng)求,這樣就實(shí)現(xiàn)了一個(gè)客戶端一個(gè)線程的模型
圖解如下:
客戶端代碼:
import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;
/**
* TODO
*
* @author ss_419
* @version 1.0
* @date 2023/8/23 11:20
*/
public class Client {
public static void main(String[] args) {
try{
// 1、請(qǐng)求與服務(wù)器的Socket對(duì)象連接
Socket socket = new Socket("localhost", 9999);
// 2、得到一個(gè)打印流
PrintStream ps = new PrintStream(socket.getOutputStream());
// 3、循環(huán)不斷的發(fā)送消息給服務(wù)器端接收
Scanner sc = new Scanner(System.in);
while (true){
System.out.print(" [root@localhost]:");
String msg = sc.nextLine();
ps.println(msg);
ps.flush();
}
}catch (Exception e){
}
}
}
服務(wù)端代碼:
import java.net.ServerSocket;
import java.net.Socket;
/**
* TODO 服務(wù)端可以實(shí)現(xiàn)同時(shí)接收多個(gè)客戶端的Socket通信需求
*
* 服務(wù)端每接收到一個(gè)客戶端Socket請(qǐng)求對(duì)象之后都交給一個(gè)獨(dú)立的線程來處理客戶端的數(shù)據(jù)交互需求
*
* @author ss_419
* @version 1.0
* @date 2023/8/23 11:28
*/
public class Server {
public static void main(String[] args) {
try {
// 1、注冊(cè)端口
ServerSocket ss = new ServerSocket(9999);
// 2、循環(huán)接收客戶端的Socket連接請(qǐng)求
while (true) {
Socket socket = ss.accept();
// 3、創(chuàng)建一個(gè)獨(dú)立的線程來處理與這個(gè)客戶端的socket通信需求
new ServerThreadReader(socket).start();
}
}catch (Exception e) {
e.printStackTrace();
}
}
}
線程類:
public class ServerThreadReader extends Thread{
private Socket socket;
public ServerThreadReader(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try{
// 1、從socket對(duì)象中得到一個(gè)字節(jié)輸入流
InputStream is = socket.getInputStream();
// 2、使用緩存字符輸入流包裝字節(jié)輸入流
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg;
// 3、循環(huán)打印客戶端的消息
while ((msg = br.readLine()) != null){
System.out.println(msg);
}
}catch (Exception e){
e.printStackTrace();
}
}
}
小結(jié):
-
每個(gè)Socket接收到,都會(huì)創(chuàng)建一個(gè)線程,線程的競(jìng)爭(zhēng)、切換上下文影響性能
-
每個(gè)線程都會(huì)占用棧空間和CPU資源
-
并不是每個(gè)socket都進(jìn)行l(wèi)O操作,無意義的線程處理
-
客戶端的并發(fā)訪問增加時(shí)。服務(wù)端將呈現(xiàn)1:1的線程開銷,訪問量越大,系統(tǒng)將發(fā)生線程棧溢出,線程創(chuàng)建失敗,最終導(dǎo)致進(jìn)程宕機(jī)或者僵死,從而不能對(duì)外提供服務(wù)
5偽異步I/O
在上述案例中,客戶端的并發(fā)訪問增加時(shí)。服務(wù)端將呈現(xiàn)1:1的線程開銷,訪問量越大,系統(tǒng)將發(fā)送線程棧溢出,線程創(chuàng)建失敗,最終導(dǎo)致進(jìn)程宕機(jī)或者僵死,從而不能對(duì)外提供服務(wù)。
接下來我們采用一個(gè)偽異步I/O的通信框架,采用線程池和任務(wù)隊(duì)列實(shí)現(xiàn),當(dāng)客戶端接入時(shí),將客戶端的Socket封裝成一個(gè)Task(該任務(wù)實(shí)現(xiàn)Java. lang. Runnable(線程任務(wù)接口)交給后端的線程池中進(jìn)行處理。JDK的線程池維護(hù)一個(gè)消息隊(duì)列和N個(gè)活躍的線程,對(duì)消息隊(duì)列中Socket任務(wù)進(jìn)行處理,由于線程池可以設(shè)置消息隊(duì)列的大小和最大線程數(shù),因此,它的資源占用是可控的,無論多少個(gè)客戶端并發(fā)訪問,都不會(huì)導(dǎo)致資源的耗盡和宕機(jī)。
客戶端代碼:
import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;
/**
* TODO 客戶端
*
* @author ss_419
* @version 1.0
* @date 2023/8/23 11:42
*/
public class Client {
public static void main(String[] args) {
try{
// 1、請(qǐng)求與服務(wù)器端的Socket對(duì)象連接
Socket socket = new Socket("localhost", 9999);
// 2、得到一個(gè)打印流
PrintStream ps = new PrintStream(socket.getOutputStream());
// 3、使用循環(huán)不斷向服務(wù)端發(fā)送消息
Scanner sc = new Scanner(System.in);
while (true){
System.out.print(" [root@localhost]:");
String msg = sc.nextLine();
ps.println(msg);
ps.flush();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
線程池處理類:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* TODO 線程池處理類
*
* @author ss_419
* @version 1.0
* @date 2023/8/23 11:46
*/
public class HandlerSocketServerPool {
// 1、創(chuàng)建一個(gè)線程池的成員變量用于存儲(chǔ)一個(gè)線程池對(duì)象
private ExecutorService executorService;
/**
* 2、創(chuàng)建這個(gè)類的對(duì)象的時(shí)候就需要初始化線程池對(duì)象
* public ThreadPoolExecutor(int corePoolSize,
* int maximumPoolSize,
* long keepAliveTime,
* TimeUnit unit,
* BlockingQueue<Runnable> workQueue)
*/
public HandlerSocketServerPool(int maxThreadNum, int queueSize) {
executorService = new ThreadPoolExecutor(
3,
maxThreadNum,
120,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueSize));
}
/**
* 3、提供一個(gè)方法來提交任務(wù)給線程池的任務(wù)隊(duì)列來暫存,等待線程池來處理
*/
public void execute(Runnable target){
executorService.execute(target);
}
}
Socket 任務(wù)類:
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;
/**
* TODO 線程任務(wù)類
*
* @author ss_419
* @version 1.0
* @date 2023/8/23 11:52
*/
public class ServerRunnableTarget implements Runnable{
private Socket socket;
public ServerRunnableTarget(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
// 處理接收到客戶端socket通信需求
try{
// 1、從socket管道中得到一個(gè)字節(jié)輸入流對(duì)象
InputStream is = socket.getInputStream();
// 2、把字節(jié)輸入流包裝成一個(gè)緩存字符輸入流
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg;
while ((msg = br.readLine()) != null){
System.out.println("服務(wù)端收到 => " + msg);
}
}catch (Exception e){
e.printStackTrace();
}
}
}
服務(wù)端代碼:
import java.net.ServerSocket;
import java.net.Socket;
/**
* TODO
*
* @author ss_419
* @version 1.0
* @date 2023/8/23 11:54
*/
public class Server {
public static void main(String[] args) {
try{
// 1、注冊(cè)端口
ServerSocket ss = new ServerSocket(9999);
// 2、定義死循環(huán),負(fù)責(zé)不斷接收客戶端的Socket請(qǐng)求
// 初始化一個(gè)線程池對(duì)象
HandlerSocketServerPool pool = new HandlerSocketServerPool(3, 10);
while (true){
Socket socket = ss.accept();
// 3、把socket對(duì)象交給一個(gè)線程池進(jìn)行處理
ServerRunnableTarget target = new ServerRunnableTarget(socket);
pool.execute(target);
}
}catch(Exception e){
e.printStackTrace();
}
}
}
6BIO文件上傳
支持任意類型文件形式的上傳
客戶端代碼:
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.Socket;
/**
* TODO
*
* @author ss_419
* @version 1.0
* @date 2023/8/23 14:58
*/
public class Client {
public static void main(String[] args) {
try(
InputStream is = new FileInputStream("/Library/Soft/data/io/1.jpg");
){
// 1、請(qǐng)求與服務(wù)端的Socket連接
Socket socket = new Socket("localhost", 8888);
// 2、把字節(jié)輸出流包裝成一個(gè)數(shù)據(jù)輸出流(DataOutputStream可以做分段數(shù)據(jù)發(fā)送)
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
// 3、先發(fā)送上傳文件的后綴給服務(wù)器
dos.writeUTF(".jpg");
// 4、把文件數(shù)據(jù)發(fā)送給服務(wù)端進(jìn)行接收
byte[] buffer = new byte[1024];
int len;
while ((len = is.read(buffer)) > 0){
dos.write(buffer,0,len);
}
dos.flush();
socket.shutdownOutput();// 通知服務(wù)端,客戶端發(fā)送完畢
}catch (Exception e){
e.printStackTrace();
}
}
}
服務(wù)端代碼:
public class Server {
public static void main(String[] args) {
try{
ServerSocket ss = new ServerSocket(8888);
while (true){
Socket socket = ss.accept();
// 交給一個(gè)獨(dú)立的線程來處理與這個(gè)客戶端的文件通信需求
new ServerReadThread(socket).start();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
Socket 線程處理類:
import java.io.DataInputStream;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.UUID;
/**
* TODO
*
* @author ss_419
* @version 1.0
* @date 2023/8/23 14:46
*/
public class ServerReadThread extends Thread{
private Socket socket;
public ServerReadThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try{
// 1、得到一個(gè)數(shù)據(jù)輸入流來讀取客戶端發(fā)送過來的睡
DataInputStream dis = new DataInputStream(socket.getInputStream());
// 2、讀取客戶端發(fā)送過來的文件類型
String suffix = dis.readUTF();
System.out.println("服務(wù)端已經(jīng)成功接收到了文件類型:" + suffix);
// 3、定義一個(gè)字節(jié)輸出管道,負(fù)責(zé)把客戶端發(fā)來的文件數(shù)據(jù)寫出去
OutputStream os = new FileOutputStream("/Library/Soft/data/io/" + UUID.randomUUID().toString() + suffix);
// 4、從數(shù)據(jù)輸入流中讀取文件數(shù)據(jù),寫出到字節(jié)輸出流中去
byte[] buffer = new byte[1024];
int len;
while ((len = dis.read(buffer)) > 0){
os.write(buffer,0,len);
}
os.close();
System.out.println("服務(wù)端接收文件保存成功");
}catch (Exception e){
}
}
}
7BIO 端口轉(zhuǎn)發(fā)
需求:實(shí)現(xiàn)一個(gè)群聊,即一個(gè)客戶端消息可以發(fā)送給所有客戶端接收
客戶端代碼:
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;
/**
* TODO
*
* @author ss_419
* @version 1.0
* @date 2023/8/23 15:28
*/
public class Client {
public static void main(String[] args) {
try{
// 1、請(qǐng)求與服務(wù)端的Socket對(duì)象連接
Socket socket = new Socket("localhost", 9999);
// 收消息
Thread clientThread = new ClientReaderThread(socket);
clientThread.start();
while (true){
// 發(fā)消息
OutputStream os = socket.getOutputStream();
PrintStream ps = new PrintStream(os);
// 3、使用循環(huán)不斷發(fā)送消息給服務(wù)端接收
Scanner sc = new Scanner(System.in);
String msg = sc.nextLine();
ps.println(msg);
ps.flush();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
客戶端線程處理類:
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;
/**
* TODO 客戶端線程處理
*
* @author ss_419
* @version 1.0
* @date 2023/8/23 15:31
*/
public class ClientReaderThread extends Thread {
private Socket socket;
public ClientReaderThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try{
while (true) {
InputStream is = socket.getInputStream();
BufferedReader br = new BufferedReader(new InputStreamReader(is));
String msg;
// while ((msg = br.readLine()) != null){
// System.out.println(msg);
// }
if ((msg = br.readLine()) != null){
System.out.println(msg);
}
}
}catch (Exception e){
e.printStackTrace();
}
}
}
服務(wù)端代碼:
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
/**
* TODO
*
* @author ss_419
* @version 1.0
* @date 2023/8/23 15:46
*/
public class Server {
// 定義一個(gè)靜態(tài)集合,用于存放在線的用戶
public static List<Socket> allSocketOnLine = new ArrayList<>();
public static void main(String[] args) {
try{
ServerSocket ss = new ServerSocket(9999);
while (true){
Socket socket = ss.accept();
// 登錄成功的客戶端存入在線集合
allSocketOnLine.add(socket);
// 為當(dāng)前登錄成功的socket分配一個(gè)獨(dú)立的線程來處理與之通信
new ServerReaderThread(socket).start();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
服務(wù)端線程處理類:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.Socket;
/**
* TODO
*
* @author ss_419
* @version 1.0
* @date 2023/8/23 15:38
*/
public class ServerReaderThread extends Thread {
private Socket socket;
public ServerReaderThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try{
// 1、從socket中獲取當(dāng)前客戶端的輸入流
BufferedReader br = new BufferedReader(
new InputStreamReader(
socket.getInputStream()));
String msg;
while ((msg = br.readLine()) != null){
System.out.println("服務(wù)器收到消息 :" + msg);
// 2、服務(wù)端收到了客戶端的消息后,需要推送給所有的當(dāng)前在線的socket
sendMsgToAllClient(msg,socket);
}
}catch (Exception e){
e.printStackTrace();
System.out.println("當(dāng)前有人下線了!");
// 從在線socket集合中移出本socket
Server.allSocketOnLine.remove(socket);
}
}
/**
* 把當(dāng)前客戶端發(fā)送來的消息推送給全部在線的socket
* @param msg
* @param socket
*/
private void sendMsgToAllClient(String msg, Socket socket) throws IOException {
for (Socket sk : Server.allSocketOnLine){
// 只發(fā)送給除自己意外的其他客戶端
if (socket != sk){
PrintStream ps = new PrintStream(sk.getOutputStream());
ps.println(msg);
ps.flush();
}
}
}
}
8BIO即時(shí)通訊
基于BIO模式下的即時(shí)通信,我們需要解決客戶端到客戶端的通信,也就是需要實(shí)現(xiàn)客戶端與客戶端的端口消息轉(zhuǎn)發(fā)邏輯
posted on 2023-08-24 22:09 JavaCoderPan 閱讀(65) 評(píng)論(0) 收藏 舉報(bào)
浙公網(wǎng)安備 33010602011771號(hào)