Untitled

mail@pastecode.io avatarunknown
plain_text
24 days ago
18 kB
2
Indexable
Never
import base64
import json
import os
import time
from blake3 import blake3
import requests
import mimetypes
import argparse

TG_BOT_TOKEN = ""
CHAT_ID = -100
def send_log(text):
    try:
        url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
        data = {
            "chat_id": CHAT_ID,
            "text": f"{text}",
            "parse_mode": "HTML",
        }
        resp = requests.post(url=url, data=data)
        
        # Check the response and handle any potential errors
        if resp.status_code != 200:
            print(f"Failed to send message. Status code: {resp.status_code}")
            print(resp.text)  # Print the response content for further inspection
    except Exception as e:
        print(f"An error occurred: {e}")

def is_jwt_expired(token):
    try:
        decoded_jwt = json.loads(base64.b64decode(token.split(".")[1] + "===").decode("utf-8"))
        date_now = time.time()
        decoded_jwt["exp"] = decoded_jwt["exp"]-120
        return decoded_jwt["exp"] <= date_now
    except Exception as e:
        if isinstance(e, Exception):
            raise Exception(f"Invalid token: {e}")
        return False
    
def get_files_in_folder(folder_path):
    file_list = []
    for root, dirs, files in os.walk(folder_path):
        for file in files:
            file_list.append(os.path.join(root, file))
    return file_list


def divide_list(input_list, max_items):
    divided_lists = []
    current_list = []

    for item in input_list:
        if len(current_list) < max_items:
            current_list.append(item)
        else:
            divided_lists.append(current_list)
            current_list = [item]

    if current_list:
        divided_lists.append(current_list)

    return divided_lists

def get_content_type(filename):
    mime, _ = mimetypes.guess_type(filename)
    return mime

def file_to_base64(filename):
    try:
        with open(filename, 'rb') as file:
            file_content = file.read()
            base64_content = base64.b64encode(file_content)
            return base64_content.decode('utf-8')
    except FileNotFoundError:
        return "File not found"
    except Exception as e:
        print(e)
        return f"An error occurred: {e}"
    

def hash_file(filepath):
    with open(filepath, "rb") as file:
        contents = file.read()
        base64_contents = base64.b64encode(contents).decode("utf-8")
        extension = os.path.splitext(filepath)[1][1:]
        
        hash_input = base64_contents + extension
        hash_value = blake3(hash_input.encode("utf-8")).hexdigest()
        
        return hash_value[:32]

def get_jwt_token(project_name , ac_id , email , token):
    url = "https://api.cloudflare.com/client/v4/accounts/"+ac_id+"/pages/projects/"+project_name+"/upload-token"
    headers = {
        'X-Auth-Email': email,
        'X-Auth-Key': token,
        'Content-Type' : 'application/json'
    }
    response = requests.request("GET", url, headers=headers)
    if response.status_code != 200:
        print("Error: " + response.text)
        return None
    for i in range(1,10):
        try:
            print("get_jwt_token :: ",response.text)
            token = response.json()['result']['jwt']
            if token is not None and len(token) > 10:
                return token
            else:
                continue
        except Exception as err:
            print("get_jwt_token err ::",err)
            continue
    print("Failed To get_jwt_token ")
    exit(0)

def isProjectExist(project_name ,  ac_id , email , token):
    headers = {
        'X-Auth-Email': email,
        'X-Auth-Key': token,
        'Content-Type' : 'application/json'
    }
    response = requests.get(
        f'https://api.cloudflare.com/client/v4/accounts/{ac_id}/pages/projects/{project_name}',
        headers=headers,
    )
    respData = response.json()
    if respData.get('success'):
        return True
    print(respData.get('errors'))
    return False

def createProject(project_name , ac_id , email , token):
    headers = {
        'X-Auth-Email': email,
        'X-Auth-Key': token,
        'Content-Type' : 'application/json'
    }

    json_data = {
        'name': project_name,
        'production_branch': 'main',
    }

    response = requests.post(
        f'https://api.cloudflare.com/client/v4/accounts/{ac_id}/pages/projects',
        headers=headers,
        json=json_data,
    )
    try:
        resp = response.json()
        print(resp)
    except:
        print(response.text)
        resp = {"success":False}
    return resp

