百万连接实现01:使用epoll+多线程+多IP地址管理tcp客户端集群

news/2024/7/9 16:48:57 标签: epoll, getopt, system, pthread_create, socket

操作系统采用 <客户端IP : 客户端端口> : <服务端IP : 服务端端口> 四元组来标识一条TCP连接。

所以要想实现百万连接:

  • 第一种是服务器端只开启一个进程,然后使用很多个客户端进程绑定不同的客户端 ip 来连接,假设 20个ip * 5w(端口范围是最大65535,这里保守算5w)

  • 第二种是服务器开启多个进程,这样客户端就可以只使用一个 ip 即可,原理类似。

实验说明 

 第一个实验

只是简单的创建一个连接,并使用epoll监听它的读事件

第二的实验  

1 使用任意多个线程,

2 每个线程创建任意数量的客户端

3 每个线程都有一个独立的epoll

4 各个线程管理各自的epoll

5 每个epoll管理各自所在线程创建的客户端

6 每个线程绑定一个独立的IP地址,这个IP地址是通过下面的代码创建的,这里也是使用一个进程能够创建百万连接的关键。

        snprintf(pd->ip_str, sizeof(pd->ip_str),"192.168.0.%d",host_index);
        DEBUG_INFO("pd->ip_str = %s",pd->ip_str);
        snprintf(cmd_str, sizeof(cmd_str),"sudo ip addr add %s/24 dev ens33 1>/dev/null 2>&1",pd->ip_str);        
        DEBUG_INFO("cmd_str = %s",cmd_str);
        system(cmd_str);

第一个实验:单客户端测试

#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>

#define _DEBUG_INFO
#ifdef _DEBUG_INFO
#define DEBUG_INFO(format, ...) printf("%s:%s:%d -- "format"\n" \
,__FILE__,__func__,__LINE__ \
, ##__VA_ARGS__)
#else
#define DEBUG_INFO(format, ...)
#endif

#define _DEBUG_PRINT
#ifdef _DEBUG_PRINT
#define DEBUG_PRINT(format,...)	printf(format,##__VA_ARGS__)	
#else
#define DEBUG_PRINT(format,...)
#endif

// ip addr add 192.168.0.12/24 dev ens33


static char buf[1024 + 1];

#define DEFAULT_CLIENT_COUNT 20

#define HOST_NUM 40

struct private_data{
    char ip_str[20];
    char server_ip[20];
    pthread_t thread_id;
    int client_count;
    struct epoll_event evlist[10000];
    uint16_t port;
};

