恢复为实时流量统计

This commit is contained in:
root 2023-10-10 01:51:50 +09:00
parent 5ad5a633a9
commit 573f534f3e
No known key found for this signature in database
GPG Key ID: 8827A30FF1DB1379
7 changed files with 174 additions and 153 deletions

View File

@ -1,140 +0,0 @@
<?php
namespace App\Console\Commands;
use App\Models\StatServer;
use App\Models\StatUser;
use App\Services\StatisticalService;
use Illuminate\Console\Command;
use App\Models\Stat;
use Illuminate\Support\Facades\DB;
class V2boardStatistics extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'v2board:statistics';
/**
* The console command description.
*
* @var string
*/
protected $description = '统计任务';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}
/**
* Execute the console command.
*
* @return mixed
*/
public function handle()
{
$startAt = microtime(true);
ini_set('memory_limit', -1);
$this->statUser();
$this->statServer();
$this->stat();
info('统计任务执行完毕。耗时:' . (microtime(true) - $startAt) / 1000);
}
private function statServer()
{
try {
DB::beginTransaction();
$createdAt = time();
$recordAt = strtotime('-1 day', strtotime(date('Y-m-d')));
$statService = new StatisticalService();
$statService->setStartAt($recordAt);
$statService->setServerStats();
$stats = $statService->getStatServer();
foreach ($stats as $stat) {
if (!StatServer::insert([
'server_id' => $stat['server_id'],
'server_type' => $stat['server_type'],
'u' => $stat['u'],
'd' => $stat['d'],
'created_at' => $createdAt,
'updated_at' => $createdAt,
'record_type' => 'd',
'record_at' => $recordAt
])) {
throw new \Exception('stat server fail');
}
}
DB::commit();
$statService->clearStatServer();
} catch (\Exception $e) {
DB::rollback();
\Log::error($e->getMessage(), ['exception' => $e]);
}
}
private function statUser()
{
try {
DB::beginTransaction();
$createdAt = time();
$recordAt = strtotime('-1 day', strtotime(date('Y-m-d')));
$statService = new StatisticalService();
$statService->setStartAt($recordAt);
$statService->setUserStats();
$stats = $statService->getStatUser();
foreach ($stats as $stat) {
if (!StatUser::insert([
'user_id' => $stat['user_id'],
'u' => $stat['u'],
'd' => $stat['d'],
'server_rate' => $stat['server_rate'],
'created_at' => $createdAt,
'updated_at' => $createdAt,
'record_type' => 'd',
'record_at' => $recordAt
])) {
throw new \Exception('stat user fail');
}
}
DB::commit();
$statService->clearStatUser();
} catch (\Exception $e) {
DB::rollback();
\Log::error($e->getMessage(), ['exception' => $e]);
}
}
private function stat()
{
try {
$endAt = strtotime(date('Y-m-d'));
$startAt = strtotime('-1 day', $endAt);
$statisticalService = new StatisticalService();
$statisticalService->setStartAt($startAt);
$statisticalService->setEndAt($endAt);
$data = $statisticalService->generateStatData();
$data['record_at'] = $startAt;
$data['record_type'] = 'd';
$statistic = Stat::where('record_at', $startAt)
->where('record_type', 'd')
->first();
if ($statistic) {
$statistic->update($data);
return;
}
Stat::create($data);
} catch (\Exception $e) {
\Log::error($e->getMessage(), ['exception' => $e]);
}
}
}

View File

@ -28,7 +28,7 @@ class Kernel extends ConsoleKernel
{
Cache::put(CacheKey::get('SCHEDULE_LAST_CHECK_AT', null), time());
// v2board
$schedule->command('v2board:statistics')->dailyAt('0:10');
//$schedule->command('v2board:statistics')->dailyAt('0:10');
// check
$schedule->command('check:order')->everyMinute();
$schedule->command('check:commission')->everyMinute();

View File

@ -0,0 +1,78 @@
<?php
namespace App\Jobs;
use App\Models\StatServer;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class StatServerJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $u;
protected $d;
protected $server;
protected $protocol;
protected $recordType;
public $tries = 3;
public $timeout = 60;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct($u, $d, $server, $protocol, $recordType = 'd')
{
$this->onQueue('stat');
$this->u = $u;
$this->d = $d;
$this->server = $server;
$this->protocol = $protocol;
$this->recordType = $recordType;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
$recordAt = strtotime(date('Y-m-d'));
if ($this->recordType === 'm') {
//
}
$data = StatServer::lockForUpdate()
->where('record_at', $recordAt)
->where('server_id', $this->server['id'])
->where('server_type', $this->protocol)
->first();
if ($data) {
try {
$data->update([
'u' => $data['u'] + $this->u,
'd' => $data['d'] + $this->d
]);
} catch (\Exception $e) {
abort(500, '节点统计数据更新失败');
}
} else {
if (!StatServer::create([
'server_id' => $this->server['id'],
'server_type' => $this->protocol,
'u' => $this->u,
'd' => $this->d,
'record_type' => $this->recordType,
'record_at' => $recordAt
])) {
abort(500, '节点统计数据创建失败');
}
}
}
}

