Untitled

 avatar
unknown
python
a year ago
4.0 kB
6
Indexable
# Import necessary libraries
import pickle
import pandas as pd
import numpy as np
from numpy import dot
from numpy.linalg import norm
import time
import datetime
import logging
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import FloatType, StructType, StructField, StringType, ArrayType
from tqdm import tqdm

from pyspark import SparkConf

# Configure Spark
conf = SparkConf().setAppName("CosineSimilarity")
conf = conf.set("spark.driver.memory", "4g")
conf = conf.set("spark.executor.memory", "4g")

spark = SparkSession.builder.config(conf=conf).getOrCreate()

def cos_similarity(embedding1, embedding2):
    """Calculate the cosine similarity between two embeddings."""
    norm1 = norm(embedding1)
    norm2 = norm(embedding2)
    return float(dot(embedding1, embedding2) / (norm1 * norm2))

def process_row_pair(row1, row2, threshold=0.7):
    """Process a pair of rows and return a list if similarity exceeds the threshold."""
    try:
        cos_sim = cos_similarity(row1.embedding, row2.embedding)
        if cos_sim > threshold and row1.client_id != row2.client_id:
            return Row(
                client_id1=row1.client_id, path1=row1.path, sentence1=row1.sentence, embedding1=row1.embedding,
                client_id2=row2.client_id, path2=row2.path, sentence2=row2.sentence, embedding2=row2.embedding,
                cos_sim=cos_sim
            )
    except Exception as e:
        logging.error(f"Error processing rows {row1.client_id} and {row2.client_id}: {e}")
    return None

def calculate_cosine_similarity(df, threshold=0.7):
    """Calculate cosine similarity within the dataframe using Spark."""
    start_time = time.time()

    df = df.head(50000).copy()  # Limit the size for now and make a copy

    # Convert numpy arrays to lists
    df['embedding'] = df['embedding'].apply(lambda x: x.tolist())

    rdd = spark.sparkContext.parallelize(df.to_dict('records'))

    def compute_pairs(rows):
        results = []
        rows = list(rows)
        for i in tqdm(range(len(rows)), desc="Processing rows"):
            row1 = rows[i]
            for j in range(i + 1, len(rows)):
                row2 = rows[j]
                result = process_row_pair(Row(**row1), Row(**row2), threshold)
                if result:
                    results.append(result)
        return results

    result_rdd = rdd.mapPartitions(compute_pairs)

    # Define schema for resulting DataFrame
    schema = StructType([
        StructField('client_id1', StringType(), True),
        StructField('path1', StringType(), True),
        StructField('sentence1', StringType(), True),
        StructField('embedding1', ArrayType(FloatType()), True),
        StructField('client_id2', StringType(), True),
        StructField('path2', StringType(), True),
        StructField('sentence2', StringType(), True),
        StructField('embedding2', ArrayType(FloatType()), True),
        StructField('cos_sim', FloatType(), True)
    ])

    result_df = spark.createDataFrame(result_rdd, schema)

    result_pd_df = result_df.toPandas()

    print(f'Number of similar pairs: {result_pd_df.shape[0]}')
    print(f'Time taken: {time.time() - start_time} seconds')
    return result_pd_df

def main():
    start_time = datetime.datetime.now()
    print(f'Start time: {start_time}')

    # Load the data
    df_fr_train = pd.read_pickle('/content/drive/MyDrive/milaga/train_embed_fr.pickle')
    df_fr_train = df_fr_train.reset_index(drop=True)
    print(f'DataFrame shape: {df_fr_train.shape}')

    # Calculate cosine similarity
    result_df = calculate_cosine_similarity(df_fr_train)
    result_df.to_pickle("/content/drive/MyDrive/milaga/train_embed_fr_cosine_similarity_spark.pickle")

    end_time = datetime.datetime.now()
    print(f'End time: {end_time}')

    # Stop Spark session
    spark.stop()


main()
Editor is loading...
Leave a Comment