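# Parallel CMine: mines non-overlapping (coverage-style) page-set patterns
# from a transaction log using a multiprocessing Pool. Usage (the script
# name is a placeholder; the argument order comes from the argv parsing
# at the bottom of this file):
#   python parallel_cmine.py <min_rf> <max_or> <max_cs> <num_processes> <transactions_file> <output_folder>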
from multiprocessing import Pool
from collections import Counter, defaultdict, ChainMap
from itertools import combinations
import time
import sys
import logging
import os
from pathlib import Path
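# Create the output folder (argv[6]) up front so the log file handler
# below can open a file inside it.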
Path(sys.argv[6]).mkdir(parents=True, exist_ok=True)
transactions = None
numOfProcesses = None
candidate_set = None
page_to_transactions = None
max_or = None
max_cs = None
folderPath = sys.argv[6]
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler(os.path.join(folderPath,"mine.log")),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def get_transactions_data(file_path):
    # Read the transaction file: one whitespace-separated transaction per line.
    with open(file_path, 'r') as f:
        return [line.strip().split() for line in f.read().splitlines()]
def initial_map(start_pos):
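    # Level-1 map step: count page occurrences in this worker's chunk of
    # transactions and build a partial inverted index page -> transaction ids.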
global numOfProcesses
    # Chunk size per worker; max(1, ...) guards against a zero step when
    # there are fewer transactions than worker processes.
    chunk = max(1, len(transactions) // numOfProcesses)
    end_pos = start_pos + min(chunk, len(transactions) - start_pos)
counter = Counter()
cur_page_to_transactions = defaultdict(set)
for i in range(start_pos, end_pos):
counter.update(transactions[i])
for p in transactions[i]:
cur_page_to_transactions[p].add(i)
return counter, cur_page_to_transactions
def getSecondLevelCandidateSet(first_level_patterns):
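    # Candidate pairs for level 2: every combination of two level-1 pages,
    # ordered by descending frequency (the order most_common() returns).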
pages = first_level_patterns.most_common()
    return [(pages[i][0], pages[j][0]) for i in range(len(pages)) for j in range(i + 1, len(pages))]
def getNextLevelCandidateSet(prev_level_patterns):
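    # Apriori-style join: patterns that share the same prefix (all items
    # but the last) are extended with every pair of their final items.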
new_patterns = list()
same_part_map = defaultdict(list) # {'a':['b','c','d']}
for pattern in prev_level_patterns:
same_part_map[pattern[:-1]].append(pattern[-1])
for same_part_key in same_part_map:
new_patterns += [ same_part_key + new_part for new_part in combinations(same_part_map[same_part_key],2) ]
return new_patterns
def get_intersection_len(set1, set2):
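    # Size of the intersection of set1 and set2, counted by membership
    # tests against set1.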
return sum(1 for i in set2 if i in set1)
def get_no_patterns(start_pos):
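    # Level-n map step: for each candidate in this worker's chunk, compare
    # the transactions covered by the candidate's prefix with those of its
    # last page; keep candidates whose overlap ratio is at most max_or and
    # whose coverage (union) ratio is at most max_cs.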
global numOfProcesses
global page_to_transactions
global max_or
global transactions
    # Chunk size per worker; max(1, ...) guards against a zero step when
    # there are fewer candidates than worker processes.
    chunk = max(1, len(candidate_set) // numOfProcesses)
    end_pos = start_pos + min(chunk, len(candidate_set) - start_pos)
no_patterns = {}
    for i in range(start_pos, end_pos):
        candidate = candidate_set[i]
        # Transactions covered by every page of the candidate except the last.
        prefix_cover = set().union(*[page_to_transactions[p] for p in candidate[:-1]])
        last_cover = page_to_transactions[candidate[-1]]
        intersection_len = get_intersection_len(prefix_cover, last_cover)
        logger.debug("intersection length of {} and {} = {}".format(list(candidate[:-1]), candidate[-1], intersection_len))
        logger.debug("{} {}".format(prefix_cover, last_cover))
        union_len = len(prefix_cover) + len(last_cover) - intersection_len
        if intersection_len / len(last_cover) <= max_or and union_len / len(transactions) <= max_cs:
            no_patterns[candidate] = [union_len, intersection_len]
return no_patterns
def get_start_pos(count, step):
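    # Chunk start offsets handed to the worker pool: 0, step, 2*step, ...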
return list(range(0, count, step))
def save_first_level_patterns(file_path, first_level_patterns):
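    # One line per level-1 pattern: "page | frequency | 0" (no overlap at level 1).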
with open(file_path,'w') as f:
for p in first_level_patterns:
f.write("{} | {} | {}\n".format(p,first_level_patterns[p],0))
def save_nth_level_patterns(file_path, patterns):
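    # One line per pattern: "p1, p2, ... | union size | intersection size".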
with open(file_path,'w') as f:
for p in patterns:
f.write("{} | {} | {}\n".format(', '.join(map(str,p)), patterns[p][0], patterns[p][1]))
def ParallelCmine(inputFile, min_rf, folderPath):
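    # Driver: mine level-1 patterns in parallel, then repeatedly generate
    # candidates and evaluate them in parallel until none remain.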
global transactions
global numOfProcesses
global candidate_set
global page_to_transactions
logger.info("started mining")
logger.info("reading transactional data")
transactions = get_transactions_data(inputFile)
logger.info("First level mining")
web_page_counters = Counter()
page_to_transactions = defaultdict(set)
with Pool(processes=numOfProcesses) as pool:
        counters = pool.map(initial_map, get_start_pos(len(transactions), max(1, len(transactions) // numOfProcesses)))
for c in counters:
web_page_counters += c[0]
for page in c[1]:
page_to_transactions[page] = page_to_transactions[page].union(c[1][page])
    first_level_patterns = Counter({k: c for k, c in web_page_counters.items() if min_rf * len(transactions) <= c <= max_cs * len(transactions)})
logger.info("Got first level patterns : {} writing to file".format(len(first_level_patterns)))
save_first_level_patterns(os.path.join(folderPath,"1_level_patterns.txt"), first_level_patterns)
logger.info("Generating second level candidateset")
candidate_set = getSecondLevelCandidateSet(first_level_patterns)
level = 2
while True:
        # Stop once no candidates remain; a single remaining candidate is
        # still evaluated rather than silently dropped.
        if not candidate_set:
            break
logger.info("computing non-overlap patterns for level : {}".format(level))
no_patterns = {}
with Pool(processes=numOfProcesses) as pool:
            no_patterns_parts = pool.map(get_no_patterns, get_start_pos(len(candidate_set), max(1, len(candidate_set) // numOfProcesses)))
        # Workers handle disjoint candidate ranges, so merge order is irrelevant.
        no_patterns = dict(ChainMap(*no_patterns_parts))
logger.info("saving {} level patterns, got : {}".format(level, len(no_patterns)))
save_nth_level_patterns(os.path.join(folderPath,"{}_level_patterns.txt".format(level)), no_patterns)
logger.info("computing next level candidates")
new_patterns = no_patterns.keys()
candidate_set = getNextLevelCandidateSet(new_patterns)
level += 1
if __name__ == "__main__":
logger.info("Got the following arguments: {}".format(sys.argv))
min_rf = float(sys.argv[1])
max_or = float(sys.argv[2])
max_cs = float(sys.argv[3])
numOfProcesses = int(sys.argv[4])
transactions_file_path = str(sys.argv[5])
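    # Note: the Pool workers read module-level globals (transactions, max_or,
    # max_cs, numOfProcesses) set here; this relies on the POSIX "fork" start
    # method and would break under "spawn" (the default on Windows and macOS).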
t1 = time.time()
    ParallelCmine(transactions_file_path, min_rf, folderPath)
t2 = time.time()
logger.info("total time taken: {}".format(str(t2-t1)))Editor is loading...