Untitled

 avatar
unknown
python
a year ago
6.5 kB
5
Indexable
import requests
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from time import sleep
import requests
from selenium.webdriver.common.action_chains import ActionChains
import urllib.parse, urllib.request, sys
import os
import re

from easy_playlist import Playlists

import datetime
import time

# from selenium.webdriver.chrome.service import Service
# from webdriver_manager.chrome import ChromeDriverManager

# Absolute path of the directory that contains this file.
current_directory = os.path.dirname(os.path.abspath(__file__))


# Module-wide easy_playlist ``Playlists`` instance.
pls = Playlists()
# Registry of open browser sessions: chat id -> {"driver": ..., "start_time": ...}.
drivers = {}


def tex(text):
    """Return ``text`` lower-cased with all whitespace removed."""
    # str.split() with no arguments splits on any whitespace run, so joining
    # the pieces back together drops every whitespace character.
    return "".join(text.split()).lower()


    
def create_room(channel,key,uid,chatId,ss):
    """Start a headless Chrome session for ``chatId`` and join a channel.

    Any driver already registered for ``chatId`` is closed first. The new
    driver is stored in ``drivers`` together with its start time, the join
    form on the client page is filled in and submitted, and (unless ``ss`` is
    ``"sc"``) the ``mute-audio`` button is clicked one second after joining.

    Args:
        channel: Value typed into the ``channel`` field.
        key: Value typed into the ``token`` field.
        uid: Value typed into the ``uid`` field.
        chatId: Key under which the driver is registered in ``drivers``.
        ss: Mode flag; ``"sc"`` skips the mute click after joining.

    Returns:
        The started ``webdriver.Chrome`` instance.

    Raises:
        Exception: Whatever Selenium raises while loading the page or driving
            the form. The browser is quit and unregistered before the error
            is re-raised, so a failed join does not leave a Chrome process
            behind.
    """
    close_driver(chatId)

    options = Options()
    for argument in (
        'use-fake-device-for-media-stream',
        '--headless',
        '--no-sandbox',
        '--disable-dev-shm-usage',
        '--use-fake-ui-for-media-stream',
        '--use-file-for-fake-audio-capture=Silent.wav',
        f'--new-window={chatId}',
    ):
        options.add_argument(argument)
    driver = webdriver.Chrome(options=options)

    try:
        # Both modes load the same client page; ``ss`` only decides whether
        # the session is muted after joining (see below).
        driver.get("https://sunny-hotteok-2f6e84.netlify.app/")
        driver.tab_name = chatId
        # pls.add_playlist(chatId)
        drivers[chatId] = {"driver": driver, "start_time": datetime.datetime.now()}

        driver.find_element(By.ID,'appid').send_keys('2b0567d3ff534f0593528432dac20dc1')
        driver.find_element(By.ID,'token').send_keys(key)
        driver.find_element(By.ID,'channel').send_keys(channel)
        driver.find_element(By.ID,'uid').send_keys(uid)
        driver.find_element(By.ID,'join').click()
        # Give the page a moment to react to the join click before muting.
        sleep(1)
        if ss!="sc":
            driver.find_element(By.ID,'mute-audio').click()
    except Exception:
        # The browser was started but the room was not joined: drop it from
        # the registry (if it got that far) and shut it down so the Chrome
        # process is not orphaned, then let the caller see the error.
        drivers.pop(chatId, None)
        driver.quit()
        raise
    return driver



def convert_time(seconds):
    """Split a duration in seconds into minutes and leftover seconds.

    Returns:
        A ``(minutes, remaining_seconds)`` tuple, i.e. the floor quotient and
        the remainder of ``seconds`` divided by 60.
    """
    return divmod(seconds, 60)

def play_sc(url,chatId):
    """Enter ``url`` into the page's URL field and restart audio mixing.

    The ``urlInput`` field of the driver registered for ``chatId`` is cleared
    and filled with ``url``; then the ``stop-audio-mixing`` and
    ``local-audio-mixing`` buttons are clicked, in that order.
    """
    browser = switch(chatId)

    url_field = browser.find_element(By.ID, 'urlInput')
    url_field.clear()
    url_field.send_keys(url)

    # Stop whatever is currently mixing before starting the new source.
    for button_id in ('stop-audio-mixing', 'local-audio-mixing'):
        browser.find_element(By.ID, button_id).click()


# def play(song_info,chatId,custom:str ="no"):
#     driver=switch(chatId)
#     min,sec=convert_time(int(song_info["duration"]))
#     pl = pls.get_playlist(chatId)
#     index=pl.index
#     if custom =="no":
#         if str(index) !='-1':
#             if pl.playlist[index].playing==True:
#                 return "naughty hora ke bkl gana chal raha hai"
    
#     Entry = Query()
#     result = db.get(Entry.id == song_info["id"])
#     if result:
#         if os.path.exists(result["path"]):
#             absolute_path = os.path.join(current_directory, result["path"])
#             driver.find_element(By.ID,'local-file').send_keys(absolute_path)
#             driver.find_element(By.ID,'stop-audio-mixing').click()
#             driver.find_element(By.ID,'local-audio-mixing').click()
            
#             pl.add_music(absolute_path)
            
#             pl.play()
#             msg=f'''
# Now playing: {song_info["song"]}
# 01:23 ━━━━●───── {min}:{sec}
#                  ◁  ❚❚  ▷              
# '''
#             return msg
#         else:
#             print("song not")
#     else:
        
