作者主页: 作者主页
本篇博客专栏:Linux
创作时间 :2024年6月20日
进程间通信的目的:
- 数据传输:一个进程需要将它的数据发送给另一个进程
- 资源共享:多个进程之间共享资源
- 通知事件:一个进程需要向另一个或者一组进程发送消息,通知他们发生了某种事件(如进程终止时要通知父进程)
- 进程控制:有些进程完全控制另一些进程的执行,此时控制进程希望能拦截另一个进程的所有陷入和异常,并能够即使知道他们的状态改变
进程间通信的发展
- 管道
- System V进程间通信
- POSIX进程间通信
进程间通信的前提就是先让不同的进程看到同一份(操作系统)资源(一段内存),进程间通信一定是某个进程先需要通信,让OS创建一个共享资源,此时OS必须提供很多系统调用,OS创建的共享资源的不同,系统调用的接口也就不同,所有进程的通信会有不同的种类
进程间通信分类
管道:
- 匿名管道
- pipe管道
System V IPC
- System V 消息队列
- System V 共享内存
- System V 信号量
POSIX IPC
- 消息队列
- 共享内存
- 信号量
- 互斥量
- 条件变量
- 读写锁
管道
匿名管道:
一个进程将同一个文件打开两次,一次以写方式打开,另一次以读方式打开。此时会创建两个struct file,而文件的属性会共用,不会额外创建。
如果此时有创建了子进程,子进程会继承父进程的文件描述表,指向同一个文件,我们把上面分子进程都看到的文件,叫做管道文件,管道只允许单向通信,管道里的内容不需要刷新的磁盘。
未来要用父进程写,子进程读的话,在fork之后,各自关闭掉不用的文件描述符即可。 不用的描述符建议关闭,因为未来可能会误用,或者导致文件描述符泄露。
功能:创建匿名管道
参数:
pipefd[2]:文件描述符数组,其中fd[0]表示读端, fd[1]表示写端。它是输出型参数。
返回值:成功返回0,失败返回错误代码
匿名管道的特性
面向字节流
用来进行具有血缘关系的进程,进行进程间通信(IPC)
文件的生命周期,随进程!管道也是!
单行的数据通信
管道自带同步互斥等保护机制!(同步互斥就是指对于同一个管道,同一时刻只允许一个进程或者线程进行操作,对于保证管道的正确性和稳定性都非常重要它使得多个进程或者线程在有序安全的前提下去利用管道进行有效的数据传输和通信)
使用管道通信的demo
上图是创建管道,pipe的使用的例子。
下面是测试的完整代码:
#include <iostream>
#include <unistd.h>
#include <string>
#include <cstdlib>
#include <sys/types.h>
#include <wait.h>using namespace std;// Father -> read
// child -> write
int main()
{int fds[2] = {0};int n = pipe(fds); // fds输出型参数if (n != 0){cerr << "pipe error" << endl; // cerr打印错误消息,属于2号,而cout属于标准输出,2是标准错误}pid_t id = ::fork();if (id < 0){cerr << "fork error" << endl;return 2;}else if (id == 0){// 子进程// 关闭不需要的fd// 系统函数尽量带上::做区分// 一般f[0]代表管道的读端,f[1]代表着写端::close(fds[0]);int cnt = 0;while (true){// 子进程写入string message = "hello bit,hello ";message += to_string(getpid());message += ", ";message += to_string(cnt);// fds[1]::write(fds[1], message.c_str(), message.size());cnt++;sleep(1);//break;}exit(0);}else{// 父进程// 关闭不需要的fd::close(fds[1]);// 从管道里读char buffer[1024];while (true){ssize_t n = ::read(fds[0], buffer, 1024);if (n > 0){buffer[n] = 0;cout << "child -> father " << buffer << endl;}else if (n == 0){//如果写段关闭//读端读完管道内部的数据,在读取的时候就会收到返回值0,标识对端关闭,也表示读到的文件结尾cout << "n:" << n << endl;cout << "child quit? me too" << endl;//break;}close(fds[0]);break;cout<<endl;}int status = 0;pid_t rid = waitpid(id, nullptr, 0);cout << "father wait child success" << rid << "exit code" << ((status<<8)&0xFF) << ",exit sig:"<<(status&0x7F) << endl;}return 0;
}
管道的四种情况:
- 如果此时管道是空的,并且写段fd还没有关闭,此时读取条件不具备,都进程会被阻塞,读进程会等待,直到写段写入数据
- 如果管道被写满,并且读端fd不读且没有关闭,此时写进程会被阻塞,知道数据被读取
- 如果读端一直在读,并且写段关闭了wfd,读端read返回值会读到0,表示读到文件结尾
- 如果读端rfd直接关闭,写段wfd一直写入,那么写端会被OS直接用13信号关闭掉,相当于进程出现了异常
进程池的实现:
hpp:
Processplool.hpp:
#include <iostream> #include <string> #include <stdlib.h> #include <vector> #include <unistd.h> #include <sys/types.h> #include <functional> #include <sys/wait.h> #include "Task.hpp" #include "Channel.hpp"using work_t = std::function<void()>; using namespace std;// const int num = 5;enum {OK,UsageError,PipeError,ForkError };// 子进程干活 void Worker() {// 子进程干的活while (true){int cmd = 0;int n = ::read(0, &cmd, sizeof(cmd));if (n == sizeof(cmd)){tm.Excute(cmd);// cout << "cmd:" << cmd << endl;}else if (n == 0){cout << "pid:" << getpid() << " quit..." << endl;break;}else{cout << "读取操作发生错误" << endl;}} }class ProcessPool { public:ProcessPool(int n, work_t w): processnum(n), work(w){}// 调试打印函数void DebugPrint(){for (const auto &c : channels){cout << c.Name() << endl;}}int InitProcesspool(){for (int i = 0; i < processnum; i++){int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0){cerr << "pipe errno" << endl;return 2;}pid_t id = fork();if (id < 0)return 3; // 创建子进程失败// 建议通信通道if (id == 0){::close(pipefd[1]); // 读取::dup2(pipefd[0], 0); // 让子进程直接从0里面读// 子进程Worker();::exit(0);}// 父进程::close(pipefd[0]); // writechannels.emplace_back(pipefd[1], id); // 自动创建一个管道并且放进channels中}return OK;}void DispatchTask(){int num = 20;// 派发任务int who = 0;while (num--){// 1、选择一个任务int task = tm.SelectTask();// b、选择一个子进程Channel &curr = channels[who++];who %= channels.size();cout << "############################" << endl;cout << "send:" << task << " to " << curr.Name() << ",任务还剩:" << num << endl;cout << "############################" << endl;// c、派发任务curr.Send(task);sleep(1);}}void CleanProceePool(){// 3.退出进程池for (auto &x : channels){x.Close(); // 关闭}for (auto &x : channels){pid_t rid = ::waitpid(x.Id(), nullptr, 0);if (rid > 0){cout << "child" << rid << "wait success.." << endl;}}}private:vector<Channel> channels;work_t work;int processnum; };
Task.hpp
#pragma once#include <iostream> #include <unordered_map> #include <functional> #include <time.h> #include <unistd.h> #include <sys/types.h>using namespace std; using task_t = function<void()>;void Download() {cout << "我是下载任务" << "pid:" << getpid() << endl; } void Log() {cout << "我是日制任务" << "pid:" << getpid() << endl; } void Sql() {cout << "我是数据库同步任务" << "pid:" << getpid() << endl; }static int number = 0; class TaskManger { public:TaskManger(){srand(time(nullptr));InsertTask(Download);InsertTask(Log);InsertTask(Sql);}void InsertTask(task_t t){tasks[number++] = t;}void Excute(int number){if (tasks.find(number) == tasks.end())return;tasks[number]();}int SelectTask(){return rand() % number;}~TaskManger(){}private:unordered_map<int, task_t> tasks; };TaskManger tm;
Channel.hpp
#ifndef __CHANNEL_HPP_ #define __CHANNEL_HPP_ #include<iostream> #include<unistd.h> #include<string.h> using namespace std; // 先描述再组织 class Channel { public:Channel(int wfd, pid_t who) : _wfd(wfd), _who(who){_name = "Channel- " + to_string(wfd) + " - " + to_string(who);}string Name() const{return _name;}void Send(int cmd){::write(_wfd, &cmd, sizeof(cmd));}void Close(){::close(_wfd);}pid_t Id(){return _who;}~Channel(){// cout << "~Channel" << endl;}private:int _wfd;string _name;pid_t _who; };#endif
Main.cc
#include "Processpool.hpp" #include "Task.hpp"void Usage(string proc) {cout << "Usage:" << proc << "process-num" << endl; }int main(int argc, char *argv[]) {if (argc != 2){Usage(argv[0]);return 1;}try{int num = stoi(argv[1]);ProcessPool *pp=new ProcessPool(num,Worker);pp->InitProcesspool();//1.初始化pp->DispatchTask(); //2.轮询派发任务pp->CleanProceePool();//3.关闭进程// sleep(100);delete pp;return 0;}catch (const std::invalid_argument &e){cerr << "命令行参数不是有效的整数形式: " << e.what() << endl;return 1;}catch (const std::out_of_range &e){cerr << "命令行参数超出整数表示范围: " << e.what() << endl;return 1;} }
这里我们还要补充一个知识:
正如上图,其实我们每次创建一个子进程的时候,子进程都会继承父进程的文件描述符表,所以我们继承的时候,父进程中原本指向前面的管道的那个描述符也会被继承,这样就会有多个文件描述符指向那个进程,这样会导致后面想要在一个进程执行完任务后关闭他的时候无法关闭,所以我们需要在每次创建一个子进程之后关闭掉继承下来的写端
最后:
十分感谢你可以耐着性子把它读完和我可以坚持写到这里,送几句话,对你,也对我:
1.一个冷知识:
屏蔽力是一个人最顶级的能力,任何消耗你的人和事,多看一眼都是你的不对。
2.你不用变得很外向,内向挺好的,但需要你发言的时候,一定要勇敢。
正所谓:君子可内敛不可懦弱,面不公可起而论之。
3.成年人的世界,只筛选,不教育。
4.自律不是6点起床,7点准时学习,而是不管别人怎么说怎么看,你也会坚持去做,绝不打乱自己的节奏,是一种自我的恒心。
5.你开始炫耀自己,往往都是灾难的开始,就像老子在《道德经》里写到:光而不耀,静水流深。
最后如果觉得我写的还不错,请不要忘记点赞✌,收藏✌,加关注✌哦(。・ω・。)
愿我们一起加油,奔向更美好的未来,愿我们从懵懵懂懂的一枚菜鸟逐渐成为大佬。加油,为自己点赞!