网络socket编程--多路复用

news/2024/7/9 18:30:28 标签: epoll, c语言, 内核, 网络

一、五种网络I/O模型

1.什么是I/O

I/O在计算机中指Input/Output,也就是输入和输出。由于程序和运行时数据是在内存中驻留,由CPU这个超快的计算核心来执行,涉及到数据交换的地方,通常是磁盘、网络等,就需要IO接口。

比如你打开浏览器,访问新浪首页,浏览器这个程序就需要通过网络IO获取新浪的网页。浏览器首先会发送数据给新浪服务器,告诉它我想要首页的HTML,这个动作是往外发数据,叫Output,随后新浪服务器把网页发过来,这个动作是从外面接收数据,叫Input。所以,通常,程序完成IO操作会有Input和Output两个数据流。当然也有只用一个的情况,比如,从磁盘读取文件到内存,就只有Input操作,反过来,把数据写到磁盘文件里,就只是一个Output操作。

I/O编程中,Stream(流)是一个很重要的概念,可以把流想象成一个水管,数据就是水管里的水,但是只能单向流动。Input Stream就是数据从外面(磁盘、网络)流进内存,Output Stream就是数据从内存流到外面去。对于浏览网页来说,浏览器和新浪服务器之间至少需要建立两根水管,才可以既能发数据,又能收数据。所以,程序运行是依靠cpu和内存来进行的,I/O操作是相对于内存而言的,从外部设备进入内存就叫Input,反之从内存输出到外部设备就叫Output.

I/O按照设备来分的话,分为两种,其一是网络I/O,也就是通过网络进行数据的拉取和输出。还有一种是磁盘I/O,主要是对磁盘进行读写工作。

2.阻塞、非阻塞、同步、异步分析

由于CPU和内存的速度远远高于外设的速度,所以,在IO编程中,就存在速度严重不匹配的问题。举个例子来说,比如要把100M的数据写入磁盘,CPU输出100M的数据只需要0.01秒,可是磁盘要接收这100M数据可能需要10秒,怎么办呢?

这就牵扯出来了同步、异步、阻塞与非阻塞这些概念。

举一个烧水的例子来具体解释这几个概念。
说到烧水,我们都是通过热水壶来烧水的。在很久之前,科技还没有这么发达的时候,如果我们要烧水,需要把水壶放到火炉上,我们通过观察水壶内的水的沸腾程度来判断水有没有烧开。随着科技的发展,现在市面上的水壶都有了提醒功能,当我们把水壶插电之后,水壶水烧开之后会通过声音提醒我们水开了。

2.1 什么是同步、异步

对于烧水这件事儿来说,传统水壶的烧水就是同步的,高科技水壶的烧水就是异步的。

同步请求,A调用B,B的处理是同步的,在处理完之前他不会通知A,只有处理完之后才会明确的通知A。

异步请求,A调用B,B的处理是异步的,B在接到请求后先告诉A我已经接到请求了,然后异步去处理,处理完之后通过回调等方式再通知A。

所以说,同步和异步最大的区别就是被调用方的执行方式和返回时机。同步指的是被调用方做完事情之后再返回,异步指的是被调用方先返回,然后再做事情,做完之后再想办法通知调用方。

2.2 什么是阻塞和非阻塞

还是那个烧水的例子,当你把水放到水壶里面,按下开关后,你可以坐在水壶前面,别的事情什么都不做,一直等着水烧好。你还可以先去客厅看电视,等着水开就好了。

对于你来说,坐在水壶前面等就是阻塞的,去客厅看电视等着水开就是非阻塞的。

阻塞请求,A调用B,A一直等着B的返回,别的事情什么也不干。

非阻塞请求,A调用B,A不用一直等着B的返回,先去忙别的事情了。

所以说,同步和异步最大的区别就是在被调用方返回结果之前的这段时间内,调用方是否一直等待。阻塞指的是调用方一直等待别的事情什么都不做。非阻塞指的是调用方先去忙别的事情。

2.3 阻塞、非阻塞和同步、异步的区别

有人认为阻塞和同步是一回事儿,非阻塞和异步是一回事。但是这是不对的。

首先,前面已经提到过,阻塞、非阻塞和同步、异步其实针对的对象是不一样的。阻塞、非阻塞说的是调用者,同步、异步说的是被调用者。

针对于上文所说的烧水的案例来说,阻塞与非阻塞是形容调用者人的,而同步和异步是形容被调用者烧水这件事儿。
同步里面可以有阻塞的情形,也可以有非阻塞的性情。异步也可以有阻塞的情形和非阻塞的情形。

我们先来看同步场景中是如何包含阻塞和非阻塞情况:

  • 我们是用传统的水壶烧水。在水烧开之前我们一直做在水壶前面,等着水开。这就是阻塞的。
  • 我们是用传统的水壶烧水。在水烧开之前我们先去客厅看电视了,但是水壶不会主动通知我们,需要我们时不时的去厨房看一下水有没有烧开。这就是非阻塞的。

再来看异步场景中是如何包含阻塞和非阻塞情况

  • 我们是用带有提醒功能的水壶烧水。在水烧发出提醒之前我们一直做在水壶前面,等着水开。这就是阻塞的。
  • 我们是用带有提醒功能的水壶烧水。在水烧发出提醒之前我们先去客厅看电视了,等水壶发出声音提醒我们。这就是非阻塞的。

我回到开始的那个问题:由于CPU和内存的速度远远高于外设的速度,所以,在IO编程中,就存在速度严重不匹配的问题。举个例子来说,比如要把100M的数据写入磁盘,CPU输出100M的数据只需要0.01秒,可是磁盘要接收这100M数据可能需要10秒,怎么办呢?

有两种办法:

第一种是CPU等着,也就是程序暂停执行后续代码,等100M的数据在10秒后写入磁盘,再接着往下执行,这种模式称为同步IO;

另一种方法是CPU不等待,只是告诉磁盘,“您老慢慢写,不着急,我接着干别的事去了”,于是,后续代码可以立刻接着执行,这种模式称为异步IO。

同步和异步的区别就在于是否等待IO执行的结果。好比你去麦当劳点餐,你说“来个汉堡”,服务员告诉你,对不起,汉堡要现做,需要等5分钟,于是你站在收银台前面等了5分钟,拿到汉堡再去逛商场,这是同步IO。

你说“来个汉堡”,服务员告诉你,汉堡需要等5分钟,你可以先去逛商场,等做好了,我们再通知你,这样你可以立刻去干别的事情(逛商场),这是异步IO。

很明显,使用异步IO来编写程序性能会远远高于同步IO,但是异步IO的缺点是编程模型复杂。想想看,你得知道什么时候通知你“汉堡做好了”,而通知你的方法也各不相同。如果是服务员跑过来找到你,这是回调模式,如果服务员发短信通知你,你就得不停地检查手机,这是轮询模式。总之,异步IO的复杂度远远高于同步IO。

