epoll单台设备支持百万并发连接

news/2024/7/9 16:20:08 标签: epoll, pthread_create, select

一些概念:

linux下一切接文件,文件描述符fd,文件I/O(包含socket,文本文件等),I/O多路复用,reactor模型,水平触发,边沿触发,多线程模型,阻塞和非阻塞,同步和异步

设备有限,目前实测能达到 11万并发连接。当客户端端口耗尽退出时,服务端异常退出待解决。 

#include<stdio.h>
#include<sys/socket.h>
#include<sys/types.h>
#include<netinet/in.h>
#include<fcntl.h>
#include<sys/epoll.h>
#include<errno.h>
#include<string.h>
#include<stdlib.h>
#include<assert.h>
#include<unistd.h>

#define BUFFER_LEN 128
#define MAX_CON 1024
#define EVENTS_LEN 128
#define ITEM_LEN 4096

struct sock_item {
    int fd;

    char *rbuffer;
    int rlength;

    char *wbuffer;
    int wlength;

    int event;

    void (*recv_cb)(int fd, char *buffer, int length);
    void (*send_cb)(int fd, char *buffer, int length);

    void (*accept_cb)(int fd, char *buffer, int length);
};

struct eventblock {
    struct sock_item *items;
    struct eventblock *next;
};

struct reactor {
    int epfd;
    int blkcnt;
    struct eventblock *evblk;
};

int reactor_alloc(struct reactor *r){
    if(!r) return -1;
    
    struct eventblock *blk = r->evblk;
    while(blk != NULL && blk->next != NULL) blk = blk->next;

    struct sock_item *item = (struct sock_item *)calloc(1, ITEM_LEN * sizeof(struct sock_item));
    if(!item){
        printf("no memory ret:%d %s\n", errno, strerror(errno));
        return -1;
    }

    struct eventblock *block = (struct eventblock *)calloc(1, sizeof(struct eventblock));
    if(!block) {
        free(item);
        printf("no memory ret:%d %s\n", errno, strerror(errno));
        return -1;
    }

    
    block->items = item;
    block->next = NULL;

    //blk == NULL 时表示首个节点
    if(!blk) 
        r->evblk = block;
    else 
        blk->next = block;

    r->blkcnt++;

    printf("create block:%p, cnt:%d\n", block, r->blkcnt);
    
    return 0;
}

struct sock_item* reactor_lookup(struct reactor *r, int sockfd){
    if(!r || !r->evblk || sockfd <= 0) return NULL;

    int ret;
    int blkidx = sockfd / ITEM_LEN;
    while(blkidx >= r->blkcnt){
        ret = reactor_alloc(r);
        if(ret != 0) return NULL;
    }

    int i = 0;
    struct eventblock *blk = r->evblk;
    while(i++ < blkidx) blk = blk->next;
    
    return &blk->items[sockfd % ITEM_LEN];
}

void *routine(void *arg){
    int ret;
    int clientfd = *(int *)arg;
    while(1){
        unsigned char buf[BUFFER_LEN] = {0};
        ret = recv(clientfd, buf, BUFFER_LEN, 0);
        if(ret == 0){  //客户端close,这里不做处理会产生 close_wait 状态
            close(clientfd);
            break;
        }
        printf("clientfd:%d buf:%s %d\n", clientfd, buf, ret);
        ret = send(clientfd, buf, ret, 0);
        printf("clientfd send %d\n", ret);
    }
}

int main(){
    //socket 插座,理解 socketfd 是插,bind sockaddr_in 是座
    int ret;
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if(listenfd == -1) return -1;

    struct sockaddr_in servaddr;
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);

    ret = bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    if(ret == -1) return -2;

    //fd 默认阻塞
    //设置非阻塞
#if 0
    int flag = fcntl(listenfd, F_GETFL, 0);
    flag |= O_NONBLOCK;
    fcntl(listenfd, F_SETFL, flag);
#endif
    
    listen(listenfd, 10); //类似酒店迎宾的人

#if 0
    struct sockaddr_in client;
    socklen_t len = sizeof(struct sockaddr_in);
    int clientfd = accept(listenfd, (struct sockaddr *)&client, &len);

    while(1){
        unsigned char buf[BUFFER_LEN] = {0};
        ret = recv(clientfd, buf, BUFFER_LEN, 0);
        printf("clientfd:%d %s %d\n", clientfd, buf, ret);
        ret = send(clientfd, buf, ret, 0);
        printf("clientfd send %d\n", ret);
    }
    
