Untitled
unknown
python
2 years ago
26 kB
8
Indexable
import time as timer
import aiomysql
import asyncio
import aiohttp
import json
from datetime import datetime
from oauth2client.service_account import ServiceAccountCredentials
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import httplib2
# SQLALCHEMY
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy import Column, BigInteger, Boolean, String, JSON
from sqlalchemy.sql.expression import insert, desc, update
from sqlalchemy.future import select
from sqlalchemy.orm import declarative_base, sessionmaker
engine = create_async_engine(
"postgresql+asyncpg://cashbox_user:wAieiwt@94.250.520.135:48425/amo_sheets")
Base = declarative_base()
# MODEL
class A(Base):
__tablename__ = "amo_leads_v2"
id = Column(BigInteger)
our_id = Column(BigInteger, primary_key=True)
amo_account = Column(String)
amo_account_id = Column(BigInteger)
name = Column(String)
price = Column(BigInteger)
responsible_user_id = Column(BigInteger)
group_id = Column(BigInteger)
status_id = Column(BigInteger)
pipeline_id = Column(BigInteger)
loss_reason_id = Column(BigInteger)
created_by = Column(BigInteger)
updated_by = Column(BigInteger)
created_at = Column(BigInteger)
updated_at = Column(BigInteger)
closed_at = Column(BigInteger)
closest_task_at = Column(BigInteger)
is_deleted = Column(Boolean)
custom_fields_values = Column(JSON)
MYSQL_DATA = {
"user": 'user_agent_master',
"password": 'ygzl2SMaMN8Te5I0QOGnO34unTO',
"host": '195.140.146.20',
"db": "user_agent_master",
"port": 3306
}
class AmoToSheet:
async def first_settings(self):
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
self.post_pool = sessionmaker(
engine, expire_on_commit=False, class_=AsyncSession
)
self.mysql_pool = await aiomysql.create_pool(**MYSQL_DATA)
scopes = ['https://www.googleapis.com/auth/spreadsheets']
creds_service = ServiceAccountCredentials.from_json_keyfile_dict(
creds_json, scopes).authorize(httplib2.Http())
self.build = build('sheets', 'v4', http=creds_service)
async def first_upload(self, amo_data):
print(f"First db upload for {amo_data['account']} is started")
headers = {"Authorization": f"Bearer {amo_data['token']}"}
page = 1
time_s = datetime.now()
async with aiohttp.ClientSession(headers=headers) as session:
while True:
timer.sleep(1)
url = f"https://{amo_data['account']}/api/v4/leads?limit=250&page={page}&order[created_at]=asc"
async with session.get(url) as response:
if response.status == 200:
amo_json_resp = await response.json(
content_type='application/hal+json')
leads = amo_json_resp['_embedded']['leads']
leads_data = []
for lead in leads:
lead['amo_account'] = amo_data['account']
lead['amo_account_id'] = amo_data['account_id']
del lead['_links']
del lead['_embedded']
del lead['score']
del lead['account_id']
if "labor_cost" in lead:
del lead['labor_cost']
if "is_price_computed" in lead:
del lead['is_price_computed']
leads_data.append(lead)
async with self.post_pool() as post_session:
async with post_session.begin():
await post_session.execute(
insert(A).values(leads_data))
print(f"{page} - inserted")
page += 1
await session.post(
url="https://bpm-run.ru/api/events?token=042812d1df955c2034abcd6b1dfdf3a48be2a616e59f1",
json={
"protocol": str(response.method),
"url": str(response.url),
"status_code": int(response.status),
"request_body": json.dumps({}),
"response": json.dumps(amo_json_resp)
}
)
else:
print(
f"First upload completed for {amo_data['account']}")
print(datetime.now() - time_s)
amo_resp = await response.text()
await session.post(
url="https://bpm-run.ru/api/events?token=042812d12dc17f2955c2034abcd6b1dfdf3a48be2a616e59f1",
json={
"protocol": str(response.method),
"url": str(response.url),
"status_code": int(response.status),
"request_body": json.dumps({}),
"response": json.dumps({"error": amo_resp})
}
)
break
async def update_leads(self, amo_data):
print(f"Update for {amo_data['account']} is started")
headers = {"Authorization": f"Bearer {amo_data['token']}"}
time = amo_data['updated_last']
page = 1
async with aiohttp.ClientSession(headers=headers) as session:
while True:
url = f"https://{amo_data['account']}/api/v4/leads?order[updated_at]=asc&filter[updated_at][from]={time}&page={page}&limit=250&with=contacts"
async with session.get(url) as response:
if response.status == 200:
amo_json_resp = await response.json(
content_type='application/hal+json')
await session.post(
url="https://bpm-run.ru/api/events?token=042812d12dc17f2c095f1583fdadf955c2034abcd6b1dfdf3a48be2a616e59f1",
json={
"protocol": str(response.method),
"url": str(response.url),
"status_code": int(response.status),
"request_body": json.dumps({}),
"response": json.dumps(amo_json_resp)
}
)
else:
amo_resp = await response.text()
await session.post(
url="https://bpm-run.ru/api/events?token=042812d12ddf955c2034abcd6b1dfdf3a48be2a616e59f1",
json={
"protocol": str(response.method),
"url": str(response.url),
"status_code": int(response.status),
"request_body": json.dumps({}),
"response": json.dumps({"error": amo_resp})
}
)
if response.status == 200:
leads = amo_json_resp['_embedded']['leads']
leads_data = []
for lead in leads:
lead['amo_account'] = amo_data['account']
lead['amo_account_id'] = amo_data['account_id']
del lead['_links']
del lead['_embedded']
del lead['score']
del lead['account_id']
if "labor_cost" in lead:
del lead['labor_cost']
if "is_price_computed" in lead:
del lead['is_price_computed']
leads_data.append(lead)
async with self.post_pool() as post_session:
async with post_session.begin():
for lead_obj in leads_data:
db_resp = await post_session.execute(
select(A).where(
A.id == lead_obj[
"id"] and A.amo_account_id ==
lead_obj["amo_account_id"]))
lead = db_resp.scalars().first()
if lead:
await post_session.execute(
update(A).where(
A.id == lead_obj[
"id"] and A.amo_account_id ==
lead_obj[
"amo_account_id"]).values(
lead_obj))
else:
await post_session.execute(
insert(A).values(lead_obj))
page += 1
else:
break
async def update_google(self, spread_data):
body = {"values": []}
try:
fields = (self.build.spreadsheets().values().get(
spreadsheetId=spread_data['spread_id'],
range="Лист2!A1:ZZZ1").execute()).get("values")
except HttpError as e:
print(e)
# timer.sleep(20)
# fields = (self.build.spreadsheets().values().get(spreadsheetId=spread_data['spread_id'],
# range="Лист1!A1:ZZZ1").execute()).get("values")
fields = None
if fields:
flag = fields[0][0]
custom_table_fields = fields[0][7:]
else:
flag = None
custom_table_fields = None
default_headers = ["amo_lead_id", "amo_lead_link", "amo_lead_name",
"amo_lead_status_name", "amo_lead_user",
"amo_contact_name", "amo_contact_phone",
"created_at"]
async with self.post_pool() as session:
async with session.begin():
db_resp = await session.execute(select(A).where(
A.pipeline_id == spread_data[
'pipeline_id'] and A.amo_account_id == spread_data[
'account_id']))
leads = db_resp.scalars()
if leads:
async with self.mysql_pool.acquire() as conn:
async with conn.cursor() as cur:
for lead in leads:
lead_dict = lead.__dict__
custom_fields_ids = [int(field.get("field_id")) for
field in
lead_dict[
'custom_fields_values']] if \
lead_dict[
'custom_fields_values'] else None
try:
status = next(
item for item in spread_data['statuses'] if
item["id"] == lead_dict["status_id"])
except StopIteration:
status = {"name": ""}
await cur.execute(
f"SELECT name FROM amo_users WHERE amo_user_id={lead_dict['responsible_user_id']}")
user = await cur.fetchone()
if user:
lead_resp_user = user[0]
else:
lead_resp_user = None
await cur.execute(
f"SELECT amo_install_id, amo_contact_id FROM amo_lead_contacts WHERE amo_lead_id={lead_dict['id']}")
contact_rel = await cur.fetchone()
contact_name = None
contact_phone = None
if contact_rel:
await cur.execute(
f"SELECT name, phone FROM amo_contacts WHERE amo_install_id={contact_rel[0]} AND amo_contact_id={contact_rel[1]}")
contact = await cur.fetchone()
if contact:
contact_name = contact[0]
contact_phone = contact[1]
lead_for_google = [lead_dict['id'],
f"https://{lead_dict['amo_account']}/leads/detail/{lead_dict['id']}",
lead_dict['name'],
status.get("name"), lead_resp_user,
contact_name,
contact_phone,
datetime.utcfromtimestamp(
lead_dict['created_at']) \
.strftime('%d.%m.%Y %H:%M:%S')]
for index, check in enumerate(lead_for_google):
if str(check).startswith("+"):
lead_for_google[index] = str(check).replace(
"+", "")
if lead_dict['custom_fields_values']:
if custom_table_fields:
for custom in custom_table_fields:
if len(custom.split(":")) != 1:
custom_id = custom.split(":")[1]
if custom_id.isdigit() and int(
custom_id) in custom_fields_ids:
for c in lead_dict[
'custom_fields_values']:
if int(custom_id) == int(
c.get("field_id")):
value = c.get("values")[
0].get("value")
if value is True:
value = "1"
elif value is False:
value = "0"
elif (
str(value).startswith(
"15") or str(
value).startswith(
"16") or str(
value).startswith(
"17")) and len(
str(value)) == 10:
value = datetime.fromtimestamp(
int(value) + 10800).strftime(
"%d.%m.%Y")
elif str(value).startswith(
"+"):
value = str(
value).replace("+",
"")
else:
value = value
lead_for_google.append(
value)
else:
lead_for_google.append(None)
body['values'].append(lead_for_google)
if custom_table_fields:
async with self.mysql_pool.acquire() as conn:
async with conn.cursor() as cur:
for custom in custom_table_fields:
if len(custom.split(":")) != 1:
custom_id = custom.split(":")[1]
if custom_id.isdigit() and flag:
if len(flag.split(":")) != 1:
if flag.split(":")[1] == "1":
default_headers[
0] = f"amo_lead_id:{flag.split(':')[1]}"
await cur.execute(
f"SELECT name FROM amo_lead_fields WHERE amo_id={custom_id}")
rus_name = await cur.fetchone()
if rus_name:
default_headers.append(
f"{rus_name[0]}:{custom_id}")
else:
default_headers.append(
f"lead:{custom_id}")
else:
default_headers[
0] = f"amo_lead_id:{flag.split(':')[1]}"
default_headers.append(
f"lead:{custom_id}")
else:
default_headers.append(
f"lead:{custom_id}")
else:
default_headers.append(f"lead:{custom_id}")
body['values'].insert(0, default_headers)
try:
self.build.spreadsheets().values().clear(
spreadsheetId=spread_data['spread_id'],
range="Лист1!A1:Z50000000").execute()
self.build.spreadsheets().values().append(
spreadsheetId=spread_data['spread_id'], range="Лист1!A1",
valueInputOption="USER_ENTERED", body=body).execute()
async with aiohttp.ClientSession() as session_http:
await session_http.post(
url="https://bpm-run.ru/api/events?token=042812d1f955c2034abcd6b1dfdf3a48be2a616e59f1",
json={
"protocol": "UPDATE",
"url": f"Google-Tables Update {spread_data['spread_id']}",
"status_code": 200,
"request_body": json.dumps(body),
"response": json.dumps({})
}
)
except HttpError as e:
print(e)
timer.sleep(20)
self.build.spreadsheets().values().clear(
spreadsheetId=spread_data['spread_id'],
range="Лист1!A1:Z50000000").execute()
self.build.spreadsheets().values().append(
spreadsheetId=spread_data['spread_id'], range="Лист1!A1",
valueInputOption="USER_ENTERED", body=body).execute()
async with aiohttp.ClientSession() as session_http:
await session_http.post(
url="https://bpm-run.ru/api/events?token=042812d1283fdadf955c2034abcd6b1dfdf3a48be2a616e59f1",
json={
"protocol": "UPDATE",
"url": f"Google-Tables Update {spread_data['spread_id']}",
"status_code": 200,
"request_body": json.dumps(body),
"response": json.dumps({})
}
)
print(f"Update spreadsheet {spread_data['spread_id']}")
async def send_to_google_update(self, amo_data, exists):
amo_data['updated_last'] = int(exists.updated_at)
await self.update_leads(amo_data)
print(f"Updated {amo_data['account']}")
tasks = []
statuses_list = []
async with self.mysql_pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
f"SELECT statuses FROM amo_lead_pipelines WHERE amo_install_id={amo_data['account_id']}")
status_db = await cur.fetchall()
await cur.execute(
f"SELECT spreadsheet_id, pipeline_id FROM amo_to_sheets WHERE active = 1 AND amo_install_id = {amo_data['account_id']}")
sheets = await cur.fetchall()
for status in status_db:
statuses_list.append(json.loads(status[0]))
statuses = [item for sublist in statuses_list for item in
sublist]
for sheet in sheets:
await cur.execute(
f"SELECT amo_pipeline_id FROM amo_lead_pipelines WHERE id={sheet[1]}")
pipeline = await cur.fetchone()
sheet_data = {"spread_id": sheet[0],
"pipeline_id": pipeline[0],
"statuses": statuses,
"account_id": amo_data['account_id']}
tasks.append(
asyncio.create_task(self.update_google(sheet_data)))
await asyncio.gather(*tasks)
if amo_data['account_id'] == 45:
async with aiohttp.ClientSession() as bpm_session:
await bpm_session.post(
"https://bpm-run.ru/api/hook_d453bbb76672c5e4fb0fd452f",
json={})
await bpm_session.post(
"https://bpm-run.ru/api/hook_b47fbf98400ee34664c3eed15",
json={})
await bpm_session.post(
"https://bpm-run.ru/api/hook_ef849a33337251d419ee53f22",
json={}) # aвтоквалснг2
await bpm_session.close()
async def get_all_tables(self):
await self.first_settings()
start_time = datetime.now()
async with self.mysql_pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(
"SELECT id, spreadsheet_id, pipeline_id, amo_install_id FROM amo_to_sheets WHERE active = 1")
tables = await cur.fetchall()
unique_ids = list(set([row[-1] for row in tables]))
tasks = []
print(f"Unique_ids: {str(unique_ids)}")
async with self.post_pool() as session:
async with session.begin():
for account_id in unique_ids:
await cur.execute(
f"SELECT amo_account, amo_access_token FROM amo_install WHERE id={account_id}")
amo_install = await cur.fetchone()
amo_data = {"account": amo_install[0],
"account_id": account_id,
"token": amo_install[1]}
db_resp = await session.execute(
select(A).where(
A.amo_account_id == account_id).order_by(
desc(A.updated_at)).limit(1))
exists = db_resp.scalars().first()
if exists:
tasks.append(asyncio.create_task(
self.send_to_google_update(amo_data,
exists)))
else:
await self.first_upload(amo_data)
db_resp = await session.execute(
select(A).where(
A.amo_account_id == account_id).order_by(
desc(A.updated_at)).limit(
1))
exists = db_resp.scalars().first()
tasks.append(asyncio.create_task(
self.send_to_google_update(amo_data,
exists)))
await asyncio.gather(*tasks)
print("All tables were updated!")
print(f"Running time: {datetime.now() - start_time}")
async def main():
print(f"Time of script start {datetime.now()}")
async with aiohttp.ClientSession() as session:
await session.post(
url="https://bpm-run.ru/api/events?token=0f955c2034abcd6b1dfdf3a48be2a616e59f1",
json={
"protocol": "START",
"url": "Google-Tables Start",
"status_code": 200,
"request_body": json.dumps({}),
"response": json.dumps({})
}
)
ss = AmoToSheet()
await ss.get_all_tables()
asyncio.run(main())
Editor is loading...
Leave a Comment