056 高级 IO

高级 IO

Linux 高级 IO | CSDN

1. 正确认识 IO

1. IO 的本质

I/O(Input / Output)指的是 CPU 与外设之间的数据交互过程。在冯·诺依曼结构中,系统由:CPU(运算与控制)、内存(暂存数据与指令)、外设(磁盘、网卡、显示器、键盘等)组成。

I/O 就是:数据在「内存 ↔ 外设」之间的传输。所以:

  • 输入(Input):外设 → 内存(例如:键盘输入、磁盘读文件、网卡收包)。
  • 输出(Output):内存 → 外设(例如:屏幕显示、磁盘写文件、网卡发包)。

2. IO 的关键特征

  1. :外设的速度远慢于 CPU 和内存。因此,IO 通常是性能瓶颈。
  2. 异步性:外设工作时 CPU 可去做别的事。操作系统通过 中断、DMA(直接内存访问) 来提高效率。

3. 文件 IO 与 网络 IO

IO 类型外设操作系统抽象本质
文件 IO磁盘文件描述符(fd)把数据从磁盘读入内存或写出
网络 IO网卡套接字(socket)把数据从网卡缓冲区读入内存或写出

对操作系统来说,一切皆文件,无论是磁盘文件、管道、套接字,本质都是「文件描述符 + 缓冲区」上的读写操作。

4. 系统调用层面

C/C++ 层面对 IO 的最底层接口如下:

1
2
3
#include <unistd.h>
ssize_t read(int fd, void *buf, size_t count); // 从 fd 读数据到内存
ssize_t write(int fd, const void *buf, size_t count); // 从内存写数据到 fd
  • 读文件时,数据路径是:磁盘 → 内核缓存 → 用户内存。
  • 读网络时,数据路径是:网卡 → 内核缓冲区 → 用户内存。

应用层 read && write 的时候,本质就是把数据从用户层写给 OS,即“拷贝函数”。此时我们只需要知道:IO = 等 + 拷贝。要进行拷贝,必须先判断读写事件这个条件成立!

什么叫做高效 IO 呢?知道 IO = 等 + 拷贝后,就可以得出结论:单位时间内,IO 的过程中,等的比重越小,IO 效率越高!反之,等的比重越大,IO 效率越低。几乎所有提高 IO 效率的策略,本质就是这个!


2. 五种 IO 模型

1. 小故事入题

  1. 张三是个钓鱼佬,他的钓鱼方式是一根儿鱼竿儿,钓鱼时就一直一动不动地盯着鱼漂,直到有鱼上钩。在这段时间内他不能干别的事,我们管这种方式叫做 阻塞式 IO

    • 对应到计算机上:read() 一调用,就会 阻塞当前线程,操作系统先等数据准备好(等待阶段),数据准备好后,再拷贝到用户空间(拷贝阶段),两步都做完,read() 才返回。特点:简单、直观,但效率最低。CPU 在“等鱼”的时间被浪费掉。
  2. 李四也是钓鱼佬,他也是一根儿鱼竿儿,但他的钓鱼方式是:抛完杆儿,等一会了去喝个茶回来看一看上没上鱼,看会书回来看一看,睡一觉回来看一看,跟张三说完话回来看一看(张三并不想搭理李四),这种方式叫做 非阻塞式 IO,即非阻塞轮询,特点: 线程不会被卡死,但 CPU 不停在空转查询,效率依然不高(多线程或高并发场景中非常浪费 CPU)。

  3. 王五一样,但是他很聪明,他在鱼竿上绑了一个铃铛,期间可以看书、睡觉、打游戏,摸鱼……当铃铛响了说明上鱼了,才去看鱼竿儿,这属于 信号驱动式 IO

    • 系统上:程序先注册一个信号回调函数(sigaction),当内核检测到数据可读时,会发信号通知用户进程,用户进程再调用 read() 把数据拷贝到用户空间。特点:通过信号机制通知事件,比轮询更高效。但信号机制复杂、调度开销大,所以实际使用较少。
  4. 赵六是个有钱人,他直接开了一大卡车的鱼竿儿,机械化钓鱼,就假设他布置好了 1000 根鱼竿儿,他不可能盯着每一根,于是雇了个助手,
    助手负责统一盯着所有鱼竿,一旦某根竿有动静就通知他。此时水里的鱼是一定的,但赵六上鱼的概率远远高于其他人,这就是 多路复用/转接。

    • 这就是 select/poll/epoll 的核心思想。 系统提供一个统一的接口,一次性监听多个文件描述符(鱼竿),当其中任意一个“准备就绪”时再通知进程。特点:可同时管理大量连接(高并发)、赵六只需要等“通知”即可,效率大大提升、是高性能网络服务器(如 Nginx、Redis)最常用的模式。
  5. 田七照样是个有钱人,看到他们在钓鱼,于是也想吃鱼,就准备一起,但是很不巧临时有事要走,他一想:我就想吃鱼,怎么来的不重要,于是就叫随行的司机拿着工具去钓鱼,田七则去办其他的事情,告诉司机等他钓好鱼后直接给他打电话就行。这就是 异步 IO,值得注意的是这里的司机其实就是 OS

    • 系统中:用户调用 aio_read() 之后立即返回,操作系统后台完成“数据准备 + 拷贝”两个阶段,完成后通过事件或回调通知应用。特点:真正的 非阻塞 + 无等待,CPU 可以同时干别的工作,适用于 I/O 密集型高并发场景。

我们知道 IO = 等 + 拷贝,所有 IO 模型的区别,都在于这两个阶段是 谁在做怎么做。在这个钓鱼的过程中只有田七是异步 IO,其他人都属于同步 IO,因为其他人都直接参与了等的环节,但田七则是间接做了等的操作,实际不参与 IO,田七只是发起 IO,最后拿结果就行,就像我交给你一个黑盒,我不管你干了什么,到时候我从黑盒中拿结果就行。还有一点就是同步 IO 是/属于线程同步吗?答案是老婆和老婆饼的关系 —— 没有关系!

