一個簡單的百萬并發的TCP服務器的實現。
我們緊接著上篇文章,看看我們上節課的代碼有什么問題?
可以明顯的看出來上節課的代碼公用了一個同樣的緩沖區進行讀寫,正常的情況下我們需要封裝一個結構體,讓每個對應的客戶端的FD都有獨立的結構進行讀寫還有接收連接。
具體的結構如下:
struct sock_item
{
//客戶端的fd
int fd;
//讀取的緩沖區
char * rbuffer;
//緩沖區的大小
int rlength;
//寫入的緩沖區
char * wbuffer;
//寫入的緩沖區大小
int wlength;
//處理的事件類型
int event;
//回調函數
void (*recv_cb)(int fd,char * buffer,int length);
void (*send_cb)(int fd,char *buffer,int length);
void (*accept_cb)(int fd,char * buffer,int length);
};
struct eventblock
{
struct sock_item * items;
struct eventblock*next;
};
struct reactor
{
int epfd;
int blkcnt;
struct eventblock *evblk;
};
首先看一下我的思路,如圖所示

我們有一個總的reactor結構保存我們多個事件域。事件域中保存這我們的所有連接的客戶端,而事件域就是一個鏈表的數據結構,我們無法知道我們要連入多少臺客戶端,因此我們要使用鏈表,當有超過我們的設置的1024個sock_item項,就再去申請一個新的事件域,保存新的sockitem。
上面,暫時解決了連接了很多個客戶端FD的保存的問題,還有一個問題,就是我們一個IP端口可以連接的最大數為65535.
我們如何解決那?
一個連接都有5個元素來確定,一個源IP,一個源端口,一個目的IP,一個目的端口,協議的種類。
這里我們有3個方式來解決這個問題
第一個就是通過有多個網卡,就可以接受多個客戶端的接入。
第二個就是通過綁定多個端口的方式,也可以接受多個客戶端的接入。
第三個就是通過多個目的ip的端口更改,也是可以解決這個問題的,
我這個例子就是通過使用綁定多個端口來實現的。
初始化100個端口的代碼
int init_server(short port){
int listenfd= socket(AF_INET,SOCK_STREAM,0);
if (listenfd==-1)
{
return -1;
}
struct sockaddr_in servaddr;
servaddr.sin_family=AF_INET;
servaddr.sin_addr.s_addr=htonl(INADDR_ANY);
servaddr.sin_port=htons(port);
if (-1==bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)))
{
return -2;
}
//設置非阻塞
int flag = fcntl(listenfd,F_GETFL,0);
flag |=O_NONBLOCK;
fcntl(listenfd,F_SETFL,flag);
listen(listenfd,10);
return listenfd;
}
判斷是否是監聽的FD的代碼
int is_listenfd(int *fds,int connfd){
int i=0;
for (int i = 0; i < PORT_COUNT; i++)
{
if (fds[i] == connfd) {
return 1;
}
}
return 0;
}
現在初始化我們ractor結構并保存我們的fd。(主要就是鏈表的操作)
int reactor_resize(struct reactor *r ){
if (r==NULL)
{
return -1;
}
struct eventblock* blk=r->evblk;
while (blk!=NULL && blk->next!=NULL)
{
blk=blk->next;
}
struct sock_item * item=(struct sock_item *)malloc(ITEM_LENGTH * sizeof(struct sock_item));
if (item==NULL)
{
return -2;
}
memset(item,0,ITEM_LENGTH*sizeof(struct sock_item));
//鏈表增加的操作
struct eventblock * block=malloc(sizeof(struct eventblock));
if (block==NULL)
{
free(item);
return -3;
}
memset(block,0,sizeof(struct eventblock));
block->items=item;
block->next=NULL;
if (blk==NULL)
{
r->evblk=block;
}else
{
blk->next = block;
}
r->blkcnt ++;
return 0;
}
struct sock_item * reactor_lookup(struct reactor * r ,int sockfd)
{
if (r==NULL)
{
return NULL;
}
printf("currrent eventblock num:%d\n",r->blkcnt);
int blkidx=sockfd/ITEM_LENGTH;
while (blkidx >= r->blkcnt)
{
//如果當前的數目超過我們的最大數目 r->blkcnt * 1024; 重新申請一個block_event;
reactor_resize(r);
}
int i = 0;
struct eventblock* blk=r->evblk;
while (i ++ < blkidx && blk != NULL) {
blk = blk->next;
}
return &blk->items[sockfd % ITEM_LENGTH];
}
最后就是我們整個的一個代碼
#include<stdio.h>
#include<sys/socket.h>
#include<sys/types.h>
#include<netinet/in.h>
#include<fcntl.h>
#include <unistd.h>
#include<sys/epoll.h>
#include <string.h>
#include <stdlib.h>
#define BUFFER_LENGTH 128
#define EVENTS_LENGTH 128
#define PORT_COUNT 200
#define ITEM_LENGTH 1024
struct sock_item
{
//客戶端的fd
int fd;
//讀取的緩沖區
char * rbuffer;
//緩沖區的大小
int rlength;
//寫入的緩沖區
char * wbuffer;
//寫入的緩沖區大小
int wlength;
//處理的事件類型
int event;
//回調函數
void (*recv_cb)(int fd,char * buffer,int length);
void (*send_cb)(int fd,char *buffer,int length);
void (*accept_cb)(int fd,char * buffer,int length);
};
struct eventblock
{
struct sock_item * items;
struct eventblock*next;
};
struct reactor
{
int epfd;
int blkcnt;
struct eventblock *evblk;
};
int init_server(short port){
int listenfd= socket(AF_INET,SOCK_STREAM,0);
if (listenfd==-1)
{
return -1;
}
struct sockaddr_in servaddr;
servaddr.sin_family=AF_INET;
servaddr.sin_addr.s_addr=htonl(INADDR_ANY);
servaddr.sin_port=htons(port);
if (-1==bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)))
{
return -2;
}
//設置非阻塞
int flag = fcntl(listenfd,F_GETFL,0);
flag |=O_NONBLOCK;
fcntl(listenfd,F_SETFL,flag);
listen(listenfd,10);
return listenfd;
}
int is_listenfd(int *fds,int connfd){
int i=0;
for (int i = 0; i < PORT_COUNT; i++)
{
if (fds[i] == connfd) {
return 1;
}
}
return 0;
}
int reactor_resize(struct reactor *r ){
if (r==NULL)
{
return -1;
}
struct eventblock* blk=r->evblk;
while (blk!=NULL && blk->next!=NULL)
{
blk=blk->next;
}
struct sock_item * item=(struct sock_item *)malloc(ITEM_LENGTH * sizeof(struct sock_item));
if (item==NULL)
{
return -2;
}
memset(item,0,ITEM_LENGTH*sizeof(struct sock_item));
//鏈表增加的操作
struct eventblock * block=(struct eventblock*)malloc(sizeof(struct eventblock));
if (block==NULL)
{
free(item);
return -3;
}
memset(block,0,sizeof(struct eventblock));
block->items=item;
block->next=NULL;
if (blk==NULL)
{
r->evblk=block;
}else
{
blk->next = block;
}
r->blkcnt ++;
return 0;
}
struct sock_item * reactor_lookup(struct reactor * r ,int sockfd)
{
if (r==NULL)
{
return NULL;
}
printf("currrent eventblock num:%d\n",r->blkcnt);
int blkidx=sockfd/ITEM_LENGTH;
while (blkidx >= r->blkcnt)
{
//如果當前的數目超過我們的最大數目 r->blkcnt * 1024; 重新申請一個block_event;
reactor_resize(r);
}
int i = 0;
struct eventblock* blk=r->evblk;
while (i ++ < blkidx && blk != NULL) {
blk = blk->next;
}
return &blk->items[sockfd % ITEM_LENGTH];
}
int main()
{
int ret;
//申請 reactor 結構
struct reactor * r= (struct reactor * )calloc(1,sizeof(struct reactor));
if (r==NULL)
{
return -3;
}
//開始進行EPOLL的創建
r->epfd= epoll_create(1);
struct epoll_event ev, events[EVENTS_LENGTH];
int sockfds[PORT_COUNT] = {0};
for (int i = 0; i < PORT_COUNT; i++)
{
sockfds[i] = init_server(9999 + i);
ev.events = EPOLLIN;
ev.data.fd = sockfds[i];
epoll_ctl(r->epfd, EPOLL_CTL_ADD, sockfds[i], &ev);
}
//接下來開始接受 我們的客戶端的連接請求
while (1)
{
//我們需要詳細講解一下這個函數的里面的各個參數的意義 ,以及它什么時候是阻塞的,什么時候是非阻塞的,
//第一個參數我們的EPFD的文件描述符,第二個我們的接收事件的緩沖器,第三個是我們事件數量的多少,最后一個參數就是我們等待的時長了。
//當是-1的時候就是一直等待連接的意思,沒有連接就會 一直被阻塞住,
//當是0的時候就是一直有連接直接返回的意思,
//當是大于0的數的時候,就是在輪詢查看是否有事件的時長,單位是MS。
int nready = epoll_wait(r->epfd,events,EVENTS_LENGTH,-1);
printf("----------%d\n",nready);
//開始遍歷我們的事件
int i =0;
for (int i = 0; i < nready; i++)
{
int clientfd=events[i].data.fd;
if (is_listenfd(sockfds,clientfd))
{
//如果是我們的監聽的FD,說明是有客戶端連入的事件
struct sockaddr_in client;
socklen_t len=sizeof(client);
//接受客戶端的請求,
int connfd=accept(clientfd,(struct sockaddr*)&client,&len);
if (connfd==-1)
{
break;
}
int flag = fcntl(connfd, F_GETFL, 0);
flag |= O_NONBLOCK;
fcntl(connfd, F_SETFL, flag);
//增加到我們的快遞柜中
ev.events=EPOLLIN;
ev.data.fd=connfd;
epoll_ctl(r->epfd,EPOLL_CTL_ADD,connfd,&ev);
//如果是讀的請求
struct sock_item *item = reactor_lookup(r, connfd);
item->fd = connfd;
item->rbuffer = (char*)calloc(1, BUFFER_LENGTH);
item->rlength = 0;
item->wbuffer = (char*)calloc(1, BUFFER_LENGTH);
item->wlength = 0;
}
else if (events[i].events & EPOLLIN)
{
struct sock_item *item = reactor_lookup(r, clientfd);
char *rbuffer = item->rbuffer;
char *wbuffer = item->wbuffer;
int n = recv(clientfd, rbuffer, BUFFER_LENGTH, 0);
if (n > 0) {
//rbuffer[n] = '\0';
printf("recv: %s, n: %d\n", rbuffer, n);
memcpy(wbuffer, rbuffer, BUFFER_LENGTH);
ev.events = EPOLLOUT;
ev.data.fd = clientfd;
epoll_ctl(r->epfd, EPOLL_CTL_MOD, clientfd, &ev);
} else if (n == 0) {
free(rbuffer);
free(wbuffer);
item->fd = 0;
close(clientfd);
}
}
else if(events[i].events & EPOLLOUT)
{
struct sock_item *item = reactor_lookup(r, clientfd);
char *wbuffer = item->wbuffer;
int sent = send(clientfd, wbuffer, BUFFER_LENGTH, 0); //
printf("sent: %d\n", sent);
ev.events = EPOLLIN;
ev.data.fd = clientfd;
epoll_ctl(r->epfd, EPOLL_CTL_MOD, clientfd, &ev);
}
}
}
return 0;
}
推薦一個零聲學院免費教程,個人覺得老師講得不錯,
分享給大家:[Linux,Nginx,ZeroMQ,MySQL,Redis,
fastdfs,MongoDB,ZK,流媒體,CDN,P2P,K8S,Docker,
TCP/IP,協程,DPDK等技術內容,點擊立即學習:
服務器
音視頻
dpdk
Linux內核

浙公網安備 33010602011771號