/*
一些概念:
linux下一切皆文件,文件描述符fd,文件I/O(包含socket,文本文件等),I/O多路复用,reactor模型,水平触发,边沿触发,多线程模型,阻塞和非阻塞,同步和异步
设备有限,目前实测能达到 11万并发连接。当客户端端口耗尽退出时,服务端异常退出待解决。
*/
#include<stdio.h>
#include<sys/socket.h>
#include<sys/types.h>
#include<netinet/in.h>
#include<fcntl.h>
#include<sys/epoll.h>
#include<errno.h>
#include<string.h>
#include<stdlib.h>
#include<assert.h>
#include<unistd.h>
// Size in bytes of every receive/echo buffer handed to recv() in this file.
#define BUFFER_LEN 128
// Row count of the per-fd buffer table in the (disabled) select() variant;
// fds >= MAX_CON are skipped there.
#define MAX_CON 1024
// Capacity of the epoll_event array passed to epoll_wait().
#define EVENTS_LEN 128
// Number of sock_item slots held by one eventblock: fd / ITEM_LEN selects the
// block and fd % ITEM_LEN selects the slot inside it (see reactor_lookup).
#define ITEM_LEN 4096
/*
 * Per-connection state, one slot per file descriptor.
 * main() fills a slot when a connection is accepted and clears it (buffers
 * freed, fd reset to 0) when the peer closes.
 */
struct sock_item {
int fd; // connected socket fd; main() resets it to 0 when the slot is released
char *rbuffer; // receive buffer, allocated with BUFFER_LEN bytes on accept
int rlength; // byte count returned by the last recv() into rbuffer
char *wbuffer; // pending reply, allocated with BUFFER_LEN bytes on accept
int wlength; // number of bytes in wbuffer to send
int event; // NOTE(review): only written by a disabled snippet in this file
// NOTE(review): the three callbacks below are never assigned or invoked in
// the code visible here; presumably reserved for a callback-driven reactor.
void (*recv_cb)(int fd, char *buffer, int length);
void (*send_cb)(int fd, char *buffer, int length);
void (*accept_cb)(int fd, char *buffer, int length);
};
/*
 * One node of the reactor's singly linked list of slot arrays.
 * reactor_alloc() creates each node with a zeroed array of ITEM_LEN slots.
 */
struct eventblock {
struct sock_item *items; // array of ITEM_LEN sock_item slots
struct eventblock *next; // next block in the list, NULL at the tail
};
/*
 * Reactor state: the epoll instance plus the list of slot blocks that maps a
 * file descriptor to its sock_item.
 */
struct reactor {
int epfd; // epoll instance fd, set from epoll_create() in main()
int blkcnt; // number of blocks in the list; incremented by reactor_alloc()
struct eventblock *evblk; // head of the block list, NULL until the first reactor_alloc()
};
/*
 * Append one more block of ITEM_LEN zero-initialised sock_item slots to the
 * tail of r's block list and bump r->blkcnt.
 *
 * Returns 0 on success, -1 when r is NULL or either allocation fails (nothing
 * is linked into the list in that case).
 */
int reactor_alloc(struct reactor *r){
    struct eventblock *tail;
    struct eventblock *node;
    struct sock_item *slots;

    if(r == NULL){
        return -1;
    }

    /* Walk to the last node; tail stays NULL when the list is still empty. */
    tail = r->evblk;
    while(tail != NULL && tail->next != NULL){
        tail = tail->next;
    }

    slots = (struct sock_item *)calloc(1, ITEM_LEN * sizeof(struct sock_item));
    if(slots == NULL){
        printf("no memory ret:%d %s\n", errno, strerror(errno));
        return -1;
    }

    node = (struct eventblock *)calloc(1, sizeof(struct eventblock));
    if(node == NULL){
        /* Do not leak the slot array when the node itself cannot be created. */
        free(slots);
        printf("no memory ret:%d %s\n", errno, strerror(errno));
        return -1;
    }

    node->items = slots;
    node->next = NULL;

    /* An empty list gets its head set; otherwise hook behind the tail. */
    if(tail == NULL){
        r->evblk = node;
    }else{
        tail->next = node;
    }

    r->blkcnt += 1;
    printf("create block:%p, cnt:%d\n", node, r->blkcnt);
    return 0;
}
/*
 * Map a socket fd to its sock_item slot, growing the block list on demand.
 *
 * The slot lives in block number sockfd / ITEM_LEN at index sockfd % ITEM_LEN.
 * Returns NULL when r is NULL, when no block has been allocated yet, when
 * sockfd <= 0, or when a needed block cannot be allocated.
 */