static volatile int thread_finished_count = 0;

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void * client_thread(void * arg){
    struct epoll_event client_ev;
    int epfd;
    int client_socket;
    struct private_data *pd = (struct private_data *)arg;
    int res = 0;

    epfd = epoll_create(1);
    if(epfd < 0){
        perror("epoll_create");
        return NULL;
    }

    struct sockaddr_in server_addr;
    server_addr.sin_port = htons(pd->port);
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = inet_addr(pd->server_ip);//"192.168.0.5");

    struct sockaddr_in client_addr;
    client_addr.sin_port = 0;
    client_addr.sin_family = AF_INET;
    client_addr.sin_addr.s_addr = inet_addr(pd->ip_str);
    pthread_mutex_lock(&mutex);
    for(int i = 0; i < pd->client_count; i++){
        client_socket = socket(PF_INET, SOCK_STREAM | SOCK_CLOEXEC,0);// | SOCK_NONBLOCK, 0);
        if(client_socket < 0){
            perror("socket");
            exit(1);
        }
        res = bind(client_socket,(const struct sockaddr *)&client_addr,sizeof(client_addr));
        if(res < 0){
            perror("bind");
            exit(1);
        }

        res = connect(client_socket,(const struct sockaddr *)&server_addr,sizeof(server_addr));    
        if(res != 0){
            perror("connect");
            DEBUG_INFO("connect res = %d",res);
            exit(1);
        }
        client_ev.data.fd = client_socket;
        client_ev.events = EPOLLIN;
        res = epoll_ctl(epfd, EPOLL_CTL_ADD,client_socket,&client_ev);
        if(res < 0){
            perror("connect");
            exit(1);
        }
    }
    pthread_mutex_unlock(&mutex);
    
    DEBUG_INFO("%s Connect %s:%d count=%d",
    pd->ip_str,
    pd->server_ip,
    pd->port,
    pd->client_count);
    thread_finished_count++;
    DEBUG_INFO("thread_finished_count = %d", thread_finished_count);
    while(1){
        int ready = epoll_wait(epfd,pd->evlist,sizeof(pd->evlist)/sizeof(pd->evlist[0]),-1);
        if(ready == -1){
            if(errno == EINTR){
                continue;
            }
            perror("epoll_wait");
            exit(1);
        }
        DEBUG_INFO("ready %d",ready);
        for(int i=0;i<ready;i++){
            int fd = pd->evlist[i].data.fd;
            DEBUG_INFO("fd = %d:events:%s %s %s",fd,
                        (pd->evlist[i].events & EPOLLIN)?"EPOLLIN":"",
                        (pd->evlist[i].events & EPOLLHUP)?"EPOLLHUP":"",
                        (pd->evlist[i].events & EPOLLERR)?"EPOLLERR":"");
            int read_len = read(fd,buf,sizeof(buf) - 1);
            if(read_len == -1){
                if(errno == EINTR){
                    continue;
                }else{
                    perror("Read");
                    epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);
                    close(fd);
                    continue;
                }
            }
            DEBUG_INFO("read_len: %d",read_len);
            if(read_len == 0){
                DEBUG_INFO("client disconnected");
                close(fd);
                goto OUT;
            }
            buf[read_len] = '\0';
            DEBUG_INFO("buf = %s",buf);
        }
    }
OUT:
    DEBUG_INFO("bye bye");
    close(epfd);
    free(pd);
    return NULL;
}

int main(int argc, char **argv)
{
    int s;
    
    int res = 0;
    
    int client_count = 10;  
    
    char cmd_str[1024];
    int host_min_num = 12;
    int max_host_num = 2;//HOST_NUM
    int opt;
    char server_ip[20] = "192.168.0.5";

    static int thread_count = 1;
    int port = 6600;
    
    while((opt = getopt(argc,argv,"c:r:t:p:")) != -1){
        switch (opt)
        {
        case 'r':
            DEBUG_INFO("option:%c,server ip = %s",opt,optarg);
            memcpy(server_ip,optarg,strlen(optarg));
            server_ip[strlen(optarg)] = '\0';
            break;
        case 'c':
            DEBUG_INFO("option:%c client_count = %s",opt,optarg);
            client_count = atoi(optarg);
            break;
        case 'p':
            DEBUG_INFO("option:%c client_count = %s",opt,optarg);
            port = atoi(optarg);
            break;
        case 't':
            DEBUG_INFO("option:%c thread_count = %s",opt,optarg);
            thread_count = atoi(optarg);
            break;
        case ':':
            DEBUG_INFO("option:%c needs a value",opt);    
            exit(0);   
            break;
        case '?':
            DEBUG_INFO("unkown option:%c needs a value",optopt);       
            break;
        default:
            break;
        }
    }
    DEBUG_INFO("client_count = %d",client_count);
    DEBUG_INFO("server_ip = %s",server_ip);
    DEBUG_INFO("thread_count = %d",thread_count);

    //创建40个ip地址192.168.0.12 - 192.168.0.51
    for(int host_index = host_min_num; host_index < host_min_num + thread_count;host_index++){
        struct private_data *pd = (struct private_data*)malloc(sizeof(struct private_data));
        if(pd == NULL){
            perror("malloc");
            exit(0);
        }
        snprintf(pd->ip_str, sizeof(pd->ip_str),"192.168.0.%d",host_index);
        DEBUG_INFO("pd->ip_str = %s",pd->ip_str);
        snprintf(cmd_str, sizeof(cmd_str),"sudo ip addr add %s/24 dev ens33 1>/dev/null 2>&1",pd->ip_str);        
        DEBUG_INFO("cmd_str = %s",cmd_str);
        system(cmd_str);
        pd->client_count = client_count;
        pd->port = port;
        memcpy(pd->server_ip, server_ip, sizeof(server_ip));
        res = pthread_create(&pd->thread_id, NULL,client_thread,pd);
        if(res != 0){
            perror("pthread_create");
            exit(1);
        }
    }
    while(1){
        sleep(1);
    }
    return 0;
}

