Untitled
unknown
python
a year ago
4.0 kB
6
Indexable
# Import necessary libraries
import pickle
import time
import datetime
import logging

import pandas as pd
import numpy as np
from numpy import dot
from numpy.linalg import norm
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import FloatType, StructType, StructField, StringType, ArrayType
from tqdm import tqdm
from pyspark import SparkConf

# Configure Spark
conf = SparkConf().setAppName("CosineSimilarity")
conf = conf.set("spark.driver.memory", "4g")
conf = conf.set("spark.executor.memory", "4g")
spark = SparkSession.builder.config(conf=conf).getOrCreate()


def cos_similarity(embedding1, embedding2):
    """Calculate the cosine similarity between two embeddings.

    Both arguments are sequences of numbers (lists or 1-D NumPy arrays) of
    equal length; the result is a plain Python float. If either embedding has
    zero norm, NumPy's float division yields nan (with a RuntimeWarning)
    instead of raising, and nan never passes a ``>`` threshold test.
    """
    norm1 = norm(embedding1)
    norm2 = norm(embedding2)
    return float(dot(embedding1, embedding2) / (norm1 * norm2))


def process_row_pair(row1, row2, threshold=0.7):
    """Return a pair Row if two rows from different clients are similar enough.

    Args:
        row1: Row exposing ``client_id``, ``path``, ``sentence`` and
            ``embedding`` fields.
        row2: Row with the same fields as ``row1``.
        threshold: Cosine similarity must be strictly greater than this.

    Returns:
        A ``Row`` holding both rows' fields (suffixed ``1``/``2``) plus
        ``cos_sim``, or ``None`` when the pair is filtered out or an error
        occurred. Errors are logged, not raised.
    """
    try:
        # Same-client pairs are always discarded, so skip the dot product.
        if row1.client_id == row2.client_id:
            return None
        cos_sim = cos_similarity(row1.embedding, row2.embedding)
        if cos_sim > threshold:
            return Row(
                client_id1=row1.client_id,
                path1=row1.path,
                sentence1=row1.sentence,
                embedding1=row1.embedding,
                client_id2=row2.client_id,
                path2=row2.path,
                sentence2=row2.sentence,
                embedding2=row2.embedding,
                cos_sim=cos_sim,
            )
    except Exception as e:
        # Best effort: a malformed pair (e.g. mismatched embedding lengths)
        # is logged and skipped rather than failing the whole Spark job.
        logging.error(f"Error processing rows {row1.client_id} and {row2.client_id}: {e}")
    return None


def _balanced_order(n):
    """Return the indices 0..n-1 interleaved from both ends: 0, n-1, 1, n-2, ...

    Row ``i`` is compared against the ``n - 1 - i`` rows after it. Pairing
    ``i`` with ``n - 1 - i`` gives every contiguous slice of this order (and so
    every Spark partition) a similar amount of pairwise work.
    """
    order = []
    low, high = 0, n - 1
    while low < high:
        order.extend((low, high))
        low += 1
        high -= 1
    if low == high:
        order.append(low)
    return order


def calculate_cosine_similarity(df, threshold=0.7, limit=50000):
    """Find all pairs of rows from different clients with similar embeddings.

    Every unordered pair (i, j) with i < j among the selected rows is compared
    exactly once, regardless of how Spark partitions the work.

    Args:
        df: pandas DataFrame with ``client_id``, ``path``, ``sentence`` and
            ``embedding`` columns; each ``embedding`` is a list or 1-D array.
        threshold: Cosine similarity must be strictly greater than this.
        limit: Only the first ``limit`` rows are used; ``None`` uses all rows.

    Returns:
        pandas DataFrame with one row per matching pair and the columns
        client_id1, path1, sentence1, embedding1, client_id2, path2,
        sentence2, embedding2, cos_sim.
    """
    start_time = time.time()
    # Work on a copy so the caller's DataFrame is never mutated.
    df = (df if limit is None else df.head(limit)).copy()

    # Spark's ArrayType needs plain Python lists; np.asarray also accepts
    # embeddings that are already lists.
    df['embedding'] = df['embedding'].apply(lambda x: np.asarray(x).tolist())

    # Build each Row once on the driver and broadcast the full list. Each task
    # can then compare its rows against *all* later rows. Comparing only rows
    # that share a partition would silently drop every cross-partition pair.
    rows = [Row(**record) for record in df.to_dict('records')]
    n_rows = len(rows)
    sc = spark.sparkContext
    rows_bc = sc.broadcast(rows)

    def compute_pairs(indices):
        """Compare each row index in this partition with every later row."""
        all_rows = rows_bc.value
        indices = list(indices)
        results = []
        for i in tqdm(indices, desc="Processing rows"):
            row1 = all_rows[i]
            for j in range(i + 1, n_rows):
                result = process_row_pair(row1, all_rows[j], threshold)
                if result is not None:
                    results.append(result)
        return results

    # NOTE(review): the pairwise work is still O(n^2) Python calls; a blocked
    # NumPy matrix product would be much faster if runtime becomes an issue.
    result_rdd = sc.parallelize(_balanced_order(n_rows)).mapPartitions(compute_pairs)

    # Define schema for resulting DataFrame
    schema = StructType([
        StructField('client_id1', StringType(), True),
        StructField('path1', StringType(), True),
        StructField('sentence1', StringType(), True),
        StructField('embedding1', ArrayType(FloatType()), True),
        StructField('client_id2', StringType(), True),
        StructField('path2', StringType(), True),
        StructField('sentence2', StringType(), True),
        StructField('embedding2', ArrayType(FloatType()), True),
        StructField('cos_sim', FloatType(), True),
    ])

    try:
        result_df = spark.createDataFrame(result_rdd, schema)
        result_pd_df = result_df.toPandas()
    finally:
        # toPandas() has materialised the result (or failed), so the
        # broadcast copy of every row can be released.
        rows_bc.unpersist()

    print(f'Number of similar pairs: {result_pd_df.shape[0]}')
    print(f'Time taken: {time.time() - start_time} seconds')
    return result_pd_df


def main():
    """Load ``train_embed_fr.pickle``, find similar cross-client pairs, and pickle them."""
    start_time = datetime.datetime.now()
    print(f'Start time: {start_time}')

    try:
        # Load the data (pickle: only load files you trust).
        df_fr_train = pd.read_pickle('/content/drive/MyDrive/milaga/train_embed_fr.pickle')
        df_fr_train = df_fr_train.reset_index(drop=True)
        print(f'DataFrame shape: {df_fr_train.shape}')

        # Calculate cosine similarity
        result_df = calculate_cosine_similarity(df_fr_train)
        result_df.to_pickle("/content/drive/MyDrive/milaga/train_embed_fr_cosine_similarity_spark.pickle")

        end_time = datetime.datetime.now()
        print(f'End time: {end_time}')
    finally:
        # Stop the Spark session even if loading or processing failed.
        spark.stop()


if __name__ == "__main__":
    main()
Editor is loading...
Leave a Comment