在Linux下进行网络编程时,我们常常见到同步(Sync)/异步(Async),阻塞(Block)/非阻塞(Unblock)四种调用方式:

同步异步的概念描述的是用户线程与内核的交互方式:同步是指用户线程发起IO请求后需要等待或者轮询内核IO操作完成后才能继续执行;而异步是指用户线程发起IO请求后仍继续执行,当内核IO操作完成后会通知用户线程,或者调用用户线程注册的回调函数。

阻塞非阻塞的概念描述的是用户线程调用内核IO操作的方式:阻塞是指IO操作在没有接收完数据或者没有得到结果之前不会返回,需要彻底完成后才返回到用户空间;而非阻塞是指IO操作被调用后立即返回给用户一个状态值,无需等到IO操作彻底完成。

在Linux下进行网络编程时,服务器端编程经常需要构造高性能的IO模型,常见的IO模型有五种:

(1)同步阻塞IO(Blocking IO)
(2)同步非阻塞IO(Non-blocking IO)
(3)IO多路复用(IO Multiplexing)
(4)信号驱动IO(signal driven IO)
(5)异步IO(Asynchronous IO)

3.五种IO模型:

1.同步阻塞IO(Blocking IO):

即传统的IO模型,在linux中默认情况下所有的socket都是阻塞模式。当用户进程调用了read()这个系统调用,内核就开始了IO的第一个阶段:准备数据。对于网络IO来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候内核就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当内核一直等到数据准备好了,它就会将数据从内核中拷贝到用户内存,然后内核返回结果,用户进程才解除阻塞的状态,重新运行起来;几乎所有的程序员第一次接触到的网络编程都是从listen()、read()、write() 等接口开始的,这些接口都是阻塞型的,一个简单的改进方案是在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。
在这里插入图片描述

2.同步非阻塞IO(Non-blocking IO):

默认创建的socket都是阻塞的,同步非阻塞IO是在同步阻塞IO的基础上,将socket设置为NONBLOCK,这个可以使用ioctl()系统调用设置。这样做用户线程可以在发起IO请求后可以立即返回,如果该次读操作并未读取到任何数据,用户线程需要不断地发起IO请求,直到数据到达后,才真正读取到数据,继续执行。整个IO请求的过程中,虽然用户线程每次发起IO请求后可以立即返回,但是为了等到数据,仍需要不断地轮询、重复请求,消耗了大量的CPU的资源。一般很少直接使用这种模型,而是在其他IO模型中使用非阻塞IO这一特性。

在这里插入图片描述

3.IO多路复用(IO Multiplexing):

IO多路复用模型是建立在内核提供的多路分离函数select基础之上的,使用select函数可以避免同步非阻塞IO模型中轮询等待的问题,此外poll、epoll都是这种模型。在该种模式下,用户首先将需要进行IO操作的socket添加到select中,然后阻塞等待select系统调用返回。当数据到达时,socket被激活,select函数返回。用户线程正式发起read请求,读取数据并继续执行。从流程上来看,使用select函数进行IO请求和同步阻塞模型没有太大的区别,甚至还多了添加监视socket,以及调用select函数的额外操作,效率更差。但是,使用select以后最大的优势是用户可以在一个线程内同时处理多个socket的IO请求。用户可以注册多个socket,然后不断地调用select读取被激活的socket,即可达到在同一个线程内同时处理多个IO请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。

在这里插入图片描述

4 .信号驱动IO(signal driven IO):

调用sigaltion系统调用,当内核中IO数据就绪时以SIGIO信号通知请求进程,请求进程
再把数据从内核读入到用户空间,这一步是阻塞的。

5.异步IO(Asynchronous IO):

即经典的Proactor设计模式,也称为异步非阻塞IO。“真正”的异步IO需要操作系统更强
的支持。在IO多路复用模型中,事件循环将文件句柄的状态事件通知给用户线程,由用户线程自行读取数据、处理数据。而在异步IO模型中,当用户线程收到通知时,数据已经被内核读取完毕,并放在了用户线程指定的缓冲区内,内核在IO完成后通知用户线程直接使用即可。

相比于IO多路复用模型,信号驱动IO和异步IO并不十分常用,不少高性能并发服务程序使用IO多路复用模型+多线程任务处理的架构基本可以满足需求。况且目前操作系统对异步IO的支持并非特别完善,更多的是采用IO多路复用模型模拟异步IO的方式。

在这里我们把前三种做一个形象地类比:

  1. 阻塞IO, 给女神发一条短信, 说我来找你了, 然后就默默的一直等着女神下楼, 这个期间除了等待你不会做其他事情;
  2. 非阻塞IO, 给女神发短信, 如果不回, 接着再发, 一直发到女神下楼, 这个期间你除了发短信等待不会做其他事情;
  3. IO多路复用, 是找一个宿管大妈来帮你监视下楼的女生, 这个期间你可以些其他的事情,例如可以顺便玩玩王者荣耀, 打篮球等等。IO复用又包括 select, poll, epoll 模式.

那么它们的区别是什么?
select大妈 每一个女生下楼, select大妈都不知道这个是不是你的女神, 她需要一个一个询问, 并且select大妈能力还有限, 最多一次帮你监视1024个妹子;

poll大妈 不限制盯着女生的数量, 只要是经过宿舍楼门口的女生, 都会帮你去问是不是你女神;

epoll大妈 不限制盯着女生的数量, 并且也不需要一个一个去问. 那么如何做呢? epoll大妈会为每个进宿舍楼的女生脸上贴上一个大字条,上面写上女生自己的名字, 只要女生下楼了, epoll大妈就知道这个是不是你女神了, 然后大妈再通知你;

二、select多路复用

1、基本概念

IO多路复用是指内核一旦发现进程指定的一个或者多个IO条件准备读取,它就通知该进程。IO多路复用适用如下场合:

  • 当客户处理多个描述字时(一般是交互式输入和网络套接口),必须使用I/O复用。

  • 当一个客户同时处理多个套接口时,而这种情况是可能的,但很少出现。

  • 如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到I/O复用。

  • 如果一个服务器即要处理TCP,又要处理UDP,一般要使用I/O复用。

  • 如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。

与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。

2、select()函数

select()函数允许进程指示内核等待多个事件(文件描述符)中的任何一个发生,并只在有一个或多个事件发生或经历一段指定时间后才唤醒它,然后接下来判断究竟是哪个文件描述符发生了事件并进行相应的处理,函数原型如下:

#include <sys/select.h>
#include <sys/time.h>
struct timeval
{
long tv_sec; //seconds
long tv_usec; //microseconds
};
FD_ZERO(fd_set* fds) //清空集合
FD_SET(int fd, fd_set* fds) //将给定的描述符加入集合
FD_ISSET(int fd, fd_set* fds) //判断指定描述符是否在集合中
FD_CLR(int fd, fd_set* fds) //将给定的描述符从文件中删除

