Untitled

 avatar
unknown
plain_text
4 years ago
3.2 kB
4
Indexable
class BulkProcessBasement(Task):
    track_started = True
    expires = (60 * 30)  # seconds

    soft_time_limit = TIME_LIMIT_BULK_PROCESS
    time_limit = TIME_LIMIT_BULK_PROCESS + 5

    def run(self, *args, **kwargs):
        pass

    def on_success(self, retval, task_id, args, kwargs):
        # celery event
        client_id = kwargs.get('client_id')
        client_settings = ClientSettings.objects.get(client_id=client_id)
        if self.__should_sent_notification(task_id, client_settings.time_bulk_processing_notification) is True:
            self.__send_notification(retval, client_settings.time_bulk_processing_notification, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # celery event
        client_id = kwargs.get('client_id')
        client_settings = ClientSettings.objects.get(client_id=client_id)
        import_temp_id = kwargs.get('import_temp_id')
        data_import_lib = DataImportTemporary.objects.get(id=import_temp_id)
        data_import_lib.log = f'{einfo}'
        data_import_lib.progress = 100
        data_import_lib.status = FAILURE
        data_import_lib.save()
        if self.__should_sent_notification(task_id, client_settings.time_bulk_processing_notification) is True:
            # TODO: send notification for exception failure
            pass

    # def after_return(self, status, retval, task_id, args, kwargs, einfo=None):
    #     pass

    @classmethod
    def __should_sent_notification(cls, task_id, time_bulk_processing_notification) -> bool:
        task_ins = TaskResult.objects.get_task(task_id)
        time_processed_delta = task_ins.date_done - task_ins.date_created
        time_processed_delta = time_processed_delta.seconds / 60  # to minutes
        if time_processed_delta >= time_bulk_processing_notification:
            return True
        return False

    @classmethod
    def __send_notification(cls, retval, time_bulk_processing_notification, kwargs):
        try:
            import_temp_id = kwargs.get('import_temp_id')
            data_import_lib = DataImportTemporary.objects.get(id=import_temp_id)
            description = {"uuid": import_temp_id,
                           "action": str(retval),
                           "process_started": data_import_lib.process_started,
                           "process_completed": data_import_lib.process_completed,
                           "total_items": data_import_lib.info_import_file['summary']['total'],
                           "total_items_success": data_import_lib.info_import_file['summary']['success'],
                           "minutes": time_bulk_processing_notification}
            items = json.loads(data_import_lib.json_data_last_cache)
            items = [{'id': item['id'], **item.get("_meta")} for item in items]
            user_information = data_import_lib.meta.get('user_info')

            username = user_information.get('last_name', 'there')
            email = user_information.get('email')
            EmailServices().send_bulk_processing('PF Development', username, description, items, [email])
        except Exception as error:
            logger.error(f'[__send_notification][BulkProgress] {error}')
Editor is loading...