import os
import numpy as np
import spacy
from collections import Counter, defaultdict
from tqdm import tqdm
from scipy.sparse import lil_matrix, csr_matrix
from joblib import Parallel, delayed
# Configurations
DATASET_PATH = "the_pile.txt" # Modify this to your dataset path
VOCAB_SIZE = 10000 # Top N words
WINDOW_SIZE = 10 # Context window
NUM_CORES = os.cpu_count() or 4 # Use all available CPU cores
PMI_THRESHOLD = 0 # Only store PMI > 0 (optional for reducing size)
# Load NLP tokenizer
nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"])
# Step 1: Tokenize & Count Word Frequencies in Parallel
def process_chunk(lines):
    """Tokenizes a chunk of text and counts word occurrences."""
    word_counts = Counter()
    for line in lines:
        tokens = [token.text.lower() for token in nlp(line) if token.is_alpha]
        word_counts.update(tokens)
    return word_counts
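# Optional sketch, not part of the original pipeline: spaCy's nlp.pipe streams a
# batch of lines through the tokenizer, which is usually faster than calling
# nlp(line) once per line. The name process_chunk_batched and the batch_size
# default are illustrative assumptions.
def process_chunk_batched(lines, batch_size=1000):
    """Same counting logic as process_chunk, but batched via nlp.pipe."""
    word_counts = Counter()
    for doc in nlp.pipe(lines, batch_size=batch_size):
        word_counts.update(token.text.lower() for token in doc if token.is_alpha)
    return word_counts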
def tokenize_and_count(filepath):
    """Splits the dataset into chunks and processes them in parallel."""
    with open(filepath, "r", encoding="utf-8") as file:
        lines = file.readlines()
    chunk_size = max(1, len(lines) // NUM_CORES)  # Split workload across cores; avoid a zero step for tiny files
    chunks = [lines[i : i + chunk_size] for i in range(0, len(lines), chunk_size)]
    # Parallel execution
    results = Parallel(n_jobs=NUM_CORES)(
        delayed(process_chunk)(chunk)
        for chunk in tqdm(chunks, desc="Tokenizing & Counting Words")
    )
    # Merge per-chunk word counts
    total_counts = Counter()
    for wc in results:
        total_counts.update(wc)
    return total_counts
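# Caveat: readlines() pulls the entire file into memory, which is only practical
# for a modest slice of The Pile. A streaming alternative could process the file
# in fixed-size line batches; the helper below is a sketch under that assumption
# and is not wired into the pipeline.
def iter_line_batches(filepath, batch_size=100_000):
    """Yields lists of up to batch_size lines without loading the whole file."""
    batch = []
    with open(filepath, "r", encoding="utf-8") as f:
        for line in f:
            batch.append(line)
            if len(batch) == batch_size:
                yield batch
                batch = []
    if batch:
        yield batch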
# Step 2: Select Top 10K Words
def get_top_vocab(word_counts, vocab_size):
    return {word for word, _ in word_counts.most_common(vocab_size)}
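# Toy example (illustrative numbers only): with
# word_counts = Counter({"the": 5, "cat": 3, "sat": 1}) and vocab_size = 2,
# get_top_vocab returns {"the", "cat"}.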
# Step 3: Compute Co-occurrences in Parallel
def process_cooccurrences(lines, vocab, word_to_id):
    """Processes a chunk of text to compute co-occurrence counts."""
    local_matrix = lil_matrix((len(vocab), len(vocab)), dtype=np.int32)
    for line in lines:
        tokens = [token.text.lower() for token in nlp(line) if token.text.lower() in vocab]
        for i, word in enumerate(tokens):
            if word not in word_to_id:
                continue
            for j in range(max(0, i - WINDOW_SIZE), min(len(tokens), i + WINDOW_SIZE + 1)):
                if i != j and tokens[j] in word_to_id:
                    local_matrix[word_to_id[word], word_to_id[tokens[j]]] += 1
    return local_matrix
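# Note on process_cooccurrences above: tokens is filtered to vocabulary words
# before the window is applied, so WINDOW_SIZE counts in-vocabulary neighbours
# only; out-of-vocabulary words between two kept words do not use up window slots.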
def compute_cooccurrences(filepath, vocab):
    """Splits the dataset and computes co-occurrences in parallel."""
    word_to_id = {word: i for i, word in enumerate(vocab)}
    with open(filepath, "r", encoding="utf-8") as file:
        lines = file.readlines()
    chunk_size = max(1, len(lines) // NUM_CORES)
    chunks = [lines[i : i + chunk_size] for i in range(0, len(lines), chunk_size)]
    # Parallel execution
    results = Parallel(n_jobs=NUM_CORES)(
        delayed(process_cooccurrences)(chunk, vocab, word_to_id)
        for chunk in tqdm(chunks, desc="Computing Co-occurrences")
    )
    # Merge per-chunk co-occurrence matrices (CSR addition is fast)
    cooccurrence_matrix = csr_matrix((len(vocab), len(vocab)), dtype=np.int32)
    for matrix in results:
        cooccurrence_matrix = cooccurrence_matrix + matrix.tocsr()
    return cooccurrence_matrix, word_to_id
# Step 4: Compute PMI
def compute_pmi(cooccurrence_matrix, word_counts, word_to_id):
    """Computes PMI for each co-occurring word pair in the matrix."""
    id_to_word = {i: word for word, i in word_to_id.items()}
    total_words = sum(word_counts.values())
    total_pairs = cooccurrence_matrix.sum()
    pmi_matrix = lil_matrix(cooccurrence_matrix.shape, dtype=np.float32)
    # Iterate only over stored (nonzero) pairs instead of all vocab x vocab pairs
    for i, j in zip(*cooccurrence_matrix.nonzero()):
        if i >= j:  # Matrix is symmetric; handle each pair once
            continue
        cooccur = cooccurrence_matrix[i, j]
        p_w1 = word_counts[id_to_word[i]] / total_words
        p_w2 = word_counts[id_to_word[j]] / total_words
        p_w1_w2 = cooccur / total_pairs
        pmi = np.log2(p_w1_w2 / (p_w1 * p_w2))
        if pmi > PMI_THRESHOLD:
            pmi_matrix[i, j] = pmi
            pmi_matrix[j, i] = pmi  # Symmetric
    return pmi_matrix
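# Worked example of the PMI formula (toy numbers, for intuition only): if two
# words each account for 1% of tokens (p_w1 = p_w2 = 0.01) and 0.5% of
# co-occurrence pairs (p_w1_w2 = 0.005), then
# PMI = log2(0.005 / (0.01 * 0.01)) = log2(50) ≈ 5.64.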
# Step 5: Save PMI Results
def save_pmi(pmi_matrix, word_to_id, output_file="pmi_results.txt"):
    id_to_word = {i: word for word, i in word_to_id.items()}
    with open(output_file, "w", encoding="utf-8") as f:
        for i, j in zip(*pmi_matrix.nonzero()):
            f.write(f"{id_to_word[i]} {id_to_word[j]} {pmi_matrix[i, j]:.4f}\n")
    print(f"PMI results saved to {output_file}")
# Main Execution
if __name__ == "__main__":
    print("Starting Parallel PMI Computation for The Pile...")
    # Step 1: Get word frequencies (parallelized)
    word_counts = tokenize_and_count(DATASET_PATH)
    # Step 2: Get top vocab
    top_vocab = get_top_vocab(word_counts, VOCAB_SIZE)
    print(f"Selected top {VOCAB_SIZE} words.")
    # Step 3: Compute co-occurrences (parallelized)
    cooccurrence_matrix, word_to_id = compute_cooccurrences(DATASET_PATH, top_vocab)
    # Step 4: Compute PMI
    pmi_matrix = compute_pmi(cooccurrence_matrix, word_counts, word_to_id)
    # Step 5: Save PMI results
    save_pmi(pmi_matrix, word_to_id)
    print("PMI computation complete!")