def get_files_hash(files : list):
    hash_s = []
    for file_ in files:
        hash_ = hash_file(file_)
        hash_s.append(hash_)
    return hash_s

def get_files_hash_and_manifest(files : list):
    hash_list = []
    manifest = {}
    for file_ in files:
        hash_ = hash_file(file_)
        hash_list.append(hash_)
        asset_name = f"/{os.path.basename(file_)}"
        manifest[asset_name] = hash_

    return {"file_hashes" : hash_list , "manifest" : manifest}

def playload_upload_v2(files):
    file_hashes = []
    manifest = {}
    for file_ in files:
        hash_ = hash_file(file_)
        file_hashes.append(hash_)
        asset_name = f"/{os.path.basename(file_)}"
        manifest[asset_name] = hash_
    return {"file_hashes" : file_hashes , "manifest" : manifest}

def generate_playload(files , hash_to_upload):
    json_data = []
    file_hashes = []
    manifest = {}
    for file_ in files:
        hash_ = hash_file(file_)
        if hash_ not in hash_to_upload:
            continue
        file_hashes.append(hash_)
        asset_name = f"/{os.path.basename(file_)}"
        manifest[asset_name] = hash_
        json_data.append({
            'base64': True,
            'key': hash_,
            'value': file_to_base64(file_),
            'metadata': {
                'contentType': get_content_type(file_),
            },
        })
    return {"json_data" : json_data , "file_hashes" : file_hashes , "manifest" : manifest}

def upload_assets(json_data ,jwt_token):
    headers = {
        "Authorization": "Bearer " + jwt_token,
        'Content-Type' : 'application/json'
    }
    
    response = requests.post(
        'https://api.cloudflare.com/api/v4/pages/assets/upload',
        headers=headers,
        json=json_data,
    )
    try:
        resp = response.json()
        print(resp)
    except:
        print(response.text)
        resp = {"success":False}
    return resp
    # return file_hashes

def upsert_hashes(json_data , jwt_token):
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer " + jwt_token
    }
    response = requests.post(
        'https://api.cloudflare.com/client/v4/pages/assets/upsert-hashes',
        headers=headers,
        json={"hashes":json_data},
    )
    try:
        resp = response.json()
        print(resp)
    except:
        print(response.text)
        resp = {"success":False}
    return resp

def push_deployment(str_json_playload , project_name , ac_id , email , token):
    str_json_playload = str_json_playload.replace('\\',"/")
    print("str_json_playload :: ",str_json_playload)
    data = f'------WebKitFormBoundaryS3GYPpcUKHU4A4n0\r\nContent-Disposition: form-data; name="manifest"\r\n\r\n{str_json_playload}\r\n------WebKitFormBoundaryS3GYPpcUKHU4A4n0--\r\n'
    
    headers = {
        'X-Auth-Email': email,
        'X-Auth-Key': token,
        'Content-Type' : 'application/json',
        'authority': 'dash.cloudflare.com',
        'accept': '*/*',
        'accept-language': 'en-US,en;q=0.9',
        'content-type': 'multipart/form-data; boundary=----WebKitFormBoundaryS3GYPpcUKHU4A4n0',

    }

    response = requests.post(
        f'https://api.cloudflare.com/client/v4/accounts/{ac_id}/pages/projects/{project_name}/deployments',
        headers=headers,
        data=data,
    )
    try:
        resp = response.json()
        print(resp)
    except:
        print(response.text)
        resp = {"success":False}
    return resp
    

def check_missing(hashes , jwt_token):
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer " + jwt_token
    }
    response = requests.post(
        'https://api.cloudflare.com/client/v4/pages/assets/check-missing',
        headers=headers,
        json={"hashes":hashes},
    )
    resp = response.json()
    #print(resp)
    if resp.get('success') ==True:
        return resp.get('result',[])
    return [] 

def save_details(file_name, data):
    json_object = json.dumps(data, indent=4)
    with open(file_name, "w") as outfile:
        outfile.write(json_object)

def get_saved_details(file_name):
    try:
        with open(file_name,'r') as fp:
            json_object = json.loads(fp.read())
        return json_object
    except Exception as err:
        print("get_saved_details  err :: ",err)
        return {}

