【Swoole系列3.4】进程间通信

2022年8月4日 271点热度 0人点赞 0条评论

进程间通信

正常情况下,进程的用户空间是互相独立的,一般也是不能互相访问的,就像我们开启了两个一样的游戏,为了登录两个不同的帐号一起挂机。但是,系统空间是公共的区域,我们可以让不同的两个进程同时访问外部的另一个进程,比如说两套程序一起访问 Redis ,这就是一种进程间的通信。也可以访问外部的磁盘数据,共同操作一个文件,不过就像很多人会接触过的一个概念,那就是共同操作相同的数据可能会带来竞争的问题,导致数据不一致的情况发生。

我们今天先不讨论并发竞争的问题,而是主要看看在 Swoole 中,如何不通过第三方来实现两个进程间的通讯。其实这也是大部分操作系统都提供的功能,也是很多其它语言在使用的解决方案。

进程间通讯也叫做 IPC ,之前我们已经提到过一些 IPC 相关的内容,主要使用的就是 UnixSocket 来实现的。

管道 Pipe

管道方式是非常常用的一种方式,比如说我们打印一个进程回调函数中返回的 Process 对象参数,就可以看到这样的内容。

new Swoole\Process(function (Swoole\Process $worker) {
    var_dump($worker);
});
// object(Swoole\Process)#3 (6) {
//   ["pipe"]=>
//   int(6)
//   ["msgQueueId"]=>
//   NULL
//   ["msgQueueKey"]=>
//   NULL
//   ["pid"]=>
//   int(2081)
//   ["id"]=>
//   NULL
//   ["callback":"Swoole\Process":private]=>
//   object(Closure)#4 (1) {
//     ["parameter"]=>
//     array(1) {
//       ["$worker"]=>
//       string(10) "<required>"
//     }
//   }
// }

注意到最上方的 pipe 数据没有,它就是一个管道 ID 。管道是啥?ps -ef | grep php 这个命令熟悉吧,中间的那个 | 就是让这两个命令形成了一个管道调用。前面的输出成为后面的输入。然后呢,我们的进程之间也可以利用这个能力来实现进程间的通讯。

$workers = [];
for ($i = 1; $i < 3; $i++) {
    $process = new Swoole\Process(function (Swoole\Process $worker) {
        var_dump($worker);
        $data = $worker->read();
        echo "Child {$worker->pid}:来自 Master 的数据 - \"{$data}\"。", PHP_EOL;
        $worker->write("发送给领导,来自 {$worker->pid} 的数据,通过管道 {$worker->pipe} !");
    });
    $process->start();
    $workers[$process->pid] = $process;
}

foreach ($workers as $pid => $process) {
    $process->write("{$pid},你好!");
    $data = $process->read();
    echo "Master:来自子进程 {$pid} 的数据 - \"{$data}\"", PHP_EOL;
}

// [root@localhost source]# php 3.4进程间通信.php
// Child 2076:来自 Master 的数据 - "2076,你好!"。
// Master:来自子进程 2076 的数据 - "发送给领导,来自 2076 的数据,通过管道 4 !"
// Child 2077:来自 Master 的数据 - "2077,你好!"。
// Master:来自子进程 2077 的数据 - "发送给领导,来自 2077 的数据,通过管道 6 !"

看出来这里的门道了吧,父子进程之间其实在底层是通过 UnixSocket 进行通讯的,我们可以在父进程使用子进程对象的 write() 方法向这个指定的子进程写数据,然后在子进程的回调函数的 Process 的 read() 方法读取数据。反过来也是可以的,子进程的回调函数中 write() 然后父进程通过子进程对象的 read() 方法读取这条数据。

父子进程之间没问题,那么同样的两个子进程之间可以吗?拿到对象了就可以嘛,我们的 write() 方法都是针对指定的那个子进程对象的。

$process1 = new Swoole\Process(function (Swoole\Process $worker) {
    while(1){
        $data = $worker->read();
        if($data){
            echo "Child {$worker->pid}:接收到的数据 - \"{$data}\"。", PHP_EOL;
        }
    }
});
$process1->start();

$messages = [
    "Hello World!",
    "Hello Cat!",
    "Hello King",
    "Hello Leon",
    "Hello Rose"
];
$process2 = new Swoole\Process(function (Swoole\Process $worker) use($process1, $messages) {
    foreach($messages as $msg){
        sleep(1);
        $process1->write("来自 {$worker->pid} {$msg} !");
    }
    sleep(1);
    $worker->kill($process1->pid, SIGKILL);
});
$process2->start();
\Swoole\Process::wait(true);
\Swoole\Process::wait(true);

//  [root@localhost source]# php 3.4进程间通信.php
//  Child 2102:接收到的数据 - "来自 2103 Hello World! !"。
//  Child 2102:接收到的数据 - "来自 2103 Hello Cat! !"。
//  Child 2102:接收到的数据 - "来自 2103 Hello King !"。
//  Child 2102:接收到的数据 - "来自 2103 Hello Leon !"。
//  Child 2102:接收到的数据 - "来自 2103 Hello Rose !"。

