Untitled
unknown
plain_text
2 years ago
6.5 kB
21
Indexable
import os
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, to_json, col, lit, struct
from pyspark.sql.types import StructType, StructField, StringType, LongType
# метод для записи данных в 2 target: в PostgreSQL для фидбэков и в Kafka для триггеров
def foreach_batch_function(df, epoch_id):
# сохраняем df в памяти, чтобы не создавать df заново перед отправкой в Kafka
df.persist()
# записываем df в PostgreSQL с полем feedback
df.write \
.mode("append") \
.format("jdbc") \
.option("url", "jdbc:postgresql://rc1a-fswjkpli01zafgjm.mdb.yandexcloud.net:6432/de") \
.option('driver', 'org.postgresql.Driver') \
.option("dbtable", "test") \
.option("user", "master") \
.option("password", "de-master-password") \
.save()
# создаём df для отправки в Kafka. Сериализация в json.
kafka_df = df.select(to_json( \
struct("restaraunt_id", \
"adv_campaign_id", \
"adv_campaign_content", \
"adv_campaign_owner", \
"adv_campaign_owner_contact", \
"adv_campaign_datetime_start", \
"adv_campaign_datetime_end", \
"client_id", \
"datetime_created", \
"trigger_datetime_created")) \
.alias("value"))
# отправляем сообщения в результирующий топик Kafka без поля feedback
kafka_df.write \
.format("kafka") \
.option('kafka.bootstrap.servers', 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091') \
.option('kafka.security.protocol', 'SASL_SSL') \
.option('kafka.sasl.jaas.config', 'org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-admin" password="de-kafka-admin-2022";') \
.option('kafka.sasl.mechanism', 'SCRAM-SHA-512') \
.option('kafka.ssl.truststore.location', '/usr/lib/jvm/java-1.17.0-openjdk-amd64/lib/security/cacerts') \
.option('kafka.ssl.truststore.password', 'changeit') \
.option('topic', 'trigger_result') \
.save()
# очищаем память от df
df.unpersist()
# необходимые библиотеки для интеграции Spark с Kafka и PostgreSQL
spark_jars_packages = ",".join(
[
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
"org.postgresql:postgresql:42.4.0",
]
)
# создаём spark сессию с необходимыми библиотеками в spark_jars_packages для интеграции с Kafka и PostgreSQL
spark = SparkSession.builder \
.appName("RestarauntSubscribeStreamingService") \
.config("spark.sql.session.timeZone", "UTC") \
.config("spark.jars.packages", spark_jars_packages) \
.getOrCreate()
kafka_pass = 'kafka-admin'
kafka_user = 'de-kafka-admin-2022'
# читаем из топика Kafka сообщения с акциями от ресторанов
restaraunt_read_stream_df = spark.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091') \
.option('kafka.security.protocol', 'SASL_SSL') \
.option('kafka.sasl.jaas.config', 'org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-admin" password="de-kafka-admin-2022";') \
.option('kafka.sasl.mechanism', 'SCRAM-SHA-512') \
.option('kafka.ssl.truststore.location', '/usr/lib/jvm/java-1.17.0-openjdk-amd64/lib/security/cacerts') \
.option('kafka.ssl.truststore.password', 'changeit') \
.option('subscribe', 'base') \
.load()
# определяем схему входного сообщения для json
incomming_message_schema = StructType([StructField("restaraunt_id", StringType(), True), \
StructField("adv_campaign_id" , StringType(), True), \
StructField("adv_campaign_content" , StringType(), True), \
StructField("adv_campaign_owner" , StringType(), True), \
StructField("adv_campaign_owner_contact" , StringType(), True), \
StructField("adv_campaign_datetime_start" , LongType(), True), \
StructField("adv_campaign_datetime_end" , LongType(), True), \
StructField("datetime_created" , LongType(), True), \
])
# определяем текущее время в UTC в миллисекундах
current_timestamp_utc = int(round(datetime.utcnow().timestamp()))
# десериализуем из value сообщения json и фильтруем по времени старта и окончания акции
filtered_read_stream_df = restaraunt_read_stream_df \
.select(from_json(col("value").cast("string"), incomming_message_schema).alias("parsed_key_value")) \
.select(col("parsed_key_value.*")) \
.where((col("adv_campaign_datetime_start") < current_timestamp_utc) & (col("adv_campaign_datetime_end") > current_timestamp_utc))
# вычитываем всех пользователей с подпиской на рестораны
subscribers_restaraunts_df = spark.read \
.format('jdbc') \
.option('url', 'jdbc:postgresql://rc1a-fswjkpli01zafgjm.mdb.yandexcloud.net:6432/de') \
.option('driver', 'org.postgresql.Driver') \
.option('dbtable', 'subscribers_restaraunts') \
.option('user', 'master') \
.option('password', 'de-master-password') \
.load()
# джойним данные из сообщения Kafka с пользователями подписки по restaraunt_id (uuid). Добавляем время создания события.
result_df = filtered_read_stream_df.alias('stream') \
.join(subscribers_restaraunts_df.alias('static'), \
col("stream.restaraunt_id") == col("static.restaraunt_id")) \
.select(col("stream.*"), col("client_id")) \
.withColumn("trigger_datetime_created", lit(int(round(datetime.utcnow().timestamp()))))
# запускаем стриминг
result_df.writeStream \
.foreachBatch(foreach_batch_function) \
.start() \
.awaitTermination()
Editor is loading...
Leave a Comment