031 进程间通信 —— 匿名管道篇

进程间通信 —— 匿名管道篇

1. 什么是管道

管道就是一个内核缓冲区,允许一个进程写数据,另一个进程从中读数据。 它像一根水管:一头写入,一头读取,中间是内核帮我们传递数据。

image-20250617212422402

image-20250617211905112

2. 管道的直接原理

1. 底层本质

管道就是操作系统在内核空间里开辟的一块 内存缓冲区,这个缓冲区由内核维护,进程不能直接访问,只能通过 文件描述符 进行读写。

  • 管道使用了 环形缓冲区(循环队列结构),读写两端由内核控制。

  • 当我们调用 pipe(fd),操作系统会:

    • 在内核里创建一个缓冲区。

    • 返回两个文件描述符:

      • fd[0]:读端读进程:read(fd[0], buf, size); 从管道中读取数据(从内核缓冲区读)。
      • fd[1]:写端写进程:write(fd[1], data, size); 把数据写入管道(进入内核缓冲区)。

      pipefd[0]0 → 嘴巴 → 读书 → 读端
      pipefd[1]1 → 钢笔 → 写字 → 写端

image-20250617211550142

3. 匿名管道的接口

1. pipe() 函数原型

在 Linux 中,pipe() 是用于创建 匿名管道 的系统调用,原型如下:

1
2
#include <unistd.h>			// 头文件
int pipe(int pipefd[2]); // 函数声明

参数解释:

pipefd[2] 是一个 整型数组,用来返回两个文件描述符:

  • pipefd[0]读端
  • pipefd[1]写端

返回值:

  • 成功:返回 0
  • 失败:返回 -1,并设置 errno

pipe() 创建的是匿名管道。也就是说:匿名管道不能跨无亲缘关系的进程通信,通常用于 父子进程 或 具有共同祖先进程的兄弟进程 间的通信。

2. demo 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <iostream>
#include <unistd.h>
using namespace std;

int main()
{
int pipefd[2] = { 0 };
int n = pipe(pipefd);

if (n < 0)
{
return 1;
}

cout << "pipefd[0]: " << pipefd[0] << " , pipefd[1]: " << pipefd[1] << endl;

return 0;
}

运行结果:

  • 代码中 int pipefd[2] = { 0 }; 将数组初始化为 [0, 0],但这只是 临时状态
  • pipe()系统调用,它的核心功能是由操作系统内核实现的。调用时,内核会:
    1. 忽略 我们传入的初始值(pipefd 只是用于接收结果的缓冲区)。
    2. 动态分配 两个可用的文件描述符(通常是当前未用的最小数值),并写入到 pipefd 中。

image-20250617220653484

程序输出了:pipefd[0]: 3 , pipefd[1]: 4。原因:Linux 中,0/1/2 已默认分配给 stdin/stdout/stderr,就不过多赘述了。如果重复创建管道(例如在循环中),描述符会如何变化?

image-20250617222145806

解释:

1
2
3
3, 4    // 上一次分配 3(读), 4(写),关闭 3,但 4 还在
3, 5 // 再次分配最小未用的是 3(读), 5(写)
3, 6 // 再次分配 3(读), 6(写)

实现一个最基础的 匿名管道通信模型,用于父子进程之间传输数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#include <iostream>
#include <cstdio>
#include <string>
#include <cstring>
#include <cstdlib>
#include <unistd.h> // 提供 pipe, fork, read, write, sleep 等系统调用
#include <sys/types.h>
#include <sys/wait.h> // 提供 waitpid
using namespace std;

#define N 2
#define NUM 1024

// 子进程写入函数
void Writer(int wfd)
{
string s = "我是子进程"; // 待发送内容
pid_t self = getpid(); // 获取当前子进程 PID
int number = 0;

char buffer[NUM]; // 缓冲区

while (true)
{
sleep(1); // 每秒写一次,节省 CPU

buffer[0] = 0; // 清空缓冲区,表明我们是用它当字符串用

// 构造发送信息:我是子进程-子进程pid-序号
snprintf(buffer, sizeof(buffer), "%s-%d-%d", s.c_str(), self, number++);
cout << buffer << endl; // 本地输出(写给终端)

// 将内容写入管道(系统调用 write),供父进程读取
write(wfd, buffer, strlen(buffer));
}
}