在这段测试代码中,第一个 process1 的回调操作中,我们一直循环并查看 read() 有没有值。第二个 process2 中则是通过 use 把 process1 对象传递进来了,然后给这个 process1 对象循环间隔 1 秒 write() 消息。

注意我们要回收这两个进程,于是直接挂起主进程。在 process2 中,最底下还有一个 kill 操作,目的是关闭上面那个一直在死循环挂起的 process1 进程。这也是一种进程间通信方式,最后我们也会说。

如果没问题的话,下面注释中的测试效果是一条一条出现的,每条打印间隔 1 秒。

协程 exportSocket

除了上述普通的进程间管道通信外,还有一种协程化的进程间通信。协程的内容我们放到下个大章节才会讲,所以这里我们就简单了解一下。

$workers = [];
for ($i = 1; $i < 3; $i++) {
    $process = new Swoole\Process(function (Swoole\Process $worker) {
        $socket = $worker->exportSocket();
        $data = $socket->recv();
        echo "Child {$worker->pid}:来自 Master 的数据 - \"{$data}\"。", PHP_EOL;
        $socket->send("发送给领导,来自 {$worker->pid} 的数据,通过管道 {$worker->pipe} !");
    }, false, SOCK_DGRAM, true);
    $process->start();
    $workers[$process->pid] = $process;
}
foreach ($workers as $pid => $process) {
    \Swoole\Coroutine\run(function () use ($pid, $process) {
        $socket = $process->exportSocket();
        $socket->send("{$pid},你好!");
        $data = $socket->recv();
        echo "Master:来自子进程 {$pid} 的数据 - \"{$data}\"", PHP_EOL;
    });
}
foreach ($workers as $pid => $worker) {
    \Swoole\Process::wait();
}

//  [root@localhost source]# php 3.4进程间通信.php
//  Child 2062:来自 Master 的数据 - "2062,你好!"。
//  Master:来自子进程 2062 的数据 - "发送给领导,来自 2062 的数据,通过管道 4 !"
//  Child 2063:来自 Master 的数据 - "2063,你好!"。
//  Master:来自子进程 2063 的数据 - "发送给领导,来自 2063 的数据,通过管道 6 !"

其实差别不大,只是实例化 Process 的时候,我们需要将它最后一个参数设置为 true 。这个参数的意思在这个进程的回调函数中启用协程能力,开启之后就可以在这个子进程中直接使用协程相关的 API 。

在这里,我们使用的是 exportSocket() 方法先获得了一个 Swoole\Coroutine\Socket 模块。这东西大家见过的,之前我们写异步 UDP 服务的时候就用过。从名字就能看出,它是一个 Socket 编程组件,其实说白了,就是直接返回底层的 UnixSocket 操作相关接口给我们使用了,因此,如果你有 Socket 相关编程经验的话,那就相当轻松了。

在这段测试代码中,其实就是变成了使用 exportSocket() 方法返回的 Swoole\Coroutine\Socket 对象中的 send() 和 recv() 方法来操作数据。其它的和我们上面学习的 read() 和 write() 没什么区别。

队列

除了管道,或者说就是 UnixSocket 方式之外,还有一种方式就是队列,或者也可以叫做 消息队列 。它使用的是另一种 IPC 模式,也就是我们之前讲过的 sysvmsg 模式。你可以把它当成是进程中的一个数据结构,也就是我们学习数据结构时的那个队列。

for ($i = 1; $i < 3; $i++) {
    $process = new Swoole\Process(function (Swoole\Process $worker) {
        while($msg = $worker->pop()) {
            if ($msg === false) {
                break;
            }
            echo "Child {$worker->pid}:来自 Master 的数据 - \"{$msg}\"。", PHP_EOL;
            sleep(1);
        }
    });
    $process->useQueue(12);
    // $process->useQueue(1, 1);
    // $process->useQueue(1, 1 | \Swoole\Process::IPC_NOWAIT);
    $process->start();
}
$messages = [
    "Hello World!",
    "Hello Cat!",
    "Hello King",
    "Hello Leon",
    "Hello Rose"
];

foreach ($messages as $msg){
    $process->push($msg);
}

\Swoole\Process::wait(true);
\Swoole\Process::wait(true);

//  [root@localhost source]# php 3.4进程间通信.php
//  Child 2071:来自 Master 的数据 - "Hello World!"。
//  Child 2070:来自 Master 的数据 - "Hello Cat!"。
//  Child 2071:来自 Master 的数据 - "Hello King"。
//  Child 2070:来自 Master 的数据 - "Hello Leon"。
//  Child 2071:来自 Master 的数据 - "Hello Rose"。

对于消息队列来说,我们只需要在创建子进程后设置一个 useQueue() 方法,它的参数第一个是标识 Key ,第二个是争抢模式,如果设置为 1 则是普通的队列消费,只有一个进程会去消费队列中的数据。如果设置为 2 的话,所有的子进程会进入争抢模式,就像我们这里输出的内容一样。

