Untitled
plain_text
24 days ago
18 kB
2
Indexable
Never
import base64 import json import os import time from blake3 import blake3 import requests import mimetypes import argparse TG_BOT_TOKEN = "" CHAT_ID = -100 def send_log(text): try: url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage" data = { "chat_id": CHAT_ID, "text": f"{text}", "parse_mode": "HTML", } resp = requests.post(url=url, data=data) # Check the response and handle any potential errors if resp.status_code != 200: print(f"Failed to send message. Status code: {resp.status_code}") print(resp.text) # Print the response content for further inspection except Exception as e: print(f"An error occurred: {e}") def is_jwt_expired(token): try: decoded_jwt = json.loads(base64.b64decode(token.split(".")[1] + "===").decode("utf-8")) date_now = time.time() decoded_jwt["exp"] = decoded_jwt["exp"]-120 return decoded_jwt["exp"] <= date_now except Exception as e: if isinstance(e, Exception): raise Exception(f"Invalid token: {e}") return False def get_files_in_folder(folder_path): file_list = [] for root, dirs, files in os.walk(folder_path): for file in files: file_list.append(os.path.join(root, file)) return file_list def divide_list(input_list, max_items): divided_lists = [] current_list = [] for item in input_list: if len(current_list) < max_items: current_list.append(item) else: divided_lists.append(current_list) current_list = [item] if current_list: divided_lists.append(current_list) return divided_lists def get_content_type(filename): mime, _ = mimetypes.guess_type(filename) return mime def file_to_base64(filename): try: with open(filename, 'rb') as file: file_content = file.read() base64_content = base64.b64encode(file_content) return base64_content.decode('utf-8') except FileNotFoundError: return "File not found" except Exception as e: print(e) return f"An error occurred: {e}" def hash_file(filepath): with open(filepath, "rb") as file: contents = file.read() base64_contents = base64.b64encode(contents).decode("utf-8") extension = os.path.splitext(filepath)[1][1:] hash_input = base64_contents + extension hash_value = blake3(hash_input.encode("utf-8")).hexdigest() return hash_value[:32] def get_jwt_token(project_name , ac_id , email , token): url = "https://api.cloudflare.com/client/v4/accounts/"+ac_id+"/pages/projects/"+project_name+"/upload-token" headers = { 'X-Auth-Email': email, 'X-Auth-Key': token, 'Content-Type' : 'application/json' } response = requests.request("GET", url, headers=headers) if response.status_code != 200: print("Error: " + response.text) return None for i in range(1,10): try: print("get_jwt_token :: ",response.text) token = response.json()['result']['jwt'] if token is not None and len(token) > 10: return token else: continue except Exception as err: print("get_jwt_token err ::",err) continue print("Failed To get_jwt_token ") exit(0) def isProjectExist(project_name , ac_id , email , token): headers = { 'X-Auth-Email': email, 'X-Auth-Key': token, 'Content-Type' : 'application/json' } response = requests.get( f'https://api.cloudflare.com/client/v4/accounts/{ac_id}/pages/projects/{project_name}', headers=headers, ) respData = response.json() if respData.get('success'): return True print(respData.get('errors')) return False def createProject(project_name , ac_id , email , token): headers = { 'X-Auth-Email': email, 'X-Auth-Key': token, 'Content-Type' : 'application/json' } json_data = { 'name': project_name, 'production_branch': 'main', } response = requests.post( f'https://api.cloudflare.com/client/v4/accounts/{ac_id}/pages/projects', headers=headers, json=json_data, ) try: resp = response.json() print(resp) except: print(response.text) resp = {"success":False} return resp def get_files_hash(files : list): hash_s = [] for file_ in files: hash_ = hash_file(file_) hash_s.append(hash_) return hash_s def get_files_hash_and_manifest(files : list): hash_list = [] manifest = {} for file_ in files: hash_ = hash_file(file_) hash_list.append(hash_) asset_name = f"/{os.path.basename(file_)}" manifest[asset_name] = hash_ return {"file_hashes" : hash_list , "manifest" : manifest} def playload_upload_v2(files): file_hashes = [] manifest = {} for file_ in files: hash_ = hash_file(file_) file_hashes.append(hash_) asset_name = f"/{os.path.basename(file_)}" manifest[asset_name] = hash_ return {"file_hashes" : file_hashes , "manifest" : manifest} def generate_playload(files , hash_to_upload): json_data = [] file_hashes = [] manifest = {} for file_ in files: hash_ = hash_file(file_) if hash_ not in hash_to_upload: continue file_hashes.append(hash_) asset_name = f"/{os.path.basename(file_)}" manifest[asset_name] = hash_ json_data.append({ 'base64': True, 'key': hash_, 'value': file_to_base64(file_), 'metadata': { 'contentType': get_content_type(file_), }, }) return {"json_data" : json_data , "file_hashes" : file_hashes , "manifest" : manifest} def upload_assets(json_data ,jwt_token): headers = { "Authorization": "Bearer " + jwt_token, 'Content-Type' : 'application/json' } response = requests.post( 'https://api.cloudflare.com/api/v4/pages/assets/upload', headers=headers, json=json_data, ) try: resp = response.json() print(resp) except: print(response.text) resp = {"success":False} return resp # return file_hashes def upsert_hashes(json_data , jwt_token): headers = { "Content-Type": "application/json", "Authorization": "Bearer " + jwt_token } response = requests.post( 'https://api.cloudflare.com/client/v4/pages/assets/upsert-hashes', headers=headers, json={"hashes":json_data}, ) try: resp = response.json() print(resp) except: print(response.text) resp = {"success":False} return resp def push_deployment(str_json_playload , project_name , ac_id , email , token): str_json_playload = str_json_playload.replace('\\',"/") print("str_json_playload :: ",str_json_playload) data = f'------WebKitFormBoundaryS3GYPpcUKHU4A4n0\r\nContent-Disposition: form-data; name="manifest"\r\n\r\n{str_json_playload}\r\n------WebKitFormBoundaryS3GYPpcUKHU4A4n0--\r\n' headers = { 'X-Auth-Email': email, 'X-Auth-Key': token, 'Content-Type' : 'application/json', 'authority': 'dash.cloudflare.com', 'accept': '*/*', 'accept-language': 'en-US,en;q=0.9', 'content-type': 'multipart/form-data; boundary=----WebKitFormBoundaryS3GYPpcUKHU4A4n0', } response = requests.post( f'https://api.cloudflare.com/client/v4/accounts/{ac_id}/pages/projects/{project_name}/deployments', headers=headers, data=data, ) try: resp = response.json() print(resp) except: print(response.text) resp = {"success":False} return resp def check_missing(hashes , jwt_token): headers = { "Content-Type": "application/json", "Authorization": "Bearer " + jwt_token } response = requests.post( 'https://api.cloudflare.com/client/v4/pages/assets/check-missing', headers=headers, json={"hashes":hashes}, ) resp = response.json() #print(resp) if resp.get('success') ==True: return resp.get('result',[]) return [] def save_details(file_name, data): json_object = json.dumps(data, indent=4) with open(file_name, "w") as outfile: outfile.write(json_object) def get_saved_details(file_name): try: with open(file_name,'r') as fp: json_object = json.loads(fp.read()) return json_object except Exception as err: print("get_saved_details err :: ",err) return {} if __name__ == "__main__": # Create an ArgumentParser object parser = argparse.ArgumentParser(description='Command-line argument example') # Add arguments with key names parser.add_argument('--accountid', help='Account ID', required=True) parser.add_argument('--project', help='Project Name', required=True) parser.add_argument('--email', help='Email', required=True) parser.add_argument('--token', help='Email Token', required=True) parser.add_argument('--path', help='Folder_Paath', required=True) # Parse the command-line arguments args = parser.parse_args() project_name = args.project token = args.token email = args.email ac_id = args.accountid FOLDER_PATH = args.path NUMBER_OF_FILES_PER_DEPLOY = 100 NUMBER_OF_FILES_PER_REQUEST = 2 MAX_RETRY = 100 RETRY_SLEEP = 0 SLEEP_BEFORE_NEXT_DEPLY = 20 file_name = f"{ac_id}_{project_name}.json" isExist = isProjectExist(project_name,ac_id,email,token) print("isExist :: ",isExist) if not isExist: getSub = createProject(project_name,ac_id , email, token) if getSub.get('success'): print("New Project Created") else: print(getSub) print('Failed To Create New Project') exit(0) print(getSub) # exit(0) files_ = get_files_in_folder(FOLDER_PATH) total = len(files_) # hashes = get_files_hash(files_) dv_list = divide_list(files_, NUMBER_OF_FILES_PER_DEPLOY) current_uploaded = 0 saved_data = get_saved_details(file_name) for i_ , file_dv in enumerate(dv_list, 1): if saved_data.get(str(i_)): current_uploaded = current_uploaded + NUMBER_OF_FILES_PER_DEPLOY print(f"{project_name}\n\nList {i_} Already Processed") continue print(f"List {i_}: {len(file_dv)} | Total : {total} | Left : {total-current_uploaded}") send_log(f"{project_name}\n\nList {i_}: {len(file_dv)} | Total : {total} | Left : {total-current_uploaded}") playload_upload_v2_ = playload_upload_v2(file_dv) file_hashes = playload_upload_v2_.get('file_hashes') manifest = playload_upload_v2_.get('manifest') hashes = file_hashes jwt = get_jwt_token(project_name, ac_id,email, token) print("jwt :: ", jwt) # print(file_dv) # exit(0) hash_to_upload = check_missing(hashes, jwt) if len(hash_to_upload) == 0: print("Found Zero hash_to_upload :: skipping") send_log(f"{project_name}\n\nFound Zero hash_to_upload :: skipping") current_uploaded = current_uploaded + NUMBER_OF_FILES_PER_DEPLOY saved_data[i_] = {"file_hashes":file_hashes, "manifest":manifest} save_details(file_name, saved_data) continue new_file_list = divide_list(file_dv, NUMBER_OF_FILES_PER_REQUEST) for files_for_post in new_file_list: retry = 0 post_playload_upload = generate_playload(files_for_post , hash_to_upload).get('json_data') if len(post_playload_upload) < 1: continue for i in range(MAX_RETRY - 1): retry = retry+1 if is_jwt_expired(jwt): jwt = get_jwt_token(project_name, ac_id,email, token) uploadasset = upload_assets(post_playload_upload , jwt) if not uploadasset.get('success'): #send_log(f"uploadasset err : <code>{str(uploadasset)}</code>") print("uploadasset err : ", uploadasset) print(f"Sleeping for {RETRY_SLEEP} | Retry Left : {MAX_RETRY - retry}") if retry >= MAX_RETRY: send_log(f"uploadasset err stopped script reached max retry {MAX_RETRY}: <code>{str(uploadasset)}</code>") exit(0) if RETRY_SLEEP > 0: time.sleep(RETRY_SLEEP) continue else: current_uploaded = current_uploaded+ NUMBER_OF_FILES_PER_REQUEST print("Uploaded Files : ",current_uploaded," | Total : ",total) #time.sleep(1) break if not uploadasset.get('success'): exit(0) print("uploadasset :: ",uploadasset) retry = 0 for i in range(MAX_RETRY - 1): retry = retry+1 push_deploy = push_deployment(json.dumps(manifest),project_name, ac_id,email,token) if not push_deploy.get('success'): print("push_deploy err : ", push_deploy) send_log(f"push_deploy err : <code>{str(push_deploy)}</code>") print(f"Sleeping for {RETRY_SLEEP} | Retry Left : {MAX_RETRY - retry}") if RETRY_SLEEP > 0: time.sleep(RETRY_SLEEP) continue else: break print("push_deploy :: ",push_deploy) if not push_deploy.get('success'): exit(0) retry = 0 for i in range(MAX_RETRY - 1): retry = retry+1 if is_jwt_expired(jwt): jwt = get_jwt_token(project_name, ac_id,email, token) upsert_hash = upsert_hashes(file_hashes , jwt) if not upsert_hash.get('success'): #print("upsert_hash err : ", upsert_hash) send_log(f"upsert_hash err : <code>{str(upsert_hash)}</code>") print(f"Sleeping for {RETRY_SLEEP} | Retry Left : {MAX_RETRY - retry}") if RETRY_SLEEP > 0: time.sleep(RETRY_SLEEP) continue else: break #print("upsert_hash :: ",upsert_hash) if not upsert_hash.get('success'): exit(0) saved_data[i_] = {"file_hashes":file_hashes, "manifest":manifest} save_details(file_name, saved_data) time.sleep(SLEEP_BEFORE_NEXT_DEPLY) print(" ============================================= \n\n") # print("file_hashes :: ",file_hashes) # print("manifest :: ",manifest) print("====== Pushing All Hashes and Manifest =====") send_log(project_name+"\n\n====== Pushing All Hashes and Manifest =====") manifest = dict() file_hashes = [] for data_ in saved_data: data_ = saved_data[data_] print(data_) file_hashes = file_hashes + data_.get("file_hashes") # manifes = manifes.update(data_.get("manifes")) if manifest is None: manifest = data_.get("manifest") else: manifest.update(data_.get("manifest")) retry = 0 for i in range(MAX_RETRY - 1): retry = retry+1 push_deploy = push_deployment(json.dumps(manifest),project_name, ac_id,email,token) if not push_deploy.get('success'): print("push_deploy err : ", push_deploy) send_log(f"push_deploy err : <code>{str(push_deploy)}</code>") print(f"Sleeping for {RETRY_SLEEP} | Retry Left : {MAX_RETRY - retry}") if RETRY_SLEEP > 0: time.sleep(RETRY_SLEEP) continue else: break print("push_deploy :: ",push_deploy) if not push_deploy.get('success'): send_log(f"all push deploy error") exit(0) retry = 0 for i in range(MAX_RETRY - 1): retry = retry+1 try: if is_jwt_expired(jwt): jwt = get_jwt_token(project_name, ac_id,email, token) except: jwt = get_jwt_token(project_name, ac_id,email, token) upsert_hash = upsert_hashes(file_hashes , jwt) if not upsert_hash.get('success'): #print("upsert_hash err : ", upsert_hash) send_log(f"upsert_hash err : <code>{str(upsert_hash)}</code>") print(f"Sleeping for {RETRY_SLEEP} | Retry Left : {MAX_RETRY - retry}") if RETRY_SLEEP > 0: time.sleep(RETRY_SLEEP) continue else: break #print("upsert_hash :: ",upsert_hash) if not upsert_hash.get('success'): exit(0) send_log(f"{project_name}\n\nAll Done\n\nCheck https://{project_name}.pages.dev") print("Done")