Some BINKP optimisation, implemented crypt, implemented receiving compressed transfers
This commit is contained in:
@@ -2,7 +2,6 @@
|
||||
|
||||
namespace App\Classes\File;
|
||||
|
||||
use Exception;
|
||||
use Illuminate\Contracts\Filesystem\FileNotFoundException;
|
||||
use Illuminate\Support\Facades\Storage;
|
||||
use League\Flysystem\UnreadableFileEncountered;
|
||||
@@ -23,6 +22,10 @@ class Item
|
||||
// For deep debugging
|
||||
protected bool $DEBUG = FALSE;
|
||||
|
||||
/** @var int max size of file to use compression */
|
||||
// @todo MAX_COMPSIZE hasnt been implemented in RECEIVE OR SEND
|
||||
protected const MAX_COMPSIZE = 0x1fff;
|
||||
|
||||
protected const IS_PKT = (1<<1);
|
||||
protected const IS_ARC = (1<<2);
|
||||
protected const IS_FILE = (1<<3);
|
||||
@@ -30,29 +33,33 @@ class Item
|
||||
protected const IS_REQ = (1<<5);
|
||||
protected const IS_TIC = (1<<6);
|
||||
|
||||
protected const I_RECV = (1<<6);
|
||||
protected const I_SEND = (1<<7);
|
||||
protected const I_RECV = (1<<0);
|
||||
protected const I_SEND = (1<<1);
|
||||
|
||||
protected string $file_name = '';
|
||||
protected int $file_size = 0;
|
||||
protected int $file_mtime = 0;
|
||||
protected int $file_type = 0;
|
||||
protected int $action = 0;
|
||||
/** Current read/write pointer */
|
||||
protected int $file_pos = 0;
|
||||
/** File descriptor */
|
||||
protected mixed $f = NULL;
|
||||
protected int $type;
|
||||
protected int $action;
|
||||
protected File $filemodel;
|
||||
|
||||
public bool $sent = FALSE;
|
||||
public bool $received = FALSE;
|
||||
public bool $incomplete = FALSE;
|
||||
/** Time we started sending/receiving */
|
||||
protected int $start;
|
||||
|
||||
/**
|
||||
* @throws FileNotFoundException
|
||||
* @throws UnreadableFileEncountered
|
||||
* @throws Exception
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function __construct($file,int $action)
|
||||
{
|
||||
$this->action |= $action;
|
||||
|
||||
switch ($action) {
|
||||
case self::I_SEND:
|
||||
if ($file instanceof File) {
|
||||
@@ -63,7 +70,7 @@ class Item
|
||||
|
||||
} else {
|
||||
if (! is_string($file))
|
||||
throw new Exception('Invalid object creation - file should be a string');
|
||||
throw new \Exception('Invalid object creation - file should be a string');
|
||||
|
||||
if (! file_exists($file))
|
||||
throw new FileNotFoundException('Item doesnt exist: '.$file);
|
||||
@@ -83,7 +90,7 @@ class Item
|
||||
$keys = ['name','mtime','size'];
|
||||
|
||||
if (! is_array($file) || array_diff(array_keys($file),$keys))
|
||||
throw new Exception('Invalid object creation - file is not a valid array :'.serialize(array_diff(array_keys($file),$keys)));
|
||||
throw new \Exception('Invalid object creation - file is not a valid array :'.serialize(array_diff(array_keys($file),$keys)));
|
||||
|
||||
$this->file_name = $file['name'];
|
||||
$this->file_size = $file['size'];
|
||||
@@ -92,14 +99,15 @@ class Item
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new Exception('Unknown action: '.$action);
|
||||
throw new \Exception('Unknown action: '.$action);
|
||||
}
|
||||
|
||||
$this->file_type |= $this->whatType();
|
||||
$this->action = $action;
|
||||
$this->type = $this->whatType();
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws Exception
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function __get($key)
|
||||
{
|
||||
@@ -107,10 +115,10 @@ class Item
|
||||
case 'mtime':
|
||||
case 'name':
|
||||
case 'size':
|
||||
if ($this->action & self::I_RECV)
|
||||
if ($this->action & self::I_RECV|self::I_SEND)
|
||||
return $this->{'file_'.$key};
|
||||
|
||||
throw new Exception('Invalid request for key: '.$key);
|
||||
throw new \Exception('Invalid request for key: '.$key);
|
||||
|
||||
case 'recvas':
|
||||
return $this->file_name;
|
||||
@@ -119,13 +127,13 @@ class Item
|
||||
return $this->file_name ? basename($this->file_name) : $this->filemodel->name;
|
||||
|
||||
default:
|
||||
throw new Exception('Unknown key: '.$key);
|
||||
throw new \Exception('Unknown key: '.$key);
|
||||
}
|
||||
}
|
||||
|
||||
protected function isType(int $type): bool
|
||||
{
|
||||
return $this->file_type & $type;
|
||||
return $this->type & $type;
|
||||
}
|
||||
|
||||
private function whatType(): int
|
||||
|
@@ -15,8 +15,6 @@ class Mail extends Item
|
||||
*/
|
||||
public function __construct(Packet $mail,int $action)
|
||||
{
|
||||
$this->action |= $action;
|
||||
|
||||
switch ($action) {
|
||||
case self::I_SEND:
|
||||
$this->file = $mail;
|
||||
@@ -29,6 +27,8 @@ class Mail extends Item
|
||||
default:
|
||||
throw new \Exception('Unknown action: '.$action);
|
||||
}
|
||||
|
||||
$this->action = $action;
|
||||
}
|
||||
|
||||
public function read(int $start,int $length): string
|
||||
|
@@ -5,10 +5,12 @@ namespace App\Classes\File;
|
||||
use Illuminate\Support\Arr;
|
||||
use Illuminate\Support\Collection;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
use Illuminate\Support\Facades\Storage;
|
||||
use Symfony\Component\HttpFoundation\File\Exception\FileException;
|
||||
|
||||
use App\Classes\File;
|
||||
use App\Classes\{File,Protocol};
|
||||
use App\Classes\FTN\{InvalidPacketException,Packet};
|
||||
use App\Exceptions\FileGrewException;
|
||||
use App\Jobs\{MessageProcess,TicProcess};
|
||||
use App\Models\Address;
|
||||
|
||||
@@ -23,22 +25,26 @@ final class Receive extends Item
|
||||
{
|
||||
private const LOGKEY = 'IR-';
|
||||
|
||||
private const compression = [
|
||||
'BZ2',
|
||||
'GZ',
|
||||
];
|
||||
|
||||
private Address $ao;
|
||||
private Collection $list;
|
||||
private ?Item $receiving;
|
||||
|
||||
private mixed $f; // File descriptor
|
||||
private int $start; // Time we started receiving
|
||||
private int $file_pos; // Current write pointer
|
||||
private string $file; // Local filename for file received
|
||||
/** @var ?string The compression used by the incoming file */
|
||||
private ?string $comp;
|
||||
/** @var string|null The compressed data received */
|
||||
private ?string $comp_data;
|
||||
|
||||
public function __construct()
|
||||
{
|
||||
// Initialise our variables
|
||||
$this->list = collect();
|
||||
$this->receiving = NULL;
|
||||
$this->file_pos = 0;
|
||||
$this->f = NULL;
|
||||
}
|
||||
|
||||
public function __get($key)
|
||||
@@ -71,7 +77,7 @@ final class Receive extends Item
|
||||
case 'total_recv_bytes':
|
||||
return $this->list
|
||||
->filter(function($item) { return ($item->action & self::I_RECV) && $item->received === TRUE; })
|
||||
->sum(function($item) { return $item->file_size; });
|
||||
->sum(function($item) { return $item->size; });
|
||||
|
||||
default:
|
||||
throw new \Exception('Unknown key: '.$key);
|
||||
@@ -85,114 +91,121 @@ final class Receive extends Item
|
||||
*/
|
||||
public function close(): void
|
||||
{
|
||||
if (! $this->f)
|
||||
if (! $this->receiving)
|
||||
throw new \Exception('No file to close');
|
||||
|
||||
if ($this->file_pos != $this->receiving->file_size) {
|
||||
Log::warning(sprintf('%s: - Closing [%s], but missing [%d] bytes',self::LOGKEY,$this->receiving->file_name,$this->receiving->file_size-$this->file_pos));
|
||||
$this->receiving->incomplete = TRUE;
|
||||
}
|
||||
if ($this->f) {
|
||||
if ($this->file_pos !== $this->receiving->size) {
|
||||
Log::warning(sprintf('%s:- Closing [%s], but missing [%d] bytes',self::LOGKEY,$this->receiving->name,$this->receiving->size-$this->file_pos));
|
||||
$this->receiving->incomplete = TRUE;
|
||||
}
|
||||
|
||||
$this->receiving->received = TRUE;
|
||||
$this->receiving->received = TRUE;
|
||||
|
||||
$end = time()-$this->start;
|
||||
Log::debug(sprintf('%s: - Closing [%s], received in [%d]',self::LOGKEY,$this->receiving->file_name,$end));
|
||||
$end = time()-$this->start;
|
||||
Log::debug(sprintf('%s:- Closing [%s], received in [%d]',self::LOGKEY,$this->receiving->name,$end));
|
||||
|
||||
fclose($this->f);
|
||||
$this->file_pos = 0;
|
||||
$this->f = NULL;
|
||||
if ($this->comp)
|
||||
Log::info(sprintf('%s:= Compressed file using [%s] was [%d] bytes (%d). Compression rate [%3.2f%%]',self::LOGKEY,$this->comp,$x=strlen($this->comp_data),$this->receiving->size,$x/$this->receiving->size*100));
|
||||
|
||||
// If the packet has been received but not the right size, dont process it any more.
|
||||
fclose($this->f);
|
||||
// Set our mtime
|
||||
touch($this->file,$this->mtime);
|
||||
$this->file_pos = 0;
|
||||
$this->f = NULL;
|
||||
|
||||
// If we received a packet, we'll dispatch a job to process it
|
||||
if (! $this->receiving->incomplete)
|
||||
switch ($this->receiving->file_type) {
|
||||
case self::IS_ARC:
|
||||
case self::IS_PKT:
|
||||
Log::info(sprintf('%s: - Processing mail %s [%s]',self::LOGKEY,$this->receiving->file_type === self::IS_PKT ? 'PACKET' : 'ARCHIVE',$this->file));
|
||||
// If the packet has been received but not the right size, dont process it any more.
|
||||
|
||||
try {
|
||||
$f = new File($this->file);
|
||||
$processed = FALSE;
|
||||
// If we received a packet, we'll dispatch a job to process it
|
||||
if (! $this->receiving->incomplete)
|
||||
switch ($this->receiving->type) {
|
||||
case self::IS_ARC:
|
||||
case self::IS_PKT:
|
||||
Log::info(sprintf('%s:- Processing mail %s [%s]',self::LOGKEY,$this->receiving->type === self::IS_PKT ? 'PACKET' : 'ARCHIVE',$this->file));
|
||||
|
||||
foreach ($f as $packet) {
|
||||
$po = Packet::process($packet,Arr::get(stream_get_meta_data($packet),'uri'),$f->itemSize(),$this->ao->system);
|
||||
try {
|
||||
$f = new File($this->file);
|
||||
$processed = FALSE;
|
||||
|
||||
// Check the messages are from the uplink
|
||||
if ($this->ao->system->addresses->search(function($item) use ($po) { return $item->id === $po->fftn_o->id; }) === FALSE) {
|
||||
Log::error(sprintf('%s: ! Packet [%s] is not from this link? [%d]',self::LOGKEY,$po->fftn_o->ftn,$this->ao->system_id));
|
||||
foreach ($f as $packet) {
|
||||
$po = Packet::process($packet,Arr::get(stream_get_meta_data($packet),'uri'),$f->itemSize(),$this->ao->system);
|
||||
|
||||
break;
|
||||
}
|
||||
// Check the messages are from the uplink
|
||||
if ($this->ao->system->addresses->search(function($item) use ($po) { return $item->id === $po->fftn_o->id; }) === FALSE) {
|
||||
Log::error(sprintf('%s:! Packet [%s] is not from this link? [%d]',self::LOGKEY,$po->fftn_o->ftn,$this->ao->system_id));
|
||||
|
||||
// Check the packet password
|
||||
if ($this->ao->session('pktpass') != $po->password) {
|
||||
Log::error(sprintf('%s: ! Packet from [%s] with password [%s] is invalid.',self::LOGKEY,$this->ao->ftn,$po->password));
|
||||
|
||||
// @todo Generate message to system advising invalid password - that message should be sent without a packet password!
|
||||
break;
|
||||
}
|
||||
|
||||
Log::info(sprintf('%s: - Packet has [%d] messages',self::LOGKEY,$po->count()));
|
||||
|
||||
// Queue messages if there are too many in the packet.
|
||||
if ($queue = ($po->count() > config('app.queue_msgs')))
|
||||
Log::info(sprintf('%s: - Messages will be sent to the queue for processing',self::LOGKEY));
|
||||
|
||||
$count = 0;
|
||||
foreach ($po as $msg) {
|
||||
Log::info(sprintf('%s: - Mail from [%s] to [%s]',self::LOGKEY,$msg->fftn,$msg->tftn));
|
||||
|
||||
// @todo Quick check that the packet should be processed by us.
|
||||
// @todo validate that the packet's zone is in the domain.
|
||||
|
||||
try {
|
||||
// Dispatch job.
|
||||
if ($queue)
|
||||
MessageProcess::dispatch($msg,$f->pktName());
|
||||
else
|
||||
MessageProcess::dispatchSync($msg,$f->pktName());
|
||||
|
||||
} catch (\Exception $e) {
|
||||
Log::error(sprintf('%s:! Got error dispatching message [%s] (%d:%s-%s).',self::LOGKEY,$msg->msgid,$e->getLine(),$e->getFile(),$e->getMessage()));
|
||||
break;
|
||||
}
|
||||
|
||||
$count++;
|
||||
// Check the packet password
|
||||
if ($this->ao->session('pktpass') != $po->password) {
|
||||
Log::error(sprintf('%s:! Packet from [%s] with password [%s] is invalid.',self::LOGKEY,$this->ao->ftn,$po->password));
|
||||
|
||||
// @todo Generate message to system advising invalid password - that message should be sent without a packet password!
|
||||
break;
|
||||
}
|
||||
|
||||
Log::info(sprintf('%s:- Packet has [%d] messages',self::LOGKEY,$po->count()));
|
||||
|
||||
// Queue messages if there are too many in the packet.
|
||||
if ($queue = ($po->count() > config('app.queue_msgs')))
|
||||
Log::info(sprintf('%s:- Messages will be sent to the queue for processing',self::LOGKEY));
|
||||
|
||||
$count = 0;
|
||||
foreach ($po as $msg) {
|
||||
Log::info(sprintf('%s:- Mail from [%s] to [%s]',self::LOGKEY,$msg->fftn,$msg->tftn));
|
||||
|
||||
// @todo Quick check that the packet should be processed by us.
|
||||
// @todo validate that the packet's zone is in the domain.
|
||||
|
||||
try {
|
||||
// Dispatch job.
|
||||
if ($queue)
|
||||
MessageProcess::dispatch($msg,$f->pktName());
|
||||
else
|
||||
MessageProcess::dispatchSync($msg,$f->pktName());
|
||||
|
||||
} catch (\Exception $e) {
|
||||
Log::error(sprintf('%s:! Got error dispatching message [%s] (%d:%s-%s).',self::LOGKEY,$msg->msgid,$e->getLine(),$e->getFile(),$e->getMessage()));
|
||||
}
|
||||
|
||||
$count++;
|
||||
}
|
||||
|
||||
if ($count === $po->count())
|
||||
$processed = TRUE;
|
||||
}
|
||||
|
||||
if ($count === $po->count())
|
||||
$processed = TRUE;
|
||||
if (! $processed) {
|
||||
Log::alert(sprintf('%s:- Not deleting packet [%s], it doesnt seem to be processed?',self::LOGKEY,$this->file));
|
||||
|
||||
// If we want to keep the packet, we could do that logic here
|
||||
} elseif (! config('app.packet_keep')) {
|
||||
Log::debug(sprintf('%s:- Deleting processed packet [%s]',self::LOGKEY,$this->file));
|
||||
unlink($this->file);
|
||||
}
|
||||
|
||||
} catch (InvalidPacketException $e) {
|
||||
Log::error(sprintf('%s:- Not deleting packet [%s], as it generated an InvalidPacketException',self::LOGKEY,$this->file),['e'=>$e->getMessage()]);
|
||||
|
||||
} catch (\Exception $e) {
|
||||
Log::error(sprintf('%s:- Not deleting packet [%s], as it generated an uncaught exception',self::LOGKEY,$this->file),['e'=>$e->getMessage()]);
|
||||
}
|
||||
|
||||
if (! $processed) {
|
||||
Log::alert(sprintf('%s: - Not deleting packet [%s], it doesnt seem to be processed?',self::LOGKEY,$this->file));
|
||||
break;
|
||||
|
||||
// If we want to keep the packet, we could do that logic here
|
||||
} elseif (! config('app.packet_keep')) {
|
||||
Log::debug(sprintf('%s: - Deleting processed packet [%s]',self::LOGKEY,$this->file));
|
||||
unlink($this->file);
|
||||
}
|
||||
case self::IS_TIC:
|
||||
Log::info(sprintf('%s:- Processing TIC file [%s]',self::LOGKEY,$this->file));
|
||||
|
||||
} catch (InvalidPacketException $e) {
|
||||
Log::error(sprintf('%s: - Not deleting packet [%s], as it generated an InvalidPacketException',self::LOGKEY,$this->file),['e'=>$e->getMessage()]);
|
||||
// Queue the tic to be processed later, in case the referenced file hasnt been received yet
|
||||
TicProcess::dispatch($this->file);
|
||||
|
||||
} catch (\Exception $e) {
|
||||
Log::error(sprintf('%s: - Not deleting packet [%s], as it generated an uncaught exception',self::LOGKEY,$this->file),['e'=>$e->getMessage()]);
|
||||
}
|
||||
break;
|
||||
|
||||
break;
|
||||
|
||||
case self::IS_TIC:
|
||||
Log::info(sprintf('%s: - Processing TIC file [%s]',self::LOGKEY,$this->file));
|
||||
|
||||
// Queue the tic to be processed later, in case the referenced file hasnt been received yet
|
||||
TicProcess::dispatch($this->file);
|
||||
|
||||
break;
|
||||
|
||||
default:
|
||||
Log::debug(sprintf('%s: - Leaving file [%s] in the inbound dir',self::LOGKEY,$this->file));
|
||||
}
|
||||
default:
|
||||
Log::debug(sprintf('%s:- Leaving file [%s] in the inbound dir',self::LOGKEY,$this->file));
|
||||
}
|
||||
}
|
||||
|
||||
$this->receiving = NULL;
|
||||
}
|
||||
@@ -202,38 +215,66 @@ final class Receive extends Item
|
||||
*
|
||||
* @param Address $ao
|
||||
* @param bool $check
|
||||
* @return bool
|
||||
* @param string|null $comp If the incoming file will be compressed
|
||||
* @return int
|
||||
* @throws \Exception
|
||||
* @todo $comp should be parsed, in case it contains other items
|
||||
*/
|
||||
public function open(Address $ao,bool $check=FALSE): bool
|
||||
public function open(Address $ao,bool $check=FALSE,string $comp=NULL): int
|
||||
{
|
||||
Log::debug(sprintf('%s:+ open [%d]',self::LOGKEY,$check));
|
||||
|
||||
// Check we can open this file
|
||||
// @todo
|
||||
// @todo implement return 2 - SKIP file
|
||||
// @todo implement return 4 - SUSPEND(?) file
|
||||
if ($check) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// @todo Change to use Storage::class()
|
||||
if (! $this->receiving)
|
||||
throw new \Exception('No files currently receiving');
|
||||
|
||||
$this->comp_data = '';
|
||||
$this->comp = $comp;
|
||||
|
||||
/*
|
||||
if ($this->receiving->size <= self::MAX_COMPSIZE) {
|
||||
$this->comp = $comp;
|
||||
|
||||
} else {
|
||||
Log::alert(sprintf('%s:- Compression [%s] disabled for file [%s], its size is too big [%d]',self::LOGKEY,$comp,$this->receiving->name,$this->receiving->size));
|
||||
}
|
||||
*/
|
||||
|
||||
if ($this->comp && (! in_array($this->comp,self::compression)))
|
||||
throw new \Exception('Unsupported compression:'.$this->comp);
|
||||
elseif ($this->comp)
|
||||
Log::debug(sprintf('%s:- Receiving file with [%s] compression',self::LOGKEY,$this->comp));
|
||||
|
||||
$this->ao = $ao;
|
||||
$this->file_pos = 0;
|
||||
$this->start = time();
|
||||
$this->file = sprintf('storage/app/%s/%04X-%s',config('app.fido'),$this->ao->id,$this->receiving->recvas);
|
||||
$this->file = sprintf('storage/app/%s',$this->local_path($ao));
|
||||
|
||||
Log::debug(sprintf('%s: - Opening [%s]',self::LOGKEY,$this->file));
|
||||
$this->f = fopen($this->file,'wb');
|
||||
if (! $this->f) {
|
||||
Log::error(sprintf('%s:! Unable to open file [%s] for writing',self::LOGKEY,$this->receiving->file_name));
|
||||
return 3; // @todo change to const
|
||||
if (file_exists($this->file)
|
||||
&& (Storage::disk('local')->lastModified($this->local_path($ao)) === $this->mtime)
|
||||
&& (Storage::disk('local')->size($this->local_path($ao)) === $this->size)) {
|
||||
Log::alert(sprintf('%s:- File already exists - skipping [%s]', self::LOGKEY, $this->file));
|
||||
return Protocol::FOP_SKIP;
|
||||
|
||||
} elseif (file_exists($this->file) && (Storage::disk('local')->size($this->local_path($ao)) > 0)) {
|
||||
Log::alert(sprintf('%s:- File exists with different details - skipping [%s]',self::LOGKEY,$this->file));
|
||||
return Protocol::FOP_SUSPEND;
|
||||
|
||||
} else {
|
||||
Log::debug(sprintf('%s:- Opening [%s]',self::LOGKEY,$this->file));
|
||||
}
|
||||
|
||||
Log::info(sprintf('%s:= open - File [%s] opened for writing',self::LOGKEY,$this->receiving->file_name));
|
||||
return 0; // @todo change to const
|
||||
// If we are only checking, we'll return (NR mode)
|
||||
if ($check)
|
||||
return Protocol::FOP_OK;
|
||||
|
||||
$this->f = fopen($this->file,'wb');
|
||||
if (! $this->f) {
|
||||
Log::error(sprintf('%s:! Unable to open file [%s] for writing',self::LOGKEY,$this->receiving->name));
|
||||
return Protocol::FOP_ERROR;
|
||||
}
|
||||
|
||||
Log::info(sprintf('%s:= open - File [%s] opened for writing',self::LOGKEY,$this->receiving->name));
|
||||
return Protocol::FOP_OK;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -255,29 +296,76 @@ final class Receive extends Item
|
||||
$this->receiving = $o;
|
||||
}
|
||||
|
||||
private function local_path(Address $ao): string
|
||||
{
|
||||
return sprintf('%s/%04X-%s',config('app.fido'),$ao->id,$this->receiving->recvas);
|
||||
}
|
||||
|
||||
/**
|
||||
* Write data to the file we are receiving
|
||||
*
|
||||
* @param string $buf
|
||||
* @return int
|
||||
* @return bool
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function write(string $buf): int
|
||||
public function write(string $buf): bool
|
||||
{
|
||||
if (! $this->f)
|
||||
throw new \Exception('No file open for read');
|
||||
throw new \Exception('No file open for write');
|
||||
|
||||
if ($this->file_pos+strlen($buf) > $this->receiving->file_size)
|
||||
throw new \Exception(sprintf('Too many bytes received [%d] (%d)?',$this->file_pos+strlen($buf),$this->receiving->file_size));
|
||||
$data = '';
|
||||
|
||||
$rc = fwrite($this->f,$buf);
|
||||
// If we are using compression mode, then we need to buffer the right until we have everything
|
||||
if ($this->comp) {
|
||||
$this->comp_data .= $buf;
|
||||
|
||||
// See if we can uncompress the data yet
|
||||
switch ($this->comp) {
|
||||
case 'BZ2':
|
||||
if (($data=bzdecompress($this->comp_data,TRUE)) === FALSE)
|
||||
throw new FileException('BZ2 decompression failed?');
|
||||
elseif (is_numeric($data))
|
||||
throw new FileException(sprintf('BZ2 decompression failed with (:%d)?',$data));
|
||||
|
||||
break;
|
||||
|
||||
case 'GZ':
|
||||
if (($data=gzdeflate($this->comp_data)) === FALSE)
|
||||
throw new FileException('BZ2 decompression failed?');
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
// Compressed file grew
|
||||
if (! strlen($data)) {
|
||||
if (strlen($this->comp_data) > $this->receiving->size) {
|
||||
fclose($this->f);
|
||||
$this->f = NULL;
|
||||
|
||||
throw new FileGrewException(sprintf('Error compressed file grew, rejecting [%d] -> [%d]', strlen($this->comp_data), $this->receiving->size));
|
||||
}
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
} else {
|
||||
$data = $buf;
|
||||
}
|
||||
|
||||
if ($this->file_pos+strlen($data) > $this->receiving->size)
|
||||
throw new \Exception(sprintf('Too many bytes received [%d] (%d)?',$this->file_pos+strlen($buf),$this->receiving->size));
|
||||
|
||||
$rc = fwrite($this->f,$data);
|
||||
|
||||
if ($rc === FALSE)
|
||||
throw new FileException('Error while writing to file');
|
||||
|
||||
$this->file_pos += $rc;
|
||||
Log::debug(sprintf('%s:- Write [%d] bytes, file pos now [%d] of [%d] (%d)',self::LOGKEY,$rc,$this->file_pos,$this->receiving->file_size,strlen($buf)));
|
||||
Log::debug(sprintf('%s:- Write [%d] bytes, file pos now [%d] of [%d] (%d)',self::LOGKEY,$rc,$this->file_pos,$this->receiving->size,strlen($buf)));
|
||||
|
||||
return $rc;
|
||||
if (strlen($this->comp_data) > $this->receiving->size)
|
||||
Log::alert(sprintf('%s:- Compression grew the file during transfer (%d->%d)',self::LOGKEY,$this->receiving->size,strlen($this->comp_data)));
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
}
|
@@ -15,9 +15,6 @@ use App\Models\Address;
|
||||
* Object representing the files we are sending
|
||||
*
|
||||
* @property-read resource fd
|
||||
* @property-read int file_mtime
|
||||
* @property-read int file_size
|
||||
* @property-read string file_name
|
||||
* @property-read int mail_size
|
||||
* @property-read int total_count
|
||||
* @property-read int total_sent
|
||||
@@ -31,9 +28,7 @@ final class Send extends Item
|
||||
private ?Item $sending;
|
||||
private Collection $packets;
|
||||
|
||||
private mixed $f; // File descriptor
|
||||
private int $start; // Time we started sending
|
||||
private int $file_pos; // Current read pointer
|
||||
private string $comp_data;
|
||||
|
||||
public function __construct()
|
||||
{
|
||||
@@ -41,8 +36,6 @@ final class Send extends Item
|
||||
$this->list = collect();
|
||||
$this->packets = collect();
|
||||
$this->sending = NULL;
|
||||
$this->file_pos = 0;
|
||||
$this->f = NULL;
|
||||
}
|
||||
|
||||
public function __get($key)
|
||||
@@ -51,15 +44,15 @@ final class Send extends Item
|
||||
case 'fd':
|
||||
return is_resource($this->f) ?: $this->f;
|
||||
|
||||
case 'file_count':
|
||||
case 'files_count':
|
||||
return $this->list
|
||||
->filter(function($item) { return $item->isType(self::IS_FILE|self::IS_TIC); })
|
||||
->count();
|
||||
|
||||
case 'file_size':
|
||||
case 'files_size':
|
||||
return $this->list
|
||||
->filter(function($item) { return $item->isType(self::IS_FILE|self::IS_TIC); })
|
||||
->sum(function($item) { return $item->file_size; });
|
||||
->sum(function($item) { return $item->size; });
|
||||
|
||||
case 'filepos':
|
||||
return $this->file_pos;
|
||||
@@ -73,8 +66,8 @@ final class Send extends Item
|
||||
case 'mail_size':
|
||||
return $this->list
|
||||
->filter(function($item) { return $item->isType(self::IS_ARC|self::IS_PKT); })
|
||||
->sum(function($item) { return $item->file_size; })
|
||||
+ $this->packets->sum(function($item) { return $item->file_size; });
|
||||
->sum(function($item) { return $item->size; })
|
||||
+ $this->packets->sum(function($item) { return $item->size; });
|
||||
|
||||
case 'sendas':
|
||||
return $this->sending ? $this->sending->{$key} : NULL;
|
||||
@@ -95,10 +88,10 @@ final class Send extends Item
|
||||
case 'total_sent_bytes':
|
||||
return $this->list
|
||||
->filter(function($item) { return ($item->action & self::I_SEND) && $item->sent === TRUE; })
|
||||
->sum(function($item) { return $item->file_size; })
|
||||
->sum(function($item) { return $item->size; })
|
||||
+ $this->packets
|
||||
->filter(function($item) { return ($item->action & self::I_SEND) && $item->sent === TRUE; })
|
||||
->sum(function($item) { return $item->file_size; });
|
||||
->sum(function($item) { return $item->size; });
|
||||
|
||||
case 'total_count':
|
||||
return $this->list
|
||||
@@ -110,8 +103,8 @@ final class Send extends Item
|
||||
|
||||
case 'total_size':
|
||||
return $this->list
|
||||
->sum(function($item) { return $item->file_size; })
|
||||
+ $this->packets->sum(function($item) { return $item->file_size; });
|
||||
->sum(function($item) { return $item->size; })
|
||||
+ $this->packets->sum(function($item) { return $item->size; });
|
||||
|
||||
default:
|
||||
throw new Exception('Unknown key: '.$key);
|
||||
@@ -146,6 +139,21 @@ final class Send extends Item
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
private function compress(string $comp_mode): void
|
||||
{
|
||||
switch ($comp_mode) {
|
||||
case 'BZ2':
|
||||
$this->comp_data = bzcompress($buf);
|
||||
break;
|
||||
|
||||
case 'GZ':
|
||||
$this->comp_data = gzcompress($buf);
|
||||
break;
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
/**
|
||||
* Close the file descriptor of the file we are sending
|
||||
*
|
||||
@@ -160,7 +168,7 @@ final class Send extends Item
|
||||
if ($successful) {
|
||||
$this->sending->sent = TRUE;
|
||||
$end = time()-$this->start;
|
||||
Log::debug(sprintf('%s: - Closing [%s], sent in [%d]',self::LOGKEY,$this->sending->file_name,$end));
|
||||
Log::debug(sprintf('%s: - Closing [%s], sent in [%d]',self::LOGKEY,$this->sending->name,$end));
|
||||
}
|
||||
|
||||
// @todo This should be done better isType == file?
|
||||
@@ -208,9 +216,9 @@ final class Send extends Item
|
||||
Log::debug(sprintf('%s:- [%d] Files(s) added for sending to [%s]',self::LOGKEY,$x->count(),$ao->ftn));
|
||||
|
||||
// Add Files
|
||||
foreach ($x as $xx) {
|
||||
$this->list->push(new Item($xx,self::I_SEND));
|
||||
$this->list->push(new Tic($ao,$xx,self::I_SEND));
|
||||
foreach ($x as $fo) {
|
||||
$this->list->push(new Item($fo,self::I_SEND));
|
||||
$this->list->push(new Tic($ao,$fo,self::I_SEND));
|
||||
}
|
||||
|
||||
$file = TRUE;
|
||||
@@ -222,10 +230,11 @@ final class Send extends Item
|
||||
/**
|
||||
* Open a file for sending
|
||||
*
|
||||
* @param string $compress
|
||||
* @return bool
|
||||
* @throws Exception
|
||||
*/
|
||||
public function open(): bool
|
||||
public function open(string $compress=''): bool
|
||||
{
|
||||
Log::debug(sprintf('%s:+ open',self::LOGKEY));
|
||||
|
||||
@@ -238,6 +247,11 @@ final class Send extends Item
|
||||
$this->start = time();
|
||||
$this->f = TRUE;
|
||||
|
||||
/*
|
||||
if ($compress)
|
||||
$this->comp_data = $this->compdata($compress);
|
||||
*/
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
@@ -258,18 +272,19 @@ final class Send extends Item
|
||||
}
|
||||
|
||||
// If sending file is a File::class, then our file is s3
|
||||
if (! $this->sending->file_name && $this->sending->filemodel) {
|
||||
if (! $this->sending->name && $this->sending->filemodel) {
|
||||
$this->f = Storage::readStream($this->sending->filemodel->full_storage_path);
|
||||
return TRUE;
|
||||
|
||||
} else {
|
||||
$this->f = fopen($this->sending->file_name,'rb');
|
||||
$this->f = fopen($this->sending->name,'rb');
|
||||
|
||||
if (! $this->f) {
|
||||
Log::error(sprintf('%s:! Unable to open file [%s] for reading',self::LOGKEY,$this->sending->file_name));
|
||||
Log::error(sprintf('%s:! Unable to open file [%s] for reading',self::LOGKEY,$this->sending->name));
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
Log::info(sprintf('%s:= open - File [%s] opened with size [%d]',self::LOGKEY,$this->sending->file_name,$this->sending->file_size));
|
||||
Log::info(sprintf('%s:= open - File [%s] opened with size [%d]',self::LOGKEY,$this->sending->name,$this->sending->size));
|
||||
return TRUE;
|
||||
}
|
||||
}
|
||||
@@ -343,7 +358,7 @@ final class Send extends Item
|
||||
Log::debug(sprintf('%s: - Read [%d] bytes, file pos now [%d]',self::LOGKEY,strlen($data),$this->file_pos));
|
||||
|
||||
if ($data === FALSE)
|
||||
throw new UnreadableFileEncountered('Error reading file: '.$this->sending->file_name);
|
||||
throw new UnreadableFileEncountered('Error reading file: '.$this->sending->name);
|
||||
|
||||
return $data;
|
||||
}
|
||||
|
@@ -7,19 +7,17 @@ use App\Models\{Address,File};
|
||||
|
||||
class Tic extends Item
|
||||
{
|
||||
private string $file;
|
||||
|
||||
/**
|
||||
* @throws \Exception
|
||||
*/
|
||||
public function __construct(Address $ao,File $fo,int $action)
|
||||
{
|
||||
$this->action |= $action;
|
||||
|
||||
$tic = new FTNTic;
|
||||
|
||||
switch ($action) {
|
||||
case self::I_SEND:
|
||||
$tic = new FTNTic;
|
||||
$this->file = $tic->generate($ao,$fo);
|
||||
$this->file_type = self::IS_TIC;
|
||||
$this->file_name = sprintf('%s.tic',sprintf('%08x',$fo->id));
|
||||
$this->file_size = strlen($this->file);
|
||||
$this->file_mtime = $fo->created_at->timestamp;
|
||||
@@ -29,6 +27,9 @@ class Tic extends Item
|
||||
default:
|
||||
throw new \Exception('Unknown action: '.$action);
|
||||
}
|
||||
|
||||
$this->action = $action;
|
||||
$this->type = self::IS_TIC;
|
||||
}
|
||||
|
||||
public function read(int $start,int $length): string
|
||||
|
Reference in New Issue
Block a user