Linux网络IO基础组件实现

news/2024/7/9 17:16:57 标签: linux, tcp/ip, epoll

Linux网络IO基础组件实现

  • 1 理论基础组件
    • 1.1 Epoll模型概述
      • 1.1.1 Epoll模型对比Select/poll模型的优势
      • 1.1.2 Epoll基本用法
      • 1.1.3 Epoll触发方式
  • 2 基本实现
    • 2.1 Socket管理组件
      • 2.1.1 socket基本操作
      • 2.1.2 socket事件响应
    • 2.2 Epoll事件处理组件
      • 2.2.1 Epoll事件管理
      • 2.2.2 Epoll事件分发

1 理论基础组件

1.1 Epoll模型概述

1.1.1 Epoll模型对比Select/poll模型的优势

(1)Epoll可以使用的最大描述符个数将以线性的关系仅仅依赖于内存
(2)IO效率不随描述符数目增加而线性下降,主要得益于回调处理机制。
(3)使用mmap加速内核与用户空间的消息传递,即内核和用户空间公用一块内存,减少不必要的拷贝。

1.1.2 Epoll基本用法

Epoll用法比较简单分为三个部分:第一是创建Epoll;第二是控制消息;最后是IO处理
创建接口:

epoll_create(1024);//创建一个epoll最大支持1024个文件描述符数量

控制接口:用于维护fd集合

//其中注册:EPOLL_CTL_ADD 修改:EPOLL_CTL_MOD 删除EPOLL_CTL_DEL
epoll_ctl(m_epfd, EPOLL_CTL_ADD, fd, &ev);//用于注册fd,添加相关事件

IO事件触发接口:

epoll_wait(m_epfd, events, 1024, wait_timeout);//轮询注集合的IO事件

1.1.3 Epoll触发方式

(1)水平触发Epoll-LT:具体工作方式,文件描述符就绪后,内核会一直触发直到用户处理。支持阻塞和非阻塞模式。
(2)边缘触发Epoll-ET:具体工作方式,文件描述符就绪后,内核会通知用户处理,用户不处理,将不会继续通知。仅支持非阻塞模式。

2 基本实现

有了前面的理论基础后可以开始进行网络基础组件的设计。本次的设计主要实现两个任务一个是socket的基本操作实现,第二个是事件的IO模型实现。组件内部只关注网络操作,事件分发,对应具体业务进行分离。

2.1 Socket管理组件

2.1.1 socket基本操作

这部分主要是进行socket常规操作,包括服务器监听启动、客户端连接请求、接收数据、发送数据、及关闭socket连接等操作。

int base_socket_listen(void* this, const char* server_ip, uint16_t port, callback_t callback, void* callback_data)
{
    log_debug("base_socket_connect, server_ip=%s, port=%d", server_ip, port);

    base_socket_t *p = (void *base_socket_t)this;
    if(!p)  return -1;

    base_socket_priv_t *priv = (base_socket_priv_t *)p->priv;
    if(!priv) return -1;

	priv->local_ip = server_ip;
	priv->local_port = port;
	priv->callback = callback;
	priv->callback_data = callback_data;

	priv->fd = socket(AF_INET, SOCK_STREAM, 0);
	if (priv->fd == INVALID_SOCKET)
	{
		log_error("socket failed, err_code=%d, server_ip=%s, port=%u", _get_error_code(), server_ip, port);
		return -1;
	}

	_set_reuse_addr(priv->fd);
	_set_non_block(priv->fd);

	sockaddr_in serv_addr;
	_set_addr(server_ip, port, &serv_addr);
    int ret = ::bind(priv->fd, (sockaddr*)&serv_addr, sizeof(serv_addr));
	if (ret == SOCKET_ERROR)
	{
        log_error("bind failed, err_code=%d, server_ip=%s, port=%u", _get_error_code(), server_ip, port);
		close(priv->fd);
		return -1;
	}

	ret = listen(priv->fd, 64);
	if (ret == SOCKET_ERROR)
	{
        log_error("listen failed, err_code=%d, server_ip=%s, port=%u", _get_error_code(), server_ip, port);
		close(priv->fd);
		return -1;
	}

	priv->state = SOCKET_STATE_LISTENING;

	log_debug("Listen on %s:%d", server_ip, port);

	manager_add_base_socket(this);
	event_dispatch_add_event(priv->fd, SOCKET_READ | SOCKET_EXCEP);
	return 0;
}

