Untitled
unknown
plain_text
a year ago
2.8 kB
11
Indexable
def get_immediate_parent_children(self, df):
try:
conn = sqlite3.connect(os.getenv('SQLITE_DB_PATH'))
for index, row in df.iterrows():
var_name = row['Target']
program_id = row['Program_Name']
parent_child_var = {(var_name, var_name, program_id, 'paraname')}
master_set = set()
while parent_child_var:
current_var = parent_child_var.pop()
data = pd.read_sql_query(
sql=f"""
SELECT VARIABLE_PARENT, PROGRAM_NAME FROM BRE_ATTRIBUTE_TBL
WHERE VARIABLE_NAME='{current_var[0]}' AND VARIABLE_PARENT<>'N/A' AND PROGRAM_NAME='{program_id}'
""",
con=conn
).drop_duplicates()
work_set = set([(d['VARIABLE_PARENT'], current_var[0], d['PROGRAM_NAME']) for i,d in data.iterrows() if (d['VARIABLE_PARENT'], current_var[0], d['PROGRAM_NAME']) not in master_set])
master_set = master_set.union(work_set)
parent_child_var = work_set.union(parent_child_var)
data = pd.read_sql_query(
sql=f"""
SELECT VARIABLE_NAME, PROGRAM_NAME FROM BRE_ATTRIBUTE_TBL
WHERE VARIABLE_PARENT='{current_var[0]}' AND VARIABLE_NAME<>'N/A' AND PROGRAM_NAME='{program_id}'
""",
con=conn
).drop_duplicates()
work_set = set([(current_var[0], d['VARIABLE_NAME'], d['PROGRAM_NAME']) for i,d in data.iterrows() if (current_var[0], d['VARIABLE_NAME'], d['PROGRAM_NAME']) not in master_set])
master_set = master_set.union(work_set)
parents, children = [], []
parent_child_pairs = [i for i in master_set if (var_name in i[1] or var_name in i[2])]
for pair in parent_child_pairs:
if pair[1] == var_name: parents.append(pair[0])
if pair[0] == var_name: children.append(pair[1])
row['Parents'] = ', '.join(parents)
row['Children'] = ', '.join(children)
parent_child_var = work_set.union(parent_child_var)
conn.close()
return df
except Exception as e:
print("\n\n\nEXCEPTION OCCURRED HERE :",e)Editor is loading...
Leave a Comment