Parallel PMI Computation for The Pile
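The script below computes pointwise mutual information (PMI) scores for the 10,000 most frequent words in a plain-text dump of The Pile. It works in five steps: tokenize the corpus and count word frequencies in parallel with joblib, keep the top-N vocabulary, count co-occurrences within a sliding window (again in parallel), convert the raw counts to PMI, and write the nonzero scores to disk.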
import os
import numpy as np
import spacy
from collections import Counter
from tqdm import tqdm
from scipy.sparse import lil_matrix, csr_matrix
from joblib import Parallel, delayed

# Configuration
DATASET_PATH = "the_pile.txt"    # Modify this to your dataset path
VOCAB_SIZE = 10000               # Keep only the top N words
WINDOW_SIZE = 10                 # Context window (tokens on either side)
NUM_CORES = os.cpu_count() or 4  # Use all available CPU cores
PMI_THRESHOLD = 0                # Only store PMI > 0 (optional, reduces size)

# Load the spaCy tokenizer; the parser and NER are disabled for speed
nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"])


# Step 1: Tokenize & count word frequencies in parallel
def process_chunk(lines):
    """Tokenize a chunk of text and count word occurrences."""
    word_counts = Counter()
    for line in lines:
        tokens = [token.text.lower() for token in nlp(line) if token.is_alpha]
        word_counts.update(tokens)
    return word_counts


def tokenize_and_count(filepath):
    """Split the dataset into chunks and process them in parallel."""
    with open(filepath, "r", encoding="utf-8") as file:
        lines = file.readlines()

    # Split the workload across cores; max() keeps the slice step positive
    # when there are fewer lines than cores
    chunk_size = max(1, len(lines) // NUM_CORES)
    chunks = [lines[i : i + chunk_size] for i in range(0, len(lines), chunk_size)]

    results = Parallel(n_jobs=NUM_CORES)(
        delayed(process_chunk)(chunk)
        for chunk in tqdm(chunks, desc="Tokenizing & Counting Words")
    )

    # Merge the per-chunk word counts
    total_counts = Counter()
    for wc in results:
        total_counts.update(wc)
    return total_counts


# Step 2: Select the top 10K words
def get_top_vocab(word_counts, vocab_size):
    return {word for word, _ in word_counts.most_common(vocab_size)}


# Step 3: Compute co-occurrences in parallel
def process_cooccurrences(lines, vocab, word_to_id):
    """Compute windowed co-occurrence counts for a chunk of text."""
    local_matrix = lil_matrix((len(vocab), len(vocab)), dtype=np.int32)
    for line in lines:
        # Keep only in-vocabulary tokens; the window is applied to this
        # filtered sequence, so out-of-vocabulary words do not use up slots
        tokens = [t.text.lower() for t in nlp(line) if t.text.lower() in vocab]
        for i, word in enumerate(tokens):
            lo = max(0, i - WINDOW_SIZE)
            hi = min(len(tokens), i + WINDOW_SIZE + 1)
            for j in range(lo, hi):
                if i != j:
                    local_matrix[word_to_id[word], word_to_id[tokens[j]]] += 1
    return local_matrix


def compute_cooccurrences(filepath, vocab):
    """Split the dataset and compute co-occurrences in parallel."""
    # sorted() makes the word-to-index mapping deterministic across runs
    word_to_id = {word: i for i, word in enumerate(sorted(vocab))}

    with open(filepath, "r", encoding="utf-8") as file:
        lines = file.readlines()

    chunk_size = max(1, len(lines) // NUM_CORES)
    chunks = [lines[i : i + chunk_size] for i in range(0, len(lines), chunk_size)]

    results = Parallel(n_jobs=NUM_CORES)(
        delayed(process_cooccurrences)(chunk, vocab, word_to_id)
        for chunk in tqdm(chunks, desc="Computing Co-occurrences")
    )

    # Merge the per-chunk matrices; CSR makes the repeated additions fast
    cooccurrence_matrix = csr_matrix((len(vocab), len(vocab)), dtype=np.int32)
    for matrix in results:
        cooccurrence_matrix = cooccurrence_matrix + matrix.tocsr()
    return cooccurrence_matrix, word_to_id


# Step 4: Compute PMI
def compute_pmi(cooccurrence_matrix, word_counts, word_to_id):
    """Convert co-occurrence counts into a symmetric PMI matrix."""
    id_to_word = {i: word for word, i in word_to_id.items()}
    total_words = sum(word_counts.values())
    total_pairs = cooccurrence_matrix.sum()

    pmi_matrix = lil_matrix(cooccurrence_matrix.shape, dtype=np.float32)
    # Iterate over nonzero entries only, instead of all VOCAB_SIZE^2 pairs
    coo = cooccurrence_matrix.tocoo()
    for i, j, cooccur in zip(coo.row, coo.col, coo.data):
        if i >= j:  # The matrix is symmetric, so process each pair once
            continue
        p_w1 = word_counts[id_to_word[i]] / total_words
        p_w2 = word_counts[id_to_word[j]] / total_words
        p_w1_w2 = cooccur / total_pairs
        pmi = np.log2(p_w1_w2 / (p_w1 * p_w2))
        if pmi > PMI_THRESHOLD:
            pmi_matrix[i, j] = pmi
            pmi_matrix[j, i] = pmi  # Symmetric
    return pmi_matrix


# Step 5: Save PMI results
def save_pmi(pmi_matrix, word_to_id, output_file="pmi_results.txt"):
    """Write every nonzero PMI score as a 'word1 word2 pmi' line."""
    id_to_word = {i: word for word, i in word_to_id.items()}
    coo = pmi_matrix.tocoo()
    with open(output_file, "w", encoding="utf-8") as f:
        for i, j, value in zip(coo.row, coo.col, coo.data):
            f.write(f"{id_to_word[i]} {id_to_word[j]} {value:.4f}\n")
    print(f"PMI results saved to {output_file}")


# Main execution
if __name__ == "__main__":
    print("Starting Parallel PMI Computation for The Pile...")

    # Step 1: Get word frequencies (parallelized)
    word_counts = tokenize_and_count(DATASET_PATH)

    # Step 2: Get the top vocabulary
    top_vocab = get_top_vocab(word_counts, VOCAB_SIZE)
    print(f"Selected top {VOCAB_SIZE} words.")

    # Step 3: Compute co-occurrences (parallelized)
    cooccurrence_matrix, word_to_id = compute_cooccurrences(DATASET_PATH, top_vocab)

    # Step 4: Compute PMI
    pmi_matrix = compute_pmi(cooccurrence_matrix, word_counts, word_to_id)

    # Step 5: Save PMI results
    save_pmi(pmi_matrix, word_to_id)

    print("PMI computation complete!")
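With PMI_THRESHOLD = 0, the script keeps only positive PMI scores, i.e. it effectively computes PPMI, the usual choice in distributional semantics because negative PMI estimates from sparse counts are unreliable. To see the arithmetic on something small, here is a minimal sanity check you can append to the script; the toy counts (toy_counts, toy_ids) are invented for illustration and are not from The Pile:

# Minimal sanity check for compute_pmi on a tiny hand-built input.
# All numbers below are made up for illustration.
toy_counts = Counter({"cat": 4, "dog": 4, "fish": 2})  # 10 tokens in total
toy_ids = {"cat": 0, "dog": 1, "fish": 2}

cooc = lil_matrix((3, 3), dtype=np.int32)
cooc[0, 1] = cooc[1, 0] = 3  # cat/dog co-occur three times
cooc[0, 2] = cooc[2, 0] = 1  # cat/fish co-occur once

toy_pmi = compute_pmi(cooc.tocsr(), toy_counts, toy_ids)
print(f"PMI(cat, dog) = {toy_pmi[0, 1]:.2f}")   # expect ~1.23
print(f"PMI(cat, fish) = {toy_pmi[0, 2]:.2f}")  # expect ~0.64

For the cat/dog pair there are 8 co-occurrence counts in total, so p(cat, dog) = 3/8 = 0.375, while p(cat) = p(dog) = 4/10, giving PMI = log2(0.375 / 0.16) ≈ 1.23, which should match the printed value.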