selector、epoll详解

news/2024/7/9 16:16:53 标签: epoll, 多线程

文章目录

    • 基础
    • 一、Selector
      • 1.1 原理
      • 1.2 示例
    • 二、epoll
      • 2.1 原理
      • 2.2 示例
      • 2.3 两种触发模式(LT、ET)
      • 2.4 总体流程
    • 三、Reactor

基础

首先,我们要明确同步、异步的层级。在这里讨论经常会出现各种同步、异步的概念让人迷惑,例如Netty用的是多路复用,而多路复用是同步非阻塞的I/O模型,但是Netty却宣称自己是异步的处理框架。其实这都是因为这里的同步、异步是针对不同线程,不同层级来说的。

同步、异步在I/O模型中是针对I/O线程的,但是在后面的Reactor模型中是针对业务线程。所以Netty是一个异步框架,其底层I/O模型使用的是同步的多路复用,但Netty在业务处理层面是异步的。

同时,我们要知道一个Tcp连接,是由两端的套接字唯一确定的。即一个五元组(cliaddr:cliport,serveraddr:serverport,协议)
每一连接创建完毕以后会生成一个表示该Socket的fd返回给上层,如果是java应用,对应一个java.net.Socket对象引用。
当数据到达以后,网卡接收数据,并将数据通过DMA的方式放到内核的网络数据缓冲区中,然后内核线程进行协议解析,识别出tcp报文中的cliAddr、cliPort、serverAddr、serverPort四个元素,唯一确定一个socketfd(这里解析协议的就是网卡驱动程序)。如果报文中的syn标志位为1,则表示该报文是一个新的客户端的连接报文,交给监听serverPort的监听Socket去处理(此时监听Socket发生数据可读事件,然后三次握手,建立tcp连接)。

我们上层应用调用监听socket的accept()系统调用时,tcp连接以及Socket其实已经创建好了,该系统调用只是从监听Socket的连接队列中取一个已经创建完毕的Socket的fd到用户态。(Socket其实就是一个linux下的文件)
在这里插入图片描述

一、Selector

1.1 原理

selector的原理是select系统调用,该系统调用可以在一段时间内,监听用户感兴趣的文件描述符(fd)上的可读、可写和异常等事件。

每次使用的时候,需要我们传入一个fd集合(实际上是3个,readfds、writefds、exceptfds),select()系统调用遍历用户传入的描述符列表。对于每一个fd,调用fd的poll()方法(这个方法和应用层的poll()系统调用是两回事),这将会添加调用者(当前线程)到fd的等待队列,返回适用于当前fd的事件(读,写,异常)。如果任何fd匹配用户传入的条件,select()在更新用户传入的fd_sets后,立即返回。如果没有匹配的fd,select()会一直休眠,一直到用户指定的最大超时时间。

如果在超时时间间隔内,注册到select()上的任一fd有感兴趣的事件发生,fd会通知这个fd的等待队列。这会唤醒调用select()时休眠的用户线程,这时就会该线程会重复轮询一次所有的fd,看看哪一个fd准备好返回给用户。

select()会记录它添加的所有的等待队列,在返回以前,确保从所有的fd移除。

总结:每一次select()调用需要 3 次遍历,第一次是对每个fd分别调用poll,第二次是遍历生成返回给用户的fd_sets,第三次是应用层面遍历进行业务逻辑处理。
在这里插入图片描述
伪代码:

int s = socket(AF_INET, SOCK_STREAM, 0);  
bind(s, ...)
listen(s, ...)

int fds[] =  存放需要监听的socket

while(1){
    int n = select(..., fds, ...)
    for(int i=0; i < fds.count; i++){
        if(FD_ISSET(fds[i], ...)){
            //fds[i]的数据处理
        }
    }
}

此处,有个问题,当某个fd发生了对应的读写事件,是怎么找到那个调用select的线程呢?
其实是每个fd对应的Socket都维护了一个阻塞列表,当用户线程调用select系统调用后,会将线程的pcb加入到每个Socket的阻塞列表中。当哪个fd触发了事件,则通过等待队列找到用户线程,然后唤醒,并将该线程的pcb移除工作队列。所以,每次调用select都需要重新方法要监听的fd集合,这样每次都会造成大量的文件描述符从用户态复制到内核态,浪费了空间。

每一个Socket(实际上可以是能够注册到select()上的任意fd - file descriptor)都有一个waiters的列表,他们(waiter)在等待socket上的活动(在linux上是 struct wait_queue_head_t)。这个Socket上无论什么时候有感兴趣的事件发生(到达新数据,缓冲区有可写的空闲空间,或者某种错误),这个Socket会遍历列表通知等待活动的每一个线程。

