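# Parallel, level-wise miner of non-overlap ("coverage"-style) page patterns
# from a transaction log, in the spirit of the Cmine algorithm (the name is
# inferred from the identifiers and log messages). Work at every level is
# split across a multiprocessing.Pool. Thresholds:
#   min_rf - minimum relative frequency a single page needs at level 1
#   max_or - maximum overlap ratio a candidate pattern may have
#   max_cs - maximum fraction of transactions a pattern may cover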
from multiprocessing import Pool
from collections import Counter, defaultdict, ChainMap
from itertools import combinations
import time
import sys
import logging
import os
from pathlib import Path

# The output directory is the sixth CLI argument; create it up front so the
# log file handler below has somewhere to write.
Path(sys.argv[6]).mkdir(parents=True, exist_ok=True)

# Globals shared with the worker processes (inherited on fork-based platforms).
transactions = None
numOfProcesses = None
candidate_set = None
page_to_transactions = None
max_or = None
max_cs = None
folderPath = sys.argv[6]

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler(os.path.join(folderPath, "mine.log")),
        logging.StreamHandler(sys.stdout),
    ],
)
logger = logging.getLogger(__name__)
# The handlers accept DEBUG, but the module logger is capped at INFO;
# lower this to DEBUG to see the per-candidate traces in get_no_patterns.
logger.setLevel(logging.INFO)

def get_transactions_data(file_path):
    # One transaction per line: whitespace-separated page identifiers.
    with open(file_path, 'r') as f:
        return [line.strip().split() for line in f]
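
# Level-1 map step: each worker scans one chunk of the transaction list and
# returns a page-occurrence Counter plus a page -> {transaction ids} index
# for that chunk; the chunks are merged by the caller.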
def initial_map(start_pos):
    global numOfProcesses
    # Guard against a zero chunk size when there are more processes than rows.
    step = max(1, len(transactions) // numOfProcesses)
    end_pos = min(start_pos + step, len(transactions))
    counter = Counter()
    cur_page_to_transactions = defaultdict(set)
    for i in range(start_pos, end_pos):
        counter.update(transactions[i])
        for p in transactions[i]:
            cur_page_to_transactions[p].add(i)
    return counter, cur_page_to_transactions
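
# Level-2 candidates: every unordered pair of surviving level-1 pages,
# enumerated in descending order of support.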
def getSecondLevelCandidateSet(first_level_patterns):
    pages = first_level_patterns.most_common()
    return [(pages[i][0], pages[j][0]) for i in range(len(pages)) for j in range(i + 1, len(pages))]
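
# Level k+1 candidates via an Apriori-style join: level-k patterns sharing the
# same (k-1)-item prefix are grouped, and each prefix is extended with every
# pair of its distinct last items.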
def getNextLevelCandidateSet(prev_level_patterns):
    new_patterns = list()
    same_part_map = defaultdict(list)  # prefix tuple -> last items, e.g. {('a',): ['b', 'c', 'd']}
    for pattern in prev_level_patterns:
        same_part_map[pattern[:-1]].append(pattern[-1])
    for same_part_key in same_part_map:
        new_patterns += [same_part_key + new_part for new_part in combinations(same_part_map[same_part_key], 2)]
    return new_patterns

def get_intersection_len(set1, set2):
    # Number of transaction ids shared by the two sets.
    return len(set1 & set2)
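
# Filter step: a candidate (p1, ..., pk) survives when the transactions of pk
# overlap the union of the earlier pages' transactions by at most max_or
# (relative to |T(pk)|) and the combined coverage is at most max_cs of all
# transactions. Workers each test one chunk of the candidate set.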
def get_no_patterns(start_pos):
    global numOfProcesses
    global page_to_transactions
    global max_or
    global transactions
    step = max(1, len(candidate_set) // numOfProcesses)
    end_pos = min(start_pos + step, len(candidate_set))
    no_patterns = {}
    for i in range(start_pos, end_pos):
        # Transactions covered by every page of the candidate except the last.
        cur_set = set().union(*[page_to_transactions[p] for p in candidate_set[i][:-1]])
        intersection_len = get_intersection_len(cur_set, page_to_transactions[candidate_set[i][-1]])
        logger.debug("intersection length of {} and {} = {}".format(list(candidate_set[i][:-1]), candidate_set[i][-1], intersection_len))
        logger.debug("{} {}".format(cur_set, page_to_transactions[candidate_set[i][-1]]))
        union_len = len(cur_set) + len(page_to_transactions[candidate_set[i][-1]]) - intersection_len
        if intersection_len / len(page_to_transactions[candidate_set[i][-1]]) <= max_or and union_len / len(transactions) <= max_cs:
            no_patterns[candidate_set[i]] = [union_len, intersection_len]
    return no_patterns

def get_start_pos(count, step):
    # Chunk start offsets; step is forced to at least 1 so range() never gets 0.
    return list(range(0, count, max(1, step)))
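
# Pattern files: one pattern per line as "pages | coverage | overlap", where
# coverage is the size of the union of the pages' transaction sets and overlap
# is the intersection size computed in get_no_patterns (0 at level 1).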
def save_first_level_patterns(file_path, first_level_patterns):
    with open(file_path, 'w') as f:
        for p in first_level_patterns:
            f.write("{} | {} | {}\n".format(p, first_level_patterns[p], 0))

def save_nth_level_patterns(file_path, patterns):
    with open(file_path, 'w') as f:
        for p in patterns:
            f.write("{} | {} | {}\n".format(', '.join(map(str, p)), patterns[p][0], patterns[p][1]))
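
# Driver: mines level 1 in parallel, then loops level by level, testing each
# candidate set in parallel and joining the survivors into the next candidate
# set until none remain.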
def ParallelCmine(inputFile, min_rf, folderPath):
    global transactions
    global numOfProcesses
    global candidate_set
    global page_to_transactions
    logger.info("started mining")
    logger.info("reading transactional data")
    transactions = get_transactions_data(inputFile)
    logger.info("First level mining")
    web_page_counters = Counter()
    page_to_transactions = defaultdict(set)
    with Pool(processes=numOfProcesses) as pool:
        counters = pool.map(initial_map, get_start_pos(len(transactions), len(transactions) // numOfProcesses))
    # Merge the per-chunk counters and page -> transaction-id indices.
    for c in counters:
        web_page_counters += c[0]
        for page in c[1]:
            page_to_transactions[page] = page_to_transactions[page].union(c[1][page])
    first_level_patterns = Counter({k: c for k, c in web_page_counters.items()
                                    if min_rf * len(transactions) <= c <= max_cs * len(transactions)})
    logger.info("Got first level patterns : {} writing to file".format(len(first_level_patterns)))
    save_first_level_patterns(os.path.join(folderPath, "1_level_patterns.txt"), first_level_patterns)
    logger.info("Generating second level candidate set")
    candidate_set = getSecondLevelCandidateSet(first_level_patterns)
    level = 2
    # A single remaining candidate is still worth testing, so loop until empty.
    while candidate_set:
        logger.info("computing non-overlap patterns for level : {}".format(level))
        with Pool(processes=numOfProcesses) as pool:
            no_patterns_parts = pool.map(get_no_patterns, get_start_pos(len(candidate_set), len(candidate_set) // numOfProcesses))
        # The chunks have disjoint keys, so the ChainMap merge never clashes;
        # reversing keeps the original (first-chunk-first) ordering in the dict.
        no_patterns_parts.reverse()
        no_patterns = dict(ChainMap(*no_patterns_parts))
        logger.info("saving {} level patterns, got : {}".format(level, len(no_patterns)))
        save_nth_level_patterns(os.path.join(folderPath, "{}_level_patterns.txt".format(level)), no_patterns)
        logger.info("computing next level candidates")
        candidate_set = getNextLevelCandidateSet(list(no_patterns.keys()))
        level += 1
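
# Usage (positional arguments, matching the argv reads below):
#   python <this script> <min_rf> <max_or> <max_cs> <numOfProcesses> <transactions_file> <output_folder>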
if __name__ == "__main__":
    logger.info("Got the following arguments: {}".format(sys.argv))
    min_rf = float(sys.argv[1])
    max_or = float(sys.argv[2])
    max_cs = float(sys.argv[3])
    numOfProcesses = int(sys.argv[4])
    transactions_file_path = sys.argv[5]
    t1 = time.time()
    ParallelCmine(transactions_file_path, min_rf, folderPath)
    t2 = time.time()
    logger.info("total time taken: {}".format(t2 - t1))