int select(int max_fd, fd_set *readset, fd_set *writeset, fd_set *exceptset, struct timeval *timeout);

说明:

  1. select监视并等待多个文件描述符的属性发生变化,它监视的属性分3类,分别是readfds(文件描述符有数据到来可读)、writefds(文件描述符可写)、和exceptfds(文件描述符异常)。调用后select函数会阻塞,直到有描述符就绪(有数据可读、可写、或者有错误异常),或者超时(timeout 指定等待时间)发生函数才返回。当select()函数返回后,可以通过遍历 fdset,来找到究竟是哪些文件描述符就绪。

  2. timeout告知内核等待所指定描述字中的任何一个就绪可花多少时间。其timeval结构用于指定这段时间的秒数和微秒数。这个参数有三种可能:
    (1)永远等待下去:仅在有一个描述字准备好I/O时才返回。为此,把该参数设置为空指针NULL。
    (2) 等待一段固定时间:在有一个描述字准备好I/O时返回,但是不超过由该参数所指向的timeval结构中指定的秒数和微秒数。
    (3)根本不等待:检查描述字后立即返回,这称为轮询。为此,该参数必须指向一个timeval结构,而且其中的定时器值必须为0。

  3. select函数的返回值是就绪描述符的数目,超时时返回0,出错返回-1;

  4. 第一个参数max_fd指待测试的fd的总个数,它的值是待测试的最大文件描述符加1。Linux内核从0开始到max_fd-1扫描文件描述符,如果有数据出现事件(读、写、异常)将会返回;假设需要监测的文件描述符是8,9,10,那么Linux内核实际也要监测07,此时真正带测试的文件描述符是010总共11个,即max(8,9,10)+1,所以第一个参数是所有要监听的文件描述符中最大的+1。

  5. 中间三个参数readset、writeset和exceptset指定要让内核测试读、写和异常条件的fd集合,如果不需要测试的可以设置为NULL;

  6. 最后一个参数是设置select的超时时间,如果设置为NULL则永不超时;需要注意的是待测试的描述集总是从0, 1, 2, …开始的。 所以, 假如你要检测的描述符为8, 9, 10, 那么系统实际也要监测0, 1, 2, 3, 4, 5, 6, 7, 此时真正待测试的描述符的个数为11个, 也就是max(8, 9, 10) + 1在Linux内核有个参数__FD_SETSIZE定义了每个FD_SET的句柄个数中,这也意味着select所用到的FD_SET是有限的,也正是这个原因select()默认只能同时处理1024个客户端的连接请求:

/linux/posix_types.h:
#define __FD_SETSIZE 1024

下面是使用select()多路复用实现网络socket服务器多路并发的流程图:

在这里插入图片描述

下面是使用select()多路复用实现的服务器端示例代码:

#include <stdio.h>
#include <stdlib.h>#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <ctype.h>
#include <time.h>
#include <pthread.h>
#include <getopt.h>
#include <libgen.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>

#define ARRAY_SIZE(x) (sizeof(x)/sizeof(x[0]))

static inline void msleep(unsigned long ms);
static inline void print_usage(char *progname);
int socket_server_init(char *listen_ip, int listen_port);

int main(int argc, char **argv)
{
        int listenfd, connfd;
        int serv_port = 0;
        int daemon_run = 0;
        char *progname = NULL;
        int opt;
        fd_set rdset;
        int rv;
        int i, j;
        int found;
        int maxfd=0;
        char buf[1024];
        int fds_array[1024];
        
struct option long_options[] =
{
        {"daemon", no_argument, NULL, 'b'},
        {"port", required_argument, NULL, 'p'},
        {"help", no_argument, NULL, 'h'},
        {NULL, 0, NULL, 0}
};

        progname = basename(argv[0]);
        /* Parser the command line parameters */
        while ((opt = getopt_long(argc, argv, "bp:h", long_options, NULL)) != -1)
        {
                switch (opt)
                {
                        case 'b':
                        daemon_run=1;
                        break;
                        case 'p':
                        serv_port = atoi(optarg);
                        break;
                        case 'h': /* Get help information */
                        print_usage(progname);
                        return EXIT_SUCCESS;
                        default:
                        break;
                }
        }
        if( !serv_port )
        {
                print_usage(progname);
                return -1;}
                if( (listenfd=socket_server_init(NULL, serv_port)) < 0 )
                {
                        printf("ERROR: %s server listen on port %d failure\n", argv[0],serv_port);
                        return -2;
                }
        printf("%s server start to listen on port %d\n", argv[0],serv_port);
        /* set program running on background */
        if( daemon_run )
        {
                daemon(0, 0);
        }
        for(i=0; i<ARRAY_SIZE(fds_array) ; i++)
        {
                fds_array[i]=-1;
        }
        fds_array[0] = listenfd;
        for ( ; ; )
        {
                FD_ZERO(&rdset);
                for(i=0; i<ARRAY_SIZE(fds_array) ; i++)
                {
                        if( fds_array[i] < 0 )
                        continue;
                        maxfd = fds_array[i]>maxfd ? fds_array[i] : maxfd;
                        FD_SET(fds_array[i], &rdset);
                }
                /* program will blocked here */
                rv = select(maxfd+1, &rdset, NULL, NULL, NULL);
                if(rv < 0)
                {
                        printf("select failure: %s\n", strerror(errno));
                        break;
                }
                else if(rv == 0)
                {
                        printf("select get timeout\n");
                        continue;
                }
                /* listen socket get event means new client start connect now */
                if ( FD_ISSET(listenfd, &rdset) )
                {
                        if( (connfd=accept(listenfd, (struct sockaddr *)NULL, NULL)) < 0)
                        {
                                printf("accept new client failure: %s\n", strerror(errno));
                                continue;
                        }
                        found = 0;
                        for(i=0; i<ARRAY_SIZE(fds_array) ; i++)
                        {
                                if( fds_array[i] < 0 )
                        {
                        printf("accept new client[%d] and add it into array\n", connfd );
                        fds_array[i] = connfd;
                        found = 1;
                        break;
                        }
                }
                if( !found )
                {
                        printf("accept new client[%d] but full, so refuse it\n", connfd);
                        close(connfd);
                }
        }
        else /* data arrive from already connected client */
        {
                for(i=0; i<ARRAY_SIZE(fds_array); i++)
                {
                        if( fds_array[i]<0 || !FD_ISSET(fds_array[i], &rdset) )
        continue;
                        if( (rv=read(fds_array[i], buf, sizeof(buf))) <= 0)
                        {
                                printf("socket[%d] read failure or get disconncet.\n", fds_array[i]);
                                close(fds_array[i]);
                                fds_array[i] = -1;
                        }
                        else
                        {
                                printf("socket[%d] read get %d bytes data\n", fds_array[i], rv);
                                /* convert letter from lowercase to uppercase */
                                for(j=0; j<rv; j++)
                                buf[j]=toupper(buf[j]);
                                if( write(fds_array[i], buf, rv) < 0 )
                                {
                                        printf("socket[%d] write failure: %s\n", fds_array[i], strerror(errno));
                                        close(fds_array[i]);
                                        fds_array[i] = -1;
                                }
                        }
                }
        }
}

CleanUp:
        close(listenfd);
        return 0;
        }