同时,由于返回给上层的是原fd集合(只是触发事件的fd的标志位不同了),用户线程还需要循环遍历一次来处理判断哪些fd发生了事件,还需要根据标志位判断发生了何种事件。

select将设置fd和监听fd这两步放在了一起,所以,每次都需要重新设置fd,其实可以将select的逻辑分为两步,第一步设置fd集合,第二步才是让内核线程循环检测这些fd是否发生事件,这样就省去了每次都需要复制大量fd的操作。而发生事件的fd对应的Socket标志位会发生改变,当处理完事件后,让内核线程将标志位复原即可(因为不管读还是写,这些事件都需要通过系统调用才可以,这样该调用的结尾添加将标志位复原的逻辑即可)。

1.2 示例

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class NioServer {

    // 保存客户端连接
    static List<SocketChannel> channelList = new ArrayList<>();

    public static void main(String[] args) throws IOException, InterruptedException {

        // 创建NIO ServerSocketChannel,与BIO的serverSocket类似
        ServerSocketChannel serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(9000));
        // 设置ServerSocketChannel为非阻塞
        serverSocket.configureBlocking(false);
        System.out.println("服务启动成功");

        while (true) {
            // 非阻塞模式accept方法不会阻塞,否则会阻塞
            // NIO的非阻塞是由操作系统内部实现的,底层调用了linux内核的accept函数
            SocketChannel socketChannel = serverSocket.accept();
            if (socketChannel != null) { // 如果有客户端进行连接
                System.out.println("连接成功");
                // 设置SocketChannel为非阻塞
                socketChannel.configureBlocking(false);
                // 保存客户端连接在List中
                channelList.add(socketChannel);
            }
            // 遍历连接进行数据读取
            Iterator<SocketChannel> iterator = channelList.iterator();
            while (iterator.hasNext()) {
                SocketChannel sc = iterator.next();
                ByteBuffer byteBuffer = ByteBuffer.allocate(128);
                // 非阻塞模式read方法不会阻塞,否则会阻塞
                int len = sc.read(byteBuffer);
                // 如果有数据,把数据打印出来
                if (len > 0) {
                    System.out.println("接收到消息:" + new String(byteBuffer.array()));
                } else if (len == -1) { // 如果客户端断开,把socket从集合中去掉
                    iterator.remove();
                    System.out.println("客户端断开连接");
                }
            }
        }
    }
}

epoll_109">二、epoll

2.1 原理

上面说到了select的缺点,即需要大量的fd从用户态复制到内核态,而且每次调用都要发生一次。且需要两次循环才能对所有fd进行事件处理(一次内核线程循环找出哪些fd发生了事件,然后用户线程再循环一次)

epoll则是将设置fd和监听fd这两件事分离开来,这样就不用每次都重新设置fd集合了,而且epoll为每个监听的fd设置了一个回调函数,当某个fd触发事件后,就会通过回调函数将其放入一个就绪队列,然后用户线程调用epoll_wait会阻塞在该队列上,当该就绪队列有数据以后,会将其唤醒,这样就可以取到所有的发生了事件的fd,省去了内核线程的轮询和用户线程的轮询,用户线程得到的fd全部都是发生事件的fd。

epoll分为了三部分,第一步创建epoll,第二步设置要监听的fd集合,第三步用户线程获取发生事件的fd。

  1. epoll_create:创建一个epoll对象,并返回fd到用户态
  2. epoll_ctl:设置要监听的socket,当epoll监听某个socket发生了事件,将其fd放入就绪队列(rdllList),其实这里不是放入fd,而是一个封装了fd的epitem对象。
  3. epoll_wait:用户线程调用,检查rdList是否有数据,没有则阻塞,等待被唤醒。

epoll_ceate的原理比较简单,就是创建一个epoll对象。
epoll_ctl的原理就是根据传入的socket fd集合,将epoll对象的fd(epfd)放入每个socket的等待队列中,当某个socket发生了事件,则将epfd从等待队列中取出,然后通过该fd触发epfd对应的回调。那是怎么找到对应回调的呢?这是因为epoll持有一个红黑树,所有的注册的fd会插入到这棵红黑树上,这样当fd发生了事件后,epfd根据红黑树找到对应的节点,然后触发该节点里对应的回调方法。
epoll_wait的原理是将用户线程的pcb加入到epoll的等待队列(rdllList)中(因为epoll也是一个文件),然后当就绪队列(rdllList)不为空,通过等待队列得到用户线程的pcb,将其唤醒。

原理图:
在这里插入图片描述

伪代码:

