Yii2基于workerman的pushserver实现消息推送
在实际项目中即时消息的展示必不可少,比如站内信,在线客服等,一般这类实现由两种方式,一种是采用ajax轮询的方式拉取,还有一种就是采用websocket主推推送的方式。其中主动推送可以节约服务器资源,有效减少无效的访问和查询,所以越来越被大家所推崇。接下来我们就讲讲如何采用websocket的方式完成一次消息推送。
在php中推送的框架很多,比较出名和好用的应该是swoole和workerman。其中swoole是php的c扩展,workerman纯php写的socket处理框架。有个区别就是swoole没有windows版本,而workerman有windows版本,这样我们就可以在win上开发,无缝迁移到linux,so我们就采用workerman作为框架。
首先下载开发用的win版本workerman增强版Gateway代码https://github.com/walkor/GatewayWorker-for-win 在YII的根目录创建一个目录叫pushServer,然后修改下载的源码根目录文件夹名称为dev表示我们的开发环境(还可以下载linux版本的放进来叫product表示生产环境)。修改applications下面的文件夹yourApp为blog。所以最终的文件夹目录结构如下:
pushServer
├── dev
├── Applications // 这里是所有开发者应用项目
│ └── blog // 其中一个项目目录,目录名可以自定义
│ ├── Events.php // 开发者只需要关注这个文件
│ ├── start_gateway.php // gateway进程启动脚本
│ ├── start_businessworker.php // businessWorker进程启动脚本
│ └── start_register.php // 注册服务启动脚本
│
├── GatewayWorker // GatewayWorker框架代码
│ ├── BusinessWorker.php // BusinessWorker进程实现
│ ├── Gateway.php // Gateway进程实现
│ ├── Register.php // 注册服务进程实现
│ ├── Lib
│ │ ├── Context.php // Gateway与BusinessWorker通信上下文
│ │ ├── DbConnection.php// 一个数据库连接类
│ │ ├── Db.php // 数据库连接管理类
│ │ └── Gateway.php // Gateway通信接口类,给Events.php调用
│ └──── Protocols
│ └── GatewayProtocol.php // Gateway与BusinessWorker通信协议
│
├── Workerman // workerman内核目录
│
└── start.php // 全局启动脚本
使用gateway推送的整个推送流程可以参考官方文档http://www.workerman.net/gatewaydoc/advanced/push.html:
1.Yii2通过gateway提供的内部协议(例如纯文本通信协议)向gateway发送消息
2.gateway接收到YII2代码发送的消息,然后将消息通过websocket推送的监听的前端网页
所以整个看上去pushserver像是一个二传手,只是作为推送的服务器使用(其实推送还可以选用nodejs等天生就具备推送长项的框架来使用,模式基本一样套用即可)。
首先我们完成第一步,用YII2的php代码向gateway的内部协议发送推送数据。我们在pushServer/dev/applictions/blog中创建一个基于文本协议的gateway文件start_text_gateway.php代码如下:
<?php
use \Workerman\Worker;
use \GatewayWorker\Gateway;
use \Workerman\Autoloader;
require_once __DIR__ . '/../../Workerman/Autoloader.php';
Autoloader::setRootPath(__DIR__);
// #### 内部推送端口(假设当前服务器内网ip为本机) ####
$internal_gateway = new Gateway("Text://127.0.0.1:8806");
$internal_gateway->name='internalGateway';
$internal_gateway->startPort = 2800;
// 端口为start_register.php中监听的端口,websocket推送默认是1238
$internal_gateway->registerAddress = '127.0.0.1:1238';
// #### 内部推送端口设置完毕 ####
if(!defined('GLOBAL_START'))
{
Worker::runAll();
}
接下来我们在YII中创建一个测试的console类型的Pushcontroller(console/controller/Pushcontroller.php)
class PushController extends \yii\console\Controller
{
public function actionIndex()
{
$task_id = 'task 123456';
$open_id = 'open id 8888';
// 建立连接,@see http://php.net/manual/zh/function.stream-socket-client.php
$client = stream_socket_client('tcp://127.0.0.1:8806');
if(!$client) return "can not connect";
// 模拟超级用户,以文本协议发送数据,注意Text文本协议末尾有换行符(发送的数据中最好有能识别超级用户的字段)
//这样在Event.php中的onMessage方法中便能收到这个数据,然后做相应的处理即可
fwrite($client, '{"type":"send","task_id":"'.$task_id.'","openid":"'.$open_id.'"}'."\n");
}
}
接下来我们要完成在Events类中定义修改核心方法onMessage,这里会处理所有发来的消息,包括内部调用的消息和推送的消息
public static function onMessage($client_id, $message)
{
// debug
echo "client:{$_SERVER['REMOTE_ADDR']}:{$_SERVER['REMOTE_PORT']} gateway:{$_SERVER['GATEWAY_ADDR']}:{$_SERVER['GATEWAY_PORT']} client_id:$client_id session:"
.json_encode($_SESSION)." onMessage:".$message."\n";
// 客户端传递的是json数据
$message_data = json_decode(rtrim($message), true);
if(!$message_data)
{
echo "message_data is null return\n";
return ;
}
// 根据类型执行不同的业务
switch($message_data['type'])
{
// 客户端回应服务端的心跳
case 'pong':
return;
// 客户端初始化 message格式: {type:init, task_id:xx}
case 'init':
$task_id = $message_data['task_id'];
Gateway::bindUid($client_id,$task_id);
return;
// Yii代码推送 message格式: {type:send, task_id:xx,openid:xxxx,data:xxxxxx}
case 'send':
$task_id = $message_data['task_id'];
Gateway::sendToUid($task_id,'{"type":"login_status","openid":"'.$message_data['openid'].'","data":"xxx"}');
return;
}
}
修改pushServer/dev/start_for_win.bat
php Applications\blog\start_register.php Applications\blog\start_gateway.php Applications\blog\start_businessworker.php Applications\blog\start_text_gateway.php
pause
这个时候执行下start_for_win.bat就可以在console中看到server执行的结果了
----------------------- WORKERMAN -----------------------------
Workerman version:3.3.2 PHP version:5.6.20
------------------------ WORKERS -------------------------------
worker listen processes status
Register text://0.0.0.0:1238 1 [OK]
YourAppGateway Text://0.0.0.0:8282 4 [OK]
YourAppBusinessWorker none 4 [OK]
internalGateway Text://127.0.0.1:8806 1 [OK]
----------------------------------------------------------------
Press Ctrl-C to quit. Start success.
我们再启动一个console,执行yii中console的push方法调用测试看看
yii push
回到gateway的console上我们可以看到log显示
client:127.0.0.1:64400 gateway:127.0.0.1:8806 client_id:7f0000010af000000001 session:null onMessage:{"type":"send","task_id":"task 123456","openid":"open id 8888"}
看我们发送的消息已经被gateway所接收到了,接下来,我们继续完成模拟客户端websocket交互的代码
修改applications/blog/start_gateway.php修改gateway协议为websocket
$gateway = new Gateway("Websocket://0.0.0.0:7272");
打开一个chrome,f12打开工具栏选择console
var ws = new WebSocket("ws://127.0.0.1:7272");
ws.send('{"type":"init","task_id":"task 123456"}')
ws.onmessage = function(e){ console.log(e.data);};
然后去yii的console中执行推送代码
yii push
在浏览器中就会显示log {“type”:”login_status”,”openid”:”open id 8888″,”data”:”xxx”}
可见我没推送的数据已经被前端的浏览器获取到了,整个推送流程完成了。