网站页面设计大小,黑龙江华龙建设集团网站,网络营销推广合作,全国做暧小视频网站一、前言 最近需要做一个有关聊天的小程序#xff0c;逻辑很简单#xff0c;所以不打算用Swoole和workerman之类的#xff0c;最后选择了Ratchet#xff0c;因为简单易用#xff0c;适合小型websocket服务。 二、问题 但是目前我的项目是分布式环境#xff0c;统一通过Ng…一、前言 最近需要做一个有关聊天的小程序逻辑很简单所以不打算用Swoole和workerman之类的最后选择了Ratchet因为简单易用适合小型websocket服务。 二、问题 但是目前我的项目是分布式环境统一通过Nginx的反向代理分配到多个不同服务器那么在其中一个服务器建立了WebSocket连接的用户如何给在另外一个服务器上建立了WebSocket连接的用户发送消息呢这就涉及到了分布式websocket服务但是我不希望太复杂所以采用了消息队列的方式实现效果。 三、安装Ratchet类
直接使用composer安装我用版本是cboden/ratchet: ^0.2.8比较老了。
composer require cboden/ratchet四、创建websocket服务
因为php是同步阻塞型语言通常每次请求都会从头到尾执行完成在PHP中所有的代码都按照顺序执行直到脚本结束为止。但是我们要在一个进程中启动websocket服务还要监听消息队列就需要用到ratchet中的事件循环机制实现异步非阻塞通信效果。
?php
//载入Ratchet类库
require_once APP_PATH.vendor/autoload.php;
use Ratchet\Server\IoServer;
use Ratchet\WebSocket\WsServer;
use React\EventLoop\Factory as LoopFactory;
use React\Socket\Server as ReactSocket;set_time_limit(0);
ini_set(default_socket_timeout, -1);
/*** Websocket_Server*/
class ControllerWebsocket_Server
{public function indexAction(){try {$port 8083;// 创建事件循环使用该机制实现异步非阻塞通信$loop LoopFactory::create();// 创建 React Socket 服务器$socket new ReactSocket($loop);$socket-listen($port, 0.0.0.0); // 指定监听的端口和地址// 启动 WebSocket 服务器$server new IoServer(new WsServer(new \ModelWebsocket_Handler($loop)),$socket,$loop);// 启动事件循环$loop-run();} catch (\Exception $e) {echo $e-getMessage();}}
}
其中ModelWebsocket_Handler是封装好的websocket操作类
?php
//载入Ratchet类库
require_once APP_PATH.vendor/autoload.php;
use Ratchet\MessageComponentInterface;
use Ratchet\ConnectionInterface;/*** websocket服务端-相关操作*/
class ModelWebsocket_Handler implements MessageComponentInterface {//数据缓存const REDIS_KEY_RESOURCE_DATA_MAP h:websocket:resource:data:map;//客户端public $clients;public function __construct($loop) {$this-clients new \SplObjectStorage();$this-subscribeMessage($loop);}/*** 连接建立时的逻辑*/public function onOpen(ConnectionInterface $conn) {$this-clients-attach($conn);echo New connection! ({$conn-resourceId})\n;//获取连接请求的参数$params [];$queryString $conn-WebSocket-request-getQuery();parse_str($queryString, $params);//存储资源id相关数据$this-setResourceDataMap($conn-resourceId, $params);}/*** 收到消息时的逻辑*/public function onMessage(ConnectionInterface $from, $msg) {echo Received message: {$msg}\n;foreach ($this-clients as $client) {if ($client $from) {continue;}//发送消息$client-send($msg);}}/*** 连接关闭时的逻辑*/public function onClose(ConnectionInterface $conn) {$this-delResourceDataMap($conn-resourceId);$this-clients-detach($conn);echo Connection {$conn-resourceId} has disconnected\n;}/*** 错误处理逻辑*/public function onError(ConnectionInterface $conn, \Exception $e) {echo An error occurred: {$e-getMessage()}\n;$this-delResourceDataMap($conn-resourceId);$conn-close();}/*** 存储资源id相关数据* * param string $resourceId* param array $data* return bool*/public function setResourceDataMap($resourceId, $data) {$redis Comm_Redis::init(Comm_Redis::REDIS_TVDB, true);$rs $redis-hSet(self::REDIS_KEY_RESOURCE_DATA_MAP, $resourceId, json_encode($data));return $rs;}/*** 获取资源id相关数据* * param string $resourceId* return array*/public function getResourceDataMap($resourceId) {$redis Comm_Redis::init(true);$rs $redis-hGet(self::REDIS_KEY_RESOURCE_DATA_MAP, $resourceId);return json_decode($rs, true) ?: [];}/*** 删除资源id相关数据* * param string $resourceId* return bool*/public function delResourceDataMap($resourceId) {$redis Comm_Redis::init(true);$rs $redis-hDel(self::REDIS_KEY_RESOURCE_DATA_MAP, $resourceId);return $rs;}/*** 订阅消息*/public function subscribeMessage($loop){$loop-addPeriodicTimer(1, function () {//在这里可以使用redis订阅消息、也可以使用kafka消费消息然后再比对自身是否存在相应用户的连接如果存在则发送不存在则过滤达到分布式webSocket服务的作用foreach ($this-clients as $client) {$client-send(测试);} });}
}
其中subscribeMessage方法监听消息队列收到消息之后比对自身是否存在相应用户的连接如果存在则发送不存在则过滤达到分布式webSocket服务的作用。
当然如果你能直接找到用户所连接的服务器并且可以直接推给相应的服务器那更好可以节省流量开销和一些额外的逻辑处理。