80
app/Jobs/StatUserJob.php Normal file
View File

@ -0,0 +1,80 @@
<?php
namespace App\Jobs;
use App\Models\StatServer;
use App\Models\StatUser;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class StatUserJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $u;
protected $d;
protected $userId;
protected $server;
protected $protocol;
protected $recordType;
public $tries = 3;
public $timeout = 60;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct($u, $d, $userId, array $server, $protocol, $recordType = 'd')
{
$this->onQueue('stat');
$this->u = $u;
$this->d = $d;
$this->userId = $userId;
$this->server = $server;
$this->protocol = $protocol;
$this->recordType = $recordType;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
$recordAt = strtotime(date('Y-m-d'));
if ($this->recordType === 'm') {
//
}
$data = StatUser::where('record_at', $recordAt)
->where('server_rate', $this->server['rate'])
->where('user_id', $this->userId)
->first();
if ($data) {
try {
$data->update([
'u' => $data['u'] + ($this->u * $this->server['rate']),
'd' => $data['d'] + ($this->d * $this->server['rate'])
]);
} catch (\Exception $e) {
abort(500, '用户统计数据更新失败');
}
} else {
if (!StatUser::create([
'user_id' => $this->userId,
'server_rate' => $this->server['rate'],
'u' => $this->u,
'd' => $this->d,
'record_type' => $this->recordType,
'record_at' => $recordAt
])) {
abort(500, '用户统计数据创建失败');
}
}
}
}

View File

@ -30,7 +30,7 @@ class StatisticalService {
}
public function setServerStats() {
$this->serverStats = Cache::store('file')->get("stat_server_{$this->startAt}");
$this->serverStats = Cache::get("stat_server_{$this->startAt}");
$this->serverStats = json_decode($this->serverStats, true) ?? [];
if (!is_array($this->serverStats)) {
$this->serverStats = [];
@ -38,7 +38,7 @@ class StatisticalService {
}
public function setUserStats() {
$this->userStats = Cache::store('file')->get("stat_user_{$this->startAt}");
$this->userStats = Cache::get("stat_user_{$this->startAt}");
$this->userStats = json_decode($this->userStats, true) ?? [];
if (!is_array($this->userStats)) {
$this->userStats = [];
@ -95,7 +95,7 @@ class StatisticalService {
} else {
$this->serverStats[$serverType][$serverId] = [$u, $d];
}
Cache::store('file')->set("stat_server_{$this->startAt}", json_encode($this->serverStats));
Cache::put("stat_server_{$this->startAt}", json_encode($this->serverStats), 6000);
}
public function statUser($rate, $userId, $u, $d)
@ -107,7 +107,7 @@ class StatisticalService {
} else {
$this->userStats[$rate][$userId] = [$u, $d];
}
Cache::store('file')->set("stat_user_{$this->startAt}", json_encode($this->userStats));
Cache::put("stat_user_{$this->startAt}", json_encode($this->userStats), 6000);
}
public function getStatUserByUserID($userId): array
@ -165,12 +165,12 @@ class StatisticalService {
public function clearStatUser()
{
Cache::store('file')->forget("stat_user_{$this->startAt}");
Cache::forget("stat_user_{$this->startAt}");
}
public function clearStatServer()
{
Cache::store('file')->forget("stat_server_{$this->startAt}");
Cache::forget("stat_server_{$this->startAt}");
}
public function getStatRecord($type)

View File

@ -170,16 +170,18 @@ class UserService
public function trafficFetch(array $server, string $protocol, array $data)
{
$statService = new StatisticalService();
$statService->setStartAt(strtotime(date('Y-m-d')));
$statService->setUserStats();
$statService->setServerStats();
//$statService = new StatisticalService();
//$statService->setStartAt(strtotime(date('Y-m-d')));
//$statService->setUserStats();
//$statService->setServerStats();
foreach (array_keys($data) as $userId) {
$u = $data[$userId][0];
$d = $data[$userId][1];
StatServerJob::dispatch($u, $d, $server, $protocol, 'd');
StatUserJob::dispatch($u, $d, $userId, $server, $protocol, 'd');
TrafficFetchJob::dispatch($u, $d, $userId, $server, $protocol);
$statService->statServer($server['id'], $protocol, $u, $d);
$statService->statUser($server['rate'], $userId, $u, $d);
//$statService->statServer($server['id'], $protocol, $u, $d);
//$statService->statUser($server['rate'], $userId, $u, $d);
}
}
}

View File

@ -175,6 +175,7 @@ return [
'queue' => [
'order_handle',
'traffic_fetch',
'stat',
'send_email',
'send_email_mass',
'send_telegram',