|
@@ -98,11 +98,11 @@ class Queue extends Command
|
|
|
if ($this->process->iswin()) {
|
|
|
$this->process->exec("start http://{$host}:{$port}");
|
|
|
}
|
|
|
- $this->output->writeln("WebServer process already exist for pid {$result['0']['pid']}");
|
|
|
+ $this->output->writeln("WebServer process already exist for pid {$result[0]['pid']}");
|
|
|
} else {
|
|
|
[$this->process->create($command), usleep(2000)];
|
|
|
if (count($result = $this->process->query($command)) > 0) {
|
|
|
- $this->output->writeln("WebServer process started successfully for pid {$result['0']['pid']}");
|
|
|
+ $this->output->writeln("WebServer process started successfully for pid {$result[0]['pid']}");
|
|
|
if ($this->process->iswin()) {
|
|
|
$this->process->exec("start http://{$host}:{$port}");
|
|
|
}
|
|
@@ -192,7 +192,7 @@ class Queue extends Command
|
|
|
$this->app->db->name($this->table)->whereOr([$map1, $map2])->chunk(100, function (Collection $result) use ($total, &$loops, &$timeout) {
|
|
|
foreach ($result->toArray() as $item) {
|
|
|
$item['loops_time'] > 0 ? $loops++ : $timeout++;
|
|
|
- $prefix = str_pad($timeout + $loops, strlen("{$total}"), '0', STR_PAD_LEFT);
|
|
|
+ $prefix = str_pad($timeout + $loops, strlen("{$total}"), 0, STR_PAD_LEFT);
|
|
|
if ($item['loops_time'] > 0) {
|
|
|
$this->setQueueProgress("[{$prefix}/{$total}] 正在重置任务 {$item['code']} 为运行", ($timeout + $loops) * 100 / $total);
|
|
|
[$status, $message] = [1, intval($item['status']) === 4 ? '任务执行失败,已自动重置任务!' : '任务执行超时,已自动重置任务!'];
|
|
@@ -233,7 +233,7 @@ class Queue extends Command
|
|
|
$this->output->writeln("\tYou can exit with <info>`CTRL-C`</info>");
|
|
|
$this->output->writeln('============== LISTENING ==============');
|
|
|
while (true) {
|
|
|
- $where = [['status', '=', '1'], ['exec_time', '<=', time()]];
|
|
|
+ list($last, $where) = [microtime(true), [['status', '=', 1], ['exec_time', '<=', time()]]];
|
|
|
$this->app->db->name($this->table)->where($where)->order('exec_time asc')->chunk(100, function (Collection $result) {
|
|
|
foreach ($result->toArray() as $vo) try {
|
|
|
$command = $this->process->think("xadmin:queue dorun {$vo['code']} -");
|
|
@@ -245,12 +245,14 @@ class Queue extends Command
|
|
|
}
|
|
|
} catch (\Exception $exception) {
|
|
|
$this->app->db->name($this->table)->where(['code' => $vo['code']])->update([
|
|
|
- 'status' => '4', 'outer_time' => time(), 'exec_desc' => $exception->getMessage(),
|
|
|
+ 'status' => 4, 'outer_time' => time(), 'exec_desc' => $exception->getMessage(),
|
|
|
]);
|
|
|
$this->output->error("Execution failed -> [{$vo['code']}] {$vo['title']},{$exception->getMessage()}");
|
|
|
}
|
|
|
});
|
|
|
- usleep(500000);
|
|
|
+ if (microtime(true) - $last < 0.5000) {
|
|
|
+ usleep(500000);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -268,12 +270,12 @@ class Queue extends Command
|
|
|
$this->queue->initialize($this->code);
|
|
|
if (empty($this->queue->record) || intval($this->queue->record['status']) !== 1) {
|
|
|
// 这里不做任何处理(该任务可能在其它地方已经在执行)
|
|
|
- $this->output->warning($message = "The or status of task {$this->code} is abnormal");
|
|
|
+ $this->output->warning("The or status of task {$this->code} is abnormal");
|
|
|
} else {
|
|
|
// 锁定任务状态,防止任务再次被执行
|
|
|
$this->app->db->name($this->table)->strict(false)->where(['code' => $this->code])->update([
|
|
|
'enter_time' => microtime(true), 'attempts' => $this->app->db->raw('attempts+1'),
|
|
|
- 'outer_time' => '0', 'exec_pid' => getmypid(), 'exec_desc' => '', 'status' => '2',
|
|
|
+ 'outer_time' => 0, 'exec_pid' => getmypid(), 'exec_desc' => '', 'status' => 2,
|
|
|
]);
|
|
|
$this->queue->progress(2, '>>> 任务处理开始 <<<', 0);
|
|
|
// 设置进程标题
|