使用调试助手创建服务器

 使用调试助手发送数据,然后关闭服务器,测试结果:

第二个实验:客户端集群

#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/epoll.h>
#include <errno.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <pthread.h>

#define _DEBUG_INFO
#ifdef _DEBUG_INFO
#define DEBUG_INFO(format, ...) printf("%s:%s:%d -- "format"\n" \
,__FILE__,__func__,__LINE__ \
, ##__VA_ARGS__)
#else
#define DEBUG_INFO(format, ...)
#endif

#define _DEBUG_PRINT
#ifdef _DEBUG_PRINT
#define DEBUG_PRINT(format,...)	printf(format,##__VA_ARGS__)	
#else
#define DEBUG_PRINT(format,...)
#endif

// ip addr add 192.168.0.12/24 dev ens33


static char buf[1024 + 1];

#define DEFAULT_CLIENT_COUNT 20

#define HOST_NUM 40

struct private_data{
    char ip_str[20];
    char server_ip[20];
    pthread_t thread_id;
    int client_count;
    struct epoll_event evlist[10000];
    uint16_t port;
};

void * client_thread(void * arg){
    struct epoll_event client_ev;
    int epfd;
    int client_socket;
    struct private_data *pd = (struct private_data *)arg;
    int res = 0;

    epfd = epoll_create(1);
    if(epfd < 0){
        perror("epoll_create");
        return NULL;
    }

    struct sockaddr_in server_addr;
    server_addr.sin_port = htons(pd->port);
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = inet_addr(pd->server_ip);//"192.168.0.5");

    struct sockaddr_in client_addr;
    client_addr.sin_port = 0;
    client_addr.sin_family = AF_INET;
    client_addr.sin_addr.s_addr = inet_addr(pd->ip_str);
    
    for(int i = 0; i < pd->client_count; i++){
        client_socket = socket(PF_INET, SOCK_STREAM | SOCK_CLOEXEC,0);// | SOCK_NONBLOCK, 0);
        if(client_socket < 0){
            perror("socket");
            exit(1);
        }
        res = bind(client_socket,(const struct sockaddr *)&client_addr,sizeof(client_addr));
        if(res < 0){
            perror("bind");
            exit(1);
        }

        res = connect(client_socket,(const struct sockaddr *)&server_addr,sizeof(server_addr));    
        if(res < 0){
            perror("connect");
            exit(1);
        }
        client_ev.data.fd = client_socket;
        client_ev.events = EPOLLIN;
        res = epoll_ctl(epfd, EPOLL_CTL_ADD,client_socket,&client_ev);
        if(res < 0){
            perror("connect");
            exit(1);
        }
    }
    
    DEBUG_INFO("%s Connect %s:%d count=%d",
    pd->ip_str,
    pd->server_ip,
    pd->port,
    pd->client_count);
    while(1){
        int ready = epoll_wait(epfd,pd->evlist,sizeof(pd->evlist)/sizeof(pd->evlist[0]),-1);
        if(ready == -1){
            if(errno == EINTR){
                continue;
            }
            perror("epoll_wait");
            exit(1);
        }
        DEBUG_INFO("ready %d",ready);
        for(int i=0;i<ready;i++){
            int fd = pd->evlist[i].data.fd;
            DEBUG_INFO("fd = %d:events:%s %s %s",fd,
                        (pd->evlist[i].events & EPOLLIN)?"EPOLLIN":"",
                        (pd->evlist[i].events & EPOLLHUP)?"EPOLLHUP":"",
                        (pd->evlist[i].events & EPOLLERR)?"EPOLLERR":"");
            int read_len = read(fd,buf,sizeof(buf) - 1);
            if(read_len == -1){
                if(errno == EINTR){
                    continue;
                }else{
                    perror("Read");
                    epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);
                    close(fd);
                    continue;
                }
            }
            DEBUG_INFO("read_len: %d",read_len);
            if(read_len == 0){
                DEBUG_INFO("client disconnected");
                close(fd);
                goto OUT;
            }
            buf[read_len] = '\0';
            DEBUG_INFO("buf = %s",buf);
        }
    }
