Untitled

 avatar
unknown
python
a year ago
26 kB
5
Indexable
import time as timer
import aiomysql
import asyncio
import aiohttp
import json

from datetime import datetime

from oauth2client.service_account import ServiceAccountCredentials
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
import httplib2

# SQLALCHEMY
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy import Column, BigInteger, Boolean, String, JSON
from sqlalchemy.sql.expression import insert, desc, update
from sqlalchemy.future import select
from sqlalchemy.orm import declarative_base, sessionmaker

# Connection string of the Postgres database that stores the synced leads.
# NOTE(review): credentials are hard-coded in source and the host contains an
# octet above 255 (probably redacted) - confirm the real DSN and move it to
# configuration.
_POSTGRES_DSN = "postgresql+asyncpg://cashbox_user:wAieiwt@94.250.520.135:48425/amo_sheets"

# Shared async engine and declarative base used by the model and sessions.
engine = create_async_engine(_POSTGRES_DSN)
Base = declarative_base()


# MODEL
class A(Base):
    """ORM mapping for the ``amo_leads_v2`` table of stored amoCRM leads.

    Rows are written from the lead dictionaries returned by the amoCRM
    ``/api/v4/leads`` endpoint (see ``AmoToSheet.first_upload`` and
    ``AmoToSheet.update_leads``), so the column names mirror the payload keys
    that are kept, plus ``amo_account`` / ``amo_account_id`` which the
    uploader adds itself.
    """
    __tablename__ = "amo_leads_v2"

    # Lead id taken from the API payload. No unique constraint is declared;
    # presumably it is unique only within one account - confirm.
    id = Column(BigInteger)
    # Only primary key of the table. The uploader never sets it, so it is
    # presumably generated by the database - confirm.
    our_id = Column(BigInteger, primary_key=True)
    # Host name and internal id of the amoCRM account, both added by the
    # uploader before the row is inserted.
    amo_account = Column(String)
    amo_account_id = Column(BigInteger)
    name = Column(String)
    price = Column(BigInteger)
    responsible_user_id = Column(BigInteger)
    group_id = Column(BigInteger)
    status_id = Column(BigInteger)
    pipeline_id = Column(BigInteger)
    loss_reason_id = Column(BigInteger)
    created_by = Column(BigInteger)
    updated_by = Column(BigInteger)
    # Stored as integers; ``created_at`` is later passed to
    # ``datetime.utcfromtimestamp`` and ``updated_at`` is sent back to the API
    # as an ``updated_at[from]`` filter, i.e. both are used as Unix timestamps.
    created_at = Column(BigInteger)
    updated_at = Column(BigInteger)
    closed_at = Column(BigInteger)
    closest_task_at = Column(BigInteger)
    is_deleted = Column(Boolean)
    # Raw custom-field list from the payload; readers expect entries with a
    # ``field_id`` and a ``values`` list.
    custom_fields_values = Column(JSON)


# Keyword arguments forwarded unchanged to ``aiomysql.create_pool``.
# NOTE(review): credentials are hard-coded in source - consider loading them
# from configuration or the environment.
MYSQL_DATA = dict(
    user='user_agent_master',
    password='ygzl2SMaMN8Te5I0QOGnO34unTO',
    host='195.140.146.20',
    db="user_agent_master",
    port=3306,
)


