057 高级 IO 之 epoll 详解与 CMake 入门

epoll 和 CMake 的使用

1. epoll 简介

epoll 是 Linux 系统提供的一种 IO 多路复用机制,用来替代传统的 select 和 poll。它的核心优势是:

  • 高效:性能不会随着监听的文件描述符数量增加而下降。
  • 内存友好:只返回就绪的事件,而不是遍历所有文件描述符。
  • 支持边缘触发:可以更灵活地控制事件触发方式。

2. epoll 的三大函数

epoll_createepoll_ctlepoll_wait 是 epoll 机制的三个核心函数,可以类比为:

  • epoll_create():创建一个 epoll 实例(相当于创建一个事件监听器)。
  • epoll_ctl():管理要监听的文件描述符(添加、修改、删除监听列表中的 fd)。
  • epoll_wait():等待事件发生(阻塞等待,直到有事件发生或超时)。

这三兄弟的关系就像一个管理系统的三个操作:

  1. epoll_create():创建一个管理办公室。
  2. epoll_ctl():向办公室登记/修改/删除要监控的员工(文件描述符)。
  3. epoll_wait():在办公室等待,当有员工出事(事件发生)时进行通知。

这种设计使得 epoll 可以高效地管理大量文件描述符,特别适合高并发的服务器程序。

3. epoll_create/epoll_create1 —— 创建 epoll 实例

介绍:创建一个 epoll 实例,返回一个文件描述符,后续的所有 epoll 操作都通过这个 fd 进行。

函数原型:

1
2
3
4
#include <sys/epoll.h>

int epoll_create(int size); // 旧版
int epoll_create1(int flags); // 推荐:带 flags(如 EPOLL_CLOEXEC)

参数

  • size:告诉内核你 大概 要监听多少个文件描述符,它只是一个 提示值,在较新的内核中这个参数已经不重要了,但必须大于 0,内核会动态调整内部数据结构的大小,实际可以监听的 fd 数量 只受系统资源限制(如文件描述符限制、内存等),定义 size = 10,但实际监听 1000 个 fd 也没问题。
  • flags 常用 0EPOLL_CLOEXEC(在 exec 时自动关闭 epfd),即:epoll_create1(0)epoll_create1(EPOLL_CLOEXEC)

返回值

  • 成功:返回一个 epoll 文件描述符 epfd(>= 0),后续用这个 fd 来操作 epoll
  • 失败:返回 -1,同时设置 errno。

使用示例

1
2
3
4
5
6
int epfd = epoll_create(1024);  // 创建一个 epoll 实例,预计监听1024个fd
if (epfd == -1)
{
perror("epoll_create failed");
return -1;
}

4. epoll_ctl —— 控制监听列表(注册 / 修改 / 删除 关注的 fd)

介绍:向 epoll 实例中添加、修改或删除要监听的文件描述符。

函数原型:

1
2
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll_event 结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
struct epoll_event
{
uint32_t events; // 要监听的事件类型
epoll_data_t data; // 用户数据,可以存储fd或指针
};

typedef union epoll_data
{
void *ptr; // 可以指向任意数据
int fd; // 文件描述符
uint32_t u32; // 32位无符号整数
uint64_t u64; // 64位无符号整数
} epoll_data_t;

参数:

  • epfdepoll_create 返回的 epoll 文件描述符。

  • op:操作类型,告诉 epoll 要做什么。

    • EPOLL_CTL_ADD添加 一个新的文件描述符到监听列表。
    • EPOLL_CTL_MOD修改 已存在文件描述符的监听事件。
    • EPOLL_CTL_DEL:从监听列表中 删除 一个文件描述符。
  • fd:目标被监控的文件描述符/要操作的文件描述符(socket、管道、文件等)。

  • event:指向 epoll_event 结构体的指针。

    • events 指定要关注/监听的事件,常用的 events 事件
      • EPOLLIN:文件描述符 可读
      • EPOLLOUT:文件描述符 可写
      • EPOLLERR:文件描述符 错误
      • EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)。
      • EPOLLRDHUP:对端文件描述符关闭(连接)。
      • EPOLLET:边缘触发模式。
      • EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听该文件描述符的话,需要重新将该文件描述符添加到 epoll 模型中。
    • data 用于回传用户自定义数据(通常存 fd 或结构体指针)。eventEPOLL_CTL_DEL 可传 NULL(在某些内核版本仍需有效结构,传地址更保险)。

注意:添加前务必确保 fd 是合法且已打开。若使用 EPOLLET(边沿触发),必须 把 fd 设为非阻塞(fcntl + O_NONBLOCK),EPOLL_CTL_MOD 用于修改同一 fd 的事件或 user data。

只有删除操作(EPOLL_CTL_DEL)可以传 nullptr,添加和修改操作必须传有效的 epoll_event 指针:

  • EPOLL_CTL_ADD:需要告诉内核监听什么事件 → 必须传 struct epoll_event *
  • EPOLL_CTL_MOD:需要告诉内核修改成什么事件 → 必须传 struct epoll_event *
  • EPOLL_CTL_DEL:只是删除,不需要指定事件 → 可以传 nullptr

传 nullptr 的含义:删除操作不需要关心事件类型,只要告诉内核要删除哪个 fd 即可,内核只需要 fd 值就能找到红黑树(下文会提到)中对应的节点并删除,传 nullptr 可以避免传递不必要的参数,提高效率。

返回值

  • 成功:返回 0。
  • 失败:返回 -1,同时设置 errno。

使用示例

