Untitled
unknown
python
2 years ago
1.8 kB
6
Indexable
def sync(queue: queues.Queue, lock: synchronize.Lock) -> None: log.setup_subprocess_logging(queue) try: LOG.info(f"source={SOURCE_NAME}: sync start") kafka_consumer = kafka.KafkaConsumer( group_id=CONF.source_frontiir_cms.group_id, bootstrap_servers=CONF.source_frontiir_cms.bootstrap_servers, auto_offset_reset=CONF.source_frontiir_cms.auto_offset_reset, enable_auto_commit=CONF.source_frontiir_cms.enable_auto_commit, security_protocol=CONF.source_frontiir_cms.security_protocol, sasl_mechanism=CONF.source_frontiir_cms.sasl_mechanism, sasl_plain_username=CONF.source_frontiir_cms.sasl_plain_username, sasl_plain_password=CONF.source_frontiir_cms.sasl_plain_password, ) kafka_consumer.subscribe(CONF.source_frontiir_cms.topics) source_table = SourceTable() main_table = core.MainTable() max_retry = CONF.source_frontiir_cms.sync_retry while True: for retry in range(max_retry + 1): try: _sync(lock, kafka_consumer, source_table, main_table) break except Exception as e: # pylint: disable=broad-except if retry < max_retry: LOG.warning( f"source={SOURCE_NAME}: sync failed, " f"exception={type(e)}, retry(+{retry + 1})" ) else: raise e except KeyboardInterrupt: do something except Exception as e: LOG.error(f"source={SOURCE_NAME}: sync failed", exc_info=True) deal with e finally: LOG.info(f"source={SOURCE_NAME}: sync end") do cleanup()
Editor is loading...