Swoole RPC 的实现

2019年5月19日 352点热度 0人点赞 0条评论

第 83 篇文章

这是关于 Swoole 学习的第七篇文章:Swoole RPC 的实现。

概述

有位读者说 “上篇文章,下载代码后直接运行成功,代码简洁明了,简直是 Swoole 入门最好的 Demo ”。

“哈哈哈...”

还有读者说 “有一起学习的组织群吗,可以在里面进行疑难答疑?”

这个还真没有,总觉得维护一个微信群不容易,因为自己本身就不爱在群里说话,另外,自己也在很多微信群中,开始氛围挺好的,大家都聊聊技术,后来技术聊的少了改成聊八卦啦,再后来慢慢就安静了,还有在群里起冲突的...

当然我也知道维护一个微信群的好处是非常大的,如果有这方面经验的同学,咱们一起交流交流 ~

还有出版社找我写书的.

他们也真是放心,我自己肚子里几滴墨水还是知道的,目前肯定是不行,以后嘛,再说。

还有一些大佬加了微信,可能是出于对晚辈的提携吧,偷偷告诉你,从大佬的朋友圈能学到很多东西。

我真诚的建议,做技术的应该自己多总结总结,将自己会的东西写出来分享给大家,先不说给别人带来太多的价值,反正对自己的帮助是非常非常大的,这方面想交流的同学,可以加我,我可以给你无私分享。

可能都会说时间少,时间只要挤,总会有的,每个人都 24 小时,时间对每个人是最公平的。说到这推荐大家读一下《暗时间》这本书,这是我这本书的 读书笔记,大家可以瞅瞅。

开始今天的文章吧,这篇文章实现了一个简单的 RPC 远程调用,在实现之前需要先了解什么是 RPC,不清楚的可以看下之前发的这篇文章 《我眼中的 RPC》

下面的演示代码主要使用了 Swoole 的 Task 任务池,通过 OnRequest/OnReceive 获得信息交给 Task 去处理。

举个工作中的例子吧,在电商系统中的两个模块,个人中心模块和订单管理模块,这两个模块是独立部署的,可能不在一个机房,可能不是一个域名,现在个人中心需要通过 用户ID 和 订单类型 获取订单数据。

实现效果

客户端

HTTP 请求

  1. //代码片段

  2. <?php

  3. $demo = [

  4. 'type' => 'SW',

  5. 'token' => 'Bb1R3YLipbkTp5p0',

  6. 'param' => [

  7. 'class' => 'Order',

  8. 'method' => 'get_list',

  9. 'param' => [

  10. 'uid' => 1,

  11. 'type' => 2,

  12. ],

  13. ],

  14. ];


  15. $ch = curl_init();

  16. $options = [

  17. CURLOPT_URL => 'http://10.211.55.4:9509/',

  18. CURLOPT_POST => 1,

  19. CURLOPT_POSTFIELDS => json_encode($demo),

  20. ];

  21. curl_setopt_array($ch, $options);

  22. curl_exec($ch);

  23. curl_close($ch);

TCP 请求

  1. //代码片段

  2. $demo = [

  3. 'type' => 'SW',

  4. 'token' => 'Bb1R3YLipbkTp5p0',

  5. 'param' => [

  6. 'class' => 'Order',

  7. 'method' => 'get_list',

  8. 'param' => [

  9. 'uid' => 1,

  10. 'type' => 2,

  11. ],

  12. ],

  13. ];

  14. $this->client->send(json_encode($demo));

请求方式

  • SW 单个请求,等待结果

发出请求后,分配给 Task ,并等待 Task 执行完成后,再返回。

  • SN 单个请求,不等待结果

发出请求后,分配给 Task 之后,就直接返回。

发送数据

  1. $demo = [

  2. 'type' => 'SW',

  3. 'token' => 'Bb1R3YLipbkTp5p0',

  4. 'param' => [

  5. 'class' => 'Order',

  6. 'method' => 'get_list',

  7. 'param' => [

  8. 'uid' => 1,

  9. 'type' => 2,

  10. ],

  11. ],

  12. ];

  • type 同步/异步设置

  • token 可进行权限验证

  • class 请求的类名

  • method 请求的方法名

  • uid 参数一

  • type 参数二

返回数据

图片

  • request_method 请求方式

  • request_time 请求开始时间

  • response_time 请求结束时间

  • code 标识

  • msg 标识值

  • data 约定数据

  • query 请求参数

代码

