2019-05-14 09:33:33  204548 0

workerman基础之gateway使用

 标签:   

GatewayWorker介绍


GatewayWorker是基于Workerman开发的一套TCP长连接的应用框架,实现了单发、群发、广播等接口,内置了mysql类库,GatewayWorker分为Gateway进程和Worker进程,天然支持分布式部署,能够支持庞大的连接数(百万甚至千万连接级别的应用)。可用于开发IM聊天应用、移动通讯、游戏后台、物联网、智能家居后台等等。


GatewayWorker工作原理


QQ截图20190512095653.jpg



1、可以方便的实现客户端之间的通讯

2、Gateway与Worker之间是基于socket长连接通讯,也就是说Gateway、Worker可以部署在不同的服务器上,非常容易实现分布式部署,扩容服务器

3、Gateway进程只负责网络IO,业务实现都在Worker进程上,可以reload Worker进程,实现在不影响用户的情况下完成代码热更新。


工作流程


1、Register、Gateway、BusinessWorker进程启动
2、Gateway、BusinessWorker进程启动后向Register服务进程发起长连接注册自己
3、Register服务收到Gateway的注册后,把所有Gateway的通讯地址保存在内存中
4、Register服务收到BusinessWorker的注册后,把内存中所有的Gateway的通讯地址发给BusinessWorker
5、BusinessWorker进程得到所有的Gateway内部通讯地址后尝试连接Gateway
6、如果运行过程中有新的Gateway服务注册到Register(一般是分布式部署加机器),则将新的Gateway内部通讯地址列表将广播给所有BusinessWorker,BusinessWorker收到后建立连接
7、如果有Gateway下线,则Register服务会收到通知,会将对应的内部通讯地址删除,然后广播新的内部通讯地址列表给所有BusinessWorker,BusinessWorker不再连接下线的Gateway
8、至此Gateway与BusinessWorker通过Register已经建立起长连接
9、客户端的事件及数据全部由Gateway转发给BusinessWorker处理,BusinessWorker默认调用Events.php中的onConnect onMessage onClose处理业务逻辑。
10、BusinessWorker的业务逻辑入口全部在Events.php中,包括onWorkerStart进程启动事件(进程事件)、onConnect连接事件(客户端事件)、onMessage消息事件(客户端事件)、onClose连接关闭事件(客户端事件)、onWorkerStop进程退出事件(进程事件)
GatewayWorker  OR  Workerman

如果你的项目是长连接并且需要客户端与客户端之间通讯,建议使用GatewayWorker。
短连接或者不需要客户端与客户端之间通讯的项目建议使用Workerman。
GatewayWorker不支持UDP监听,所以UDP服务请选择Workerman。
如果你是一个有多进程socket编程经验的人,喜欢定制自己的进程模型,可以选择Workerman。


GatewayWorker安装


composer require workerman/gateway-worker


目录结构


├── BusinessWorker.php
├── Gateway.php
├── Lib
│   ├── Context.php
│   ├── DbConnection.php
│   ├── Db.php
│   └── Gateway.php
├── Protocols
│   └── GatewayProtocol.php
└── Register.php


Gateway使用


Gateway类用于初始化Gateway进程。Gateway进程是暴露给客户端的让其连接的进程。所有客户端的请求都是由Gateway接收然后分发给BusinessWorker处理,同样BusinessWorker也会将要发给客户端的响应通过Gateway转发出去。

require_once 'vendor/autoload.php';

use Workerman\Worker;
use GatewayWorker\Gateway;


初始化:


$gateway = new Gateway('protocol://ip:port’);


支持协议


为应用层协议,目前支持的协议有
1、websocket协议
2、text协议
3、Frame协议
4、自定义通讯协议
5、tcp,直接裸tcp,不推荐


属性


name
和Worker一样,可以设置Gateway进程的名称,方便status命令中查看统计


count
和Worker一样,可以设置Gateway进程的数量,以便充分利用多cpu资源

lanIp
lanIp是Gateway所在服务器的内网IP,默认填写127.0.0.1即可。多服务器分布式部署的时候需要填写真实的内网ip,不能填写127.0.0.1。注意:lanIp只能填写真实ip,不能填写域名或者其它字符串,无论如何都不能写0.0.0.0 .


startPort
Gateway进程启动后会监听一个本机端口,用来给BusinessWorker提供链接服务,然后Gateway与BusinessWorker之间就通过这个连接通讯。这里设置的是Gateway监听本机端口的起始端口。比如启动了4个Gateway进程,startPort为2000,则每个Gateway进程分别启动的本地端口一般为2000、2001、2002、2003。

当本机有多个Gateway/BusinessWorker项目时,需要把每个项目的startPort设置成不同的段

registerAddress,注册服务地址,只写格式类似于 '127.0.0.1:1236’


回调属性


onWorkerStart
和Worker一样,可以设置Gateway进程启动后的回调函数,一般在这个回调里面初始化一些全局数据

onWorkerStop
和Worker一样,可以设置Gateway进程关闭的回调函数,一般在这个回调里面做数据清理或者保存数据工作

onConnect(比较少用到,开发者一般不用关注)
和Worker一样,可以设置onConnect回调,当有客户端连接上来时触发。与Events::onConnect的区别是Events::onConnect运行在BusinessWorker进程上。Gateway::onConnect是运行在Gateway进程上,无法使用\GatewayWorker\Lib\Gateway类提供的接口

onClose(比较少用到,开发者一般不用关注)
和Worker一样,可以设置onClose回调,当有客户端连接关闭时触发。同样与Events::onClose的区别是Gateway::onClose是运行在Gateway进程上,无法使用\GatewayWorker\Lib\Gateway类提供的接口
BusinessWorker使用

BusinessWorker类其实也是基于基础的Worker开发的。BusinessWorker是运行业务逻辑的进程,BusinessWorker收到Gateway转发来的事件及请求时会默认调用Events.php中的onConnect onMessage onClose方法处理事件及数据,开发者正是通过实现这些回调控制业务及流程。

name
和Worker一样,可以设置BusinessWorker进程的名称,方便status命令中查看统计

count
和Worker一样,可以设置BusinessWorker进程的数量,以便充分利用多cpu资源

registerAddress,注册服务地址,只写格式类似于 '127.0.0.1:1236'

eventHandler 设置使用哪个类来处理业务,默认值是Events,即默认使用Events.php中的Events类来处理业务。业务类至少要实现onMessage静态方法,onConnect和onClose静态方法可以不用实现。

onWorkerStart
和Worker一样,可以设置BusinessWorker启动后的回调函数,一般在这个回调里面初始化一些全局数据

onWorkerStop
和Worker一样,可以设置BusinessWorker关闭的回调函数,一般在这个回调里面做数据清理或者保存数据工作


Register


Register类其实也是基于基础的Worker开发的。Gateway进程和BusinessWorker进程启动后分别向Register进程注册自己的通讯地址,Gateway进程和BusinessWorker通过Register进程得到通讯地址后,就可以建立起连接并通讯了。

Register类只能定制监听的ip和端口,并且目前只能使用text协议。

use Workerman\Worker;
use GatewayWorker\Register;

$register = new Register('text://0.0.0.0:1236');


Events


eventHandler 设置使用哪个类来处理业务,默认值是Events,即默认使用Events.php中的Events类来处理业务。业务类至少要实现onMessage静态方法,onConnect和onClose静态方法可以不用实现。

onWorkerStart(BusinessWorker $businessWorker);
当businessWorker进程启动时触发。每个进程生命周期内都只会触发一次。$businessworker->onWorkerStart和Event::onWorkerStart不会互相覆盖,如果两个回调都设置则都会运行。

onConnect(string $client_id);
当客户端连接上gateway进程时(TCP三次握手完毕时)触发的回调函数。

 $client_id
client_id固定为20个字符的字符串,用来全局标记一个socket连接,每个客户端连接都会被分配一个全局唯一的client_id。

onWebSocketConnect(string $client_id, array $data);

当客户端连接上gateway完成websocket握手时触发的回调函数。

 $client_id
client_id固定为20个字符的字符串,用来全局标记一个socket连接,每个客户端连接都会被分配一个全局唯一的client_id。

$data
websocket握手时的http头数据,包含get、server等变量

onWebSocketConnect(string $client_id, array $data);
当客户端连接上gateway完成websocket握手时触发的回调函数。

 $client_id
client_id固定为20个字符的字符串,用来全局标记一个socket连接,每个客户端连接都会被分配一个全局唯一的client_id。

$data
websocket握手时的http头数据,包含get、server等变量

onMessage(string $client_id, mixed $recv_data);
当客户端发来数据(Gateway进程收到数据)后触发的回调函数

$client_id
全局唯一的客户端socket连接标识

$recv_data
完整的客户端请求数据,数据类型取决于Gateway所使用协议的decode方法返的回值类型

onClose(string $client_id);
客户端与Gateway进程的连接断开时触发。不管是客户端主动断开还是服务端主动断开,都会触发这个回调。

onWorkerStop(BusinessWorker $businessWorker);
当businessWorker进程退出时触发。每个进程生命周期内都只会触发一次。


Lib\Gateway


Lib\Gateway类是Gateway/BusinessWorker模型中给客户端发送数据的类。
提供了单发、群发以及关闭客户端连接的接口

sendToClient(string $client_id, string $send_data);
向客户端client_id发送$send_data数据

$client_id
客户端连接的client_id

$send_data
要发送的数据(字符串类型),此数据会被Gateway所使用协议的encode方法打包后再发送给客户端

closeClient(string $client_id);
断开与client_id对应的客户端的连接

$client_id
全局唯一标识客户端连接的id

sendToAll(string $send_data [, array $client_id_array = null [, array $exclude_client_id = null [, bool $raw = false]]]);
向所有客户端或者client_id_array指定的客户端发送$send_data数据

$send_data
要发送的数据(字符串类型),此数据会被Gateway所使用协议的encode方法打包后发送给客户端

$client_id_array
指定向哪些client_id发送,如果不传递该参数,则是向所有在线客户端发送 $send_data 数据

$exclude_client_id
client_id组成的数组。$exclude_client_id数组中指定的client_id将被排除在外,不会收到本次发的消息

$raw
是否发送原始数据,一般用不到

getAllClientIdList(void);
获取全局所有在线client_id列表。

getAllClientIdCount(void);
获取当前在线连接总数(多少client_id在线)。

isOnline(string $client_id);
判断$client_id是否还在线,是否在线取决于对应client_id是否触发过onClose回调。

$client_id
客户端的client_id

返回值
在线返回1,不在线返回0

bindUid(string $client_id, mixed $uid);
将client_id与uid绑定,uid泛指用户id或者设备id,用来唯一确定一个客户端用户或者设备

$client_id
客户端的client_id

$uid
uid,可以是数字或者字符串。
uid与client_id是一对多的关系,一个uid下有多个client_id,client_id下线(连接断开)时会自动执行解绑

sendToUid(mixed $uid, string $message);
向uid绑定的所有在线client_id发送数据。

$uid
uid可以是字符串、数字、或者包含uid的数组。如果为数组,则是给数组内所有uid发送数据

$message
要发送的数据(字符串类型),此数据会被Gateway所使用协议的encode方法打包后再发送给客户端

getClientIdByUid(mixed $uid);
返回一个数组,数组元素为与uid绑定的所有在线的client_id

getUidByClientId(string $client_id);
返回client_id绑定的uid,如果client_id没有绑定uid,则返回null。

unbindUid(string $client_id, mixed $uid);
将client_id与uid解绑。当client_id下线(连接断开)时会自动与uid解绑

$client_id
客户端的client_id

$uid
数字或者字符串

isUidOnline(mixed $uid);
判断$uid是否在线,如果某uid没有通过进行任何绑定,那么对该uid调用将返回0

返回值
uid在线返回1,不在线返回0

getAllUidList(void);
获取全局所有在线uid列表。

getAllUidCount(void);
获取全局所有在线uid数量。

joinGroup(string $client_id, mixed $group);

将client_id加入某个组,以便通过Gateway::sendToGroup发送数据。

$client_id
客户端的client_id

$group
只能是数字或者字符串

leaveGroup(string $client_id, mixed $group);
将client_id从某个组中删除

$client_id
客户端的client_id

$group
只能是数字或者字符串。

ungroup(mixed $group);
取消分组,或者说解散分组

sendToGroup(mixed $group, string $message [, array $exclude_client_id = null [, bool $raw = false]])
向某个分组的所有在线client_id发送数据。

$group
group可以是字符串、数字、或者数组。如果为数组,则是给数组内所有group发送数据

$message
要发送的数据(字符串类型),此数据会被Gateway所使用协议的encode方法打包后再发送给客户端

$exclude_client_id
client_id组成的数组。$exclude_client_id数组中指定的client_id将被排除在外,不会收到本次发的消息

$raw
是否发送原始数据

getAllGroupIdList(void);
获取全局所有在线group id列表。

getClientIdCountByGroup(mixed $group);
获取某分组当前在线成连接数(多少client_id在线)。

getClientIdListByGroup(mixed $group);
获取某个分组所有在线client_id列表。

getUidCountByGroup(mixed $group);
获取某个分组下的在线uid数量。

getUidListByGroup(mixed $group);
获取某个分组所有在线uid列表。

getClientSessionsByGroup(mixed $group);
获取某个分组所有在线client_id信息。

getAllClientSessions(void);
获取当前所有在线client_id信息。

setSession(string $client_id, array $session);
设置某个client_id对应的session

updateSession(string $client_id, array $session);
更新某个client_id对应的session

getSession(string $client_id);
获取某个client_id对应的session。


配置wss服务

$context = array(
    'ssl' => array(
        // 请使用绝对路径
        'local_cert'                 => '磁盘路径/server.pem', // 也可以是crt文件
        'local_pk'                   => '磁盘路径/server.key',
        'verify_peer'               => false,
       
    )
);

$gateway = new Gateway("websocket://0.0.0.0:443", $context);
$gateway->transport = 'ssl';
ws = new WebSocket("wss://域名");

1、如果无法启动,则一般是443端口被占用,请改成其它端口。如果必须使用443端口请参考worekrman手册创建wss服务方法二部分。

2、wss端口只能通过wss协议访问,ws无法访问wss端口。

3、证书一般是与域名绑定的,所以测试的时候客户端请使用域名连接,不要使用ip去连。

4、如果出现无法访问的情况,请检查服务器防火墙。

5、此方法要求PHP版本>=5.6,因为微信小程序要求tls1.2,而PHP5.6以下版本不支持tls1.2。
开启进程数

Gateway进程使用的非阻塞式IO通讯,属于CPU密集型业务,Gateway进程数设置成与CPU核数一样

BusinessWorker进程中根据业务是否有阻塞式IO设置进程数为CPU核数的1倍-3倍


Thinkphp5.1使用Gateway


composer require topthink/think-worker

配置文件
gateway_worker.php

0 条留言此文章没有人评论

给我留言

评论内容