网站找哪些单位做实名认证,wordpress 拖动,广告设计与制作专业学什么课程,it项目网站开发的需求文档需要使用 swoole 扩展
我使用的是 swoole 5.x
start 方法启动服务 和 定时器
调整 listenQueue 定时器可以降低消息通讯延迟
定时器会自动推送队列里面的消息
testMessage 方法测试给指定用户推送消息
使用 laravel console 启动
?phpnamespace App\Console\Comman…需要使用 swoole 扩展
我使用的是 swoole 5.x
start 方法启动服务 和 定时器
调整 listenQueue 定时器可以降低消息通讯延迟
定时器会自动推送队列里面的消息
testMessage 方法测试给指定用户推送消息
使用 laravel console 启动
?phpnamespace App\Console\Commands;use App\Services\SocketService;
use Illuminate\Console\Command;class WsServer extends Command
{/*** The name and signature of the console command.** var string*/protected $signature app:wsServer;/*** The console command description.** var string*/protected $description Command description;/*** Execute the console command.*/public function handle(){$SocketService new SocketService();$SocketService-start();}
}socket 服务实现代码
?phpnamespace App\Services;use Swoole\WebSocket\Server;
use Swoole\Timer;
use Illuminate\Support\Facades\Redis;
use RedisException;
use Swoole\Http\Request;class SocketService
{public $port 9501;public $server;public $links;public $cmds [];public function __construct (){$this-links collect([]);$this-server new Server(0.0.0.0, env(APP_SOCKET_PORT, $this-port ));$this-server-on( open, function (Server $server, Request $request){$this-open( $server, $request );} );$this-server-on( message, function (Server $server, $frame){$this-message( $server, $frame );} );$this-server-on( close, function (Server $server, $fd){$this-close( $server, $fd );} );}public function start(){$this-linkManage();$this-listenQueue();$this-server-start();}public function print( $message, $level info ){if( is_array($message) || is_object($message) ){$message json_encode($message, 320);}print_r( [. date(Y-m-d H:i:s) .] . $level . . $message . \n );}public function linkManage(){Timer::tick( 100, function (){//var_dump( listenQueue while: . json_encode($this-cmds, 320) );$cmd array_shift( $this-cmds );if( $cmd ){switch ( $cmd[operate] ){case open:// 活跃$this-links-push( [ fd $cmd[fd], user_id intval($cmd[user_id]??0), updated_at date(Y-m-d H:i:s) ] );$this-print( 添加客户端fd . json_encode($cmd, 320) );break;case close:$newLinks [];foreach ( $this-links as $link ){if( $link[fd] $cmd[fd] ){continue;}$newLinks[] $link;}$this-links collect( $newLinks );$this-print( 删除客户端fd . json_encode($cmd, 320) );break;case heartbeat:$newLinks [];foreach ( $this-links as $link ){if( $link[fd] $cmd[fd] ){$link[updated_at] date(Y-m-d H:i:s);}$newLinks[] $link;}$this-links collect( $newLinks );break;}// $this-print( 连接数量是 . $this-links-count() );// $this-print( 连接数量是 . $this-links-toJson() );}$newLinks [];foreach ( $this-links as $link ){if( strtotime( $link[updated_at] ) (time() - 60) ){$this-print( 长时间未心跳删除客户端fd . json_encode($link, 320) );if( $this-server-isEstablished( $link[fd] ) ){$this-disconnect( $link[fd], 未进行心跳 );}continue;}$newLinks[] $link;}$this-links collect( $newLinks );} );}public function listenQueue(){Timer::tick( 1000, function (){// Redis::rpush( SocketService:listenQueue, serialize([hahah]) )try{$element Redis::lpop(SocketService:listenQueue);if( $element ){$this-print( listenQueue 有新的信息哦 . $element );$data unserialize($element);if( ! empty( $data[user_id]) ){$links $this-links-where( user_id, $data[user_id] )-values()-all();if( empty($links) ){$this-print( 没有在线用户user_id . json_encode($data, 320) );//var_export( $this-links );//var_export( $links );}foreach ( $links as $link ){if( ! $this-server-isEstablished( $link[fd] ) ){array_push( $this-cmds, [ operate close, fd $link[fd] ] );continue;}try{// 生成消息数据$message $this-makeMessage( $data[data], $data[type], $data[message] );// 开始推送$this-runPush( $link[fd], $message );}catch (\Throwable $e){$this-print( 数据推送异常 . json_encode([ $e-getMessage(),$e-getLine(), $e-getFile() ], 320) );}}}}}catch (RedisException $e){Redis::connect();}});}public function open( Server $server, Request $request ){$params $request-get;if( empty( $params[user_id] ) ){$this-disconnect( $request-fd, 缺少用户信息 );return true;}array_push( $this-cmds, [ operate open, fd $request-fd, user_id $params[user_id] ] );// 生成消息数据$message $this-makeMessage( [ fd $request-fd ], connectionSuccessful, 连接成功 );// 开始推送$this-runPush( $request-fd, $message );$this-print( server: handshake success with fd{$request-fd} );}public function message( Server $server, $frame ){//$data json_decode( $frame-data, true );if( is_array( $data ) ){if( $data[type] ping ){array_push( $this-cmds, [ operate heartbeat, fd $frame-fd ] );$this-server-push( $frame-fd, json_encode( [ type pong ] , 320 ) );}else{$this-print( receive from {$frame-fd}:{$frame-data},opcode:{$frame-opcode},fin:{$frame-finish} );}}}public function close(Server $server, $fd){array_push( $this-cmds, [ operate close, fd $fd ] );$this-print( client {$fd} closed );}public function push( $fd, string $data ){$this-server-push($fd, $data);}public function disconnect(int $fd, string $reason , int $code SWOOLE_WEBSOCKET_CLOSE_NORMAL){$this-server-disconnect($fd, $code, $reason);}public function makeMessage( array $data, $type , $message ){return [ type $type, message $message, data $data ];}public function runPush( $fd, $message ){$this-print( 推送消息 {$fd} - . json_encode( $message, 320 ) );$this-server-push( $fd, json_encode( $message , 320 ) );}/*** App\Services\SocketService::testMessage( 92 )* param $user_id* return void*/public static function testMessage( $user_id ){Redis::rpush( SocketService:listenQueue, serialize([user_id $user_id,type testMessage, message 测试消息, data [hello world],]) );}