Untitled
unknown
plain_text
2 years ago
7.0 kB
5
Indexable
from datetime import datetime
from time import sleep
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, TimestampType, IntegerType
TOPIC_NAME_91 = 'student.topic.cohort<номер когорты>.<username>.out' # Это топик, в который Ваше приложение должно отправлять сообщения. Укажите здесь название Вашего топика student.topic.cohort<номер когорты>.<username>.out
TOPIC_NAME_IN = 'student.topic.cohort<номер когорты>.<username>' # Это топик, из которого Ваше приложение должно читать сообщения. Укажите здесь название Вашего топика student.topic.cohort<номер когорты>.<username>
#При первом запуске ваш топик student.topic.cohort<номер когорты>.<username>.out может не существовать в Kafka и вы можете увидеть такие сообщения:
# ERROR: Topic student.topic.cohort<номер когорты>.<username>.out error: Broker: Unknown topic or partition
# Это сообщение говорит о том, что тест начал проверять работу Вашего приложение, но так как Ваше приложение ещё не отправило туда сообщения, то топик ещё не создан. Нужно подождать несколько минут.
def spark_init(test_name) -> SparkSession:
spark_jars_packages = ",".join(
[
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
"org.postgresql:postgresql:42.4.0",
]
)
spark = (
SparkSession.builder.appName(test_name)
.config("spark.sql.session.timeZone", "UTC")
.config("spark.jars.packages", spark_jars_packages)
.getOrCreate()
)
return spark
postgresql_settings = {
'user': 'student',
'password': 'de-student'
}
def read_marketing(spark: SparkSession) -> DataFrame:
marketing_df = (spark.read
.format("jdbc")
.option("url", "jdbc:postgresql://rc1a-fswjkpli01zafgjm.mdb.yandexcloud.net:6432/de")
.option("dbtable", "marketing_companies")
.option("driver", "org.postgresql.Driver")
.options(**postgresql_settings)
.load())
return marketing_df
kafka_security_options = {
'kafka.security.protocol': 'SASL_SSL',
'kafka.sasl.mechanism': 'SCRAM-SHA-512',
'kafka.sasl.jaas.config': 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"de-student\" password=\"ltcneltyn\";'
}
def read_client_stream(spark: SparkSession) -> DataFrame:
schema = StructType([
StructField("client_id", StringType()),
StructField("timestamp", DoubleType()),
StructField("lat", DoubleType()),
StructField("lon", DoubleType()),
])
df = (spark.readStream.format('kafka')
.option('kafka.bootstrap.servers', 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091')
.option("subscribe", TOPIC_NAME_IN)
.options(**kafka_security_options)
.option("maxOffsetsPerTrigger", 1000)
.load()
.withColumn('value', f.col('value').cast(StringType()))
.withColumn('event', f.from_json(f.col('value'), schema))
.selectExpr('event.*')
.withColumn('timestamp',
f.from_unixtime(f.col('timestamp'), "yyyy-MM-dd' 'HH:mm:ss.SSS").cast(TimestampType()))
.dropDuplicates(['client_id', 'timestamp'])
.withWatermark('timestamp', '10 minutes')
)
return df
def join(user_df, marketing_df) -> DataFrame:
"""
{
"client_id": идентификатор клиента,
"distance" : int расстояние до рестора/точки, в метрах
"adv_campaign_id": идентификатор рекламной акции,
"adv_campaign_description": описание рекламной акции,
"adv_campaign_start_time": время начала акции,
"adv_campaign_end_time": время окончания акции,
"created_at": время создания выходного ивента
}
"""
return (user_df
.crossJoin(marketing_df)
.withColumn("adv_campaign_id", marketing_df.id)
.withColumn("adv_campaign_description", marketing_df.description)
.withColumn("adv_campaign_start_time", marketing_df.start_time)
.withColumn("adv_campaign_end_time", marketing_df.end_time)
.withColumn("client_id", f.substring('client_id', 0, 6))
.withColumn("created_at", f.lit(datetime.now()))
.withColumn("a", (
f.pow(f.sin(f.radians(marketing_df.point_lat - user_df.lat) / 2), 2) +
f.cos(f.radians(user_df.lat)) * f.cos(f.radians(marketing_df.point_lat)) *
f.pow(f.sin(f.radians(marketing_df.point_lon - user_df.lon) / 2), 2)))
.withColumn("distance", (f.atan2(f.sqrt(f.col("a")), f.sqrt(-f.col("a") + 1)) * 12742000))
.withColumn("distance", f.col('distance').cast(IntegerType()))
.where(f.col("distance") <= 1000)
.dropDuplicates(['client_id', 'adv_campaign_id'])
.withWatermark('timestamp', '1 minutes')
.select(f.to_json(f.struct('client_id',
'distance',
'adv_campaign_id',
'adv_campaign_description',
'adv_campaign_start_time',
'adv_campaign_end_time',
'created_at')).alias('value')
)
)
def run_query(df):
return (df
.writeStream
.outputMode("append")
.format("kafka")
.option('kafka.bootstrap.servers', 'rc1b-2erh7b35n4j4v869.mdb.yandexcloud.net:9091')
.options(**kafka_security_options)
.option("topic", TOPIC_NAME_91)
.option("checkpointLocation", "test_query")
.trigger(processingTime="15 seconds")
.start())
if __name__ == "__main__":
spark = spark_init('join stream')
client_stream = read_client_stream(spark)
marketing_df = read_marketing(spark)
output = join(client_stream, marketing_df)
query = run_query(output)
while query.isActive:
print(f"query information: runId={query.runId}, "
f"status is {query.status}, "
f"recent progress={query.recentProgress}")
sleep(30)
query.awaitTermination()
#ключ: JIh4eP7691Editor is loading...