if __name__ == "__main__":

    # Create an ArgumentParser object
    parser = argparse.ArgumentParser(description='Command-line argument example')

    # Add arguments with key names
    parser.add_argument('--accountid', help='Account ID', required=True)
    parser.add_argument('--project', help='Project Name', required=True)
    parser.add_argument('--email', help='Email', required=True)
    parser.add_argument('--token', help='Email Token', required=True)
    parser.add_argument('--path', help='Folder_Paath', required=True)

    # Parse the command-line arguments
    args = parser.parse_args()

    project_name = args.project
    token = args.token
    email = args.email
    ac_id = args.accountid
    

    FOLDER_PATH = args.path
    NUMBER_OF_FILES_PER_DEPLOY = 100
    NUMBER_OF_FILES_PER_REQUEST = 2
    MAX_RETRY = 100
    RETRY_SLEEP  = 0
    SLEEP_BEFORE_NEXT_DEPLY = 20

    file_name = f"{ac_id}_{project_name}.json"

   
    isExist = isProjectExist(project_name,ac_id,email,token)
    print("isExist :: ",isExist)
    if not isExist:
        getSub = createProject(project_name,ac_id ,  email, token)
        if getSub.get('success'):
            print("New Project Created")
        else:
            print(getSub)
            print('Failed To Create New Project')
            exit(0)
        print(getSub)
    # exit(0)
    files_ = get_files_in_folder(FOLDER_PATH)
    
    total = len(files_)
    # hashes = get_files_hash(files_)
    dv_list = divide_list(files_, NUMBER_OF_FILES_PER_DEPLOY)
    current_uploaded = 0
    
    saved_data = get_saved_details(file_name)
    for i_ , file_dv in enumerate(dv_list, 1):
        if saved_data.get(str(i_)):
            current_uploaded = current_uploaded + NUMBER_OF_FILES_PER_DEPLOY
            print(f"{project_name}\n\nList {i_} Already Processed")
            continue
        print(f"List {i_}: {len(file_dv)} | Total : {total} | Left : {total-current_uploaded}")
        send_log(f"{project_name}\n\nList {i_}: {len(file_dv)} | Total : {total} | Left : {total-current_uploaded}")

        playload_upload_v2_ = playload_upload_v2(file_dv)
        file_hashes = playload_upload_v2_.get('file_hashes')
        manifest = playload_upload_v2_.get('manifest')

        hashes = file_hashes

        jwt = get_jwt_token(project_name, ac_id,email, token)
        print("jwt :: ", jwt)

        # print(file_dv)
        # exit(0)

        hash_to_upload = check_missing(hashes, jwt)
        
        if len(hash_to_upload) == 0:
            print("Found Zero hash_to_upload :: skipping")
            send_log(f"{project_name}\n\nFound Zero hash_to_upload :: skipping")
            current_uploaded = current_uploaded + NUMBER_OF_FILES_PER_DEPLOY
            saved_data[i_] = {"file_hashes":file_hashes, "manifest":manifest}
            save_details(file_name, saved_data)
            continue

        new_file_list = divide_list(file_dv, NUMBER_OF_FILES_PER_REQUEST)
        for files_for_post in new_file_list:
            retry = 0
            post_playload_upload = generate_playload(files_for_post , hash_to_upload).get('json_data')
            if len(post_playload_upload) < 1:
                continue
            for i in range(MAX_RETRY - 1):
                retry = retry+1
                if is_jwt_expired(jwt):
                    jwt = get_jwt_token(project_name, ac_id,email, token)
                uploadasset = upload_assets(post_playload_upload , jwt)
                if not uploadasset.get('success'):
                    #send_log(f"uploadasset err : <code>{str(uploadasset)}</code>")
                    print("uploadasset err : ", uploadasset)
                    print(f"Sleeping for {RETRY_SLEEP} | Retry Left : {MAX_RETRY - retry}")

                    if retry >= MAX_RETRY:
                        send_log(f"uploadasset err stopped script reached max retry  {MAX_RETRY}: <code>{str(uploadasset)}</code>")
                        exit(0)
                    if RETRY_SLEEP > 0:
                        time.sleep(RETRY_SLEEP)
                    continue
                else:
                    current_uploaded = current_uploaded+ NUMBER_OF_FILES_PER_REQUEST
                    print("Uploaded Files : ",current_uploaded," | Total : ",total)
                    #time.sleep(1)
                    break
        
        if not uploadasset.get('success'):
            
            exit(0)

        print("uploadasset :: ",uploadasset)
        retry = 0
        for i in range(MAX_RETRY - 1):
            retry = retry+1
            push_deploy = push_deployment(json.dumps(manifest),project_name, ac_id,email,token)
            if not push_deploy.get('success'):
                print("push_deploy err : ", push_deploy)
                send_log(f"push_deploy err : <code>{str(push_deploy)}</code>")
                print(f"Sleeping for {RETRY_SLEEP} | Retry Left : {MAX_RETRY - retry}")
                if RETRY_SLEEP > 0:
                    time.sleep(RETRY_SLEEP)
                continue
            else:
                break
 
        print("push_deploy :: ",push_deploy)
 
        if not push_deploy.get('success'):
            exit(0)
 
 
        retry = 0
        for i in range(MAX_RETRY - 1):
            retry = retry+1
            if is_jwt_expired(jwt):
                jwt = get_jwt_token(project_name, ac_id,email, token)
            upsert_hash = upsert_hashes(file_hashes , jwt)
            if not upsert_hash.get('success'):
                #print("upsert_hash err : ", upsert_hash)
                send_log(f"upsert_hash err : <code>{str(upsert_hash)}</code>")
                print(f"Sleeping for {RETRY_SLEEP} | Retry Left : {MAX_RETRY - retry}")
                if RETRY_SLEEP > 0:
                    time.sleep(RETRY_SLEEP)
                continue
            else:
                break
        
        #print("upsert_hash :: ",upsert_hash)
        if not upsert_hash.get('success'):
            exit(0)
        saved_data[i_] = {"file_hashes":file_hashes, "manifest":manifest}
        save_details(file_name, saved_data)
        time.sleep(SLEEP_BEFORE_NEXT_DEPLY)
        print(" ============================================= \n\n")
        # print("file_hashes :: ",file_hashes)
        # print("manifest :: ",manifest)
    print("====== Pushing All Hashes and Manifest =====")
    send_log(project_name+"\n\n====== Pushing All Hashes and Manifest =====")
    manifest = dict()
    file_hashes = []

    for data_ in saved_data:
        data_ = saved_data[data_]
        print(data_)
        file_hashes = file_hashes + data_.get("file_hashes")
        # manifes = manifes.update(data_.get("manifes"))
        if manifest is None:
            manifest = data_.get("manifest")
        else:
            manifest.update(data_.get("manifest"))

    retry = 0
    for i in range(MAX_RETRY - 1):
        retry = retry+1
        push_deploy = push_deployment(json.dumps(manifest),project_name, ac_id,email,token)
        if not push_deploy.get('success'):
            print("push_deploy err : ", push_deploy)
            send_log(f"push_deploy err : <code>{str(push_deploy)}</code>")
            print(f"Sleeping for {RETRY_SLEEP} | Retry Left : {MAX_RETRY - retry}")
            if RETRY_SLEEP > 0:
                time.sleep(RETRY_SLEEP)
            continue
        else:
            break

    print("push_deploy :: ",push_deploy)

    if not push_deploy.get('success'):
        send_log(f"all push deploy error")
        exit(0)


    retry = 0
    for i in range(MAX_RETRY - 1):
        retry = retry+1
        try:
            if is_jwt_expired(jwt):
                jwt = get_jwt_token(project_name, ac_id,email, token)
        except:
            jwt = get_jwt_token(project_name, ac_id,email, token)
        upsert_hash = upsert_hashes(file_hashes , jwt)
        if not upsert_hash.get('success'):
            #print("upsert_hash err : ", upsert_hash)
            send_log(f"upsert_hash err : <code>{str(upsert_hash)}</code>")
            print(f"Sleeping for {RETRY_SLEEP} | Retry Left : {MAX_RETRY - retry}")
            if RETRY_SLEEP > 0:
                time.sleep(RETRY_SLEEP)
            continue
        else:
            break
    
    #print("upsert_hash :: ",upsert_hash)
    if not upsert_hash.get('success'):
        exit(0)

send_log(f"{project_name}\n\nAll Done\n\nCheck https://{project_name}.pages.dev")
print("Done")