|
@@ -17,9 +17,13 @@ declare (strict_types=1);
|
|
|
|
|
|
namespace think\admin\service;
|
|
|
|
|
|
+use Error;
|
|
|
use think\admin\Exception;
|
|
|
use think\admin\extend\CodeExtend;
|
|
|
use think\admin\Service;
|
|
|
+use think\db\exception\DataNotFoundException;
|
|
|
+use think\db\exception\DbException;
|
|
|
+use think\db\exception\ModelNotFoundException;
|
|
|
|
|
|
/**
|
|
|
* 任务基础服务
|
|
@@ -55,21 +59,21 @@ class QueueService extends Service
|
|
|
|
|
|
/**
|
|
|
* 数据初始化
|
|
|
- * @param integer $code
|
|
|
+ * @param string $code
|
|
|
* @return static
|
|
|
- * @throws \think\admin\Exception
|
|
|
- * @throws \think\db\exception\DataNotFoundException
|
|
|
- * @throws \think\db\exception\DbException
|
|
|
- * @throws \think\db\exception\ModelNotFoundException
|
|
|
+ * @throws Exception
|
|
|
+ * @throws DataNotFoundException
|
|
|
+ * @throws DbException
|
|
|
+ * @throws ModelNotFoundException
|
|
|
*/
|
|
|
- public function initialize($code = 0): QueueService
|
|
|
+ public function initialize(string $code = ''): QueueService
|
|
|
{
|
|
|
if (!empty($code)) {
|
|
|
$this->code = $code;
|
|
|
$this->record = $this->app->db->name('SystemQueue')->where(['code' => $this->code])->find();
|
|
|
if (empty($this->record)) {
|
|
|
$this->app->log->error("Qeueu initialize failed, Queue {$code} not found.");
|
|
|
- throw new \think\admin\Exception("Qeueu initialize failed, Queue {$code} not found.");
|
|
|
+ throw new Exception("Qeueu initialize failed, Queue {$code} not found.");
|
|
|
}
|
|
|
[$this->code, $this->title] = [$this->record['code'], $this->record['title']];
|
|
|
$this->data = json_decode($this->record['exec_data'], true) ?: [];
|
|
@@ -81,16 +85,16 @@ class QueueService extends Service
|
|
|
* 重发异步任务
|
|
|
* @param integer $wait 等待时间
|
|
|
* @return $this
|
|
|
- * @throws \think\admin\Exception
|
|
|
- * @throws \think\db\exception\DataNotFoundException
|
|
|
- * @throws \think\db\exception\DbException
|
|
|
- * @throws \think\db\exception\ModelNotFoundException
|
|
|
+ * @throws Exception
|
|
|
+ * @throws DataNotFoundException
|
|
|
+ * @throws DbException
|
|
|
+ * @throws ModelNotFoundException
|
|
|
*/
|
|
|
- public function reset($wait = 0): QueueService
|
|
|
+ public function reset(int $wait = 0): QueueService
|
|
|
{
|
|
|
if (empty($this->record)) {
|
|
|
$this->app->log->error("Qeueu reset failed, Queue {$this->code} data cannot be empty!");
|
|
|
- throw new \think\admin\Exception("Qeueu reset failed, Queue {$this->code} data cannot be empty!");
|
|
|
+ throw new Exception("Qeueu reset failed, Queue {$this->code} data cannot be empty!");
|
|
|
}
|
|
|
$this->app->db->name('SystemQueue')->where(['code' => $this->code])->strict(false)->failException(true)->update([
|
|
|
'exec_pid' => 0, 'exec_time' => time() + $wait, 'status' => 1,
|
|
@@ -103,13 +107,13 @@ class QueueService extends Service
|
|
|
* @param integer $loops 循环时间
|
|
|
* @return $this
|
|
|
* @throws Exception
|
|
|
- * @throws \think\db\exception\DataNotFoundException
|
|
|
- * @throws \think\db\exception\DbException
|
|
|
- * @throws \think\db\exception\ModelNotFoundException
|
|
|
+ * @throws DataNotFoundException
|
|
|
+ * @throws DbException
|
|
|
+ * @throws ModelNotFoundException
|
|
|
*/
|
|
|
- public function addCleanQueue($loops = 3600): QueueService
|
|
|
+ public function addCleanQueue(int $loops = 3600): QueueService
|
|
|
{
|
|
|
- return $this->register('定时清理系统任务数据', "xadmin:queue clean", 0, [], 0, $loops);
|
|
|
+ return $this->register('定时清理系统任务数据', "xadmin:service clean", 0, [], 0, $loops);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -121,16 +125,16 @@ class QueueService extends Service
|
|
|
* @param integer $rscript 任务类型(0单例,1多例)
|
|
|
* @param integer $loops 循环等待时间
|
|
|
* @return $this
|
|
|
- * @throws \think\admin\Exception
|
|
|
- * @throws \think\db\exception\DataNotFoundException
|
|
|
- * @throws \think\db\exception\DbException
|
|
|
- * @throws \think\db\exception\ModelNotFoundException
|
|
|
+ * @throws Exception
|
|
|
+ * @throws DataNotFoundException
|
|
|
+ * @throws DbException
|
|
|
+ * @throws ModelNotFoundException
|
|
|
*/
|
|
|
public function register(string $title, string $command, int $later = 0, array $data = [], int $rscript = 0, int $loops = 0): QueueService
|
|
|
{
|
|
|
$map = [['title', '=', $title], ['status', 'in', [1, 2]]];
|
|
|
if (empty($rscript) && ($queue = $this->app->db->name('SystemQueue')->where($map)->find())) {
|
|
|
- throw new \think\admin\Exception(lang('think_library_queue_exist'), 0, $queue['code']);
|
|
|
+ throw new Exception(lang('think_library_queue_exist'), 0, $queue['code']);
|
|
|
}
|
|
|
$this->code = CodeExtend::uniqidDate(16, 'Q');
|
|
|
$this->app->db->name('SystemQueue')->strict(false)->failException(true)->insert([
|
|
@@ -145,19 +149,19 @@ class QueueService extends Service
|
|
|
'outer_time' => 0,
|
|
|
'loops_time' => $loops,
|
|
|
]);
|
|
|
- $this->progress(1, '>>> 任务创建成功 <<<', 0.00);
|
|
|
+ $this->progress(1, '>>> 任务创建成功 <<<', '0.00');
|
|
|
return $this->initialize($this->code);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 设置任务进度信息
|
|
|
- * @param null|integer $status 任务状态
|
|
|
- * @param null|string $message 进度消息
|
|
|
- * @param null|float $progress 进度数值
|
|
|
+ * @param ?integer $status 任务状态
|
|
|
+ * @param ?string $message 进度消息
|
|
|
+ * @param ?string $progress 进度数值
|
|
|
* @param integer $backline 回退信息行
|
|
|
* @return array
|
|
|
*/
|
|
|
- public function progress(?int $status = null, ?string $message = null, $progress = null, $backline = 0): array
|
|
|
+ public function progress(?int $status = null, ?string $message = null, ?string $progress = null, int $backline = 0): array
|
|
|
{
|
|
|
$ckey = "queue_{$this->code}_progress";
|
|
|
if (is_numeric($status) && intval($status) === 3) {
|
|
@@ -172,12 +176,10 @@ class QueueService extends Service
|
|
|
$data = $this->app->cache->get($ckey, [
|
|
|
'code' => $this->code, 'status' => $status, 'message' => $message, 'progress' => $progress, 'history' => [],
|
|
|
]);
|
|
|
- } catch (\Exception | \Error $exception) {
|
|
|
+ } catch (\Exception | Error $exception) {
|
|
|
return $this->progress($status, $message, $progress, $backline);
|
|
|
}
|
|
|
- while ($backline > 0 && count($data['history']) > 0) {
|
|
|
- [--$backline, array_pop($data['history'])];
|
|
|
- }
|
|
|
+ while (--$backline > -1 && count($data['history']) > 0) array_pop($data['history']);
|
|
|
if (is_numeric($status)) $data['status'] = intval($status);
|
|
|
if (is_numeric($progress)) $progress = str_pad(sprintf("%.2f", $progress), 6, '0', STR_PAD_LEFT);
|
|
|
if (is_string($message) && is_null($progress)) {
|
|
@@ -207,7 +209,7 @@ class QueueService extends Service
|
|
|
* @param string $message 文字描述
|
|
|
* @param integer $backline 回退行数
|
|
|
*/
|
|
|
- public function message(int $total, int $count, string $message = '', $backline = 0): void
|
|
|
+ public function message(int $total, int $count, string $message = '', int $backline = 0): void
|
|
|
{
|
|
|
$total = $total < 1 ? 1 : $total;
|
|
|
$prefix = str_pad("{$count}", strlen("{$total}"), '0', STR_PAD_LEFT);
|
|
@@ -242,7 +244,7 @@ class QueueService extends Service
|
|
|
/**
|
|
|
* 执行任务处理
|
|
|
* @param array $data 任务参数
|
|
|
- * @return mixed
|
|
|
+ * @return void
|
|
|
*/
|
|
|
public function execute(array $data = [])
|
|
|
{
|