1
2
3
4
5
6
7
8
9
10
11
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET; // 监听可读事件,使用边缘触发
ev.data.fd = client_fd; // 将客户端fd保存到data中

// 添加客户端fd到epoll监听列表
int ret = epoll_ctl(epfd, EPOLL_CTL_ADD, client_fd, &ev);
if (ret == -1)
{
perror("epoll_ctl add failed");
return -1;
}

5. epoll_wait —— 等待事件发生

介绍:阻塞等待,直到有文件描述符上的事件发生,然后返回所有就绪的事件。

函数原型:

1
2
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

参数(结构体同上)

  • epfdepoll_create 返回的 epoll 文件描述符。
  • events:指向 epoll_event 数组的指针,用于接收(内核返回)就绪的事件信息。
  • maxevents:数组大小(能接收的最大就绪事件数)。
  • timeout:超时/最长等待时间(毫秒)。
    • -1:永久阻塞,直到有事件发生。
    • 0:非阻塞,立即返回。
    • > 0:最多等待 timeout 毫秒。

要点

  • events[i].data 是你在 epoll_ctl 时设置的数据(常用来快速拿到对应的 fd 或连接结构体)。
  • epoll_wait 返回后应遍历 events 数组并处理每个就绪项。
  • maxevents 不应小于你预计一次处理的并发就绪数,通常设置为 64、128 或更大。

返回值

  • > 0:返回就绪事件的数量(events [0..ret-1])。
  • 0:超时(在指定时间内没有事件发生)。
  • -1:出错,同时设置 errno。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#define MAX_EVENTS 10
struct epoll_event events[MAX_EVENTS];

// 等待事件发生,最多返回10个就绪事件,超时时间为-1(永久等待)
int num_events = epoll_wait(epfd, events, MAX_EVENTS, -1);
if (num_events == -1)
{
perror("epoll_wait failed");
return -1;
}

// 处理所有就绪的事件
for (int i = 0; i < num_events; i++)
{
int fd = events[i].data.fd; // 获取发生事件的文件描述符

if (events[i].events & EPOLLIN) // 如果是可读事件
{
// 处理读操作
if (fd == listen_fd)
{
// 监听socket可读,说明有新连接
}
else
{
// 客户端socket可读,接收数据
}
}
}

6. epoll 的底层原理

epoll 之所以比 selectpoll 快、高效不是靠单一技术,而是 数据结构 + 算法 + 机制 的完美结合:它用了 “红黑树 + 就绪队列 + 回调机制” 这三样核心设计,让“监听谁”“谁就绪了”“怎么取结果”都高效完成,避免了反复轮询和复制。

1. epoll 的三大核心组件

名称数据结构作用
红黑树 rbrstruct rb_root rbr;存放“我要关注哪些 fd、关心哪些事件”的集合(监控列表)
就绪队列 rdliststruct list_head rdlist;存放“已经就绪的 fd 事件”,等 epoll_wait 来取
回调机制 ep_poll_callback函数指针当设备驱动检测到某个 fd 就绪时自动触发,把它加入就绪队列
  • 红黑树(rbr):存储要监听的所有文件描述符,就像“购物清单”,记录了所有要关注的商品(文件描述符),文件描述符天然作为红黑树的 key,查找速度很快 O(log n),每次调用 epoll_ctl 就是在这张清单上增删改项目。
  • 就绪队列(rdlist):存储已经就绪的文件描述符,就像“已到货通知单”,记录了哪些商品已经到了,可以取货,每次调用 epoll_wait 就是来取这张通知单。

这三个东西组合在一起,就构成了一个完整的 epoll 模型(对应内核结构 eventpoll)。

2. 一个核心机制:回调 —— epoll 高效的 秘密武器

  • select/poll 的问题: 程序问操作系统:”我关注的这些 fd,哪些有数据了?”,操作系统:”我一个一个帮你查一遍…”,每次都要遍历所有 fd,效率随 fd 数量增加而下降。
  • epoll 的聪明做法:程序告诉操作系统:”我要关注这些 fd 的事件”,操作系统在内核里建立回调函数(ep_poll_callback),当网卡收到数据时,硬件直接通知内核:”fd 3 有数据了!”,内核自动调用回调函数,把 fd 3 从红黑树移到就绪队列,程序调用 epoll_wait 时,直接取就绪队列就行。

比喻理解: 想象你在网上购物:

  • select/poll:你每隔几分钟就去快递点问:”我的包裹到了吗?”,快递员要查所有包裹。
  • epoll:快递员有你的电话,包裹一到就给你打电话,你再过去取。

3. 整体流程概览

三个函数 epoll_createepoll_ctlepoll_wait,分别对应底层三个阶段:

用户函数内核动作对应的数据结构
epoll_create创建一个 eventpoll 实例初始化红黑树 rbr、就绪队列 rdlist
epoll_ctl把 fd 添加/修改/删除到红黑树操作红黑树节点(每个节点是 epitem)
epoll_wait等待就绪事件从就绪队列里取出已经准备好的事件

4. 内部关键对象:epitem

每一个通过 epoll_ctl 加入监听的 fd,内核都会创建一个对应的结构体 epitem

1
2
3
4
5
6
7
8
struct epitem
{
struct rb_node rbn; // 红黑树节点 (对应 epoll_ctl 增删改)
struct list_head rdllink; // 链表节点 (对应就绪队列)
struct epoll_filefd ffd; // 文件描述符信息
struct eventpoll *ep; // 所属 epoll 实例
struct epoll_event event; // 要监听的事件类型
};