static inline void msleep(unsigned long ms)
{
        struct timeval tv;
        tv.tv_sec = ms/1000;
        tv.tv_usec = (ms%1000)*1000;
        select(0, NULL, NULL, NULL, &tv);
}
static inline void print_usage(char *progname)
{
        printf("Usage: %s [OPTION]...\n", progname);
        printf(" %s is a socket server program, which used to verify client and echo back string from it\n",
        progname);
        printf("\nMandatory arguments to long options are mandatory for short options too:\n");
        printf(" -b[daemon ] set program running on background\n");
        printf(" -p[port ] Socket server port address\n");
        printf(" -h[help ] Display this help information\n");
        printf("\nExample: %s -b -p 8900\n", progname);
        return ;
}

int socket_server_init(char *listen_ip, int listen_port)
{
        struct sockaddr_in servaddr;
        int rv = 0;
        int on = 1;
        int listenfd;
        if ( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
        {
                printf("Use socket() to create a TCP socket failure: %s\n", strerror(errno));
                return -1;
        }
        /* Set socket port reuseable, fix 'Address already in use' bug when socket server         restart */
        setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
        memset(&servaddr, 0, sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        servaddr.sin_port = htons(listen_port);
        if( !listen_ip ) /* Listen all the local IP address */
        {
                servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
        }
        else /* listen the specified IP address */
        {
                if (inet_pton(AF_INET, listen_ip, &servaddr.sin_addr) <= 0)
                {
                        printf("inet_pton() set listen IP address failure.\n");
                        rv = -2;
                        goto CleanUp;
                }
        }
        if(bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0)
        {
                printf("Use bind() to bind the TCP socket failure: %s\n", strerror(errno));
                rv = -3;
                goto CleanUp;
        }
        if(listen(listenfd, 13) < 0)
        {
                printf("Use bind() to bind the TCP socket failure: %s\n", strerror(errno));
                rv = -4;
                goto CleanUp;
        }
CleanUp:
        if(rv<0)
        close(listenfd);
        else
        rv = listenfd;
        return rv;
}

基于select的I/O复用模型的是单进程执行可以为多个客户端服务,这样可以减少创建线程或进程所需要的CPU时间片或内存
资源的开销;此外几乎所有的平台上都支持select(),其良好跨平台支持是它的另一个优点。当然它也有两个主要的缺点:

  1. 每次调用 select()都需要把fd集合从用户态拷贝到内核态,之后内核需要遍历所有传递进来的fd,这时如果客户端fd很多时会导致系统开销很大;
  2. 单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过setrlimit()、修改宏定义甚至重新编译内核等方式来提升这一限制,但是这样也会造成效率的降低;

三、poll多路复用

select()和poll()系统调用的本质一样,前者在BSD UNIX中引入的,后者在System V中引入的。poll()的机制与 select() 类似,与 select() 在本质上没有多大差别,管理多个描述符也是进行轮询,根据描述符的状态进行处理,但是 poll() 没有最大文件描述符数量的限制(但是数量过大后性能也是会下降)。poll() 和 select() 同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

poll函数的原型说明如下:

#include <poll.h>

int poll (struct pollfd *fds, unsigned int nfds, int timeout);

struct pollfd {
    int fd; 	   /* 描述符 */
    short events;  /* 需要监听的事件 */
    short revents; /* 实际发生的事件 */
};

函数描述: 监听多个文件描述符的属性变化,和select类似,但是有很大区别,使用一个 pollfd 指针来替代 select 的三个set的功能。

参数描述:

fd: 结构体指针,可以传入多个结构体,每个结构体都是一个被监听的描述符

nfds: 指定传入的结构体的数量

timeout: 指定等待的毫秒数,无论I/O是否准备好,poll都会返回。timeout指定为负数值表示无限超时,使poll()一直挂起(阻塞)直到一个指定事件发生;timeout为0指示poll调用立即返回并列出准备好I/O的文件描述符,但并不等待其它的事件。这种情况下,poll()就像它的名字那样,一旦选举出来,立即返回。

返回值: 返回有事件的描述符数量,函数返回后,需要轮询来找到发生事件的描述符,错误则返回-1

pollfd 结构体:

​ fd: 表示描述符

​ events: 需要监听的事件掩码,取值如下

​ revents: 实际发生的事件掩码,取值如下
在这里插入图片描述

非法事件
在这里插入图片描述
POLLIN | POLLPRI等价于select()的读事件,POLLOUT |POLLWRBAND等价于select()的写事件。POLLIN等价于
POLLRDNORM |POLLRDBAND,而POLLOUT则等价于POLLWRNORM。例如,要同时监视一个文件描述符是否可读和可写,我们可以设置 events为POLLIN |POLLOUT。在poll返回时,我们可以检查revents中的标志,对应于文件描述符请求的events结构体。如果POLLIN事件被设置,则文件描述符可以被读取而不阻塞。如果POLLOUT被设置,则文件描述符可以写入而不导致阻塞。这些标志并不是互斥的:它们可能被同时设置,表示这个文件描述符的读取和写入操作都会正常返回而不阻塞。

该函数成功调用时,poll()返回结构体中revents域不为0的文件描述符个数;如果在超时前没有任何事件发生,poll()返回0;失败时,poll()返回-1,并设置errno为下列值之一:

  • EBADF 一个或多个结构体中指定的文件描述符无效。

  • EFAULTfds 指针指向的地址超出进程的地址空间。

  • EINTR 请求的事件之前产生一个信号,调用可以重新发起。

  • EINVALnfds  参数超出PLIMIT_NOFILE值。

  • ENOMEM   可用内存不足,无法完成请求。

下面是使用poll()多路复用实现的服务器端示例代码:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <ctype.h>
#include <time.h>
#include <pthread.h>
#include <getopt.h>
#include <libgen.h>
#include <sys/types.h> 
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <poll.h>
#define ARRAY_SIZE(x) (sizeof(x)/sizeof(x[0]))
static inline void print_usage(char *progname);
int socket_server_init(char *listen_ip, int listen_port);
int main(int argc, char **argv)
{
	 int listenfd, connfd;
	 int serv_port = 0;
	 int daemon_run = 0;
	 char *progname = NULL;
	 int opt;
	 int rv;
	 int i, j;
	 int found;
	 int max;
	 char buf[1024];
	 struct pollfd fds_array[1024];
	 struct option long_options[] =
	 { 
		 {"daemon", no_argument, NULL, 'b'},
		 {"port", required_argument, NULL, 'p'},
		 {"help", no_argument, NULL, 'h'},
		 {NULL, 0, NULL, 0}
	 }; 
	 progname = basename(argv[0]);
	 /* Parser the command line parameters */
	 while ((opt = getopt_long(argc, argv, "bp:h", long_options, NULL)) != -1)
	 { 
		 switch (opt)
		 { 
			 case 'b':
			 daemon_run=1;
			 break;
		 
			 case 'p':
			 serv_port = atoi(optarg);
			 break;
			 
			 case 'h': /* Get help information */
			 print_usage(progname);
			 return EXIT_SUCCESS;
			 default:
			 break;
		 } 
	 } 
	 if( !serv_port )
	 { 
			 print_usage(progname);
			 return -1;
	 }
	 if( (listenfd=socket_server_init(NULL, serv_port)) < 0 )
	 {
		 printf("ERROR: %s server listen on port %d failure\n", argv[0],serv_port);
		 return -2;
	 }
		 printf("%s server start to listen on port %d\n", argv[0],serv_port);
		 /* set program running on background */
	 if( daemon_run )
	 {
		 daemon(0, 0);
	 }
	 for(i=0; i<ARRAY_SIZE(fds_array) ; i++)
	 {
		 fds_array[i].fd=-1;
	 }
	 fds_array[0].fd = listenfd;
	 fds_array[0].events = POLLIN;
	 max = 0;
	 for ( ; ; )
	 {
		 /* program will blocked here */
		 rv = poll(fds_array, max+1, -1);
	 	if(rv < 0)
		 {
			 printf("select failure: %s\n", strerror(errno));
			 break;
		 }
		 else if(rv == 0)
		 {
			 printf("select get timeout\n");
			 continue;
		 }
	 /* listen socket get event means new client start connect now */
		 if (fds_array[0].revents & POLLIN)
		 {
			 if( (connfd=accept(listenfd, (struct sockaddr *)NULL, NULL)) < 0)
			 {
				 printf("accept new client failure: %s\n", strerror(errno));
				 continue;
			 }
			 found = 0;
			 for(i=1; i<ARRAY_SIZE(fds_array) ; i++)
			 {
				 if( fds_array[i].fd < 0 )
	 			{
					 printf("accept new client[%d] and add it into array\n", connfd );
					 fds_array[i].fd = connfd;
					 fds_array[i].events = POLLIN;
					 found = 1;
					 break;
				 }
			 }
			 if( !found )
			 {
				 printf("accept new client[%d] but full, so refuse it\n", connfd);
				 close(connfd);
				 continue;
			 }
			 max = i>max ? i : max;
			 if (--rv <= 0)
				 continue;
	 }
	 else /* data arrive from already connected client */
	 {
		 for(i=1; i<ARRAY_SIZE(fds_array); i++)
		 {
			 if( fds_array[i].fd < 0 )
			 continue;
			 if( (rv=read(fds_array[i].fd, buf, sizeof(buf))) <= 0)
			 {
				 printf("socket[%d] read failure or get disconncet.\n", 		fds_array[i].fd);
				 close(fds_array[i].fd);
				 fds_array[i].fd = -1;
			 }
			 else
			 {
				 printf("socket[%d] read get %d bytes data\n", fds_array[i].fd, rv);
				 /* convert letter from lowercase to uppercase */
				 for(j=0; j<rv; j++)
				 buf[j]=toupper(buf[j]);
				 if( write(fds_array[i].fd, buf, rv) < 0 )
				 {
					 printf("socket[%d] write failure: %s\n", fds_array[i].fd, strerror(errno));
					 close(fds_array[i].fd);
					 fds_array[i].fd = -1;
				 }
			 }
		 }
	 }
}
CleanUp:
	 close(listenfd);
	 return 0;
}
	static inline void print_usage(char *progname)
	{
		 printf("Usage: %s [OPTION]...\n", progname);
		 printf(" %s is a socket server program, which used to verify client and echo back string from it\n",
	progname);
		 printf("\nMandatory arguments to long options are mandatory for short options too:\n");
		 printf(" -b[daemon ] set program running on background\n");
		 printf(" -p[port ] Socket server port address\n");
		 printf(" -h[help ] Display this help information\n");
		 printf("\nExample: %s -b -p 8900\n", progname);
		 return ;
	}
		int socket_server_init(char *listen_ip, int listen_port)
	{
		 struct sockaddr_in servaddr;
		 int rv = 0;
		 int on = 1;
		 int listenfd;
		 if ( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
	 {
		 printf("Use socket() to create a TCP socket failure: %s\n", strerror(errno));
		 return -1;
	 }
		 /* Set socket port reuseable, fix 'Address already in use' bug when socket server restart */
		 setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
		 memset(&servaddr, 0, sizeof(servaddr));
		 servaddr.sin_family = AF_INET; 
		 servaddr.sin_port = htons(listen_port);
		 if( !listen_ip ) /* Listen all the local IP address */
		 {
			 servaddr.sin_addr.s_addr = htonl(INADDR_ANY); 
		 }
		 else /* listen the specified IP address */
		 {
			 if (inet_pton(AF_INET, listen_ip, &servaddr.sin_addr) <= 0)
			 {
			 		printf("inet_pton() set listen IP address failure.\n");
					 rv = -2;
					 goto CleanUp;
			 }
		 }
		 if(bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0)
		 {
			 printf("Use bind() to bind the TCP socket failure: %s\n", strerror(errno));
			 rv = -3;
			 goto CleanUp;
		 }
		 if(listen(listenfd, 13) < 0)
		 {
			 printf("Use bind() to bind the TCP socket failure: %s\n", strerror(errno));
			 rv = -4;
			 goto CleanUp;
			 }
CleanUp:
		 if(rv<0)
		 	close(listenfd);
		 else
			 rv = listenfd;
		 return rv;
		}

epoll_723">四、epoll多路复用

在linux 没有实现epoll事件驱动机制之前,我们一般选择用select或者poll等IO多路复用的方法来实现并发服务程序。自Linux 2.6内核正式引入epoll以来,epoll已经成为了目前实现高性能网络服务器的必备技术,在大数据、高并发、集群等一些名词唱得火热之年代,select和poll的用武之地越来越有限,风头已经被epoll占尽。

select的缺点:

  1. 单个进程能够监视的文件描述符的数量存在最大限制,通常是1024,当然可以更改数量,但由于select采用轮询的方式扫描文件描述符,文件描述符数量越多,性能越差;(在linux内核头文件中,有这样的定义:#define
    __FD_SETSIZE 1024)
  2. 内核 / 用户空间内存拷贝问题,select需要复制大量的句柄数据结构,产生巨大的开销;
  3. select返回的是含有整个句柄的数组,应用程序需要遍历整个数组才能发现哪些句柄发生了事件;
  4. select的触发方式是水平触发,应用程序如果没有完成对一个已经就绪的文件描述符进行IO操作,那么之后每次select调用还是会将这些文件描述符通知进程。

相比select模型,poll使用链表保存文件描述符,因此没有了监视文件数量的限制,但其他三个缺点依然存在。

拿select模型为例,假设我们的服务器需要支持100万的并发连接,则在__FD_SETSIZE 为1024的情况下,则我们至少需要开辟1k个进程才能实现100万的并发连接。除了进程间上下文切换的时间消耗外,从内核/用户空间大量的无脑内存拷贝、数组轮询等,是系统难以承受的。因此,基于select模型的服务器程序,要达到10万级别的并发访问,是一个很难完成的任务。

epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。

LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。

ET (edge-triggered)是高速工作方式,只支持non-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once),不过在TCP协议中,ET模式的加速效用仍需要更多的benchmark确认。

ET和LT的区别就在这里体现,LT事件不会丢弃,而是只要读buffer里面有数据可以让用户读,则不断的通知你。而ET则只在事件发生之时通知。可以简单理解为LT是水平触发,而ET则为边缘触发。LT模式只要有事件未处理就会触发,而ET则只在高低电平变换时(即状态从1到0或者0到1)触发。

由于epoll的实现机制与select/poll机制完全不同,上面所说的 select的缺点在epoll上不复存在。设想一下如下场景:有100万个客户端同时与一个服务器进程保持着TCP连接。而每一时刻,通常只有几百上千个TCP连接是活跃的(事实上大部分场景都是这种情况)。如何实现这样的高并发?在select/poll时代,服务器进程每次都把这100万个连接告诉操作系统(从用户态复制句柄数据结构到内核态),让操作系统内核去查询这些套接字上是否有事件发生,轮询完后,再将句柄数据复制到用户态,让服务器应用程序轮询处理已发生的网络事件,这一过程资源消耗较大,因此,select/poll一般只能处理几千的并发连接。

epoll的设计和实现与select完全不同。epoll通过在Linux内核中申请一个简易的文件系统,把原先的select/poll调用分成了3个部分:

  1. 调用epoll_create()建立一个epoll对象(在epoll文件系统中为这个句柄对象分配资源)
  2. 调用epoll_ctl向epoll对象中添加这100万个连接的套接字
  3. 调用epoll_wait收集发生的事件的连接

如此一来,要实现上面说是的场景,只需要在进程启动时建立一个epoll对象,然后在需要的时候向这个epoll对象中添加或者删除连接。同时,epoll_wait的效率也非常高,因为调用epoll_wait时,并没有一股脑的向操作系统复制这100万个连接的句柄数据,内核也不需要去遍历全部的连接。

epoll_759">2.epoll()

epollepoll_create_760">2.1.创建epoll实例:epoll_create()

#include <sys/epoll.h>
int epoll_create(int size);

系统调用epoll_create()创建了一个新的epoll实例,其对应的兴趣列表初始化为空。若成功返回文件描述符,若出错返回-1。参数size指定了我们想要通过epoll实例来检查的文件描述符个数。该参数并不是一个上限,而是告诉内核应该如何为内部数据结构划分初始大小。从Linux2.6.8版以来,size参数被忽略不用。

作为函数返回值,epoll_create()返回了代表新创建的epoll实例的文件描述符。这个文件描述符在其他几个epoll系统调用中用来表示epoll实例。当这个文件描述符不再需要时,应该通过close()来关闭。当所有与epoll实例相关的文件描述符都被关闭时,实例被销毁,相关的资源都返还给系统。从2.6.27版内核以来,Linux支持了一个新的系统调用epoll_create1()。该系统调用执行的任务同epoll_create()一样,但是去掉了无用的参数size,并增加了一个可用来修改系统调用行为的flags参数。目前只支持一个flag标志:EPOLL_CLOEXEC,它使得内核在新的文件描述符上启动了执行即关闭标志。

epollepoll_ctl_770">2.2.修改epoll的兴趣列表:epoll_ctl()

#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);

系统调用epoll_ctl()能够修改由文件描述符epfd所代表的epoll实例中的兴趣列表。若成功返回0,若出错返回-1。

第一个参数epfd是epoll_create()的返回值;
第二个参数op用来指定需要执行的操作,它可以是如下几种值:

  1. EPOLL_CTL_ADD:将描述符fd添加到epoll实例中的兴趣列表中去。对于fd上我们感兴趣的事件,都指定在ev所指向的结构体中。如果我们试图向兴趣列表中添加一个已存在的文件描述符,epoll_ctl()将出现EEXIST错误;
  2. EPOLL_CTL_MOD:修改描述符上设定的事件,需要用到由ev所指向的结构体中的信息。如果我们试图修改不在兴趣列表 中的文件描述符,epoll_ctl()将出现ENOENT错误;
  3. EPOLL_CTL_DEL:将文件描述符fd从epfd的兴趣列表中移除,该操作忽略参数ev。如果我们试图移除一个不在epfd的兴趣列表中的文件描述符,epoll_ctl()将出现ENOENT错误。关闭一个文件描述符会自动将其从所有的epoll实例的兴趣列表 移除;

第三个参数fd指明了要修改兴趣列表中的哪一个文件描述符的设定。该参数可以是代表管道、FIFO、套接字、POSIX消息队列、inotify实例、终端、设备,甚至是另一个epoll实例的文件描述符。但是,这里fd不能作为普通文件或目录的文件描述符;

第四个参数ev是指向结构体epoll_event的指针,结构体的定义如下:

typedef union epoll_data
{
void *ptr; /* Pointer to user-defind data */
int fd; /* File descriptor */
uint32_t u32; /* 32-bit integer */
uint64_t u64; /* 64-bit integer */
} epoll_data_t;
struct epoll_event
{
uint32_t events; /* epoll events(bit mask) */
epoll_data_t data; /* User data */
};

参数ev为文件描述符fd所做的设置(epoll_event)如下:

  • events字段是一个位掩码,它指定了我们为待检查的描述符fd上所感兴趣的事件集合;
  • data字段是一个联合体,当描述符fd稍后称为就绪态时,联合的成员可用来指定传回给调用进程的信息;

epoll_wait_811">2.3.事件等待:epoll_wait()

#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *evlist, int maxevents, int timeout);

系统调用epoll_wait()返回epoll实例中处于就绪态的文件描述符信息,单个epoll_wait()调用能够返回多个就绪态文件描述符的信息。调用成功后epoll_wait()返回数组evlist中的元素个数,如果在timeout超时间隔内没有任何文件描述符处于就绪态的话就返回0,出错时返回-1并在errno中设定错误码以表示错误原因。

第一个参数epfd是epoll_create()的返回值;

第二个参数evlist所指向的结构体数组中返回的是有关就绪态文件描述符的信息,数组evlist的空间由调用者负责申请;

第三个参数maxevents指定所evlist数组里包含的元素个数;

第四个参数timeout用来确定epoll_wait()的阻塞行为,有如下几种:

  • 如果timeout等于-1,调用将一直阻塞,直到兴趣列表中的文件描述符上有事件产生或者直到捕获到一个信号为止。
  • 如果timeout等于0,执行一次非阻塞式地检查,看兴趣列表中的描述符上产生了哪个事件。
  • 如果timeout大于0,调用将阻塞至多timeout毫秒,直到文件描述符上有事件发生,或者直到捕获到一个信号为止。

数组evlist中,每个元素返回的都是单个就绪态文件描述符的信息。events字段返回了在该描述符上已经发生的事件掩码。data字段返回的是我们在描述符上使用epoll_ctl()注册感兴趣的事件时在ev.data中所指定的值。注意,data字段是唯一可获知同这个事件相关的文件描述符的途径。因此,当我们调用epoll_ctl()将文件描述符添加到感兴趣列表中时,应该要么将ev.date.fd设为文件描述符号,要么将ev.date.ptr设为指向包含文件描述符号的结构体。

当我们调用epoll_ctl()时可以在ev.events中指定的位掩码以及由epoll_wait()返回的evlist[].events中的值如下所示:

在这里插入图片描述

默认情况下,一旦通过epoll_ctl()的EPOLL_CTL_ADD操作将文件描述符添加到epoll实例的兴趣列表中后,它会保持激活状态(即,之后对epoll_wait()的调用会在描述符处于就绪态时通知我们)直到我们显示地通过epoll_ctl()的EPOLL_CTL_DEL操作将其从列表中移除。如果我们希望在某个特定的文件描述符上只得到一次通知,那么可以在传给epoll_ctl()的ev.events中指定EPOLLONESHOT标志。如果指定了这个标志,那么在下一个epoll_wait()调用通知我们对应的文件描述符处于就绪态之后,这个描述符就会在兴趣列表中被标记为非激活态,之后的epoll_wait()调用都不会再通知我们有关这个描述符的状态了。如果需要,我们可以稍后用过调用epoll_ctl()的EPOLL_CTL_MOD操作重新激活对这个文件描述符的检查。

下面是使用epoll()多路复用实现的服务器端示例代码:

  1 #include<stdio.h>
  2 #include<stdlib.h>
  3 #include<unistd.h>
  4 #include<string.h>
  5 #include<errno.h>
  6 #include<ctype.h>
  7 #include<time.h>
  8 #include<pthread.h>
  9 #include<getopt.h>
 10 #include<libgen.h>
 11 #include<sys/types.h>
 12 #include<sys/socket.h>
 13 #include<arpa/inet.h>
 14 #include<netinet/in.h>
 15 #include<sys/epoll.h>
 16 #include<sys/resource.h>
 17 
 18 #define MAX_EVENTS 512
 19 #define ARRAY_SIZE(x) sizeof(x)/sizeof(x[0])
 20 
 21 static inline void print_usage(char *progname);
 22 int socket_server_init(char *listen_ip, int listen_port);
 23 void set_socket_rlimit(void);
 24 
 25 int main(int argc, char **argv)
 26 {
 27         int            listenfd,connfd;
 28         int            serv_port = 0;
 29         char            *progname = NULL;
 30         int            daemon_run = 0;
 31         int            opt;
 32         int             rv;
 33         int             i,j;
 34         int             found;
 35         char            buf[1024];
 36 
 37         int                    epollfd;
 38         struct epoll_event     event;
 39         struct epoll_event     event_array[MAX_EVENTS];
 40         int                    events;
 41 
 42         struct option            long_options[]=
 43         {
 44                 {"daemon", no_argument, NULL, 'b'},
 45                 {"port", required_argument, NULL, 'p'},
 46                 {"help", no_argument, NULL, 'h'},
 47                 {NULL, 0, NULL, 0}
 48         };
 49 
 50         progname = basename(argv[0]);
 51 
 52         while((opt = getopt_long(argc,argv,"bp:h",long_options,NULL))!=-1)
 53         {
 54                 switch(opt)
 55                 {
 56                         case 'b':
 57                                 daemon_run=1;
 58                                 break;
 59 
 60                         case 'p':
 61                                 serv_port = atoi(optarg);
 62                                 break;
 63 
 64                         case 'h':
 65                                 print_usage(progname);
 66                                 return EXIT_SUCCESS;
 67 
 68                         default:
 69                                 break;
 70                 }
 71         }
 72 
 73         if(!serv_port)
 74         {
 75                 print_usage(progname);
 76                 return -1;
 77         }
 78 
 79         set_socket_rlimit();
 80 
 81         if((listenfd=socket_server_init(NULL,serv_port))<0)
 82         {
 83                 printf("ERROR:%s serverlisten on port %d failture\n",argv[0],serv_port);
 84                 return -2;
 85         }
 86         printf("%s server start to listen on port %d\n",argv[0],serv_port);
 87 
 88         if(daemon_run)
 89         {
 90                 daemon(0,0);
 91         }
 92 
 93         if((epollfd = epoll_create(MAX_EVENTS))<0)
 94         {
 95                 printf("epoll_create() failture:%s\n", strerror(errno));
 96                 return -3;
 97         }
 98 
 99         //event.events = EPOLLIN|EPOLLET;
100         event.events = EPOLLIN;
101         event.data.fd = listenfd;
102 
103         if(epoll_ctl(epollfd, EPOLL_CTL_ADD, listenfd, &event) < 0)
104         {
105                 printf("epoll add listen socket failture:%s\n", strerror(errno));
106                 return -4;
107         }
108 
109         for(;;)
110         {
111                 events=epoll_wait(epollfd, event_array, MAX_EVENTS, -1);
112                 if(events<0)
113                 {
114                         printf("epoll failture:%s\n", strerror(errno));
115                         break;
116                 }
117                 else if(events==0)
118                 {
119                         printf("epoll get timeout\n");
120                         continue;
121                 }
122                 for(i=0; i<events; i++)
123                 {
124                         if((event_array[i].events&EPOLLERR)||(event_array[i].events&EPOLLHUP))
125                         {
126                                 printf("epoll_wait get error on fd[%d]:%s\n",event_array[i].data.fd, strerror(errno));
127                                 epoll_ctl(epollfd,EPOLL_CTL_DEL, event_array[i].data.fd,NULL);
128                                 close(event_array[i].data.fd);
129                         }
130 
131                         if(event_array[i].data.fd == listenfd)
132                         {
133                                 if((connfd=accept(listenfd,(struct sockaddr *)NULL,NULL)) < 0)
134                                 {
135                                         printf("accept new client failture:%s\n",strerror(errno));
136                                         continue;
137                                 }
138                                 event.data.fd=connfd;
139                                 event.events=EPOLLIN;
140                                 if(epoll_ctl(epollfd,EPOLL_CTL_ADD,connfd, &event)<0)
141                                 {
142                                         printf("epoll add client socket failture:%s\n",strerror(errno));
143                                         close(event_array[i].data.fd);
144                                         continue;
145                                 }
146                                 printf("epoll add new client socket[%d] ok\n",connfd);
147                                 }
148                                 else
149                                 {
150                                         if((rv=read(event_array[i].data.fd,buf,sizeof(buf)))<=0)
151                                         {
152         printf("socket[%d] read failture or get disconncet and will be removed.\n",event_array[i].data.fd);
153         epoll_ctl(epollfd, EPOLL_CTL_DEL, event_array[i].data.fd, NULL);
154         continue;
155                                         }
156                                         else
157                                         {
158                                 printf("socket[%d] read get %d bytes data\n",event_array[i].data.fd);
159                                 for(j=0; j< rv; j++)
160                                         buf[j] = toupper(buf[j]);
161                                 if(write(event_array[i].data.fd, buf, rv) < 0)
162                                 {
163                                         printf("socket[%d] write failture:%s\n",event_array[i].data.fd, strerror(errno));
164                                         epoll_ctl(epollfd, EPOLL_CTL_DEL, event_array[i].data.fd, NULL);
165                                         close(event_array[i].data.fd);
166                                 }
167                                         }
168                                 }
169                         }
170 
171         }
172 Cleanup:
173         close(listenfd);
174         return 0;
175 }
176 
177 static inline void print_usage(char *progname)
178 {
179         printf("Usage %s [OPTION]...\n", progname);
180         return;
181 }
182 
183 int socket_server_init(char *listen_ip, int listen_port)
184 {
185         struct sockaddr_in            servaddr;
186         int                           rv = 0;
187         int                           on = 1;
188         int                           listenfd;
189 
190         if((listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
191         {
192                 printf("Use socket() to create a TCP socket failture:%s\n",strerror(errno));
193                 return -1;
194         }
195 
196         setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on,sizeof(on));
197 
198         memset(&servaddr, 0, sizeof(servaddr));
199         servaddr.sin_family = AF_INET;
200         servaddr.sin_port = htonl(listen_port);
201 
202         if(!listen_ip)
203         {
204                 servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
205         }
206         else
207         {
208                 if(inet_pton(AF_INET, listen_ip, &servaddr.sin_addr) <= 0)
209                 {
210                         printf("inet_pton() set listen IP address failture.\n");
211                         rv =-2;
212                         goto Cleanup;
213                 }
214         }
215         if(bind(listenfd, (struct sockaddr *) &servaddr, sizeof(servaddr)) < 0)
216         {
217                 printf("Use bind() to bind the TCP socket failture:%s\n", strerror(errno));
218                 rv =-3;
219                 goto Cleanup;
220         }
221         if(listen(listenfd, 64)<0)
222         {
223                 printf("Use bind() to bind the TCP socket failture:%s\n",strerror(errno));
224                 rv =-4;
225                 goto Cleanup;
226         }
227 
228 Cleanup:
229         if(rv<0)
230                 close(listenfd);
231         else
232                 rv = listenfd;
233         return rv;
234 }
235 
236 void set_socket_rlimit(void)
237 {
238         struct rlimit limit = {0};
239         getrlimit(RLIMIT_NOFILE, &limit);
240         limit.rlim_cur = limit.rlim_max;
241         setrlimit(RLIMIT_NOFILE, &limit);
242 
243         printf("set socket open fd max count to %d\n", limit.rlim_max);
244 }

http://www.niftyadmin.cn/n/1868768.html

相关文章

Centos7:mysql5.6安装,配置及使用(RPM方式)

1.首先安装好jdk环境,本机所用环境为jdk1.8 2.卸载MariaDB(Centos7自带)与Mysql 2.1卸载:MariaDB #rpm -qa | grep -i mariadb //查询安装的MariaDB#rpm -e --nodeps 查到软件名 //卸载相关MariaDB的所有软件#find / -name mariadb#whereis mariadb //查找是否有相关配置目录…

Linux_cJSON使用--数据封装与解析

一、JOSN的概念&#xff1a; 1.cJSON简介 JSON(JavaScript Object Notation)是一种轻量级的文本数据交换格式, 易于让人阅读。同时也易于机器解析和生成.。JSON虽然是Javascript的一个子集, 但JSON是独立于语言的文本格式,并且采用了类似C语言的一些习惯。为什么我们要使用cJ…

Linux下gcc编译器的编译过程

一、什么是GCC GCC是以GPL许可证所发行的自由软件&#xff0c;也是GNU计划的关键部分。GCC的初衷是为GNU操作系统专门编写一款编译器&#xff0c;现已被大多数类Unix操作系统&#xff08;如Linux、BSD、MacOS X等&#xff09;采纳为标准的编译器&#xff0c;甚至在微软的Windo…

Centos6.4 用rpm方式安装MySql5.6

1、查看系统是否安装了MySQL 使用命令&#xff1a; #rpm -qa | grep mysql 2、卸载已安装的MySQL 卸载mysql命令如下&#xff1a; #rpm -e --nodeps mysql-libs-5.1.61-4.el6.x86_64 要将 /var/lib/mysql文件夹下的所有文件都删除干净 3、…

巅峰对决之Swarm、Kubernetes、Mesos

转载自:http://dockone.io/article/1138 感谢作者和编者的分享 【编者的话】这篇文章对比了三大主流调度框架&#xff1a;Swarm、Kubernetes和Mesos。文章不仅从理论上讨论了各个框架的优缺点&#xff0c;还从两个实际的案例出发&#xff0c;分析了每个框架具体使用方法。这篇文…

Linux下动态库与静态库原理与使用

一、什么是库 在系统中&#xff0c;库就是一个现有的&#xff0c;已经写好可供直接使用的代码&#xff0c;很多程序都依赖库&#xff1b;通常&#xff0c;库大致分为两种&#xff1a;分别是动态库和静态库&#xff1b;在编译过程中&#xff0c;有四个阶段&#xff08;具体参考…

由TCP三路握手引出的问题及深度理解socket编程

最近在复习tcp三路握手三路握手以及四路挥手时&#xff0c;想到平时抓包正常是的状态&#xff0c;那当SYN发后&#xff0c;若服务器异常&#xff0c;会出现什么&#xff1f;于是先去查了相关书籍&#xff0c;找到了以下的流程图&#xff1a; 于是知道可能返回的是RST&#xff…

快速上手mosquitto在ubuntu18.04下的安装及测试

一、什么是MQTT MQTT是一种基于发布/订阅模式的“轻量级”通讯协议&#xff0c;该协议构建于TCP/IP协议上。MQTT最大优点在于&#xff0c;可以以极少的代码和有限的带宽&#xff0c;为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议&#xff…