OUT:
    DEBUG_INFO("bye bye");
    close(epfd);
    free(pd);
    return NULL;
}

int main(int argc, char **argv)
{
    int s;
    
    int res = 0;
    
    int client_count = 10;  
    
    char cmd_str[1024];
    int host_min_num = 12;
    int max_host_num = 2;//HOST_NUM
    int opt;
    char server_ip[20] = "192.168.0.5";

    static int thread_count = 1;
    int port = 6600;
    
    while((opt = getopt(argc,argv,"c:r:t:p:")) != -1){
        switch (opt)
        {
        case 'r':
            DEBUG_INFO("option:%c,server ip = %s",opt,optarg);
            memcpy(server_ip,optarg,strlen(optarg));
            server_ip[strlen(optarg)] = '\0';
            break;
        case 'c':
            DEBUG_INFO("option:%c client_count = %s",opt,optarg);
            client_count = atoi(optarg);
            break;
        case 'p':
            DEBUG_INFO("option:%c client_count = %s",opt,optarg);
            port = atoi(optarg);
            break;
        case 't':
            DEBUG_INFO("option:%c thread_count = %s",opt,optarg);
            thread_count = atoi(optarg);
            break;
        case ':':
            DEBUG_INFO("option:%c needs a value",opt);    
            exit(0);   
            break;
        case '?':
            DEBUG_INFO("unkown option:%c needs a value",optopt);       
            break;
        default:
            break;
        }
    }
    DEBUG_INFO("client_count = %d",client_count);
    DEBUG_INFO("server_ip = %s",server_ip);
    DEBUG_INFO("thread_count = %d",thread_count);

    //创建40个ip地址192.168.0.12 - 192.168.0.51
    for(int host_index = host_min_num; host_index < host_min_num + thread_count;host_index++){
        struct private_data *pd = (struct private_data*)malloc(sizeof(struct private_data));
        if(pd == NULL){
            perror("malloc");
            exit(0);
        }
        snprintf(pd->ip_str, sizeof(pd->ip_str),"192.168.0.%d",host_index);
        DEBUG_INFO("pd->ip_str = %s",pd->ip_str);
        snprintf(cmd_str, sizeof(cmd_str),"sudo ip addr add %s/24 dev ens33 1>/dev/null 2>&1",pd->ip_str);        
        DEBUG_INFO("cmd_str = %s",cmd_str);
        system(cmd_str);
        pd->client_count = client_count;
        pd->port = port;
        memcpy(pd->server_ip, server_ip, sizeof(server_ip));
        res = pthread_create(&pd->thread_id, NULL,client_thread,pd);
        if(res != 0){
            perror("pthread_create");
            exit(1);
        }
    }
    while(1){
        sleep(1);
    }
    return 0;
}

编译并运行:

sudo ./_build_/client -c 4 -r 192.168.0.5 -t 3

参数说明:

-c:每个线程创建的客户端数量

-r:远程服务器IP地址

-t:创建的线程数量 

