Rework crash polling, using optimised scope queries
This commit is contained in:
@@ -12,7 +12,6 @@ use Illuminate\Support\Facades\DB;
|
||||
use Illuminate\Support\Facades\Log;
|
||||
use Repat\LaravelJobs\Job;
|
||||
|
||||
use App\Classes\FTN\Message;
|
||||
use App\Models\Address;
|
||||
|
||||
class MailSend implements ShouldQueue
|
||||
@@ -47,24 +46,33 @@ class MailSend implements ShouldQueue
|
||||
*/
|
||||
public function handle(): void
|
||||
{
|
||||
// Netmail waiting by node (only netmail is routed)
|
||||
$netmails = Address::select(['addresses.id','addresses.zone_id','region_id','host_id','node_id','point_id','role','addresses.system_id',DB::raw('count(netmails.id) AS nm')])
|
||||
->join('zones',['zones.id'=>'addresses.zone_id'])
|
||||
->join('domains',['domains.id'=>'zones.domain_id'])
|
||||
->join('netmails',['netmails.tftn_id'=>'addresses.id'])
|
||||
->join('systems',['systems.id'=>'addresses.system_id'])
|
||||
$u = Address::select([
|
||||
'a.id',
|
||||
'addresses.system_id',
|
||||
'addresses.zone_id',
|
||||
'addresses.region_id',
|
||||
'addresses.host_id',
|
||||
'addresses.node_id',
|
||||
'addresses.point_id',
|
||||
'addresses.hub_id',
|
||||
'addresses.role',
|
||||
DB::raw('sum(a.uncollected_echomail) as uncollected_echomail'),
|
||||
DB::raw('sum(a.uncollected_netmail) as uncollected_netmail'),
|
||||
DB::raw('sum(a.uncollected_files) as uncollected_files')
|
||||
])
|
||||
->from(Address::UncollectedEchomail()->union(Address::UncollectedNetmail())->union(Address::UncollectedFiles()),'a')
|
||||
->where('systems.active',true)
|
||||
->where('addresses.active',TRUE)
|
||||
->where('zones.active',TRUE)
|
||||
->where('domains.active',TRUE)
|
||||
->join('addresses',['addresses.id'=>'a.id'])
|
||||
->join('systems',['systems.id'=>'addresses.system_id'])
|
||||
->join('zones',['zones.id'=>'addresses.zone_id'])
|
||||
->join('domains',['domains.id'=>'zones.domain_id'])
|
||||
->where(function($query) {
|
||||
return $query->whereNull('autohold')
|
||||
->orWhere('autohold',FALSE);
|
||||
})
|
||||
->where(function($query) {
|
||||
return $query->whereRaw(sprintf('(flags & %d) > 0',Message::FLAG_INTRANSIT))
|
||||
->orWhereRaw(sprintf('(flags & %d) > 0',Message::FLAG_LOCAL));
|
||||
})
|
||||
->whereRaw(sprintf('(flags & %d) = 0',Message::FLAG_SENT))
|
||||
->when(! is_null($this->crash),function($query) {
|
||||
return $query->when(
|
||||
$this->crash,
|
||||
@@ -76,115 +84,40 @@ class MailSend implements ShouldQueue
|
||||
}
|
||||
);
|
||||
})
|
||||
->groupBy('addresses.id')
|
||||
->havingRaw('count(*) > 0')
|
||||
->groupBy('addresses.system_id','a.id','addresses.zone_id','addresses.region_id','addresses.host_id','addresses.node_id','addresses.point_id','addresses.hub_id','addresses.role')
|
||||
->with(['system','zone.domain'])
|
||||
->get();
|
||||
|
||||
// Echomail waiting by node
|
||||
$echomails = Address::select(['addresses.id','addresses.zone_id','region_id','host_id','node_id','point_id','role','addresses.system_id',DB::raw('count(*) AS em')])
|
||||
->distinct()
|
||||
->join('zones',['zones.id'=>'addresses.zone_id'])
|
||||
->join('domains',['domains.id'=>'zones.domain_id'])
|
||||
->join('echomail_seenby',['echomail_seenby.address_id'=>'addresses.id'])
|
||||
->join('systems',['systems.id'=>'addresses.system_id'])
|
||||
->where('addresses.active',TRUE)
|
||||
->where('zones.active',TRUE)
|
||||
->where('domains.active',TRUE)
|
||||
->where(function($query) {
|
||||
return $query->whereNull('autohold')
|
||||
->orWhere('autohold',FALSE);
|
||||
})
|
||||
->whereNotNull('export_at')
|
||||
->whereNull('sent_at')
|
||||
->when(! is_null($this->crash),function($query) {
|
||||
return $query->when(
|
||||
$this->crash,
|
||||
function($query) {
|
||||
return $query->where('pollmode',$this->crash);
|
||||
},
|
||||
function($query) {
|
||||
return $query->whereNotNull('pollmode');
|
||||
}
|
||||
);
|
||||
})
|
||||
->groupBy(['addresses.id'])
|
||||
->havingRaw('count(*) > 0')
|
||||
->FTNOrder()
|
||||
->get();
|
||||
// Return the system we poll
|
||||
$u = $u->transform(function($item) {
|
||||
if ($x=$item->parent()) {
|
||||
$x->uncollected_echomail = $item->uncollected_echomail;
|
||||
$x->uncollected_netmail = $item->uncollected_netmail;
|
||||
$x->uncollected_files = $item->uncollected_files;
|
||||
|
||||
// Files waiting by node
|
||||
$files = Address::select(['addresses.id','addresses.zone_id','region_id','host_id','node_id','point_id','role','addresses.system_id',DB::raw('count(*) AS fs')])
|
||||
->distinct()
|
||||
->join('zones',['zones.id'=>'addresses.zone_id'])
|
||||
->join('domains',['domains.id'=>'zones.domain_id'])
|
||||
->join('file_seenby',['file_seenby.address_id'=>'addresses.id'])
|
||||
->join('systems',['systems.id'=>'addresses.system_id'])
|
||||
->where('addresses.active',TRUE)
|
||||
->where('zones.active',TRUE)
|
||||
->where('domains.active',TRUE)
|
||||
->where(function($query) {
|
||||
return $query->whereNull('autohold')
|
||||
->orWhere('autohold',FALSE);
|
||||
})
|
||||
->whereNotNull('export_at')
|
||||
->whereNull('sent_at')
|
||||
->when(! is_null($this->crash),function($query) {
|
||||
return $query->when(
|
||||
$this->crash,
|
||||
function($query) {
|
||||
return $query->where('pollmode',$this->crash);
|
||||
},
|
||||
function($query) {
|
||||
return $query->whereNotNull('pollmode');
|
||||
}
|
||||
);
|
||||
})
|
||||
->groupBy(['addresses.id'])
|
||||
->havingRaw('count(*) > 0')
|
||||
->FTNOrder()
|
||||
->get();
|
||||
return $x;
|
||||
|
||||
// Merge our netmails
|
||||
foreach ($echomails as $ao) {
|
||||
if (($x=$netmails->search(function($item) use ($ao) { return $item->id === $ao->id; })) !== FALSE) {
|
||||
$netmails->get($x)->em = $ao->em;
|
||||
} else {
|
||||
$netmails->push($ao);
|
||||
return $item;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
foreach ($u->groupBy('ftn') as $oo) {
|
||||
if (Job::where('queue','poll')->get()->pluck('command.address.id')->search(($x=$oo->first())->id) === FALSE) {
|
||||
Log::info(sprintf('%s:- Polling [%s] - we have mail for [%d] links. (%d Netmail,%d Echomail,%d Files)',
|
||||
self::LOGKEY,
|
||||
$x->ftn,
|
||||
$oo->count(),
|
||||
$oo->sum('uncollected_netmail'),
|
||||
$oo->sum('uncollected_echomail'),
|
||||
$oo->sum('uncollected_files'),
|
||||
));
|
||||
|
||||
AddressPoll::dispatch($x);
|
||||
|
||||
// Merge our files
|
||||
foreach ($files as $ao) {
|
||||
if (($x=$netmails->search(function($item) use ($ao) { return $item->id === $ao->id; })) !== FALSE) {
|
||||
$netmails->get($x)->fs = $ao->fs;
|
||||
} else {
|
||||
$netmails->push($ao);
|
||||
Log::notice(sprintf('%s:= Not scheduling poll to [%s], there is already one in the queue',self::LOGKEY,$x->ftn));
|
||||
}
|
||||
}
|
||||
|
||||
// Remove direct links
|
||||
$netmails = $netmails->filter(function($item) { return $item->parent(); });
|
||||
|
||||
foreach ($netmails->groupBy(function($item) { return $item->parent()->ftn; }) as $oo) {
|
||||
$ao = $oo->first();
|
||||
|
||||
Log::info(sprintf('%s:- Polling [%s] - we have mail for [%d] links. (%d Netmail,%d Echomail,%d Files)',
|
||||
self::LOGKEY,
|
||||
$ao->ftn,
|
||||
$oo->count(),
|
||||
$oo->sum('nm'),
|
||||
$oo->sum('em'),
|
||||
$oo->sum('fs'),
|
||||
));
|
||||
|
||||
// @todo Only send crash mail - send normal mail with a schedule or hold mail
|
||||
|
||||
if (Job::where('queue',$this->queue)->get()->pluck('command.address.id')->search($ao->id) !== FALSE) {
|
||||
Log::alert(sprintf('%s:= Not scheduling poll to [%s], there is already one in the queue',self::LOGKEY,$ao->ftn));
|
||||
continue;
|
||||
}
|
||||
|
||||
AddressPoll::dispatch($ao);
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user