Swoole Task 的应用

2019年4月10日 317点热度 0人点赞 0条评论

概述

Swoole 异步Task,主要实现调用异步任务的执行。

常用的场景:异步支付处理、异步订单处理、异步日志处理、异步发送邮件/短信等。

Swoole 的实现方式是 worker 进程处理数据请求,分配给 task 进程执行。

官方介绍:

task 底层使用 Unix Socket 管道通信,是全内存的,没有IO消耗。单进程读写性能可达100万/s,不同的进程使用不同的管道通信,可以最大化利用多核。

本地版本:PHP 7.2.6、Swoole 4.3.1。

不多说,先看效果图:

图片

代码

server.php

  1. <?php


  2. class Server

  3. {

  4. private $serv;


  5. public function __construct() {

  6. $this->serv = new swoole_server('0.0.0.0', 9501);

  7. $this->serv->set([

  8. 'worker_num' => 2, //开启2个worker进程

  9. 'max_request' => 4, //每个worker进程 max_request设置为4次

  10. 'task_worker_num' => 4, //开启4个task进程

  11. 'dispatch_mode' => 2, //数据包分发策略 - 固定模式

  12. ]);


  13. $this->serv->on('Start', [$this, 'onStart']);

  14. $this->serv->on('Connect', [$this, 'onConnect']);

  15. $this->serv->on("Receive", [$this, 'onReceive']);

  16. $this->serv->on("Close", [$this, 'onClose']);

  17. $this->serv->on("Task", [$this, 'onTask']);

  18. $this->serv->on("Finish", [$this, 'onFinish']);


  19. $this->serv->start();

  20. }


  21. public function onStart($serv) {

  22. echo "#### onStart ####".PHP_EOL;

  23. echo "SWOOLE ".SWOOLE_VERSION . " 服务已启动".PHP_EOL;

  24. echo "master_pid: {$serv->master_pid}".PHP_EOL;

  25. echo "manager_pid: {$serv->manager_pid}".PHP_EOL;

  26. echo "########".PHP_EOL.PHP_EOL;

  27. }


  28. public function onConnect($serv, $fd) {

  29. echo "#### onConnect ####".PHP_EOL;

  30. echo "客户端:".$fd." 已连接".PHP_EOL;

  31. echo "########".PHP_EOL.PHP_EOL;

  32. }


  33. public function onReceive($serv, $fd, $from_id, $data) {

  34. echo "#### onReceive ####".PHP_EOL;

  35. echo "worker_pid: {$serv->worker_pid}".PHP_EOL;

  36. echo "客户端:{$fd} 发来的Email:{$data}".PHP_EOL;

  37. $param = [

  38. 'fd' => $fd,

  39. 'email' => $data

  40. ];

  41. $rs = $serv->task(json_encode($param));

  42. if ($rs === false) {

  43. echo "任务分配失败 Task ".$rs.PHP_EOL;

  44. } else {

  45. echo "任务分配成功 Task ".$rs.PHP_EOL;

  46. }

  47. echo "########".PHP_EOL.PHP_EOL;

  48. }


  49. public function onTask($serv, $task_id, $from_id, $data) {

  50. echo "#### onTask ####".PHP_EOL;

  51. echo "#{$serv->worker_id} onTask: [PID={$serv->worker_pid}]: task_id={$task_id}".PHP_EOL;


  52. //业务代码

  53. for($i = 1 ; $i <= 5 ; $i ++ ) {

  54. sleep(2);

  55. echo "Task {$task_id} 已完成了 {$i}/5 的任务".PHP_EOL;

  56. }


  57. $data_arr = json_decode($data, true);

  58. $serv->send($data_arr['fd'] , 'Email:'.$data_arr['email'].',发送成功');

  59. $serv->finish($data);

  60. echo "########".PHP_EOL.PHP_EOL;

  61. }


  62. public function onFinish($serv,$task_id, $data) {

  63. echo "#### onFinish ####".PHP_EOL;

  64. echo "Task {$task_id} 已完成".PHP_EOL;

  65. echo "########".PHP_EOL.PHP_EOL;

  66. }


  67. public function onClose($serv, $fd) {

  68. echo "Client Close.".PHP_EOL;

  69. }

  70. }


  71. $server = new Server();

client.php

  1. <?php


  2. class Client

  3. {

  4. private $client;


  5. public function __construct() {

  6. $this->client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);


  7. $this->client->on('Connect', [$this, 'onConnect']);

  8. $this->client->on('Receive', [$this, 'onReceive']);

  9. $this->client->on('Close', [$this, 'onClose']);

  10. $this->client->on('Error', [$this, 'onError']);

  11. }


  12. public function connect() {

  13. if(!$fp = $this->client->connect("127.0.0.1", 9501 , 1)) {

  14. echo "Error: {$fp->errMsg}[{$fp->errCode}]".PHP_EOL;

  15. return;

  16. }

  17. }


  18. public function onConnect($cli) {

  19. fwrite(STDOUT, "输入Email:");

  20. swoole_event_add(STDIN, function() {

  21. fwrite(STDOUT, "输入Email:");

  22. $msg = trim(fgets(STDIN));

  23. $this->send($msg);

  24. });

  25. }


  26. public function onReceive($cli, $data) {

  27. echo PHP_EOL."Received: ".$data.PHP_EOL;

  28. }


  29. public function send($data) {

  30. $this->client->send($data);

  31. }


  32. public function onClose($cli) {

  33. echo "Client close connection".PHP_EOL;

  34. }


  35. public function onError() {


  36. }

  37. }


  38. $client = new Client();

  39. $client->connect();

小结

一、上面的配置总共开启了几个进程?

总共8个进程(1个master进程、1个manager进程、4个task进程、2个worker进程)

重新运行的可能与上图进程号不一致:

图片

master进程:22481

manager进程:22485

task进程:22488、22489、22490、22491

worker进程:22492、22493

参考官方提供的进程图:

图片

二、为什么执行了5次后,worker进程号发生了改变?

因为我们设了置worker进程的max_request=4,一个worker进程在完成最大请求次数任务后将自动退出,进程退出会释放所有的内存和资源,这样的机制主要是解决PHP进程内存溢出的问题。

三、当task执行任务异常,我们kill一个task进程,会再新增一个吗?

会。

四、如何设置 task_worker_num

最大值不得超过 SWOOLE_CPU_NUM * 1000。

查看本机 CPU 核数:

  1. echo "swoole_cpu_num:".swoole_cpu_num().PHP_EOL;

根据项目的任务量决定的,比如:1秒会产生200个任务,执行每个任务需要500ms。

想在1s中执行完成200个任务,需要100个task进程。

100 = 200/(1/0.5)

五、如何设置 worker_num ?

默认设置为本机的CPU核数,最大不得超过 SWOOLE_CPU_NUM * 1000。

比如:1个请求耗时10ms,要提供1000QPS的处理能力,那就必须配置10个进程。

10 = 0.01*1000

假设每个进程占用40M内存,10个进程就需要占用400M的内存。

扩展

  • Server->taskwait

  • Server->taskWaitMulti

  • Server->taskCo

参考文档

  • https://wiki.swoole.com/wiki/page/134.html

推荐阅读

本文欢迎转发,转发请注明作者和出处,谢谢!

图片

35100Swoole Task 的应用

这个人很懒,什么都没留下

文章评论