Untitled
unknown
python
3 years ago
1.8 kB
7
Indexable
def sync(queue: queues.Queue, lock: synchronize.Lock) -> None:
log.setup_subprocess_logging(queue)
try:
LOG.info(f"source={SOURCE_NAME}: sync start")
kafka_consumer = kafka.KafkaConsumer(
group_id=CONF.source_frontiir_cms.group_id,
bootstrap_servers=CONF.source_frontiir_cms.bootstrap_servers,
auto_offset_reset=CONF.source_frontiir_cms.auto_offset_reset,
enable_auto_commit=CONF.source_frontiir_cms.enable_auto_commit,
security_protocol=CONF.source_frontiir_cms.security_protocol,
sasl_mechanism=CONF.source_frontiir_cms.sasl_mechanism,
sasl_plain_username=CONF.source_frontiir_cms.sasl_plain_username,
sasl_plain_password=CONF.source_frontiir_cms.sasl_plain_password,
)
kafka_consumer.subscribe(CONF.source_frontiir_cms.topics)
source_table = SourceTable()
main_table = core.MainTable()
max_retry = CONF.source_frontiir_cms.sync_retry
while True:
for retry in range(max_retry + 1):
try:
_sync(lock, kafka_consumer, source_table, main_table)
break
except Exception as e: # pylint: disable=broad-except
if retry < max_retry:
LOG.warning(
f"source={SOURCE_NAME}: sync failed, "
f"exception={type(e)}, retry(+{retry + 1})"
)
else:
raise e
except KeyboardInterrupt:
do something
except Exception as e:
LOG.error(f"source={SOURCE_NAME}: sync failed", exc_info=True)
deal with e
finally:
LOG.info(f"source={SOURCE_NAME}: sync end")
do cleanup()
Editor is loading...