可以理解为:红黑树节点:表示我关注了 fd 上的这些事件,就绪队列节点:表示 fd 上的事件真的发生了,ffd + event:是谁、关心什么。

5. 特殊处理

  • 普通模式:事件就绪后,继续保留在红黑树中,下次还会通知。
  • EPOLLONESHOT 模式:事件就绪后,会 自动从红黑树删除,想再次监听这个 fd,必须重新添加(调用 epoll_ctl(ADD)),常用于多线程模式,防止同一个 fd 被多个线程同时处理。
  • EPOLLET(边沿触发): 只在状态变化时通知一次,必须配合非阻塞 IO,否则可能漏事件,减少系统调用次数,进一步提升性能。

6. 线程安全保障

  • 就绪队列用互斥锁保护,多线程访问安全
  • 等待队列处理多个线程同时访问的情况

7. 为什么比 select/poll 高效?

  1. O(1) 查找:红黑树保证添加/删除操作是 O(log n),比数组快。
  2. 按需通知:只有事件真正发生时才通知,不需要轮询。
  3. 批量处理:一次 epoll_wait 可以返回多个就绪事件。
  4. 内存友好:只返回就绪的 fd,不是所有 fd。

7. 代码示例(epoll LT)

完整代码请前往 GitHub 查看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#include <iostream>
#include <poll.h>
#include <sys/time.h>
#include <memory>
#include "Socket.hpp"
#include "noCopy.hpp"
#include "Epoller.hpp"
using namespace std;

// 定义epoll事件类型常量
uint32_t EVENT_IN = (EPOLLIN); // 可读事件(EPOLLIN)
uint32_t EVENT_OUT = (EPOLLOUT); // 可写事件(EPOLLOUT)

class EpollServer : public noCopy // 继承防拷贝基类,确保服务器对象不能被拷贝
{
static const int num = 64; // epoll_wait最多返回的事件数量

public:
EpollServer(uint16_t port) // 构造函数,接收端口号
: _port(port), // 初始化端口号
_listsocket_ptr(new Sock()), // 创建监听socket智能指针
_epoller_ptr(new Epoller()) // 创建epoller智能指针
{
}

~EpollServer()
{
_listsocket_ptr->Close(); // 关闭监听socket
}

void Init()
{
_listsocket_ptr->Socket(); // 创建socket
_listsocket_ptr->Bind(_port); // 绑定端口
_listsocket_ptr->Listen(); // 开始监听

log_(Info, "create listen socket success! fd = %d", _listsocket_ptr->Fd()); // 记录监听socket创建成功日志
}

// 处理新连接
void Accept()
{
std::string client_ip; // 存储客户端IP
uint16_t client_port; // 存储客户端端口
int sockfd = _listsocket_ptr->Accept(&client_ip, &client_port); // 接收新连接
if (sockfd > 0) // 如果接收连接成功
{
_epoller_ptr->EpollerUpdate(EPOLL_CTL_ADD, sockfd, EVENT_IN); // 将新连接的fd添加到epoll监听列表,监听可读事件
log_(Info, "获得一个新的连接:客户端说:%s:%d", client_ip.c_str(), client_port); // 记录新连接日志
}
}

// 处理数据接收
void Recver(int fd)
{
char buffer[1024]; // 临时缓冲区,用于接收数据
int n = read(fd, buffer, sizeof(buffer) - 1); // 从指定fd读取数据
if (n > 0) // 成功读取到数据
{
buffer[n] = '\0'; // 添加字符串结束符
std::cout << "收到消息:" << buffer << std::endl;// 输出收到的消息

std::string echo_str = "Sverver echo # " + std::string(buffer); // 构造回显消息
write(fd, echo_str.c_str(), echo_str.size()); // 将回显消息写回客户端
}
else if (n == 0) // 客户端关闭连接(读到文件结束符)
{
log_(Info, "客户端断开连接: fd=%d", fd); // 记录客户端断开连接日志
std::cout << "客户端断开连接" << std::endl; // 输出断开连接信息
_epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, fd, 0); // 从epoll监听列表中删除该fd
close(fd); // 关闭该连接的文件描述符
}
else // 读取数据出错
{
log_(Error, "read error: fd=%d", fd); // 记录读取错误日志
std::cout << "read error" << std::endl; // 输出错误信息
_epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, fd, 0); // 从epoll监听列表中删除该fd
close(fd); // 关闭该连接的文件描述符
}
}

void Dispatcher(struct epoll_event revs[], int num) // 事件分发器,处理所有就绪的事件
{
for (int i = 0; i < num; i++) // 遍历所有就绪的事件
{
uint32_t events = revs[i].events; // 获取事件类型
int fd = revs[i].data.fd; // 获取发生事件的文件描述符

if (events & EVENT_IN) // 如果是可读事件
{
if (fd == _listsocket_ptr->Fd()) // 如果是监听socket可读(有新连接)
{
Accept(); // 处理新连接
}
else
{
Recver(fd); // 处理数据接收
}
}
else if (events & EVENT_OUT) // 如果是可写事件
{
// 可写事件处理逻辑
}
else // 其他事件
{
// 其他事件处理逻辑
}
}
}

