627 lines
16 KiB
PHP
627 lines
16 KiB
PHP
<?php
|
|
|
|
namespace App\Classes\Sock;
|
|
|
|
use Illuminate\Support\Arr;
|
|
use Illuminate\Support\Facades\Log;
|
|
use Illuminate\Support\Str;
|
|
|
|
use App\Classes\Sock\Exception\{HAproxyException,SocketException};
|
|
|
|
/**
|
|
* Class SocketClient
|
|
*
|
|
* @package App\Classes\Sock
|
|
* @property int cps
|
|
* @property int speed
|
|
*/
|
|
final class SocketClient {
|
|
private const LOGKEY = 'SC-';
|
|
|
|
// For deep debugging
|
|
private const DEBUG = FALSE;
|
|
|
|
private \Socket $connection;
|
|
private string $address_local = '';
|
|
private int $port_local = 0;
|
|
private string $address_remote = '';
|
|
private int $port_remote = 0;
|
|
|
|
// Our session state
|
|
private array $session = [];
|
|
|
|
private const OK = 0;
|
|
private const TIMEOUT = -2;
|
|
private const ERROR = -5;
|
|
|
|
/** @var string Size of our TX buffer */
|
|
private const TX_BUF_SIZE = 0xFFFF;
|
|
/** @var string Maximum amount of data to send at a time */
|
|
private const TX_SIZE = 0xFFFF;
|
|
/** @var string Data in the TX buffer */
|
|
private string $tx_buf = '';
|
|
|
|
/** @var string Size of our RX buffer */
|
|
private const RX_BUF_SIZE = 0xFFFF;
|
|
/** @var string Maximum amount of data to received at a time */
|
|
private const RX_SIZE = 0xFFFF;
|
|
/** @var string Data in the RX buffer */
|
|
private string $rx_buf = '';
|
|
|
|
public function __construct (\Socket $connection,bool $originate=FALSE)
|
|
{
|
|
$this->connection = $connection;
|
|
|
|
if ($this->type === SOCK_STREAM) {
|
|
socket_getsockname($connection,$this->address_local,$this->port_local);
|
|
socket_getpeername($connection,$this->address_remote,$this->port_remote);
|
|
|
|
// If HAPROXY is used, work get the clients address
|
|
if ((! $originate) && config('fido.haproxy')) {
|
|
Log::debug(sprintf('%s:+ HAPROXY connection host [%s] on port [%d] (%s)',self::LOGKEY,$this->address_remote,$this->port_remote,$this->type));
|
|
|
|
if (($x=$this->read(5,6)) === 'PROXY ')
|
|
$vers = 1;
|
|
|
|
elseif (($x === "\x0d\x0a\x0d\x0a\x00\x0d") && ($this->read('5,6') === "\x0aQUIT\x0a"))
|
|
$vers = 2;
|
|
|
|
else
|
|
throw new HAproxyException('Failed to initialise HAPROXY connection');
|
|
|
|
switch ($vers) {
|
|
case 1:
|
|
// Protocol/Address Family
|
|
switch ($x=$this->read(5,5)) {
|
|
case 'TCP4 ':
|
|
$p = 4;
|
|
break;
|
|
case 'TCP6 ':
|
|
$p = 6;
|
|
break;
|
|
|
|
default:
|
|
throw new HAproxyException(sprintf('HAPROXY protocol [%d] is not handled',$x));
|
|
}
|
|
|
|
$read = $this->read(5,104-11);
|
|
|
|
// IPv4
|
|
if (($p === 4) || ($p === 6)) {
|
|
$parse = collect(sscanf($read,'%s %s %s %s'));
|
|
|
|
$src = Arr::get($parse,0);
|
|
$dst = Arr::get($parse,1);
|
|
$src_port = (int)Arr::get($parse,2);
|
|
$dst_port = (int)Arr::get($parse,3);
|
|
$len = $parse->map(fn($item)=>strlen($item))->sum()+3;
|
|
|
|
// The last 2 chars should be "\r\n"
|
|
if (($x=substr($read,$len)) !== "\r\n")
|
|
throw new HAproxyException(sprintf('HAPROXY parsing failed for version [%d] [%s] (%s)',$p,$read,hex_dump($x)));
|
|
|
|
} else {
|
|
throw new HAproxyException(sprintf('HAPROXY version [%d] is not handled [%s]',$p,$read));
|
|
}
|
|
|
|
$this->port_remote = $src_port;
|
|
|
|
break;
|
|
|
|
case 2:
|
|
// Version/Command
|
|
$vc = $this->read_ch(5);
|
|
|
|
if (($x=($vc>>4)&0x7) !== 2)
|
|
throw new HAproxyException(sprintf('Unknown HAPROXY version [%d]',$x));
|
|
|
|
switch ($x=($vc&0x7)) {
|
|
// HAPROXY internal
|
|
case 0:
|
|
throw new HAproxyException('HAPROXY internal health-check');
|
|
|
|
// PROXY connection
|
|
case 1:
|
|
break;
|
|
|
|
default:
|
|
throw new HAproxyException(sprintf('HAPROXY command [%d] is not handled',$x));
|
|
}
|
|
|
|
// Protocol/Address Family
|
|
$pa = $this->read_ch(5);
|
|
|
|
switch ($x=($pa>>4)&0x7) {
|
|
case 1: // AF_INET
|
|
$p = 4;
|
|
break;
|
|
|
|
case 2: // AF_INET6
|
|
$p = 6;
|
|
break;
|
|
|
|
|
|
}
|
|
|
|
switch ($x=($pa&0x7)) {
|
|
case 1: // STREAM
|
|
break;
|
|
|
|
default:
|
|
throw new HAproxyException(sprintf('HAPROXY address family [%d] is not handled',$x));
|
|
}
|
|
|
|
$len = Arr::get(unpack('n',$this->read(5,2)),1);
|
|
|
|
// IPv4
|
|
if (($p === 4) && ($len === 12)) {
|
|
$src = inet_ntop($this->read(5,4));
|
|
$dst = inet_ntop($this->read(5,4));
|
|
|
|
} elseif (($p === 6) && ($len === 36)) {
|
|
$src = inet_ntop($this->read(5,16));
|
|
$dst = inet_ntop($this->read(5,16));
|
|
|
|
} else {
|
|
throw new HAproxyException(sprintf('HAPROXY address len [%d:%d] is not handled',$p,$len));
|
|
}
|
|
|
|
$src_port = unpack('n',$this->read(5,2));
|
|
$dst_port = Arr::get(unpack('n',$this->read(5,2)),1);
|
|
|
|
$this->port_remote = Arr::get($src_port,1);
|
|
|
|
break;
|
|
|
|
default:
|
|
throw new HAproxyException('Failed to initialise HAPROXY connection');
|
|
}
|
|
|
|
$this->address_remote = $src;
|
|
|
|
Log::debug(sprintf('%s:- HAPROXY src [%s:%d] dst [%s:%d]',
|
|
self::LOGKEY,
|
|
$this->address_remote,
|
|
$this->port_remote,
|
|
$dst,
|
|
$dst_port,
|
|
));
|
|
}
|
|
|
|
Log::debug(sprintf('%s:+ Connection host [%s] on port [%d] (%s)',self::LOGKEY,$this->address_remote,$this->port_remote,$this->type));
|
|
}
|
|
}
|
|
|
|
public function __get(string $key): mixed
|
|
{
|
|
return match ($key) {
|
|
'address_remote', 'port_remote' => $this->{$key},
|
|
'cps', 'speed' => Arr::get($this->session,$key),
|
|
'rx_free' => self::RX_BUF_SIZE-$this->rx_left,
|
|
'rx_left' => strlen($this->rx_buf),
|
|
'tx_free' => self::TX_BUF_SIZE-strlen($this->tx_buf),
|
|
'type' => socket_get_option($this->connection,SOL_SOCKET,SO_TYPE),
|
|
default => throw new \Exception(sprintf('%s:! Unknown key [%s]:',self::LOGKEY, $key)),
|
|
};
|
|
}
|
|
|
|
public function __set(string $key,mixed $value): void
|
|
{
|
|
switch ($key) {
|
|
case 'cps':
|
|
case 'speed':
|
|
$this->session[$key] = $value;
|
|
break;
|
|
|
|
default:
|
|
throw new \Exception(sprintf('%s:! Unknown key [%s]:',self::LOGKEY,$key));
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Create a client socket
|
|
*
|
|
* @param string $address
|
|
* @param int $port
|
|
* @return static
|
|
* @throws SocketException|HAproxyException
|
|
*/
|
|
public static function create(string $address,int $port): self
|
|
{
|
|
Log::info(sprintf('%s:+ Creating connection to [%s:%d]',self::LOGKEY,$address,$port));
|
|
|
|
$type = collect(config('fido.ip'))
|
|
->filter(fn($item)=>$item['enabled']);
|
|
|
|
if (filter_var($address,FILTER_VALIDATE_IP))
|
|
$resolved = collect([[
|
|
(($x=filter_var($address,FILTER_VALIDATE_IP, FILTER_FLAG_IPV6)) ? 'ipv6' : 'ip')=>$address,
|
|
'type'=>$x ? 'AAAA' : 'A'
|
|
]]);
|
|
else
|
|
// We only look at AAAA/A records
|
|
$resolved = collect(dns_get_record($address,$type->map(fn($item)=>$item['type'])->sum()))
|
|
->filter(fn($item)=>$type->has(Arr::get($item,'type')))
|
|
->sort(fn($a,$b)=>$type->get(Arr::get($a,'type'))['order'] < $type->get(Arr::get($b,'type'))['order']);
|
|
|
|
if (! $resolved->count())
|
|
throw new SocketException(SocketException::CANT_CONNECT,sprintf('%s doesnt resolved to an IPv4/IPv6 address',$address));
|
|
|
|
$result = FALSE;
|
|
$socket = NULL;
|
|
|
|
foreach ($resolved as $address) {
|
|
try {
|
|
$try = Arr::get($address,Arr::get($address,'type') === 'AAAA' ? 'ipv6' : 'ip');
|
|
if (! $try)
|
|
continue;
|
|
|
|
Log::info(sprintf('%s:- Trying [%s:%d]',self::LOGKEY,$try,$port));
|
|
|
|
/* Create a TCP/IP socket. */
|
|
$socket = socket_create(Arr::get($address,'type') === 'AAAA' ? AF_INET6 : AF_INET,SOCK_STREAM,SOL_TCP);
|
|
if ($socket === FALSE)
|
|
throw new SocketException(SocketException::CANT_CREATE_SOCKET,socket_strerror(socket_last_error($socket)));
|
|
|
|
$result = socket_connect($socket,$try,$port);
|
|
break;
|
|
|
|
} catch (\ErrorException $e) {
|
|
// If 'Cannot assign requested address'
|
|
if (socket_last_error($socket) === 99)
|
|
continue;
|
|
|
|
throw new SocketException(SocketException::CANT_CONNECT,socket_strerror(socket_last_error($socket)));
|
|
}
|
|
}
|
|
|
|
if ($result === FALSE)
|
|
throw new SocketException(SocketException::CANT_CONNECT,socket_strerror(socket_last_error($socket)));
|
|
|
|
return new self($socket,TRUE);
|
|
}
|
|
|
|
/**
|
|
* We'll add to our transmit buffer and if doesnt have space, we'll empty it first
|
|
*
|
|
* @param string $data
|
|
* @return void
|
|
* @throws \Exception
|
|
*/
|
|
public function buffer_add(string $data): void
|
|
{
|
|
$ptr = 0;
|
|
$num_bytes = strlen($data);
|
|
|
|
while ($num_bytes) {
|
|
if (self::DEBUG)
|
|
Log::debug(sprintf('%s:- To add [%d] to the TX buffer',self::LOGKEY,$num_bytes));
|
|
|
|
if ($num_bytes > $this->tx_free) {
|
|
if (self::DEBUG)
|
|
Log::debug(sprintf('%s:- TX buffer will be too full, draining...',self::LOGKEY));
|
|
|
|
do {
|
|
$this->buffer_flush(5);
|
|
|
|
$n = min($this->tx_free,$num_bytes);
|
|
$this->tx_buf = substr($data,$ptr,$n);
|
|
$num_bytes -= $n;
|
|
$ptr += $n;
|
|
|
|
} while ($num_bytes);
|
|
|
|
} else {
|
|
$this->tx_buf .= substr($data,$ptr,$num_bytes);
|
|
$num_bytes = 0;
|
|
}
|
|
}
|
|
|
|
if (self::DEBUG)
|
|
Log::debug(sprintf('%s:= TX buffer has [%d] space left',self::LOGKEY,$this->tx_free));
|
|
}
|
|
|
|
/**
|
|
* Empty our TX buffer
|
|
*
|
|
* @param int $timeout
|
|
* @return int
|
|
* @throws \Exception
|
|
*/
|
|
public function buffer_flush(int $timeout): int
|
|
{
|
|
if (self::DEBUG)
|
|
Log::debug(sprintf('%s:+ Emptying TX buffer with [%d] chars, and timeout [%d]',self::LOGKEY,strlen($this->tx_buf),$timeout));
|
|
|
|
$tm = $this->timer_set($timeout);
|
|
$rc = self::OK;
|
|
|
|
while (strlen($this->tx_buf)) {
|
|
$tv = $this->timer_rest($tm);
|
|
|
|
if ($rc=$this->canSend($tv)) {
|
|
if (self::DEBUG)
|
|
Log::debug(sprintf('%s:- Chars to send [%d]',self::LOGKEY,strlen($this->tx_buf)));
|
|
|
|
$sent = $this->send(substr($this->tx_buf,0,self::TX_SIZE),0);
|
|
|
|
if (self::DEBUG)
|
|
Log::debug(sprintf('%s:- Sent [%d] chars [%s]',self::LOGKEY,$sent,Str::limit($this->tx_buf,15)));
|
|
|
|
$this->tx_buf = substr($this->tx_buf,$sent);
|
|
|
|
} else {
|
|
return $rc;
|
|
}
|
|
|
|
// @todo Enable a delay for slow clients
|
|
//sleep(1);
|
|
if ($this->timer_expired($tm))
|
|
return self::ERROR;
|
|
}
|
|
|
|
$this->tx_purge();
|
|
|
|
return $rc;
|
|
}
|
|
|
|
/**
|
|
* @param int $timeout
|
|
* @return bool
|
|
* @throws \Exception
|
|
*/
|
|
public function canSend(int $timeout): bool
|
|
{
|
|
$write = [$this->connection];
|
|
|
|
return $this->socketSelect(NULL,$write,NULL,$timeout) > 0;
|
|
}
|
|
|
|
/**
|
|
* Close the connection with the client
|
|
*/
|
|
public function close(): void
|
|
{
|
|
try {
|
|
socket_shutdown($this->connection);
|
|
} catch (\ErrorException $e) {
|
|
Log::error(sprintf('%s:! Shutting down socket [%s]',self::LOGKEY,$e->getMessage()));
|
|
}
|
|
|
|
try {
|
|
socket_close($this->connection);
|
|
} catch (\ErrorException $e) {
|
|
Log::error(sprintf('%s:! Closing socket [%s]',self::LOGKEY,$e->getMessage()));
|
|
}
|
|
|
|
Log::debug(sprintf('%s:= Connection closed with [%s]',self::LOGKEY,$this->address_remote));
|
|
}
|
|
|
|
/**
|
|
* We have data in the buffer or on the socket
|
|
*
|
|
* @param int $timeout
|
|
* @return bool
|
|
* @throws \Exception
|
|
*/
|
|
public function hasData(int $timeout): bool
|
|
{
|
|
$read = [$this->connection];
|
|
|
|
return ($this->rx_left ?: $this->socketSelect($read,NULL,NULL,$timeout)) > 0;
|
|
}
|
|
|
|
/**
|
|
* Read data, emptying from the RX buffer first, then checking the socket.
|
|
*
|
|
* @param int $timeout How long to wait for data
|
|
* @param int $len The amount of data we want
|
|
* @param int $flags
|
|
* @return string|null
|
|
* @throws SocketException
|
|
*/
|
|
public function read(int $timeout,int $len=1024,int $flags=MSG_DONTWAIT): ?string
|
|
{
|
|
// We have data in our buffer
|
|
if ($this->rx_left >= $len) {
|
|
if (self::DEBUG)
|
|
Log::debug(sprintf('%s:- Returning [%d] chars from the RX buffer',self::LOGKEY,$len));
|
|
|
|
$result = substr($this->rx_buf,0,$len);
|
|
$this->rx_buf = substr($this->rx_buf,strlen($result));
|
|
|
|
return $result;
|
|
}
|
|
|
|
if ($timeout && (! $this->hasData($timeout)))
|
|
throw new SocketException(SocketException::SOCKET_TIMEOUT,$timeout);
|
|
|
|
$buf = '';
|
|
|
|
try {
|
|
switch ($this->type) {
|
|
case SOCK_STREAM:
|
|
$recv = socket_recv($this->connection,$buf,self::RX_SIZE,$flags);
|
|
break;
|
|
|
|
case SOCK_DGRAM:
|
|
$recv = socket_recvfrom($this->connection,$buf,self::RX_SIZE,$flags,$this->address_remote,$this->port_remote);
|
|
break;
|
|
|
|
default:
|
|
throw new SocketException(SocketException::SOCKET_ERROR,sprintf('Unhandled socket type: %s',$this->type));
|
|
}
|
|
|
|
} catch (\Exception $e) {
|
|
Log::error(sprintf('%s:! socket_recv Exception [%s]',self::LOGKEY,$e->getMessage()));
|
|
|
|
throw new SocketException($x=socket_last_error($this->connection),socket_strerror($x));
|
|
}
|
|
|
|
// If we got no data, we'll send whatever is left in the buffer
|
|
if ($recv === FALSE) {
|
|
// If we have something in the buffer, we'll send it
|
|
if ($this->rx_left) {
|
|
if (self::DEBUG)
|
|
Log::debug(sprintf('%s:- Network read return an error, returning final [%d] chars from the RX buffer',self::LOGKEY,strlen($this->rx_buf)));
|
|
|
|
$result = $this->rx_buf;
|
|
$this->rx_buf = '';
|
|
|
|
return $result;
|
|
}
|
|
|
|
Log::debug(sprintf('%s:! Request to read [%d] chars resulted in no data',self::LOGKEY,$len));
|
|
throw new SocketException($x=socket_last_error($this->connection),socket_strerror($x));
|
|
}
|
|
|
|
// If our buffer is null, see if we have any out of band data.
|
|
// @todo We throw an errorexception when the socket is closed by the remote I think.
|
|
if (($recv === 0) && is_null($buf) && ($this->hasData(0) > 0) && $this->type === SOCK_STREAM) {
|
|
try {
|
|
socket_recv($this->connection,$buf,$len,MSG_OOB);
|
|
|
|
} catch (\Exception $e) {
|
|
throw new SocketException($x=socket_last_error($this->connection),socket_strerror($x));
|
|
}
|
|
}
|
|
|
|
$this->rx_buf .= $buf;
|
|
|
|
if (self::DEBUG)
|
|
Log::debug(sprintf('%s:- Added [%d] chars to the RX buffer',self::LOGKEY,strlen($buf)),['rx_buf'=>hex_dump($this->rx_buf)]);
|
|
|
|
// Loop again and return the data, now that it is in the RX buffer
|
|
return $this->read($timeout,$len);
|
|
}
|
|
|
|
/**
|
|
* Read a character from the remote.
|
|
* We'll buffer everything received
|
|
*
|
|
* @param int $timeout
|
|
* @return int
|
|
* @throws \Exception
|
|
*/
|
|
public function read_ch(int $timeout): int
|
|
{
|
|
if ($this->hasData($timeout))
|
|
$ch = $this->read($timeout,1);
|
|
|
|
else
|
|
throw new SocketException(SocketException::SOCKET_TIMEOUT,$timeout);
|
|
|
|
return ord($ch);
|
|
}
|
|
|
|
public function rx_purge(): void
|
|
{
|
|
if (self::DEBUG)
|
|
Log::debug(sprintf('%s:+ Discarding [%d] chars from the RX buffer',self::LOGKEY,strlen($this->tx_buf)));
|
|
|
|
$this->rx_buf = '';
|
|
}
|
|
|
|
/**
|
|
* Clear our TX buffer
|
|
*/
|
|
public function tx_purge(): void
|
|
{
|
|
if (self::DEBUG)
|
|
Log::debug(sprintf('%s:+ Discarding [%d] chars from the TX buffer',self::LOGKEY,strlen($this->tx_buf)));
|
|
|
|
$this->tx_buf = '';
|
|
}
|
|
|
|
/**
|
|
* Send data to the client
|
|
*
|
|
* @param string $message
|
|
* @param int $timeout
|
|
* @return int|bool
|
|
* @throws \Exception
|
|
*/
|
|
public function send(string $message,int $timeout): int|bool
|
|
{
|
|
if ($timeout && (! $rc=$this->canSend($timeout)))
|
|
return $rc;
|
|
|
|
if (self::DEBUG)
|
|
Log::debug(sprintf('%s:- Sending [%d] chars [%s]',self::LOGKEY,strlen($message),Str::limit($message,15)));
|
|
|
|
switch ($this->type) {
|
|
case SOCK_STREAM:
|
|
return socket_write($this->connection,$message,strlen($message));
|
|
|
|
case SOCK_DGRAM:
|
|
return socket_sendto($this->connection,$message,strlen($message),0,$this->address_remote,$this->port_remote);
|
|
|
|
default:
|
|
throw new SocketException(SocketException::SOCKET_ERROR,sprintf('Unhandled socket type: %s',$this->type));
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Wait for data on a socket
|
|
*
|
|
* @param array|null $read
|
|
* @param array|null $write
|
|
* @param array|null $except
|
|
* @param int $timeout
|
|
* @return int
|
|
* @throws \Exception
|
|
*/
|
|
private function socketSelect(?array $read,?array $write,?array $except,int $timeout): int
|
|
{
|
|
$rc = socket_select($read,$write,$except,$timeout);
|
|
|
|
if ($rc === FALSE)
|
|
throw new \Exception('Socket Error: '.socket_strerror(socket_last_error()));
|
|
|
|
if (self::DEBUG)
|
|
Log::debug(sprintf('%s:= Socket select returned [%d] with timeout (%d)',self::LOGKEY,$rc,$timeout),['read'=>$read,'write'=>$write,'except'=>$except]);
|
|
|
|
return $rc;
|
|
}
|
|
|
|
public function timer_expired(int $timer): int
|
|
{
|
|
return (time() >= $timer);
|
|
}
|
|
|
|
public function timer_rest(int $timer): int
|
|
{
|
|
return $timer-time();
|
|
}
|
|
|
|
public function timer_set(int $expire): int
|
|
{
|
|
return time()+$expire;
|
|
}
|
|
|
|
/**
|
|
* See if there is data waiting to collect, or if we can send
|
|
*
|
|
* @param bool $read
|
|
* @param bool $write
|
|
* @param int $timeout
|
|
* @return int
|
|
* @throws \Exception
|
|
* @deprecated use canSend or hasData
|
|
*/
|
|
public function ttySelect(bool $read,bool $write, int $timeout): int
|
|
{
|
|
if ($this->rx_left) {
|
|
if (self::DEBUG)
|
|
Log::debug(sprintf('%s:= We still have [%d] chars in the RX buffer.',self::LOGKEY,$this->rx_left));
|
|
|
|
return 1;
|
|
}
|
|
|
|
$read = $read ? [$this->connection] : NULL;
|
|
$write = $write ? [$this->connection] : NULL;
|
|
|
|
return $this->socketSelect($read,$write,NULL,$timeout);
|
|
}
|
|
}
|