# Paste-site metadata (not code): Untitled | avatar | unknown | python | 2 years ago | 6.2 kB | 7 | Indexable
from multiprocessing import Value, Pool, Lock
from collections import Counter, defaultdict, ChainMap
from itertools import combinations
import time, sys
import logging
import os
from pathlib import Path

# Output directory, taken from the 6th command-line argument. It is created
# up front because the log file configured below lives inside it.
folderPath = sys.argv[6]
Path(folderPath).mkdir(parents=True, exist_ok=True)

# Module-level mining state. The __main__ section and ParallelCmine assign
# these; the pool worker functions (which receive only a start position)
# read them as globals.
transactions = None
numOfProcesses = None
candidate_set = None
page_to_transactions = None
max_or = None
max_cs = None

# Log to <folderPath>/mine.log and to stdout. The root logger accepts DEBUG,
# but this module's logger is capped at INFO, so its debug() calls are dropped.
_log_handlers = [
    logging.FileHandler(os.path.join(folderPath, "mine.log")),
    logging.StreamHandler(sys.stdout),
]
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=_log_handlers,
)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

def get_transactions_data(file_path):
    """Read a whitespace-separated transaction file.

    Each line of ``file_path`` is one transaction whose whitespace-separated
    tokens are its pages. Blank lines become empty transactions, so they
    still count toward the total number of transactions.

    Returns a list of transactions, each a list of page strings.
    """
    # Context manager so the file handle is closed even if reading fails.
    with open(file_path, 'r') as f:
        lines = f.read().splitlines()
    # str.split() with no argument already ignores surrounding whitespace.
    return [line.split() for line in lines]

def initial_map(start_pos):
    """Count pages and record their transaction ids for one chunk of ``transactions``.

    The chunk is ``transactions[start_pos:start_pos + len(transactions) // numOfProcesses]``
    (clipped to the end of the list); both names are module globals.

    Returns ``(counter, page_to_tids)``:

    - ``counter[page]`` is the number of transactions in the chunk that
      contain ``page``.
    - ``page_to_tids[page]`` is the set of those transactions' indices.
    """
    chunk_size = len(transactions) // numOfProcesses
    end_pos = start_pos + min(chunk_size, len(transactions) - start_pos)
    counter = Counter()
    cur_page_to_transactions = defaultdict(set)
    for tid in range(start_pos, end_pos):
        # Count each page once per transaction, so its count always equals
        # the size of its transaction-id set. dict.fromkeys drops repeats
        # while keeping first-seen order, which keeps the counter's key
        # order (and hence most_common() tie order) deterministic.
        unique_pages = list(dict.fromkeys(transactions[tid]))
        counter.update(unique_pages)
        for page in unique_pages:
            cur_page_to_transactions[page].add(tid)
    return counter, cur_page_to_transactions

def getSecondLevelCandidateSet(first_level_patterns):
    """Return every unordered pair of level-1 pages as a 2-tuple.

    Pages are ranked by ``most_common()``: descending count, with ties in
    first-encountered order. Each pair lists the higher-ranked page first,
    so the second page never has a higher count than the first.
    """
    ranked_pages = [page for page, _count in first_level_patterns.most_common()]
    return list(combinations(ranked_pages, 2))

def getNextLevelCandidateSet(prev_level_patterns):
    """Join level-k patterns that share their first k-1 pages into level-(k+1) candidates.

    Patterns (tuples) are grouped by prefix, i.e. everything but the last
    page. For example ``('a','b'), ('a','c'), ('a','d')`` groups as
    ``{('a',): ['b', 'c', 'd']}``.

    Every pair ``(x, y)`` of last pages in a group yields
    ``prefix + (x, y)``, where ``x`` precedes ``y`` in the input. Groups are
    visited in the order their prefix first appears.
    """
    tails_by_prefix = defaultdict(list)
    for pattern in prev_level_patterns:
        tails_by_prefix[pattern[:-1]].append(pattern[-1])
    return [
        prefix + pair
        for prefix, tails in tails_by_prefix.items()
        for pair in combinations(tails, 2)
    ]

def get_intersection_len(set1, set2):
    """Return how many items of ``set2`` are also members of ``set1``.

    For two sets this equals ``len(set1 & set2)``. ``set1`` only needs to
    support ``in``, and ``set2`` only needs to be iterable.
    """
    shared = 0
    for item in set2:
        if item in set1:
            shared += 1
    return shared

