Skip to content

Commit 14580c4

Browse files
committed
增强代理服务,使其具备连接池和 TLS 支持功能
- 在 ProxyService 中添加了连接池的初始化和管理功能。 - 实现了对 MySQL 连接的 TLS 握手处理和数据转发功能。 - 引入了针对 MySQL 发送操作的新日志记录。 - 更新了 TLS 设置和连接池参数的配置。 - 清除了 ServerShutdownListener 中未使用的工作类型日志记录。 - 添加了套接字锁定机制,以确保在执行 MySQL 操作时的线程安全性。
1 parent 5753012 commit 14580c4

12 files changed

Lines changed: 1289 additions & 106 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@ vendor/
1414
.vscode/
1515
/phpstan.neon
1616
/phpunit.xml
17+
.history

app/Listener/ApplicationLifecycleListener.php

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,17 @@
99
use Hyperf\Framework\Event\{BootApplication, BeforeWorkerStart};
1010
use Psr\Container\ContainerInterface;
1111
use Psr\Log\LoggerInterface;
12+
use App\Service\ProxyService;
1213

1314
#[Listener]
1415
class ApplicationLifecycleListener implements ListenerInterface
1516
{
1617
private LoggerInterface $logger;
18+
private ContainerInterface $container;
1719

1820
public function __construct(ContainerInterface $container)
1921
{
22+
$this->container = $container;
2023
$this->logger = $container->get(\Hyperf\Logger\LoggerFactory::class)->get('proxy');
2124
}
2225

@@ -57,6 +60,27 @@ public function process(object $event): void
5760
'line' => $e->getLine(),
5861
]);
5962
}
63+
64+
// 初始化连接池
65+
try {
66+
$proxyService = $this->container->get(ProxyService::class);
67+
$proxyService->initializeConnectionPool();
68+
69+
$this->logger->info('连接池初始化完成', [
70+
'event' => 'BeforeWorkerStart',
71+
'workerNum' => $event->serverSetting['worker_num'] ?? 'unknown',
72+
'pid' => getmypid(),
73+
]);
74+
} catch (\Throwable $e) {
75+
$this->logger->error('连接池初始化失败', [
76+
'error' => $e->getMessage(),
77+
'file' => $e->getFile(),
78+
'line' => $e->getLine(),
79+
'event' => 'BeforeWorkerStart',
80+
'workerNum' => $event->serverSetting['worker_num'] ?? 'unknown',
81+
'pid' => getmypid(),
82+
]);
83+
}
6084
}
6185
}
6286
}

app/Listener/ServerShutdownListener.php

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ public function process(object $event): void
4747
$this->logger->info("Worker进程已停止", [
4848
'pid' => getmypid(),
4949
'worker_id' => $workerId,
50-
'worker_type' => $event->workerType,
5150
'event' => 'OnWorkerStop',
5251
]);
5352
}
@@ -57,9 +56,7 @@ public function process(object $event): void
5756
$this->logger->info("Worker进程正在退出", [
5857
'pid' => getmypid(),
5958
'worker_id' => $workerId,
60-
'worker_type' => $event->workerType,
6159
'event' => 'OnWorkerExit',
62-
'exit_code' => $event->exitCode ?? 'unknown',
6360
]);
6461
}
6562
}

app/Protocol/ConnectionContext.php

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
namespace App\Protocol;
66

77
use Swoole\Coroutine\Socket;
8+
use Swoole\Lock;
89