那么能指定给一个子进程发送队列信息吗?可能真不行,队列是公共的,谁都可以来消费。因此,我们只需要给一个子进程对象 push() 数据,就可以实现所有子进程对象的消费。

另外,在设置第二个参数的时候,我们还可以设置一个 \Swoole\Process::IPC_NOWAIT 常量,可以将队列设置为非阻塞的,比如上面这个测试代码运行完成后,主进程还是在挂起状态,因为我们的子进程中还在等待 pop() 数据。如果设置了 IPC_NOWAIT 的话,pop() 为空或 push() 已满的状态下,进程将不在阻塞。这个大家可以自己动手试试一试哦。

信号

最后就来说说信号通信。这东西吧,就是 Linux 自带的一个东西。干嘛用的呢?当我们执行一个程序的时候,如果想停止,一般会 Ctrl+c 一下吧,其实这个操作就是给这个程序发送了一个要关闭它的信号。同样地,我们 kill -9 也是在向程序发送信号,另外重启 php-fpm 的命令 kill -USR2 php-fpm 其实也是在向它发送信号。

一般来说,信号都是通过 kill 命令传达的,大部分情况下,我们是做一些终止、重启操作的。从名字也能看出来嘛,杀死进程。但不同的信号其实又有不同的含义,比如说 -USR2 表示的用户自定义信号2,我们可以监听这个信号并进行一些特殊的操作。

关于信号的内容其实远比我们看到的要复杂,同时也是 Unix 类操作系统的重要知识。但我们早就已经应用过了,还记得异步 wait() 进程的代码吧。

Swoole\Process::signal(SIGCHLD, function ($sig) {
    //必须为false,非阻塞模式
    while ($ret = Swoole\Process::wait(false)) {
        echo "PID={$ret['pid']}\n";
    }
});

这就是在监听 SIGCHLD 这个信号,表示的是子进程状态改变时的操作。今天我们再弄个复杂一点的内容,让一个子进程发送消息给父进程,然后父进程关闭所有子进程结束程序运行。

$ppid = getmypid();
$workers = [];
for ($i = 1; $i < 3; $i++) {
    $process = new Swoole\Process(function (Swoole\Process $worker) use ($ppid, $i) {
        sleep($i*3);
        echo "PID:{$worker->pid} kill -USR2 Master {$ppid}.", PHP_EOL;
        $worker->kill($ppid, SIGUSR2);
    });

    $process->start();
    $workers[$process->pid] = $process;
}

Swoole\Process::signal(SIGUSR2, function () use ($workers) {
    echo "收到子进程 USR2 信号,结束所有子进程!", PHP_EOL;
    foreach ($workers as $pid => $w) {
        Swoole\Process::kill($pid, SIGKILL);
    }
});

Swoole\Process::signal(SIGCHLD, function ($sig) {
    //必须为false,非阻塞模式
    while ($ret = Swoole\Process::wait(false)) {
        echo "PID={$ret['pid']}\n";
    }
});
Swoole\Timer::tick(2000function () {});

//  [root@localhost source]# php 3.4进程间通信.php
//  PID:2044 kill -USR2 Master 2043.
//  收到子进程 USR2 信号,结束所有子进程!
//  PID=2044
//  PID=2045

我们的两个子进程分别 sleep() 了 3 秒和 6 秒。比较快运行完的肯定是第一个 3 秒的子进程,这时候,我们向主进程发送一个 kill() ,并指定信号为 SIGUSR2 ,也就是我们最常见的 -USR2 这种。然后在主进程的监听中,打印收到信号了,并使用 SIGKILL 结束所有子进程,SIGKILL 其实就是我们最熟悉的 -9 。

运行结果其实就是当 SIGUSR2 监听完成后,会马上 kill 掉两个子进程,它们都马上被 SIGCHLD 回收,而不是等待第二个进程 sleep() 6 秒之后才结束。

总结

一口气又了解了这么多内容,是不是感觉意犹未尽呀。三种通信方式:管道、队列、信号,你都了解了吗?在这里我也只敢说了解,完全达不到掌握精通的程序,能写出来也是查阅了相当多的资料。说实话,如果只是做传统的 PHP 开发,真的很难接触到这些内容,因为我们不用考虑这些事,php-fpm 已经帮我们处理好了。说到这里,下篇我们就将要学习的是进程池与进程管理相关的内容,没错,就是 php-fpm 干的活。

测试代码:

https://github.com/zhangyue0503/swoole/blob/main/3.Swoole%E8%BF%9B%E7%A8%8B/source/3.4%E8%BF%9B%E7%A8%8B%E9%97%B4%E9%80%9A%E4%BF%A1.php

参考文档:

https://wiki.swoole.com/#/process/process

https://wiki.swoole.com/wiki/page/216.html

https://wiki.swoole.com/wiki/page/290.html

79200【Swoole系列3.4】进程间通信

这个人很懒,什么都没留下

文章评论