def get_no_patterns(start_pos):
    """Evaluate one chunk of ``candidate_set`` and keep the non-overlap patterns.

    The chunk is ``candidate_set[start_pos:start_pos + len(candidate_set) // numOfProcesses]``,
    clipped to the end of the list.

    For each candidate, let ``prefix`` be all pages but the last, ``last``
    the final page, and ``T(p)`` the set ``page_to_transactions[p]``:

    - ``prefix_cover`` is the union of ``T(p)`` over the prefix pages.
    - ``overlap`` is ``|prefix_cover & T(last)|``.
    - ``union_len`` is ``|prefix_cover | T(last)|``.

    A candidate is kept when ``overlap / |T(last)| <= max_or`` and
    ``union_len / len(transactions) <= max_cs``.

    Reads these module globals: ``candidate_set``, ``page_to_transactions``,
    ``transactions``, ``numOfProcesses``, ``max_or`` and ``max_cs``.

    Returns a dict mapping each kept candidate to ``[union_len, overlap]``.
    """
    chunk_size = len(candidate_set) // numOfProcesses
    end_pos = start_pos + min(chunk_size, len(candidate_set) - start_pos)
    n_transactions = len(transactions)
    # The debug messages stringify whole transaction-id sets. Build them only
    # when DEBUG is actually enabled (the module logger is normally at INFO);
    # formatting them eagerly was pure overhead in this hot loop.
    debug = logger.isEnabledFor(logging.DEBUG)

    no_patterns = {}
    prev_prefix = None
    prefix_cover = set()
    for candidate in candidate_set[start_pos:end_pos]:
        prefix, last = candidate[:-1], candidate[-1]
        # getSecondLevelCandidateSet / getNextLevelCandidateSet emit candidates
        # that share a prefix consecutively, so reuse the prefix's union
        # instead of rebuilding it for every candidate.
        if prefix != prev_prefix:
            prefix_cover = set().union(*(page_to_transactions[p] for p in prefix))
            prev_prefix = prefix
        last_cover = page_to_transactions[last]
        # Set intersection runs in C and iterates the smaller operand.
        overlap = len(prefix_cover & last_cover)
        if debug:
            logger.debug("intersection length of {} and {}  = {}".format(list(prefix), last, overlap))
            logger.debug("{} {}".format(prefix_cover, last_cover))
        union_len = len(prefix_cover) + len(last_cover) - overlap
        if overlap / len(last_cover) <= max_or and union_len / n_transactions <= max_cs:
            no_patterns[candidate] = [union_len, overlap]
    return no_patterns
        
def get_start_pos(count, step):
    """Return the start index of each ``step``-sized chunk of ``range(count)``.

    ``step`` must be non-zero; ``range`` raises ValueError for a zero step.
    """
    return [*range(0, count, step)]

def save_first_level_patterns(file_path, first_level_patterns):
    """Write one ``"<page> | <count> | 0"`` line per level-1 pattern to ``file_path``.

    The file is overwritten. The third column holds the intersection length
    in the higher-level pattern files and is always 0 here.
    """
    with open(file_path, 'w') as out:
        out.writelines(
            "{} | {} | {}\n".format(page, count, 0)
            for page, count in first_level_patterns.items()
        )
            
def save_nth_level_patterns(file_path, patterns):
    """Write each pattern as ``"p1, p2, ... | <stats[0]> | <stats[1]>"`` to ``file_path``.

    ``patterns`` maps a tuple of pages to a two-element sequence. The results
    of get_no_patterns map to ``[union_len, intersection_len]``. The file is
    overwritten.
    """
    with open(file_path, 'w') as out:
        for pattern, stats in patterns.items():
            label = ', '.join(str(page) for page in pattern)
            out.write("{} | {} | {}\n".format(label, stats[0], stats[1]))

# Module globals that the pool workers read. _map_over_chunks hands a snapshot
# of them to every worker through the Pool initializer. Under the "spawn" and
# "forkserver" start methods, workers re-import this module instead of
# inheriting the parent's memory, and would otherwise see only the None
# placeholders.
_WORKER_STATE_NAMES = (
    "transactions",
    "numOfProcesses",
    "candidate_set",
    "page_to_transactions",
    "max_or",
    "max_cs",
)


def _init_worker(state):
    """Pool initializer: install the parent's mining state as module globals."""
    globals().update(state)