void Start() // 启动服务器主循环
{
_epoller_ptr->EpollerUpdate(EPOLL_CTL_ADD, _listsocket_ptr->Fd(), EVENT_IN); // 将监听socket添加到epoll监听列表
struct epoll_event revs[num]; // 创建epoll_event数组,用于接收就绪事件

for (;;) // 服务器无限循环
{
int n = _epoller_ptr->EpollerWait(revs, num); // 等待事件发生,最多返回num个事件
if (n > 0) // 有事件发生
{
log_(Info, "有事件已经就绪,开始处理……其文件描述符是:%d", revs[0].data.fd);// 记录第一个就绪事件的fd
Dispatcher(revs, n); // 分发处理所有就绪的事件
}
else if (n == 0) // 超时(没有事件发生)
{
log_(Info, "timeout..."); // 记录超时日志
}
else // epoll_wait出错
{
log_(Error, "epoll_wait error!"); // 记录错误日志
// break; // 出错时可以选择退出循环
}
}
}

private:
std::shared_ptr<Sock> _listsocket_ptr; // 监听socket的智能指针
std::shared_ptr<Epoller> _epoller_ptr; // Epoller对象的智能指针
uint16_t _port; // 服务器端口号
};

8. epoll 的工作方式

1. 核心区别

模式触发时机是否重复通知是否必须非阻塞实现复杂度通知频率
LT(Level Triggered,epoll 的默认模式)只要内核缓冲区里有数据(高电平状态)一直 通知不强制,可阻塞或非阻塞简单:可分多次处理事件高(只要事件存在就反复通知)
ET(Edge Triggered)只有“状态变化”时触发(从无到有/有到多)只通知一次必须 非阻塞(否则可能永久阻塞)复杂:必须一次性处理完所有数据低(仅状态变化时通知一次)

类比理解: 可以把内核数据缓冲区理解为“水桶”:

  • LT 模式: 只要桶里有水(数据没读完),内核就会一遍又一遍告诉你“有水!”,所以你可以慢慢舀,不急着一次读完。
  • ET 模式: 只有桶第一次被装满、或者水又多了一点时,才会告诉你一次,之后不会再提醒。所以必须一次把水全舀光,否则漏掉的数据就永远没人告诉你了。

性能与设计取舍:

对比项LTET
内核通知次数多(每次都通知)少(状态变化才通知)
CPU 开销稍高更低
编程难度简单较高
安全性容错性强(可多次处理)容错性低(必须彻底处理)
实际应用select、poll 属于 LTNginx、Redis 使用 ET

2. ET 模式的编程要点

  1. 必须设置非阻塞:

    1
    2
    int flags = fcntl(fd, F_GETFL, 0);
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
  2. 循环读写直到返回错误:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    // 读
    while (true)
    {
    ssize_t n = recv(fd, buf, sizeof(buf), 0);
    if (n == -1)
    {
    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
    break; // 读完
    }
    else
    {
    perror("recv error");
    }
    }
    else if (n == 0)
    {
    // 对端关闭
    close(fd);
    break;
    }
    else
    {
    // 正常读取
    process(buf, n);
    }
    }

    // 写同理,循环 send,直到 EAGAIN
  3. 注册事件时带上 EPOLLET

    1
    2
    3
    4
    epoll_event ev;
    ev.data.fd = fd;
    ev.events = EPOLLIN | EPOLLET;
    epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);

3. 疑难解答

1. LT 模式下,如果我把 fd 设为非阻塞,并在第一次通知时就循环读完所有数据,那和 ET 有什么区别?

逻辑行为上几乎一样,性能也接近。但 LT 模式仍有“条件判断 + 冗余通知”的系统开销,ET 是内核级别的“只在状态变化时触发”,性能更纯粹:

  • LT 仍然会“准备通知”,但因为你已经清空了缓冲区,所以下次 epoll_wait 不会再返回该 fd。
  • 但 LT 仍保留“兜底”能力:万一你漏读了,下次还会提醒你。
  • 所以:ET 的优势不是“必须更快”,而是“强制你写出高效代码”

实际上,高性能服务器(如 Redis、Nginx)选择 ET,是为了避免“意外的重复通知”带来的开销,尤其是在连接数极高的场景。

2. 为什么 ET 必须用非阻塞 IO?

因为 ET 要求你 一次性读完所有数据。如果使用阻塞 IO,当你读到最后一次(缓冲区已空),recv永远阻塞,因为没有新数据到来,epoll 也不会再通知你。非阻塞 IO 在无数据时立即返回 -1 并设置 errno = EAGAIN,让你知道“本次数据已读完”。

3. ET 模式真的更高效吗?

在特定条件下是的

  • 减少 epoll_wait 的唤醒次数 → 降低系统调用开销。
  • 促使应用层批量处理数据 → 更好的 cache locality 和吞吐。
  • TCP 窗口优化:当接收方快速消费数据(ET 强制你这么做),TCP 接收窗口更大,发送方可以一次发更多数据,减少小包和 ACK 开销。

注意:如果 ET 实现不当(如漏读、未设非阻塞),反而会导致连接“假死”,比 LT 更危险。

9. 代码示例(epoll ET)

完整代码请前往 GitHub 查看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
#pragma once

#include <iostream>
#include <string>
#include <memory>
#include <cerrno>
#include <functional>
#include <unordered_map>
#include "Log.hpp"
#include "noCopy.hpp"
#include "Epoller.hpp"
#include "Socket.hpp"
#include "Comm.hpp"
#include <cstring>

// 前向声明Connection和TcpServer类,避免循环包含
class Connection;
class TcpServer;

uint32_t EVENT_IN = (EPOLLIN | EPOLLET); // 定义读事件掩码(EPOLLIN | EPOLLET)
uint32_t EVENT_OUT = (EPOLLOUT | EPOLLET); // 定义写事件掩码(EPOLLOUT | EPOLLET)
const static int g_buffer_size = 1024; // 定义缓冲区大小为1024字节