int base_socket_connect(void* this, const char* server_ip, uint16_t port, callback_t callback, void* callback_data)
{
	log_debug("base_socket_connect, server_ip=%s, port=%d", server_ip, port);

    base_socket_t *p = (void *base_socket_t)this;
    if(!p)  return -1;

    base_socket_priv_t *priv = (base_socket_priv_t *)p->priv;
    if(!priv) return -1;

	priv->remote_ip = server_ip;
	priv->remote_port = port;
	priv->callback = callback;
	priv->callback_data = callback_data;

	priv->fd = socket(AF_INET, SOCK_STREAM, 0);
	if (priv->fd == INVALID_SOCKET)
	{
        log_error("socket failed, err_code=%d, server_ip=%s, port=%u", _get_error_code(), server_ip, port);
		return -1;
	}

	_set_non_block(priv->fd);
	_set_no_delay(priv->fd);
	sockaddr_in serv_addr;
	_set_addr(server_ip, port, &serv_addr);
	int ret = connect(priv->fd, (sockaddr*)&serv_addr, sizeof(serv_addr));
	if ( (ret == SOCKET_ERROR) && (!_is_block(_get_error_code())) )
	{	
        log_error("connect failed, err_code=%d, server_ip=%s, port=%u", _get_error_code(), server_ip, port);
		close(priv->fd);
		return -1;
	}
	priv->state = SOCKET_STATE_CONNECTING;
	manager_add_base_socket(this);
	event_dispatch_add_event(priv->fd, SOCKET_ALL);
	
	return priv->fd;
}

int base_socket_send(void* this, void* buf, int len)
{
    base_socket_t *p = (void *base_socket_t)this;
    if(!p)  return -1;

    base_socket_priv_t *priv = (base_socket_priv_t *)p->priv;
    if(!priv) return -1;

	if (priv->state != SOCKET_STATE_CONNECTED)
		return -1;

	int ret = send(priv->fd, (char*)buf, len, 0);
	if (ret == SOCKET_ERROR)
	{
		int err_code = _get_error_code();
		if (_is_block(err_code))
		{
			ret = 0;
			//log("socket send block fd=%d", priv->fd);
		}
		else
		{
            log_error("send failed, err_code=%d, len=%d", err_code, len);
		}
	}

	return ret;
}

int base_socket_recv(void* this, void* buf, int len)
{
    base_socket_t *p = (void *base_socket_t)this;
    if(!p)  return -1;

    base_socket_priv_t *priv = (base_socket_priv_t *)p->priv;
    if(!priv) return -1;

	return recv(priv->fd, (char*)buf, len, 0);
}

int base_socket_colse(void* this)
{
    base_socket_t *p = (void *base_socket_t)this;
    if(!p)  return -1;

    base_socket_priv_t *priv = (base_socket_priv_t *)p->priv;
    if(!priv) return -1;

	event_dispatch_remove_event(priv->fd, SOCKET_ALL);
	manager_remove_base_socket(this);
	close(priv->fd);
	base_socket_release_ref();

	return 0;
}

2.1.2 socket事件响应

这部分主要是用于event_dispatch事件触发后调用,当IO事件上报后,进行相关回调操作。

void base_socket_onread(void* this)
{
    base_socket_t *p = (void *base_socket_t)this;
    if(!p)  return ;

    base_socket_priv_t *priv = (base_socket_priv_t *)p->priv;
    if(!priv) return ;

	if (priv->state == SOCKET_STATE_LISTENING)
	{
		_accept_new_socket();
	}
	else
	{
		u_long avail = 0;
        int ret = ioctlsocket(priv->fd, FIONREAD, &avail);
		if ( (SOCKET_ERROR == ret) || (avail == 0) )
		{
			priv->call_back(priv->callback_data, SOCKET_MSG_CLOSE, (int)priv->fd, NULL);
		}
		else
		{
			priv->call_back(priv->callback_data, SOCKET_MSG_READ, (int)priv->fd, NULL);
		}
	}
}