struct sock_item* reactor_lookup(struct reactor *r, int sockfd){
    if(r == NULL || r->evblk == NULL || sockfd <= 0){
        return NULL;
    }

    const int target = sockfd / ITEM_LEN;
    const int offset = sockfd % ITEM_LEN;

    /* Keep appending blocks until the one holding this fd exists. */
    while(r->blkcnt <= target){
        if(reactor_alloc(r) != 0){
            return NULL;
        }
    }

    /* Hop `target` links from the head to reach the owning block. */
    struct eventblock *cur = r->evblk;
    for(int hop = 0; hop < target; ++hop){
        cur = cur->next;
    }

    return cur->items + offset;
}
/*
 * Thread entry point: blocking echo loop for a single client socket.
 *
 * arg must point to an int holding the connected socket fd; the value is
 * copied once on entry. The loop echoes back whatever it receives and ends
 * when the peer closes the connection or a socket error occurs, closing the
 * fd in both cases. Always returns NULL.
 */
void *routine(void *arg){
    int clientfd = *(int *)arg;

    while(1){
        unsigned char buf[BUFFER_LEN] = {0};
        int ret = recv(clientfd, buf, BUFFER_LEN, 0);

        if(ret < 0 && errno == EINTR){
            continue; // interrupted by a signal: just retry
        }
        if(ret <= 0){
            // ret == 0: peer closed. Without close() here the socket would
            // linger in CLOSE_WAIT. ret < 0: hard error, the fd is unusable.
            if(ret < 0){
                printf("clientfd:%d recv error ret:%d %s\n", clientfd, ret, strerror(errno));
            }
            close(clientfd);
            break;
        }

        // The buffer has no NUL terminator when recv() fills it completely,
        // so bound the print by the number of bytes actually received.
        printf("clientfd:%d buf:%.*s %d\n", clientfd, ret, (const char *)buf, ret);

        // MSG_NOSIGNAL: a peer that already went away must yield EPIPE here
        // instead of raising SIGPIPE, which would terminate the process.
        ret = send(clientfd, buf, ret, MSG_NOSIGNAL);
        printf("clientfd send %d\n", ret);
        if(ret < 0){
            close(clientfd);
            break;
        }
    }
    return NULL;
}
/*
 * Tear down one client connection owned by the reactor: unregister the fd
 * from epoll, free both buffers, close the socket and mark the slot as free
 * (fd == 0, buffers NULL) so the same fd number can be reused by a later
 * accept().
 */
static void reactor_drop(struct reactor *r, struct sock_item *item){
    // The event argument is ignored for EPOLL_CTL_DEL, but kernels before
    // 2.6.9 require it to be non-NULL.
    struct epoll_event unused = {0};
    epoll_ctl(r->epfd, EPOLL_CTL_DEL, item->fd, &unused);
    free(item->rbuffer);
    item->rbuffer = NULL;
    free(item->wbuffer);
    item->wbuffer = NULL;
    item->rlength = 0;
    item->wlength = 0;
    close(item->fd);
    printf("clientfd:%d closed\n", item->fd);
    item->fd = 0;
}

/*
 * TCP echo server on port 9999 (all interfaces).
 *
 * Several server models are kept side by side and selected with the
 * preprocessor; exactly one branch of the #if/#elif chain below is enabled:
 *   1. single connection, blocking recv/send
 *   2. one thread per connection (routine)
 *   3. select()
 *   4. plain epoll with one shared buffer
 *   5. epoll reactor with per-fd state (enabled)
 *
 * Returns -1 / -2 / -3 when socket() / bind() / listen() fail, -1 when the
 * reactor cannot be set up or epoll_wait() fails; otherwise loops forever.
 */
int main(){
    // Mnemonic for "socket" (plug + outlet): the socket fd is the plug and
    // the sockaddr_in it is bound to is the outlet.
    int ret;
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if(listenfd == -1) return -1;

    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(servaddr)); // do not pass stack garbage (sin_zero) to bind()
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);
    ret = bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    if(ret == -1) return -2;

    // File descriptors are blocking by default.
    // Disabled snippet: switch listenfd to non-blocking mode.
