Manager.php 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. <?php
  2. // +----------------------------------------------------------------------
  3. // | CRMEB [ CRMEB赋能开发者,助力企业发展 ]
  4. // +----------------------------------------------------------------------
  5. // | Copyright (c) 2016~2022 https://www.crmeb.com All rights reserved.
  6. // +----------------------------------------------------------------------
  7. // | Licensed CRMEB并不是自由软件,未经许可不能去掉CRMEB相关版权
  8. // +----------------------------------------------------------------------
  9. // | Author: CRMEB Team <admin@crmeb.com>
  10. // +----------------------------------------------------------------------
  11. namespace app\webscoket;
  12. use app\webscoket\handler\AdminHandler;
  13. use app\webscoket\handler\MerchantHandler;
  14. use app\webscoket\handler\ServiceHandler;
  15. use app\webscoket\handler\UserHandler;
  16. use Swoole\Server;
  17. use Swoole\Websocket\Frame;
  18. use think\Config;
  19. use think\Event;
  20. use think\facade\Cache;
  21. use think\Request;
  22. use think\response\Json;
  23. use think\swoole\Websocket;
  24. use think\swoole\websocket\Room;
  25. /**
  26. * Class Manager
  27. * @package app\webscoket
  28. * @author xaboy
  29. * @day 2020-04-29
  30. */
  31. class Manager extends Websocket
  32. {
  33. /**
  34. * @var \Swoole\WebSocket\Server
  35. */
  36. protected $server;
  37. /**
  38. * @var Ping
  39. */
  40. protected $pingService;
  41. /**
  42. * @var int
  43. */
  44. protected $cache_timeout;
  45. const USER_TYPE = ['admin', 'user', 'mer', 'ser'];
  46. /**
  47. * Manager constructor.
  48. * @param Server $server
  49. * @param Room $room
  50. * @param Event $event
  51. * @param Ping $ping
  52. * @param Config $config
  53. */
  54. public function __construct(Server $server, Room $room, Event $event, Ping $ping, Config $config)
  55. {
  56. parent::__construct($server, $room, $event);
  57. $this->pingService = $ping;
  58. $this->cache_timeout = (int)($config->get('swoole.websocket.ping_timeout', 60000) / 1000) + 2;
  59. app()->bind('websocket_handler_admin', AdminHandler::class);
  60. app()->bind('websocket_handler_user', UserHandler::class);
  61. app()->bind('websocket_handler_mer', MerchantHandler::class);
  62. app()->bind('websocket_handler_ser', ServiceHandler::class);
  63. }
  64. /**
  65. * @param int $fd
  66. * @param Request $request
  67. * @return mixed
  68. * @author xaboy
  69. * @day 2020-05-06
  70. */
  71. public function onOpen($fd, Request $request)
  72. {
  73. $type = $request->get('type');
  74. $token = $request->get('token');
  75. if (!$token || !in_array($type, self::USER_TYPE)) {
  76. return $this->server->close($fd);
  77. }
  78. try {
  79. $data = $this->exec($type, 'login', compact('fd', 'request', 'token'))->getData();
  80. } catch (\Exception $e) {
  81. // var_dump($e->getMessage());
  82. return $this->server->close($fd);
  83. }
  84. if (!isset($data['status']) || $data['status'] != 200 || !($data['data']['uid'] ?? null)) {
  85. // var_dump($data);
  86. return $this->server->close($fd);
  87. }
  88. $type = array_search($type, self::USER_TYPE);
  89. $this->login($type, $fd, $data['data']);
  90. $this->pingService->createPing($fd, time(), $this->cache_timeout);
  91. return $this->send($fd, app('json')->message('ping', ['now' => time()]));
  92. }
  93. public function login($type, $fd, $data)
  94. {
  95. $key = '_ws_' . $type;
  96. Cache::sadd($key, $fd);
  97. Cache::sadd($key . $data['uid'], $fd);
  98. Cache::set('_ws_f_' . $fd, [
  99. 'type' => $type,
  100. 'uid' => $data['uid'],
  101. 'fd' => $fd,
  102. 'payload' => $data['payload'] ?? null,
  103. 'mer_id' => $data['mer_id'] ?? null
  104. ], 3600);
  105. if (isset($data['mer_id'])) {
  106. $groupKey = $key . '_group' . $data['mer_id'];
  107. Cache::sadd($groupKey, $fd);
  108. Cache::expire($groupKey, 3600);
  109. }
  110. $this->refresh($type, $fd, $data['uid']);
  111. }
  112. public function refresh($type, $fd, $uid)
  113. {
  114. $key = '_ws_' . $type;
  115. Cache::expire($key, 3600);
  116. Cache::expire($key . $uid, 3600);
  117. Cache::expire('_ws_f_' . $fd, 3600);
  118. }
  119. public function logout($type, $fd)
  120. {
  121. $data = Cache::get('_ws_f_' . $fd);
  122. $key = '_ws_' . $type;
  123. Cache::srem($key, $fd);
  124. if ($data) {
  125. Cache::delete('_ws_f_' . $fd);
  126. Cache::srem($key . $data['uid'], $fd);
  127. if (($data['mer_id'] ?? null) !== null) {
  128. $groupKey = $key . '_group' . $data['mer_id'];
  129. Cache::srem($groupKey, $fd);
  130. }
  131. }
  132. }
  133. public static function merFd($merId)
  134. {
  135. return Cache::smembers('_ws_2_group' . $merId) ?: [];
  136. }
  137. public static function userFd($type, $uid = '')
  138. {
  139. $key = '_ws_' . $type . $uid;
  140. return Cache::smembers($key) ?: [];
  141. }
  142. /**
  143. * @param $type
  144. * @param $method
  145. * @param $result
  146. * @return null|Json
  147. * @author xaboy
  148. * @day 2020-05-06
  149. */
  150. protected function exec($type, $method, $result)
  151. {
  152. $handler = app()->make('websocket_handler_' . $type);
  153. if (!method_exists($handler, $method)) return null;
  154. /** @var Json $response */
  155. return $handler->{$method}($result);
  156. }
  157. /**
  158. * @param Frame $frame
  159. * @return bool
  160. * @author xaboy
  161. * @day 2020-04-29
  162. */
  163. public function onMessage(Frame $frame)
  164. {
  165. $info = Cache::get('_ws_f_' . $frame->fd);
  166. $result = json_decode($frame->data, true) ?: [];
  167. if (!isset($result['type']) || !$result['type']) return true;
  168. $this->refresh($info['type'], $frame->fd, $info['uid']);
  169. if (($info['mer_id'] ?? null) !== null) {
  170. $groupKey = '_ws_' . $info['type'] . '_group' . $info['mer_id'];
  171. Cache::expire($groupKey, 3600);
  172. }
  173. if ($result['type'] == 'ping') {
  174. return $this->send($frame->fd, app('json')->message('ping', ['now' => time()]));
  175. }
  176. $data = $result['data'] ?? [];
  177. $frame->uid = $info['uid'];
  178. $frame->payload = $info['payload'];
  179. /** @var Json $response */
  180. $response = $this->exec(self::USER_TYPE[$info['type']], $result['type'], compact('data', 'frame', 'info'));
  181. if ($response) return $this->send($frame->fd, $response);
  182. return true;
  183. }
  184. protected function send($fd, Json $json)
  185. {
  186. $this->pingService->createPing($fd, time(), $this->cache_timeout);
  187. if ($this->server->isEstablished($fd) && $this->server->exist($fd)) {
  188. $this->server->push($fd, json_encode($json->getData()));
  189. }
  190. return true;
  191. }
  192. /**
  193. * @param int $fd
  194. * @param int $reactorId
  195. * @author xaboy
  196. * @day 2020-04-29
  197. */
  198. public function onClose($fd, $reactorId)
  199. {
  200. $data = Cache::get('_ws_f_' . $fd);
  201. if ($data) {
  202. $this->logout($data['type'], $fd);
  203. $this->exec(self::USER_TYPE[$data['type']], 'close', $data);
  204. }
  205. $this->pingService->removePing($fd);
  206. }
  207. }