一、概述
其实之前在 【linux】 ipc 进程间通信(三)(消息队列 & 信号量) 也了解过相关知识,这里的话只是做个补充
消息队列 提供了一个 从一个进程向另外一个进程发送有类型块数据 的方法每个数据块都被认为是有一个 类型,接收者进程接收的数据块可以有不同的类型值消息队列 也有 管道 一样的不足,就是每个消息的最大长度是有上限的 (MSGMAX)每个消息队列的 总的字节数 也是有上限的 (MSGMNB),系统上 **消息队列 **的总数也有上限 (MSGMNI) 的整数数组:[1, 2, 3, 4] 是一个包含整数类型的数据块。字符串数组:["a", "b", "c"] 是一个包含字符串类型的数据块。二、通信形式struct ipc_perm {key_t __key; /* Key supplied to xxxget(2)*/ uid_t uid; /* Effective UID of owner */ gid_t gid; /* Effective GID of owner */ uid_t cuid;/* Effective UID of creator */ gid_t cgid; /* Effective GID of creator */ unsigned short mode;/*Permissions */unsigned short __seq;/*Sequence number */};
struct msqid_ds{struct ipc_perm msg_perm;struct msg msg_first; /* first message on queue,unused */ struct msg msg_last; /* last message in queue,unused */ __kernel_time_t msg_stime; /* last msgsnd time */ __kernel_time_t msg_rtime; /* last msgrcv time*/ __kernel_time_t msg_ctime; /* last change time */ unsigned long msg_lcbytes; /* Reuse junk fields for 32 bit*/ unsigned long msg_lqbytes; /* ditto */ unsigned short msg_cbytes; /* current number of bytes on queue */ unsigned short msg_qnum; /* number of messages in queue */ unsigned short msg_qbytes; /* max number of bytes on queue */ __kernel_ipc_pid_t msg_lspid; /*pid of last msgsnd */ __kernel_ipc_pid_t msg_lrpid; /* last receive pid */;
这些接口之前在 【Linux】 IPC 进程间通信(三)(消息队列 & 信号量 有做了解,这里就简单阐述一下
1. msgget代码语言:javascript代码运行次数:0运行复制NAME msgget - get a System V message queue identifierSYNOPSIS #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> int msgget(key_t key, int msgflg);RETURN VALUE If successful, the return value will be the message queue identifier (a nonnegative integer), otherwise -1 with errno indicating the error.
参数:
key:某个消息队列的名字msgflg:由九个权限标志构成,其用法和创建文件时使用的 mode 模式标志一样返回值
成功时返回一个非负整数,即该消息队列的标识符。失败时,返回 -12. msgctl代码语言:javascript代码运行次数:0运行复制NAME msgctl - System V message control operationsSYNOPSIS #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> int msgctl(int msqid, int cmd, struct msqid_ds *buf);struct msqid_ds { struct ipc_perm msg_perm; /* Ownership and permissions */ time_t msg_stime; /* Time of last msgsnd(2) */ time_t msg_rtime; /* Time of last msgrcv(2) */ time_t msg_ctime; /* Time of creation or last modification by msgctl() */ unsigned long msg_cbytes; /* # of bytes in queue */ msgqnum_t msg_qnum; /* # number of messages in queue */ msglen_t msg_qbytes; /* Maximum # of bytes in queue */ pid_t msg_lspid; /* PID of last msgsnd(2) */ pid_t msg_lrpid; /* PID of last msgrcv(2) */};RETURN VALUE On success, IPC_STAT, IPC_SET, and IPC_RMID return 0. A successful IPC_INFO or MSG_INFO operation returns the index of the highest used entry in the kernel's internal array recording informationabout all message queues. (This information can be used with repeated MSG_STAT or MSG_STAT_ANY operations to obtain information about all queues on the system.) A successful MSG_STAT or MSG_STAT_ANY operation returns the identifier of the queue whose index was given in msqid.On error, -1 is returned with errno indicating the error.
参数:
msgid:由 msgget 函数返回的消息队列标识码cmd:将要采取的动作(有三个可取值),分别如下:命令
说明
IPC_STAT
把 msqid_ds 结构中的数据设置为消息队列的当前关联值(获取指定消息队列的当前状态和属性)
IPC_SET
在进程权限足够的前提下,把消息队列的当前关联值设置为 msqid_ds 数据结构中给出的值(修改指定消息队列的属性)
IPC_RMID
删除消息队列
buf:属性缓冲区返回值
成功时返回 0,失败返回 -13. msgsnd代码语言:javascript代码运行次数:0运行复制NAME msgrcv, msgsnd - System V message queue operationsSYNOPSIS #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
参数:
msgid:由 msgget 函数返回的消息队列标识码msgp:是一个指针,指针指向准备 发送 的消息msgsz:是 msgp 指向的消息长度,这个长度不含保护消息类型的那个 long int 长整型msgflg:控制着当前消息队列满 或 到达系统上限时将要发送的事情, 一般填 0 即可 (msgflag=IPC_NOWAIT 表示队列为满不等待,返回 EAGAIN 错误)**返回值:**成功返回 0,失败返回 -1
关于消息主体
代码语言:javascript代码运行次数:0运行复制struct msgbuf { long mtype; /* message type, must be > 0 */ char mtext[1]; /* message data */};// 以一个 long int 长整数开始,接收者函数将利用这个长整型确定消息的类型
NAME msgrcv, msgsnd - System V message queue operationsSYNOPSIS #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg)
参数:
msgid:由 msgget 函数返回的消息队列标识码msgp:是一个指针,指针指向准备 接收 的消息msgsz:是 msgp 指向的消息长度,这个长度不含保护消息类型的那个 long int 长整型msgtype:实现接收消息的类型,也可以模拟优先级的简单形式进行接收msgflg:控制着队列中没有相应类型的消息可供接收时将要发生的事**返回值:**成功返回实际 放到缓冲区里的字符个数,失败返回 -1
msgflg 标志位 – 了解
msgtype = 0:返回队列第一条信息msgtype > 0:返回队列第一条信息等于 msgtype 的信息msgtype 0 & msgflg = MSG_EXCEPT:接收类型不等于 msgtype 的第一条消息5. 基本通信代码Makefile 文件,代码如下:
代码语言:javascript代码运行次数:0运行复制.PHONY:allall:client serverclient:Client.ccg++ -o $@ $^ -std=c++17server:Server.ccg++ -o $@ $^ -std=c++17.PHONY:cleanclean:rm -f client server
先编写一个最基本的头文件 MsgQueue.hpp 封装消息队列,然后用 Server 类继承,代码如下:
代码语言:javascript代码运行次数:0运行复制#ifndef MSGQUEUE_HPP#define MSGQUEUE_HPP#include <iostream>#include <string>#include <sys/types.h>#include <sys/ipc.h>#include <sys/msg.h>const std::string pathName = "/tmp";const int pro_id = 0x1314;const int default_fd = -1;class MsgQueue{public: MsgQueue(){} MsgQueue(int size): _msgfd(default_fd) {} // 创建消息队列 void Create() { // 获取唯一键值 key_t key = ftok(pathName.c_str(), pro_id); if(key == -1) { std::cerr << "ftok error" << std::endl; exit(1); } // 按照 16 进制打印 键值 std::cerr << "key: " << key << std::endl; _msgfd = msgget(key, IPC_CREAT | IPC_EXCL | 0600); if(_msgfd == -1) { std::cerr << "msgget error" << std::endl; exit(2); } std::cout << "Create success: " << _msgfd << std::endl; } // 删除消息队列 void Destroy() { int n = msgctl(_msgfd, IPC_RMID, nullptr); if(n == -1) { std::cerr << "msgctl error" << std::endl; exit(3); } std::cout << "Destroy success"<< std::endl; } ~MsgQueue() {}private: int _msgfd;};class Server: public MsgQueue{public: Server() { MsgQueue::Create(); } ~Server() { MsgQueue::Destroy(); }};#endif
Server.cc 测试代码如下:
代码语言:javascript代码运行次数:0运行复制#include "MsgQueue.hpp"int main(){ Server Server; return 0;}
可能会出现 ftok error 的情况,那么就需要改变一下声明的 PROJID 和 PATHNAME,然后重新编译运行应该就可以了,如下:
如果我们不让消息队列创建之后就立马删除,那么就注释一下 Server 的析构函数,再运行结果如下:
通过 ipcs -q 可以查看创建的消息队列,也可以类似于【信号量】使用将其删除,如下:
结论:
消息队列的生命周期是随内核的消息队列支持全双工现在正式开始我们的通信代码,如下:
MsgQueue.hpp代码语言:javascript代码运行次数:0运行复制
#ifndef MSGQUEUE_HPP#define MSGQUEUE_HPP#include <iostream>#include <string>#include <cstring>#include <sys/types.h>#include <sys/ipc.h>#include <sys/msg.h>const std::string pathName = "/tmp";const int pro_id = 0x1314;const int default_fd = -1;const int default_size = 1024;#define GET_MSGQUEUE (IPC_CREAT)#define CREATE_MSGQUEUE (IPC_CREAT | IPC_EXCL | 0666)class MsgQueue{// 有类型数据块 struct msgbuf { long mtype; char mtext[default_size]; };public: MsgQueue(){} MsgQueue(int size): _msgfd(default_fd){} // 创建消息队列 void Create(int flag) { // 获取唯一键值 key_t key = ftok(pathName.c_str(), pro_id); if(key == -1) { std::cerr << "ftok error" << std::endl; exit(1); } // 按照 16 进制打印 键值 std::cerr << "key: " << key << std::endl; _msgfd = msgget(key, flag); if(_msgfd == -1) { std::cerr << "msgget error" << std::endl; exit(2); } std::cout << "Create success: " << _msgfd << std::endl; } // 发送消息 void Send(int type, const std::string &text) { struct msgbuf msg; memset(&msg, 0, sizeof(msg)); msg.mtype = type; memcpy(msg.mtext, text.c_str(), text.size()); // 注意;填长度不能直接写成 sizeof(msg); int n = msgsnd(_msgfd, &msg, sizeof(msg.mtext), 0); if(n == -1) { std::cerr << "msgnd error" << std::endl; return ; } } // 接收消息:参数设置为输出型参数 void Recv(int type, std::string &text) { struct msgbuf msg; memset(&msg, 0, sizeof(msg)); int n = msgrcv(_msgfd, &msg, sizeof(msg.mtext), type, 0); if(n == -1) { std::cerr << "msgrcv error" << std::endl; return ; } msg.mtext[n] = '\0'; text = msg.mtext; } // 获取消息队列属性 void GetAttr() { struct msqid_ds outbuffer; int n = msgctl(_msgfd, IPC_STAT, &outbuffer); if(n == -1) { std::cerr << "msgctl error" << std::endl; return ; } std::cout << "outbuffer.msg_perm__key: " << std::hex << outbuffer.msg_perm.__key << std::endl; } // 删除消息队列 void Destroy() { int n = msgctl(_msgfd, IPC_RMID, nullptr); if(n == -1) { std::cerr << "msgctl error" << std::endl; exit(3); } std::cout << "Destroy success"<< std::endl; } ~MsgQueue() {}private: int _msgfd;};// 需要定义消息类型#define MSG_TYPE_CLIENT 1#define MSG_TYPE_SERVER 2class Server: public MsgQueue{public: Server() { MsgQueue::Create(CREATE_MSGQUEUE); std::cout << "server create success" << std::endl; MsgQueue::GetAttr(); } ~Server() { MsgQueue::Destroy(); }};class Client: public MsgQueue{public: Client() { MsgQueue::Create(GET_MSGQUEUE); std::cout << "client create success" << std::endl; } ~Client() { // MsgQueue::Destroy(); }};#endif
Server.cc
代码语言:javascript代码运行次数:0运行复制#include "MsgQueue.hpp"int main(){ std::string text; Server server; while(true) { //如果消息队列为空,阻塞等待 server.Recv(MSG_TYPE_CLIENT, text); std::cout << "Received: " << text << std::endl; if(text == "exit") { break; // 省去手动操作 } } return 0;}
Client.cc
代码语言:javascript代码运行次数:0运行复制#include "MsgQueue.hpp"int main(){ Client client; while (true) { std::string input; std::cout << "Please input message: "; std::getline(std::cin, input); client.Send(MSG_TYPE_CLIENT, input); if(input == "exit") { break; // 省去手动操作 } } return 0;}
运行结果如下:
? 责任链(Chain of Responsibility)模式的定义:责任链模式也叫职责链模式,为了避免请求发送者与多个请求处理者 耦合 在一起,将所有请求的处理者通过前一对象记住其下一个对象的引用而连成一条链;当有请求发生时,可将请求沿着这条链传递,直到有对象处理它为止。
在责任链模式中,客户只需要将请求发送到责任链上即可,无须关心请求的处理细节和请求的传递过程,所以责任链将请求的发送者和请求的处理者解耦了新需求:
client 发送给 server 的输入内容,拼接上时间,进程 pid 信息server 收到的内容 持久化保存 到文件中文件的内容如果过大,要进行 切片保存 并在指定的目录下 打包保存,命令自定义解决方案:责任链模式
每个处理者都对请求进行检查,以决定是否处理它。如果处理者能够处理该请求,它就处理它否则,它将请求传递给链中的下一个处理者。这个模式使得多个对象都有机会处理请求,从而 避免了请求的发送者和接收者之间的紧耦合
责任链原理示意图:
责任链 UML 类图
基于上面基本通信代码的基础上,做的改进,如下:
ChainOfResponsibility.hpp代码语言:javascript代码运行次数:0运行复制
#ifndef CHAIN_OF_RESPONSIBILITY_HPP#define CHAIN_OF_RESPONSIBILITY_HPP#include <iostream>#include <memory>#include <cstring>#include <string>#include <sstream>#include <ctime>#include <sys/types.h>#include <unistd.h>#include <filesystem>#include <fstream>#include <sys/wait.h>// 责任链基类class HandlerText{public: virtual void Excute(const std::string &text) = 0; void SetNext(std::shared_ptr<HandlerText> next) { _next = next; } void Enable() { _enable = true; } void Disable() { _enable = false; } virtual ~HandlerText() {}protected: // protected 需要被子类继承 std::shared_ptr<HandlerText> _next; // 下一个责任链节点 bool _enable = true; // 是否启用该节点};// 对文本进行格式化处理class HandlerTextFormat : public HandlerText{public: void Excute(const std::string &text) override { std::string format_result = text + "\n"; // 初始,避免已经被处理过 if(_enable) // 该节点被开始,进行处理 { std::stringstream ss; ss << time(nullptr) << "-" << getpid() << "-" << text << "\n"; format_result = ss.str(); std::cout << "step 1: 格式化消息: " << text << " 结果: " << format_result << std::endl; } if(_next){ _next->Excute(format_result); // 将处理结果表现在 text 内部 传递给下一个节点 } else{ std::cout << "到达责任链处理结尾, 完成责任链处理" << std::endl; } }};// 文件的基本信息:文件路径, 文件名std::string defaultfilepath = "./tmp/";std::string defaultfilename = "test.log";// 对文本进行文件保存class HandlerTextSaveFile : public HandlerText{public: HandlerTextSaveFile(const std::string &filepath = defaultfilepath, const std::string &filename = defaultfilename) : _filepath(filepath), _filename(filename) { // 形成默认目录名 filesystem if(std::filesystem::exists(_filepath)) return ; try { std::filesystem::create_directories(_filepath); } catch(std::filesystem::filesystem_error const& e) { std::cerr << e.what() << "\n"; } } void Excute(const std::string &text) override { if(_enable) { // 保存到文件中 std::string file = _filepath + _filename; std::ofstream ofs(file, std::ios::app); if(!ofs.is_open()){ std::cerr << "open file error" << file << std::endl; return ; } ofs << text; ofs.close(); std::cout << "step2: 保存消息" << text << " 到文件:" << file << std::endl; } if(_next){ _next->Excute(text); // 将处理结果表现在 text 内部 传递给下一个节点 } else{ std::cout << "到达责任链处理结尾, 完成责任链处理" << std::endl; } }private: std::string _filepath; std::string _filename;};// 对文件内容长度进行检查,如果长度过长,对文件内容进行打包备份const int defaultmaxline = 5; // 最大行数class HandlerTextBackup : public HandlerText{public: HandlerTextBackup(const std::string &filepath = defaultfilepath, const std::string &filename = defaultfilename, const int &maxline = defaultmaxline) : _filepath(filepath), _filename(filename), _maxline(maxline) { } void Excute(const std::string &text) override { if(_enable) // 该节点被开始,进行处理 { // 该节点开启,对文件进行检查,如果超范围,就要切片并且进行打包备份 std::string file = _filepath + _filename; std::cout << "Step 3: 检查文件: " << file << " 大小是否超范围" << std::endl; if(IsOutOfRange(file)) { // 如果超了范围,需要切片备份 std::cout << "目标文件超范围, 并且进行切片备份" << file << std::endl; BackUp(file); } } if(_next) { _next->Excute(text); // 将处理结果表现在 text 内部 传递给下一个节点 } else { std::cout << "到达责任链处理结尾, 完成责任链处理" << std::endl; } // std::cout << "备份文本:" << text << std::endl; }private: bool IsOutOfRange(const std::string &file) { std::ifstream ifs(file); if(!ifs.is_open()) { std::cerr << "open file error" << std::endl; return false; } int lines = 0; std::string line; while(std::getline(ifs, line)){ lines++; } ifs.close(); return lines > _maxline; } void BackUp(const std::string &file) { // 1589234 std::string suffix = std::to_string(time(nullptr)); // "./tmp/test.txt" --> "./tmp/test.txt.1589234" std::string backup_file = file + "." + suffix; // 备份文件名 // 只需要文件名,不需要路径 std::string src_file = _filename + "." + suffix; std::string tar_file = src_file + ".tgz"; // 切片备份并打包 pid_t pid = fork(); if(pid == 0) { // child // 1. 先对文件进行重命名,Linux 上对文件重命名是原子性的 // "test.txt" --> "text.txt.1314132" std::string backup_file = file + "." + std::to_string(time(nullptr)); // 2. 让子进程进行数据备份 std::filesystem::rename(file, backup_file); std::cout << "step 4: 备份文件: " << file << " 到文件: " << backup_file << std::endl; // 3. 对备份文件进行打包,打包成为 .tgz,需要使用 exec* 系统调用 // 3.1 对备份文件进行打包 .tgz // "test.txt" --> "text.txt.1314132" --> "text.txt.1314132.tgz" // 3.1.1 更改工作路径(chdir) std::filesystem::current_path(_filepath); // 3.1.2 调用 tar 命令进行打包 execlp("tar", "tar", "-czf", tar_file.c_str(), src_file.c_str(), nullptr); exit(1); // exec* 系统调用失败,返回 1 } // parent int status; pid_t rid = waitpid(pid, &status, 0); if(rid > 0) { if(WIFEXITED(status) && WEXITSTATUS(status) == 0) { // 打包成功,删除源文件 std::filesystem::remove(backup_file); std::cout << "step 5: 删除备份文件: " << backup_file << std::endl; } } }private: std::string _filepath; std::string _filename; int _maxline; // 最大行数};// 责任链入口类class HandlerEntry{public: HandlerEntry() { // 构造责任链节点 _format = std::make_shared<HandlerTextFormat>(); _save = std::make_shared<HandlerTextSaveFile>(); _backup = std::make_shared<HandlerTextBackup>(); // 设置责任链节点的处理顺序 --(链表) _format->SetNext(_save); _save->SetNext(_backup); } void EnableHandler(bool isformat, bool issave, bool isbackup) { isformat ? _format->Enable() : _format->Disable(); issave ? _save->Enable() : _save->Disable(); isbackup ? _backup->Enable() : _backup->Disable(); } void Run(const std::string& text) { _format->Excute(text); } ~HandlerEntry() { }private: std::shared_ptr<HandlerText> _format; std::shared_ptr<HandlerText> _save; std::shared_ptr<HandlerText> _backup;};#endif
Server.cc
代码语言:javascript代码运行次数:0运行复制#include "MsgQueue.hpp"#include "ChainOfResponsibility.hpp"int main(){ std::string text; Server server; HandlerEntry he; he.EnableHandler(true, true, true); //要哪个功能,就写 true while(true) { //如果消息队列为空,阻塞等待 server.Recv(MSG_TYPE_CLIENT, text); std::cout << "Received: " << text << std::endl; if(text == "exit") { break; // 省去手动操作 } // 加工处理数据,就可以采用责任链模式 he.Run(text); } return 0;}
结果如下:
责任链模式是一种对象行为型模式,其主要优点如下:
降低了对象之间的耦合度。该模式使得一个对象无须知道到底是哪一个对象处理其请求以及链的结构,发送者和接收者也无须拥有对方的明确信息。增强了系统的可扩展性。可以根据需要增加新的请求处理类,满足开闭原则。增强了给对象指派职责的灵活性。当工作流程发生变化,可以动态地改变链内的成员或者调动它们的次序,也可动态地新增或者删除责任。责任链简化了对象之间的连接。每个对象只需保持一个指向其后继者的引用,不需保持其他所有处理者的引用,这避免了使用众多的 if 或者 if···else 语句。责任分担。每个类只需要处理自己该处理的工作,不该处理的传递给下一个对象完成,明确各类的责任范围,符合类的单一职责原则。其主要缺点如下:
不能保证每个请求一定被处理。由于一个请求没有明确的接收者,所以不能保证它一定会被处理,该请求可能一直传到链的末端都得不到处理。对比较长的职责链,请求的处理可能涉及多个处理对象,系统性能将受到一定影响。职责链建立的合理性要靠客户端来保证,增加了客户端的复杂性,可能会由于职责链的错误设置而导致系统出错,如可能会造成循环调用。其应用场景如下:
有多个对象可以处理一个请求,哪个对象处理该请求由运行时刻自动确定。可动态指定一组对象处理请求,或添加新的处理者。在不明确指定请求处理者的情况下,向多个处理者中的一个提交请求。以上就是【Linux】责任链模式和消息队列的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号