// using func_t = std::function<void(std::shared_ptr<Connection>)>;
using func_t = std::function<void(std::weak_ptr<Connection>)>; // 定义回调函数类型,参数为weak_ptr类型的连接对象
using except_func = std::function<void(std::weak_ptr<Connection>)>; // 定义异常处理函数类型

// Connection类:管理单个客户端连接
class Connection
{
public:
// 构造函数,接收socket文件描述符
// Connection(int sock, std::shared_ptr<TcpServer> tcp_server_ptr)
// : _sock(sock),
// _tcp_server_ptr(tcp_server_ptr)
// {

// }
Connection(int sock)
: _sock(sock)
{

}

// 设置连接的回调函数
// void SetHandler(func_t recv_cb, func_t send_cb, func_t except_cb)
// {
// _recv_cb = recv_cb;
// _send_cb = send_cb;
// _except_cb = except_cb;
// }
void SetHandler(func_t recv_cb, func_t send_cb, except_func except_cb)
{
_recv_cb = recv_cb;
_send_cb = send_cb;
_except_cb = except_cb;
}

// 获取socket文件描述符
int SockFd()
{
return _sock;
}

// 向输入缓冲区追加数据
void AppendInBuffer(const std::string& data)
{
_inbuffer += data;
}

// 向输出缓冲区追加数据
void AppendOutBuffer(const std::string& info)
{
_outbuffer += info;
}

// const std::string& Inbuffer()
// {
// return _inbuffer;
// }
// 获取输入缓冲区的引用
std::string& Inbuffer()
{
return _inbuffer;
}

// 获取输出缓冲区的引用
std::string& OutBuffer()
{
return _outbuffer;
}

// 设置指向TCP服务器的弱引用指针
void SetWeakPtr(std::weak_ptr<TcpServer> tcp_server_ptr)
{
_tcp_server_ptr = tcp_server_ptr;
}

~Connection()
{

}

private:
int _sock; // socket文件描述符
std::string _inbuffer; // 输入缓冲区
std::string _outbuffer; // 输出缓冲区
public:
func_t _recv_cb; // 读回调函数
func_t _send_cb; // 写回调函数
// func_t _except_cb;
except_func _except_cb; // 异常回调函数

std::weak_ptr<TcpServer> _tcp_server_ptr; // 指向TCP服务器的弱引用
std::string _ip; // 客户端IP地址
uint16_t _port; // 客户端端口号
};



// TcpServer类:TCP服务器主类,支持epoll事件驱动
class TcpServer : public std::enable_shared_from_this<TcpServer>, public noCopy
{
static const int num = 64; // 定义epoll事件数组大小为64

public:
TcpServer(uint16_t port, func_t OnMessage) // 构造函数,接收端口号和消息处理函数
: _port(port), // 服务器端口号
_OnMessage(OnMessage), // 消息处理函数
_quit(true), // 退出标志
_epoller_ptr(new Epoller()), // epoll对象指针
_listensock_ptr(new Sock()) // 监听socket对象指针
{

}

void Init() // 初始化服务器
{
_listensock_ptr->Socket(); // 创建socket
SetNonBlockOrDie(_listensock_ptr->Fd()); // 设置为非阻塞模式
_listensock_ptr->Bind(_port); // 绑定端口
_listensock_ptr->Listen(); // 开始监听

// log_(Info, "创建listen socket成功,fd:", _listensock_ptr->Fd());
log_(Info, "TCP服务器初始化成功,监听端口: %d, listen socket fd: %d", _port, _listensock_ptr->Fd());

// 将监听socket添加到epoll中,设置读事件回调
AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);
}

// void AddConnection(int sockfd, uint32_t event, func_t recv_cb, func_t send_cb, func_t except_cb, const std::string &ip = "0.0.0.0", uint16_t port = 0)
// {
// // std::shared_ptr<Connection> new_connection=std::make_shared<Connection>(sockfd, std::shared_ptr<TcpServer>(this));
// std::shared_ptr<Connection> new_connection = std::make_shared<Connection>(sockfd, std::shared_ptr<TcpServer>(this));
// new_connection->SetHandler(recv_cb, send_cb, except_cb);
// new_connection->_ip=ip;
// new_connection->_port=port;

// _connections.insert(std::make_pair(sockfd, new_connection));

// _epoller_ptr->EpollerUpdate(EPOLL_CTL_ADD, sockfd, event);

// log_(Debug, "添加一个新的连接,fd:", sockfd);
// }
// 添加连接到服务器
void AddConnection(int sockfd, uint32_t event, func_t recv_cb, func_t send_cb, except_func except_cb, const std::string& ip = "0.0.0.0", uint16_t port = 0)
{
std::shared_ptr<Connection> new_connection(new Connection(sockfd)); // 创建新的连接对象
new_connection->SetWeakPtr(shared_from_this()); // 设置连接对象对服务器的弱引用
new_connection->SetHandler(recv_cb, send_cb, except_cb); // 设置连接的回调函数

// 设置客户端IP和端口
new_connection->_ip = ip;
new_connection->_port = port;

_connections.insert(std::make_pair(sockfd, new_connection)); // 将连接添加到连接映射表中

_epoller_ptr->EpollerUpdate(EPOLL_CTL_ADD, sockfd, event); // 将socket添加到epoll监控列表中

log_(Debug, "成功添加新连接,fd: %d, 客户端IP: %s, 客户端端口: %d", sockfd, ip.c_str(), port);
}

