Untitled
unknown
plain_text
5 years ago
3.2 kB
7
Indexable
class BulkProcessBasement(Task):
track_started = True
expires = (60 * 30) # seconds
soft_time_limit = TIME_LIMIT_BULK_PROCESS
time_limit = TIME_LIMIT_BULK_PROCESS + 5
def run(self, *args, **kwargs):
pass
def on_success(self, retval, task_id, args, kwargs):
# celery event
client_id = kwargs.get('client_id')
client_settings = ClientSettings.objects.get(client_id=client_id)
if self.__should_sent_notification(task_id, client_settings.time_bulk_processing_notification) is True:
self.__send_notification(retval, client_settings.time_bulk_processing_notification, kwargs)
def on_failure(self, exc, task_id, args, kwargs, einfo):
# celery event
client_id = kwargs.get('client_id')
client_settings = ClientSettings.objects.get(client_id=client_id)
import_temp_id = kwargs.get('import_temp_id')
data_import_lib = DataImportTemporary.objects.get(id=import_temp_id)
data_import_lib.log = f'{einfo}'
data_import_lib.progress = 100
data_import_lib.status = FAILURE
data_import_lib.save()
if self.__should_sent_notification(task_id, client_settings.time_bulk_processing_notification) is True:
# TODO: send notification for exception failure
pass
# def after_return(self, status, retval, task_id, args, kwargs, einfo=None):
# pass
@classmethod
def __should_sent_notification(cls, task_id, time_bulk_processing_notification) -> bool:
task_ins = TaskResult.objects.get_task(task_id)
time_processed_delta = task_ins.date_done - task_ins.date_created
time_processed_delta = time_processed_delta.seconds / 60 # to minutes
if time_processed_delta >= time_bulk_processing_notification:
return True
return False
@classmethod
def __send_notification(cls, retval, time_bulk_processing_notification, kwargs):
try:
import_temp_id = kwargs.get('import_temp_id')
data_import_lib = DataImportTemporary.objects.get(id=import_temp_id)
description = {"uuid": import_temp_id,
"action": str(retval),
"process_started": data_import_lib.process_started,
"process_completed": data_import_lib.process_completed,
"total_items": data_import_lib.info_import_file['summary']['total'],
"total_items_success": data_import_lib.info_import_file['summary']['success'],
"minutes": time_bulk_processing_notification}
items = json.loads(data_import_lib.json_data_last_cache)
items = [{'id': item['id'], **item.get("_meta")} for item in items]
user_information = data_import_lib.meta.get('user_info')
username = user_information.get('last_name', 'there')
email = user_information.get('email')
EmailServices().send_bulk_processing('PF Development', username, description, items, [email])
except Exception as error:
logger.error(f'[__send_notification][BulkProgress] {error}')Editor is loading...