class AmoToSheet:
    async def first_settings(self):
        """Create the tables and open every connection the sync needs.

        Sets ``self.post_pool`` (async SQLAlchemy session factory),
        ``self.mysql_pool`` (aiomysql pool) and ``self.build`` (Google Sheets
        v4 client).
        """
        # Make sure every table declared on Base exists before it is used.
        async with engine.begin() as connection:
            await connection.run_sync(Base.metadata.create_all)

        self.post_pool = sessionmaker(
            engine, class_=AsyncSession, expire_on_commit=False)
        self.mysql_pool = await aiomysql.create_pool(**MYSQL_DATA)

        # NOTE(review): ``creds_json`` is not defined anywhere in the visible
        # file - confirm where the service-account key dict comes from.
        credentials = ServiceAccountCredentials.from_json_keyfile_dict(
            creds_json, ['https://www.googleapis.com/auth/spreadsheets'])
        authorized_http = credentials.authorize(httplib2.Http())
        self.build = build('sheets', 'v4', http=authorized_http)

    async def first_upload(self, amo_data):
        """Download every lead of one amoCRM account and bulk-insert it.

        Pages through ``/api/v4/leads`` (250 leads per page, ordered by
        ``created_at`` ascending) until the API answers with anything other
        than HTTP 200. Each page is inserted into the leads table and every
        response is reported to the bpm-run events endpoint.

        Args:
            amo_data: Mapping with the keys ``account`` (amoCRM host name),
                ``account_id`` and ``token`` (bearer token).
        """
        print(f"First db upload for {amo_data['account']} is started")
        headers = {"Authorization": f"Bearer {amo_data['token']}"}
        # Payload keys that are stripped before the row is inserted. ``pop``
        # is used so a key missing from the payload does not abort the upload.
        dropped_keys = ("_links", "_embedded", "score", "account_id",
                        "labor_cost", "is_price_computed")
        page = 1
        time_s = datetime.now()
        async with aiohttp.ClientSession(headers=headers) as session:
            while True:
                # One request per second. asyncio.sleep yields to the event
                # loop; the blocking time.sleep would stall every other task.
                await asyncio.sleep(1)
                url = f"https://{amo_data['account']}/api/v4/leads?limit=250&page={page}&order[created_at]=asc"
                async with session.get(url) as response:
                    if response.status != 200:
                        # Any non-200 answer ends the paging loop.
                        print(
                            f"First upload completed for {amo_data['account']}")
                        print(datetime.now() - time_s)
                        amo_resp = await response.text()
                        # ``async with`` releases the event response instead
                        # of leaving the connection checked out.
                        async with session.post(
                            url="https://bpm-run.ru/api/events?token=042812d12dc17f2955c2034abcd6b1dfdf3a48be2a616e59f1",
                            json={
                                "protocol": str(response.method),
                                "url": str(response.url),
                                "status_code": int(response.status),
                                "request_body": json.dumps({}),
                                "response": json.dumps({"error": amo_resp})
                            }
                        ):
                            pass
                        break

                    amo_json_resp = await response.json(
                        content_type='application/hal+json')
                    leads_data = []
                    for lead in amo_json_resp['_embedded']['leads']:
                        lead['amo_account'] = amo_data['account']
                        lead['amo_account_id'] = amo_data['account_id']
                        for key in dropped_keys:
                            lead.pop(key, None)
                        leads_data.append(lead)

                    # Skip the INSERT for an empty page rather than emitting
                    # an INSERT without rows.
                    if leads_data:
                        async with self.post_pool() as post_session:
                            async with post_session.begin():
                                await post_session.execute(
                                    insert(A).values(leads_data))
                    print(f"{page} - inserted")
                    page += 1

                    async with session.post(
                        url="https://bpm-run.ru/api/events?token=042812d1df955c2034abcd6b1dfdf3a48be2a616e59f1",
                        json={
                            "protocol": str(response.method),
                            "url": str(response.url),
                            "status_code": int(response.status),
                            "request_body": json.dumps({}),
                            "response": json.dumps(amo_json_resp)
                        }
                    ):
                        pass

    async def update_leads(self, amo_data):
        """Fetch leads changed since the last sync and upsert them.

        Pages through ``/api/v4/leads`` filtered by
        ``updated_at >= amo_data['updated_last']`` until the API answers with
        anything other than HTTP 200. Every response is reported to the
        bpm-run events endpoint; each lead of a 200 page is updated when a row
        with the same lead id and account id exists, otherwise inserted.

        Args:
            amo_data: Mapping with the keys ``account``, ``account_id``,
                ``token`` and ``updated_last``.
        """
        print(f"Update for {amo_data['account']} is started")
        headers = {"Authorization": f"Bearer {amo_data['token']}"}
        updated_from = amo_data['updated_last']
        # Payload keys that are stripped before the row is stored. ``pop`` is
        # used so a key missing from the payload does not abort the update.
        dropped_keys = ("_links", "_embedded", "score", "account_id",
                        "labor_cost", "is_price_computed")
        page = 1
        async with aiohttp.ClientSession(headers=headers) as session:
            while True:
                url = f"https://{amo_data['account']}/api/v4/leads?order[updated_at]=asc&filter[updated_at][from]={updated_from}&page={page}&limit=250&with=contacts"
                async with session.get(url) as response:
                    if response.status != 200:
                        # Any non-200 answer is reported and ends the loop.
                        amo_resp = await response.text()
                        async with session.post(
                            url="https://bpm-run.ru/api/events?token=042812d12ddf955c2034abcd6b1dfdf3a48be2a616e59f1",
                            json={
                                "protocol": str(response.method),
                                "url": str(response.url),
                                "status_code": int(response.status),
                                "request_body": json.dumps({}),
                                "response": json.dumps({"error": amo_resp})
                            }
                        ):
                            pass
                        break

                    amo_json_resp = await response.json(
                        content_type='application/hal+json')
                    # ``async with`` releases the event response instead of
                    # leaving the connection checked out.
                    async with session.post(
                        url="https://bpm-run.ru/api/events?token=042812d12dc17f2c095f1583fdadf955c2034abcd6b1dfdf3a48be2a616e59f1",
                        json={
                            "protocol": str(response.method),
                            "url": str(response.url),
                            "status_code": int(response.status),
                            "request_body": json.dumps({}),
                            "response": json.dumps(amo_json_resp)
                        }
                    ):
                        pass

                    leads_data = []
                    for lead in amo_json_resp['_embedded']['leads']:
                        lead['amo_account'] = amo_data['account']
                        lead['amo_account_id'] = amo_data['account_id']
                        for key in dropped_keys:
                            lead.pop(key, None)
                        leads_data.append(lead)

                    async with self.post_pool() as post_session:
                        async with post_session.begin():
                            for lead_obj in leads_data:
                                # Both criteria must reach the database.
                                # Python's ``and`` cannot be overloaded, so
                                # ``expr_a and expr_b`` collapses to a single
                                # criterion; chained ``where`` calls are
                                # AND-ed by SQLAlchemy.
                                db_resp = await post_session.execute(
                                    select(A)
                                    .where(A.id == lead_obj["id"])
                                    .where(A.amo_account_id ==
                                           lead_obj["amo_account_id"]))
                                if db_resp.scalars().first():
                                    await post_session.execute(
                                        update(A)
                                        .where(A.id == lead_obj["id"])
                                        .where(A.amo_account_id ==
                                               lead_obj["amo_account_id"])
                                        .values(lead_obj))
                                else:
                                    await post_session.execute(
                                        insert(A).values(lead_obj))
                    page += 1

    async def update_google(self, spread_data):
        """Rebuild one Google spreadsheet from the leads stored in Postgres.

        Reads the first row of the configuration sheet to learn which custom
        fields are requested, loads the leads of the given pipeline and
        account, enriches them with user and contact data from MySQL, then
        clears the output sheet and writes a header row plus one row per lead.
        The write is retried once after 20 seconds when the Sheets API raises
        ``HttpError``.

        Args:
            spread_data: Mapping with ``spread_id``, ``pipeline_id``,
                ``account_id`` and ``statuses`` (dicts with ``id`` and
                ``name``).

        Raises:
            HttpError: If the write fails again on the retry.
        """
        # NOTE(review): every googleapiclient ``execute()`` call below is
        # synchronous and blocks the event loop while it runs.
        body = {"values": []}

        try:
            fields = (self.build.spreadsheets().values().get(
                spreadsheetId=spread_data['spread_id'],
                range="Лист2!A1:ZZZ1").execute()).get("values")
        except HttpError as e:
            # Best effort: without the configuration row only the default
            # columns are written.
            print(e)
            fields = None

        if fields:
            flag = fields[0][0]
            custom_table_fields = fields[0][7:]
        else:
            flag = None
            custom_table_fields = None

        # Requested custom columns are header cells of the form
        # "<label>:<field id>"; cells without a colon are ignored.
        custom_ids = [
            parts[1]
            for parts in (cell.split(":")
                          for cell in custom_table_fields or [])
            if len(parts) != 1
        ]

        # Part after the colon of the first header cell, if there is one.
        flag_parts = flag.split(":") if flag else []
        flag_value = flag_parts[1] if len(flag_parts) > 1 else None

        default_headers = ["amo_lead_id", "amo_lead_link", "amo_lead_name",
                           "amo_lead_status_name", "amo_lead_user",
                           "amo_contact_name", "amo_contact_phone",
                           "created_at"]

        async with self.post_pool() as session:
            async with session.begin():
                # Chained ``where`` calls are AND-ed. Python's ``and`` between
                # two column expressions would collapse to one criterion and
                # lose the account filter.
                db_resp = await session.execute(
                    select(A)
                    .where(A.pipeline_id == spread_data['pipeline_id'])
                    .where(A.amo_account_id == spread_data['account_id']))
                # Materialise the rows while the session is still open.
                leads = db_resp.scalars().all()

        async with self.mysql_pool.acquire() as conn:
            async with conn.cursor() as cur:
                for lead in leads:
                    status = next(
                        (item for item in spread_data['statuses']
                         if item["id"] == lead.status_id),
                        {"name": ""})

                    # Parameterised queries: a NULL id yields no row instead
                    # of a malformed SQL statement.
                    await cur.execute(
                        "SELECT name FROM amo_users WHERE amo_user_id=%s",
                        (lead.responsible_user_id,))
                    user = await cur.fetchone()
                    lead_resp_user = user[0] if user else None

                    await cur.execute(
                        "SELECT amo_install_id, amo_contact_id FROM amo_lead_contacts WHERE amo_lead_id=%s",
                        (lead.id,))
                    contact_rel = await cur.fetchone()

                    contact_name = None
                    contact_phone = None
                    if contact_rel:
                        await cur.execute(
                            "SELECT name, phone FROM amo_contacts WHERE amo_install_id=%s AND amo_contact_id=%s",
                            (contact_rel[0], contact_rel[1]))
                        contact = await cur.fetchone()
                        if contact:
                            contact_name = contact[0]
                            contact_phone = contact[1]

                    row = [lead.id,
                           f"https://{lead.amo_account}/leads/detail/{lead.id}",
                           lead.name,
                           status.get("name"),
                           lead_resp_user,
                           contact_name,
                           contact_phone,
                           datetime.utcfromtimestamp(lead.created_at)
                           .strftime('%d.%m.%Y %H:%M:%S')]
                    # Remove "+" from cells that start with it (values are
                    # written with the USER_ENTERED input option).
                    row = [str(cell).replace("+", "")
                           if str(cell).startswith("+") else cell
                           for cell in row]

                    custom_values = lead.custom_fields_values
                    if custom_values:
                        values_by_field = {}
                        for custom_value in custom_values:
                            values_by_field.setdefault(
                                int(custom_value.get("field_id")),
                                []).append(custom_value)
                        for custom_id in custom_ids:
                            matching = (values_by_field.get(int(custom_id))
                                        if custom_id.isdigit() else None)
                            if not matching:
                                # Keep the column position for absent fields.
                                row.append(None)
                                continue
                            for custom_value in matching:
                                row.append(self._format_custom_value(
                                    custom_value.get("values")[0].get(
                                        "value")))

                    body['values'].append(row)

                for custom_id in custom_ids:
                    header = f"lead:{custom_id}"
                    if custom_id.isdigit() and flag_value is not None:
                        default_headers[0] = f"amo_lead_id:{flag_value}"
                        # Flag "1" asks for the stored field name as label.
                        if flag_value == "1":
                            await cur.execute(
                                "SELECT name FROM amo_lead_fields WHERE amo_id=%s",
                                (int(custom_id),))
                            rus_name = await cur.fetchone()
                            if rus_name:
                                header = f"{rus_name[0]}:{custom_id}"
                    default_headers.append(header)

        body['values'].insert(0, default_headers)

        try:
            await self._rewrite_sheet(
                spread_data, body,
                "https://bpm-run.ru/api/events?token=042812d1f955c2034abcd6b1dfdf3a48be2a616e59f1")
        except HttpError as e:
            print(e)
            # Back off without blocking the event loop, then retry once.
            await asyncio.sleep(20)
            await self._rewrite_sheet(
                spread_data, body,
                "https://bpm-run.ru/api/events?token=042812d1283fdadf955c2034abcd6b1dfdf3a48be2a616e59f1")

        print(f"Update spreadsheet {spread_data['spread_id']}")

    @staticmethod
    def _format_custom_value(value):
        """Convert one custom-field value into the form written to the sheet."""
        if value is True:
            return "1"
        if value is False:
            return "0"
        text = str(value)
        # Ten-digit values starting with 15/16/17 are treated as Unix
        # timestamps. The ``isdigit`` check keeps ``int()`` from raising on
        # other ten-character strings.
        # NOTE(review): +10800 s is three hours, presumably a fixed
        # UTC -> Moscow shift that assumes the host runs in UTC - confirm.
        if (len(text) == 10 and text.startswith(("15", "16", "17"))
                and text.isdigit()):
            return datetime.fromtimestamp(
                int(text) + 10800).strftime("%d.%m.%Y")
        if text.startswith("+"):
            return text.replace("+", "")
        return value

    async def _rewrite_sheet(self, spread_data, body, event_url):
        """Clear the output sheet, write *body* to it and report the update."""
        self.build.spreadsheets().values().clear(
            spreadsheetId=spread_data['spread_id'],
            range="Лист1!A1:Z50000000").execute()
        self.build.spreadsheets().values().append(
            spreadsheetId=spread_data['spread_id'], range="Лист1!A1",
            valueInputOption="USER_ENTERED", body=body).execute()

        async with aiohttp.ClientSession() as session_http:
            async with session_http.post(
                url=event_url,
                json={
                    "protocol": "UPDATE",
                    "url": f"Google-Tables Update {spread_data['spread_id']}",
                    "status_code": 200,
                    "request_body": json.dumps(body),
                    "response": json.dumps({})
                }
            ):
                pass

    async def send_to_google_update(self, amo_data, exists):
        """Sync one account's leads, then refresh all of its spreadsheets.

        Args:
            amo_data: Mapping with ``account``, ``account_id`` and ``token``;
                the key ``updated_last`` is set here.
            exists: Most recently updated stored lead of the account. Its
                ``updated_at`` becomes the lower bound of the lead update.
                When it is ``None`` (or has no timestamp) the bound is 0.
        """
        last_update = getattr(exists, "updated_at", None)
        amo_data['updated_last'] = (
            int(last_update) if last_update is not None else 0)
        await self.update_leads(amo_data)
        print(f"Updated {amo_data['account']}")

        sheet_jobs = []

        async with self.mysql_pool.acquire() as conn:
            async with conn.cursor() as cur:
                # Values are passed as query parameters instead of being
                # formatted into the SQL text.
                await cur.execute(
                    "SELECT statuses FROM amo_lead_pipelines WHERE amo_install_id=%s",
                    (amo_data['account_id'],))
                status_db = await cur.fetchall()

                await cur.execute(
                    "SELECT spreadsheet_id, pipeline_id FROM amo_to_sheets WHERE active = 1 AND amo_install_id = %s",
                    (amo_data['account_id'],))
                sheets = await cur.fetchall()

                # Each row holds a JSON list of statuses; flatten them all.
                statuses = [item
                            for status in status_db
                            for item in json.loads(status[0])]

                for sheet in sheets:
                    await cur.execute(
                        "SELECT amo_pipeline_id FROM amo_lead_pipelines WHERE id=%s",
                        (sheet[1],))
                    pipeline = await cur.fetchone()
                    if pipeline is None:
                        # A sheet pointing at a missing pipeline is skipped
                        # instead of failing on ``pipeline[0]``.
                        print(
                            f"Pipeline {sheet[1]} not found, spreadsheet {sheet[0]} skipped")
                        continue

                    sheet_jobs.append({"spread_id": sheet[0],
                                       "pipeline_id": pipeline[0],
                                       "statuses": statuses,
                                       "account_id": amo_data['account_id']})

        # The MySQL connection is back in the pool before the sheets are
        # refreshed concurrently.
        await asyncio.gather(
            *(self.update_google(job) for job in sheet_jobs))

        if amo_data['account_id'] == 45:
            # Extra bpm-run hooks fired only for account 45. The last URL was
            # labelled "avtokvalsng2" (transliterated) in the original source.
            hook_urls = (
                "https://bpm-run.ru/api/hook_d453bbb76672c5e4fb0fd452f",
                "https://bpm-run.ru/api/hook_b47fbf98400ee34664c3eed15",
                "https://bpm-run.ru/api/hook_ef849a33337251d419ee53f22",
            )
            async with aiohttp.ClientSession() as bpm_session:
                for hook_url in hook_urls:
                    async with bpm_session.post(hook_url, json={}):
                        pass

    async def get_all_tables(self):
        """Run the whole sync for every account that has an active sheet.

        Initialises the connections, collects the distinct account ids from
        the active ``amo_to_sheets`` rows and schedules
        ``send_to_google_update`` for each of them. An account without stored
        leads gets ``first_upload`` first. All scheduled updates are then
        awaited together.
        """
        await self.first_settings()
        start_time = datetime.now()
        async with self.mysql_pool.acquire() as conn:
            async with conn.cursor() as cur:

                await cur.execute(
                    "SELECT id, spreadsheet_id, pipeline_id, amo_install_id FROM amo_to_sheets WHERE active = 1")
                tables = await cur.fetchall()

                unique_ids = list({row[-1] for row in tables})
                tasks = []
                print(f"Unique_ids: {str(unique_ids)}")
                async with self.post_pool() as session:
                    async with session.begin():

                        async def newest_lead(account_id):
                            """Return the account's most recently updated lead."""
                            db_resp = await session.execute(
                                select(A).where(
                                    A.amo_account_id == account_id).order_by(
                                    desc(A.updated_at)).limit(1))
                            return db_resp.scalars().first()

                        for account_id in unique_ids:
                            # The id is passed as a query parameter instead
                            # of being formatted into the SQL text.
                            await cur.execute(
                                "SELECT amo_account, amo_access_token FROM amo_install WHERE id=%s",
                                (account_id,))
                            amo_install = await cur.fetchone()
                            if amo_install is None:
                                # A sheet that references a missing install
                                # row is skipped instead of raising.
                                print(
                                    f"Install {account_id} not found, skipped")
                                continue

                            amo_data = {"account": amo_install[0],
                                        "account_id": account_id,
                                        "token": amo_install[1]}

                            exists = await newest_lead(account_id)
                            if not exists:
                                await self.first_upload(amo_data)
                                exists = await newest_lead(account_id)
                            if exists is None:
                                # The first upload stored nothing, so there
                                # is no timestamp to update from.
                                print(
                                    f"No leads stored for {amo_data['account']}, update skipped")
                                continue

                            tasks.append(asyncio.create_task(
                                self.send_to_google_update(amo_data,
                                                           exists)))

                await asyncio.gather(*tasks)
                print("All tables were updated!")
                print(f"Running time: {datetime.now() - start_time}")


async def main():
    """Report the script start to bpm-run, then run the full sync."""
    print(f"Time of script start {datetime.now()}")

    start_event = {
        "protocol": "START",
        "url": "Google-Tables Start",
        "status_code": 200,
        "request_body": json.dumps({}),
        "response": json.dumps({})
    }
    async with aiohttp.ClientSession() as session:
        await session.post(
            url="https://bpm-run.ru/api/events?token=0f955c2034abcd6b1dfdf3a48be2a616e59f1",
            json=start_event,
        )

    worker = AmoToSheet()
    await worker.get_all_tables()


# Run the sync only when the file is executed as a script, so importing the
# module does not start network and database work.
if __name__ == "__main__":
    asyncio.run(main())
Editor is loading...
Leave a Comment