Added system polling
This commit is contained in:
@@ -3,24 +3,37 @@
|
||||
namespace App\Jobs;
|
||||
|
||||
use Illuminate\Bus\Queueable;
|
||||
use Illuminate\Contracts\Queue\ShouldBeUnique;
|
||||
use Illuminate\Contracts\Queue\ShouldQueue;
|
||||
use Illuminate\Foundation\Bus\Dispatchable;
|
||||
use Illuminate\Queue\InteractsWithQueue;
|
||||
use Illuminate\Queue\ManuallyFailedException;
|
||||
use Illuminate\Queue\MaxAttemptsExceededException;
|
||||
use Illuminate\Queue\SerializesModels;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
use Illuminate\Support\Facades\Notification;
|
||||
|
||||
use App\Classes\Protocol;
|
||||
use App\Classes\Protocol\{Binkp,EMSI};
|
||||
use App\Classes\Sock\SocketClient;
|
||||
use App\Classes\Sock\SocketException;
|
||||
use App\Models\{Address,Mailer,Setup};
|
||||
use App\Notifications\Netmails\PollingFailed;
|
||||
|
||||
class AddressPoll implements ShouldQueue
|
||||
class AddressPoll implements ShouldQueue, ShouldBeUnique
|
||||
{
|
||||
private const LOGKEY = 'JAP';
|
||||
|
||||
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
|
||||
|
||||
public $tries = 5;
|
||||
|
||||
public $maxExceptions = 1;
|
||||
|
||||
public $failOnTimeout = TRUE;
|
||||
|
||||
public const QUEUE = 'poll';
|
||||
|
||||
private Address $ao;
|
||||
private ?Mailer $mo;
|
||||
|
||||
@@ -28,6 +41,50 @@ class AddressPoll implements ShouldQueue
|
||||
{
|
||||
$this->ao = $ao;
|
||||
$this->mo = $mo;
|
||||
|
||||
$this->onQueue(self::QUEUE);
|
||||
}
|
||||
|
||||
public function __get($key): mixed
|
||||
{
|
||||
switch ($key) {
|
||||
case 'address':
|
||||
return $this->ao;
|
||||
|
||||
case 'subject':
|
||||
return $this->ao->ftn;
|
||||
|
||||
default:
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Because pluck doesnt return __get() defined vars
|
||||
*
|
||||
* @param $key
|
||||
* @return bool
|
||||
*/
|
||||
public function __isset($key): bool
|
||||
{
|
||||
$keys = ['address'];
|
||||
|
||||
return in_array($key,$keys);
|
||||
}
|
||||
|
||||
/**
|
||||
* Time to wait between tries
|
||||
*
|
||||
* @return int[] in seconds
|
||||
*/
|
||||
public function backoff(): array
|
||||
{
|
||||
return [
|
||||
60*5, // 5 mins
|
||||
60*60, // 1 hr
|
||||
60*60*6, // 6 hrs
|
||||
60*60*12 // 12 hrs
|
||||
];
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -35,24 +92,19 @@ class AddressPoll implements ShouldQueue
|
||||
*/
|
||||
public function handle()
|
||||
{
|
||||
if (! $this->ao->system->mailer_preferred->count() || ($this->mo && (! $this->ao->system->mailer_preferred->find($this->mo))))
|
||||
throw new \Exception(sprintf('Unable to poll [%s] missing mailer details',$this->ao->ftn));
|
||||
if (! $this->ao->system->mailer_preferred->count() || ($this->mo && (! $this->ao->system->mailer_preferred->find($this->mo)))) {
|
||||
$this->fail('Missing mailer details');
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
Log::info(sprintf('%s:- Polling [%s] - attempt [%d]',self::LOGKEY,$this->ao->ftn,$this->attempts()));
|
||||
|
||||
foreach ($this->ao->system->mailer_preferred as $o) {
|
||||
// If we chose a protocol, skip to find the mailer details for it
|
||||
if ($this->mo && ($o->id !== $this->mo->id))
|
||||
continue;
|
||||
|
||||
Log::info(sprintf('%s:- Starting a [%s] session to [%s:%d]',self::LOGKEY,$o->name,$this->ao->system->address,$o->pivot->port));
|
||||
|
||||
try {
|
||||
$client = SocketClient::create($this->ao->system->address,$o->pivot->port);
|
||||
|
||||
} catch (SocketException $e) {
|
||||
Log::error(sprintf('%s:! Unable to connect to [%s]: %s',self::LOGKEY,$this->ao->ftn,$e->getMessage()));
|
||||
exit(1);
|
||||
}
|
||||
|
||||
switch ($o->name) {
|
||||
case 'BINKP':
|
||||
$s = new Binkp(Setup::findOrFail(config('app.id')));
|
||||
@@ -67,16 +119,59 @@ class AddressPoll implements ShouldQueue
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new \Exception(sprintf('Node [%s] has a mailer type that is unhandled',$this->ao->ftn));
|
||||
$this->fail('Mailer type unhandled');
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (($s->session($session,$client,$this->ao) & Protocol::S_MASK) === Protocol::S_OK) {
|
||||
Log::info(sprintf('%s:= Connection ended successfully with [%s]',self::LOGKEY,$client->address_remote));
|
||||
break;
|
||||
Log::info(sprintf('%s:- Trying a [%s] session to [%s:%d] (%s)',
|
||||
self::LOGKEY,$o->name,$this->ao->system->address,$o->pivot->port,$this->ao->ftn));
|
||||
|
||||
} else {
|
||||
Log::alert(sprintf('%s:! Connection failed to [%s]',self::LOGKEY,$client->address_remote));
|
||||
try {
|
||||
$client = SocketClient::create($this->ao->system->address,$o->pivot->port);
|
||||
|
||||
if (($s->session($session,$client,$this->ao) & Protocol::S_MASK) === Protocol::S_OK) {
|
||||
Log::info(sprintf('%s:= Connection ended successfully with [%s] (%s)',self::LOGKEY,$client->address_remote,$this->ao->ftn));
|
||||
return;
|
||||
|
||||
} else {
|
||||
Log::alert(sprintf('%s:! Connection failed to [%s] (%s)',self::LOGKEY,$client->address_remote,$this->ao->ftn));
|
||||
}
|
||||
|
||||
} catch (SocketException $e) {
|
||||
Log::error(sprintf('%s:! Unable to connect to [%s]: %s',self::LOGKEY,$this->ao->ftn,$e->getMessage()));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
$delay = (int)($this->backoff()[$this->attempts()-1] ?? last($this->backoff()));
|
||||
Log::info(sprintf('%s:= Retrying poll in %d seconds',self::LOGKEY,$delay));
|
||||
$this->release($delay);
|
||||
}
|
||||
|
||||
public function failed(\Throwable $exception): void
|
||||
{
|
||||
switch (get_class($exception)) {
|
||||
case ManuallyFailedException::class:
|
||||
Log::error(sprintf('%s:! Address Poll failed for [%s] (%s)',self::LOGKEY,$this->ao->ftn,$exception->getMessage()));
|
||||
break;
|
||||
|
||||
case MaxAttemptsExceededException::class:
|
||||
Log::error(sprintf('%s:! Address Poll was tried too many times for [%s]',self::LOGKEY,$this->ao->ftn));
|
||||
Notification::route('netmail',$this->ao)->notify(new PollingFailed);
|
||||
|
||||
$this->ao->system->autohold = TRUE;
|
||||
$this->ao->system->save();
|
||||
|
||||
exit(0);
|
||||
|
||||
default:
|
||||
Log::error(sprintf('%s:! Address Poll to [%s] with an unknown exception [%s]',self::LOGKEY,$this->ao->ftn,$exception->getMessage()));
|
||||
}
|
||||
}
|
||||
|
||||
public function uniqueId(): string
|
||||
{
|
||||
return $this->ao->id;
|
||||
}
|
||||
}
|
179
app/Jobs/MailSend.php
Normal file
179
app/Jobs/MailSend.php
Normal file
@@ -0,0 +1,179 @@
|
||||
<?php
|
||||
|
||||
namespace App\Jobs;
|
||||
|
||||
use Illuminate\Bus\Queueable;
|
||||
use Illuminate\Contracts\Queue\ShouldBeUnique;
|
||||
use Illuminate\Contracts\Queue\ShouldQueue;
|
||||
use Illuminate\Foundation\Bus\Dispatchable;
|
||||
use Illuminate\Queue\InteractsWithQueue;
|
||||
use Illuminate\Queue\SerializesModels;
|
||||
use Illuminate\Support\Facades\DB;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
use Repat\LaravelJobs\Job;
|
||||
|
||||
use App\Classes\FTN\Message;
|
||||
use App\Models\Address;
|
||||
|
||||
class MailSend implements ShouldQueue
|
||||
{
|
||||
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
|
||||
|
||||
private const LOGKEY = 'JCM';
|
||||
|
||||
private ?bool $crash;
|
||||
|
||||
/**
|
||||
* @param bool $crash Send crash mail only
|
||||
*/
|
||||
public function __construct(bool $crash=NULL)
|
||||
{
|
||||
$this->crash = $crash;
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute the job.
|
||||
*/
|
||||
public function handle(): void
|
||||
{
|
||||
// Netmail waiting by node (only netmail is routed)
|
||||
$netmails = Address::select(['addresses.id','addresses.zone_id','region_id','host_id','node_id','point_id','role','addresses.system_id',DB::raw('count(netmails.id) AS nm')])
|
||||
->join('zones',['zones.id'=>'addresses.zone_id'])
|
||||
->join('domains',['domains.id'=>'zones.domain_id'])
|
||||
->join('netmails',['netmails.tftn_id'=>'addresses.id'])
|
||||
->join('systems',['systems.id'=>'addresses.system_id'])
|
||||
->where('addresses.active',TRUE)
|
||||
->where('zones.active',TRUE)
|
||||
->where('domains.active',TRUE)
|
||||
->where(function($query) {
|
||||
return $query->whereNull('autohold')
|
||||
->orWhere('autohold',FALSE);
|
||||
})
|
||||
->where(function($query) {
|
||||
return $query->whereRaw(sprintf('(flags & %d) > 0',Message::FLAG_INTRANSIT))
|
||||
->orWhereRaw(sprintf('(flags & %d) > 0',Message::FLAG_LOCAL));
|
||||
})
|
||||
->whereRaw(sprintf('(flags & %d) = 0',Message::FLAG_SENT))
|
||||
->when(! is_null($this->crash),function($query) {
|
||||
return $query->when(
|
||||
$this->crash,
|
||||
function($query) {
|
||||
return $query->where('pollmode',$this->crash);
|
||||
},
|
||||
function($query) {
|
||||
return $query->whereNotNull('pollmode');
|
||||
}
|
||||
);
|
||||
})
|
||||
->groupBy('addresses.id')
|
||||
->havingRaw('count(*) > 0')
|
||||
->get();
|
||||
|
||||
// Echomail waiting by node
|
||||
$echomails = Address::select(['addresses.id','addresses.zone_id','region_id','host_id','node_id','point_id','role','addresses.system_id',DB::raw('count(*) AS em')])
|
||||
->distinct()
|
||||
->join('zones',['zones.id'=>'addresses.zone_id'])
|
||||
->join('domains',['domains.id'=>'zones.domain_id'])
|
||||
->join('echomail_seenby',['echomail_seenby.address_id'=>'addresses.id'])
|
||||
->join('systems',['systems.id'=>'addresses.system_id'])
|
||||
->where('addresses.active',TRUE)
|
||||
->where('zones.active',TRUE)
|
||||
->where('domains.active',TRUE)
|
||||
->where(function($query) {
|
||||
return $query->whereNull('autohold')
|
||||
->orWhere('autohold',FALSE);
|
||||
})
|
||||
->whereNotNull('export_at')
|
||||
->whereNull('sent_at')
|
||||
->when(! is_null($this->crash),function($query) {
|
||||
return $query->when(
|
||||
$this->crash,
|
||||
function($query) {
|
||||
return $query->where('pollmode',$this->crash);
|
||||
},
|
||||
function($query) {
|
||||
return $query->whereNotNull('pollmode');
|
||||
}
|
||||
);
|
||||
})
|
||||
->groupBy(['addresses.id'])
|
||||
->havingRaw('count(*) > 0')
|
||||
->FTNOrder()
|
||||
->get();
|
||||
|
||||
// Files waiting by node
|
||||
$files = Address::select(['addresses.id','addresses.zone_id','region_id','host_id','node_id','point_id','role','addresses.system_id',DB::raw('count(*) AS fs')])
|
||||
->distinct()
|
||||
->join('zones',['zones.id'=>'addresses.zone_id'])
|
||||
->join('domains',['domains.id'=>'zones.domain_id'])
|
||||
->join('file_seenby',['file_seenby.address_id'=>'addresses.id'])
|
||||
->join('systems',['systems.id'=>'addresses.system_id'])
|
||||
->where('addresses.active',TRUE)
|
||||
->where('zones.active',TRUE)
|
||||
->where('domains.active',TRUE)
|
||||
->where(function($query) {
|
||||
return $query->whereNull('autohold')
|
||||
->orWhere('autohold',FALSE);
|
||||
})
|
||||
->whereNotNull('export_at')
|
||||
->whereNull('sent_at')
|
||||
->when(! is_null($this->crash),function($query) {
|
||||
return $query->when(
|
||||
$this->crash,
|
||||
function($query) {
|
||||
return $query->where('pollmode',$this->crash);
|
||||
},
|
||||
function($query) {
|
||||
return $query->whereNotNull('pollmode');
|
||||
}
|
||||
);
|
||||
})
|
||||
->groupBy(['addresses.id'])
|
||||
->havingRaw('count(*) > 0')
|
||||
->FTNOrder()
|
||||
->get();
|
||||
|
||||
// Merge our netmails
|
||||
foreach ($echomails as $ao) {
|
||||
if (($x=$netmails->search(function($item) use ($ao) { return $item->id === $ao->id; })) !== FALSE) {
|
||||
$netmails->get($x)->em = $ao->em;
|
||||
} else {
|
||||
$netmails->push($ao);
|
||||
}
|
||||
}
|
||||
|
||||
// Merge our files
|
||||
foreach ($files as $ao) {
|
||||
if (($x=$netmails->search(function($item) use ($ao) { return $item->id === $ao->id; })) !== FALSE) {
|
||||
$netmails->get($x)->fs = $ao->fs;
|
||||
} else {
|
||||
$netmails->push($ao);
|
||||
}
|
||||
}
|
||||
|
||||
// Remove direct links
|
||||
$netmails = $netmails->filter(function($item) { return $item->parent(); });
|
||||
|
||||
foreach ($netmails->groupBy(function($item) { return $item->parent()->ftn; }) as $oo) {
|
||||
$ao = $oo->first();
|
||||
|
||||
Log::info(sprintf('%s:- Polling [%s] - we have mail for [%d] links. (%d Netmail,%d Echomail,%d Files)',
|
||||
self::LOGKEY,
|
||||
$ao->ftn,
|
||||
$oo->count(),
|
||||
$oo->sum('nm'),
|
||||
$oo->sum('em'),
|
||||
$oo->sum('fs'),
|
||||
));
|
||||
|
||||
// @todo Only send crash mail - send normal mail with a schedule or hold mail
|
||||
|
||||
if (Job::where('queue',$this->queue)->get()->pluck('command.address.id')->search($ao->id) !== FALSE) {
|
||||
Log::alert(sprintf('%s:= Not scheduling poll to [%s], there is already one in the queue',self::LOGKEY,$ao->ftn));
|
||||
continue;
|
||||
}
|
||||
|
||||
AddressPoll::dispatch($ao);
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user