服务器调试工具:

在服务器上看到了12个客户端

3个线程,没个线程4个客户端。

下次,写一个能够接收很多客户端的服务器

例如

sudo ./_build_/client -c 25000 -r 192.168.0.5 -t 40

40个线程,每个线程25000个客户端。

小结


http://www.niftyadmin.cn/n/1397795.html

相关文章

文献管理软件//Zotero的常用插件——Zotero Scholar Citations文献引用量(五)

Zotero Scholar Citations|自动抓取Google Scholar的引用量1 Zotero Scholar Citations功能介绍及下载安装1.1 Zotero Scholar Citations功能介绍1.2 Zotero Scholar Citations下载及安装2 Zotero Scholar Citations使用2.1 Zotero Scholar Citations更新引用量2.2 Zotero Scho…

ppt: .pptx中的内容有问题,Powerpoint可尝试修复此演示文稿,如果您信任此演示文稿的来源,请单击“修复”

问题&#xff1a; 遇到过几次从其他电脑或网址下载".pptx"文件后&#xff0c;双击打开时会报图1提示。 按照提示选择“修复”&#xff0c;问题仍然不能解决。 图1 .pptx文件打不开的报错内容 解决方法&#xff1a; 右键单击该文件&#xff0c;选择“属性”&#…

文献管理软件//Zotero的常用插件——Zutilo快捷操作(六)

Zutilo|快捷操作一、Zutilo介绍二、 Zutilo的下载与安装2.1 Zutilo的下载2.2 Zutilo的安装2.3 Zutilo参数的设置三、Zutilo参数的设置3.1 用户界面3.2 快捷键列表及常用快捷键设置3.2.1 常用快捷方式汇总3.2.1.1 文献列表两侧标签页的显示及隐藏3.2.1.2 文件夹与文献列表界面的…

文献管理软件//Zotero文献管理——文献之间的关联(八)

用endnote软件管理文献时&#xff0c;经常会根据一篇文献搜索许多其他的引用及被引文献&#xff0c;但是时间久了就容易忘记。 Zotero的文献关联这一功能&#xff0c;就可以将这些文献关联起来&#xff0c;一目了然&#xff0c;并可通过简单的单击动作快速阅览关联文献。 三种…

文献管理软件//Zotero的搜索引擎设置(七)

Zotero搜索引擎一、搜索引擎的获取二、搜索引擎的安装配置三、搜索引擎的使用3.1 以“**Connected Papers**”为例3.2 以Google Scholar Search为例3.3 以CrossRef Lookup为例一、搜索引擎的获取 笔者所用到的搜索引擎来源于&#xff1a; 青柠学术 公众号bwiernik/zotero-too…

文献管理软件//Zotero Connector无法自动获取pdf文件的解决方法

笔者在文献管理软件//Zotero的常用插件——Zotero translators//知网下载//知乎翻译器Jasminum&#xff08;茉莉花&#xff09;&#xff08;四&#xff09;中已经撰写过如何利用zotero软件的插件来获取知网文献。 但在实际操作过程中会出现以下提示&#xff08;图1&#xff0c…

文献管理软件//Zotero导入文献的五种方式(九)

Zotero导入文献的五种方式一、利用zotero插件自动获取pdf文件二、利用DOI获取pdf文件三、从剪贴板导入pdf文件3.1 导入单篇文献3.2 导入多篇文献四、利用endnote格式导入文献五、通过已下载的PDF文件导入文献一、利用zotero插件自动获取pdf文件 首先&#xff0c;可以通过以下两…

R: 错误: $ operator is invalid for atomic vectors

利用metaboanalyst包运行代码时提示&#xff1a; 错误: $ operator is invalid for atomic vectors后经检查输入csv文件发现&#xff0c;是因为首行sample name具有重复值导致&#xff0c;如下图&#xff1a; 将其按照下图重新命名后&#xff0c;再次运行代码&#xff0c;问题…