import base64
import json
import os
import time
from blake3 import blake3
import requests
import mimetypes
import argparse
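# Telegram bot token and chat id used by send_log() for progress notifications (fill in before running).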
TG_BOT_TOKEN = ""
CHAT_ID = -100
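# send_log: post a message to the configured Telegram chat via the Bot API; failures are printed, never raised.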
def send_log(text):
try:
url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/sendMessage"
data = {
"chat_id": CHAT_ID,
"text": f"{text}",
"parse_mode": "HTML",
}
resp = requests.post(url=url, data=data)
# Check the response and handle any potential errors
if resp.status_code != 200:
print(f"Failed to send message. Status code: {resp.status_code}")
print(resp.text) # Print the response content for further inspection
except Exception as e:
print(f"An error occurred: {e}")
def is_jwt_expired(token):
    try:
        # JWT payloads are base64url-encoded; re-pad before decoding.
        payload = token.split(".")[1]
        payload += "=" * (-len(payload) % 4)
        decoded_jwt = json.loads(base64.urlsafe_b64decode(payload).decode("utf-8"))
        return decoded_jwt["exp"] - 120 <= time.time()
    except Exception as e:
        raise Exception(f"Invalid token: {e}")
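# get_files_in_folder: recursively collect every file path under folder_path.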
def get_files_in_folder(folder_path):
file_list = []
for root, dirs, files in os.walk(folder_path):
for file in files:
file_list.append(os.path.join(root, file))
return file_list
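# divide_list: split a list into consecutive chunks of at most max_items entries.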
def divide_list(input_list, max_items):
divided_lists = []
current_list = []
for item in input_list:
if len(current_list) < max_items:
current_list.append(item)
else:
divided_lists.append(current_list)
current_list = [item]
if current_list:
divided_lists.append(current_list)
return divided_lists
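# get_content_type / file_to_base64: guess a file's MIME type (may be None) and read a file as a base64 string for the upload payload.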
def get_content_type(filename):
mime, _ = mimetypes.guess_type(filename)
return mime
def file_to_base64(filename):
try:
with open(filename, 'rb') as file:
file_content = file.read()
base64_content = base64.b64encode(file_content)
return base64_content.decode('utf-8')
except FileNotFoundError:
return "File not found"
except Exception as e:
print(e)
return f"An error occurred: {e}"
def hash_file(filepath):
with open(filepath, "rb") as file:
contents = file.read()
base64_contents = base64.b64encode(contents).decode("utf-8")
extension = os.path.splitext(filepath)[1][1:]
hash_input = base64_contents + extension
hash_value = blake3(hash_input.encode("utf-8")).hexdigest()
return hash_value[:32]
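# get_jwt_token: fetch a short-lived upload JWT for the Pages project; retries a few times and exits on repeated failure.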
def get_jwt_token(project_name, ac_id, email, token):
    url = f"https://api.cloudflare.com/client/v4/accounts/{ac_id}/pages/projects/{project_name}/upload-token"
    headers = {
        'X-Auth-Email': email,
        'X-Auth-Key': token,
        'Content-Type': 'application/json'
    }
    for _ in range(10):
        try:
            # Re-issue the request on every attempt so a retry can actually succeed.
            response = requests.get(url, headers=headers)
            if response.status_code != 200:
                print("Error: " + response.text)
                continue
            print("get_jwt_token :: ", response.text)
            jwt = response.json()['result']['jwt']
            if jwt is not None and len(jwt) > 10:
                return jwt
        except Exception as err:
            print("get_jwt_token err ::", err)
    print("Failed To get_jwt_token")
    exit(0)
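# isProjectExist: check whether the Pages project already exists on the account.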
def isProjectExist(project_name , ac_id , email , token):
headers = {
'X-Auth-Email': email,
'X-Auth-Key': token,
'Content-Type' : 'application/json'
}
response = requests.get(
f'https://api.cloudflare.com/client/v4/accounts/{ac_id}/pages/projects/{project_name}',
headers=headers,
)
respData = response.json()
if respData.get('success'):
return True
print(respData.get('errors'))
return False
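# createProject: create a new Pages project with 'main' as the production branch.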
def createProject(project_name , ac_id , email , token):
headers = {
'X-Auth-Email': email,
'X-Auth-Key': token,
'Content-Type' : 'application/json'
}
json_data = {
'name': project_name,
'production_branch': 'main',
}
response = requests.post(
f'https://api.cloudflare.com/client/v4/accounts/{ac_id}/pages/projects',
headers=headers,
json=json_data,
)
try:
resp = response.json()
print(resp)
    except Exception:
print(response.text)
resp = {"success":False}
return resp
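# get_files_hash / get_files_hash_and_manifest: compute asset hashes (and a "/basename" -> hash manifest) for a list of files.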
def get_files_hash(files : list):
hash_s = []
for file_ in files:
hash_ = hash_file(file_)
hash_s.append(hash_)
return hash_s
def get_files_hash_and_manifest(files : list):
hash_list = []
manifest = {}
for file_ in files:
hash_ = hash_file(file_)
hash_list.append(hash_)
asset_name = f"/{os.path.basename(file_)}"
manifest[asset_name] = hash_
return {"file_hashes" : hash_list , "manifest" : manifest}
def playload_upload_v2(files):
file_hashes = []
manifest = {}
for file_ in files:
hash_ = hash_file(file_)
file_hashes.append(hash_)
asset_name = f"/{os.path.basename(file_)}"
manifest[asset_name] = hash_
return {"file_hashes" : file_hashes , "manifest" : manifest}
def generate_playload(files , hash_to_upload):
json_data = []
file_hashes = []
manifest = {}
for file_ in files:
hash_ = hash_file(file_)
if hash_ not in hash_to_upload:
continue
file_hashes.append(hash_)
asset_name = f"/{os.path.basename(file_)}"
manifest[asset_name] = hash_
json_data.append({
'base64': True,
'key': hash_,
'value': file_to_base64(file_),
'metadata': {
'contentType': get_content_type(file_),
},
})
return {"json_data" : json_data , "file_hashes" : file_hashes , "manifest" : manifest}
def upload_assets(json_data ,jwt_token):
headers = {
"Authorization": "Bearer " + jwt_token,
'Content-Type' : 'application/json'
}
response = requests.post(
        'https://api.cloudflare.com/client/v4/pages/assets/upload',
headers=headers,
json=json_data,
)
try:
resp = response.json()
print(resp)
    except Exception:
print(response.text)
resp = {"success":False}
return resp
# return file_hashes
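# upsert_hashes: register the uploaded asset hashes with the Pages API so they count as present for the deployment.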
def upsert_hashes(json_data , jwt_token):
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + jwt_token
}
response = requests.post(
'https://api.cloudflare.com/client/v4/pages/assets/upsert-hashes',
headers=headers,
json={"hashes":json_data},
)
try:
resp = response.json()
print(resp)
    except Exception:
print(response.text)
resp = {"success":False}
return resp
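# push_deployment: create a new deployment by posting the manifest as a hand-built multipart/form-data body.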
def push_deployment(str_json_playload , project_name , ac_id , email , token):
str_json_playload = str_json_playload.replace('\\',"/")
print("str_json_playload :: ",str_json_playload)
data = f'------WebKitFormBoundaryS3GYPpcUKHU4A4n0\r\nContent-Disposition: form-data; name="manifest"\r\n\r\n{str_json_playload}\r\n------WebKitFormBoundaryS3GYPpcUKHU4A4n0--\r\n'
    headers = {
        'X-Auth-Email': email,
        'X-Auth-Key': token,
        'accept': '*/*',
        'content-type': 'multipart/form-data; boundary=----WebKitFormBoundaryS3GYPpcUKHU4A4n0',
    }
response = requests.post(
f'https://api.cloudflare.com/client/v4/accounts/{ac_id}/pages/projects/{project_name}/deployments',
headers=headers,
data=data,
)
try:
resp = response.json()
print(resp)
    except Exception:
print(response.text)
resp = {"success":False}
return resp
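# check_missing: ask the API which of the given hashes have not been uploaded yet.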
def check_missing(hashes , jwt_token):
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + jwt_token
}
response = requests.post(
'https://api.cloudflare.com/client/v4/pages/assets/check-missing',
headers=headers,
json={"hashes":hashes},
)
resp = response.json()
#print(resp)
    if resp.get('success'):
return resp.get('result',[])
return []
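# save_details / get_saved_details: persist and reload per-batch progress so an interrupted run can resume.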
def save_details(file_name, data):
json_object = json.dumps(data, indent=4)
with open(file_name, "w") as outfile:
outfile.write(json_object)
def get_saved_details(file_name):
try:
with open(file_name,'r') as fp:
json_object = json.loads(fp.read())
return json_object
except Exception as err:
print("get_saved_details err :: ",err)
return {}
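# Main flow: verify/create the Pages project, collect all files under --path, then for each batch of
# NUMBER_OF_FILES_PER_DEPLOY files: fetch an upload JWT, check which hashes are missing, upload the
# missing assets in small requests, push a deployment for the batch, upsert the hashes, and save
# progress. A final deployment with the combined manifest is pushed at the end.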
if __name__ == "__main__":
# Create an ArgumentParser object
parser = argparse.ArgumentParser(description='Command-line argument example')
# Add arguments with key names
parser.add_argument('--accountid', help='Account ID', required=True)
parser.add_argument('--project', help='Project Name', required=True)
parser.add_argument('--email', help='Email', required=True)
    parser.add_argument('--token', help='API Key (sent as X-Auth-Key)', required=True)
    parser.add_argument('--path', help='Folder Path', required=True)
# Parse the command-line arguments
args = parser.parse_args()
project_name = args.project
token = args.token
email = args.email
ac_id = args.accountid
FOLDER_PATH = args.path
NUMBER_OF_FILES_PER_DEPLOY = 100
NUMBER_OF_FILES_PER_REQUEST = 2
MAX_RETRY = 100
RETRY_SLEEP = 0
    SLEEP_BEFORE_NEXT_DEPLOY = 20
file_name = f"{ac_id}_{project_name}.json"
isExist = isProjectExist(project_name,ac_id,email,token)
print("isExist :: ",isExist)
if not isExist:
getSub = createProject(project_name,ac_id , email, token)
if getSub.get('success'):
print("New Project Created")
else:
print(getSub)
print('Failed To Create New Project')
exit(0)
print(getSub)
# exit(0)
files_ = get_files_in_folder(FOLDER_PATH)
total = len(files_)
# hashes = get_files_hash(files_)
dv_list = divide_list(files_, NUMBER_OF_FILES_PER_DEPLOY)
current_uploaded = 0
saved_data = get_saved_details(file_name)
for i_ , file_dv in enumerate(dv_list, 1):
if saved_data.get(str(i_)):
current_uploaded = current_uploaded + NUMBER_OF_FILES_PER_DEPLOY
print(f"{project_name}\n\nList {i_} Already Processed")
continue
print(f"List {i_}: {len(file_dv)} | Total : {total} | Left : {total-current_uploaded}")
send_log(f"{project_name}\n\nList {i_}: {len(file_dv)} | Total : {total} | Left : {total-current_uploaded}")
playload_upload_v2_ = playload_upload_v2(file_dv)
file_hashes = playload_upload_v2_.get('file_hashes')
manifest = playload_upload_v2_.get('manifest')
hashes = file_hashes
jwt = get_jwt_token(project_name, ac_id,email, token)
print("jwt :: ", jwt)
# print(file_dv)
# exit(0)
hash_to_upload = check_missing(hashes, jwt)
if len(hash_to_upload) == 0:
print("Found Zero hash_to_upload :: skipping")
send_log(f"{project_name}\n\nFound Zero hash_to_upload :: skipping")
current_uploaded = current_uploaded + NUMBER_OF_FILES_PER_DEPLOY
            saved_data[str(i_)] = {"file_hashes":file_hashes, "manifest":manifest}
save_details(file_name, saved_data)
continue
new_file_list = divide_list(file_dv, NUMBER_OF_FILES_PER_REQUEST)
for files_for_post in new_file_list:
retry = 0
post_playload_upload = generate_playload(files_for_post , hash_to_upload).get('json_data')
if len(post_playload_upload) < 1:
continue
            for i in range(MAX_RETRY):
retry = retry+1
if is_jwt_expired(jwt):
jwt = get_jwt_token(project_name, ac_id,email, token)
uploadasset = upload_assets(post_playload_upload , jwt)
if not uploadasset.get('success'):
#send_log(f"uploadasset err : <code>{str(uploadasset)}</code>")
print("uploadasset err : ", uploadasset)
print(f"Sleeping for {RETRY_SLEEP} | Retry Left : {MAX_RETRY - retry}")
if retry >= MAX_RETRY:
send_log(f"uploadasset err stopped script reached max retry {MAX_RETRY}: <code>{str(uploadasset)}</code>")
exit(0)
if RETRY_SLEEP > 0:
time.sleep(RETRY_SLEEP)
continue
else:
current_uploaded = current_uploaded+ NUMBER_OF_FILES_PER_REQUEST
print("Uploaded Files : ",current_uploaded," | Total : ",total)
#time.sleep(1)
break
if not uploadasset.get('success'):
exit(0)
print("uploadasset :: ",uploadasset)
retry = 0
for i in range(MAX_RETRY - 1):
retry = retry+1
push_deploy = push_deployment(json.dumps(manifest),project_name, ac_id,email,token)
if not push_deploy.get('success'):
print("push_deploy err : ", push_deploy)
send_log(f"push_deploy err : <code>{str(push_deploy)}</code>")
print(f"Sleeping for {RETRY_SLEEP} | Retry Left : {MAX_RETRY - retry}")
if RETRY_SLEEP > 0:
time.sleep(RETRY_SLEEP)
continue
else:
break
print("push_deploy :: ",push_deploy)
if not push_deploy.get('success'):
exit(0)
retry = 0
for i in range(MAX_RETRY - 1):
retry = retry+1
if is_jwt_expired(jwt):
jwt = get_jwt_token(project_name, ac_id,email, token)
upsert_hash = upsert_hashes(file_hashes , jwt)
if not upsert_hash.get('success'):
#print("upsert_hash err : ", upsert_hash)
send_log(f"upsert_hash err : <code>{str(upsert_hash)}</code>")
print(f"Sleeping for {RETRY_SLEEP} | Retry Left : {MAX_RETRY - retry}")
if RETRY_SLEEP > 0:
time.sleep(RETRY_SLEEP)
continue
else:
break
#print("upsert_hash :: ",upsert_hash)
if not upsert_hash.get('success'):
exit(0)
        saved_data[str(i_)] = {"file_hashes":file_hashes, "manifest":manifest}
save_details(file_name, saved_data)
        time.sleep(SLEEP_BEFORE_NEXT_DEPLOY)
print(" ============================================= \n\n")
# print("file_hashes :: ",file_hashes)
# print("manifest :: ",manifest)
print("====== Pushing All Hashes and Manifest =====")
send_log(project_name+"\n\n====== Pushing All Hashes and Manifest =====")
    manifest = dict()
    file_hashes = []
    for data_ in saved_data.values():
        print(data_)
        file_hashes = file_hashes + data_.get("file_hashes")
        manifest.update(data_.get("manifest"))
retry = 0
for i in range(MAX_RETRY - 1):
retry = retry+1
push_deploy = push_deployment(json.dumps(manifest),project_name, ac_id,email,token)
if not push_deploy.get('success'):
print("push_deploy err : ", push_deploy)
send_log(f"push_deploy err : <code>{str(push_deploy)}</code>")
print(f"Sleeping for {RETRY_SLEEP} | Retry Left : {MAX_RETRY - retry}")
if RETRY_SLEEP > 0:
time.sleep(RETRY_SLEEP)
continue
else:
break
print("push_deploy :: ",push_deploy)
if not push_deploy.get('success'):
send_log(f"all push deploy error")
exit(0)
retry = 0
for i in range(MAX_RETRY - 1):
retry = retry+1
try:
if is_jwt_expired(jwt):
jwt = get_jwt_token(project_name, ac_id,email, token)
except:
jwt = get_jwt_token(project_name, ac_id,email, token)
upsert_hash = upsert_hashes(file_hashes , jwt)
if not upsert_hash.get('success'):
#print("upsert_hash err : ", upsert_hash)
send_log(f"upsert_hash err : <code>{str(upsert_hash)}</code>")
print(f"Sleeping for {RETRY_SLEEP} | Retry Left : {MAX_RETRY - retry}")
if RETRY_SLEEP > 0:
time.sleep(RETRY_SLEEP)
continue
else:
break
#print("upsert_hash :: ",upsert_hash)
if not upsert_hash.get('success'):
exit(0)
send_log(f"{project_name}\n\nAll Done\n\nCheck https://{project_name}.pages.dev")
print("Done")Editor is loading...