2. 深入理解 IO = 等 + 拷贝 的思维模型

无论哪种 IO,本质都要经历两个阶段:

  1. 等待数据准备好(等待鱼上钩)。
  2. 数据从内核缓冲区拷贝到用户空间(把鱼拽上来)。

区别在于:谁来“等”(线程自己、统一管理者、还是操作系统),什么时候返回(等完再返回,还是先返回之后回调)。

模型类型等待阶段拷贝阶段是否阻塞谁来等典型调用适用场景
阻塞 I/O阻塞阻塞应用线程read()简单场景
非阻塞 I/O轮询阻塞否(轮询)应用线程fcntl(fd, O_NONBLOCK)少量连接
信号驱动 I/O信号触发阻塞内核发信号sigaction()特殊场合
I/O 多路复用阻塞等待事件阻塞拷贝部分阻塞统一等待者(select/poll/epoll)高并发服务器高性能
异步 I/O非阻塞非阻塞全交给内核aio_read()真正异步场景

3. fcntl 函数原型

1. 功能 / 作用

fcntl() —— 它是 Linux 文件描述符控制的“瑞士军刀”,非常常见于网络编程和 I/O 模型设置中,比如设置非阻塞套接字。fcntl()(file control)用于 对已打开的文件描述符进行各种控制操作。几乎所有“修改文件描述符行为”的操作都要通过它完成,例如:设置 / 清除文件描述符的标志(如非阻塞模式)、获取 / 修改文件状态、复制文件描述符、锁定文件、调整文件特性等。

2. 函数原型

1
2
3
4
#include <fcntl.h>
#include <unistd.h>

int fcntl(int fd, int cmd, ... /* arg */);

3. 参数详解

参数位置含义
第 1 个参数:fd要操作哪个文件描述符(文件、socket 等)
第 2 个参数:cmd要执行的命令/控制操作(告诉内核你想干什么)
第 3 个参数:arg(可选)如果需要,就传入新的标志值(常用“旧标志 | 新功能”)

1. 第二个参数(cmd)—— 要执行的操作类型

命令(cmd)含义第三个参数(arg)常用场景
F_GETFL获取文件状态标志读取当前 fd 的打开模式(只读、非阻塞等)
F_SETFL设置文件状态标志新标志(通常是“旧标志 | 新标志”)修改 fd 行为,如设为非阻塞
F_GETFD获取文件描述符标志判断 fd 是否带有 FD_CLOEXEC
F_SETFD设置文件描述符标志FD_CLOEXEC设置执行 exec() 时是否自动关闭 fd
F_DUPFD复制一个新的文件描述符最小可用 fd 编号类似 dup(),但可指定起始编号
F_SETLK设置文件锁(非阻塞)struct flock *文件加锁,不会阻塞
F_SETLKW设置文件锁(阻塞)struct flock *文件加锁,会阻塞等待
F_GETLK获取文件锁状态struct flock *查询当前文件锁

实际开发中 最常用的就是 F_GETFLF_SETFL,尤其用于:设置非阻塞 IO(O_NONBLOCK)、设置追加写(O_APPEND)。

2. 第三个参数(arg)—— 状态标志

标志含义常见场景
O_RDONLY只读打开打开文件用
O_WRONLY只写打开打开文件用
O_RDWR可读可写打开文件用
O_APPEND写操作追加到文件末尾日志文件
O_NONBLOCK非阻塞模式网络 I/O
O_SYNC同步写入(每次写都落盘)文件系统安全要求高的场景
O_ASYNC异步 I/O 模式很少单独使用
O_CREAT不存在则创建文件打开
O_TRUNC打开时清空文件内容文件重写
FD_CLOEXEC执行 exec() 时自动关闭 fd防止文件描述符泄漏

3. 快速记忆法

操作类型缩写记忆用途
F_GETFL / F_SETFLFL = File status Flags控制读写行为
F_GETFD / F_SETFDFD = File Descriptor控制描述符自身属性
O_NONBLOCKNon-blocking设置非阻塞
FD_CLOEXECClose on exec执行新程序时关闭 fd

4. 返回值

  • 成功: 返回值依赖于 cmd,一般为 非负数
  • 失败: 返回 -1,并设置 errno

5. 示例:设置非阻塞套接字

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <iostream>
#include <unistd.h>
#include <errno.h>
#include <cstring>
#include <fcntl.h>
using namespace std;

void setNoBlock(int fd)
{
int fl = fcntl(fd, F_GETFL); // 获取原来文件描述符的属性
if(fl < 0)
{
perror("fcntl");
return;
}

fcntl(fd, F_GETFL, fl | O_NONBLOCK); // 在原标志基础上加上非阻塞
cout << "设置 " << fd << " 为非阻塞模式成功!" << endl;
}

int main()
{
char buffer[1024];

setNoBlock(0);

while(true)
{
// printf("please enter: ");
// fflush(stdout);

ssize_t n = read(0, buffer, sizeof(buffer) - 1);
if(n > 0)
{
buffer[n - 1] = '\0';
cout << "echo: " << buffer << endl;
}
else if(n == 0)
{
cout << "read EOF/read done!" << endl;
}
else
{
cerr << "read error, n = " << n << "error code: " << errno << "error string: " << strerror(errno) << endl;
}
}

return 0;
}

设置成为非阻塞,如果底层 fd 数据没有就绪,recv/read/write/send, 返回值会以出错的形式返回,错误形式的两种情况:a. 真的出错,b. 底层没有就绪。怎么区分呢?需要通过 errno 区分!!!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <iostream>
#include <fcntl.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
using namespace std;

