Yii2基于workerman的pushserver实现消息推送

作者: siediyer 分类: linux 发布时间: 2019-01-29 10:40

在实际项目中即时消息的展示必不可少,比如站内信,在线客服等,一般这类实现由两种方式,一种是采用ajax轮询的方式拉取,还有一种就是采用websocket主推推送的方式。其中主动推送可以节约服务器资源,有效减少无效的访问和查询,所以越来越被大家所推崇。接下来我们就讲讲如何采用websocket的方式完成一次消息推送。

在php中推送的框架很多,比较出名和好用的应该是swoole和workerman。其中swoole是php的c扩展,workerman纯php写的socket处理框架。有个区别就是swoole没有windows版本,而workerman有windows版本,这样我们就可以在win上开发,无缝迁移到linux,so我们就采用workerman作为框架。

首先下载开发用的win版本workerman增强版Gateway代码https://github.com/walkor/GatewayWorker-for-win 在YII的根目录创建一个目录叫pushServer,然后修改下载的源码根目录文件夹名称为dev表示我们的开发环境(还可以下载linux版本的放进来叫product表示生产环境)。修改applications下面的文件夹yourApp为blog。所以最终的文件夹目录结构如下:

pushServer        
    ├── dev
        ├── Applications // 这里是所有开发者应用项目
        │   └── blog  // 其中一个项目目录,目录名可以自定义
        │       ├── Events.php // 开发者只需要关注这个文件
        │       ├── start_gateway.php // gateway进程启动脚本
        │       ├── start_businessworker.php // businessWorker进程启动脚本
        │       └── start_register.php // 注册服务启动脚本
        │
        ├── GatewayWorker // GatewayWorker框架代码
        │   ├── BusinessWorker.php  // BusinessWorker进程实现
        │   ├── Gateway.php         // Gateway进程实现
        │   ├── Register.php        // 注册服务进程实现
        │   ├── Lib
        │   │   ├── Context.php     // Gateway与BusinessWorker通信上下文
        │   │   ├── DbConnection.php// 一个数据库连接类
        │   │   ├── Db.php          // 数据库连接管理类
        │   │   └── Gateway.php     // Gateway通信接口类,给Events.php调用
        │   └──── Protocols
        │         └── GatewayProtocol.php // Gateway与BusinessWorker通信协议
        │
        ├── Workerman // workerman内核目录
        │
        └── start.php // 全局启动脚本

使用gateway推送的整个推送流程可以参考官方文档http://www.workerman.net/gatewaydoc/advanced/push.html

1.Yii2通过gateway提供的内部协议(例如纯文本通信协议)向gateway发送消息

2.gateway接收到YII2代码发送的消息,然后将消息通过websocket推送的监听的前端网页

所以整个看上去pushserver像是一个二传手,只是作为推送的服务器使用(其实推送还可以选用nodejs等天生就具备推送长项的框架来使用,模式基本一样套用即可)。

首先我们完成第一步,用YII2的php代码向gateway的内部协议发送推送数据。我们在pushServer/dev/applictions/blog中创建一个基于文本协议的gateway文件start_text_gateway.php代码如下:

<?php
use \Workerman\Worker;
use \GatewayWorker\Gateway;
use \Workerman\Autoloader;
require_once __DIR__ . '/../../Workerman/Autoloader.php';
Autoloader::setRootPath(__DIR__);


// #### 内部推送端口(假设当前服务器内网ip为本机) ####
$internal_gateway = new Gateway("Text://127.0.0.1:8806");
$internal_gateway->name='internalGateway';
$internal_gateway->startPort = 2800;
// 端口为start_register.php中监听的端口,websocket推送默认是1238
$internal_gateway->registerAddress = '127.0.0.1:1238';
// #### 内部推送端口设置完毕 ####


if(!defined('GLOBAL_START'))
{
    Worker::runAll();
}

接下来我们在YII中创建一个测试的console类型的Pushcontroller(console/controller/Pushcontroller.php)