void base_socket_onwrite(void* this)
{
    base_socket_t *p = (void *base_socket_t)this;
    if(!p)  return ;

    base_socket_priv_t *priv = (base_socket_priv_t *)p->priv;
    if(!priv) return ;

	if (priv->state == SOCKET_STATE_CONNECTING)
	{
		int error = 0;
		socklen_t len = sizeof(error);

		getsockopt(priv->fd, SOL_SOCKET, SO_ERROR, (void*)&error, &len);

		if (error) {
			priv->call_back(priv->callback_data, SOCKET_MSG_CLOSE, (int)priv->fd, NULL);
		} else {
			priv->state = SOCKET_STATE_CONNECTED;
			priv->call_back(priv->callback_data, SOCKET_MSG_CONFIRM, (int)priv->fd, NULL);
		}
	}
	else
	{
		priv->call_back(priv->callback_data, SOCKET_MSG_WRITE, (int)priv->fd, NULL);
	}
}

void base_socket_onclose(void* this)
{
    base_socket_t *p = (void *base_socket_t)this;
    if(!p)  return ;

    base_socket_priv_t *priv = (base_socket_priv_t *)p->priv;
    if(!priv) return ;

	priv->state = SOCKET_STATE_CLOSING;
	priv->call_back(priv->callback_data, SOCKET_MSG_CLOSE, (int)priv->fd, NULL);
}

2.2 Epoll事件处理组件

2.2.1 Epoll事件管理

这部分用于添加socket和移除epoll管理事件

void event_dispatch_add_event(int fd, uint8_t socket_event)
{
	struct epoll_event ev;
	ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLPRI | EPOLLERR | EPOLLHUP;
	ev.data.fd = fd;
	if (epoll_ctl(cxt.epfd, EPOLL_CTL_ADD, fd, &ev) != 0)
	{
		log_error("epoll_ctl() failed, errno=%d", errno);
	}
}

void event_dispatch_remove_event(int fd, uint8_t socket_event)
{
	if (epoll_ctl(cxt.epfd, EPOLL_CTL_DEL, fd, NULL) != 0)
	{
		log_error("epoll_ctl failed, errno=%d", errno);
	}
}

2.2.2 Epoll事件分发

这部分主要是对应定时器事件,循环处理事件添加和检查触发添加的事件;第二部分是启动整个epoll事件管理,包括socket事件响应和外部添加的定时机器,循环处理事件都是在主循环内完成。

void event_dispatch_add_timer(callback_t callback, void* user_data, uint64_t interval)
{
	list<timer_item_t*>::iterator it;
	for (it = m_timer_list.begin(); it != m_timer_list.end(); it++)
	{
		timer_item_t* pitem = *it;
		if (pitem->callback == callback && pitem->user_data == user_data)
		{
			pitem->interval = interval;
			pitem->next_tick = get_tick_count() + interval;
			return;
		}
	}

	timer_item_t* pitem = (timer_item_t*) malloc(sizeof(timer_item_t));
	pitem->callback = callback;
	pitem->user_data = user_data;
	pitem->interval = interval;
	pitem->next_tick = get_tick_count() + interval;
	m_timer_list.push_back(pitem);
}

void event_dispatch_remove_timer(callback_t callback, void* user_data)
{
	list<timer_item_t*>::iterator it;
	for (it = m_timer_list.begin(); it != m_timer_list.end(); it++)
	{
		timer_item_t* pitem = *it;
		if (pitem->callback == callback && pitem->user_data == user_data)
		{
			m_timer_list.erase(it);
			delete pitem;
			return;
		}
	}
}

void _check_timer()
{
	uint64_t curr_tick = get_tick_count();
	list<timer_item_t*>::iterator it;

	for (it = m_timer_list.begin(); it != m_timer_list.end(); )
	{
		timer_item_t* pitem = *it;
		it++;		// iterator maybe deleted in the callback, so we should increment it before callback
		if (curr_tick >= pitem->next_tick)
		{
			pitem->next_tick += pitem->interval;
			pitem->callback(pitem->user_data, NETLIB_MSG_TIMER, 0, NULL);
		}
	}
}

void event_dispatch_add_loop(callback_t callback, void* user_data)
{
    timer_item_t* pitem = new timer_item_t;
    pitem->callback = callback;
    pitem->user_data = user_data;
    m_loop_list.push_back(pitem);
}

void _check_loop()
{
    for (list<timer_item_t*>::iterator it = m_loop_list.begin(); it != m_loop_list.end(); it++) {
        timer_item_t* pitem = *it;
        pitem->callback(pitem->user_data, NETLIB_MSG_LOOP, 0, NULL);
    }
}

