您的位置:首页 > 数据库 > Redis

PHP redis workman实现定时广播,远程控制功能

2017-11-22 10:18 459 查看


<?php
namespace app\terminal\library;

use app\terminal\model\TerminalMessageLogModel;
use Predis\Client;
use think\Config;

abstract class MessagePool
{
protected $messageType;
protected $redis;
protected $aheadTime = 5 * 60;// 提前发送时间,单位:秒
protected $expiredTime = 5 * 60;// 过期时间,单位:秒
protected $message;
protected $isInstant = false;

protected abstract function _getTerminals($terminalMessage);
protected abstract function _sendMessage($terminalMessage, $params, $terminalId);

public function __construct($isInstant = false)
{
Config::load(APP_PATH . 'terminal/config.php');
$options = Config::get('redis');
$this->redis = new Client($options);

if ($isInstant) {
$this->isInstant = true;
$this->messageType = $this->messageType . 'Instant';
}
}

public function add($message)
{
$redis = $this->redis;
$messageType = $this->messageType;

$redis->hmset($this->_getMessageSetName($messageType, $message['id']), $message);

// 添加待发送列表
$nextExecuteTime = $this->isInstant ? time() : $this->_getNextExecuteTime($message);
//dump(date('Y-m-d H:i:s', $nextExecuteTime));
$redis->zadd($this->_getSendingQueueName($messageType), $nextExecuteTime , $message['id']);

// 添加待过期列表
$lastExecuteTime = $this->isInstant ? time() : $this->_getLastExecuteTime($message);
//dump(date('Y-m-d H:i:s', $lastExecuteTime));
$redis->zadd($this->_getExpiringQueueName($messageType), $lastExecuteTime , $message['id']);

// 即时执行
if ($this->isInstant) {
$this->consume();
}
}

/**
* 更新消息
* @param $message
*/
public function update($message)
{
$this->remove($message['id']);
$this->add($message);
}

/**
* 删除某个消息
* @param $messageId
*/
public function remove($messageId)
{
$redis = $this->redis;
$messageType = $this->messageType;

$messageSetName = $this->_getMessageSetName($this->messageType, $messageId);
$this->deleteFromHset($messageSetName);

$redis->zrem($this->_getSendingQueueName($messageType), $messageId);
$redis->zrem($this->_getExpiringQueueName($messageType), $messageId);
}

/**
* 用于定时地生成发送日志并发送消息
* 处理流程:从有序集合里面获取最近要执行的消息 -> 获取目标终端 ID -> 记录发送日志 -> 更新有序集合 -> 发送消息
*/
public function consume()
{
$redis = $this->redis;
$messageType = $this->messageType;

// 获取待发送的消息
$sendTimeRange = time() + $this->aheadTime;
//dump(date('Y-m-d H:i:s', $sendTimeRange));
$sending = $redis->zrangebyscore($this->_getSendingQueueName($messageType) , 0 , $sendTimeRange, 'withscores');

// 获取待发送消息,写发送日志
foreach ($sending as $messageId => $sendTime) {
// 获取待发送消息详情
$terminalMessage = $redis->hgetall($this->_getMessageSetName($messageType, $messageId));
// 获取目标终端
$terminals = $this->_getTerminals($terminalMessage);

// 记录发送日志
foreach ($terminals as $terminal) {
// 把发送日志记录到 MySQL
$log = new TerminalMessageLogModel([
'message_type' => TerminalMessageLogModel::MESSAGE_TYPES[$messageType],
'message_id' => $messageId,
'device_id' => $terminal['device_id'],
'terminal_id' => $terminal['id'],
'send_time' => date('Y-m-d H:i:s', $sendTime)
]);
$log->save();

// 把发送日志记录到 Redis
$sendLog = [
'message_id' => $messageId,
'terminal_id' => $terminal['id'],
'send_time' => $sendTime
];
$redis->hmset($this->_getSendLogSetName($messageType, $log['id']), $sendLog);
$redis->zadd($this->_getSendLogQueueName($messageType), $sendTime , $log['id']);
}
}

// 删除有序集合里面的发送任务
$redis->zremrangebyscore($this->_getSendingQueueName($messageType), 0, $sendTimeRange);

// 根据发送日志调用消息接口发送
$this->message = new Message();
$sendLogKeys = $redis->keys($this->_getSendLogSetName($messageType, '*'));
foreach ($sendLogKeys as $sendLogKey) {
$sendLog = $redis->hgetall($sendLogKey);
$terminalMessage = $redis->hgetall($this->_getMessageSetName($messageType, $sendLog['message_id']));

$params = [
'log_id' => $this->_getSendLogId($sendLogKey),
'send_time' => date('Y-m-d H:i:s', $sendLog['send_time']),
];
$this->_sendMessage($terminalMessage, $params, $sendLog['terminal_id']);
}
}

/**
* 定时清除过期的消息和发送日志
*/
public function clear()
{
$redis = $this->redis;
$messageType = $this->messageType;

// 清理过期的消息
$expiredTimeRange = time() - $this->expiredTime;
$expiring = $redis->zrangebyscore($this->_getExpiringQueueName($messageType), 0, $expiredTimeRange);
foreach ($expiring as $messageId) {
$messageSetName = $this->_getMessageSetName($messageType, $messageId);
$this->deleteFromHset($messageSetName);
}
$redis->zremrangebyscore($this->_getExpiringQueueName($messageType), 0, $expiredTimeRange);

// 清理过期的发送日志
$expiring = $redis->zrangebyscore($this->_getSendLogQueueName($messageType), 0, $expiredTimeRange);
foreach ($expiring as $logId) {
$sendLogSetName = $this->_getSendLogSetName($messageType, $logId);
$this->deleteFromHset($sendLogSetName);
}
$redis->zremrangebyscore($this->_getSendLogQueueName($messageType), 0, $expiredTimeRange);

return true;
}

/**
* 定时任务,每晚11点50分执行,创建第二天的执行时间有序集合
* @param $message
* @return false|int
* @throws MessageException
*/
public function createExecuteTimeSet()
{

}

private function _getNextExecuteTime($message)
{
$today = date('Y-m-d');
$week = explode(',', $message['week']);
if (trim($week[0]) == '') {
throw new MessageException('星期数据为空');
}

if ($message['end_date'] < $today) {
throw new MessageException('任务已过期');
}

$beginDate = $message['begin_date'] < $today ? $today : $message['begin_date'];
$executeDate = date('H:i:s') < $message['time'] ? strtotime($beginDate) : strtotime('+1 days', strtotime($beginDate));// 下次执行的日期,时间戳格式
$executeDayInWeek = date('N', $executeDate);// 一周中的第几天(星期一为第一天)

sort($week);
$week[count($week)] = $week[0] + 7;// 一周七天
foreach ($week as $w) {
if ($executeDayInWeek <= $w) {
$executeDate = strtotime('+'.strval($w-$executeDayInWeek).' days', $executeDate);
$executeDate = date('Y-m-d', $executeDate);
if ($executeDate > $message['end_date']) {
throw new MessageException('此任务没有执行时间');
}
return strtotime($executeDate . ' ' . $message['time']);
}
}

throw new MessageException('获取下次执行时间失败');
}

private function _getLastExecuteTime($message)
{
$today = date('Y-m-d');
$week = explode(',', $message['week']);
if (trim($week[0]) == '') {
throw new MessageException('星期数据为空');
}

if ($message['end_date'] < $today ||
($message['end_date'] == $today && $message['time'] < date('H:i:s'))
) {
throw new MessageException('此任务已过期');
}

$executeDate = strtotime($message['end_date']);
$executeDayInWeek = date('N', $executeDate);// 一周中的第几天(星期一为第一天)

rsort($week);// 降序排列
$week[count($week)] = $week[0] - 7;// 一周七天
foreach ($week as $w) {
if ($executeDayInWeek >= $w) {
$executeDate = strtotime('-'.strval($executeDayInWeek-$w).' days', $executeDate);
$executeDate = date('Y-m-d', $executeDate);
if ($executeDate . $message['time'] < $today . date('H:i:s')) {
throw new MessageException('此任务已过期');
}
return strtotime($executeDate . ' ' . $message['time']);
}
}

throw new MessageException('获取最后执行时间失败');
}

private function _getMessageSetName($messageType, $messageId)
{
return "messageSet:$messageType:$messageId";
}

/**
* 获取待发送的消息的有序集合名。集合元素为 message_id,分值为为此消息的下一次发送时间
* @param $messageType
* @return string
*/
private function _getSendingQueueName($messageType)
{
return "sendingQueue:$messageType";
}

/**
* 获取待过期的消息的有序集合名。集合元素为 message_id,分值为此消息的最后一次发送时间
* @param $messageType
* @return string
*/
private function _getExpiringQueueName($messageType)
{
return "expiringQueue:$messageType";
}

private function _getSendLogSetName($messageType, $logId)
{
return "sendLog:$messageType:$logId";
}

private function _getSendLogId($sendLogKey)
{
return explode(':', $sendLogKey)[2];
}

private function _getSendLogQueueName($messageType)
{
return "sendLogQueue:$messageType";
}

/**
* 根据键名删除一个 Redis 哈希集合
* @param $key
*/
private function deleteFromHset($hsetName)
{
$redis = $this->redis;

$keys = $redis->hkeys($hsetName);
foreach ($keys as $key) {
$redis->hdel($hsetName, $key);
}

return true;
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: