Optimised our sending and receiving of items
This commit is contained in:
@@ -21,7 +21,7 @@ use App\Models\Address;
|
||||
* @property-read int total_recv
|
||||
* @property-read int total_recv_bytes
|
||||
*/
|
||||
final class Receive extends Item
|
||||
class Receive extends Base
|
||||
{
|
||||
private const LOGKEY = 'IR-';
|
||||
|
||||
@@ -31,10 +31,7 @@ final class Receive extends Item
|
||||
];
|
||||
|
||||
private Address $ao;
|
||||
private Collection $list;
|
||||
private ?Item $receiving;
|
||||
|
||||
private string $file; // Local filename for file received
|
||||
/** @var ?string The compression used by the incoming file */
|
||||
private ?string $comp;
|
||||
/** @var string|null The compressed data received */
|
||||
@@ -43,41 +40,47 @@ final class Receive extends Item
|
||||
public function __construct()
|
||||
{
|
||||
// Initialise our variables
|
||||
$this->list = collect();
|
||||
$this->receiving = NULL;
|
||||
if (get_class($this) === self::class) {
|
||||
$this->list = collect();
|
||||
$this->f = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
public function __get($key)
|
||||
{
|
||||
switch ($key) {
|
||||
case 'completed':
|
||||
return $this->list
|
||||
->filter(function($item) { return $item->complete === TRUE; });
|
||||
|
||||
case 'fd':
|
||||
return is_resource($this->f);
|
||||
|
||||
case 'filepos':
|
||||
return $this->file_pos;
|
||||
|
||||
case 'mtime':
|
||||
case 'name':
|
||||
case 'nameas':
|
||||
case 'size':
|
||||
return $this->receiving?->{'file_'.$key};
|
||||
|
||||
case 'name_size_time':
|
||||
return sprintf('%s %lu %lu',$this->name,$this->size,$this->mtime);
|
||||
return $this->receiving->{$key};
|
||||
|
||||
case 'to_get':
|
||||
case 'pos':
|
||||
return $this->pos;
|
||||
|
||||
case 'receiving':
|
||||
return $this->list->get($this->index);
|
||||
|
||||
case 'ready':
|
||||
return (! is_null($this->index));
|
||||
|
||||
case 'togo_count':
|
||||
return $this->list
|
||||
->filter(function($item) { return ($item->action & self::I_RECV) && $item->received === FALSE; })
|
||||
->filter(function($item) { return $item->complete === FALSE; })
|
||||
->count();
|
||||
|
||||
case 'total_recv':
|
||||
return $this->list
|
||||
->filter(function($item) { return ($item->action & self::I_RECV) && $item->received === TRUE; })
|
||||
->count();
|
||||
return $this->completed->count();
|
||||
|
||||
case 'total_recv_bytes':
|
||||
return $this->list
|
||||
->filter(function($item) { return ($item->action & self::I_RECV) && $item->received === TRUE; })
|
||||
->sum(function($item) { return $item->size; });
|
||||
return $this->completed->sum(function($item) { return $item->size; });
|
||||
|
||||
default:
|
||||
throw new \Exception('Unknown key: '.$key);
|
||||
@@ -95,36 +98,31 @@ final class Receive extends Item
|
||||
throw new \Exception('No file to close');
|
||||
|
||||
if ($this->f) {
|
||||
if ($this->file_pos !== $this->receiving->size) {
|
||||
Log::warning(sprintf('%s:- Closing [%s], but missing [%d] bytes',self::LOGKEY,$this->receiving->name,$this->receiving->size-$this->file_pos));
|
||||
$this->receiving->incomplete = TRUE;
|
||||
}
|
||||
|
||||
$this->receiving->received = TRUE;
|
||||
if ($this->pos !== $this->receiving->recvsize)
|
||||
Log::warning(sprintf('%s:- Closing [%s], but missing [%d] bytes',self::LOGKEY,$this->receiving->nameas,$this->receiving->recvsize-$this->pos));
|
||||
else
|
||||
$this->receiving->complete = TRUE;
|
||||
|
||||
$end = time()-$this->start;
|
||||
Log::debug(sprintf('%s:- Closing [%s], received in [%d]',self::LOGKEY,$this->receiving->name,$end));
|
||||
Log::debug(sprintf('%s:- Closing [%s], received in [%d]',self::LOGKEY,$this->receiving->nameas,$end));
|
||||
|
||||
if ($this->comp)
|
||||
Log::info(sprintf('%s:= Compressed file using [%s] was [%d] bytes (%d). Compression rate [%3.2f%%]',self::LOGKEY,$this->comp,$x=strlen($this->comp_data),$this->receiving->size,$x/$this->receiving->size*100));
|
||||
Log::info(sprintf('%s:= Compressed file using [%s] was [%d] bytes (%d). Compression rate [%3.2f%%]',self::LOGKEY,$this->comp,$x=strlen($this->comp_data),$this->receiving->recvsize,$x/$this->receiving->recvsize*100));
|
||||
|
||||
fclose($this->f);
|
||||
// Set our mtime
|
||||
touch($this->file,$this->mtime);
|
||||
$this->file_pos = 0;
|
||||
touch($this->receiving->full_name,$this->receiving->mtime);
|
||||
$this->f = NULL;
|
||||
|
||||
// If the packet has been received but not the right size, dont process it any more.
|
||||
|
||||
// If we received a packet, we'll dispatch a job to process it
|
||||
if (! $this->receiving->incomplete)
|
||||
switch ($this->receiving->type) {
|
||||
// If we received a packet, we'll dispatch a job to process it, if we got it all
|
||||
if ($this->receiving->complete)
|
||||
switch ($x=$this->receiving->whatType()) {
|
||||
case self::IS_ARC:
|
||||
case self::IS_PKT:
|
||||
Log::info(sprintf('%s:- Processing mail %s [%s]',self::LOGKEY,$this->receiving->type === self::IS_PKT ? 'PACKET' : 'ARCHIVE',$this->file));
|
||||
Log::info(sprintf('%s:- Processing mail %s [%s]',self::LOGKEY,$x === self::IS_PKT ? 'PACKET' : 'ARCHIVE',$this->receiving->nameas));
|
||||
|
||||
try {
|
||||
$f = new File($this->file);
|
||||
$f = new File($this->receiving->full_name);
|
||||
$processed = FALSE;
|
||||
|
||||
foreach ($f as $packet) {
|
||||
@@ -177,54 +175,72 @@ final class Receive extends Item
|
||||
}
|
||||
|
||||
if (! $processed) {
|
||||
Log::alert(sprintf('%s:- Not deleting packet [%s], it doesnt seem to be processed?',self::LOGKEY,$this->file));
|
||||
Log::alert(sprintf('%s:- Not deleting packet [%s], it doesnt seem to be processed?',self::LOGKEY,$this->receiving->nameas));
|
||||
|
||||
// If we want to keep the packet, we could do that logic here
|
||||
} elseif (! config('app.packet_keep')) {
|
||||
Log::debug(sprintf('%s:- Deleting processed packet [%s]',self::LOGKEY,$this->file));
|
||||
unlink($this->file);
|
||||
Log::debug(sprintf('%s:- Deleting processed packet [%s]',self::LOGKEY,$this->receiving->full_name));
|
||||
unlink($this->receiving->full_name);
|
||||
}
|
||||
|
||||
} catch (InvalidPacketException $e) {
|
||||
Log::error(sprintf('%s:- Not deleting packet [%s], as it generated an InvalidPacketException',self::LOGKEY,$this->file),['e'=>$e->getMessage()]);
|
||||
Log::error(sprintf('%s:- Not deleting packet [%s], as it generated an InvalidPacketException',self::LOGKEY,$this->receiving->nameas),['e'=>$e->getMessage()]);
|
||||
|
||||
} catch (\Exception $e) {
|
||||
Log::error(sprintf('%s:- Not deleting packet [%s], as it generated an uncaught exception',self::LOGKEY,$this->file),['e'=>$e->getMessage()]);
|
||||
Log::error(sprintf('%s:- Not deleting packet [%s], as it generated an uncaught exception',self::LOGKEY,$this->receiving->nameas),['e'=>$e->getMessage()]);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case self::IS_TIC:
|
||||
Log::info(sprintf('%s:- Processing TIC file [%s]',self::LOGKEY,$this->file));
|
||||
Log::info(sprintf('%s:- Processing TIC file [%s]',self::LOGKEY,$this->receiving->nameas));
|
||||
|
||||
// Queue the tic to be processed later, in case the referenced file hasnt been received yet
|
||||
TicProcess::dispatch($this->file);
|
||||
TicProcess::dispatch($this->receiving->nameas);
|
||||
|
||||
break;
|
||||
|
||||
default:
|
||||
Log::debug(sprintf('%s:- Leaving file [%s] in the inbound dir',self::LOGKEY,$this->file));
|
||||
Log::debug(sprintf('%s:- Leaving file [%s] in the inbound dir',self::LOGKEY,$this->receiving->nameas));
|
||||
}
|
||||
}
|
||||
|
||||
$this->receiving = NULL;
|
||||
$this->index = NULL;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new file to receive
|
||||
*
|
||||
* @param array $file
|
||||
* @param Address $ao
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function new(array $file,Address $ao): void
|
||||
{
|
||||
Log::debug(sprintf('%s:+ Receiving new file [%s]',self::LOGKEY,join('|',$file)));
|
||||
|
||||
if ($this->index)
|
||||
throw new \Exception('Can only have 1 file receiving at a time');
|
||||
|
||||
$this->ao = $ao;
|
||||
|
||||
$this->list->push(new Item($ao,Arr::get($file,'name'),(int)Arr::get($file,'mtime'),(int)Arr::get($file,'size')));
|
||||
$this->index = $this->list->count()-1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Open the file descriptor to receive a file
|
||||
*
|
||||
* @param Address $ao
|
||||
* @param bool $check
|
||||
* @param string|null $comp If the incoming file will be compressed
|
||||
* @return int
|
||||
* @throws \Exception
|
||||
* @todo $comp should be parsed, in case it contains other items
|
||||
*/
|
||||
public function open(Address $ao,bool $check=FALSE,string $comp=NULL): int
|
||||
public function open(bool $check=FALSE,string $comp=NULL): int
|
||||
{
|
||||
// @todo implement return 4 - SUSPEND(?) file
|
||||
// @todo Change to use Storage::class()
|
||||
if (! $this->receiving)
|
||||
if (is_null($this->index))
|
||||
throw new \Exception('No files currently receiving');
|
||||
|
||||
$this->comp_data = '';
|
||||
@@ -244,65 +260,37 @@ final class Receive extends Item
|
||||
elseif ($this->comp)
|
||||
Log::debug(sprintf('%s:- Receiving file with [%s] compression',self::LOGKEY,$this->comp));
|
||||
|
||||
$this->ao = $ao;
|
||||
$this->file_pos = 0;
|
||||
$this->pos = 0;
|
||||
$this->start = time();
|
||||
$this->file = sprintf('storage/app/%s',$this->local_path($ao));
|
||||
|
||||
if (file_exists($this->file)
|
||||
&& (Storage::disk('local')->lastModified($this->local_path($ao)) === $this->mtime)
|
||||
&& (Storage::disk('local')->size($this->local_path($ao)) === $this->size))
|
||||
{
|
||||
Log::alert(sprintf('%s:- File already exists - skipping [%s]', self::LOGKEY, $this->file));
|
||||
if ($this->receiving->exists && $this->receiving->match_mtime && $this->receiving->match_size) {
|
||||
Log::alert(sprintf('%s:- File already exists - skipping [%s]', self::LOGKEY,$this->receiving->nameas));
|
||||
return Protocol::FOP_GOT;
|
||||
|
||||
} elseif (file_exists($this->file) && (Storage::disk('local')->size($this->local_path($ao)) > 0)) {
|
||||
Log::alert(sprintf('%s:- File exists with different details - skipping [%s]',self::LOGKEY,$this->file));
|
||||
} elseif ($this->receiving->exists && $this->receiving->size > 0) {
|
||||
Log::alert(sprintf('%s:- File exists with different details - skipping [%s] (size: %d, mtime: %d)',self::LOGKEY,$this->receiving->nameas,$this->receiving->size,$this->receiving->mtime));
|
||||
return Protocol::FOP_SKIP;
|
||||
|
||||
} else {
|
||||
// @todo I dont think we are enabling resumable sessions - need to check
|
||||
Log::debug(sprintf('%s:- Opening [%s]',self::LOGKEY,$this->file));
|
||||
Log::debug(sprintf('%s:- Opening [%s]',self::LOGKEY,$this->receiving->nameas));
|
||||
}
|
||||
|
||||
// If we are only checking, we'll return (NR mode)
|
||||
if ($check)
|
||||
return Protocol::FOP_OK;
|
||||
|
||||
$this->f = fopen($this->file,'wb');
|
||||
$this->f = fopen($this->receiving->full_name,'wb');
|
||||
|
||||
if (! $this->f) {
|
||||
Log::error(sprintf('%s:! Unable to open file [%s] for writing',self::LOGKEY,$this->receiving->name));
|
||||
Log::error(sprintf('%s:! Unable to open file [%s] for writing',self::LOGKEY,$this->receiving->nameas));
|
||||
return Protocol::FOP_ERROR;
|
||||
}
|
||||
|
||||
Log::info(sprintf('%s:= open - File [%s] opened for writing',self::LOGKEY,$this->receiving->name));
|
||||
Log::info(sprintf('%s:= File [%s] opened for writing',self::LOGKEY,$this->receiving->nameas));
|
||||
return Protocol::FOP_OK;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new file to receive
|
||||
*
|
||||
* @param array $file
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function new(array $file): void
|
||||
{
|
||||
Log::debug(sprintf('%s:+ new [%s]',self::LOGKEY,join('|',$file)));
|
||||
|
||||
if ($this->receiving)
|
||||
throw new \Exception('Can only have 1 file receiving at a time');
|
||||
|
||||
$o = new Item($file,self::I_RECV);
|
||||
$this->list->push($o);
|
||||
|
||||
$this->receiving = $o;
|
||||
}
|
||||
|
||||
private function local_path(Address $ao): string
|
||||
{
|
||||
return sprintf('%s/%04X-%s',config('app.fido'),$ao->id,$this->receiving->recvas);
|
||||
}
|
||||
|
||||
/**
|
||||
* Write data to the file we are receiving
|
||||
*
|
||||
@@ -312,7 +300,7 @@ final class Receive extends Item
|
||||
*/
|
||||
public function write(string $buf): bool
|
||||
{
|
||||
if (! $this->f)
|
||||
if (! $this->fd)
|
||||
throw new \Exception('No file open for write');
|
||||
|
||||
$data = '';
|
||||
@@ -354,19 +342,19 @@ final class Receive extends Item
|
||||
$data = $buf;
|
||||
}
|
||||
|
||||
if ($this->file_pos+strlen($data) > $this->receiving->size)
|
||||
throw new \Exception(sprintf('Too many bytes received [%d] (%d)?',$this->file_pos+strlen($buf),$this->receiving->size));
|
||||
if (($x=$this->pos+strlen($data)) > $this->receiving->recvsize)
|
||||
throw new \Exception(sprintf('Too many bytes received [%d] (%d)?',$x,$this->receiving->recvsize));
|
||||
|
||||
$rc = fwrite($this->f,$data);
|
||||
|
||||
if ($rc === FALSE)
|
||||
throw new FileException('Error while writing to file');
|
||||
|
||||
$this->file_pos += $rc;
|
||||
Log::debug(sprintf('%s:- Write [%d] bytes, file pos now [%d] of [%d] (%d)',self::LOGKEY,$rc,$this->file_pos,$this->receiving->size,strlen($buf)));
|
||||
$this->pos += $rc;
|
||||
Log::debug(sprintf('%s:- Write [%d] bytes, file pos now [%d] of [%d] (%d)',self::LOGKEY,$rc,$this->pos,$this->receiving->recvsize,strlen($buf)));
|
||||
|
||||
if (strlen($this->comp_data) > $this->receiving->size)
|
||||
Log::alert(sprintf('%s:- Compression grew the file during transfer (%d->%d)',self::LOGKEY,$this->receiving->size,strlen($this->comp_data)));
|
||||
if (strlen($this->comp_data) > $this->receiving->recvsize)
|
||||
Log::alert(sprintf('%s:- Compression grew the file during transfer (%d->%d)',self::LOGKEY,$this->receiving->recvsize,strlen($this->comp_data)));
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
Reference in New Issue
Block a user