01.进程通信
进程之间需要某种协同,所以如何协同的前提条件就是进程直接需要进行通信,传递有效数据
前面提到过,进程是具有独立性的,进程=内核数据结构+代码和数据
前面讲到子进程创建会继承父进程的信息,后面会发生写时拷贝,不属于进程间通信,我们提到的进程间通信,是让其一直通信
进程如何通信呢?
因为进程具有独立性,所以一个进程开辟的资源另一个进程是看不到的,所以进程间通信的前提,先让不同的进程,看到同一份(操作系统)资源(“一段内存”)
一定是某一个进程先需要通信,让OS创建一个共享资源
OS必须提供很多系统调用
OS创建的共享资源的不同,系统调用接口的不同,进程间通信会有不同的种类
进程通信的常见方式:
System V IPC:System V 消息队列,System V 共享内存,System V 信号量
POSIX IPC:消息队列,共享内存,信号量,互斥量,条件变量,读写锁
管道:匿名管道pipe,命名管道
02.管道操作系统打开一个文件,属性初始化struct file,内容写到内核级文件缓冲区
当以读和写两种方式分别打开同一个文件时,操作系统为其分配文件描述符fd 3 4 ,当第二次打开同一个文件的时候,操作系统不需要再将文件的属性,操作方法集,缓冲区再加载一次,只有struct file会被单独创建两次
创建子进程以父进程模版copy一份,子进程文件描述符表也创建一份
创建子进程,还需要为3,4描述符再拷贝两个struct file吗?答案是不用的,进程的独立性跟文件没有关系
这里的拷贝类似浅拷贝的过程,子进程的3,4号指针也会指向文件系统中父进程指向的同一个struct file
所以为什么父子进程会向同一个显示器终端打印数据?就是因为文件描述符指向同一个文件
进程默认会打开三个标准输入输出:0,1,2,怎么做到的?bash的子进程–bash打开了,所有的子进程默认也就打开了,我们只要做好约定即可
我们子进程主动close(0/1/2),不影响父进程继续使用显示器文件
前面也提到,文件会记录自己的硬链接数,这里struct file也会记录指向自己的文件描述符个数,当ref_count等于0时释放文件资源
进程间通信的本质,先让两个不同的进程看到一份公共的资源,这里父子进程看到了同一块文件内核级缓冲区,这里的公共资源,我们就将它叫做管道文件
管道只允许单向通信,不需要刷新到磁盘,所以需要重新设计通信接口
#include
int pipe(int fd[2]);
参数 fd:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端 返回值:成功返回0,失败返回错误代码
本质是对open的封装,不需要文件路径和文件名,所以叫做匿名管道
如果想双向通信,就构建两个管道
测试管道接口代码语言:javascript代码运行次数:0运行复制#include<iostream>#include<string>#include<cerrno>#include<unistd.h>#include<sys/types.h>#include<cstring>#include<sys/wait.h>using namespace std;string getOtherMessage(){ static int cnt=0; string messageid=to_string(cnt); cnt++; pid_t self_id =getpid(); string stringpid =to_string(self_id); string message = "messageid: "; message+=messageid; message+="my pid is: "; message+=stringpid+"\n"; return message;}void SubProcessWrite(int wfd){ string message="I am chile process"; while(true) { string info=message+ getOtherMessage(); write(wfd,info.c_str(),info.size());//写入管道的时候,没必要写入\0,字符串跟文件没关系 sleep(1); }}void ProcessRead(int rfd){ char buffer[1024]; while(true) { ssize_t n =read(rfd,buffer,sizeof(buffer)-1); if(n>0) { buffer[n]=0; cout<< "father get message:"<<buffer<<endl; } }}int main(){ int pipefd[2]; int n =pipe(pipefd); if(n!=0) { cerr<<"errno:"<<errno<<" "<<"errstring:"<<strerror(errno)<<endl; } cout<<"pipefd[0]:"<<pipefd[0]<<",pipefd[1]:"<<pipefd[1]<<endl; sleep(1); //pipefd[0]:r,pipefd[1]:w pid_t id =fork(); //让子进程写,父进程读 if(id==0) { sleep(1); //子进程 close(pipefd[0]); SubProcessWrite(pipefd[1]); exit(0); } sleep(1); close(pipefd[1]); ProcessRead(pipefd[0]); pid_t rid=waitpid(id,nullptr,0); if(rid>0) { cout<<"wait chile process done"<<endl; } return 0;}
1. 管道创建 (pipe)
代码语言:javascript代码运行次数:0运行复制int pipefd[2];int n = pipe(pipefd);
2. 子进程的写入管道
代码语言:javascript代码运行次数:0运行复制if (id == 0){ sleep(1); close(pipefd[0]); // 子进程关闭管道的读端 SubProcessWrite(pipefd[1]); exit(0);}
3. 父进程的读取管道
代码语言:javascript代码运行次数:0运行复制close(pipefd[1]); // 父进程关闭管道的写端ProcessRead(pipefd[0]);
4. 父进程读取的过程 (ProcessRead)
父进程通过 read() 从管道的读端读取数据并存储在 buffer 中。读取到的数据被打印出来,并且没有添加字符串结束符 \0,所以需要手动为 buffer 添加终止符。5. 子进程写入的过程 (SubProcessWrite)
子进程每秒钟向管道写入一条包含 message 和动态生成的消息的字符串。getOtherMessage 会生成类似于 messageid 和 pid 的信息。write() 函数向管道的写端发送数据。6. 等待子进程结束 (waitpid)
代码语言:javascript代码运行次数:0运行复制pid_t rid = waitpid(id, nullptr, 0);if (rid > 0){ cout << "wait child process done" << endl;}
我们观察到的现象,子进程写一条,父进程读一条
管道的四种情况:
读进程被阻塞:管道内是空的同时write fd没有关闭,读取条件不具备管道被写满并且read fd不读且没有关闭,管道被写满,写进程会被阻塞(管道被写满,写条件不具备)管道一直在读但是写端关闭了wfd,读端read返回值会读到0,表示读到了文件结尾rfd直接关闭,写端进程会被操作系统直接使用13号信号关掉,相当于进程出现了异常管道的五种特征:
匿名管道:只用来进行具有血缘关系的进程之间进行通信,常用于父子进程之间通信管道内部,自带进程之间同步的机制,多执行流执行代码的时候,具有明显的顺序性,子进程写一个,父进程读一个管道文件的生命周期是随进程的管道文件在通信的时候,是面向字节流的,write的次数和读取的次数不是一一匹配的管道是半双工的,数据只能向一个方向流动;需要双方通信时,需要建立起两个管道如果单次向管道写入的数据小于PIPE_BUF,写入过程就是原子的,读写是安全的,不会出现写一半就被读走的情况
03.进程池我们能否设计出这么一个东西,父进程(master)提前创建出一批子进程,有任务就把任务交给子进程、
管道里面没有数据,worker进程就在阻塞等待
代码语言:javascript代码运行次数:0运行复制#include <iostream>#include <string>#include <unistd.h>#include <vector>#include <sys/types.h>using namespace std;void work(int rfd){ while(true) sleep(1);}class Channel{public: Channel(int wfd, pid_t id, const string &name) : _wfd(wfd), _subprocessid(id), _name(name) {} int Getwfd() { return _wfd; } pid_t Getid(){ return _subprocessid; } string Getname() { return _name; }private: int _wfd; pid_t _subprocessid; string _name;};int main(int argc, char *argv[]){ if (argc != 2) { cerr << argv[0] << "process num" << endl; return 1; } vector<Channel> channels; int num = stoi(argv[1]); for (int i = 0; i < num; i++) { // 创建管道 int pipefd[2] = {0}; int n = pipe(pipefd); if (n < 0) exit(1); // 创建子进程 pid_t id = fork(); if (id == 0) { // 子 close(pipefd[1]); work(pipefd[0]); exit(0); } // 父 string name = "Channel" + to_string(i); close(pipefd[0]); channels.push_back(Channel(pipefd[1], id, name)); } for(auto& e:channels) { cout<<e.Getname()<<"-"<<e.Getwfd()<<"-"<<e.Getid()<<endl; } return 0;}
Channel 类
Channel 代表一个通信通道,包含: _wfd:用于向子进程写入数据的 管道写端。_subprocessid:存储 子进程 ID。_name:通道的名称(例如 Channel0、Channel1 等)。main 函数
代码语言:javascript代码运行次数:0运行复制int main(int argc, char *argv[]){ if (argc != 2) { cerr << argv[0] << " process num" << endl; return 1; }
创建子进程和管道
代码语言:javascript代码运行次数:0运行复制vector<Channel> channels; int num = stoi(argv[1]); for (int i = 0; i < num; i++){ int pipefd[2] = {0}; int n = pipe(pipefd); if (n < 0) exit(1);
** 创建子进程**
代码语言:javascript代码运行次数:0运行复制pid_t id = fork();if (id == 0){ // 子进程 close(pipefd[1]); // 关闭管道的写端 work(pipefd[0]); // 调用 work() 进入死循环 exit(0); // 这里不会执行到,因为 work() 死循环}
父进程管理通道
代码语言:javascript代码运行次数:0运行复制// 父进程string name = "Channel" + to_string(i);close(pipefd[0]); // 关闭管道的读端channels.push_back(Channel(pipefd[1], id, name));
到这里管道就创建好了
这里会预先构建好任务表,函数指针数组,父进程不断往管道里写入任务码子进程读取
代码语言:javascript代码运行次数:0运行复制#pragma once#include<iostream>#include<ctime>#include<stdlib.h>#include<unistd.h>#define TaskNum 3typedef void(*task_t)();task_t tasks[TaskNum];void Print(){ std::cout<<"I am print task"<<std::endl;}void DownLoad(){ std::cout<<"I am DownLoad task"<<std::endl;}void Flush(){ std::cout<<"I am Flush task"<<std::endl;}void InitTask(){ srand(time(nullptr) ^ getpid() ^ 17777); tasks[0]=Print; tasks[1]=DownLoad; tasks[2]=Flush;}void ExcuteTask(int number)//执行任务{ if(number<0||number>2) return; tasks[number]();}int SelectTask(){ return rand() % TaskNum;}
定义一个大小为 TaskNum(即 3)的函数指针数组 tasks,用于存储任务函数 使用 rand() 生成一个随机数,并通过取模运算(% TaskNum)将其限制在 0 到 TaskNum - 1 之间
接下来通过channel控制子进程:
a.选择一个任务 b.选择一个信道和进程 c.发送任务代码语言:javascript代码运行次数:0运行复制#include <iostream>#include <string>#include <unistd.h>#include <vector>#include <sys/types.h>#include "Task.hpp"using namespace std;void work(int rfd){ while (true) { int command = 0; int n = read(rfd, &command, sizeof(command)); ExcuteTask(command); }}class Channel{public: Channel(int wfd, pid_t id, const string &name) : _wfd(wfd), _subprocessid(id), _name(name) { } int Getwfd() { return _wfd; } pid_t Getid() { return _subprocessid; } string Getname() { return _name; }private: int _wfd; pid_t _subprocessid; string _name;};void CreateChannelandSub(vector<Channel> &channels, int num){ for (int i = 0; i < num; i++) { // 创建管道 int pipefd[2] = {0}; int n = pipe(pipefd); if (n < 0) exit(1); // 创建子进程 pid_t id = fork(); if (id == 0) { // 子 close(pipefd[1]); work(pipefd[0]); exit(0); } // 父 string name = "Channel" + to_string(i); close(pipefd[0]); channels.push_back(Channel(pipefd[1], id, name)); }}void SendTaskCommand(Channel &channels, int taskcommand){ write(channels.Getwfd(), &taskcommand, sizeof(taskcommand));}// 0 1 2 3 4 channelnumint NextChannel(int channelnum){ static int next = 0; int channel = next; next++; next %= channelnum; return channel;}int main(int argc, char *argv[]){ if (argc != 2) { cerr << argv[0] << "process num" << endl; return 1; } int num = stoi(argv[1]); InitTask(); vector<Channel> channels; CreateChannelandSub(channels, num); // 2.通过channel控制子进程 // a.选择一个任务 b.选择一个信道和进程 while (true) { sleep(1); int taskcommand = SelectTask(); int channel_index = NextChannel(channels.size()); // c.发送任务 SendTaskCommand(channels[channel_index], taskcommand); cout << "taskcommand: " << taskcommand << " channel:" << channels[channel_index].Getname() << " sub process:" << channels[channel_index].Getid() << endl; } return 0;}
子进程的工作函数
代码语言:javascript代码运行次数:0运行复制void work(int rfd){ while (true) { int command = 0; int n = read(rfd, &command, sizeof(command)); ExcuteTask(command); }}
发送任务命令
代码语言:javascript代码运行次数:0运行复制void SendTaskCommand(Channel &channel, int taskcommand){ write(channel.Getwfd(), &taskcommand, sizeof(taskcommand));}
int NextChannel(int channelnum){ static int next = 0; int channel = next; next++; next %= channelnum; return channel;}
int main(int argc, char *argv[]){ if (argc != 2) { cerr << argv[0] << " process num" << endl; return 1; } int num = stoi(argv[1]); // 获取子进程数量 InitTask(); // 初始化任务 vectorchannels; CreateChannelandSub(channels, num); // 创建管道和子进程 // 主循环:分配任务给子进程 while (true) { sleep(1); int taskcommand = SelectTask(); // 随机选择一个任务 int channel_index = NextChannel(channels.size()); // 轮询选择子进程 SendTaskCommand(channels[channel_index], taskcommand); // 发送任务 // 打印任务分配信息 cout << "taskcommand: " << taskcommand << " channel:" << channels[channel_index].Getname() << " sub process:" << channels[channel_index].Getid() << endl; } return 0;}
class Channel{public:--- void CloseChannle() { close(_wfd); } void Wait() { pid_t rid=waitpid(_subprocessid,nullptr,0); if(rid>0) { cout<<"wait "<<rid<<" success"<<endl; } }------private: int _wfd; pid_t _subprocessid; string _name;};void CleanUpChannel(vector<Channel>& channels){ for(auto & channel:channels) { channel.CloseChannle(); } for(auto & channel:channels) { channel.Wait(); }}
发送10次任务,当父进程关闭wfd,子进程read读到0也就退出了
void CreateChannelandSub(vector<Channel> &channels, int num,task_t task){ for (int i = 0; i < num; i++) { // 创建管道 int pipefd[2] = {0}; int n = pipe(pipefd); if (n < 0) exit(1); // 创建子进程 pid_t id = fork(); if (id == 0) { // 子 close(pipefd[1]); dup2(pipefd[0],0); task(); exit(0); } // 父 string name = "Channel" + to_string(i); close(pipefd[0]); channels.push_back(Channel(pipefd[1], id, name)); }}void work(){ while (true) { int command = 0; int n = read(0, &command, sizeof(command)); if(n==sizeof(int))ExcuteTask(command); else if(n==0) { cout<<"sub process: "<<getpid()<<" quit"<<endl; break; } }}
这里可以设置为不从管道中读,从标准输入读取,work本身就是一个task,这里将work改为一种
我现在传的这个函数是没有参数和返回值的,但是work必须要知道管道rfd,所以这里就先让0重定向到rfd,后面我函数里面从0读取即可
以上就是【Linux】进程间通信:匿名管道与进程池的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号