#elif 0
    while(1){
        struct sockaddr_in client;
        socklen_t len = sizeof(struct sockaddr_in);
        //类似酒店服务员
        int clientfd = accept(listenfd, (struct sockaddr *)&client, &len);
        pthread_t tid;
        pthread_create(&tid, NULL, routine, &clientfd);
    }

#elif 0  //<apue> pg404 select
    fd_set rfds, wfds, rset, wset;
    FD_ZERO(&rfds);
    FD_SET(listenfd, &rfds);

    FD_ZERO(&wfds);

    int maxfd = listenfd; //对 maxfd 的理解?内核 bitmap 的下标上限?

    unsigned char buf[MAX_CON][BUFFER_LEN + 1] = {{0}};
    while(1){
        rset = rfds;
        wset = wfds;
        //内核循环上限       可读集合     可写集合     出错   超时时间
        select(maxfd + 1, &rset, &wset, NULL, NULL); //只监听listen的读事件
        if(FD_ISSET(listenfd, &rset)){
            printf("listenfd event:%d\n", listenfd);

            //listen 已经有读事件,accpet该连接
            struct sockaddr_in client;
            socklen_t len = sizeof(struct sockaddr_in);
            int clientfd = accept(listenfd, (struct sockaddr *)&client, &len); //accept会清空listen读事件

            FD_SET(clientfd, &rfds);    //设置客户端的监听读事件
            if(clientfd > maxfd) maxfd = clientfd;
        }

        int i = 0;
        
        for(i = listenfd + 1; i <= maxfd; ++i){
            if(i >= MAX_CON) {
                printf("over fd %d\n", i);
                continue;
            }
            if(FD_ISSET(i, &rset)){
                
                ret = recv(i, buf[i], BUFFER_LEN, 0);
                if(ret == 0){
                    close(i);
                    FD_CLR(i, &rfds);
                    printf("close fd:%d\n", i);
                    continue;
                }
                printf("clientfd:%d %s %d\n", i, buf[i], ret);
                FD_SET(i, &wfds); //监听是否可写
            }

            if(FD_ISSET(i, &wset)){
                ret = send(i, buf[i], ret, 0);
                printf("clientfd:%d send %d\n", i, ret);
                FD_CLR(i, &wfds); //清空监听写
            }
        }
    }

#elif 0  //搜索系统调用 SYSCALL_DEFINE(N) N表示该系统调用的参数个数
    int epfd = epoll_create(1);

    struct epoll_event ev, events[EVENTS_LEN];
    ev.events = EPOLLIN;  //这里选择水平还是边沿触发;如果是边沿触发,怎么做?(while accept listenfd 读事件)
    ev.data.fd = listenfd;

    epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev); //将listenfd加入到红黑树中, 边沿触发监听listenfd的读事件

    char buf[BUFFER_LEN] = {0};
    while(1){
        int nready = epoll_wait(epfd, events, EVENTS_LEN, -1); //-1阻塞,0立刻返回, 1000为1秒返回
        printf("epoll_wait ret ready:%d\n", nready);
    
        int i = 0;
        for(i = 0; i < nready; ++i){
            int clientfd = events[i].data.fd;
            if(listenfd == clientfd){
                //accept 建立连接
                struct sockaddr_in client;
                socklen_t len = sizeof(client);
                int connfd = accept(listenfd, (struct sockaddr *)&client, &len);

                printf("accept connfd:%d\n", connfd);
                ev.events =  EPOLLET | EPOLLIN;  //水平触发 和 边沿触发 的区别?
                ev.data.fd = connfd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);
            }else if(events[i].events & EPOLLIN){
                
                ret = recv(clientfd, buf, BUFFER_LEN, 0);
                if(ret == 0){
                    epoll_ctl(epfd, EPOLL_CTL_DEL, clientfd, &events[i]);
                    close(clientfd);
                    printf("clientfd:%d closed\n", clientfd);
                    //continue;
                }
                ev.events = EPOLLOUT; //改为监听可写事件
                ev.data.fd = clientfd;
                epoll_ctl(epfd, EPOLL_CTL_MOD, clientfd, &ev);
            }else if(events[i].events & EPOLLOUT){
                ret = send(clientfd, buf, ret, 0);
                if(ret <= 0){ 
                    char *err = strerror(errno);
                    printf("clientfd:%d closed ? ret %d errno %d:%s\n", 
                                    clientfd, ret, errno, err == NULL ? "unknow": err);
                }
                printf("clientfd:%d send:%s num:%d\n", clientfd, buf, ret);
                ev.events = EPOLLIN; //改为监听可写事件
                ev.data.fd = clientfd;
                epoll_ctl(epfd, EPOLL_CTL_MOD, clientfd, &ev);
            }
        }
    }

