w_l = Window.partitionBy('user_id').orderBy(F.col('event.message_ts').desc())
view_last = events.where('msg_lon is not null') \
.withColumn("rn",F.row_number().over(Window().partitionBy('user_id').orderBy(F.col('event.message_ts').desc()))) \
.filter(F.col('rn') == 1) \
.drop(F.col('rn')) \
.selectExpr('user_id', 'msg_lon as lon', 'msg_lat as lat')
# локальное время сразу к событиям получил и дальше уже с ним выборки делаю
events_with_local_time = df_local_time(events)
events_city = events_with_local_time.select('user_id', 'city', 'local_time')
view_last = view_last.join(events_city, ['user_id'], 'inner')
view_last_channel = events.select(
F.col('event.subscription_channel').alias('channel'),
F.col('event.user').alias('user_id')
).distinct()
new = view_last_channel.join(view_last_channel.withColumnRenamed('user_id', 'user_id2'), ['channel'], 'inner') \
.filter('user_id < user_id2')
user_list = new.join(view_last,['user_id'],'inner') \
.withColumnRenamed('lon','lon_user1') \
.withColumnRenamed('lat','lat_user1') \
.drop('city').drop('local_time')
user_list = user_list\
.join(view_last, view_last['user_id'] == user_list['user_id2'], 'inner').drop(view_last['user_id']) \
.withColumnRenamed('lon','lon_user2') \
.withColumnRenamed('lat','lat_user2') \
.drop(view_last['city'])
#считаешь расстояние
# выбираешь отправителя и получателя, объединяешь с первернутыми и оставляешь уникальные
# вычетаешь пары из выборки