class PushController extends \yii\console\Controller
{
    public function actionIndex()
    {
        $task_id = 'task 123456';
        $open_id = 'open id 8888';
        // 建立连接,@see http://php.net/manual/zh/function.stream-socket-client.php
        $client = stream_socket_client('tcp://127.0.0.1:8806');
        if(!$client) return "can not connect";
        // 模拟超级用户,以文本协议发送数据,注意Text文本协议末尾有换行符(发送的数据中最好有能识别超级用户的字段)
      //这样在Event.php中的onMessage方法中便能收到这个数据,然后做相应的处理即可
        fwrite($client, '{"type":"send","task_id":"'.$task_id.'","openid":"'.$open_id.'"}'."\n");
    }
}

接下来我们要完成在Events类中定义修改核心方法onMessage,这里会处理所有发来的消息,包括内部调用的消息和推送的消息

public static function onMessage($client_id, $message)
   {
        // debug
        echo "client:{$_SERVER['REMOTE_ADDR']}:{$_SERVER['REMOTE_PORT']} gateway:{$_SERVER['GATEWAY_ADDR']}:{$_SERVER['GATEWAY_PORT']}  client_id:$client_id session:"
         .json_encode($_SESSION)." onMessage:".$message."\n";
        
        // 客户端传递的是json数据
        $message_data = json_decode(rtrim($message), true);
        if(!$message_data)
        {
            echo "message_data is null return\n";
            return ;
        }
       // 根据类型执行不同的业务
       switch($message_data['type'])
       {
           // 客户端回应服务端的心跳
           case 'pong':
               return;
           // 客户端初始化 message格式: {type:init, task_id:xx}
           case 'init':
               $task_id = $message_data['task_id'];
               Gateway::bindUid($client_id,$task_id);
               return;
        // Yii代码推送 message格式: {type:send, task_id:xx,openid:xxxx,data:xxxxxx}
           case 'send':
               $task_id = $message_data['task_id'];
               Gateway::sendToUid($task_id,'{"type":"login_status","openid":"'.$message_data['openid'].'","data":"xxx"}');
               return; 
       }
   }

修改pushServer/dev/start_for_win.bat

php Applications\blog\start_register.php Applications\blog\start_gateway.php Applications\blog\start_businessworker.php Applications\blog\start_text_gateway.php 
pause

这个时候执行下start_for_win.bat就可以在console中看到server执行的结果了

----------------------- WORKERMAN -----------------------------
Workerman version:3.3.2          PHP version:5.6.20
------------------------ WORKERS -------------------------------
worker                 listen                 processes status
Register                text://0.0.0.0:1238     1        [OK]
YourAppGateway           Text://0.0.0.0:8282     4        [OK]
YourAppBusinessWorker         none               4        [OK]
internalGateway            Text://127.0.0.1:8806    1        [OK]
----------------------------------------------------------------
Press Ctrl-C to quit. Start success.

我们再启动一个console,执行yii中console的push方法调用测试看看

yii push

回到gateway的console上我们可以看到log显示

client:127.0.0.1:64400 gateway:127.0.0.1:8806  client_id:7f0000010af000000001 session:null onMessage:{"type":"send","task_id":"task 123456","openid":"open id 8888"}

看我们发送的消息已经被gateway所接收到了,接下来,我们继续完成模拟客户端websocket交互的代码

修改applications/blog/start_gateway.php修改gateway协议为websocket

$gateway = new Gateway("Websocket://0.0.0.0:7272");

打开一个chrome,f12打开工具栏选择console

var ws = new WebSocket("ws://127.0.0.1:7272");
ws.send('{"type":"init","task_id":"task 123456"}')
ws.onmessage = function(e){ console.log(e.data);};

然后去yii的console中执行推送代码

yii push

在浏览器中就会显示log {“type”:”login_status”,”openid”:”open id 8888″,”data”:”xxx”}

可见我没推送的数据已经被前端的浏览器获取到了,整个推送流程完成了。

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

Title - Artist
0:00