#if 0
    int flag = fcntl(listenfd, F_GETFL, 0);
    flag |= O_NONBLOCK;
    fcntl(listenfd, F_SETFL, flag);
#endif

    // listenfd is like the greeter at a hotel door: it only welcomes guests.
    ret = listen(listenfd, 10);
    if(ret == -1) return -3;

#if 0
    // Variant 1: serve exactly one connection with blocking recv/send.
    struct sockaddr_in client;
    socklen_t len = sizeof(struct sockaddr_in);
    int clientfd = accept(listenfd, (struct sockaddr *)&client, &len);
    while(1){
        unsigned char buf[BUFFER_LEN] = {0};
        ret = recv(clientfd, buf, BUFFER_LEN, 0);
        printf("clientfd:%d %s %d\n", clientfd, buf, ret);
        ret = send(clientfd, buf, ret, 0);
        printf("clientfd send %d\n", ret);
    }
#elif 0
    // Variant 2: one thread per connection.
    // NOTE(review): enabling this needs <pthread.h>, and &clientfd points at
    // a loop-local that the next accept() overwrites while the thread may
    // still be reading it.
    while(1){
        struct sockaddr_in client;
        socklen_t len = sizeof(struct sockaddr_in);
        // Each accepted fd is like a waiter who serves one guest.
        int clientfd = accept(listenfd, (struct sockaddr *)&client, &len);
        pthread_t tid;
        pthread_create(&tid, NULL, routine, &clientfd);
    }
#elif 0 // Variant 3: select(), see <APUE> p.404
    fd_set rfds, wfds, rset, wset;
    FD_ZERO(&rfds);
    FD_SET(listenfd, &rfds);
    FD_ZERO(&wfds);
    // Author's open question: is maxfd the upper index bound of the kernel's
    // fd bitmap scan?
    int maxfd = listenfd;
    unsigned char buf[MAX_CON][BUFFER_LEN + 1] = {{0}};
    while(1){
        rset = rfds;
        wset = wfds;
        // Arguments: scan limit, readable set, writable set, exception set, timeout.
        select(maxfd + 1, &rset, &wset, NULL, NULL); // initially only listenfd's read event is watched
        if(FD_ISSET(listenfd, &rset)){
            printf("listenfd event:%d\n", listenfd);
            // listenfd is readable: accept the pending connection.
            struct sockaddr_in client;
            socklen_t len = sizeof(struct sockaddr_in);
            int clientfd = accept(listenfd, (struct sockaddr *)&client, &len); // accept consumes listenfd's read event
            FD_SET(clientfd, &rfds); // watch the new client for readability
            if(clientfd > maxfd) maxfd = clientfd;
        }
        int i = 0;
        for(i = listenfd + 1; i <= maxfd; ++i){
            if(i >= MAX_CON) {
                printf("over fd %d\n", i);
                continue;
            }
            if(FD_ISSET(i, &rset)){
                ret = recv(i, buf[i], BUFFER_LEN, 0);
                if(ret == 0){
                    close(i);
                    FD_CLR(i, &rfds);
                    printf("close fd:%d\n", i);
                    continue;
                }
                printf("clientfd:%d %s %d\n", i, buf[i], ret);
                FD_SET(i, &wfds); // now wait until the fd is writable
            }
            if(FD_ISSET(i, &wset)){
                ret = send(i, buf[i], ret, 0);
                printf("clientfd:%d send %d\n", i, ret);
                FD_CLR(i, &wfds); // stop watching for writability
            }
        }
    }