//创建一个 epoll 对象 epfd
epoll_create(...);
//将所有需要监听的 socket 添加到 epfd 中
epoll_ctl(epfd, ...); 
//调用 epoll_wait 等待数据
epoll_wait(...)
struct eventpoll {
  ...
  /*红黑树的根节点,这棵树中存储着所有添加到epoll中的事件,
  也就是这个epoll监控的事件*/
  struct rb_root rbr;
  /*双向链表rdllist保存着将要通过epoll_wait返回给用户的、满足条件的事件*/
  struct list_head rdllist;
  ...
};
---------epitem对象-----------
struct epitem {
  ...
  //红黑树节点
  struct rb_node rbn;
  //双向链表节点
  struct list_head rdllink;
  //事件句柄等信息
  struct epoll_filefd ffd;
  //指向其所属的eventepoll对象
  struct eventpoll *ep;
  //期待的事件类型
  struct epoll_event event;
  ...
}; // 这里包含每一个事件对应着的信息。

2.2 示例

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

public class NioServer {


    public static void main(String[] args) throws IOException {
        // 创建 NIO serverSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 绑定端口
        serverSocketChannel.bind(new InetSocketAddress(19002));
        // 设置非阻塞
        serverSocketChannel.configureBlocking(false);
        // 打开 selector 处理 channel 即创建 epoll
        Selector selector = Selector.open();
        // 把 serverSocketChannel 注册到 selector 上
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            // 阻塞获取连接
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> selectionKeyIterator = selectionKeys.iterator();
            while (selectionKeyIterator.hasNext()) {
                SelectionKey selectionKey = selectionKeyIterator.next();
                // 如果是连接注册事件
                if (selectionKey.isAcceptable()){
                    ServerSocketChannel serverSocket= (ServerSocketChannel) selectionKey.channel();
                    SocketChannel socketChannel=serverSocket.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector,SelectionKey.OP_READ);
                    System.out.println("客户端连接成功");
                }else if(selectionKey.isReadable()){
                    //如果是读取事件
                    SocketChannel socketChannel= (SocketChannel) selectionKey.channel();
                    ByteBuffer byteBuffer=ByteBuffer.allocate(128);
                    int len=socketChannel.read(byteBuffer);
                    if (len>0){
                        System.out.println("接收到客户端消息: "+new String(byteBuffer.array()));
                    }else if (len==-1){
                        System.out.println("客户端断链:"+socketChannel.isConnected());
                        socketChannel.close();
                    }
                }
                // 迭代器中删除本次处理,避免重复处理
                selectionKeyIterator.remove();
            }
        }
    }
}

2.3 两种触发模式(LT、ET)

  • LT:水平触发模式,只要这个文件描述符还有数据可读,每次 epoll_wait都会返回它的事件,提醒用户程序去操作
  • ET:边缘触发模式,在它检测到有 I/O 事件时,通过 epoll_wait 调用会得到有事件通知的文件描述符,对于每一个被通知的文件描述符,如可读,则必须将该文件描述符一直读到空,让 errno 返回 EAGAIN 为止,否则下次的 epoll_wait 不会返回余下的数据,会丢掉事件。如果ET模式不是非阻塞的,那这个一直读或一直写势必会在最后一次阻塞。

这两种模式对应了我们的用户线程取出所有的发生事件的fd后,epoll是否还将未处理完的fd放回就绪队列。如果是边缘触发模式,如果数据未读取完毕,那么会将fd继续放入就绪队列,而水平触发模式则直接从就绪队列中删除,这样除非下次这个fd继续接收到消息,否则不会被再次处理。

这里有个问题:epoll是怎么知道某个fd是否数据全部读取完毕呢?
解答: 事件回调将socketfd放入就绪队列时,此时epoll会调用ep_send_events系统调用来向用户返回就绪事件,该回调会挨个取出就绪队列中的事件(epitem),然后将其从队列中删除,然后调用poll来检查标志位,如果真的发生了事件,则向用户态写事件,此处检查标志位的操作实现了水平触发和边缘触发,如果是水平触发,则将该事件重新插回就绪队列,边缘则直接删除,不进行任何操作。某个事件重新插回后,如果下次用户进程调用epoll_wait时,epoll会检查,如果该事件对应的fd实际上没有数据,则恢复标志位,直接删除。

