Untitled
# Aggregate, per punter bank card, the money that flowed in from "primary"
# cards: total amount, latest transaction time, and the primary cards involved.
#
# Inputs and output are described by the three ``*_file`` dicts below; the
# column names used throughout ("交易卡号", "交易对手账卡号", "关联次数",
# "收付标志", "交易金额", "交易时间") must exist in the respective input files.
import threading
from concurrent.futures import ThreadPoolExecutor

import pandas as pd

punter_card_file = {"type": "csv", "filename": "0829新赌客银行卡流水聚合3.csv"}
primary_card_file = {"type": "xlsx", "filename": "一级卡_赌客进账提现筛选.xlsx"}
output_file = {"type": "xlsx", "filename": "out.xlsx"}

# Columns of the output table, in output order.
RESULT_COLUMNS = ("交易卡号", "总金额", "最晚交易时间", "一级卡卡号")
# Number of chunks the punter cards are split into (one worker per chunk).
WORKER_COUNT = 8

# Every partial result produced by gen_result_from_dict is also appended here
# (guarded by ``lock``); kept for backward compatibility with existing users.
results = []
lock = threading.Lock()


def df_from(file):
    """Load the table described by *file* into a DataFrame.

    Args:
        file: dict with keys ``"type"`` (``"csv"``; anything else is read as
            an Excel workbook) and ``"filename"``.

    Returns:
        The loaded ``pandas.DataFrame``.
    """
    if file["type"] == "csv":
        return pd.read_csv(file["filename"])
    # file["type"] == "xlsx"
    return pd.read_excel(file["filename"])


def save(df: pd.DataFrame, file):
    """Write *df* to the location described by *file*.

    Args:
        df: table to write.
        file: dict with keys ``"type"`` (``"csv"``; anything else is written
            as an Excel workbook) and ``"filename"``.
    """
    if file["type"] == "csv":
        df.to_csv(file["filename"])
    else:  # file["type"] == "xlsx"
        df.to_excel(file["filename"])


def filter(row):  # noqa: A001 - name shadows the builtin; kept for compatibility
    """Return True when *row* describes a qualifying primary card.

    A row qualifies when its "交易对手账卡号" is 16-19 characters long, starts
    with 6, 4, 5 or 9, and its "关联次数" is greater than 9.
    """
    card = str(row["交易对手账卡号"])
    looks_like_card = 16 <= len(card) <= 19 and card.startswith(
        ("6", "4", "5", "9")
    )
    return bool(looks_like_card and row["关联次数"] > 9)


def filter_cardnum(cardnum_series):
    """Keep only the rows of a DataFrame that pass :func:`filter`.

    Args:
        cardnum_series: DataFrame with "交易对手账卡号" and "关联次数" columns
            (despite the parameter name, a DataFrame is required because the
            predicate is applied row-wise).

    Returns:
        The filtered DataFrame (all original columns preserved).
    """
    # Row-wise apply on an empty frame does not yield a boolean mask.
    if cardnum_series.empty:
        return cardnum_series
    mask = cardnum_series.apply(filter, axis=1).astype(bool)
    return cardnum_series[mask]


def split_dict(original_dict, n):
    """Split *original_dict* into *n* dicts of near-equal size.

    Key order is preserved; the first ``len(original_dict) % n`` chunks get
    one extra key. Chunks may be empty when ``n`` exceeds the number of keys.

    Raises:
        ValueError: if ``n`` is smaller than 1.
    """
    if n < 1:
        raise ValueError("n must be a positive integer")
    keys = list(original_dict)
    avg_size, remainder = divmod(len(keys), n)
    split_dicts = []
    start = 0
    for i in range(n):
        end = start + avg_size + (1 if i < remainder else 0)
        split_dicts.append({key: original_dict[key] for key in keys[start:end]})
        start = end
    return split_dicts


def _summarize_incoming(records_by_card, primary_cards):
    """Build the per-punter-card summary table for one chunk of cards."""
    # Compare card numbers as strings so int/str differences between the two
    # input files cannot cause missed matches.
    primary = {str(card) for card in primary_cards}
    result = {column: [] for column in RESULT_COLUMNS}
    for punter_card_num, records in records_by_card.items():
        total_amount = 0
        latest_time = "0"
        matched_cards = set()
        for item in records:
            counterparty = str(item["交易对手账卡号"])
            if counterparty not in primary:
                continue
            # "出" marks an outgoing transfer; only incoming money is counted.
            if str(item["收付标志"]) == "出":
                continue
            total_amount += float(item["交易金额"])
            # Times are compared as strings; str() avoids a TypeError when
            # the column was parsed as a number.
            latest_time = max(latest_time, str(item["交易时间"]))
            matched_cards.add(counterparty)
        result["交易卡号"].append(punter_card_num)
        result["总金额"].append(total_amount)
        result["最晚交易时间"].append(latest_time)
        # Sorted so the output is deterministic across runs.
        result["一级卡卡号"].append(", ".join(sorted(matched_cards)))
    return result


def gen_result_from_dict(dic, set):  # noqa: A002 - parameter name kept for compatibility
    """Summarize incoming transfers from primary cards for each punter card.

    Args:
        dic: mapping of punter card number -> list of transaction records
            (dicts with "交易对手账卡号", "收付标志", "交易金额", "交易时间").
        set: collection of primary card numbers to match counterparties
            against.

    Returns:
        A dict of column name -> list with one entry per punter card in
        *dic*. Cards without any matching transaction get a total of 0, a
        latest time of "0" and an empty card list. The same dict is also
        appended to the module-level ``results`` list under ``lock``.
    """
    # The work happens in a helper because the ``set`` parameter hides the
    # builtin of the same name inside this function.
    result = _summarize_incoming(dic, set)
    with lock:
        results.append(result)
    return result


def main():
    """Load both input files, aggregate per punter card, and write the output."""
    primary_df = filter_cardnum(df_from(primary_card_file))
    # NOTE(review): assumes the primary card numbers are the "交易对手账卡号"
    # values of the filtered primary file - confirm against the data owner.
    primary_card_set = set(primary_df["交易对手账卡号"].astype(str))
    punter_card_dict = {
        key: group.to_dict("records")
        for key, group in df_from(punter_card_file).groupby("交易卡号")
    }
    print("== load file ok ==\n")

    # Fan the chunks out to worker threads. Collecting futures in submission
    # order keeps the output order stable, and result() re-raises any worker
    # exception instead of silently producing an incomplete file.
    punter_card_dicts = split_dict(punter_card_dict, WORKER_COUNT)
    with ThreadPoolExecutor(max_workers=WORKER_COUNT) as executor:
        futures = [
            executor.submit(gen_result_from_dict, chunk, primary_card_set)
            for chunk in punter_card_dicts
        ]
        partial_results = [future.result() for future in futures]

    print("== merge results ==\n")

    # Concatenate the per-chunk columns into one table.
    final_result = {column: [] for column in RESULT_COLUMNS}
    for partial in partial_results:
        for column in RESULT_COLUMNS:
            final_result[column].extend(partial[column])

    save(pd.DataFrame(final_result), output_file)


if __name__ == "__main__":
    main()
Leave a Comment