def _map_over_chunks(worker, count):
    """Run ``worker(start_pos)`` over contiguous chunks of ``count`` items in a process pool.

    Uses ``min(numOfProcesses, count)`` processes. The global
    ``numOfProcesses`` is lowered for the duration of the call so the workers
    compute the same chunk size, then restored.

    Returns the per-chunk results in chunk order, or ``[]`` when ``count`` is 0.
    """
    global numOfProcesses
    if count == 0:
        return []
    configured = numOfProcesses
    # Workers size their chunk as count / numOfProcesses, truncated. With more
    # processes than items that is 0, and get_start_pos would get a zero step
    # (range() raises ValueError), so cap the worker count at `count`.
    numOfProcesses = max(1, min(configured, count))
    try:
        step = count // numOfProcesses
        state = {name: globals()[name] for name in _WORKER_STATE_NAMES}
        with Pool(processes=numOfProcesses, initializer=_init_worker,
                  initargs=(state,)) as pool:
            return pool.map(worker, get_start_pos(count, step))
    finally:
        numOfProcesses = configured


def ParallelCmine(inputFile, min_rf, folderPath):
    """Mine coverage (non-overlap) patterns level by level, writing each level to disk.

    1. Read the transactions from ``inputFile``.
    2. Level 1: keep every page whose transaction count ``c`` satisfies
       ``min_rf * N <= c <= max_cs * N``, where ``N`` is the number of
       transactions. Write them to ``<folderPath>/1_level_patterns.txt``.
    3. Level k >= 2: evaluate the candidates with get_no_patterns in a process
       pool and write the survivors to ``<folderPath>/<k>_level_patterns.txt``.
       Join the survivors into level-(k+1) candidates, and repeat until no
       candidates remain.

    Reads the module globals ``numOfProcesses`` and ``max_cs`` (plus
    ``max_or`` inside the workers). Publishes ``transactions``,
    ``page_to_transactions`` and ``candidate_set`` as module globals for the
    workers. Returns None.
    """
    global transactions
    global candidate_set
    global page_to_transactions

    logger.info("started mining")

    logger.info("reading transactional data")
    transactions = get_transactions_data(inputFile)
    n_transactions = len(transactions)

    logger.info("First level mining")

    web_page_counters = Counter()
    page_to_transactions = defaultdict(set)
    for chunk_counter, chunk_pages in _map_over_chunks(initial_map, n_transactions):
        web_page_counters += chunk_counter
        for page, tids in chunk_pages.items():
            # In-place union: no new set per chunk and page.
            page_to_transactions[page] |= tids

    first_level_patterns = Counter({
        page: count for page, count in web_page_counters.items()
        if min_rf * n_transactions <= count <= max_cs * n_transactions
    })

    logger.info("Got first level patterns : {}  writing to file".format(len(first_level_patterns)))

    save_first_level_patterns(os.path.join(folderPath, "1_level_patterns.txt"), first_level_patterns)

    logger.info("Generating second level candidateset")

    candidate_set = getSecondLevelCandidateSet(first_level_patterns)

    level = 2

    # Evaluate until no candidates remain. A single remaining candidate is
    # still evaluated, because it can be a valid pattern on its own.
    while candidate_set:
        logger.info("computing non-overlap patterns for level : {}".format(level))

        # Chunks are disjoint. Merging them in chunk order keeps no_patterns
        # in candidate order, which the next-level join depends on because it
        # pairs pages in input order.
        no_patterns = {}
        for part in _map_over_chunks(get_no_patterns, len(candidate_set)):
            no_patterns.update(part)

        logger.info("saving {} level patterns, got : {}".format(level, len(no_patterns)))
        save_nth_level_patterns(os.path.join(folderPath, "{}_level_patterns.txt".format(level)), no_patterns)

        logger.info("computing next level candidates")
        candidate_set = getNextLevelCandidateSet(no_patterns.keys())
        level += 1



if __name__ == "__main__":
    # Expected command line:
    #   <min_rf> <max_or> <max_cs> <numOfProcesses> <transactions_file> <output_dir>
    # argv[6] (output_dir) is already read at import time to set up logging.
    logger.info("Got the following arguments: {}".format(sys.argv))
    min_rf = float(sys.argv[1])
    max_or = float(sys.argv[2])
    max_cs = float(sys.argv[3])
    numOfProcesses = int(sys.argv[4])
    if numOfProcesses < 1:
        sys.exit("numOfProcesses (argument 4) must be at least 1")
    transactions_file_path = sys.argv[5]
    # perf_counter is monotonic, unlike time.time(), so clock adjustments
    # during a long run cannot distort the measured duration.
    t1 = time.perf_counter()
    ParallelCmine(transactions_file_path, min_rf, folderPath)
    t2 = time.perf_counter()
    logger.info("total time taken: {}".format(str(t2-t1)))
# Paste-site residue (not code): Editor is loading...