2.4 总体流程

  1. 我们在用户态调用epoll_create创建一个epoll对象,并持有返回的epfd
  2. 我们在用户态创建一个监听socket,监听某个端口,并在用户态持有对应的 listenfd
  3. 将listenfd通过epoll_ctl注册到epoll的红黑树中,在该过程epoll将自己的epfd 通过listenfd插入到 listenSocket的等待队列中
  4. 当网卡收到新的连接(即listenSocket的读写事件),网卡驱动程序解析协议,将报文交给listen socket,然后listen socket通过三次握手创建一个新的Socket,并将epfd从等待队列中移除,通过epfd找到红黑树中对应自己的红黑树节点,将该节点的信息封装为一个epoll_event对象放入到epoll的就绪队列(rdllList)中。
  5. 用户态我们所写的服务器程序通过epoll_wait检测rdllList是否有元素,没有的话,会将自己的pcb放入epoll的等待队列。当不为空,epoll将等待队列中的pcb移除,唤醒进程,然后进程读取rdllList,获取到发生事件的socketfd。
  6. 这样我们服务端程序感知到有新连接到达,通过accpet系统调用从listenSocket的**创建完成连接的队列(具体名字我忘记了)**中取出一个socket,并将fd返回到用户态程序,然后我们就可以对这个fd进行操作,一般是将其注册到epoll中,并监听它的读写事件,epfd会加入到该socket对应的等待队列中。
  7. 这样当Socket发生读写事件后,由于每个socket都由一个五元组唯一确定,所以网卡驱动程序可以将报文交给对应的socket,然后socket从等待队列中移除epfd,类似于上面所讲,找到对应的红黑树节点,封装事件,加入就绪队列。
  8. 这里需要注意,我们的服务器程序可以使用reactor模型再来提高并发度。即主线程是运行epoll_wait且持有epfd的线程,当其检测到事件发生后,会将发生事件的fd交给业务线程去处理,再一次提高并发度。

三、Reactor

Reactor是一种并发编程模式,是运行在用户态的,即它的底层是I/O多路复用。它要求主线程只负责监听fd上是与否有事件发生,有的话立即将该事件和fd交给工作线程,让工作线程去处理。
在这里插入图片描述
流程图:
在这里插入图片描述


http://www.niftyadmin.cn/n/762955.html

相关文章

Java+Selenium框架:切换窗口和iframes

背景:在UI自动化测试过程中,偶尔会有切换窗口和iframes的操作,可能是因为有需要流转的业务流程。 1、窗口之前的切换方法:driver.switchTo().window("");传入handle句柄,代码如下: @Testpublic void test() throws Exception {// driver.navigate().to(Constan…

Java+Selenium框架:Actions类模拟鼠标/按键操作web

Java+Selenium框架:Actions类模拟鼠标操作页面元素,这里要讲的只是对web不同操作的补充; 1、我们都知道UI自动化测试是模拟用户行为操作的,那么怎能不用键盘呢? /*** 实现鼠标悬停 操作鼠标悬停到某个元素才有选项的场景*/@Testpublic void test() {driver.get(Constants…

操作系统的一些问题

文章目录一、进程、线程、协程1.1 进程、线程、协程在OS层是如何实现的&#xff1f;进程线程协程对比windows和linux中的线程1.2 内核态与用户态1.3 僵尸进程、孤儿进程、守护进程1.4 程序是如何启动&#xff1f;二、进程同步与通信2.1 进程通信的方式2.1.1 低级通信方式(同步与…

Java+Selenium框架:Keys类模拟键盘按键事件操作web

Java+Selenium框架:模拟键盘按键事件,主要是讲模拟键盘的组合键操作。 1、既然上篇说道模拟用户行为适用键盘操作,那么肯定有组合快捷键的使用方法咯: @Ignore@Testpublic void test() {driver.get(Constants.yahooUrl);driver.findElement(By.id("uh-signin"))…

Java+Selenium框架: 如何在日历中选择日期

Java+Selenium框架,对日期选择器取决在日期选择控件,但只要元素在dom里面就能找到方法定位该元素。 1、UI自动化测试过程中,会遇到时间控件的选择,有些是input是可以直接sendKeys发送时间字符串数据,有些是必须要选择输入的,以携程为例: @Test public void test1() t…

最长字串、子序列、回文字串、回文子序列

文章目录一、最长公共子序列二、最长公共子串如果是求具体的子串呢&#xff1f;三、最长回文子序列四、最长回文子串五、最短回文子串六、最长重复子串七、最长公共前缀八、最长递增子序列九、最长无重复子数组一、最长公共子序列 题目描述&#xff1a; 给定两个字符串 text1…

如何做到本地的mock环境地址通过外网也能访问?

背景 大多数本地联调的mock是不需要外网访问的&#xff0c;但如果是对外的联调测试环境&#xff0c;是真的访问对外的测试地址&#xff0c;这个时候本地的mock环境将不见得可用&#xff0c;虽然在网上找到了eazy-mock工具 &#xff0c;它非常灵敏且容易上手&#xff0c;但是在…

关于容器、K8s的一点认知

文章目录一、什么是容器二、什么是容器镜像三、什么是Volume四、什么是k8s、Pod五、k8s里的volume六、k8s的控制器模式七、k8s的网络原理八、k8s的调度原理一、什么是容器 容器实质上是一个进程&#xff0c;只是这个进程通过linux提供的namespace机制进行了隔离&#xff0c;以…