Untitled
unknown
plain_text
2 years ago
2.0 kB
7
Indexable
def func_sqlcol( __pv_df_param, __pdf_large_cols):
__dtypedict = {}
__cols_clob = []
__cols_blob = []
try:
#print(("__df_large_cols: \n%s" % __pdf_large_cols))
if len(__pdf_large_cols) > 0:
__cols_clob = __pdf_large_cols['column_name_clob'].values[0]
__cols_clob = __cols_clob.split(',') if __cols_clob else []
__cols_blob = __pdf_large_cols['column_name_blob'].values[0]
__cols_blob = __cols_blob.split(',') if __cols_blob else []
print(("__cols_clob : type{%s} len{%s}" % (type(__cols_clob), len(__cols_clob))))
print(("__cols_blob : type{%s} len{%s}" % (type(__cols_blob), len(__cols_blob))))
for i,j in zip(__pv_df_param.columns, __pv_df_param.dtypes):
# table(s): additionaldet_xml_template.xml_template, tom_batch_order_template_ref.template
if "object" in str(j) and i in __cols_blob:
__dtypedict.update({i: sqlalchemy_types.BLOB(length=None)})
elif "object" in str(j) and i in __cols_clob:
# table(s): tom_service_action_master.extended_rule, generic_template_ref.template_text
__dtypedict.update({i: sqlalchemy_types.CLOB(length=None)})
elif "object" in str(j):
__dtypedict.update({i: sqlalchemy_types.NVARCHAR(length=255)})
if "datetime" in str(j):
__dtypedict.update({i: sqlalchemy_types.DateTime()})
if "float" in str(j):
__dtypedict.update({i: sqlalchemy_types.Float(precision=3, asdecimal=True)})
if "int" in str(j):
__dtypedict.update({i: sqlalchemy_types.INT()})
except Exception as __exp:
print(("func_sqlcol exception. Error - {%s} . Line No - {%s} " % (str(__exp),str(sys_exc_info()[-1].tb_lineno))))
raise AirflowException(str(__exp))
return __dtypedict
why do we write above functionEditor is loading...
Leave a Comment