void event_dispatch_start_dispatch(uint32_t wait_timeout)
{
	struct epoll_event events[1024];
	int nfds = 0;

    if(cxt.running)
        return;
    cxt.running = true;
    
	while (cxt.running)
	{
		nfds = epoll_wait(cxt.epfd, events, 1024, wait_timeout);
		for (int i = 0; i < nfds; i++)
		{
			int ev_fd = events[i].data.fd;
			base_socket_t* psocket = manager_find_base_socket(ev_fd);
			if (!psocket)
				continue;
				
            if (events[i].events & EPOLLRDHUP)
            {
                psocket->onclose();
            }
            
			if (events[i].events & EPOLLIN)
			{
				psocket->onread();
			}

			if (events[i].events & EPOLLOUT)
			{
				psocket->onwirte();
			}

			if (events[i].events & (EPOLLPRI | EPOLLERR | EPOLLHUP))
			{
				psocket->onclose();
			}
			
			psocket->release_ref();
		}

		_check_timer();
        _check_loop();
	}
}

void event_dispatch_stop_dispatch()
{
    cxt.running = false;
}

http://www.niftyadmin.cn/n/927497.html

相关文章

模拟RTSP服务器实现实时推流

模拟RTSP服务器实现实时推流前言1 ZLMediaKit开源项目搭建1.1 源码下载1.3 依赖库1.4 编译1.5 测试2 FFmpeg推流前言 搭建本服务器主要是为了利用FFmpeg进行各种音视频codec的转换输出&#xff0c;验证客户端的解码器兼容情况。当然也可以采用其他的方案来替代。 1 ZLMediaKi…

FFmpeg命令行实践

FFmpeg命令行实践1 概述2 基本原理3 命令行选项4 命令行实践4.1 音频转换4.2 视频转换4.2.1提取mkv中视频流转为h264裸流4.2.2 提取前5s时间转h2644.2.3 根据codec进行转换4.2.4 设置输出码率&#xff0c;帧率4.2.5 设置输出分辨率4.2.5 转yuv裸流文件5 结语1 概述 参考官方文…

Ubuntu18.04 PulseAudio实战

1 前言 2 下载编译 具体构建过程参照官方文档&#xff1a;https://www.freedesktop.org/wiki/Software/PulseAudio/Documentation/Developer/PulseAudioFromGit/ 2.1 下载代码 git clone git://anongit.freedesktop.org/pulseaudio/pulseaudio 2.2 编译 编译过程遇到太多错…

Ubuntu18.04 x264命令行实操

Ubuntu18.04 x264命令行实操1 源码下载编译2 基本参数介绍2.1 profile参数详解2.2 tune参数详解2.3 preset参数详解2.4 速率控制QPABRCBR2-Pass ABRCRFVBV3 命令行实践3.1 查看帮助3.2 输入文件3.2 输出文件3.3 基本命令3.4 参考文献1 源码下载编译 https://code.videolan.org…

dpkg: error processing package xxx解决方法

1 错误内容&#xff1a; rootxxx:/home/my_project/application-necessities# sudo apt-get -y install build-essential nghttp2 libnghttp2-dev libssl-dev Reading package lists... Done Building dependency tree Reading state information... Done build-essent…

curl: (60) SSL certificate problem: certificate has expired终极解决方案

问题&#xff1a; rootubuntu-64bit:/home/work/avs_project/build# curl -I https://nghttp2.org/ -v * Trying 139.162.123.134... * TCP_NODELAY set * Connected to nghttp2.org (139.162.123.134) port 443 (#0) * ALPN, offering h2 * ALPN, offering http/1.1 * Ciph…

mosquitto-1.5.7交叉编译

1 下载源文件 下载地址&#xff1a;http://mosquitto.org/files/source/ tar -xvf mosquitto-1.5.7.tar.gz cd mosquitto-1.5.7 2 修改配置 vi config.mk #修改下面配置 WITH_SRV:no WITH_UUID:no WITH_WEBSOCKETS:no WITH_DOCS:no #添加ssl交叉编译库路径和头文件路径 CFLAGS…

设计模式初探

设计模式初探0 前言1 基础知识1.1 组合优于继承1.2 虚函数继承1.3 类之间的关系2 设计原则1.1 开放封闭原则1.2 单一职责原则1.3 接口隔离原则1.4 里氏替换原则1.5 面向接口原则1.6 依赖倒置原则1.7 封装变化点1.8 组合优于继承2 模板方法3 观察者模式4 策略模式0 前言 设计模…