Untitled
unknown
python
a year ago
26 kB
5
Indexable
import time as timer import aiomysql import asyncio import aiohttp import json from datetime import datetime from oauth2client.service_account import ServiceAccountCredentials from googleapiclient.discovery import build from googleapiclient.errors import HttpError import httplib2 # SQLALCHEMY from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy import Column, BigInteger, Boolean, String, JSON from sqlalchemy.sql.expression import insert, desc, update from sqlalchemy.future import select from sqlalchemy.orm import declarative_base, sessionmaker engine = create_async_engine( "postgresql+asyncpg://cashbox_user:wAieiwt@94.250.520.135:48425/amo_sheets") Base = declarative_base() # MODEL class A(Base): __tablename__ = "amo_leads_v2" id = Column(BigInteger) our_id = Column(BigInteger, primary_key=True) amo_account = Column(String) amo_account_id = Column(BigInteger) name = Column(String) price = Column(BigInteger) responsible_user_id = Column(BigInteger) group_id = Column(BigInteger) status_id = Column(BigInteger) pipeline_id = Column(BigInteger) loss_reason_id = Column(BigInteger) created_by = Column(BigInteger) updated_by = Column(BigInteger) created_at = Column(BigInteger) updated_at = Column(BigInteger) closed_at = Column(BigInteger) closest_task_at = Column(BigInteger) is_deleted = Column(Boolean) custom_fields_values = Column(JSON) MYSQL_DATA = { "user": 'user_agent_master', "password": 'ygzl2SMaMN8Te5I0QOGnO34unTO', "host": '195.140.146.20', "db": "user_agent_master", "port": 3306 } class AmoToSheet: async def first_settings(self): async with engine.begin() as conn: await conn.run_sync(Base.metadata.create_all) self.post_pool = sessionmaker( engine, expire_on_commit=False, class_=AsyncSession ) self.mysql_pool = await aiomysql.create_pool(**MYSQL_DATA) scopes = ['https://www.googleapis.com/auth/spreadsheets'] creds_service = ServiceAccountCredentials.from_json_keyfile_dict( creds_json, scopes).authorize(httplib2.Http()) self.build = build('sheets', 'v4', http=creds_service) async def first_upload(self, amo_data): print(f"First db upload for {amo_data['account']} is started") headers = {"Authorization": f"Bearer {amo_data['token']}"} page = 1 time_s = datetime.now() async with aiohttp.ClientSession(headers=headers) as session: while True: timer.sleep(1) url = f"https://{amo_data['account']}/api/v4/leads?limit=250&page={page}&order[created_at]=asc" async with session.get(url) as response: if response.status == 200: amo_json_resp = await response.json( content_type='application/hal+json') leads = amo_json_resp['_embedded']['leads'] leads_data = [] for lead in leads: lead['amo_account'] = amo_data['account'] lead['amo_account_id'] = amo_data['account_id'] del lead['_links'] del lead['_embedded'] del lead['score'] del lead['account_id'] if "labor_cost" in lead: del lead['labor_cost'] if "is_price_computed" in lead: del lead['is_price_computed'] leads_data.append(lead) async with self.post_pool() as post_session: async with post_session.begin(): await post_session.execute( insert(A).values(leads_data)) print(f"{page} - inserted") page += 1 await session.post( url="https://bpm-run.ru/api/events?token=042812d1df955c2034abcd6b1dfdf3a48be2a616e59f1", json={ "protocol": str(response.method), "url": str(response.url), "status_code": int(response.status), "request_body": json.dumps({}), "response": json.dumps(amo_json_resp) } ) else: print( f"First upload completed for {amo_data['account']}") print(datetime.now() - time_s) amo_resp = await response.text() await session.post( url="https://bpm-run.ru/api/events?token=042812d12dc17f2955c2034abcd6b1dfdf3a48be2a616e59f1", json={ "protocol": str(response.method), "url": str(response.url), "status_code": int(response.status), "request_body": json.dumps({}), "response": json.dumps({"error": amo_resp}) } ) break async def update_leads(self, amo_data): print(f"Update for {amo_data['account']} is started") headers = {"Authorization": f"Bearer {amo_data['token']}"} time = amo_data['updated_last'] page = 1 async with aiohttp.ClientSession(headers=headers) as session: while True: url = f"https://{amo_data['account']}/api/v4/leads?order[updated_at]=asc&filter[updated_at][from]={time}&page={page}&limit=250&with=contacts" async with session.get(url) as response: if response.status == 200: amo_json_resp = await response.json( content_type='application/hal+json') await session.post( url="https://bpm-run.ru/api/events?token=042812d12dc17f2c095f1583fdadf955c2034abcd6b1dfdf3a48be2a616e59f1", json={ "protocol": str(response.method), "url": str(response.url), "status_code": int(response.status), "request_body": json.dumps({}), "response": json.dumps(amo_json_resp) } ) else: amo_resp = await response.text() await session.post( url="https://bpm-run.ru/api/events?token=042812d12ddf955c2034abcd6b1dfdf3a48be2a616e59f1", json={ "protocol": str(response.method), "url": str(response.url), "status_code": int(response.status), "request_body": json.dumps({}), "response": json.dumps({"error": amo_resp}) } ) if response.status == 200: leads = amo_json_resp['_embedded']['leads'] leads_data = [] for lead in leads: lead['amo_account'] = amo_data['account'] lead['amo_account_id'] = amo_data['account_id'] del lead['_links'] del lead['_embedded'] del lead['score'] del lead['account_id'] if "labor_cost" in lead: del lead['labor_cost'] if "is_price_computed" in lead: del lead['is_price_computed'] leads_data.append(lead) async with self.post_pool() as post_session: async with post_session.begin(): for lead_obj in leads_data: db_resp = await post_session.execute( select(A).where( A.id == lead_obj[ "id"] and A.amo_account_id == lead_obj["amo_account_id"])) lead = db_resp.scalars().first() if lead: await post_session.execute( update(A).where( A.id == lead_obj[ "id"] and A.amo_account_id == lead_obj[ "amo_account_id"]).values( lead_obj)) else: await post_session.execute( insert(A).values(lead_obj)) page += 1 else: break async def update_google(self, spread_data): body = {"values": []} try: fields = (self.build.spreadsheets().values().get( spreadsheetId=spread_data['spread_id'], range="Лист2!A1:ZZZ1").execute()).get("values") except HttpError as e: print(e) # timer.sleep(20) # fields = (self.build.spreadsheets().values().get(spreadsheetId=spread_data['spread_id'], # range="Лист1!A1:ZZZ1").execute()).get("values") fields = None if fields: flag = fields[0][0] custom_table_fields = fields[0][7:] else: flag = None custom_table_fields = None default_headers = ["amo_lead_id", "amo_lead_link", "amo_lead_name", "amo_lead_status_name", "amo_lead_user", "amo_contact_name", "amo_contact_phone", "created_at"] async with self.post_pool() as session: async with session.begin(): db_resp = await session.execute(select(A).where( A.pipeline_id == spread_data[ 'pipeline_id'] and A.amo_account_id == spread_data[ 'account_id'])) leads = db_resp.scalars() if leads: async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cur: for lead in leads: lead_dict = lead.__dict__ custom_fields_ids = [int(field.get("field_id")) for field in lead_dict[ 'custom_fields_values']] if \ lead_dict[ 'custom_fields_values'] else None try: status = next( item for item in spread_data['statuses'] if item["id"] == lead_dict["status_id"]) except StopIteration: status = {"name": ""} await cur.execute( f"SELECT name FROM amo_users WHERE amo_user_id={lead_dict['responsible_user_id']}") user = await cur.fetchone() if user: lead_resp_user = user[0] else: lead_resp_user = None await cur.execute( f"SELECT amo_install_id, amo_contact_id FROM amo_lead_contacts WHERE amo_lead_id={lead_dict['id']}") contact_rel = await cur.fetchone() contact_name = None contact_phone = None if contact_rel: await cur.execute( f"SELECT name, phone FROM amo_contacts WHERE amo_install_id={contact_rel[0]} AND amo_contact_id={contact_rel[1]}") contact = await cur.fetchone() if contact: contact_name = contact[0] contact_phone = contact[1] lead_for_google = [lead_dict['id'], f"https://{lead_dict['amo_account']}/leads/detail/{lead_dict['id']}", lead_dict['name'], status.get("name"), lead_resp_user, contact_name, contact_phone, datetime.utcfromtimestamp( lead_dict['created_at']) \ .strftime('%d.%m.%Y %H:%M:%S')] for index, check in enumerate(lead_for_google): if str(check).startswith("+"): lead_for_google[index] = str(check).replace( "+", "") if lead_dict['custom_fields_values']: if custom_table_fields: for custom in custom_table_fields: if len(custom.split(":")) != 1: custom_id = custom.split(":")[1] if custom_id.isdigit() and int( custom_id) in custom_fields_ids: for c in lead_dict[ 'custom_fields_values']: if int(custom_id) == int( c.get("field_id")): value = c.get("values")[ 0].get("value") if value is True: value = "1" elif value is False: value = "0" elif ( str(value).startswith( "15") or str( value).startswith( "16") or str( value).startswith( "17")) and len( str(value)) == 10: value = datetime.fromtimestamp( int(value) + 10800).strftime( "%d.%m.%Y") elif str(value).startswith( "+"): value = str( value).replace("+", "") else: value = value lead_for_google.append( value) else: lead_for_google.append(None) body['values'].append(lead_for_google) if custom_table_fields: async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cur: for custom in custom_table_fields: if len(custom.split(":")) != 1: custom_id = custom.split(":")[1] if custom_id.isdigit() and flag: if len(flag.split(":")) != 1: if flag.split(":")[1] == "1": default_headers[ 0] = f"amo_lead_id:{flag.split(':')[1]}" await cur.execute( f"SELECT name FROM amo_lead_fields WHERE amo_id={custom_id}") rus_name = await cur.fetchone() if rus_name: default_headers.append( f"{rus_name[0]}:{custom_id}") else: default_headers.append( f"lead:{custom_id}") else: default_headers[ 0] = f"amo_lead_id:{flag.split(':')[1]}" default_headers.append( f"lead:{custom_id}") else: default_headers.append( f"lead:{custom_id}") else: default_headers.append(f"lead:{custom_id}") body['values'].insert(0, default_headers) try: self.build.spreadsheets().values().clear( spreadsheetId=spread_data['spread_id'], range="Лист1!A1:Z50000000").execute() self.build.spreadsheets().values().append( spreadsheetId=spread_data['spread_id'], range="Лист1!A1", valueInputOption="USER_ENTERED", body=body).execute() async with aiohttp.ClientSession() as session_http: await session_http.post( url="https://bpm-run.ru/api/events?token=042812d1f955c2034abcd6b1dfdf3a48be2a616e59f1", json={ "protocol": "UPDATE", "url": f"Google-Tables Update {spread_data['spread_id']}", "status_code": 200, "request_body": json.dumps(body), "response": json.dumps({}) } ) except HttpError as e: print(e) timer.sleep(20) self.build.spreadsheets().values().clear( spreadsheetId=spread_data['spread_id'], range="Лист1!A1:Z50000000").execute() self.build.spreadsheets().values().append( spreadsheetId=spread_data['spread_id'], range="Лист1!A1", valueInputOption="USER_ENTERED", body=body).execute() async with aiohttp.ClientSession() as session_http: await session_http.post( url="https://bpm-run.ru/api/events?token=042812d1283fdadf955c2034abcd6b1dfdf3a48be2a616e59f1", json={ "protocol": "UPDATE", "url": f"Google-Tables Update {spread_data['spread_id']}", "status_code": 200, "request_body": json.dumps(body), "response": json.dumps({}) } ) print(f"Update spreadsheet {spread_data['spread_id']}") async def send_to_google_update(self, amo_data, exists): amo_data['updated_last'] = int(exists.updated_at) await self.update_leads(amo_data) print(f"Updated {amo_data['account']}") tasks = [] statuses_list = [] async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute( f"SELECT statuses FROM amo_lead_pipelines WHERE amo_install_id={amo_data['account_id']}") status_db = await cur.fetchall() await cur.execute( f"SELECT spreadsheet_id, pipeline_id FROM amo_to_sheets WHERE active = 1 AND amo_install_id = {amo_data['account_id']}") sheets = await cur.fetchall() for status in status_db: statuses_list.append(json.loads(status[0])) statuses = [item for sublist in statuses_list for item in sublist] for sheet in sheets: await cur.execute( f"SELECT amo_pipeline_id FROM amo_lead_pipelines WHERE id={sheet[1]}") pipeline = await cur.fetchone() sheet_data = {"spread_id": sheet[0], "pipeline_id": pipeline[0], "statuses": statuses, "account_id": amo_data['account_id']} tasks.append( asyncio.create_task(self.update_google(sheet_data))) await asyncio.gather(*tasks) if amo_data['account_id'] == 45: async with aiohttp.ClientSession() as bpm_session: await bpm_session.post( "https://bpm-run.ru/api/hook_d453bbb76672c5e4fb0fd452f", json={}) await bpm_session.post( "https://bpm-run.ru/api/hook_b47fbf98400ee34664c3eed15", json={}) await bpm_session.post( "https://bpm-run.ru/api/hook_ef849a33337251d419ee53f22", json={}) # aвтоквалснг2 await bpm_session.close() async def get_all_tables(self): await self.first_settings() start_time = datetime.now() async with self.mysql_pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute( "SELECT id, spreadsheet_id, pipeline_id, amo_install_id FROM amo_to_sheets WHERE active = 1") tables = await cur.fetchall() unique_ids = list(set([row[-1] for row in tables])) tasks = [] print(f"Unique_ids: {str(unique_ids)}") async with self.post_pool() as session: async with session.begin(): for account_id in unique_ids: await cur.execute( f"SELECT amo_account, amo_access_token FROM amo_install WHERE id={account_id}") amo_install = await cur.fetchone() amo_data = {"account": amo_install[0], "account_id": account_id, "token": amo_install[1]} db_resp = await session.execute( select(A).where( A.amo_account_id == account_id).order_by( desc(A.updated_at)).limit(1)) exists = db_resp.scalars().first() if exists: tasks.append(asyncio.create_task( self.send_to_google_update(amo_data, exists))) else: await self.first_upload(amo_data) db_resp = await session.execute( select(A).where( A.amo_account_id == account_id).order_by( desc(A.updated_at)).limit( 1)) exists = db_resp.scalars().first() tasks.append(asyncio.create_task( self.send_to_google_update(amo_data, exists))) await asyncio.gather(*tasks) print("All tables were updated!") print(f"Running time: {datetime.now() - start_time}") async def main(): print(f"Time of script start {datetime.now()}") async with aiohttp.ClientSession() as session: await session.post( url="https://bpm-run.ru/api/events?token=0f955c2034abcd6b1dfdf3a48be2a616e59f1", json={ "protocol": "START", "url": "Google-Tables Start", "status_code": 200, "request_body": json.dumps({}), "response": json.dumps({}) } ) ss = AmoToSheet() await ss.get_all_tables() asyncio.run(main())
Editor is loading...
Leave a Comment