910
class ConnectionContext
1011
{
@@ -17,6 +18,9 @@ class ConnectionContext
1718
private array $dsnParams = [];
1819
private ?string $targetHost = null;
1920
private ?int $targetPort = null;
21+
private bool $tlsEnabled = false;
22+
private ?string $clientTlsPeerCN = null;
23+
private ?Lock $socketLock = null;
2024

2125
public function __construct(string $clientId, string $clientIp, int $clientPort)
2226
{
@@ -107,6 +111,34 @@ public function setTargetPort(int $port): void
107111
$this->targetPort = $port;
108112
}
109113

114+
public function getSocketLock(): Lock
115+
{
116+
if ($this->socketLock === null) {
117+
$this->socketLock = new Lock(SWOOLE_MUTEX);
118+
}
119+
return $this->socketLock;
120+
}
121+
122+
public function isTlsEnabled(): bool
123+
{
124+
return $this->tlsEnabled;
125+
}
126+
127+
public function setTlsEnabled(bool $tlsEnabled): void
128+
{
129+
$this->tlsEnabled = $tlsEnabled;
130+
}
131+
132+
public function getClientTlsPeerCN(): ?string
133+
{
134+
return $this->clientTlsPeerCN;
135+
}
136+
137+
public function setClientTlsPeerCN(?string $clientTlsPeerCN): void
138+
{
139+
$this->clientTlsPeerCN = $clientTlsPeerCN;
140+
}
141+
110142
public function __toString(): string
111143
{
112144
return sprintf('%s:%d', $this->clientIp, $this->clientPort);
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace App\Service\Pool;
6+
7+
use Swoole\Coroutine\Channel;
8+
use Swoole\Coroutine\Socket;
9+
use App\Service\TargetConnector;
10+
11+
/**
12+
* Per-Worker Connection Pool for MySQL Target Connections
13+
*
14+
* Maintains a fixed-size pool of connections to the target MySQL server.
15+
* Each Swoole Worker has its own pool instance for thread safety.
16+
*/
17+
class ConnectionPool
18+
{
19+
private Channel $pool;
20+
private TargetConnector $connector;
21+
private int $size;
22+
private float $idleTimeout;
23+
private array $stats = [
24+
'created' => 0,
25+
'borrowed' => 0,
26+
'returned' => 0,
27+
'failed' => 0,
28+
'destroyed' => 0,
29+
];
30+
31+
public function __construct(TargetConnector $connector, int $size = 12, float $idleTimeout = 300.0)
32+
{
33+
$this->connector = $connector;
34+
$this->size = $size;
35+
$this->idleTimeout = $idleTimeout;
36+
$this->pool = new Channel($size);
37+
38+
// Pre-populate the pool with connections
39+
$this->initializePool();
40+
}
41+
42+
/**
43+
* Initialize the connection pool by creating initial connections
44+
*/
45+
private function initializePool(): void
46+
{
47+
for ($i = 0; $i < $this->size; $i++) {
48+
try {
49+
$connection = $this->connector->connect();
50+
if ($connection) {
51+
$this->pool->push([
52+
'socket' => $connection,
53+
'created_at' => microtime(true),
54+
'last_used' => microtime(true),
55+
]);
56+
$this->stats['created']++;
57+
}
58+
} catch (\Exception $e) {
59+
$this->stats['failed']++;
60+
// Log the error but continue creating other connections
61+
error_log("Failed to create initial connection in pool: " . $e->getMessage());
62+
}
63+
}
64+
}
65+
66+
/**
67+
* Borrow a connection from the pool
68+
*
69+
* @param float $timeout Timeout in seconds to wait for a connection
70+
* @return Socket|null
71+
*/
72+
public function borrow(float $timeout = 5.0): ?Socket
73+
{
74+
$startTime = microtime(true);
75+
76+
// Try to get an existing connection from the pool
77+
$connection = $this->pool->pop($timeout);
78+
if ($connection !== false) {
79+
$this->stats['borrowed']++;
80+
81+
// Check if connection is still healthy
82+
if ($this->isConnectionHealthy($connection['socket'])) {
83+
$connection['last_used'] = microtime(true);
84+
return $connection['socket'];
85+
} else {
86+
// Connection is unhealthy, destroy it and try to create a new one
87+
$this->destroyConnection($connection['socket']);
88+
$this->stats['destroyed']++;
89+
}
90+
}
91+
92+
// No available connection, try to create a new one
93+
try {
94+
$socket = $this->connector->connect();
95+
if ($socket) {
96+
$this->stats['created']++;
97+
$this->stats['borrowed']++;
98+
return $socket;
99+
}
100+
} catch (\Exception $e) {
101+
$this->stats['failed']++;
102+
error_log("Failed to create new connection in pool: " . $e->getMessage());
103+
}
104+
105+
return null;
106+
}
107+
108+
/**
109+
* Return a connection to the pool
110+
*
111+
* @param Socket $socket
112+
*/
113+
public function release(Socket $socket): void
114+
{
115+
// Check if connection is still healthy before returning to pool
116+
if ($this->isConnectionHealthy($socket)) {
117+
$connection = [
118+
'socket' => $socket,
119+
'created_at' => microtime(true), // Reset timestamps
120+
'last_used' => microtime(true),
121+
];
122+
123+
// Try to return to pool, if pool is full, close the connection
124+
if ($this->pool->push($connection, 0.1) === false) {
125+
$this->destroyConnection($socket);
126+
} else {
127+
$this->stats['returned']++;
128+
}
129+
} else {
130+
// Connection is unhealthy, destroy it
131+
$this->destroyConnection($socket);
132+
$this->stats['destroyed']++;
133+
}
134+
}
135+
136+
/**
137+
* Check if a connection is healthy
138+
*
139+
* @param Socket $socket
140+
* @return bool
141+
*/
142+
private function isConnectionHealthy(Socket $socket): bool
143+
{
144+
// Basic health check - socket should be connected and not have errors
145+
if (!$socket->isConnected()) {
146+
return false;
147+
}
148+
149+
// Check if connection has been idle for too long
150+
$connection = null;
151+
while ($this->pool->length() > 0) {
152+
$conn = $this->pool->pop(0.001);
153+
if ($conn !== false && $conn['socket'] === $socket) {
154+
$connection = $conn;
155+
break;
156+
} elseif ($conn !== false) {
157+
$this->pool->push($conn); // Put back other connections
158+
}
159+
}
160+
161+
if ($connection && (microtime(true) - $connection['last_used']) > $this->idleTimeout) {
162+
return false;
163+
}
164+
165+
// Put connection back if we found it
166+
if ($connection) {
167+
$this->pool->push($connection);
168+
}
169+
170+
return true;
171+
}
172+
173+
/**
174+
* Destroy a connection
175+
*
176+
* @param Socket $socket
177+
*/
178+
private function destroyConnection(Socket $socket): void
179+
{
180+
try {
181+
if ($socket->isConnected()) {
182+
$socket->close();
183+
}
184+
} catch (\Exception $e) {
185+
// Ignore errors when closing unhealthy connections
186+
}
187+
}
188+
189+
/**
190+
* Get pool statistics
191+
*
192+
* @return array
193+
*/
194+
public function getStats(): array
195+
{
196+
return array_merge($this->stats, [
197+
'pool_size' => $this->size,
198+
'available' => $this->pool->length(),
199+
'idle_timeout' => $this->idleTimeout,
200+
]);
201+
}
202+
203+
/**
204+
* Close all connections in the pool
205+
*/
206+
public function close(): void
207+
{
208+
while ($this->pool->length() > 0) {
209+
$connection = $this->pool->pop(0.001);
210+
if ($connection !== false) {
211+
$this->destroyConnection($connection['socket']);
212+
}
213+
}
214+
}
215+
216+
/**
217+
* Get the current number of available connections
218+
*
219+
* @return int
220+
*/
221+
public function getAvailableCount(): int
222+
{
223+
return $this->pool->length();
224+
}
225+
}

0 commit comments

Comments
 (0)