// 链接管理器
// void Accepter(std::shared_ptr<Connection> connection)
// {
// while (true)
// {
// struct sockaddr_in peer;
// socklen_t len = sizeof(peer);

// int sockfd = ::accept(connection->SockFd(), (struct sockaddr*)&peer, &len);
// if (sockfd > 0)
// {
// uint16_t peer_port = ntohs(peer.sin_port);
// char ipbuf[128];
// inet_ntop(AF_INET, &peer.sin_addr.s_addr, ipbuf, sizeof(ipbuf));

// log_(Debug, "收到一个新的客户端连接,得到的消息:[%s:%d], sockfd:%d", ipbuf, peer_port, sockfd);

// SetNonBlockOrDie(sockfd);
// // listensock只需要设置_recv_cb,其他sock,读,写,异常都设置
// // AddConnection(sockfd, EVENT_IN, nullptr, nullptr, nullptr);
// AddConnection(sockfd, EVENT_IN, std::bind(&TcpServer::Recver, this, std::placeholders::_1), std::bind(&TcpServer::Sender, this, std::placeholders::_1), std::bind(&TcpServer::Excepter, this, std::placeholders::_1), ipbuf, peer_port);
// }
// else
// {
// if (errno == EWOULDBLOCK)
// {
// break;
// }
// else if (errno == EINTR)
// {
// continue;
// }
// else
// {
// break;
// }
// }
// }
// }

// 接受新连接的处理函数
void Accepter(std::weak_ptr<Connection> connection)
{
auto connection_ptr = connection.lock(); // 将weak_ptr转换为shared_ptr以安全访问连接对象
if (!connection_ptr) // 检查转换是否成功
{
// log_(Warning, "无法获取连接对象指针,可能已被销毁");
return;
}

while (true)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);

int sockfd = ::accept(connection_ptr->SockFd(), (struct sockaddr*)&peer, &len); // 接受新的客户端连接
if (sockfd > 0)
{
uint16_t peer_port = ntohs(peer.sin_port);
char ipbuf[128];
inet_ntop(AF_INET, &peer.sin_addr.s_addr, ipbuf, sizeof(ipbuf));

// log_(Debug, "收到一个新的客户端连接,得到的消息:[%s:%d], sockfd:%d", ipbuf, peer_port, sockfd);
log_(Info, "收到新的客户端连接,客户端IP: %s, 客户端端口: %d, 连接fd: %d", ipbuf, peer_port, sockfd);

SetNonBlockOrDie(sockfd); // 设置新连接为非阻塞模式

// 为新连接添加到服务器管理中,设置各种回调函数
AddConnection(sockfd, EVENT_IN,
std::bind(&TcpServer::Recver, this, std::placeholders::_1),
std::bind(&TcpServer::Sender, this, std::placeholders::_1),
std::bind(&TcpServer::Excepter, this, std::placeholders::_1),
ipbuf, peer_port);
}
else
{
if (errno == EWOULDBLOCK) // 处理accept返回错误的情况
{
break; // 没有更多连接,退出循环
}
else if (errno == EINTR)
{
continue; // 被信号中断,继续循环
}
else
{
log_(Error, "accept调用失败,错误码: %d, 错误信息: %s", errno, strerror(errno));
break; // 其他错误,退出循环
}
}
}
}

// 事件管理器
// void Recver(std::shared_ptr<Connection> connection)
// {
// // std::cout << "haha, got you!!!, sockfd:" << connection->SockFd() << std::endl;
// int sockfd = connection->SockFd();

// while(true)
// {
// char buffer[g_buffer_size];
// memset(buffer, 0, sizeof(buffer));

// ssize_t n = recv(sockfd, buf, sizeof(buffer) - 1, 0);
// if (n > 0)
// {
// connection->Append(buffer);
// }
// else if(n == 0)
// {
// log_(Debug,"客户端断开连接," ,sockfd, connection->_ip.c_str(),connection->_port);
// connection->excepter(connection);
// }
// else
// {
// if (errno ==EWOULDBLOCK)
// {
// break;
// }
// else if (errno == EINTR)
// {
// continue;
// }
// else
// {
// log_(Warning,"sockfd: %d,客户端断开连接,errno: %d",fd,connection->_ip.c_str(),connection->_port);
// connection->excepter(connection);
// break;
// }
// }
// }

// OnMessage(connection);
// }