// 父进程读取函数
void Reader(int rfd)
{
char buffer[NUM]; // 读取缓冲区

while (true)
{
buffer[0] = 0; // 清空

// 读取管道数据,read 是系统调用
ssize_t n = read(rfd, buffer, sizeof(buffer));

if (n > 0)
{
buffer[n] = 0; // 加上字符串结束符 '\0',确保安全打印
cout << "父进程收到消息[" << getpid() << "]# " << buffer << endl;
}
else if (n == 0) // 返回 0 表示对端写入端关闭,读到 EOF
{
printf("父进程读到 EOF!\n");
break;
}
else // 读取失败
{
break;
}
}
}

int main()
{
int pipefd[N] = { 0 }; // pipefd[0]: read 端, pipefd[1]: write 端
int n = pipe(pipefd); // 创建匿名管道

if (n < 0)
{
perror("pipe");
return 1; // 创建失败返回
}

// 创建子进程(fork)
pid_t id = fork();
if (id < 0)
{
perror("fork");
return 2; // 创建失败
}

if (id == 0)
{
// 子进程逻辑
close(pipefd[0]); // 关闭读端,只写

Writer(pipefd[1]); // 执行写入任务

close(pipefd[1]); // 写完关闭写端
exit(0); // 退出子进程
}

// 父进程逻辑
close(pipefd[1]); // 父进程关闭写端,只读

Reader(pipefd[0]); // 执行读取任务(会阻塞等待)

// 等待子进程退出
pid_t rid = waitpid(id, 0, 0);
if (rid < 0)
{
perror("waitpid");
return 3;
}

close(pipefd[0]); // 关闭读端

sleep(5); // 给终端输出留个时间
return 0;
}

snprintf 函数原型

1
2
>#include <cstdio>
>int snprintf(char *str, size_t size, const char *format, ...);

参数解释:

参数名 含义
str 输出缓冲区(目标字符串),用于保存格式化后的字符串
size str 缓冲区的最大容量(包括结尾的 \0
format 格式字符串(类似 printf
... 可变参数,对应 format 中的格式说明符

返回值:

  • 如果成功:返回 欲写入的字符串长度不包括结尾的 \0)。
  • 如果返回值 ≥ size:说明输出被截断(因为目标缓冲区太小)。
  • 如果返回值 < size:说明字符串成功写入,结尾自动加上了 \0

作用总结:

snprintf 是一种 安全版本sprintf,能防止内存溢出。常用于 格式化字符串写入缓冲区。相比 sprintf,它加了一个 长度限制参数 size,从而更安全:

1
2
>sprintf(buf, "%d-%s", id, name);      				// ⚠️ 可能溢出
>snprintf(buf, sizeof(buf), "%d-%s", id, name); // 安全

上面的代码的运行结果就不演示了,从关闭父子进程的读端和写端就可以发现:

image-20250618130238994

站在文件描述符角度深入理解管道

image-20250618130445595

站在内核角度理解管道的本质

image-20250618131847304

4. 重新认识管道

1. 管道也是文件吗?

是的,在 Linux 中,管道是“特殊类型的文件”,非磁盘文件(准确说,是一种特殊的 I/O 通道),完全符合:「一切皆文件」:键盘、鼠标、终端、套接字、管道、设备,全都是文件,统一用文件描述符(int fd)访问。

匿名管道没有名字,但有文件描述符。 它在内核中创建一个缓冲区,并返回两个文件描述符指向它,但它 没有路径名,在 /proc/[pid]/fd/ 中也只表现为:

1
3 -> pipe:[12345]

2. 管道有没有固定大小?可以写多少内容?

有:内核缓冲区大小是有限的: 默认大小一般是 65536 字节(64KB),不同系统下可以通过命令查看:

1
cat /proc/sys/fs/pipe-max-size      # 管道最大容量

3. 匿名管道的 5 个特征

1. 具有血缘关系的进程进行进程间通信

有血缘关系的进程通信:匿名管道(pipe())只支持 父子或兄弟 这种有“血缘”的进程通信(还存在爷孙关系,非常少见)。原因:匿名管道没有文件路径,只能靠 fork() 时继承文件描述符传递给子进程。

2. 匿名管道只能单向通信,双向可以使用多管道

pipefd[0] 是读端,pipefd[1] 是写端,本质就是单向数据流。父子进程需要互相通信 → 开两个 pipe

3. 父子进程协同通信 = 同步 + 互斥(保护管道文件的数据安全)

管道通信是 阻塞 I/O 的体现,天然就是同步机制。详见下方的匿名管道中的 4 中情况

  • 读阻塞(没有数据时) ⇒ 读线程自动挂起,等待写线程唤醒(即写入数据)。
  • 写阻塞(写满时) ⇒ 写线程自动挂起,等待读线程消费。

这是典型的 生产者-消费者模型,操作系统自动帮我们实现了互斥和同步。

4. 管道是面向字节流的

管道是“字节流”接口,readwrite 都是面向字节,没有结构、没有分隔、没有消息边界。

5. 管道是基于文件的,但不落盘,生命周期跟随进程

匿名管道 = 临时文件 = 内核缓冲区,使用文件描述符访问pipe() 创建的管道,不存在磁盘上,进程退出后自动销毁。


4. 匿名管道中的 4 中情况

编号 场景 阻塞? 原因 & 说明
1 管道没数据,读端阻塞 ✅ 是 等待写端写数据
2 管道满了,写端阻塞 ✅ 是 等待读端消费数据
3 写端关闭,读端读取 ❌ 否 read() 返回 0,表示 EOF,不会阻塞
4 读端关闭,写端写入 ❌ 失败 写端收到 SIGPIPE 信号 → 默认会被杀死(转到情况 3)

深度解析:第 4 种情况为什么会崩?

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#include <iostream>
#include <unistd.h> // pipe, fork, write, close, read
#include <cstdlib> // exit
#include <cstring> // strlen
#include <sys/wait.h> // waitpid
#include <cerrno> // errno
#include <cstdio> // perror
using namespace std;

// 子进程写入管道(触发SIGPIPE)
void ChildWrite(int wfd)
{
const char* msg = "你好,父进程!\n";
int count = 0;
while (true)
{
cout << "子进程写入 #" << count++ << endl;
ssize_t n = write(wfd, msg, strlen(msg));
if (n == -1)
{
perror("子进程写入管道失败!");
cout << "errno: " << errno << endl;
break;
}

sleep(1); // 控制写入频率
}

// 正常情况下不会执行到这里(SIGPIPE会终止进程)
cout << "子进程关闭写端,退出..." << endl;
close(wfd);
exit(1);
}

// 父进程读管道后关闭读端(触发子进程SIGPIPE)
void ParentRead(int rfd)
{
char buf[1024];
for (int i = 0; i < 5; ++i)
{
ssize_t n = read(rfd, buf, sizeof(buf) - 1);
if (n > 0)
{
buf[n] = '\0';
cout << "父进程读到:" << buf;
}
sleep(1); // 模拟读操作
}

cout << "父进程关闭读端(将触发子进程SIGPIPE)..." << endl;
close(rfd); // 关闭读端
}

int main()
{
int pipefd[2];
if (pipe(pipefd) == -1)
{
perror("管道创建失败");
return 1;
}

pid_t pid = fork();
if (pid < 0)
{
perror("创建子进程失败");
return 2;
}
else if (pid == 0) // 子进程:关闭读端,持续写入
{
close(pipefd[0]);
ChildWrite(pipefd[1]);
}

close(pipefd[1]); // 父进程:关闭写端,读取数据
ParentRead(pipefd[0]);

cout << "等待10秒,观察僵尸进程(PID=" << pid << ")..." << endl;
sleep(10); // 延迟等待(此时子进程成为僵尸进程)

// 回收子进程(将显示SIGPIPE终止)
int status = 0;
pid_t ret = waitpid(pid, &status, 0);
if (ret > 0)
{
if (WIFSIGNALED(status))
{
int sig = WTERMSIG(status);
cout << "子进程被信号终止,信号编号: " << sig << " (" << strsignal(sig) << ")" << endl;
}
else if (WIFEXITED(status))
{
cout << "子进程正常退出,exit code: " << WEXITSTATUS(status) << endl;
}
else
{
cout << "子进程异常退出" << endl;
}
}
else
{
perror("waitpid 失败");
}

cout << "父进程退出..." << endl;
return 0;
}

运行结果并验证:确实收到了 SIGPIPE 信号。

image-20250618170142443

当我们关闭读端后,系统就会发现没人接收了,就:

  • 向当前进程发送 SIGPIPE
  • 默认行为是 终止进程(kill)

实际开发中一般这么做防御:

1
signal(SIGPIPE, SIG_IGN); // 忽略 SIGPIPE 信号

然后再手动检查 write() 的返回值:

1
2
3
4
5
6
ssize_t n = write(wfd, buffer, len);
if (n == -1)
{
perror("write failed");
// 尝试重连 / 停止写入
}

操作系统哲学:“操作系统不做无意义、不必要、低效的工作。如果做了,就是操作系统的 BUG!”

比如:

  • 没有写端了 → 管道也就没用了,读再久也不会有数据 → 不如直接返回 0
  • 没有读端了 → 继续写是浪费 → 直接 kill 写入进程(SIGPIPE)

如果非要系统继续阻塞读,那就是设计缺陷(不会给无意义的等待)。


5. 匿名管道的应用场景

  1. shell 命令中的管道符 —— ps aux | grep nginx | wc -l 等。
  2. 实时数据处理 —— 监控系统、日志系统、流式数据预处理。
  3. 后端开发的进程管理 —— 主进程 + 子进程池(进程池模型)、数据库连接池等。

5. Shell 的管道符 |

Shell 的管道符 | 是一种 把一个命令的标准输出(stdout)传递给下一个命令的标准输入(stdin) 的方式。形式: 命令1 | 命令2 | 命令3。作用:实现多个命令之间的 数据流式传递,将它们组合成 处理流水线(pipeline)

1. 常见用法示例

1. grep —— 文本搜索利器

grep 用于在文本中按行查找 符合正则表达式的内容,是日志分析、文本处理的核心工具。

基本语法:

1
>grep [选项] "模式" 文件名
常用选项 含义
-i 忽略大小写(ignore case)
-v 反向匹配(只显示不包含模式的行)
-r 递归搜索目录
-n 显示匹配行的行号
--color=auto 高亮显示匹配部分

示例:

1
2
3
>grep error server.log               # 查找包含 "error" 的行
>grep -i http access.log # 忽略大小写搜索
>dmesg | grep -i usb # 在内核日志中查找 usb 相关信息

2. nginx —— 高性能 Web 服务器

此时还不涉及,留个悬念,以后再讲,示例:

1
>ps aux | grep nginx   # 查看 nginx 是否正在运行
  1. 统计当前登录用户数量:

    1
    who | wc -l			# who: 显示当前登录的用户,wc -l: 统计行数
  2. 查看系统中以 nginx 运行的进程数量:

    1
    ps aux | grep nginx | wc -l		# ps aux: 列出所有进程,grep nginx: 过滤包含 nginx 的行,wc -l: 统计行数(即进程数)
  3. 显示 /etc/passwd 中包含 “bash” 的用户名:

    1
    cat /etc/passwd | grep bash | cut -d: -f1		# cut -d: -f1: 用冒号分割字段,提取第一列(用户名)

2. 底层原理(深入理解)

当你写下:

1
A | B

Shell 做了如下事情:

  1. 调用 pipe() 创建一个匿名管道(两个文件描述符:读和写)。
  2. 使用 fork() 创建两个子进程。
  3. 子进程 A:将 stdout 重定向到管道的写端(dup2(pipefd[1], STDOUT_FILENO))。
  4. 子进程 B:将 stdin 重定向到管道的读端(dup2(pipefd[0], STDIN_FILENO))。
  5. A 执行命令 A,输出写入管道。
  6. B 执行命令 B,从管道读取数据。