update: traffic fetch queue

This commit is contained in:
tokumeikoi 2021-09-01 00:55:07 +09:00
parent fbced4d09b
commit 1d87a1b99a
9 changed files with 134 additions and 240 deletions

View File

@ -76,23 +76,11 @@ class DeepbworkController extends Controller
Cache::put(CacheKey::get('SERVER_V2RAY_ONLINE_USER', $server->id), count($data), 3600); Cache::put(CacheKey::get('SERVER_V2RAY_ONLINE_USER', $server->id), count($data), 3600);
Cache::put(CacheKey::get('SERVER_V2RAY_LAST_PUSH_AT', $server->id), time(), 3600); Cache::put(CacheKey::get('SERVER_V2RAY_LAST_PUSH_AT', $server->id), time(), 3600);
$userService = new UserService(); $userService = new UserService();
DB::beginTransaction();
try {
foreach ($data as $item) { foreach ($data as $item) {
$u = $item['u'] * $server->rate; $u = $item['u'] * $server->rate;
$d = $item['d'] * $server->rate; $d = $item['d'] * $server->rate;
if (!$userService->trafficFetch($u, $d, $item['user_id'], $server, 'vmess')) { $userService->trafficFetch($u, $d, $item['user_id'], $server, 'trojan');
continue;
} }
}
} catch (\Exception $e) {
DB::rollBack();
return response([
'ret' => 0,
'msg' => 'user fetch fail'
]);
}
DB::commit();
return response([ return response([
'ret' => 1, 'ret' => 1,

View File

@ -1,158 +0,0 @@
<?php
namespace App\Http\Controllers\Server;
use App\Services\ServerService;
use App\Services\UserService;
use App\Utils\CacheKey;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
use App\Models\User;
use App\Models\Plan;
use App\Models\Server;
use App\Models\ServerLog;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Cache;
/*
* V2ray Poseidon
* Github: https://github.com/ColetteContreras/trojan-poseidon
*/
class PoseidonController extends Controller
{
public $poseidonVersion;
public function __construct(Request $request)
{
$this->poseidonVersion = $request->input('poseidon_version');
}
// 后端获取用户
public function user(Request $request)
{
if ($r = $this->verifyToken($request)) { return $r; }
$nodeId = $request->input('node_id');
$server = Server::find($nodeId);
if (!$server) {
return $this->error("server could not be found", 404);
}
Cache::put(CacheKey::get('SERVER_V2RAY_LAST_CHECK_AT', $server->id), time(), 3600);
$serverService = new ServerService();
$users = $serverService->getAvailableUsers($server->group_id);
$result = [];
foreach ($users as $user) {
$user->v2ray_user = [
"uuid" => $user->uuid,
"email" => sprintf("%s@v2board.user", $user->uuid),
"alter_id" => $server->alter_id,
"level" => 0,
];
unset($user['uuid']);
unset($user['email']);
array_push($result, $user);
}
return $this->success($result);
}
// 后端提交数据
public function submit(Request $request)
{
if ($r = $this->verifyToken($request)) { return $r; }
$server = Server::find($request->input('node_id'));
if (!$server) {
return $this->error("server could not be found", 404);
}
$data = file_get_contents('php://input');
$data = json_decode($data, true);
Cache::put(CacheKey::get('SERVER_V2RAY_ONLINE_USER', $server->id), count($data), 3600);
Cache::put(CacheKey::get('SERVER_V2RAY_LAST_PUSH_AT', $server->id), time(), 3600);
$userService = new UserService();
foreach ($data as $item) {
$u = $item['u'] * $server->rate;
$d = $item['d'] * $server->rate;
if (!$userService->trafficFetch($u, $d, $item['user_id'], $server, 'vmess')) {
return $this->error("user fetch fail", 500);
}
}
return $this->success('');
}
// 后端获取配置
public function config(Request $request)
{
if ($r = $this->verifyToken($request)) { return $r; }
$nodeId = $request->input('node_id');
$localPort = $request->input('local_port');
if (empty($nodeId) || empty($localPort)) {
return $this->error('invalid parameters', 400);
}
$serverService = new ServerService();
try {
$json = $serverService->getV2RayConfig($nodeId, $localPort);
$json->poseidon = [
'license_key' => (string)config('v2board.server_license'),
];
if ($this->poseidonVersion >= 'v1.5.0') {
// don't need it after v1.5.0
unset($json->inboundDetour);
unset($json->stats);
unset($json->api);
array_shift($json->routing->rules);
}
foreach($json->policy->levels as &$level) {
$level->handshake = 2;
$level->uplinkOnly = 2;
$level->downlinkOnly = 2;
$level->connIdle = 60;
}
return $this->success($json);
} catch (\Exception $e) {
return $this->error($e->getMessage(), 500);
}
}
protected function verifyToken(Request $request)
{
$token = $request->input('token');
if (empty($token)) {
return $this->error("token must be set");
}
if ($token !== config('v2board.server_token')) {
return $this->error("invalid token");
}
}
protected function error($msg, int $status = 400) {
return response([
'msg' => $msg,
], $status);
}
protected function success($data) {
$req = request();
// Only for "GET" method
if (!$req->isMethod('GET') || !$data) {
return response([
'msg' => 'ok',
'data' => $data,
]);
}
$etag = sha1(json_encode($data));
if ($etag == $req->header("IF-NONE-MATCH")) {
return response(null, 304);
}
return response([
'msg' => 'ok',
'data' => $data,
])->header('ETAG', $etag);
}
}

View File

@ -8,10 +8,6 @@ use App\Services\UserService;
use App\Utils\CacheKey; use App\Utils\CacheKey;
use Illuminate\Http\Request; use Illuminate\Http\Request;
use App\Http\Controllers\Controller; use App\Http\Controllers\Controller;
use App\Models\User;
use App\Models\ServerLog;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Cache; use Illuminate\Support\Facades\Cache;
/* /*
@ -72,23 +68,11 @@ class ShadowsocksTidalabController extends Controller
Cache::put(CacheKey::get('SERVER_SHADOWSOCKS_ONLINE_USER', $server->id), count($data), 3600); Cache::put(CacheKey::get('SERVER_SHADOWSOCKS_ONLINE_USER', $server->id), count($data), 3600);
Cache::put(CacheKey::get('SERVER_SHADOWSOCKS_LAST_PUSH_AT', $server->id), time(), 3600); Cache::put(CacheKey::get('SERVER_SHADOWSOCKS_LAST_PUSH_AT', $server->id), time(), 3600);
$userService = new UserService(); $userService = new UserService();
DB::beginTransaction();
try {
foreach ($data as $item) { foreach ($data as $item) {
$u = $item['u'] * $server->rate; $u = $item['u'] * $server->rate;
$d = $item['d'] * $server->rate; $d = $item['d'] * $server->rate;
if (!$userService->trafficFetch((float)$u, (float)$d, (int)$item['user_id'], $server, 'shadowsocks')) { $userService->trafficFetch($u, $d, $item['user_id'], $server, 'trojan');
continue;
} }
}
} catch (\Exception $e) {
DB::rollBack();
return response([
'ret' => 0,
'msg' => 'user fetch fail'
]);
}
DB::commit();
return response([ return response([
'ret' => 1, 'ret' => 1,

View File

@ -73,23 +73,11 @@ class TrojanTidalabController extends Controller
Cache::put(CacheKey::get('SERVER_TROJAN_ONLINE_USER', $server->id), count($data), 3600); Cache::put(CacheKey::get('SERVER_TROJAN_ONLINE_USER', $server->id), count($data), 3600);
Cache::put(CacheKey::get('SERVER_TROJAN_LAST_PUSH_AT', $server->id), time(), 3600); Cache::put(CacheKey::get('SERVER_TROJAN_LAST_PUSH_AT', $server->id), time(), 3600);
$userService = new UserService(); $userService = new UserService();
DB::beginTransaction();
try {
foreach ($data as $item) { foreach ($data as $item) {
$u = $item['u'] * $server->rate; $u = $item['u'] * $server->rate;
$d = $item['d'] * $server->rate; $d = $item['d'] * $server->rate;
if (!$userService->trafficFetch($u, $d, $item['user_id'], $server, 'trojan')) { $userService->trafficFetch($u, $d, $item['user_id'], $server, 'trojan');
continue;
} }
}
} catch (\Exception $e) {
DB::rollBack();
return response([
'ret' => 0,
'msg' => 'user fetch fail'
]);
}
DB::commit();
return response([ return response([
'ret' => 1, 'ret' => 1,

58
app/Jobs/ServerLogJob.php Normal file
View File

@ -0,0 +1,58 @@
<?php
namespace App\Jobs;
use App\Services\ServerService;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ServerLogJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $u;
protected $d;
protected $userId;
protected $server;
protected $protocol;
public $tries = 3;
public $timeout = 3;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct($u, $d, $userId, $server, $protocol)
{
$this->onQueue('server_log');
$this->u = $u;
$this->d = $d;
$this->userId = $userId;
$this->server = $server;
$this->protocol = $protocol;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
$serverService = new ServerService();
if (!$serverService->log(
$this->userId,
$this->server->id,
$this->u,
$this->d,
$this->server->rate,
$this->protocol
)) {
throw new \Exception('日志记录失败');
}
}
}

View File

@ -0,0 +1,56 @@
<?php
namespace App\Jobs;
use App\Models\User;
use App\Services\MailService;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class TrafficFetchJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $u;
protected $d;
protected $userId;
protected $server;
protected $protocol;
public $tries = 3;
public $timeout = 3;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct($u, $d, $userId, $server, $protocol)
{
$this->onQueue('traffic_fetch');
$this->u = $u;
$this->d = $d;
$this->userId = $userId;
$this->server = $server;
$this->protocol = $protocol;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
$user = User::lockForUpdate()->find($this->userId);
if (!$user) return;
$user->t = time();
$user->u = $user->u + $this->u;
$user->d = $user->d + $this->d;
if (!$user->save()) throw new \Exception('流量更新失败');
$mailService = new MailService();
$mailService->remindTraffic($user);
}
}

View File

@ -272,7 +272,7 @@ class ServerService
if ($serverLog) { if ($serverLog) {
$serverLog->u = $serverLog->u + $u; $serverLog->u = $serverLog->u + $u;
$serverLog->d = $serverLog->d + $d; $serverLog->d = $serverLog->d + $d;
$serverLog->save(); return $serverLog->save();
} else { } else {
$serverLog = new ServerLog(); $serverLog = new ServerLog();
$serverLog->user_id = $userId; $serverLog->user_id = $userId;
@ -282,7 +282,7 @@ class ServerService
$serverLog->rate = $rate; $serverLog->rate = $rate;
$serverLog->log_at = $timestamp; $serverLog->log_at = $timestamp;
$serverLog->method = $method; $serverLog->method = $method;
$serverLog->save(); return $serverLog->save();
} }
} }

View File

@ -2,6 +2,8 @@
namespace App\Services; namespace App\Services;
use App\Jobs\ServerLogJob;
use App\Jobs\TrafficFetchJob;
use App\Models\InviteCode; use App\Models\InviteCode;
use App\Models\Order; use App\Models\Order;
use App\Models\Server; use App\Models\Server;
@ -80,33 +82,9 @@ class UserService
return true; return true;
} }
public function trafficFetch(int $u, int $d, int $userId, object $server, string $protocol):bool public function trafficFetch(int $u, int $d, int $userId, object $server, string $protocol)
{ {
$user = User::lockForUpdate() TrafficFetchJob::dispatch($u, $d, $userId, $server, $protocol);
->find($userId); ServerLogJob::dispatch($u, $d, $userId, $server, $protocol);
if (!$user) {
return true;
}
$user->t = time();
$user->u = $user->u + $u;
$user->d = $user->d + $d;
if (!$user->save()) {
return false;
}
$mailService = new MailService();
$serverService = new ServerService();
try {
$mailService->remindTraffic($user);
$serverService->log(
$userId,
$server->id,
$u,
$d,
$server->rate,
$protocol
);
} catch (\Exception $e) {
}
return true;
} }
} }

View File

@ -1,5 +1,5 @@
apps: apps:
- name : 'V2Board' - name : 'V2Board'
script : 'php artisan queue:work --queue=send_email,send_telegram,stat_server' script : 'php artisan queue:work --queue=traffic_fetch,server_log,send_email,send_telegram,stat_server'
instances: 4 instances: 4
out_file : './storage/logs/queue/queue.log' out_file : './storage/logs/queue/queue.log'