#elif 1  //reactor 百万并发实现?
    struct reactor *r = (struct reactor *)calloc(1, sizeof(struct reactor));
    if(!r) {
        printf("memory failed\n");
        return -1;
    }
    
    ret = reactor_alloc(r);
    if(ret != 0) return -1;
    
    r->epfd = epoll_create(1);

    struct epoll_event ev;
    struct epoll_event events[EVENTS_LEN]; //准备好的事件缓冲区
    ev.events = EPOLLIN; //默认水平触发
    ev.data.fd = listenfd;
    epoll_ctl(r->epfd, EPOLL_CTL_ADD, listenfd, &ev);

    while(1){
        int nready = epoll_wait(r->epfd, events, EVENTS_LEN, -1); //-1阻塞,0立刻返回, 1000为1秒返回
        printf("epoll_wait ret ready:%d\n", nready);
    
        int i = 0;
        for(i = 0; i < nready; ++i){
            
            int clientfd = events[i].data.fd;
        
            if(listenfd == clientfd){
                //accept 建立连接
                struct sockaddr_in client;
                socklen_t len = sizeof(client);
                int connfd = accept(listenfd, (struct sockaddr *)&client, &len);

                printf("accept connfd:%d\n", connfd);
                if(connfd <= 0) continue;
                
                ev.events =  EPOLLIN;  //水平触发 和 边沿触发 的区别?
                ev.data.fd = connfd;
                epoll_ctl(r->epfd, EPOLL_CTL_ADD, connfd, &ev);

#if 0
                //填充 sock_item
                assert(r->items[connfd].rbuffer == NULL);
                r->items[connfd].rbuffer = (char *)calloc(1, BUFFER_LEN);
                r->items[connfd].rlength = 0;

                assert(r->items[connfd].wbuffer == NULL);
                r->items[connfd].wbuffer = (char *)calloc(1, BUFFER_LEN);
                r->items[connfd].wlength = 0;

                r->items[connfd].event = EPOLLIN;

                assert(r->items[connfd].fd == 0);
                r->items[connfd].fd = connfd;
#endif

                struct sock_item *item = reactor_lookup(r, connfd);
                if(item == NULL) {
                    printf("error lookup item\n");
                    continue;
                }
                
                item->fd = connfd;
                assert(item->rbuffer == NULL);
                item->rbuffer = calloc(1, BUFFER_LEN);
                item->rlength = 0;

                assert(item->wbuffer == NULL);
                item->wbuffer = calloc(1, BUFFER_LEN);
                item->wlength = 0;

                printf("get item:%p, cfd:%d\n", item, item->fd);
                
            }else if(events[i].events & EPOLLIN){                
                struct sock_item *item = reactor_lookup(r, clientfd);
                printf("opr recv item:%p, item->fd:%d, connfd:%d\n", item, item->fd, clientfd);
                assert(item != NULL && item->fd == clientfd);
                
                char *rbuf = item->rbuffer;
                char *wbuf = item->wbuffer;

                assert(rbuf != NULL && wbuf != NULL);
                ret = recv(item->fd, rbuf, BUFFER_LEN, 0);
                
                if(ret == 0){
                    epoll_ctl(r->epfd, EPOLL_CTL_DEL, item->fd, &events[i]);

                    free(item->rbuffer); item->rbuffer = NULL;                
                    free(item->wbuffer); item->wbuffer = NULL;
                    
                    close(item->fd);
                    
                    printf("clientfd:%d closed\n", item->fd);
                    item->fd = 0;
                    
                    continue;
                }
                if(ret < 0) {
                    printf("clientfd:%d recv error ret:%d %s\n", item->fd, ret, strerror(errno));
                    continue;
                }

                item->rlength = ret;
    
                memcpy(wbuf, rbuf, item->rlength);
                item->wlength = ret;

                printf("opr recv item:%p, clientfd:%d send:%s num:%d\n", item, item->fd, rbuf, ret);
                
                ev.events = EPOLLOUT; //改为监听可写事件
                ev.data.fd = item->fd;
                epoll_ctl(r->epfd, EPOLL_CTL_MOD, item->fd, &ev);
                
            }else if(events[i].events & EPOLLOUT){
                struct sock_item *item = reactor_lookup(r, clientfd);
                
                printf("opr send item:%p, item->fd:%d, connfd:%d\n", item, item->fd, clientfd);
                assert(item != NULL && item->fd == clientfd);
                char *wbuf = item->wbuffer;

                assert(wbuf != NULL);
                ret = send(item->fd, wbuf, item->wlength, 0);
                if(ret <= 0){ 
                    char *err = strerror(errno);
                    printf("clientfd:%d closed ? ret %d errno %d:%s\n", 
                                    item->fd, ret, errno, err == NULL ? "unknow": err);
                    continue;
                }
                printf("opr send item:%p clientfd:%d send:%s num:%d\n", item, item->fd, wbuf, ret);
                ev.events = EPOLLIN; //改为监听可写事件
                ev.data.fd = item->fd;
                epoll_ctl(r->epfd, EPOLL_CTL_MOD, item->fd, &ev);
            }
        }
    }
    
