Untitled
def func_sqlcol( __pv_df_param, __pdf_large_cols): __dtypedict = {} __cols_clob = [] __cols_blob = [] try: #print(("__df_large_cols: \n%s" % __pdf_large_cols)) if len(__pdf_large_cols) > 0: __cols_clob = __pdf_large_cols['column_name_clob'].values[0] __cols_clob = __cols_clob.split(',') if __cols_clob else [] __cols_blob = __pdf_large_cols['column_name_blob'].values[0] __cols_blob = __cols_blob.split(',') if __cols_blob else [] print(("__cols_clob : type{%s} len{%s}" % (type(__cols_clob), len(__cols_clob)))) print(("__cols_blob : type{%s} len{%s}" % (type(__cols_blob), len(__cols_blob)))) for i,j in zip(__pv_df_param.columns, __pv_df_param.dtypes): # table(s): additionaldet_xml_template.xml_template, tom_batch_order_template_ref.template if "object" in str(j) and i in __cols_blob: __dtypedict.update({i: sqlalchemy_types.BLOB(length=None)}) elif "object" in str(j) and i in __cols_clob: # table(s): tom_service_action_master.extended_rule, generic_template_ref.template_text __dtypedict.update({i: sqlalchemy_types.CLOB(length=None)}) elif "object" in str(j): __dtypedict.update({i: sqlalchemy_types.NVARCHAR(length=255)}) if "datetime" in str(j): __dtypedict.update({i: sqlalchemy_types.DateTime()}) if "float" in str(j): __dtypedict.update({i: sqlalchemy_types.Float(precision=3, asdecimal=True)}) if "int" in str(j): __dtypedict.update({i: sqlalchemy_types.INT()}) except Exception as __exp: print(("func_sqlcol exception. Error - {%s} . Line No - {%s} " % (str(__exp),str(sys_exc_info()[-1].tb_lineno)))) raise AirflowException(str(__exp)) return __dtypedict why do we write above function
Leave a Comment