init
- 框架初始化 - 安装插件 - 修复PHP8.4报错
This commit is contained in:
101
vendor/workerman/channel/README.md
vendored
Normal file
101
vendor/workerman/channel/README.md
vendored
Normal file
@@ -0,0 +1,101 @@
|
||||
# Channel
|
||||
基于订阅的多进程通讯组件,用于workerman进程间通讯或者服务器集群通讯,类似redis订阅发布机制。基于workerman开发。
|
||||
|
||||
Channel 提供两种通讯形式,分别是发布订阅的事件机制和消息队列机制。
|
||||
|
||||
它们的主要区别是:
|
||||
- 事件机制是消息发出后,所有订阅该事件的客户端都能收到消息。
|
||||
- 消息队列机制是消息发出后,所有订阅该消息的客户端只有一个会收到消息,如果客户端忙消息会进行排队直到有客户端闲置后重新取到消息。
|
||||
- 需要注意的是 Channel 只是提供一种通讯方式,本身并不提供消息确认、重试、延迟、持久化等功能,请根据实际情况合理使用。
|
||||
|
||||
# 手册地址
|
||||
[Channel手册](http://doc.workerman.net/components/channel.html)
|
||||
|
||||
# 服务端
|
||||
```php
|
||||
use Workerman\Worker;
|
||||
|
||||
//Tcp 通讯方式
|
||||
$channel_server = new Channel\Server('0.0.0.0', 2206);
|
||||
|
||||
//Unix Domain Socket 通讯方式
|
||||
//$channel_server = new Channel\Server('unix:///tmp/workerman-channel.sock');
|
||||
|
||||
if(!defined('GLOBAL_START'))
|
||||
{
|
||||
Worker::runAll();
|
||||
}
|
||||
```
|
||||
|
||||
# 客户端
|
||||
```php
|
||||
use Workerman\Worker;
|
||||
|
||||
$worker = new Worker();
|
||||
$worker->onWorkerStart = function()
|
||||
{
|
||||
// Channel客户端连接到Channel服务端
|
||||
Channel\Client::connect('<Channel服务端ip>', 2206);
|
||||
|
||||
// 使用 Unix Domain Socket 通讯
|
||||
//Channel\Client::connect('unix:///tmp/workerman-channel.sock');
|
||||
|
||||
// 要订阅的事件名称(名称可以为任意的数字和字符串组合)
|
||||
$event_name = 'event_xxxx';
|
||||
// 订阅某个自定义事件并注册回调,收到事件后会自动触发此回调
|
||||
Channel\Client::on($event_name, function($event_data){
|
||||
var_dump($event_data);
|
||||
});
|
||||
};
|
||||
$worker->onMessage = function($connection, $data)
|
||||
{
|
||||
// 要发布的事件名称
|
||||
$event_name = 'event_xxxx';
|
||||
// 事件数据(数据格式可以为数字、字符串、数组),会传递给客户端回调函数作为参数
|
||||
$event_data = array('some data.', 'some data..');
|
||||
// 发布某个自定义事件,订阅这个事件的客户端会收到事件数据,并触发客户端对应的事件回调
|
||||
Channel\Client::publish($event_name, $event_data);
|
||||
};
|
||||
|
||||
if(!defined('GLOBAL_START'))
|
||||
{
|
||||
Worker::runAll();
|
||||
}
|
||||
````
|
||||
|
||||
## 消息队列示例
|
||||
```php
|
||||
use Workerman\Worker;
|
||||
use Workerman\Timer;
|
||||
|
||||
$worker = new Worker();
|
||||
$worker->name = 'Producer';
|
||||
$worker->onWorkerStart = function()
|
||||
{
|
||||
Client::connect();
|
||||
|
||||
$count = 0;
|
||||
Timer::add(1, function() {
|
||||
Client::enqueue('queue', 'Hello World '.time());
|
||||
});
|
||||
};
|
||||
|
||||
$mq = new Worker();
|
||||
$mq->name = 'Consumer';
|
||||
$mq->count = 4;
|
||||
$mq->onWorkerStart = function($worker) {
|
||||
Client::connect();
|
||||
|
||||
//订阅消息 queue
|
||||
Client::watch('queue', function($data) use ($worker) {
|
||||
echo "Worker {$worker->id} get queue: $data\n";
|
||||
});
|
||||
|
||||
//10 秒后取消订阅该消息
|
||||
Timer::add(10, function() {
|
||||
Client::unwatch('queue');
|
||||
}, [], false);
|
||||
};
|
||||
|
||||
Worker::runAll();
|
||||
```
|
||||
12
vendor/workerman/channel/composer.json
vendored
Normal file
12
vendor/workerman/channel/composer.json
vendored
Normal file
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"name" : "workerman/channel",
|
||||
"type" : "library",
|
||||
"homepage": "http://www.workerman.net",
|
||||
"license" : "MIT",
|
||||
"require": {
|
||||
"workerman/workerman" : ">=4.0.12"
|
||||
},
|
||||
"autoload": {
|
||||
"psr-4": {"Channel\\": "./src"}
|
||||
}
|
||||
}
|
||||
394
vendor/workerman/channel/src/Client.php
vendored
Normal file
394
vendor/workerman/channel/src/Client.php
vendored
Normal file
@@ -0,0 +1,394 @@
|
||||
<?php
|
||||
namespace Channel;
|
||||
|
||||
use Workerman\Connection\AsyncTcpConnection;
|
||||
use Workerman\Timer;
|
||||
use Workerman\Protocols\Frame;
|
||||
|
||||
/**
|
||||
* Channel/Client
|
||||
* @version 1.0.7
|
||||
*/
|
||||
class Client
|
||||
{
|
||||
/**
|
||||
* onMessage.
|
||||
* @var callback
|
||||
*/
|
||||
public static $onMessage = null;
|
||||
|
||||
/**
|
||||
* onConnect
|
||||
* @var callback
|
||||
*/
|
||||
public static $onConnect = null;
|
||||
|
||||
/**
|
||||
* onClose
|
||||
* @var callback
|
||||
*/
|
||||
public static $onClose = null;
|
||||
|
||||
/**
|
||||
* Connction to channel server.
|
||||
* @var \Workerman\Connection\TcpConnection
|
||||
*/
|
||||
protected static $_remoteConnection = null;
|
||||
|
||||
/**
|
||||
* Channel server ip.
|
||||
* @var string
|
||||
*/
|
||||
protected static $_remoteIp = null;
|
||||
|
||||
/**
|
||||
* Channel server port.
|
||||
* @var int
|
||||
*/
|
||||
protected static $_remotePort = null;
|
||||
|
||||
/**
|
||||
* Reconnect timer.
|
||||
* @var Timer
|
||||
*/
|
||||
protected static $_reconnectTimer = null;
|
||||
|
||||
/**
|
||||
* Ping timer.
|
||||
* @var Timer
|
||||
*/
|
||||
protected static $_pingTimer = null;
|
||||
|
||||
/**
|
||||
* All event callback.
|
||||
* @var array
|
||||
*/
|
||||
protected static $_events = array();
|
||||
|
||||
/**
|
||||
* All queue callback.
|
||||
* @var callable
|
||||
*/
|
||||
protected static $_queues = array();
|
||||
|
||||
/**
|
||||
* @var bool
|
||||
*/
|
||||
protected static $_isWorkermanEnv = true;
|
||||
|
||||
/**
|
||||
* Ping interval.
|
||||
* @var int
|
||||
*/
|
||||
public static $pingInterval = 55;
|
||||
|
||||
/**
|
||||
* Connect to channel server
|
||||
* @param string $ip Channel server ip address or unix domain socket address
|
||||
* Ip like (TCP): 192.168.1.100
|
||||
* Unix domain socket like: unix:///tmp/workerman-channel.sock
|
||||
* @param int $port Port to connect when use tcp
|
||||
*/
|
||||
public static function connect($ip = '127.0.0.1', $port = 2206)
|
||||
{
|
||||
if (self::$_remoteConnection) {
|
||||
return;
|
||||
}
|
||||
|
||||
self::$_remoteIp = $ip;
|
||||
self::$_remotePort = $port;
|
||||
|
||||
if (PHP_SAPI !== 'cli' || !class_exists('Workerman\Worker', false)) {
|
||||
self::$_isWorkermanEnv = false;
|
||||
}
|
||||
|
||||
// For workerman environment.
|
||||
if (self::$_isWorkermanEnv) {
|
||||
if (strpos($ip, 'unix://') === false) {
|
||||
$conn = new AsyncTcpConnection('frame://' . self::$_remoteIp . ':' . self::$_remotePort);
|
||||
} else {
|
||||
$conn = new AsyncTcpConnection($ip);
|
||||
$conn->protocol = Frame::class;
|
||||
}
|
||||
|
||||
$conn->onClose = [self::class, 'onRemoteClose'];
|
||||
$conn->onConnect = [self::class, 'onRemoteConnect'];
|
||||
$conn->onMessage = [self::class , 'onRemoteMessage'];
|
||||
$conn->connect();
|
||||
|
||||
if (empty(self::$_pingTimer)) {
|
||||
self::$_pingTimer = Timer::add(self::$pingInterval, 'Channel\Client::ping');
|
||||
}
|
||||
// Not workerman environment.
|
||||
} else {
|
||||
$remote = strpos($ip, 'unix://') === false ? 'tcp://'.self::$_remoteIp.':'.self::$_remotePort : $ip;
|
||||
$conn = stream_socket_client($remote, $code, $message, 5);
|
||||
if (!$conn) {
|
||||
throw new \Exception($message);
|
||||
}
|
||||
}
|
||||
|
||||
self::$_remoteConnection = $conn;
|
||||
}
|
||||
|
||||
/**
|
||||
* onRemoteMessage.
|
||||
* @param \Workerman\Connection\TcpConnection $connection
|
||||
* @param string $data
|
||||
* @throws \Exception
|
||||
*/
|
||||
public static function onRemoteMessage($connection, $data)
|
||||
{
|
||||
$data = unserialize($data);
|
||||
$type = $data['type'];
|
||||
$event = $data['channel'];
|
||||
$event_data = $data['data'];
|
||||
|
||||
$callback = null;
|
||||
|
||||
if ($type == 'event') {
|
||||
if (!empty(self::$_events[$event])) {
|
||||
call_user_func(self::$_events[$event], $event_data);
|
||||
} elseif (!empty(Client::$onMessage)) {
|
||||
call_user_func(Client::$onMessage, $event, $event_data);
|
||||
} else {
|
||||
throw new \Exception("event:$event have not callback");
|
||||
}
|
||||
} else {
|
||||
if (isset(self::$_queues[$event])) {
|
||||
call_user_func(self::$_queues[$event], $event_data);
|
||||
} else {
|
||||
throw new \Exception("queue:$event have not callback");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ping.
|
||||
* @return void
|
||||
*/
|
||||
public static function ping()
|
||||
{
|
||||
if(self::$_remoteConnection)
|
||||
{
|
||||
self::$_remoteConnection->send('');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* onRemoteClose.
|
||||
* @return void
|
||||
*/
|
||||
public static function onRemoteClose()
|
||||
{
|
||||
Timer::add(0.5, function() {
|
||||
echo "Waring channel connection closed and try to reconnect\n";
|
||||
}, array(), false);
|
||||
self::$_remoteConnection = null;
|
||||
self::clearTimer();
|
||||
self::$_reconnectTimer = Timer::add(1, 'Channel\Client::connect', array(self::$_remoteIp, self::$_remotePort));
|
||||
if (self::$onClose) {
|
||||
call_user_func(Client::$onClose);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* onRemoteConnect.
|
||||
* @return void
|
||||
*/
|
||||
public static function onRemoteConnect()
|
||||
{
|
||||
$all_event_names = array_keys(self::$_events);
|
||||
if($all_event_names)
|
||||
{
|
||||
self::subscribe($all_event_names);
|
||||
}
|
||||
self::clearTimer();
|
||||
|
||||
if (self::$onConnect) {
|
||||
call_user_func(Client::$onConnect);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* clearTimer.
|
||||
* @return void
|
||||
*/
|
||||
public static function clearTimer()
|
||||
{
|
||||
if (!self::$_isWorkermanEnv) {
|
||||
throw new \Exception('Channel\\Client not support clearTimer method when it is not in the workerman environment.');
|
||||
}
|
||||
if(self::$_reconnectTimer)
|
||||
{
|
||||
Timer::del(self::$_reconnectTimer);
|
||||
self::$_reconnectTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* On.
|
||||
* @param string $event
|
||||
* @param callback $callback
|
||||
* @throws \Exception
|
||||
*/
|
||||
public static function on($event, $callback)
|
||||
{
|
||||
if (!is_callable($callback)) {
|
||||
throw new \Exception('callback is not callable for event.');
|
||||
}
|
||||
self::$_events[$event] = $callback;
|
||||
self::subscribe($event);
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe.
|
||||
* @param string $events
|
||||
* @return void
|
||||
*/
|
||||
public static function subscribe($events)
|
||||
{
|
||||
$events = (array)$events;
|
||||
self::send(array('type' => 'subscribe', 'channels'=>$events));
|
||||
foreach ($events as $event) {
|
||||
if(!isset(self::$_events[$event])) {
|
||||
self::$_events[$event] = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsubscribe.
|
||||
* @param string $events
|
||||
* @return void
|
||||
*/
|
||||
public static function unsubscribe($events)
|
||||
{
|
||||
$events = (array)$events;
|
||||
self::send(array('type' => 'unsubscribe', 'channels'=>$events));
|
||||
foreach($events as $event) {
|
||||
unset(self::$_events[$event]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Publish.
|
||||
* @param string $events
|
||||
* @param mixed $data
|
||||
*/
|
||||
public static function publish($events, $data , $is_loop = false)
|
||||
{
|
||||
$type = $is_loop == true ? 'publishLoop' : 'publish';
|
||||
self::sendAnyway(array('type' => $type, 'channels' => (array)$events, 'data' => $data));
|
||||
}
|
||||
|
||||
/**
|
||||
* Watch a channel of queue
|
||||
* @param string|array $channels
|
||||
* @param callable $callback
|
||||
* @param boolean $autoReserve Auto reserve after callback finished.
|
||||
* But sometime you may don't want reserve immediately, or in some asynchronous job,
|
||||
* you want reserve in finished callback, so you should set $autoReserve to false
|
||||
* and call Client::reserve() after watch() and in finish callback manually.
|
||||
* @throws \Exception
|
||||
*/
|
||||
public static function watch($channels, $callback, $autoReserve=true)
|
||||
{
|
||||
if (!is_callable($callback)) {
|
||||
throw new \Exception('callback is not callable for watch.');
|
||||
}
|
||||
|
||||
if ($autoReserve) {
|
||||
$callback = static function($data) use ($callback) {
|
||||
try {
|
||||
call_user_func($callback, $data);
|
||||
} catch (\Exception $e) {
|
||||
throw $e;
|
||||
} catch (\Error $e) {
|
||||
throw $e;
|
||||
} finally {
|
||||
self::reserve();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
$channels = (array)$channels;
|
||||
self::send(array('type' => 'watch', 'channels'=>$channels));
|
||||
|
||||
foreach ($channels as $channel) {
|
||||
self::$_queues[$channel] = $callback;
|
||||
}
|
||||
|
||||
if ($autoReserve) {
|
||||
self::reserve();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Unwatch a channel of queue
|
||||
* @param string $channel
|
||||
* @throws \Exception
|
||||
*/
|
||||
public static function unwatch($channels)
|
||||
{
|
||||
$channels = (array)$channels;
|
||||
self::send(array('type' => 'unwatch', 'channels'=>$channels));
|
||||
foreach ($channels as $channel) {
|
||||
if (isset(self::$_queues[$channel])) {
|
||||
unset(self::$_queues[$channel]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Put data to queue
|
||||
* @param string|array $channels
|
||||
* @param mixed $data
|
||||
* @throws \Exception
|
||||
*/
|
||||
public static function enqueue($channels, $data)
|
||||
{
|
||||
self::sendAnyway(array('type' => 'enqueue', 'channels' => (array)$channels, 'data' => $data));
|
||||
}
|
||||
|
||||
/**
|
||||
* Start reserve queue manual
|
||||
* @throws \Exception
|
||||
*/
|
||||
public static function reserve()
|
||||
{
|
||||
self::send(array('type' => 'reserve'));
|
||||
}
|
||||
|
||||
/**
|
||||
* Send through workerman environment
|
||||
* @param $data
|
||||
* @throws \Exception
|
||||
*/
|
||||
protected static function send($data)
|
||||
{
|
||||
if (!self::$_isWorkermanEnv) {
|
||||
throw new \Exception("Channel\\Client not support {$data['type']} method when it is not in the workerman environment.");
|
||||
}
|
||||
self::connect(self::$_remoteIp, self::$_remotePort);
|
||||
self::$_remoteConnection->send(serialize($data));
|
||||
}
|
||||
|
||||
/**
|
||||
* Send from any environment
|
||||
* @param $data
|
||||
* @throws \Exception
|
||||
*/
|
||||
protected static function sendAnyway($data)
|
||||
{
|
||||
self::connect(self::$_remoteIp, self::$_remotePort);
|
||||
$body = serialize($data);
|
||||
if (self::$_isWorkermanEnv) {
|
||||
self::$_remoteConnection->send($body);
|
||||
} else {
|
||||
$buffer = pack('N', 4+strlen($body)) . $body;
|
||||
fwrite(self::$_remoteConnection, $buffer);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
89
vendor/workerman/channel/src/Queue.php
vendored
Normal file
89
vendor/workerman/channel/src/Queue.php
vendored
Normal file
@@ -0,0 +1,89 @@
|
||||
<?php
|
||||
|
||||
namespace Channel;
|
||||
|
||||
use Workerman\Connection\TcpConnection;
|
||||
|
||||
class Queue
|
||||
{
|
||||
|
||||
public $name = 'default';
|
||||
public $watcher = array();
|
||||
public $consumer = array();
|
||||
protected $queue = null;
|
||||
|
||||
public function __construct($name)
|
||||
{
|
||||
$this->name = $name;
|
||||
$this->queue = new \SplQueue();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param TcpConnection $connection
|
||||
*/
|
||||
public function addWatch($connection)
|
||||
{
|
||||
if (!isset($this->watcher[$connection->id])) {
|
||||
$this->watcher[$connection->id] = $connection;
|
||||
$connection->watchs[] = $this->name;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param TcpConnection $connection
|
||||
*/
|
||||
public function removeWatch($connection)
|
||||
{
|
||||
if (isset($connection->watchs) && in_array($this->name, $connection->watchs)) {
|
||||
$idx = array_search($this->name, $connection->watchs);
|
||||
unset($connection->watchs[$idx]);
|
||||
}
|
||||
if (isset($this->watcher[$connection->id])) {
|
||||
unset($this->watcher[$connection->id]);
|
||||
}
|
||||
if (isset($this->consumer[$connection->id])) {
|
||||
unset($this->consumer[$connection->id]);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param TcpConnection $connection
|
||||
*/
|
||||
public function addConsumer($connection)
|
||||
{
|
||||
if (isset($this->watcher[$connection->id]) && !isset($this->consumer[$connection->id])) {
|
||||
$this->consumer[$connection->id] = $connection;
|
||||
}
|
||||
$this->dispatch();
|
||||
}
|
||||
|
||||
public function enqueue($data)
|
||||
{
|
||||
$this->queue->enqueue($data);
|
||||
$this->dispatch();
|
||||
}
|
||||
|
||||
private function dispatch()
|
||||
{
|
||||
if ($this->queue->isEmpty() || count($this->consumer) == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
while (!$this->queue->isEmpty()) {
|
||||
$data = $this->queue->dequeue();
|
||||
$idx = key($this->consumer);
|
||||
$connection = $this->consumer[$idx];
|
||||
unset($this->consumer[$idx]);
|
||||
$connection->send(serialize(array('type'=>'queue', 'channel'=>$this->name, 'data' => $data)));
|
||||
if (count($this->consumer) == 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public function isEmpty()
|
||||
{
|
||||
return empty($this->watcher) && $this->queue->isEmpty();
|
||||
}
|
||||
|
||||
}
|
||||
179
vendor/workerman/channel/src/Server.php
vendored
Normal file
179
vendor/workerman/channel/src/Server.php
vendored
Normal file
@@ -0,0 +1,179 @@
|
||||
<?php
|
||||
namespace Channel;
|
||||
|
||||
use Workerman\Protocols\Frame;
|
||||
use Workerman\Worker;
|
||||
|
||||
/**
|
||||
* Channel server.
|
||||
*/
|
||||
class Server
|
||||
{
|
||||
/**
|
||||
* Worker instance.
|
||||
* @var Worker
|
||||
*/
|
||||
protected $_worker = null;
|
||||
|
||||
/**
|
||||
* Queues
|
||||
* @var Queue[]
|
||||
*/
|
||||
protected $_queues = array();
|
||||
|
||||
private $ip;
|
||||
|
||||
/**
|
||||
* Construct.
|
||||
* @param string $ip Bind ip address or unix domain socket.
|
||||
* Bind unix domain socket use 'unix:///tmp/channel.sock'
|
||||
* @param int $port Tcp port to bind, only used when listen on tcp.
|
||||
*/
|
||||
public function __construct($ip = '0.0.0.0', $port = 2206)
|
||||
{
|
||||
if (strpos($ip, 'unix:') === false) {
|
||||
$worker = new Worker("frame://$ip:$port");
|
||||
} else {
|
||||
$worker = new Worker($ip);
|
||||
$worker->protocol = Frame::class;
|
||||
}
|
||||
$this->ip = $ip;
|
||||
$worker->count = 1;
|
||||
$worker->name = 'ChannelServer';
|
||||
$worker->channels = array();
|
||||
$worker->onMessage = array($this, 'onMessage') ;
|
||||
$worker->onClose = array($this, 'onClose');
|
||||
$this->_worker = $worker;
|
||||
}
|
||||
|
||||
/**
|
||||
* onClose
|
||||
* @return void
|
||||
*/
|
||||
public function onClose($connection)
|
||||
{
|
||||
if (!empty($connection->channels)) {
|
||||
foreach ($connection->channels as $channel) {
|
||||
unset($this->_worker->channels[$channel][$connection->id]);
|
||||
if (empty($this->_worker->channels[$channel])) {
|
||||
unset($this->_worker->channels[$channel]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!empty($connection->watchs)) {
|
||||
foreach ($connection->watchs as $channel) {
|
||||
if (isset($this->_queues[$channel])) {
|
||||
$this->_queues[$channel]->removeWatch($connection);
|
||||
if ($this->_queues[$channel]->isEmpty()) {
|
||||
unset($this->_queues[$channel]);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* onMessage.
|
||||
* @param \Workerman\Connection\TcpConnection $connection
|
||||
* @param string $data
|
||||
*/
|
||||
public function onMessage($connection, $data)
|
||||
{
|
||||
if(!$data)
|
||||
{
|
||||
return;
|
||||
}
|
||||
$worker = $this->_worker;
|
||||
$data = unserialize($data);
|
||||
$type = $data['type'];
|
||||
switch($type)
|
||||
{
|
||||
case 'subscribe':
|
||||
foreach($data['channels'] as $channel)
|
||||
{
|
||||
$connection->channels[$channel] = $channel;
|
||||
$worker->channels[$channel][$connection->id] = $connection;
|
||||
}
|
||||
break;
|
||||
case 'unsubscribe':
|
||||
foreach($data['channels'] as $channel) {
|
||||
if (isset($connection->channels[$channel])) {
|
||||
unset($connection->channels[$channel]);
|
||||
}
|
||||
if (isset($worker->channels[$channel][$connection->id])) {
|
||||
unset($worker->channels[$channel][$connection->id]);
|
||||
if (empty($worker->channels[$channel])) {
|
||||
unset($worker->channels[$channel]);
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
case 'publish':
|
||||
foreach ($data['channels'] as $channel) {
|
||||
if (empty($worker->channels[$channel])) {
|
||||
continue;
|
||||
}
|
||||
$buffer = serialize(array('type' => 'event', 'channel' => $channel, 'data' => $data['data']));
|
||||
foreach ($worker->channels[$channel] as $connection) {
|
||||
$connection->send($buffer);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case 'publishLoop':
|
||||
//choose one subscriber from the list
|
||||
foreach ($data['channels'] as $channel) {
|
||||
if (empty($worker->channels[$channel])) {
|
||||
continue;
|
||||
}
|
||||
$buffer = serialize(array('type' => 'event', 'channel' => $channel, 'data' => $data['data']));
|
||||
|
||||
//这是要点,每次取出一个元素,如果取不到,说明已经到最后,重置到第一个
|
||||
$connection = next($worker->channels[$channel]);
|
||||
if( $connection == false ){
|
||||
$connection = reset($worker->channels[$channel]);
|
||||
}
|
||||
$connection->send($buffer);
|
||||
}
|
||||
break;
|
||||
case 'watch':
|
||||
foreach ($data['channels'] as $channel) {
|
||||
$this->getQueue($channel)->addWatch($connection);
|
||||
}
|
||||
break;
|
||||
case 'unwatch':
|
||||
foreach ($data['channels'] as $channel) {
|
||||
if (isset($this->_queues[$channel])) {
|
||||
$this->_queues[$channel]->removeWatch($connection);
|
||||
if ($this->_queues[$channel]->isEmpty()) {
|
||||
unset($this->_queues[$channel]);
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
case 'enqueue':
|
||||
foreach ($data['channels'] as $channel) {
|
||||
$this->getQueue($channel)->enqueue($data['data']);
|
||||
}
|
||||
break;
|
||||
case 'reserve':
|
||||
if (isset($connection->watchs)) {
|
||||
foreach ($connection->watchs as $channel) {
|
||||
if (isset($this->_queues[$channel])) {
|
||||
$this->_queues[$channel]->addConsumer($connection);
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private function getQueue($channel)
|
||||
{
|
||||
if (isset($this->_queues[$channel])) {
|
||||
return $this->_queues[$channel];
|
||||
}
|
||||
return ($this->_queues[$channel] = new Queue($channel));
|
||||
}
|
||||
|
||||
}
|
||||
53
vendor/workerman/channel/test/queue.php
vendored
Normal file
53
vendor/workerman/channel/test/queue.php
vendored
Normal file
@@ -0,0 +1,53 @@
|
||||
<?php
|
||||
|
||||
use Channel\Client;
|
||||
use Channel\Server;
|
||||
use Workerman\Worker;
|
||||
use Workerman\Timer;
|
||||
|
||||
// composer autoload
|
||||
include __DIR__ . '/../vendor/autoload.php';
|
||||
|
||||
$channel_server = new Server();
|
||||
|
||||
$worker = new Worker();
|
||||
$worker->name = 'Event';
|
||||
$worker->onWorkerStart = function()
|
||||
{
|
||||
Client::connect();
|
||||
|
||||
$count = 0;
|
||||
$timerId = Timer::add(0.01, function() use (&$timerId, &$count) {
|
||||
Client::publish('test event', 'some data');
|
||||
$count++;
|
||||
Client::enqueue('task-queue', time());
|
||||
if ($count == 1000) {
|
||||
Timer::del($timerId);
|
||||
}
|
||||
});
|
||||
|
||||
Timer::add(10, function() {
|
||||
Client::enqueue('task-queue', 'hello every 10 seconds');
|
||||
});
|
||||
};
|
||||
|
||||
$mq = new Worker();
|
||||
$mq->name = 'Queue';
|
||||
$mq->count = 4;
|
||||
$mq->onWorkerStart = function($worker) {
|
||||
Client::connect();
|
||||
$countDown = 20;
|
||||
$id = 1;
|
||||
Client::watch('task-queue', function($data) use ($worker, &$countDown, &$id) {
|
||||
echo "[$id] Worker {$worker->id} get queue: $data\n";
|
||||
sleep(0.2);
|
||||
$countDown--;
|
||||
$id++;
|
||||
if ($worker->id > 1 && $countDown == 0) {
|
||||
Client::unwatch('task-queue');
|
||||
}
|
||||
Timer::add(1, [Client::class, 'reserve'], [], false);
|
||||
});
|
||||
};
|
||||
|
||||
Worker::runAll();
|
||||
28
vendor/workerman/channel/test/server.php
vendored
Normal file
28
vendor/workerman/channel/test/server.php
vendored
Normal file
@@ -0,0 +1,28 @@
|
||||
<?php
|
||||
|
||||
use Channel\Client;
|
||||
use Channel\Server;
|
||||
use Workerman\Worker;
|
||||
use Workerman\Timer;
|
||||
|
||||
// composer autoload
|
||||
include __DIR__ . '/../vendor/autoload.php';
|
||||
|
||||
$channel_server = new Server();
|
||||
|
||||
$worker = new Worker();
|
||||
$worker->onWorkerStart = function()
|
||||
{
|
||||
Client::connect();
|
||||
|
||||
Client::on('test event', function($event_data){
|
||||
echo 'test event triggered event_data :';
|
||||
var_dump($event_data);
|
||||
});
|
||||
|
||||
Timer::add(2, function(){
|
||||
Client::publish('test event', 'some data');
|
||||
});
|
||||
};
|
||||
|
||||
Worker::runAll();
|
||||
23
vendor/workerman/channel/test/start_channel.php
vendored
Normal file
23
vendor/workerman/channel/test/start_channel.php
vendored
Normal file
@@ -0,0 +1,23 @@
|
||||
<?php
|
||||
/**
|
||||
* Created by PhpStorm.
|
||||
* User: Administrator
|
||||
* Date: 2022/2/20
|
||||
* Time: 12:00
|
||||
*/
|
||||
|
||||
include_once __DIR__ . '/vendor/autoload.php';
|
||||
|
||||
use Workerman\Worker;
|
||||
|
||||
$processName = "ChannelServerTest";
|
||||
Worker::$pidFile = "var/{$processName}.pid";
|
||||
Worker::$logFile = "var/{$processName}_logFile.log";
|
||||
Worker::$stdoutFile = "var/{$processName}_stdout.log";
|
||||
|
||||
$channel_server = new Channel\Server('0.0.0.0', 2206);
|
||||
|
||||
if(!defined('GLOBAL_START'))
|
||||
{
|
||||
Worker::runAll();
|
||||
}
|
||||
34
vendor/workerman/channel/test/start_client.php
vendored
Normal file
34
vendor/workerman/channel/test/start_client.php
vendored
Normal file
@@ -0,0 +1,34 @@
|
||||
<?php
|
||||
|
||||
include_once __DIR__ . '/vendor/autoload.php';
|
||||
|
||||
use Workerman\Worker;
|
||||
use Workerman\Lib\Timer;
|
||||
use Workerman\Connection\TcpConnection;
|
||||
use Workerman\Connection\AsyncUdpConnection;
|
||||
use Workerman\Connection\AsyncTcpConnection;
|
||||
|
||||
//监听端口
|
||||
$worker = new Worker("");
|
||||
|
||||
//开启进程数量
|
||||
$worker->count = 8;
|
||||
$processName = "client";
|
||||
$worker->name = $processName;
|
||||
$worker->reusePort = true; //开启均衡负载模式
|
||||
|
||||
Worker::$pidFile = "var/{$processName}.pid";
|
||||
Worker::$logFile = "var/{$processName}_logFile.log";
|
||||
Worker::$stdoutFile = "var/{$processName}_stdout.log";
|
||||
|
||||
$worker->onWorkerStart = function() use($worker){
|
||||
usleep(10);
|
||||
Channel\Client::connect('127.0.0.1' , 2206);
|
||||
$event_name = "test_channel";
|
||||
Channel\Client::on($event_name, function($event_data)use($worker ,$event_name ){
|
||||
$log_str = "{$worker->id} on {$event_name}:".json_encode($event_data,320)."\n";
|
||||
echo $log_str;
|
||||
});
|
||||
};
|
||||
|
||||
Worker::runAll();
|
||||
35
vendor/workerman/channel/test/start_send.php
vendored
Normal file
35
vendor/workerman/channel/test/start_send.php
vendored
Normal file
@@ -0,0 +1,35 @@
|
||||
<?php
|
||||
|
||||
include_once __DIR__ . '/vendor/autoload.php';
|
||||
|
||||
use Workerman\Worker;
|
||||
use Workerman\Lib\Timer;
|
||||
use Workerman\Connection\TcpConnection;
|
||||
use Workerman\Connection\AsyncUdpConnection;
|
||||
use Workerman\Connection\AsyncTcpConnection;
|
||||
|
||||
//监听端口
|
||||
$worker = new Worker("");
|
||||
|
||||
//开启进程数量
|
||||
$worker->count = 1;
|
||||
$processName = "send";
|
||||
$worker->name = $processName;
|
||||
$worker->reusePort = true; //开启均衡负载模式
|
||||
|
||||
Worker::$pidFile = "var/{$processName}.pid";
|
||||
Worker::$logFile = "var/{$processName}_logFile.log";
|
||||
Worker::$stdoutFile = "var/{$processName}_stdout.log";
|
||||
|
||||
$worker->onWorkerStart = function() use($worker){
|
||||
Channel\Client::connect('127.0.0.1' , 2206);
|
||||
Timer::add( 1 , function ()use($worker){
|
||||
$data_arr = [
|
||||
'time' => microtime(true),
|
||||
'date' => date("Y-m-d H:i:s"),
|
||||
];
|
||||
$event_name = "test_channel";
|
||||
Channel\Client::publish($event_name, $data_arr , true);
|
||||
});
|
||||
};
|
||||
Worker::runAll();
|
||||
Reference in New Issue
Block a user