Untitled

 avatar
unknown
plain_text
a year ago
6.5 kB
6
Indexable
import os

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, to_json, col, lit, struct
from pyspark.sql.types import StructType, StructField, StringType, LongType

# метод для записи данных в 2 target: в PostgreSQL для фидбэков и в Kafka для триггеров
def foreach_batch_function(df, epoch_id):
    # сохраняем df в памяти, чтобы не создавать df заново перед отправкой в Kafka
    df.persist()
    # записываем df в PostgreSQL с полем feedback
    df.write \
        .mode("append") \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://rc1a-fswjkpli01zafgjm.mdb.yandexcloud.net:6432/de") \
        .option('driver', 'org.postgresql.Driver') \
        .option("dbtable", "test") \
        .option("user", "master") \
        .option("password", "de-master-password") \
        .save()
    # создаём df для отправки в Kafka. Сериализация в json.
    kafka_df = df.select(to_json( \
            struct("restaraunt_id", \
                   "adv_campaign_id", \
                   "adv_campaign_content", \
                   "adv_campaign_owner", \
                   "adv_campaign_owner_contact", \
                   "adv_campaign_datetime_start", \
                   "adv_campaign_datetime_end", \
                   "client_id", \
                   "datetime_created", \
                   "trigger_datetime_created")) \
            .alias("value"))
    # отправляем сообщения в результирующий топик Kafka без поля feedback
    kafka_df.write \
        .format("kafka") \
        .option('kafka.bootstrap.servers', 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091') \
        .option('kafka.security.protocol', 'SASL_SSL') \
        .option('kafka.sasl.jaas.config', 'org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-admin" password="de-kafka-admin-2022";') \
        .option('kafka.sasl.mechanism', 'SCRAM-SHA-512') \
        .option('kafka.ssl.truststore.location', '/usr/lib/jvm/java-1.17.0-openjdk-amd64/lib/security/cacerts') \
        .option('kafka.ssl.truststore.password', 'changeit') \
        .option('topic', 'trigger_result') \
        .save()
    # очищаем память от df
    df.unpersist()

# необходимые библиотеки для интеграции Spark с Kafka и PostgreSQL
spark_jars_packages = ",".join(
        [
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
            "org.postgresql:postgresql:42.4.0",
        ]
    )

# создаём spark сессию с необходимыми библиотеками в spark_jars_packages для интеграции с Kafka и PostgreSQL
spark = SparkSession.builder \
    .appName("RestarauntSubscribeStreamingService") \
    .config("spark.sql.session.timeZone", "UTC") \
    .config("spark.jars.packages", spark_jars_packages) \
    .getOrCreate()

kafka_pass = 'kafka-admin'
kafka_user = 'de-kafka-admin-2022'

# читаем из топика Kafka сообщения с акциями от ресторанов 
restaraunt_read_stream_df = spark.readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091') \
    .option('kafka.security.protocol', 'SASL_SSL') \
    .option('kafka.sasl.jaas.config', 'org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka-admin" password="de-kafka-admin-2022";') \
    .option('kafka.sasl.mechanism', 'SCRAM-SHA-512') \
    .option('kafka.ssl.truststore.location', '/usr/lib/jvm/java-1.17.0-openjdk-amd64/lib/security/cacerts') \
    .option('kafka.ssl.truststore.password', 'changeit') \
    .option('subscribe', 'base') \
    .load()

# определяем схему входного сообщения для json
incomming_message_schema = StructType([StructField("restaraunt_id", StringType(), True), \
                     StructField("adv_campaign_id" , StringType(), True), \
                     StructField("adv_campaign_content" , StringType(), True), \
                     StructField("adv_campaign_owner" , StringType(), True), \
                     StructField("adv_campaign_owner_contact" , StringType(), True), \
                     StructField("adv_campaign_datetime_start" , LongType(), True), \
                     StructField("adv_campaign_datetime_end" , LongType(), True), \
                     StructField("datetime_created" , LongType(), True), \
                     ])

# определяем текущее время в UTC в миллисекундах
current_timestamp_utc = int(round(datetime.utcnow().timestamp()))

# десериализуем из value сообщения json и фильтруем по времени старта и окончания акции
filtered_read_stream_df = restaraunt_read_stream_df \
    .select(from_json(col("value").cast("string"), incomming_message_schema).alias("parsed_key_value")) \
    .select(col("parsed_key_value.*")) \
    .where((col("adv_campaign_datetime_start") < current_timestamp_utc) & (col("adv_campaign_datetime_end") > current_timestamp_utc))

# вычитываем всех пользователей с подпиской на рестораны
subscribers_restaraunts_df = spark.read \
                    .format('jdbc') \
                    .option('url', 'jdbc:postgresql://rc1a-fswjkpli01zafgjm.mdb.yandexcloud.net:6432/de') \
                    .option('driver', 'org.postgresql.Driver') \
                    .option('dbtable', 'subscribers_restaraunts') \
                    .option('user', 'master') \
                    .option('password', 'de-master-password') \
                    .load()

# джойним данные из сообщения Kafka с пользователями подписки по restaraunt_id (uuid). Добавляем время создания события.
result_df = filtered_read_stream_df.alias('stream') \
    .join(subscribers_restaraunts_df.alias('static'), \
        col("stream.restaraunt_id") == col("static.restaraunt_id")) \
    .select(col("stream.*"), col("client_id")) \
    .withColumn("trigger_datetime_created", lit(int(round(datetime.utcnow().timestamp()))))

# запускаем стриминг
result_df.writeStream \
    .foreachBatch(foreach_batch_function) \
    .start() \
    .awaitTermination()

Editor is loading...
Leave a Comment