#         path=download_song(song_info)
#         song_info["path"]=path
#         db.insert(song_info)
#         if os.path.exists(song_info["path"]):
#             absolute_path = os.path.join(current_directory, song_info["path"])
#             driver.find_element(By.ID,'local-file').send_keys(absolute_path)
#             driver.find_element(By.ID,'stop-audio-mixing').click()
#             driver.find_element(By.ID,'local-audio-mixing').click()
#             pl = pls.get_playlist(chatId)
#             pl.add_music(absolute_path)
#             pl.play()
#             msg=f'''
# Now playing: {song_info["song"]}
# 01:23 ━━━━●───── {min}:{sec}
#                  ◁  ❚❚  ▷              
# '''
#             return msg
#         else:
#             return "song not found"
    







def pause(chatId):
    """Click the ``pause`` button in the browser session of ``chatId``."""
    switch(chatId).find_element(By.ID, 'pause').click()
    

def resume(chatId):
    """Click the ``resume`` button in the browser session of ``chatId``."""
    switch(chatId).find_element(By.ID, 'resume').click()


def mute(chatId):
    """Click the ``mute-audio`` button in the browser session of ``chatId``."""
    switch(chatId).find_element(By.ID, 'mute-audio').click()


def unmute(chatId):
    """Click the ``unmute-audio`` button in the browser session of ``chatId``."""
    switch(chatId).find_element(By.ID, 'unmute-audio').click()

def stop(chatId):
    """Stop audio mixing for ``chatId`` and then click mute.

    Clicks the ``stop-audio-mixing`` button followed by the ``mute-audio``
    button in the browser session registered for ``chatId``.
    """
    browser = switch(chatId)
    for button_id in ('stop-audio-mixing', 'mute-audio'):
        browser.find_element(By.ID, button_id).click()

def playlist_clear(chatId):
    """Close the driver for ``chatId`` and look up its playlist.

    Any exception raised by either step is printed instead of propagated.
    """
    try:
        close_driver(chatId)
        # NOTE(review): the playlist is only fetched here, nothing is done
        # with it afterwards -- confirm whether it should also be emptied.
        pls.get_playlist(chatId)
    except Exception as err:
        print(err)
    
def close_div(chatId):
    """Close the browser window for ``chatId`` and unregister its driver.

    Does nothing (beyond the message printed by ``switch``) when no driver is
    registered for ``chatId``.

    Args:
        chatId: Key of the session in ``drivers``.
    """
    driver = switch(chatId)
    if driver is None:
        # switch() has already reported that the chat has no driver.
        return
    driver.close()
    # ``drivers`` is a dict keyed by chat id, so the entry has to be removed
    # by key; dict has no ``remove`` method and the old call always raised.
    drivers.pop(chatId, None)

def volume(chatId,volume):
    """Call the page's ``setVolume`` function with ``volume`` for ``chatId``.

    NOTE(review): ``volume`` is interpolated into the JavaScript source as-is;
    confirm that callers only ever pass numeric values.
    """
    browser = switch(chatId)
    script = f'setVolume({volume});'
    browser.execute_script(script)


def switch(chat_id):
    """Return the driver registered for ``chat_id``, focused on its window.

    Returns:
        The driver stored in ``drivers`` for ``chat_id``, or ``None`` (after
        printing a message) when the chat has no registered driver.
    """
    if chat_id not in drivers:
        print(f"Driver for chat {chat_id} not found.")
        return None

    browser = drivers[chat_id]["driver"]
    # Re-select the driver's own current window handle.
    browser.switch_to.window(browser.current_window_handle)
    return browser

def close_driver(chat_id):
    """Unregister and quit the driver for ``chat_id``.

    Prints a message and returns when no driver is registered for the chat.
    """
    if chat_id not in drivers:
        print(f"Driver for chat {chat_id} not found.")
        return

    # Remove the entry from the registry first, then shut the browser down.
    entry = drivers.pop(chat_id)
    entry["driver"].quit()

def close_old_tabs(max_duration):
    """Close every registered driver older than ``max_duration`` seconds.

    Age is measured from the ``start_time`` stored with each driver to the
    moment this function is called.
    """
    checked_at = datetime.datetime.now()
    # Iterate over a snapshot because close_driver() mutates ``drivers``.
    for chat_id, entry in tuple(drivers.items()):
        age_seconds = (checked_at - entry["start_time"]).total_seconds()
        if age_seconds > max_duration:
            close_driver(chat_id)

def check_and_close_old_tabs_periodically(interval, max_duration):
    """Run ``close_old_tabs(max_duration)`` forever, every ``interval`` seconds.

    This function never returns; it is meant to be the target of a
    background thread.

    Args:
        interval: Seconds to sleep between two sweeps.
        max_duration: Maximum session age in seconds, passed through to
            ``close_old_tabs``.
    """
    while True:
        try:
            close_old_tabs(max_duration)
        except Exception as e:
            # A single failing sweep (e.g. quitting a browser that already
            # died) must not end the loop, otherwise stale sessions would
            # never be cleaned up again.
            print(f"Error while closing old tabs: {e}")
        time.sleep(interval)


# Background cleanup: started as a side effect of importing this module.
import threading
interval_seconds = 600  # seconds to sleep between two cleanup sweeps
max_tab_duration = 3600  # sessions older than this many seconds are closed
# NOTE(review): the thread is created without ``daemon=True`` and its target
# loops forever, so it presumably keeps the process alive at exit -- confirm
# that this is intended.
check_thread = threading.Thread(target=check_and_close_old_tabs_periodically, args=(interval_seconds, max_tab_duration))
check_thread.start()


Editor is loading...