// 接收数据的处理函数
void Recver(std::weak_ptr<Connection> connection)
{
if (connection.expired()) // 检查连接对象是否已经过期
{
// log_(Warning, "连接对象已过期,无法处理接收事件");
return;
}

// 将weak_ptr转换为shared_ptr以安全访问连接对象
auto connection_ptr = connection.lock();
int sockfd = connection_ptr->SockFd();

while (true)
{
char buffer[g_buffer_size];
memset(buffer, 0, sizeof(buffer));

ssize_t n = recv(sockfd, buffer, sizeof(buffer) - 1, 0); // 从socket接收数据
if (n > 0)
{
connection_ptr->AppendInBuffer(buffer); // 接收到数据,添加到输入缓冲区
log_(Debug, "从客户端fd: %d 接收到 %ld 字节数据", sockfd, n);
}
else if (n == 0) // 客户端关闭连接
{
// log_(Debug, "sockfd: %d,客户端消息:%s : %d断开连接(退出)", sockfd, connection_ptr->_ip.c_str(), connection_ptr->_port);
log_(Info, "客户端fd: %d (IP: %s, 端口: %d) 主动断开连接", sockfd, connection_ptr->_ip.c_str(), connection_ptr->_port);
connection_ptr->_except_cb(connection_ptr); // 调用异常回调函数处理连接断开
return;
}
else
{
if (errno == EWOULDBLOCK) // 处理接收错误
{
// log_(Debug, "客户端fd: %d 数据接收完毕", sockfd);
break; // 没有更多数据可读,退出循环
}
else if (errno == EINTR) // 被信号中断,继续循环
{
continue;
}
else // 其他错误
{
// log_(Warning, "sockfd: %d,客户端信息:%s : %d接收错误", sockfd, connection_ptr->_ip.c_str(), connection_ptr->_port);
log_(Error, "从客户端fd: %d 接收数据失败,错误码: %d, 错误信息: %s", sockfd, errno, strerror(errno));
connection_ptr->_except_cb(connection_ptr);
return;
}
}
}

_OnMessage(connection_ptr); // 调用上层消息处理函数处理接收到的数据
}

// 发送数据的处理函数
void Sender(std::weak_ptr<Connection> connection)
{
if (connection.expired())
{
// log_(Warning, "连接对象已过期,无法处理发送事件");
return; // 检查连接对象是否已经过期
}

// 将weak_ptr转换为shared_ptr以安全访问连接对象
auto connection_ptr = connection.lock();
auto& outbuffer = connection_ptr->OutBuffer();
while (true)
{
// 发送数据到客户端
ssize_t n = send(connection_ptr->SockFd(), outbuffer.c_str(), outbuffer.size(), 0);
if (n > 0)
{
// log_(Debug, "向客户端fd: %d 成功发送 %ld 字节数据", connection_ptr->SockFd(), n);

// 成功发送部分数据,从输出缓冲区中删除已发送的数据
outbuffer.erase(0, n);
if (outbuffer.empty())
{
// log_(Debug, "客户端fd: %d 输出缓冲区已清空", connection_ptr->SockFd());
break; // 缓冲区清空,退出循环
}
}
else if (n == 0)
{
// log_(Debug, "向客户端fd: %d 发送0字节数据", connection_ptr->SockFd());
return; // 发送0字节,退出
}
else
{
// 处理发送错误
if (errno == EWOULDBLOCK)
{
log_(Debug, "客户端fd: %d socket发送缓冲区已满,等待下次发送", connection_ptr->SockFd());
break; // socket缓冲区满,退出循环
}
else if (errno == EINTR)
{
continue; // 被信号中断,继续循环
}
else // 其他错误
{
// log_(Warning, "sockfd: %d, 客户端消息: %s : %d 发送错误", connection_ptr->SockFd(), connection_ptr->_ip.c_str(), connection_ptr->_port);
log_(Error, "向客户端fd: %d 发送数据失败,错误码: %d, 错误信息: %s", connection_ptr->SockFd(), errno, strerror(errno));
connection_ptr->_except_cb(connection_ptr);
return;
}
}
}
if (!outbuffer.empty())
{
// 开启对写事件的关心
EnableEvent(connection_ptr->SockFd(), true, true);
// log_(Debug, "为客户端fd: %d 启用写事件监控", connection_ptr->SockFd());
}
else
{
// 关闭对写事件的关心
EnableEvent(connection_ptr->SockFd(), true, false);
// log_(Debug, "为客户端fd: %d 禁用写事件监控", connection_ptr->SockFd());
}

}

// 异常处理函数
void Excepter(std::weak_ptr<Connection> connection)
{
if (connection.expired())
{
// log_(Warning, "连接对象已过期,无法处理异常事件");
return; // 检查连接对象是否已经过期
}

auto conn = connection.lock(); // 将weak_ptr转换为shared_ptr以安全访问连接对象
if (!conn) // 检查转换是否成功
{
// log_(Warning, "无法获取连接对象指针,可能已被销毁");
return;
}

int fd = conn->SockFd();
// log_(Warning, "异常处理程序套接字文件描述符: %d, 客户端消息:%s : %d 异常退出", conn->SockFd(), conn->_ip.c_str(), conn->_port);
log_(Warning, "处理异常连接,fd: %d, 客户端IP: %s, 客户端端口: %d", conn->SockFd(), conn->_ip.c_str(), conn->_port);

// 1. 移除对特定fd的关心
// EnableEvent(connection->SockFd(), false, false);
_epoller_ptr->EpollerUpdate(EPOLL_CTL_DEL, fd, 0);
// 2. 关闭异常的文件描述符
log_(Debug, "关闭异常连接的文件描述符: %d", fd);
close(fd);
// 3. 从unordered_map中(连接映射表中)移除
log_(Debug, "从连接管理器中移除异常连接: %d", fd);

_connections.erase(fd);
log_(Info, "异常连接处理完成,fd: %d", fd);
}

// 启用或禁用socket的读写事件监控
void EnableEvent(int sock, bool readable, bool writeable)
{
uint32_t events = 0;

// 根据参数设置事件掩码
events |= ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);
_epoller_ptr->EpollerUpdate(EPOLL_CTL_MOD, sock, events);
// log_(Debug, "更新socket: %d 的事件监控,可读: %s, 可写: %s", sock, readable ? "是" : "否", writeable ? "是" : "否");
}