#elif 0 // Variant 4: plain epoll. Kernel tip: search SYSCALL_DEFINE(N), N = number of syscall arguments
    int epfd = epoll_create(1);
    struct epoll_event ev, events[EVENTS_LEN];
    // Level- or edge-triggered is chosen here. With edge-triggered the
    // listenfd read event would need an accept() loop.
    ev.events = EPOLLIN;
    ev.data.fd = listenfd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev); // insert listenfd into epoll's red-black tree, watching its read event
    char buf[BUFFER_LEN] = {0};
    while(1){
        int nready = epoll_wait(epfd, events, EVENTS_LEN, -1); // timeout: -1 blocks, 0 returns at once, 1000 = 1 second
        printf("epoll_wait ret ready:%d\n", nready);
        int i = 0;
        for(i = 0; i < nready; ++i){
            int clientfd = events[i].data.fd;
            if(listenfd == clientfd){
                // Accept the new connection.
                struct sockaddr_in client;
                socklen_t len = sizeof(client);
                int connfd = accept(listenfd, (struct sockaddr *)&client, &len);
                printf("accept connfd:%d\n", connfd);
                ev.events = EPOLLET | EPOLLIN; // study point: level-triggered vs edge-triggered
                ev.data.fd = connfd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
            }else if(events[i].events & EPOLLIN){
                ret = recv(clientfd, buf, BUFFER_LEN, 0);
                if(ret == 0){
                    epoll_ctl(epfd, EPOLL_CTL_DEL, clientfd, &events[i]);
                    close(clientfd);
                    printf("clientfd:%d closed\n", clientfd);
                    //continue;
                }
                ev.events = EPOLLOUT; // switch to watching for writability
                ev.data.fd = clientfd;
                epoll_ctl(epfd, EPOLL_CTL_MOD, clientfd, &ev);
            }else if(events[i].events & EPOLLOUT){
                ret = send(clientfd, buf, ret, 0);
                if(ret <= 0){
                    char *err = strerror(errno);
                    printf("clientfd:%d closed ? ret %d errno %d:%s\n",
                           clientfd, ret, errno, err == NULL ? "unknow": err);
                }
                printf("clientfd:%d send:%s num:%d\n", clientfd, buf, ret);
                ev.events = EPOLLIN; // switch back to watching for readability
                ev.data.fd = clientfd;
                epoll_ctl(epfd, EPOLL_CTL_MOD, clientfd, &ev);
            }
        }
    }
