Untitled

 avatar
unknown
python
2 years ago
1.8 kB
6
Indexable
def sync(queue: queues.Queue, lock: synchronize.Lock) -> None:
    log.setup_subprocess_logging(queue)

    try:
        LOG.info(f"source={SOURCE_NAME}: sync start")

        kafka_consumer = kafka.KafkaConsumer(
            group_id=CONF.source_frontiir_cms.group_id,
            bootstrap_servers=CONF.source_frontiir_cms.bootstrap_servers,
            auto_offset_reset=CONF.source_frontiir_cms.auto_offset_reset,
            enable_auto_commit=CONF.source_frontiir_cms.enable_auto_commit,
            security_protocol=CONF.source_frontiir_cms.security_protocol,
            sasl_mechanism=CONF.source_frontiir_cms.sasl_mechanism,
            sasl_plain_username=CONF.source_frontiir_cms.sasl_plain_username,
            sasl_plain_password=CONF.source_frontiir_cms.sasl_plain_password,
        )

        kafka_consumer.subscribe(CONF.source_frontiir_cms.topics)

        source_table = SourceTable()

        main_table = core.MainTable()

        max_retry = CONF.source_frontiir_cms.sync_retry
        while True:
            for retry in range(max_retry + 1):
                try:
                    _sync(lock, kafka_consumer, source_table, main_table)
                    break
                except Exception as e:  # pylint: disable=broad-except
                    if retry < max_retry:
                        LOG.warning(
                            f"source={SOURCE_NAME}: sync failed, "
                            f"exception={type(e)}, retry(+{retry + 1})"
                        )
                    else:
                        raise e
    except KeyboardInterrupt:
        do something
    except Exception as e:
        LOG.error(f"source={SOURCE_NAME}: sync failed", exc_info=True)
        deal with e
    finally:
        LOG.info(f"source={SOURCE_NAME}: sync end")
        do cleanup()
Editor is loading...