// 检查指定的socket文件描述符是否在服务器管理中
bool IsConnectionSafe(int sockfd)
{
auto iter = _connections.find(sockfd);
if (iter == _connections.end())
{
// log_(Debug, "检查连接安全状态,fd: %d 不存在于连接管理器中", sockfd);
return false;
}
else
{
// log_(Debug, "检查连接安全状态,fd: %d 存在于连接管理器中", sockfd);
return true;
}
}

// 事件分发器,处理epoll返回的事件
void Dispatcher(int timeout)
{
int n = _epoller_ptr->EpollerWait(revs, num, timeout); // 等待epoll事件

// if (n < 0)
// {
// log_(Error, "epoll_wait调用失败,错误码: %d, 错误信息: %s", errno, strerror(errno));
// return;
// }
// else if (n == 0)
// {
// log_(Debug, "epoll_wait超时,未检测到任何事件");
// return;
// }

// log_(Debug, "epoll_wait返回 %d 个事件", n);


for (int i = 0; i < n; i++)
{
uint32_t events = revs[i].events;
int sockfd = revs[i].data.fd;

// if (evets & EPOLLERR)
// {
// events |= (EPOLLIN | EPOLLOUT);
// }

// if (evets & EPOLLHUP)
// {
// events |= (EPOLLIN | EPOLLOUT);
// }

// 处理读事件
if ((events & EPOLLIN) && IsConnectionSafe(sockfd))
{
if (_connections[sockfd]->_recv_cb)
{
// log_(Debug, "处理读事件,fd: %d", sockfd);
_connections[sockfd]->_recv_cb(_connections[sockfd]); // 调用读回调函数
}
}

// 处理写事件
if ((events & EPOLLOUT) && IsConnectionSafe(sockfd))
{
if (_connections[sockfd]->_send_cb)
{
// log_(Debug, "处理写事件,fd: %d", sockfd);
_connections[sockfd]->_send_cb(_connections[sockfd]); // 调用写回调函数
}
}
}
}

// 服务器主循环
void Loop()
{
_quit = false;
log_(Info, "TCP服务器主循环开始运行,端口: %d", _port);

// AddConnection(_listensock_ptr->Fd(), EVENT_IN, std::bind(&TcpServer::Accept, this, std::placeholders::_1),nullptr,nullptr);

while (!_quit) // 持续处理事件直到服务器退出
{
Dispatcher(3000);
PrintConnection(); // 可选,如果需要打印连接状态可以取消注释
}

log_(Info, "TCP服务器主循环结束");
_quit = true;
}

// 打印当前连接列表(调试用)
void PrintConnection()
{
log_(Debug, "当前连接总数: %zu", _connections.size());
std::cout << "当前连接列表:";
for (auto& connection : _connections)
{
std::cout << connection.second->SockFd() << ", ";
std::cout << "inbuffer: " << connection.second->Inbuffer().c_str();
}
std::cout << std::endl;
}

~TcpServer()
{
log_(Info, "TCP服务器正在关闭,清理资源");
}

private:
std::shared_ptr<Epoller> _epoller_ptr; // epoll对象指针
std::shared_ptr<Sock> _listensock_ptr; // 监听socket对象指针
std::unordered_map<int, std::shared_ptr<Connection>> _connections; // 连接管理映射表
struct epoll_event revs[num]; // epoll事件数组
uint16_t _port; // 服务器端口号
bool _quit; // 退出标志

func_t _OnMessage; // 消息处理函数
};

10. 快速上手 CMake

1. CMake 是干什么的

CMake 是一个跨平台的 自动化构建工具。它的核心作用是:根据 CMakeLists.txt 自动生成 Makefile,然后我们只需执行 make 就能编译整个项目。

简单说:写一份 CMakeLists.txt → 执行 cmake → 自动生成 Makefile → 执行 make → 生成可执行文件。

2. 安装 CMake

1
2
sudo apt update
sudo apt install cmake -y

检查版本:

1
cmake --version

3. 编写最简 CMakeLists.txt

在项目的根目录中创建一个名为 CMakeLists.txt 的文件写入内容:

1
2
3
4
5
cmake_minimum_required(VERSION 3.10)		# 指定最低CMake版本
project(EpollServer) # 工程名

set(CMAKE_CXX_STANDARD 11) # 使用C++11标准
add_executable(EpollServer Main.cc log.cpp) # 生成可执行文件 main

这就是最基本的版本。注意:CMake 严格要求文件名必须是 CMakeLists.txt,大小写都必须完全匹配。 原因很简单:CMake 的解析器在目录下只会自动搜索这个 精确名字 的文件(CMakeLists.txt)。不是变量名,不是模糊匹配,也不会识别 CMakelists.txtcmakelist.txt 等写法。比如:

  • ✅ 正确:CMakeLists.txt
  • ❌ 错误:CMakelists.txt / cmakelists.txt

project(EpollServer):只是一个 工程名字,主要用于 CMake 内部标识,与目录名没强绑定。通常我们会让它与最终生成的可执行文件同名,这样方便。

4. 编译和运行

我们建议在项目根目录执行下面的代码,这样生成的临时文件都在 build 里,不会污染源代码目录:

1
2
mkdir build
cd build

运行 cmake 命令生成 Makefile

1
cmake ..

解释:.. 表示让 CMake 去上一级(也就是项目根目录)找 CMakeLists.txt。执行完这步后,build/ 目录里会生成:

1
2
3
Makefile
CMakeCache.txt
CMakeFiles/

执行 make 编译:

1
make

CMake 会自动调用 g++ 编译 Main.cc 并生成可执行文件:

1
EpollServer

可选:运行程序

1
./EpollServer