OnRequest.php

  1. <?php


  2. if (!defined('SERVER_PATH')) exit("No Access");


  3. class OnRequest

  4. {

  5. private static $query;

  6. private static $code;

  7. private static $msg;

  8. private static $data;


  9. public static function run($serv, $request, $response)

  10. {

  11. try {

  12. $data = decrypt($request->rawContent());

  13. self::$query = $data;

  14. if (empty($data)) {

  15. self::$code = '-1';

  16. self::$msg = '非法请求';

  17. self::end($request, $response);

  18. }


  19. //TODO 验证Token


  20. switch ($data['type']) {

  21. case 'SW': //单个请求,等待结果

  22. $task = [

  23. 'request' => $data,

  24. 'server' => 'http'

  25. ];

  26. $rs = $serv->task(json_encode($task), -1, function ($serv, $task_id, $rs_data) use ($request, $response) {

  27. self::$code = '1';

  28. self::$msg = '成功';

  29. self::$data = $rs_data['response'];

  30. self::end($request, $response);

  31. });

  32. if ($rs === false) {

  33. self::$code = '-1';

  34. self::$msg = '失败';

  35. self::end($request, $response);

  36. }

  37. break;


  38. case 'SN': //单个请求,不等待结果

  39. $task = [

  40. 'request' => $data,

  41. 'server' => 'http'

  42. ];

  43. $rs = $serv->task(json_encode($task));

  44. if ($rs === false) {

  45. self::$code = '-1';

  46. self::$msg = '失败';

  47. } else {

  48. self::$code = '1';

  49. self::$msg = '成功';

  50. }

  51. self::end($request, $response);

  52. break;

  53. default:

  54. self::$code = '-1';

  55. self::$msg = '非法请求';

  56. self::end($request, $response);

  57. }

  58. } catch(Exception $e) {

  59. }

  60. }


  61. private static function end($request = null, $response = null)

  62. {

  63. $rs['request_method'] = $request->server['request_method'];

  64. $rs['request_time'] = $request->server['request_time'];

  65. $rs['response_time'] = time();

  66. $rs['code'] = self::$code;

  67. $rs['msg'] = self::$msg;

  68. $rs['data'] = self::$data;

  69. $rs['query'] = self::$query;

  70. $response->end(json_encode($rs));

  71. self::$data = [];

  72. return;

  73. }

  74. }

OnReceive.php

  1. <?php


  2. if (!defined('SERVER_PATH')) exit("No Access");


  3. class OnReceive

  4. {

  5. private static $request_time;

  6. private static $query;

  7. private static $code;

  8. private static $msg;

  9. private static $data;


  10. public static function run($serv, $fd, $reactor_id, $data)

  11. {

  12. try {

  13. self::$request_time = time();

  14. $data = decrypt($data);

  15. self::$query = $data;


  16. //TODO 验证Token



  17. switch ($data['type']) {

  18. case 'SW': //单个请求,等待结果

  19. $task = [

  20. 'fd' => $fd,

  21. 'request' => $data,

  22. 'server' => 'tcp',

  23. 'request_time' => self::$request_time,

  24. ];

  25. $rs = $serv->task(json_encode($task));

  26. if ($rs === false) {

  27. self::$code = '-1';

  28. self::$msg = '失败';

  29. self::handlerTask($serv, $fd);

  30. }

  31. break;


  32. case 'SN': //单个请求,不等待结果

  33. $task = [

  34. 'fd' => $fd,

  35. 'request' => $data,

  36. 'server' => 'tcp',

  37. 'request_time' => self::$request_time,

  38. ];

  39. $rs = $serv->task(json_encode($task));

  40. if ($rs === false) {

  41. self::$code = '-1';

  42. self::$msg = '失败';

  43. } else {

  44. self::$code = '1';

  45. self::$msg = '成功';

  46. }

  47. self::handlerTask($serv, $fd);

  48. break;

  49. default:

  50. self::$code = '-1';

  51. self::$msg = '非法请求';

  52. self::handlerTask($serv, $fd);

  53. }

  54. } catch(Exception $e) {

  55. }

  56. }


  57. private static function handlerTask($serv, $fd)

  58. {

  59. $rs['request_method'] = 'TCP';

  60. $rs['request_time'] = self::$request_time;

  61. $rs['response_time'] = time();

  62. $rs['code'] = self::$code;

  63. $rs['msg'] = self::$msg;

  64. $rs['data'] = self::$data;

  65. $rs['query'] = self::$query;

  66. $serv->send($fd, json_encode($rs));

  67. }

  68. }

小结

Demo 代码仅供参考,里面有很多不严谨的地方!

服务的调用方与提供方中间需要有一个服务注册中心,很显然上面的代码中没有,需要自己去实现。

服务注册中心,负责管理 IP、Port 信息,提供给调用方使用,还要能负载均衡和故障切换。

根据自己的情况,服务注册中心实现可容易可复杂,用 Redis 也行,用 Zookeeper、Consul 也行。

感兴趣的也可以了解下网关 Kong ,包括 身份认证、权限认证、流量控制、监控预警...

再推荐一个 Swoole RPC 框架 Hprose,支持多语言。

就到这了,上面的 Demo 需要源码的,加我微信。(菜单-> 加我微信-> 扫我)

推荐阅读

本文欢迎转发,转发请注明作者和出处,谢谢!图片

33960Swoole RPC 的实现

这个人很懒,什么都没留下

文章评论