#elif 1 // Variant 5: reactor with per-fd state (goal: very large connection counts)
    struct reactor *r = calloc(1, sizeof(*r));
    if(!r) {
        printf("memory failed\n");
        return -1;
    }
    ret = reactor_alloc(r);
    if(ret != 0) return -1;
    r->epfd = epoll_create(1);
    if(r->epfd == -1){
        printf("epoll_create failed errno %d:%s\n", errno, strerror(errno));
        return -1;
    }

    struct epoll_event ev = {0};
    struct epoll_event events[EVENTS_LEN]; // buffer for ready events
    ev.events = EPOLLIN; // level-triggered by default
    ev.data.fd = listenfd;
    if(epoll_ctl(r->epfd, EPOLL_CTL_ADD, listenfd, &ev) == -1){
        printf("epoll_ctl add listenfd failed errno %d:%s\n", errno, strerror(errno));
        return -1;
    }

    while(1){
        int nready = epoll_wait(r->epfd, events, EVENTS_LEN, -1); // timeout: -1 blocks, 0 returns at once, 1000 = 1 second
        if(nready < 0){
            if(errno == EINTR) continue; // interrupted by a signal: retry
            printf("epoll_wait failed errno %d:%s\n", errno, strerror(errno));
            return -1;
        }
        printf("epoll_wait ret ready:%d\n", nready);

        for(int i = 0; i < nready; ++i){
            int clientfd = events[i].data.fd;

            if(listenfd == clientfd){
                // New connection.
                struct sockaddr_in client;
                socklen_t len = sizeof(client);
                int connfd = accept(listenfd, (struct sockaddr *)&client, &len);
                printf("accept connfd:%d\n", connfd);
                // fd 0 is rejected as well: reactor_lookup() refuses it and
                // fd == 0 is the "free slot" marker.
                if(connfd <= 0) continue;

                // Prepare the per-fd state BEFORE registering with epoll, so
                // no event can ever arrive for an fd that has no state.
                struct sock_item *item = reactor_lookup(r, connfd);
                if(item == NULL) {
                    printf("error lookup item\n");
                    close(connfd); // otherwise the fd would leak
                    continue;
                }
                assert(item->rbuffer == NULL);
                assert(item->wbuffer == NULL);
                item->rbuffer = calloc(1, BUFFER_LEN);
                item->wbuffer = calloc(1, BUFFER_LEN);
                if(item->rbuffer == NULL || item->wbuffer == NULL){
                    printf("no memory for connfd:%d\n", connfd);
                    free(item->rbuffer);
                    item->rbuffer = NULL;
                    free(item->wbuffer);
                    item->wbuffer = NULL;
                    close(connfd);
                    continue;
                }
                item->fd = connfd;
                item->rlength = 0;
                item->wlength = 0;

                ev.events = EPOLLIN; // study point: level-triggered vs edge-triggered
                ev.data.fd = connfd;
                if(epoll_ctl(r->epfd, EPOLL_CTL_ADD, connfd, &ev) == -1){
                    printf("epoll_ctl add connfd:%d failed errno %d:%s\n", connfd, errno, strerror(errno));
                    reactor_drop(r, item);
                    continue;
                }
                printf("get item:%p, cfd:%d\n", (void *)item, item->fd);
            }else if(events[i].events & EPOLLIN){
                struct sock_item *item = reactor_lookup(r, clientfd);
                // Check the invariant before the first dereference.
                assert(item != NULL && item->fd == clientfd);
                printf("opr recv item:%p, item->fd:%d, connfd:%d\n", (void *)item, item->fd, clientfd);
                char *rbuf = item->rbuffer;
                char *wbuf = item->wbuffer;
                assert(rbuf != NULL && wbuf != NULL);

                ret = recv(item->fd, rbuf, BUFFER_LEN, 0);
                if(ret == 0){
                    // Peer closed the connection.
                    reactor_drop(r, item);
                    continue;
                }
                if(ret < 0) {
                    if(errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) continue;
                    printf("clientfd:%d recv error ret:%d %s\n", item->fd, ret, strerror(errno));
                    // A broken connection must be released; with level
                    // triggering it would otherwise be reported forever.
                    reactor_drop(r, item);
                    continue;
                }
                item->rlength = ret;
                memcpy(wbuf, rbuf, item->rlength);
                item->wlength = ret;
                // rbuf carries no NUL terminator: bound the print by ret.
                printf("opr recv item:%p, clientfd:%d send:%.*s num:%d\n", (void *)item, item->fd, ret, rbuf, ret);

                ev.events = EPOLLOUT; // switch to watching for writability
                ev.data.fd = item->fd;
                if(epoll_ctl(r->epfd, EPOLL_CTL_MOD, item->fd, &ev) == -1){
                    printf("epoll_ctl mod clientfd:%d failed errno %d:%s\n", item->fd, errno, strerror(errno));
                    reactor_drop(r, item);
                }
            }else if(events[i].events & EPOLLOUT){
                struct sock_item *item = reactor_lookup(r, clientfd);
                assert(item != NULL && item->fd == clientfd);
                printf("opr send item:%p, item->fd:%d, connfd:%d\n", (void *)item, item->fd, clientfd);
                char *wbuf = item->wbuffer;
                assert(wbuf != NULL);

                // MSG_NOSIGNAL: writing to a peer that has already reset the
                // connection must return EPIPE instead of raising SIGPIPE,
                // whose default action terminates the whole server.
                ret = send(item->fd, wbuf, item->wlength, MSG_NOSIGNAL);
                if(ret <= 0){
                    int err = errno;
                    if(ret < 0 && (err == EINTR || err == EAGAIN || err == EWOULDBLOCK)) continue;
                    printf("clientfd:%d closed ? ret %d errno %d:%s\n",
                           item->fd, ret, err, strerror(err));
                    reactor_drop(r, item);
                    continue;
                }
                printf("opr send item:%p clientfd:%d send:%.*s num:%d\n", (void *)item, item->fd, ret, wbuf, ret);

                if(ret < item->wlength){
                    // Short write: keep the unsent tail and stay on EPOLLOUT.
                    memmove(wbuf, wbuf + ret, (size_t)(item->wlength - ret));
                    item->wlength -= ret;
                    continue;
                }
                item->wlength = 0;

                ev.events = EPOLLIN; // switch back to watching for readability
                ev.data.fd = item->fd;
                if(epoll_ctl(r->epfd, EPOLL_CTL_MOD, item->fd, &ev) == -1){
                    printf("epoll_ctl mod clientfd:%d failed errno %d:%s\n", item->fd, errno, strerror(errno));
                    reactor_drop(r, item);
                }
            }else if(events[i].events & (EPOLLERR | EPOLLHUP)){
                // Error/hang-up reported without IN/OUT: release the
                // connection so the event is not reported again and again.
                struct sock_item *item = reactor_lookup(r, clientfd);
                assert(item != NULL && item->fd == clientfd);
                reactor_drop(r, item);
            }
        }
    }
#endif
    return 0;
}