1. 初始化进程池
进程池的实现是依靠匿名管道,通过进程间通信使得父进程能够管理多个进程任务,相当于父进程拥有了很多个进程——进程池,通过不同的进程完成指定的任务。 所以我们需要创建多个匿名管道和子进程,进行进程间通信,发送信息给子进程让它们根据接收到的信息处理相关任务。 因为有多个管道和子进程,为了方便父进程使用不同管道发送对应信息给子进程,我们需要将管道的文件描述符以及对应子进程的pid保存起来,我们选择将它们封装在一个c++hannel类中。又因为有多个匿名管道和子进程,所以将多个channel类对象储存在c++stl中的容器vector中来方便父进程进行管理进程池。
代码如下:
代码语言:javascript代码运行次数:0运行复制int InitProcesspool(int num,std::vector<Channel>& channels){ for(int i = 0; i < num; i++)//使用循环创建多个匿名管道和子进程 { //1.创建匿名管道 int pipefd[2] = {0}; int n = pipe(pipefd); if(n < 0) return 2;//根据不同的返回值判断原因,也可以使用枚举来约定返回值代表的内容 //2.创建子进程 pid_t id = fork(); if(id < 0) return 3; //3.建立通信管道,父子进程关闭读端或写端 if(id == 0)//子进程 { //子进程读取,关闭写端 ::close(pipefd[1]); //dup2 dup2(pipefd[0],0); //子进程需要执行的内容 Work(); ::exit(0); } //父进程 //父进程写入,关闭读端 ::close(pipefd[0]); channels.emplace_back(pipefd[1],id);//保存在channel对象中并存入vector } return 0;}
对于Channel类:
代码语言:javascript代码运行次数:0运行复制class Channel{public: Channel(int fd,pid_t who):_fd(fd),_who(who) { _name = "Channel-"+std::to_string(fd)+"-"+std::to_string(who); } std::string GetName() { return _name; } int GetFd() { return _fd; } pid_t GetWho() { return _who; } void Send(int num)//父进程往匿名管道发送信息 { ::write(_fd,&num,sizeof(num)); } ~Channel() { }private: int _fd;//保存匿名管道通信的文件描述符 std::string _name;//名字(自己取的) pid_t _who;//子进程pid};
执行任务之前我们需要先确定有哪些任务,怎么执行…所以我们需要进行任务管理,同样我们也是使用一个类TaskManager来进行任务管理:
代码语言:javascript代码运行次数:0运行复制#include<iostream>#include<unordered_map>#include<functional>#include<ctime>using task_t = std::function<void()>;//函数指针//不同任务函数 void Load() { std::cout<<"正在执行加载任务..."<<std::endl; } void Del() { std::cout<<"正在执行删除任务..."<<std::endl; } void Log() { std::cout<<"正在执行日志任务..."<<std::endl; }static int number = 0;class TaskManager{public: TaskManager() { srand(time(nullptr)); InsertTask(Load); InsertTask(Del); InsertTask(Log); } int SelectTask() { return rand()%number; } void InsertTask(task_t t) { m[number++] = t; } void Excute(int num) { if(m.find(num) == m.end()) return; m[num]();//执行任务 } ~TaskManager() { }private: std::unordered_map<int,task_t> m;//使用map封装数字与对应的任务函数指针};TaskManager tm;
选择新创建一个源文件Task.hpp来封装上述内容,上述任务管理类中我们使用map来保存数字与任务函数指针的相关关系,这样通过数字就可以确定是哪一个任务函数;此外选择任务使用的方法是随机数的方法,大家可以根据自己的想法确定不同的方式。
2.2 执行任务发送任务代码语言:javascript代码运行次数:0运行复制void ExcuteTask(std::vector<Channel>& channels){ int n = 0; int count = 10; while(count--)//执行10次任务 { //1.选择任务,获取任务编号 int tasknum = tm.SelectTask(); //2.选择子进程,使用轮询选择,派发任务 channels[n++].Send(tasknum); n%=channels.size(); std::cout<<std::endl; std::cout<<"*****成功发送"<<10-count<<"个任务*****"<<std::endl; sleep(2);//每个2s发送一个任务 }}
//子进程接受并执行任务void Work(){ while(true) { int num = 0; int n = ::read(0,&num,sizeof(num)); if(n == sizeof(num))//读取成功 tm.Excute(num);//不要发成n else if(n == 0) { break; } else { break; } }}
void CleanProcesspool(std::vector<Channel>& channels){ for(auto& c : channels) ::close(c.GetFd()); for(auto& c : channels) { pid_t rid = ::waitpid(c.GetWho(),nullptr,0); if(rid <= 0) { std::cout<<std::endl; std::cout<<"清理子进程失败..."<<std::endl; return; } } std::cout<<std::endl; std::cout<<"成功清理子进程..."<<std::endl;}
注意这里不能使用一个循环来进行清理,如下面代码是错误的:
代码语言:javascript代码运行次数:0运行复制void CleanProcesspool(std::vector<Channel>& channels){ for(auto& c : channels)//只使用一次循环 { ::close(c.GetFd()); pid_t rid = ::waitpid(c.GetWho(),nullptr,0); if(rid <= 0) { std::cout<<std::endl; std::cout<<"清理子进程失败..."<<std::endl; return; } } std::cout<<std::endl; std::cout<<"成功清理子进程..."<<std::endl;}
这是因为在创建子进程时,子进程会继承父进程的文件描述符表,因此在第一个匿名管道创建后,例如父进程的4号文件描述符指向该匿名管道写端,那么在创建第二个子进程时,该子进程会继承4号文件描述符也指向第一个匿名管道写端,因此创建的子进程越多,前面匿名管道写端被指向的就越多,所以仅仅关闭一个进程的写端指向,还有其他的写端指向,所以读端无法读到0,也就无法退出,如下图所示:
如果要使用一个循环来清理回收子进程,我们可以从后往前关闭文件描述符,因为最后一个管道写端只有父进程指向。
4. 封装与完整实现对于父进程管理进程池我们可以封装一个类来更好的进行管理与实现:
代码语言:javascript代码运行次数:0运行复制#include <iostream>#include <string>#include <vector>#include <unistd.h>#include <sys/types.h>#include <stdlib.h>#include <sys/wait.h>#include "Task.hpp"#include "Channel.hpp"// masterclass ProcessPool{public: int InitProcesspool(int num) { for (int i = 0; i < num; i++) { // 1.创建匿名管道 int pipefd[2] = {0}; int n = pipe(pipefd); if (n < 0) return 2; // 2.创建子进程 pid_t id = fork(); if (id < 0) return 3; // 3.建立通信管道,父子进程关闭读端或写端 if (id == 0) // 子进程 { // 子进程读取,关闭写端 ::close(pipefd[1]); // dup2 dup2(pipefd[0], 0); Work(); ::exit(0); } // 父进程 // 父进程写入,关闭读端 ::close(pipefd[0]); channels.emplace_back(pipefd[1], id); } return 0; } void ExcuteTask() { int n = 0; int count = 10; while (count--) // 执行10次任务 { // 1.选择任务,获取任务编号 int tasknum = tm.SelectTask(); // 2.选择子进程,使用轮询选择,派发任务 channels[n++].Send(tasknum); n %= channels.size(); std::cout << std::endl; // std::cout<<"**************************"<<std::endl; std::cout << "*****成功发送" << 10 - count << "个任务*****" << std::endl; // std::cout<<"**************************"<<std::endl; // std::cout<<std::endl; sleep(3); } } void CleanProcesspool() { for (auto &c : channels) ::close(c.GetFd()); for (auto &c : channels) { pid_t rid = ::waitpid(c.GetWho(), nullptr, 0); if (rid <= 0) { std::cout << std::endl; std::cout << "清理子进程失败..." << std::endl; return; } } std::cout << std::endl; std::cout << "成功清理子进程..." << std::endl; }private: std::vector<Channel> channels;};
main函数:
代码语言:javascript代码运行次数:0运行复制#include "ProcessPool.hpp"int main(int argc, char* argv[]){ //0.获取应该创建管道个数num个 if(argc!=2) { std::cout<<"请输入管道个数."<<std::endl; return 1; } int num = std::stoi(argv[1]); std::vector<Channel> channels; ProcessPool* pp = new ProcessPool; //1.初始化进程池——创建进程池 pp->InitProcesspool(num); //2.执行任务 pp->ExcuteTask(); //3.任务执行完成,回收子进程 pp->CleanProcesspool(); delete pp; return 0;}
运行结果如下:
以上就是【Linux】匿名管道通信场景——进程池的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号