首页 > 运维 > linux运维 > 正文

【Linux】匿名管道通信场景——进程池

看不見的法師
发布: 2025-06-19 16:58:34
原创
188人浏览过

1. 初始化进程池

  进程池的实现是依靠匿名管道,通过进程间通信使得父进程能够管理多个进程任务,相当于父进程拥有了很多个进程——进程池,通过不同的进程完成指定的任务。   所以我们需要创建多个匿名管道和子进程,进行进程间通信,发送信息给子进程让它们根据接收到的信息处理相关任务。   因为有多个管道和子进程,为了方便父进程使用不同管道发送对应信息给子进程,我们需要将管道的文件描述符以及对应子进程的pid保存起来,我们选择将它们封装在一个c++hannel类中。又因为有多个匿名管道和子进程,所以将多个channel类对象储存在c++stl中的容器vector中来方便父进程进行管理进程池。

代码如下:

代码语言:javascript代码运行次数:0运行复制
int InitProcesspool(int num,std::vector<Channel>& channels){    for(int i = 0; i < num; i++)//使用循环创建多个匿名管道和子进程    {        //1.创建匿名管道        int pipefd[2] = {0};        int n = pipe(pipefd);        if(n < 0) return 2;//根据不同的返回值判断原因,也可以使用枚举来约定返回值代表的内容        //2.创建子进程        pid_t id = fork();        if(id < 0) return 3;        //3.建立通信管道,父子进程关闭读端或写端        if(id == 0)//子进程        {            //子进程读取,关闭写端            ::close(pipefd[1]);            //dup2            dup2(pipefd[0],0);            //子进程需要执行的内容            Work();            ::exit(0);        }        //父进程        //父进程写入,关闭读端        ::close(pipefd[0]);        channels.emplace_back(pipefd[1],id);//保存在channel对象中并存入vector    }    return 0;}
登录后复制

对于Channel类:

代码语言:javascript代码运行次数:0运行复制
class Channel{public:    Channel(int fd,pid_t who):_fd(fd),_who(who)    {        _name = "Channel-"+std::to_string(fd)+"-"+std::to_string(who);    }    std::string GetName()    {        return _name;    }    int GetFd()    {        return _fd;    }    pid_t GetWho()    {        return _who;    }    void Send(int num)//父进程往匿名管道发送信息    {        ::write(_fd,&num,sizeof(num));    }    ~Channel()    {    }private:    int _fd;//保存匿名管道通信的文件描述符    std::string _name;//名字(自己取的)    pid_t _who;//子进程pid};
登录后复制
2. 进程池执行任务2.1 任务管理

  执行任务之前我们需要先确定有哪些任务,怎么执行…所以我们需要进行任务管理,同样我们也是使用一个类TaskManager来进行任务管理:

代码语言:javascript代码运行次数:0运行复制
#include<iostream>#include<unordered_map>#include<functional>#include<ctime>using task_t = std::function<void()>;//函数指针//不同任务函数    void Load()    {        std::cout<<"正在执行加载任务..."<<std::endl;    }    void Del()    {        std::cout<<"正在执行删除任务..."<<std::endl;    }    void Log()    {        std::cout<<"正在执行日志任务..."<<std::endl;    }static int number = 0;class TaskManager{public:    TaskManager()    {        srand(time(nullptr));        InsertTask(Load);        InsertTask(Del);        InsertTask(Log);    }    int SelectTask()    {        return rand()%number;    }    void InsertTask(task_t t)    {        m[number++] = t;    }    void Excute(int num)    {        if(m.find(num) == m.end())            return;        m[num]();//执行任务    }    ~TaskManager()    {    }private:    std::unordered_map<int,task_t> m;//使用map封装数字与对应的任务函数指针};TaskManager tm;
登录后复制

  选择新创建一个源文件Task.hpp来封装上述内容,上述任务管理类中我们使用map来保存数字与任务函数指针的相关关系,这样通过数字就可以确定是哪一个任务函数;此外选择任务使用的方法是随机数的方法,大家可以根据自己的想法确定不同的方式。

2.2 执行任务发送任务代码语言:javascript代码运行次数:0运行复制
void ExcuteTask(std::vector<Channel>& channels){    int n = 0;    int count = 10;    while(count--)//执行10次任务    {        //1.选择任务,获取任务编号        int tasknum = tm.SelectTask();        //2.选择子进程,使用轮询选择,派发任务        channels[n++].Send(tasknum);        n%=channels.size();        std::cout<<std::endl;        std::cout<<"*****成功发送"<<10-count<<"个任务*****"<<std::endl;                sleep(2);//每个2s发送一个任务    }}
登录后复制
接受并执行任务代码语言:javascript代码运行次数:0运行复制
//子进程接受并执行任务void Work(){    while(true)    {        int num = 0;        int n = ::read(0,&num,sizeof(num));        if(n == sizeof(num))//读取成功            tm.Excute(num);//不要发成n        else if(n == 0)        {            break;        }        else        {            break;        }    }}
登录后复制
3. 清理进程池代码语言:javascript代码运行次数:0运行复制
void CleanProcesspool(std::vector<Channel>& channels){    for(auto& c : channels)        ::close(c.GetFd());    for(auto& c : channels)    {        pid_t rid = ::waitpid(c.GetWho(),nullptr,0);        if(rid <= 0)        {            std::cout<<std::endl;            std::cout<<"清理子进程失败..."<<std::endl;            return;        }    }    std::cout<<std::endl;    std::cout<<"成功清理子进程..."<<std::endl;}
登录后复制

  注意这里不能使用一个循环来进行清理,如下面代码是错误的:

代码语言:javascript代码运行次数:0运行复制
void CleanProcesspool(std::vector<Channel>& channels){    for(auto& c : channels)//只使用一次循环    {    ::close(c.GetFd());        pid_t rid = ::waitpid(c.GetWho(),nullptr,0);        if(rid <= 0)        {            std::cout<<std::endl;            std::cout<<"清理子进程失败..."<<std::endl;            return;        }    }    std::cout<<std::endl;    std::cout<<"成功清理子进程..."<<std::endl;}
登录后复制

这是因为在创建子进程时,子进程会继承父进程的文件描述符表,因此在第一个匿名管道创建后,例如父进程的4号文件描述符指向该匿名管道写端,那么在创建第二个子进程时,该子进程会继承4号文件描述符也指向第一个匿名管道写端,因此创建的子进程越多,前面匿名管道写端被指向的就越多,所以仅仅关闭一个进程的写端指向,还有其他的写端指向,所以读端无法读到0,也就无法退出,如下图所示:

【Linux】匿名管道通信场景——进程池
【Linux】匿名管道通信场景——进程池

如果要使用一个循环来清理回收子进程,我们可以从后往前关闭文件描述符,因为最后一个管道写端只有父进程指向。

4. 封装与完整实现

  对于父进程管理进程池我们可以封装一个类来更好的进行管理与实现:

代码语言:javascript代码运行次数:0运行复制
#include <iostream>#include <string>#include <vector>#include <unistd.h>#include <sys/types.h>#include <stdlib.h>#include <sys/wait.h>#include "Task.hpp"#include "Channel.hpp"// masterclass ProcessPool{public:    int InitProcesspool(int num)    {        for (int i = 0; i < num; i++)        {            // 1.创建匿名管道            int pipefd[2] = {0};            int n = pipe(pipefd);            if (n < 0)                return 2;            // 2.创建子进程            pid_t id = fork();            if (id < 0)                return 3;            // 3.建立通信管道,父子进程关闭读端或写端            if (id == 0) // 子进程            {                // 子进程读取,关闭写端                ::close(pipefd[1]);                // dup2                dup2(pipefd[0], 0);                Work();                ::exit(0);            }            // 父进程            // 父进程写入,关闭读端            ::close(pipefd[0]);            channels.emplace_back(pipefd[1], id);        }        return 0;    }    void ExcuteTask()    {        int n = 0;        int count = 10;        while (count--) // 执行10次任务        {            // 1.选择任务,获取任务编号            int tasknum = tm.SelectTask();            // 2.选择子进程,使用轮询选择,派发任务            channels[n++].Send(tasknum);            n %= channels.size();            std::cout << std::endl;            // std::cout<<"**************************"<<std::endl;            std::cout << "*****成功发送" << 10 - count << "个任务*****" << std::endl;            // std::cout<<"**************************"<<std::endl;            // std::cout<<std::endl;            sleep(3);        }    }    void CleanProcesspool()    {        for (auto &c : channels)            ::close(c.GetFd());        for (auto &c : channels)        {            pid_t rid = ::waitpid(c.GetWho(), nullptr, 0);            if (rid <= 0)            {                std::cout << std::endl;                std::cout << "清理子进程失败..." << std::endl;                return;            }        }        std::cout << std::endl;        std::cout << "成功清理子进程..." << std::endl;    }private:    std::vector<Channel> channels;};
登录后复制

main函数:

代码语言:javascript代码运行次数:0运行复制
#include "ProcessPool.hpp"int main(int argc, char* argv[]){    //0.获取应该创建管道个数num个    if(argc!=2)    {        std::cout<<"请输入管道个数."<<std::endl;        return 1;    }    int num = std::stoi(argv[1]);    std::vector<Channel> channels;        ProcessPool* pp = new ProcessPool;    //1.初始化进程池——创建进程池    pp->InitProcesspool(num);        //2.执行任务    pp->ExcuteTask();    //3.任务执行完成,回收子进程    pp->CleanProcesspool();    delete pp;        return 0;}
登录后复制

运行结果如下:

【Linux】匿名管道通信场景——进程池
5. 结语

  

以上就是【Linux】匿名管道通信场景——进程池的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习
PHP中文网抖音号
发现有趣的

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号