int main()
{
int sockfd = socket(AF_INET, SOCK_STREAM, 0); // 创建 TCP 套接字
if (sockfd < 0)
{
perror("socket error");
return -1;
}

// 获取当前文件状态标志
int flags = fcntl(sockfd, F_GETFL, 0);
if (flags < 0)
{
perror("fcntl F_GETFL error");
return -1;
}

// 设置非阻塞模式(加上 O_NONBLOCK)
if (fcntl(sockfd, F_SETFL, flags | O_NONBLOCK) < 0)
{
perror("fcntl F_SETFL error");
return -1;
}

cout << "套接字已设置为非阻塞模式" << endl;

close(sockfd);
return 0;
}

4. IO 多路转接之 select 函数原型

IO 多路转接 ——— select、poll、epoll | CSDN

1. 作用

select 是最早的 I/O 多路复用函数,用来 同时监控多个文件描述符(fd)是否可读、可写或有异常事件

2. 函数原型

1
2
3
4
5
6
7
8
#include <sys/select.h>
#include <sys/time.h>

int select(int nfds,
fd_set *readfds,
fd_set *writefds,
fd_set *exceptfds,
struct timeval *timeout);

3. 参数解释

参数名含义
nfds监控的最大文件描述符 + 1(例如要监控 fd=5,就写 nfds=6
readfds想监控“可读事件”的文件描述符集合(如 recv()read() 是否不阻塞)
writefds想监控“可写事件”的文件描述符集合(如 send()write() 是否不阻塞)
exceptfds监控“异常事件”的文件描述符集合(一般不用,传 NULL
timeout超时时间(阻塞多久)。传 NULL 就是 一直阻塞

1. nfds 参数

nfds = 要监控的最高文件描述符编号 + 1

  • 想监控 fd 3, fd 5?最高是 5,nfds 就写 5 + 1 = 6
  • 想监控 fd 1, fd 2, fd 100?最高是 100,nfds 就是 100 + 1 = 101
  • 不是系统最大支持的文件描述符大小! 那个值通常很大(如 1024 或 65536),不需要监控那么多。

记住:nfds 是一个范围select 会检查从 0 到 nfds - 1 这个范围内的所有 fd(只要在 fd_set 里设置了它们)。nfds 就是这个范围的 上限(不包含)。简化记忆nfds = max_fd + 1select 会在这个范围 [0, max_fd] 内,看你 fd_set 里标记了哪些 fd,然后监控它们。

2. fd_set 的本质

fd_set 是内核与用户空间之间传递“文件描述符就绪信息”的一张位图。

阶段谁在操作含义
输入阶段(用户 → 内核)用户调用 select()告诉内核:“我关心这些 fd(读、写、异常)。”
输出阶段(内核 → 用户)内核执行完 I/O 检查后告诉用户:“这些 fd 已经就绪(比特位 = 1)。”

也就是说:readfds 是输入输出双向参数(既是输入,也会被输出修改),每一位对应一个 fd1 表示就绪;0 表示未就绪。调用后必须重新设置 fd_set,因为 select 会修改它。fd_set 的内部形式:

1
2
3
4
typedef struct
{
unsigned long fds_bits[__FD_SETSIZE / (8 * sizeof(long))];
} fd_set;

本质就是一个 位图,每个 bit 对应一个文件描述符号位(fd 号),比如:fd=3 -> fds_bits[0] 的第 3 位。select() 的核心:用户传一张“关注表”(fd_set)给内核,内核帮你看这些 fd 是否有事件,等结果出来后,内核再把“结果表”写回给你。

3. 常用辅助宏

1
2
3
4
FD_CLR(int fd, fd_set* set);		// 把fd从集合中移除(对应 bit = 0)
FD_ISSET(int fd, fd_set* set); // 判断fd是否在集合中就绪(bit==1 返回 true)
FD_SET(int fd, fd_set* set); // 把fd加入集合(设置对应 bit = 1)
FD_ZERO(fd_set* set); // 清空集合(所有 bit 置 0)

一般是这么操作的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
fd_set readfds;          // 定义一个fd集合
FD_ZERO(&readfds); // 清空集合
FD_SET(sockfd, &readfds); // 把sockfd加入关注集合
FD_SET(STDIN_FILENO, &readfds); // 把标准输入加入集合

// 调用select,监听读事件
int ret = select(maxfd + 1, &readfds, NULL, NULL, &timeout);

if (FD_ISSET(sockfd, &readfds))
{
// sockfd 可读
}
if (FD_ISSET(STDIN_FILENO, &readfds))
{
// 标准输入可读
}

调用后,readfds 会被内核修改,表示哪些 fd 已经 就绪

4. struct timeval 的含义

timeout 就是给 select 设置一个 最长等待时间闹钟。如果在闹钟响之前有事(fd 就绪),就立即叫醒你(返回);如果闹钟响了还没事(fd 未就绪),也叫醒你(返回 0)。如果设置为永不响(NULL),就会一直睡(阻塞);如果设置为立刻响({0, 0}),就不会睡(非阻塞)。

1
2
3
4
5
struct timeval
{
time_t tv_sec; // Seconds —— 秒
suseconds_t tv_usec; // Microseconds —— 微秒(1秒=1,000,000微秒)
};

{seconds, microseconds}: select 最多等待 seconds 秒 + microseconds 微秒。

  • 如果在 这个时间之内,有你监控的 fd 就绪了,select立即返回,告诉你哪些 fd 就绪了,剩余的等待时间会被写回到 timeout 结构体中(通常在实际编程中,会每次都重新设置这个值)。
  • 如果 这个时间之内,没有任何你监控的 fd 就绪,select超时返回(返回值为 0)。

示例:

  • NULL → 永久阻塞(传统阻塞式 I/O)。
  • {5,0} → 最多阻塞 5 秒(带超时的阻塞)。
  • {0,0} → 不阻塞,非阻塞轮询式 I/O

4. 返回值

  • > 0:有多少个文件描述符就绪(可读 / 可写 / 异常)。
  • = 0:超时,没有任何事件发生,没有错误,但是也没有 fd 就绪。
  • < 0:出错(一般是信号中断或参数错误)。

5. 使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <iostream>
#include <sys/select.h>
#include <unistd.h>
using namespace std;

int main()
{
fd_set readfds; // 创建fd集合
FD_ZERO(&readfds); // 清空集合
FD_SET(STDIN_FILENO, &readfds); // 把标准输入加入监控集合

struct timeval tv;
tv.tv_sec = 5; // 最多阻塞5秒
tv.tv_usec = 0;

cout << "等待输入中..." << endl;
int ret = select(STDIN_FILENO + 1, &readfds, NULL, NULL, &tv);

if (ret > 0)
{
if (FD_ISSET(STDIN_FILENO, &readfds))
{
cout << "检测到输入:" << endl;
char buf[1024];
read(STDIN_FILENO, buf, sizeof(buf));
cout << "你输入了: " << buf;
}
}
else if (ret == 0)
{
cout << "超时,没有输入" << endl;
}
else
{
perror("select 出错");
}

return 0;
}

一个基于 select 系统调用的单进程、单线程 TCP 服务器:

完整代码请前往 GitHub 进行查看!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
#include <iostream>
#include <sys/select.h>
#include <sys/time.h>
#include "Socket.hpp"
// #include "Log.hpp"
using namespace std;

static const uint16_t defaultport = 8080; // 默认端口号8080
static const int fd_num_max = (sizeof(fd_set) * 8); // 计算fd_set能容纳的最大文件描述符数量,每个bit位代表一个fd
int defaultfd = -1; // 无效文件描述符,用来标识数组中空闲的位置

class SelectServer
{
public:
SelectServer(uint16_t port = defaultport)
: _port(port)
{
for(int i = 0; i < fd_num_max; i++)
{
fd_array[i] = defaultfd; // 将数组中所有位置都初始化为-1,表示空闲
// std::cout << "fd_array[" << i << "]" << " : " << fd_array[i] << std::endl;
}
}

~SelectServer()
{
_listensock.Close(); // 关闭监听socket
}

// 初始化服务器:创建socket、绑定端口、开始监听
bool Init()
{
_listensock.Socket(); // 创建监听socket
_listensock.Bind(_port); // 绑定端口
_listensock.Listen(); // 开始监听

return true;
}

// 打印当前在线的文件描述符列表
void PrintFd()
{
cout << "在线_fd_列表: "; // 输出提示信息
for (int i = 0; i < fd_num_max; i++) // 遍历fd_array数组
{
if (fd_array[i] == defaultfd) // 如果是空闲位置,跳过
{
continue;
}

cout << fd_array[i] << " "; // 输出有效的文件描述符
}

cout << endl;
}

void Accepter()
{
// 连接事件已经就绪
std::string client_ip; // 客户端的IP
uint16_t client_port; // 客户端的端口

int sock = _listensock.Accept(&client_ip, &client_port); // 从监听socket获取新的连接,不会阻塞,因为select已经告诉我们有连接事件就绪
if (sock < 0)
{
return; // 如果accept失败,直接返回
}

log_(Info, "新连接建立成功:客户端IP:%s,客户端端口:%d,socket fd:%d", client_ip.c_str(), client_port, sock);
cout << "Info, 新连接建立成功:客户端IP:" << client_ip << ",客户端端口:" << client_port << ",socket fd:" << sock << endl;

// 将新连接的文件描述符存入fd_array数组中
int pos = 1; // 从数组索引1开始找空闲位置(索引0存放监听socket)
for (; pos < fd_num_max; pos++) // 查找数组中空闲的位置
{
if (fd_array[pos] != defaultfd) // 如果当前位置不为空(-1),继续找下一个
{
continue;
}
else // 找到空闲位置,跳出循环
{
break;
}
}

if (pos == fd_num_max) // 如果遍历完整个数组都没找到空闲位置
{
log_(Warning, "服务器已满,立即关闭%d!", sock); // 服务器已满,关闭新连接
cout << "Warning, 服务器已满,立即关闭" << sock << "!" << endl;
close(sock);
}
else // 找到了空闲位置
{
fd_array[pos] = sock; // 将新连接的fd放入数组
PrintFd(); // 打印当前在线的fd列表

// 后续可能还有其他处理
}
}

// 处理数据接收的函数
void Recver(int fd, int pos) // fd是需要读取数据的文件描述符,pos是该fd在数组中的位置
{
// demo,接收客户端数据
char buffer[1024]; // 临时缓冲区,用于存储接收的数据
ssize_t n = read(fd, buffer, sizeof(buffer) - 1); // 从指定fd读取数据到缓冲区,预留1字节给字符串结束符
if (n > 0) // 读取到数据
{
buffer[n] = 0; // 手动添加字符串结束符
cout << "收到一条消息:" << buffer << endl; // 输出接收到的消息
}
else if (n == 0) // 客户端断开连接(读到文件结束符)
{
log_(Info, "客户端退出了,关闭连接的文件描述符是%d", fd); // 记录客户端断开日志
close(fd); // 关闭该连接的文件描述符
fd_array[pos] = defaultfd; // 将数组中对应位置重置为-1,表示该位置空闲(这里本质是从select中移除该fd)
}
else // 读取错误
{
log_(Warning, "接收文件错误,描述符是: %d", fd); // 记录接收错误日志
close(fd); // 关闭该连接的文件描述符
fd_array[pos] = defaultfd; // 将数组中对应位置重置为-1,表示该位置空闲(这里本质是从select中移除该fd)
}
}

// 事件分发函数:根据select返回的结果,处理就绪的文件描述符
void Dispatcher(fd_set& rfds) // rfds是select调用后被修改的fd_set,其中包含了就绪的fd
{
for (int i = 0; i < fd_num_max; i++)
{
int fd = fd_array[i]; // 获取数组中第i个位置的fd
if (fd == defaultfd) // 如果是无效fd(-1),跳过
{
continue;
}

// 检查这个fd是否在select返回的就绪fd集合中
if(FD_ISSET(fd, &rfds)) // FD_ISSET宏用于检查fd是否在rfds集合中
{
if(fd == _listensock.Fd()) // 如果就绪的fd是监听socket
{
Accepter(); // 说明有新的连接请求,调用Accepter处理
}
else // 如果就绪的fd不是监听socket,而是普通的数据连接socket
{
Recver(fd, i); // 调用Recver处理数据接收
}
}
}
}

// 服务器主循环:使用select实现IO多路复用
void Start()
{
int listensock = _listensock.Fd(); // 获取监听socket的文件描述符
fd_array[0] = listensock; // 将监听socket放在数组第0个位置
// 程序启动时,只有数组第0个位置有值,是监听socket的fd,但是,程序运行过程中,Accepter 会向数组中其他位置添加新的连接fd!

for (;;) // 无限循环,服务器持续运行
{
fd_set readfds; // 定义一个fd_set变量,用于存储待检测的读就绪文件描述符集合
FD_ZERO(&readfds); // 清空readfds集合,将所有位都设置为0
// 每次进入循环,都要重新构建readfds集合,因为select会修改它。

int maxfds = fd_array[0]; // 记录当前最大的文件描述符值,用于select调用
for (size_t i = 0; i < fd_num_max;i++) // 第一次循环,遍历fd_array数组,准备select的参数
{
if (fd_array[i] == defaultfd) // 如果数组中该位置是空闲的(-1),跳过
{
continue;
// 这个 continue 非常有用!如果不跳过,fd_array[i] 是 -1,FD_SET(-1, &readfds) 会出错或行为未定义。
// 并且,如果当前只有监听socket,数组后面的位置都是-1,这个循环会遍历所有位置,
// 但由于 continue,只有有效的fd(比如监听socket)会被处理。
}

FD_SET(fd_array[i], &readfds); // 将有效的fd添加到readfds集合中,让select监控这些fd的读就绪状态

if(fd_array[i] > maxfds)
{
maxfds = fd_array[i]; // 更新最大fd值
log_(Info,"最大文件描述符更新,最大文件描述符为:%d", maxfds);
std::cout << "最大文件描述符更新,最大文件描述符为:" << maxfds << std::endl;
}
}

struct timeval timeout = {5, 0};

// 调用select系统调用,监控maxfds+1个文件描述符(0到maxfds),readfds是读就绪集合,nullptr表示不监控写和异常
int n = select(maxfds + 1, &readfds, NULL, NULL, &timeout); // 最后一个参数为nullptr表示不超时,阻塞等待
// select 现在监控的是 0 到 maxfds 范围内所有在 readfds 位图中被置位的 fd。
// 如果只有监听socket,它只监控监听socket。如果有多个连接,它会监控监听socket 和 所有已连接的socket。

// 根据select返回值进行处理
switch(n)
{
// select超时处理
case 0:
cout << "time out, timeout: " << timeout.tv_sec << "." << timeout.tv_usec << endl;
break;
// select调用出错处理
case -1:
cout << "select error, errno:" << errno << endl;
break;
// select成功处理
default:
cout << "得到了一个新的连接请求!" << endl;
Dispatcher(readfds); // 调用Dispatcher处理所有就绪的事件
break;
}
}
}


private:
Sock _listensock; // 监听socket对象
uint16_t _port; // 服务器端口号
int fd_array[fd_num_max]; // 文件描述符数组,用于维护当前所有有效的连接fd(包括监听fd)
// int wfd_array[fd_num_max]; // 扩展写事件处理
};

测试连接:telnet 127.0.0.1 8080,尝试输入一些内容…… 初学时,这段代码要理解其实有一定难度,让我们详细梳理一下:

  1. 初始状态

    • listensock = 3,监听 socket 的 fd,0、1、2 默认被占用就不解释了。
    • fd_array[0] = 3
    • fd_array[1]fd_array[fd_num_max-1] 都是 -1
  2. 第一次 Start 循环

    • fd_set readfds 被清空。
    • 遍历 fd_array,只有 fd_array[0] (值为 3) 有效,所以 FD_SET(3, &readfds)maxfds = 3
    • select(4, &readfds, ...) 被调用,监控 fd 3。
    • 假设现在 客户端 A 连接服务器。
    • select 发现 fd 3 (监听 socket) 可读(有新连接),返回值 > 0。
    • 进入 Dispatcher(readfds)
    • Dispatcher 遍历 fd_array
    • i=0: fd = fd_array[0] = 3FD_ISSET(3, &readfds) 为真。fd (3) == _listensock.Fd() 为真。调用 Accepter()
  3. Accepter 执行

    • accept 被调用,获取 客户端 A 的连接。
    • 假设此时进程内最小可用 fd 是 4(因为 0,1,2 被占用,3 是监听 socket),所以 accept 返回 sock = 4
    • Accepter 开始查找 fd_array 中的空闲位置:pos = 1: fd_array[1] = -1 (defaultfd)。找到空闲位置,break
    • pos 不等于 fd_num_max,所以执行 else 分支:
      • fd_array[1] = 4 (将客户端 A 的 fd 存入数组)。
      • PrintFd()
  4. 第二次 Start 循环

    • fd_set readfds 被清空。
    • 遍历 fd_array
      • i=0: fd_array[0] = 3 (监听 socket)。有效,FD_SET(3, &readfds)maxfds 更新为 3。
      • i=1: fd_array[1] = 4 (客户端 A 的 socket)。有效,FD_SET(4, &readfds)maxfds 更新为 4。
      • i=2fd_num_max-1: fd_array[i] = -1。跳过。
    • select(5, &readfds, ...) 被调用,现在监控 fd 3 和 fd 4
    • 假设现在 客户端 A 发送了一条消息 “Hello”。
    • select 发现 fd 4 (客户端 A 的 socket) 可读(有数据到达),返回值 > 0。
    • 进入 Dispatcher(readfds)
    • Dispatcher 遍历 fd_array
      • i=0: fd = fd_array[0] = 3。检查 FD_ISSET(3, &readfds)。因为这次是 fd 4 就绪,fd 3 没就绪,所以 FD_ISSET(3, &readfds) 为假。跳过
      • i=1: fd = fd_array[1] = 4。检查 FD_ISSET(4, &readfds)。因为这次是 fd 4 就绪,所以 FD_ISSET(4, &readfds) 为真。fd (4) != _listensock.Fd() 为真。进入 else 分支
      • 调用 Recver(4, 1)Recver 被成功调用了!
  5. Recver 执行

    • Recver 从 fd 4 (客户端 A 的 socket) 读取数据 “Hello”,并处理它(例如打印)。

关键点:

  • accept 返回的 不是 监听 socket 的 fd。它返回的是一个 全新的、代表新客户端连接的 socket 的 fd。这个新 fd 会被存储到 fd_array 中。
  • Start 循环中的 for 循环会 持续更新 select 监控的 fd 集合 (readfds)。每次循环,它都会将 fd_array 中所有非 -1 的 fd(包括监听 socket 和所有已连接的客户端 socket)都添加到监控集合中。
  • Dispatcher 循环会 持续检查 fd_array 中所有非 -1 的 fd,看它们是否在 select 返回的就绪集合 (readfds) 中。
  • 如果就绪的 fd 是 监听 socket 的 fd,说明有新连接,走 Accepter
  • 如果就绪的 fd 是 fd_array 中某个已存储的客户端 socket 的 fd(即 fd != _listensock.Fd()),说明这个特定客户端发送了数据,走 Recver

Accepter 中查找空闲位置的意义:

  • fd_array 是服务器用来 追踪和管理 所有当前连接(包括监听 socket)的 本地数据结构
  • accept 返回的 fd 必须被 记录 下来,否则服务器就无法知道有哪些客户端连接,也无法监控它们。
  • 查找空闲位置并存入 fd_array,是为了让 StartDispatcher 能够 知道 这个新连接的存在,并将其加入到 select 的监控范围和事件分发范围中。
  • if (pos == fd_num_max) 检查是为了防止 fd_array 被填满,实现连接数限制。

注意: Recver 非常重要且会被使用。每当任何一个已连接的客户端发送数据时,select 就会检测到该客户端对应的 fd 就绪,Dispatcher 就会找到该 fd 在 fd_array 中的位置,并调用 Recver 来处理该客户端的数据。fd_arrayAccepter 的查找逻辑是实现多客户端管理的关键。

6. 快速上手 select 的编写步骤(重要)

想象你是一个 服务员,要同时服务多个客户(文件描述符)。

  1. 准备工具:创建一个 fd_set(想象成一个 点名册),清空它 (FD_ZERO)。
  2. 记录客户:把你需要服务的客户(文件描述符)一个个记到点名册上 (FD_SET)。比如客户 A (fd = 3),客户 B (fd = 5)。
  3. 确定范围:看看你记下的客户里,编号最大的是谁?比如是客户 B (fd = 5)。nfds 就是这个最大编号 + 1,也就是 6。告诉老板(操作系统):”我要服务编号 0 到 5 的客户”。
  4. 开始观察:调用 select(6, &点名册, NULL, NULL, NULL)。你开始观察点名册上记录的客户,等待他们有需要(比如按铃表示可读)。
  5. 响应需求select 告诉你哪些客户按铃了。你再次查看点名册(FD_ISSET),看具体是哪个客户按的。然后去服务这个客户(比如读取数据)。
  6. 循环往复:回到第 1 步,继续准备、记录、观察、响应。

编写步骤:

  1. 初始化:创建监听 socket,绑定,监听。
  2. 准备容器:用一个数组(如 fd_array)或链表存储所有要监控的 fd。
  3. 主循环 (Start)
    • 清空 fd_set
    • 遍历你的容器(fd_array),将所有有效 fd (fd != -1) 添加到 fd_set (FD_SET)。
    • 同时记录这些 fd 中的 最大值 (maxfd)。
    • 调用 select(maxfd + 1, &fd_set, ...).
  4. 事件分发 (Dispatcher)
    • 再次遍历你的容器(fd_array)。
    • 对于每个有效 fd,用 FD_ISSET(fd, &fd_set) 检查它是否在 select 返回的就绪集合中。
    • 如果是监听 fd 就绪,调用 Accepter
    • 如果是普通连接 fd 就绪,调用 Recver 或相应的处理函数。
  5. 处理连接 (Accepter)accept 得到新 fd,将其添加到你的容器(fd_array)中。
  6. 处理数据 (Recver):读取数据,处理。如果连接断开,从你的容器(fd_array)中移除该 fd(设置为 -1)。

核心思想:用一个 容器(数组/链表)管理所有 fd,用 select 监控 这个容器里的所有 fd,用 select 的返回结果 分发 事件给相应的处理函数。

7. select 优缺点总结

优点:

  1. 可同时等待多个文件描述符,提高 IO 利用率。
  2. “等待”与“操作”分离,IO 操作本身不会被阻塞。
  3. 实现简单,兼容性好,几乎所有平台都支持。

缺点:

  1. fd 数量有限(通常上限 1024)。
  2. 每次调用都要重置 fd 集合,使用繁琐。
  3. 用户态到内核态频繁拷贝,开销大。
  4. 内核遍历所有 fd 检查状态,效率低。
  5. 用户态也需维护 fd 集合,多次遍历,复杂度高。

select 能多路等待,但机制老、开销大、扩展性差,设计比较久远,具有局限性,对初学者也并不友好,于是就有了 Poll 的多路转接方案poll 是对 select 的改进版,解决了它的一些局限。

5. poll 的函数原型

1. 作用

poll = “用数组替代位图的 select”,功能一样但更方便、无 fd 上限。

poll 是比 select 新的 I/O 多路复用函数,用来 同时监控多个文件描述符的可读、可写或异常事件。功能和 select 一样,但使用方式更简单、限制更少。

2. 函数原型

1
2
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

3. 参数解释

参数名含义select 对比
fds一个 pollfd 结构体数组,每个元素描述一个要监控的 fd类似于 readfds / writefds / exceptfds 三个集合的合并版
nfds数组中有多少个元素,即可以传入多少个文件描述符等价于 selectnfds(监控的 fd 数量)
timeout超时/最长等待时间(毫秒)功能同 selecttimeout,只是单位不同(毫秒)

1. struct pollfd 结构体

1
2
3
4
5
6
struct pollfd
{
int fd; // 要监控的文件描述符
short events; // 想监控哪些事件(输入参数)
short revents; // 实际发生的事件(输出参数,由内核填充)
};

2. 事件标志(eventsrevents 的取值)

事件描述是否可作为输入是否可作为输出
POLLIN数据(包括普通数据和优先数据)可读
POLLRDNORM普通数据可读
POLLRDBAND优先级带数据可读(Linux 不支持)
POLLPRI高优先级数据可读,比如 TCP 带外数据
POLLOUT数据(包括普通数据和优先数据)可写
POLLWRNORM普通数据可写
POLLWRBAND优先级带数据可写
POLLRDHUPTCP 连接被对方关闭,或者对方关闭了写操作,它由 GNU 引入
POLLERR错误
POLLHUP挂起。比如管道的写端被关闭后,读端描述符上将收到 POLLHUP 事件
POLLNVAL文件描述符没有打开
常用的事件标志含义
POLLIN可读(读缓冲区有数据)
POLLOUT可写(写缓冲区可用)
POLLERR错误(比如连接异常)
POLLHUP对端关闭连接(挂断)
POLLNVAL无效的 fd

3. 超时/最长等待时间(timeout)含义

取值含义
>0最多等待指定毫秒数(超时返回 0)
0立即返回(非阻塞轮询)
-1一直阻塞(直到有事件)

4. 返回值

  • >0 有多少个 fd 就绪。
  • =0 超时。
  • <0 出错。

5. 使用示例

完整代码请前往 GitHub 查看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
#include <iostream>
#include <poll.h>
#include <sys/time.h>
#include "Socket.hpp"
// #include "Log.hpp"
using namespace std;

static const uint16_t defaultport = 8080; // 默认端口号8080
static const int fd_num_max = 64; // poll数组的最大容量,最多同时监听64个文件描述符
int defaultfd = -1; // 无效文件描述符,用来标识数组中空闲的位置
int no_event = 0;

class PollServer
{
public:
PollServer(uint16_t port = defaultport)
: _port(port)
{
for (int i = 0; i < fd_num_max; i++)
{
_event_fds[i].fd = defaultfd; // 初始化每个位置的fd为-1(表示未使用)
_event_fds[i].events = no_event; // 初始化每个位置的监听事件为0(不监听任何事件)
_event_fds[i].revents = no_event; // 初始化每个位置的返回事件为0(无事件发生)

// std::cout << "fd_array[" << i << "]" << " : " << fd_array[i] << std::endl;
}
}

~PollServer()
{
_listensock.Close(); // 关闭监听socket
}

// 初始化服务器:创建socket、绑定端口、开始监听
bool Init()
{
_listensock.Socket(); // 创建监听socket
_listensock.Bind(_port); // 绑定端口
_listensock.Listen(); // 开始监听

return true;
}

// 打印当前在线的文件描述符列表
void PrintFd()
{
cout << "在线_fd_列表: "; // 输出提示信息
for (int i = 0; i < fd_num_max; i++) // 遍历_event_fds数组
{
if (_event_fds[i].fd == defaultfd) // 如果是空闲位置(fd为-1),跳过
{
continue;
}

cout << _event_fds[i].fd << " "; // 输出有效的文件描述符
}

cout << endl;
}

void Accepter()
{
// 有新的客户端连接请求到来
std::string client_ip; // 客户端的IP
uint16_t client_port; // 客户端的端口

int sock = _listensock.Accept(&client_ip, &client_port); // 接收新的客户端连接
if (sock < 0)
{
return; // 如果accept失败,直接返回
}

log_(Info, "新连接建立成功:客户端IP:%s,客户端端口:%d,socket fd:%d", client_ip.c_str(), client_port, sock);
cout << "Info, 新连接建立成功:客户端IP:" << client_ip << ",客户端端口:" << client_port << ",socket fd:" << sock << endl;

// 将新连接的文件描述符添加到poll数组中
int pos = 1; // 从数组索引1开始找空闲位置(索引0存放监听socket)
for (; pos < fd_num_max; pos++) // 查找数组中空闲的位置
{
if (_event_fds[pos].fd != defaultfd) // 如果当前位置已被占用(fd不为-1),继续找下一个
{
continue;
}
else // 找到空闲位置,跳出循环
{
break;
}
}

if (pos == fd_num_max) // 如果遍历完整个数组都没找到空闲位置
{
log_(Warning, "服务器已满,立即关闭%d!", sock); // 服务器连接数已达上限,关闭新连接
cout << "Warning, 服务器已满,立即关闭" << sock << "!" << endl;
close(sock);
}
else // 找到了空闲位置
{
_event_fds[pos].fd = sock; // 将新连接的fd放入数组
_event_fds[pos].events = POLLIN; // 设置该fd的监听事件为POLLIN(监听数据可读)
_event_fds[pos].revents = no_event; // 清空返回事件

PrintFd(); // 打印当前在线的fd列表

// 后续可能还有其他处理
}
}

// 处理数据接收的函数
void Recver(int fd, int pos) // fd是需要读取数据的文件描述符,pos是该fd在数组中的位置
{
// 接收客户端发送的数据
char buffer[1024]; // 临时缓冲区,用于存储接收的数据
ssize_t n = read(fd, buffer, sizeof(buffer) - 1); // 从指定fd读取数据到缓冲区,预留1字节给字符串结束符
if (n > 0) // 读取到数据
{
buffer[n] = '\0'; // 手动添加字符串结束符
cout << "收到一条消息:" << buffer << endl; // 输出接收到的消息
}
else if (n == 0) // 客户端断开连接(读到文件结束符)
{
log_(Info, "客户端退出了,关闭连接的文件描述符是%d", fd); // 记录客户端断开日志
close(fd); // 关闭该连接的文件描述符
_event_fds[pos].fd = defaultfd; // 将数组中对应位置重置为-1,表示该位置空闲(从poll监听列表中移除该fd)
}
else // 读取错误
{
log_(Warning, "接收文件错误,描述符是: %d", fd); // 记录接收错误日志
close(fd); // 关闭该连接的文件描述符
_event_fds[pos].fd = defaultfd; // 将数组中对应位置重置为-1,表示该位置空闲(从poll监听列表中移除该fd)
}
}

// 事件分发函数:处理所有就绪的事件
void Dispatcher() // 遍历poll数组,检查哪些fd的事件已经就绪
{
for (int i = 0; i < fd_num_max; i++)
{
int fd = _event_fds[i].fd; // 获取数组中第i个位置的fd
if (fd == defaultfd) // 如果是无效fd(-1),跳过
{
continue;
}

if (_event_fds[i].revents & POLLIN) // 如果这个fd的POLLIN事件已经就绪(有数据可读)
{
if (fd == _listensock.Fd()) // 如果就绪的fd是监听socket
{
Accepter(); // 说明有新的连接请求,调用Accepter处理
}
else // 如果就绪的fd不是监听socket,而是普通的数据连接socket
{
Recver(fd, i); // 调用Recver处理数据接收
}
}
}
}

// 服务器主循环:使用poll实现IO多路复用
void Start()
{
_event_fds[0].fd = _listensock.Fd(); // 把监听socket的fd放到数组的第一个位置
_event_fds[0].events = POLLIN; // 监听socket只关心POLLIN事件(新连接请求)
int timeout = 3000; // 设置poll超时时间为3000毫秒(3秒),如果3秒内没有事件发生,poll会返回0

for (;;) // 服务器无限循环,持续监听和处理事件
{
int n = poll(_event_fds, fd_num_max, timeout); // 调用poll等待事件发生

// 根据poll返回值进行处理
switch (n)
{
// poll超时处理(timeout时间内没有事件发生)
case 0:
cout << "time out, timeout: " << timeout << endl; // poll超时,没有事件发生
break;
// poll调用出错处理
case -1:
cout << "select error, errno:" << errno << endl; // poll调用出错
break;
// poll成功处理(有事件发生)
default:
cout << "得到了一个新的连接请求!" << endl; // poll成功,有事件发生(注意:这里注释可能不准确,有事件发生不一定都是连接请求)
Dispatcher(); // 调用Dispatcher处理所有就绪的事件
break;
}
}
}

private:
Sock _listensock; // 监听socket对象
uint16_t _port; // 服务器端口号
struct pollfd _event_fds[fd_num_max]; // poll需要的数组,存储要监听的fd和事件(结构体数组,数组每一个位置都是结构体)
// struct pollfd *_event_fds;

// int fd_array[fd_num_max];
// int wfd_array[fd_num_max]; // 扩展写事件处理
};

6. poll 小结

1. poll 的优点

  1. 输入输出参数分离: struct pollfd 里有 events(输入)和 revents(输出),不像 select 那样每次都要重新设置集合。
  2. 监控数量不限: 不再受 fd_set 的 1024 比特位限制,想监控多少个 fd,就传多少个 pollfd 元素,具体监控多少个由第二个参数 nfds 决定。
  3. 同时等待多个 fd,提高效率:“等”的时间可以重叠,IO 效率比阻塞式 IO 高得多。

2. poll 的缺点

  1. 返回后仍需遍历: 需要遍历整个 fds 数组,找出哪些 fd 已经就绪。
  2. 用户态与内核态拷贝开销大: 每次调用都要把整个 pollfd 数组复制进内核,fd 多时性能会明显下降。
  3. 内核仍是线性扫描: 内核依然要挨个检查每个 fd,就绪检测效率低,当 fd 数量非常大时,性能退化明显。

3. 与 select 的核心区别

对比项selectpoll
fd 表达方式位图 (fd_set)结构体数组 (pollfd[])
fd 数量限制有(通常 1024)理论无限(由系统资源决定)
参数是否要重置每次都要重新设置不用重置,只更新有变化的项
内核检测机制遍历所有 fd同样遍历,但结构更清晰

pollselect 的改良版,解决了 fd 数量上限和参数重置的问题,但底层依然是“遍历式检测”,性能瓶颈依旧。