Untitled
unknown
plain_text
4 years ago
3.2 kB
4
Indexable
class BulkProcessBasement(Task): track_started = True expires = (60 * 30) # seconds soft_time_limit = TIME_LIMIT_BULK_PROCESS time_limit = TIME_LIMIT_BULK_PROCESS + 5 def run(self, *args, **kwargs): pass def on_success(self, retval, task_id, args, kwargs): # celery event client_id = kwargs.get('client_id') client_settings = ClientSettings.objects.get(client_id=client_id) if self.__should_sent_notification(task_id, client_settings.time_bulk_processing_notification) is True: self.__send_notification(retval, client_settings.time_bulk_processing_notification, kwargs) def on_failure(self, exc, task_id, args, kwargs, einfo): # celery event client_id = kwargs.get('client_id') client_settings = ClientSettings.objects.get(client_id=client_id) import_temp_id = kwargs.get('import_temp_id') data_import_lib = DataImportTemporary.objects.get(id=import_temp_id) data_import_lib.log = f'{einfo}' data_import_lib.progress = 100 data_import_lib.status = FAILURE data_import_lib.save() if self.__should_sent_notification(task_id, client_settings.time_bulk_processing_notification) is True: # TODO: send notification for exception failure pass # def after_return(self, status, retval, task_id, args, kwargs, einfo=None): # pass @classmethod def __should_sent_notification(cls, task_id, time_bulk_processing_notification) -> bool: task_ins = TaskResult.objects.get_task(task_id) time_processed_delta = task_ins.date_done - task_ins.date_created time_processed_delta = time_processed_delta.seconds / 60 # to minutes if time_processed_delta >= time_bulk_processing_notification: return True return False @classmethod def __send_notification(cls, retval, time_bulk_processing_notification, kwargs): try: import_temp_id = kwargs.get('import_temp_id') data_import_lib = DataImportTemporary.objects.get(id=import_temp_id) description = {"uuid": import_temp_id, "action": str(retval), "process_started": data_import_lib.process_started, "process_completed": data_import_lib.process_completed, "total_items": data_import_lib.info_import_file['summary']['total'], "total_items_success": data_import_lib.info_import_file['summary']['success'], "minutes": time_bulk_processing_notification} items = json.loads(data_import_lib.json_data_last_cache) items = [{'id': item['id'], **item.get("_meta")} for item in items] user_information = data_import_lib.meta.get('user_info') username = user_information.get('last_name', 'there') email = user_information.get('email') EmailServices().send_bulk_processing('PF Development', username, description, items, [email]) except Exception as error: logger.error(f'[__send_notification][BulkProgress] {error}')
Editor is loading...