#endif
    
    return 0;
}


http://www.niftyadmin.cn/n/63169.html

相关文章

【C++从入门到放弃】初识C++(基础知识入门详解)

&#x1f9d1;‍&#x1f4bb;作者&#xff1a; 情话0.0 &#x1f4dd;专栏&#xff1a;《C从入门到放弃》 &#x1f466;个人简介&#xff1a;一名双非编程菜鸟&#xff0c;在这里分享自己的编程学习笔记&#xff0c;欢迎大家的指正与点赞&#xff0c;谢谢&#xff01; C基础…

C++之可调用对象、bind绑定器和function包装器

可调用对象在C中&#xff0c;可以像函数一样调用的有&#xff1a;普通函数、类的静态成员函数、仿函数、lambda函数、类的非静态成员函数、可被转换为函数的类的对象&#xff0c;统称可调用对象或函数对象。可调用对象有类型&#xff0c;可以用指针存储它们的地址&#xff0c;可…

Python:青蛙跳杯子(BFS)

题目描述 X 星球的流行宠物是青蛙&#xff0c;一般有两种颜色&#xff1a;白色和黑色。 X 星球的居民喜欢把它们放在一排茶杯里&#xff0c;这样可以观察它们跳来跳去。 如下图&#xff0c;有一排杯子&#xff0c;左边的一个是空着的&#xff0c;右边的杯子&#xff0c;每个…

NSSCTF Round#8 web专项赛

文章目录MyPage方法一&#xff1a; pearcmd.php方法二&#xff1a;多级连接绕过方法三&#xff1a; PHP Base64 Filter 宽松解析MyDoorUpload_gogoggoez_nodeMyPage Where is my page&#xff1f; 拿到题目就是这个样子 感觉就是文件包含 可以读取&#xff0c;可以用filter协议…

独立产品灵感周刊 DecoHack #047 - 安卓手机上最有用的APP

本周刊记录有趣好玩的独立产品设计开发相关内容&#xff0c;每周发布&#xff0c;往期内容同样精彩&#xff0c;感兴趣的伙伴可以点击订阅我的周刊。为保证每期都能收到&#xff0c;建议邮件订阅。欢迎通过 Twitter 私信推荐或投稿。&#x1f4bb; 产品推荐 1. Bouncer Tempor…

vim编辑器和gcc/g++编译器和gdb调试器和make/makefile自动化构建工具的使用

vim的三种模式(其实有好多模式 )&#xff08;1&#xff09;.命令模式&#xff08;2&#xff09;.插入模式&#xff08;3&#xff09;.底行模式vim的基本操作vim的命令模式的基本操作vim的插入模式的基本操作vim的底行模式的基本操作vim的配置gcc和g相关操作&#xff08;1&#…

C语言学习笔记(七): 指针的使用

指针变量 指针是一种特殊的变量&#xff0c;它存储的是某个变量的内存地址。指针变量可以存储内存地址&#xff0c;并且通过指针变量可以间接操作内存中的数据 include <stdio.h> int main() {int a1, * p; //定义指针变量,*是指针运算符p &a; //把a的地…

MySQL性能优化六 事物隔离级别与锁机制

概述 我们的数据库一般都会并发执行多个事务&#xff0c;多个事务可能会并发的对相同的一批数据进行增删改查操作&#xff0c;可能就会导致我们说的脏写、脏读、不可重复读、幻读这些问题。 这些问题的本质都是数据库的多事务并发问题&#xff0c;为了解决多事务并发问题&#…