clrghouz/app/Classes/File/Receive.php

373 lines
11 KiB
PHP
Raw Normal View History

2021-04-01 10:59:15 +00:00
<?php
namespace App\Classes\File;
2022-11-13 13:29:55 +00:00
use Illuminate\Support\Arr;
2021-04-01 10:59:15 +00:00
use Illuminate\Support\Collection;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Storage;
2021-04-01 10:59:15 +00:00
use Symfony\Component\HttpFoundation\File\Exception\FileException;
use App\Classes\{File,Protocol};
2022-11-13 13:29:55 +00:00
use App\Classes\FTN\{InvalidPacketException,Packet};
use App\Exceptions\FileGrewException;
2022-11-01 11:24:36 +00:00
use App\Jobs\{MessageProcess,TicProcess};
use App\Models\Address;
2021-04-01 10:59:15 +00:00
/**
* Object representing the files we are receiving
*
* @property-read resource $fd
* @property-read int total_recv
* @property-read int total_recv_bytes
*/
final class Receive extends Item
{
private const LOGKEY = 'IR-';
private const compression = [
'BZ2',
'GZ',
];
private Address $ao;
2021-04-01 10:59:15 +00:00
private Collection $list;
private ?Item $receiving;
private string $file; // Local filename for file received
/** @var ?string The compression used by the incoming file */
private ?string $comp;
/** @var string|null The compressed data received */
private ?string $comp_data;
2021-04-01 10:59:15 +00:00
public function __construct()
{
// Initialise our variables
$this->list = collect();
$this->receiving = NULL;
}
public function __get($key)
{
switch ($key) {
case 'fd':
return is_resource($this->f);
case 'filepos':
return $this->file_pos;
case 'mtime':
case 'name':
case 'size':
2022-11-13 13:29:55 +00:00
return $this->receiving?->{'file_'.$key};
2021-04-01 10:59:15 +00:00
case 'name_size_time':
return sprintf('%s %lu %lu',$this->name,$this->size,$this->mtime);
2021-04-01 10:59:15 +00:00
case 'to_get':
return $this->list
->filter(function($item) { return ($item->action & self::I_RECV) && $item->received === FALSE; })
->count();
case 'total_recv':
return $this->list
->filter(function($item) { return ($item->action & self::I_RECV) && $item->received === TRUE; })
->count();
case 'total_recv_bytes':
return $this->list
->filter(function($item) { return ($item->action & self::I_RECV) && $item->received === TRUE; })
->sum(function($item) { return $item->size; });
2021-04-01 10:59:15 +00:00
default:
2023-06-27 07:39:11 +00:00
throw new \Exception('Unknown key: '.$key);
2021-04-01 10:59:15 +00:00
}
}
/**
* Close the file descriptor for our incoming file
*
2023-06-27 07:39:11 +00:00
* @throws \Exception
2021-04-01 10:59:15 +00:00
*/
public function close(): void
{
if (! $this->receiving)
2023-06-27 07:39:11 +00:00
throw new \Exception('No file to close');
2021-04-01 10:59:15 +00:00
if ($this->f) {
if ($this->file_pos !== $this->receiving->size) {
Log::warning(sprintf('%s:- Closing [%s], but missing [%d] bytes',self::LOGKEY,$this->receiving->name,$this->receiving->size-$this->file_pos));
$this->receiving->incomplete = TRUE;
}
2021-04-01 10:59:15 +00:00
$this->receiving->received = TRUE;
2021-04-01 10:59:15 +00:00
$end = time()-$this->start;
Log::debug(sprintf('%s:- Closing [%s], received in [%d]',self::LOGKEY,$this->receiving->name,$end));
2021-04-01 10:59:15 +00:00
if ($this->comp)
Log::info(sprintf('%s:= Compressed file using [%s] was [%d] bytes (%d). Compression rate [%3.2f%%]',self::LOGKEY,$this->comp,$x=strlen($this->comp_data),$this->receiving->size,$x/$this->receiving->size*100));
fclose($this->f);
// Set our mtime
touch($this->file,$this->mtime);
$this->file_pos = 0;
$this->f = NULL;
2021-08-12 11:59:48 +00:00
// If the packet has been received but not the right size, dont process it any more.
// If we received a packet, we'll dispatch a job to process it
if (! $this->receiving->incomplete)
switch ($this->receiving->type) {
case self::IS_ARC:
case self::IS_PKT:
Log::info(sprintf('%s:- Processing mail %s [%s]',self::LOGKEY,$this->receiving->type === self::IS_PKT ? 'PACKET' : 'ARCHIVE',$this->file));
2022-11-13 13:29:55 +00:00
try {
$f = new File($this->file);
$processed = FALSE;
2022-11-13 13:29:55 +00:00
foreach ($f as $packet) {
$po = Packet::process($packet,Arr::get(stream_get_meta_data($packet),'uri'),$f->itemSize(),$this->ao->system);
// Check the messages are from the uplink
if ($this->ao->system->addresses->search(function($item) use ($po) { return $item->id === $po->fftn_o->id; }) === FALSE) {
Log::error(sprintf('%s:! Packet [%s] is not from this link? [%d]',self::LOGKEY,$po->fftn_o->ftn,$this->ao->system_id));
2022-11-13 13:29:55 +00:00
break;
}
// Check the packet password
if ($this->ao->session('pktpass') != $po->password) {
Log::error(sprintf('%s:! Packet from [%s] with password [%s] is invalid.',self::LOGKEY,$this->ao->ftn,$po->password));
// @todo Generate message to system advising invalid password - that message should be sent without a packet password!
break;
}
2022-11-13 13:29:55 +00:00
Log::info(sprintf('%s:- Packet has [%d] messages',self::LOGKEY,$po->count()));
2022-11-13 13:29:55 +00:00
// Queue messages if there are too many in the packet.
if ($queue = ($po->count() > config('app.queue_msgs')))
Log::info(sprintf('%s:- Messages will be sent to the queue for processing',self::LOGKEY));
2022-11-13 13:29:55 +00:00
$count = 0;
foreach ($po as $msg) {
Log::info(sprintf('%s:- Mail from [%s] to [%s]',self::LOGKEY,$msg->fftn,$msg->tftn));
2022-11-13 13:29:55 +00:00
// @todo Quick check that the packet should be processed by us.
// @todo validate that the packet's zone is in the domain.
2022-11-13 13:29:55 +00:00
try {
// Dispatch job.
if ($queue)
MessageProcess::dispatch($msg,$f->pktName());
else
MessageProcess::dispatchSync($msg,$f->pktName());
2022-11-13 13:29:55 +00:00
} catch (\Exception $e) {
Log::error(sprintf('%s:! Got error dispatching message [%s] (%d:%s-%s).',self::LOGKEY,$msg->msgid,$e->getLine(),$e->getFile(),$e->getMessage()));
}
$count++;
2022-11-13 13:29:55 +00:00
}
if ($count === $po->count())
$processed = TRUE;
2022-11-13 13:29:55 +00:00
}
if (! $processed) {
Log::alert(sprintf('%s:- Not deleting packet [%s], it doesnt seem to be processed?',self::LOGKEY,$this->file));
2022-11-13 13:29:55 +00:00
// If we want to keep the packet, we could do that logic here
} elseif (! config('app.packet_keep')) {
Log::debug(sprintf('%s:- Deleting processed packet [%s]',self::LOGKEY,$this->file));
unlink($this->file);
}
} catch (InvalidPacketException $e) {
Log::error(sprintf('%s:- Not deleting packet [%s], as it generated an InvalidPacketException',self::LOGKEY,$this->file),['e'=>$e->getMessage()]);
2023-01-11 02:15:30 +00:00
} catch (\Exception $e) {
Log::error(sprintf('%s:- Not deleting packet [%s], as it generated an uncaught exception',self::LOGKEY,$this->file),['e'=>$e->getMessage()]);
}
break;
case self::IS_TIC:
Log::info(sprintf('%s:- Processing TIC file [%s]',self::LOGKEY,$this->file));
2022-11-01 11:24:36 +00:00
// Queue the tic to be processed later, in case the referenced file hasnt been received yet
TicProcess::dispatch($this->file);
2022-11-01 11:24:36 +00:00
break;
2022-11-01 11:24:36 +00:00
default:
Log::debug(sprintf('%s:- Leaving file [%s] in the inbound dir',self::LOGKEY,$this->file));
}
}
$this->receiving = NULL;
2021-04-01 10:59:15 +00:00
}
/**
* Open the file descriptor to receive a file
*
* @param Address $ao
2021-04-01 10:59:15 +00:00
* @param bool $check
* @param string|null $comp If the incoming file will be compressed
* @return int
2023-06-27 07:39:11 +00:00
* @throws \Exception
* @todo $comp should be parsed, in case it contains other items
2021-04-01 10:59:15 +00:00
*/
public function open(Address $ao,bool $check=FALSE,string $comp=NULL): int
2021-04-01 10:59:15 +00:00
{
// @todo implement return 4 - SUSPEND(?) file
// @todo Change to use Storage::class()
2021-04-01 10:59:15 +00:00
if (! $this->receiving)
2023-06-27 07:39:11 +00:00
throw new \Exception('No files currently receiving');
2021-04-01 10:59:15 +00:00
$this->comp_data = '';
$this->comp = $comp;
/*
if ($this->receiving->size <= self::MAX_COMPSIZE) {
$this->comp = $comp;
} else {
Log::alert(sprintf('%s:- Compression [%s] disabled for file [%s], its size is too big [%d]',self::LOGKEY,$comp,$this->receiving->name,$this->receiving->size));
}
*/
if ($this->comp && (! in_array($this->comp,self::compression)))
throw new \Exception('Unsupported compression:'.$this->comp);
elseif ($this->comp)
Log::debug(sprintf('%s:- Receiving file with [%s] compression',self::LOGKEY,$this->comp));
$this->ao = $ao;
2021-04-01 10:59:15 +00:00
$this->file_pos = 0;
$this->start = time();
$this->file = sprintf('storage/app/%s',$this->local_path($ao));
if (file_exists($this->file)
&& (Storage::disk('local')->lastModified($this->local_path($ao)) === $this->mtime)
&& (Storage::disk('local')->size($this->local_path($ao)) === $this->size))
{
Log::alert(sprintf('%s:- File already exists - skipping [%s]', self::LOGKEY, $this->file));
return Protocol::FOP_GOT;
} elseif (file_exists($this->file) && (Storage::disk('local')->size($this->local_path($ao)) > 0)) {
Log::alert(sprintf('%s:- File exists with different details - skipping [%s]',self::LOGKEY,$this->file));
return Protocol::FOP_SKIP;
} else {
// @todo I dont think we are enabling resumable sessions - need to check
Log::debug(sprintf('%s:- Opening [%s]',self::LOGKEY,$this->file));
}
// If we are only checking, we'll return (NR mode)
if ($check)
return Protocol::FOP_OK;
2021-04-01 10:59:15 +00:00
$this->f = fopen($this->file,'wb');
2021-04-01 10:59:15 +00:00
if (! $this->f) {
Log::error(sprintf('%s:! Unable to open file [%s] for writing',self::LOGKEY,$this->receiving->name));
return Protocol::FOP_ERROR;
2021-04-01 10:59:15 +00:00
}
Log::info(sprintf('%s:= open - File [%s] opened for writing',self::LOGKEY,$this->receiving->name));
return Protocol::FOP_OK;
2021-04-01 10:59:15 +00:00
}
/**
* Add a new file to receive
*
* @param array $file
2023-06-27 07:39:11 +00:00
* @throws \Exception
2021-04-01 10:59:15 +00:00
*/
public function new(array $file): void
{
Log::debug(sprintf('%s:+ new [%s]',self::LOGKEY,join('|',$file)));
2021-04-01 10:59:15 +00:00
if ($this->receiving)
2023-06-27 07:39:11 +00:00
throw new \Exception('Can only have 1 file receiving at a time');
2021-04-01 10:59:15 +00:00
$o = new Item($file,self::I_RECV);
$this->list->push($o);
$this->receiving = $o;
}
private function local_path(Address $ao): string
{
return sprintf('%s/%04X-%s',config('app.fido'),$ao->id,$this->receiving->recvas);
}
2021-04-01 10:59:15 +00:00
/**
* Write data to the file we are receiving
*
* @param string $buf
* @return bool
2023-06-27 07:39:11 +00:00
* @throws \Exception
2021-04-01 10:59:15 +00:00
*/
public function write(string $buf): bool
2021-04-01 10:59:15 +00:00
{
if (! $this->f)
throw new \Exception('No file open for write');
$data = '';
2021-04-01 10:59:15 +00:00
// If we are using compression mode, then we need to buffer the right until we have everything
if ($this->comp) {
$this->comp_data .= $buf;
2021-04-01 10:59:15 +00:00
// See if we can uncompress the data yet
switch ($this->comp) {
case 'BZ2':
if (($data=bzdecompress($this->comp_data,TRUE)) === FALSE)
throw new FileException('BZ2 decompression failed?');
elseif (is_numeric($data))
throw new FileException(sprintf('BZ2 decompression failed with (:%d)?',$data));
break;
case 'GZ':
if (($data=gzdeflate($this->comp_data)) === FALSE)
throw new FileException('BZ2 decompression failed?');
break;
}
// Compressed file grew
if (! strlen($data)) {
if (strlen($this->comp_data) > $this->receiving->size) {
fclose($this->f);
$this->f = NULL;
throw new FileGrewException(sprintf('Error compressed file grew, rejecting [%d] -> [%d]', strlen($this->comp_data), $this->receiving->size));
}
return TRUE;
}
} else {
$data = $buf;
}
if ($this->file_pos+strlen($data) > $this->receiving->size)
throw new \Exception(sprintf('Too many bytes received [%d] (%d)?',$this->file_pos+strlen($buf),$this->receiving->size));
$rc = fwrite($this->f,$data);
2021-04-01 10:59:15 +00:00
if ($rc === FALSE)
throw new FileException('Error while writing to file');
$this->file_pos += $rc;
Log::debug(sprintf('%s:- Write [%d] bytes, file pos now [%d] of [%d] (%d)',self::LOGKEY,$rc,$this->file_pos,$this->receiving->size,strlen($buf)));
if (strlen($this->comp_data) > $this->receiving->size)
Log::alert(sprintf('%s:- Compression grew the file during transfer (%d->%d)',self::LOGKEY,$this->receiving->size,strlen($this->comp_data)));
2021-04-01 10:59:15 +00:00
return TRUE;
2021-04-01 10:59:15 +00:00
}
}