Untitled
unknown
plain_text
2 years ago
23 kB
5
Indexable
import base64
import email
import email.utils
import logging
import os
import re
import time
import typing
from datetime import datetime, timedelta, timezone
from imaplib import IMAP4_SSL

from parsel import Selector
from playwright._impl._sync_base import EventInfo
from playwright.sync_api import (
    sync_playwright,
    BrowserContext,
    Playwright,
    Browser,
    Page,
)
from playwright_stealth import stealth_sync

from constants import (
    TWITTER_DOMAIN,
    FIREFOX_PREFS,
    IN_PROGRESS_STATUS,
    SUCCESS_STATUS,
    FAIL_STATUS,
)

logger = logging.getLogger(__name__)

# How many times a whole registration / a single page load is attempted.
_REGISTER_ATTEMPTS = 3
_PAGE_LOAD_ATTEMPTS = 3

# Selectors that are used by more than one step.
_GOOGLE_SIGN_IN_XPATH = 'xpath=//*[contains(@data-testid, "google_sign_in_container")]'
_GOOGLE_NEXT_BUTTON_XPATH = 'xpath=//*[contains(@id, "Next")]//button'
_TWITTER_NEXT_XPATH = 'xpath=//*[contains(@data-testid, "Next")]'
_HOME_URL_MARKER = 'twitter.com/home'


def _click_when_present(
        playwright_page: Page,
        selector: str,
        timeout: int,
        pause: float = 2.0,
) -> bool:
    """Click ``selector`` on ``playwright_page`` if it shows up within ``timeout`` ms.

    This is a best-effort step: any failure (element missing, click rejected)
    is logged at debug level and reported through the return value instead of
    being raised.

    :param playwright_page: page to search in.
    :param selector: Playwright selector of the element to click.
    :param timeout: how long to wait for the element, in milliseconds.
    :param pause: seconds to sleep after a successful click (0 disables it).
    :return: True when the element was clicked, False otherwise.
    """
    try:
        playwright_page.wait_for_selector(
            selector=selector,
            timeout=timeout,
        ).click()
    except Exception:
        logger.debug('Optional click skipped: %s', selector, exc_info=True)
        return False
    if pause:
        time.sleep(pause)
    return True


class TwitterAccountManager:
    """Drive a browser through the Twitter "sign up with Google" flow.

    The whole flow runs from the constructor; the outcome is stored in
    ``self.status`` (one of the ``*_STATUS`` constants).
    """

    def __init__(
            self,
            email_credentials: dict,
            proxies: typing.Union[dict, None],
    ):
        """Store the inputs and run the registration, retrying on failure.

        :param email_credentials: dict read by the individual steps; the keys
            used in this class are ``email``, ``password``, ``recovery_email``,
            ``recovery_email_password``, ``name`` and ``birthday``.
        :param proxies: dict with ``url``, ``port``, ``username`` and
            ``password`` keys, or None to connect directly.
        """
        self.email_credentials = email_credentials
        self.proxies = proxies
        self.status = IN_PROGRESS_STATUS

        for attempt in range(1, _REGISTER_ATTEMPTS + 1):
            try:
                self.status = self.register_account()
                # Stop after the first success; retrying would register again.
                break
            except Exception:
                logger.exception(
                    'Registration attempt %d of %d failed',
                    attempt,
                    _REGISTER_ATTEMPTS,
                )
                self.status = FAIL_STATUS

    @staticmethod
    def create_playwright_client() -> Playwright:
        """Start the synchronous Playwright driver and return it."""
        return sync_playwright().start()

    @staticmethod
    def create_playwright_browser(
            playwright_client: Playwright,
            proxies: typing.Union[dict, None],
            headless: bool,
    ) -> Browser:
        """Launch Firefox with the project's preferences.

        When ``proxies`` is given the browser is launched with a placeholder
        proxy server; the real proxy settings are supplied per context in
        ``create_playwright_context``.
        """
        return playwright_client.firefox.launch(
            headless=headless,
            proxy={'server': 'pre-context'} if proxies else None,
            firefox_user_prefs=FIREFOX_PREFS,
        )

    @staticmethod
    def create_playwright_context(
            playwright_browser: Browser,
            proxies: typing.Union[dict, None],
    ) -> BrowserContext:
        """Create a browser context, routed through ``proxies`` when given."""
        proxy_settings = None
        if proxies:
            proxy_settings = {
                'server': f'http://{proxies["url"]}:{proxies["port"]}',
                'username': proxies["username"] or '',
                'password': proxies["password"] or '',
            }
        return playwright_browser.new_context(
            proxy=proxy_settings,
            bypass_csp=True
        )

    @staticmethod
    def create_playwright_page(
            playwright_browser_context: BrowserContext,
    ) -> Page:
        """Open a new page (tab) in the given context."""
        return playwright_browser_context.new_page()

    @staticmethod
    def get_date_from_string(
            date_as_string: str,
    ) -> datetime:
        """Parse ``DD-MM-YYYY`` or ``YYYY-MM-DD`` into a datetime.

        :return: the parsed date, or 1 January 1994 when neither format
            matches (the previous fallback raised ValueError itself because
            its literal did not match its own format string).
        """
        for date_format in ('%d-%m-%Y', '%Y-%m-%d'):
            try:
                return datetime.strptime(date_as_string, date_format)
            except (TypeError, ValueError):
                continue
        return datetime(1994, 1, 1)

    @staticmethod
    def open_page(
            playwright_page: Page,
            url: str,
    ) -> None:
        """Navigate to ``url``, retrying failed loads.

        Returns as soon as one navigation succeeds.

        :raises Exception: whatever ``Page.goto`` raised on the final attempt.
        """
        for attempt in range(1, _PAGE_LOAD_ATTEMPTS + 1):
            try:
                playwright_page.goto(
                    url,
                    timeout=10000,
                )
                return
            except Exception:
                logger.debug(
                    'Loading %s failed (attempt %d of %d)',
                    url,
                    attempt,
                    _PAGE_LOAD_ATTEMPTS,
                    exc_info=True,
                )
                if attempt == _PAGE_LOAD_ATTEMPTS:
                    raise
                time.sleep(1)

    @staticmethod
    def close_popup_window(
            playwright_page: Page,
    ) -> None:
        """Remove the ``credential_picker_container`` element from the page.

        The removal is attempted five times, one second apart, so a container
        that is injected late is still removed. Attempts that find nothing
        are ignored.
        """
        for _ in range(5):
            try:
                time.sleep(1)
                playwright_page.evaluate(
                    "document.getElementById('credential_picker_container').remove();"
                )
            except Exception:
                logger.debug('No credential picker to remove', exc_info=True)

    @staticmethod
    def press_signup_with_google_button(
            playwright_page: Page,
    ) -> None:
        """Click the Google sign-in container if it is present."""
        _click_when_present(playwright_page, _GOOGLE_SIGN_IN_XPATH, timeout=3000)

    def enter_login_and_password(
            self,
            playwright_page: Page,
            popup_window: EventInfo,
    ) -> None:
        """Open the Google popup and submit the account e-mail and password.

        The login and the password are two independent best-effort phases: a
        failure in the first does not prevent the second from being tried.
        """
        try:
            # click sign up/in button
            playwright_page.wait_for_selector(
                _GOOGLE_SIGN_IN_XPATH,
                timeout=3000,
            ).click()
            time.sleep(2)

            # enter login
            popup_window.value.fill(
                selector='xpath=//input',
                value=self.email_credentials['email'],
            )
            time.sleep(1)

            # confirm login
            popup_window.value.wait_for_selector(
                selector=_GOOGLE_NEXT_BUTTON_XPATH,
                timeout=2000,
            ).click()
            time.sleep(2)
        except Exception:
            logger.debug('Login step did not complete', exc_info=True)

        try:
            # enter password
            popup_window.value.fill(
                selector='xpath=//input[@type="password"]',
                value=self.email_credentials['password'],
            )
            time.sleep(1)

            # confirm password
            popup_window.value.wait_for_selector(
                selector=_GOOGLE_NEXT_BUTTON_XPATH,
                timeout=2000,
            ).click()
            time.sleep(2)
        except Exception:
            logger.debug('Password step did not complete', exc_info=True)

    def bypass_recovery_email_confirmation(
            self,
            popup_window: EventInfo,
    ) -> None:
        """Answer the "confirm recovery email" challenge if it is offered."""
        try:
            # choose the "confirm recovery email" test
            popup_window.value.wait_for_selector(
                selector='xpath=//*[@data-challengetype="12"]',
                timeout=3000,
            ).click()
            time.sleep(2)

            # input the recovery email
            popup_window.value.fill(
                selector='xpath=//input[@aria-label]',
                value=self.email_credentials['recovery_email'],
            )
            time.sleep(1)

            # confirm the recovery email
            popup_window.value.wait_for_selector(
                selector='xpath=//*[contains(@data-primary-action-label, "Next") or contains(@id, "Next")]//button',
                timeout=3000,
            ).click()
            time.sleep(2)
        except Exception:
            logger.debug('Recovery e-mail challenge skipped', exc_info=True)

    def bypass_email_confirmation(
            self,
            playwright_page: Page,
            popup_window: EventInfo,
    ) -> None:
        """Answer the "verification code sent to recovery email" challenge.

        ``playwright_page`` is accepted for signature compatibility and is not
        used.
        """
        try:
            # choose the recovery email verification code test
            popup_window.value.wait_for_selector(
                selector='xpath=//*[@data-challengetype="30"]',
                timeout=3000,
            ).click()
            time.sleep(2)

            # get the code
            code = self.get_authentication_code()

            # enter the code
            popup_window.value.fill(
                selector='xpath=//input[@aria-label]',
                value=code,
            )

            # confirm the code
            popup_window.value.wait_for_selector(
                selector=_GOOGLE_NEXT_BUTTON_XPATH,
                timeout=3000,
            ).click()
            time.sleep(2)
        except Exception:
            logger.debug('Verification code challenge skipped', exc_info=True)

    @staticmethod
    def confirm_gmail_enter(
            popup_window: EventInfo,
    ) -> None:
        """Confirm entering as the Gmail user if the popup asks for it."""
        try:
            # NOTE(review): XPath `and` binds tighter than `or`, so the
            # @role="button" test only applies to the "Yes" branch. Confirm
            # this is intended before changing the selector.
            popup_window.value.wait_for_selector(
                'xpath=//*[contains(@id, "yes") or contains(@id, "Yes") and @role="button"]',
                timeout=3000,
            ).click()
            time.sleep(2)
        except Exception:
            logger.debug('Gmail confirmation not shown', exc_info=True)

    @staticmethod
    def skip_popup_offer_window(
            popup_window: EventInfo,
    ) -> None:
        """Click the "Next" element inside the popup if one is shown."""
        try:
            popup_window.value.wait_for_selector(
                selector=_TWITTER_NEXT_XPATH,
                timeout=2000,
            ).click()
            time.sleep(2)
        except Exception:
            logger.debug('Popup offer window not shown', exc_info=True)

    @staticmethod
    def skip_offer_window(
            playwright_page: Page,
    ) -> None:
        """Click the "Next" element on the main page if one is shown."""
        _click_when_present(playwright_page, _TWITTER_NEXT_XPATH, timeout=2000)

    def _select_birthday(
            self,
            playwright_page: Page,
    ) -> None:
        """Fill the month/day/year selects from ``email_credentials['birthday']``.

        Raises whatever Playwright raises when a select is missing; callers
        decide how to handle that.
        """
        month_select_xpath = 'xpath=//label[contains(@id, "SELECTOR_1")]/../select'

        # The month select is filled by label, so read the labels it offers.
        month_area_html = playwright_page.wait_for_selector(
            selector=month_select_xpath,
            timeout=3000,
        ).inner_html()
        month_area_options = [
            option.strip()
            for option in Selector(month_area_html).xpath('//option/text()').extract()
        ]

        # convert the birthday string into a datetime
        birthday = self.get_date_from_string(self.email_credentials['birthday'])

        # enter the birthday month
        playwright_page.wait_for_selector(
            selector=month_select_xpath,
            timeout=2000,
        ).select_option(month_area_options[birthday.month - 1])
        time.sleep(1)

        # enter the birthday day
        playwright_page.wait_for_selector(
            selector='xpath=//label[contains(@id, "SELECTOR_2")]/../select',
            timeout=2000,
        ).select_option(str(birthday.day))
        time.sleep(1)

        # enter the birthday year
        playwright_page.wait_for_selector(
            selector='xpath=//label[contains(@id, "SELECTOR_3")]/../select',
            timeout=2000,
        ).select_option(str(birthday.year))

    def enter_birthday_information(
            self,
            playwright_page: Page,
    ) -> bool:
        """Fill in and confirm the birthday form of the sign-up flow.

        :return: True when the form was filled and confirmed, False when any
            part of it failed (for example the form was not shown).
        """
        try:
            self._select_birthday(playwright_page)
            time.sleep(2)

            # confirm birthday
            playwright_page.wait_for_selector(
                selector=_TWITTER_NEXT_XPATH,
                timeout=2000,
            ).click()
            time.sleep(2)
            return True
        except Exception:
            logger.debug('Birthday form not completed', exc_info=True)
            return False

    @staticmethod
    def confirm_twitter_settings(
            playwright_page: Page,
    ) -> None:
        """Click the confirmation element of the settings step if present."""
        # NOTE(review): this selector tests @testid while the other steps use
        # @data-testid. Confirm against the live markup before changing it.
        _click_when_present(
            playwright_page,
            'xpath=//*[contains(@id, "Next") or contains(@testid, "Next") '
            'or @role="button"]/div/span/span',
            timeout=2000,
        )

    @staticmethod
    def accept_default_nickname(
            playwright_page: Page,
    ) -> None:
        """Get past the nickname step, keeping the suggested nickname."""
        # skip the pre-nickname window if it appears
        _click_when_present(playwright_page, _TWITTER_NEXT_XPATH, timeout=2000)

        # confirm default nickname via "Skip"; nothing more to do if it worked
        if _click_when_present(
                playwright_page,
                'xpath=//*[contains(@data-testid, "Skip")]',
                timeout=2000,
                pause=0,
        ):
            return

        # otherwise confirm default nickname via the generic "Next" element
        _click_when_present(
            playwright_page,
            'xpath=//*[contains(@id, "Next") or contains(@testid, "Next") '
            'or contains(@aria-label, "Next") or @role="button"]/div/span/span',
            timeout=2000,
        )

    @staticmethod
    def reject_notifications(
            playwright_page: Page,
    ) -> None:
        """Click the first unstyled ``role="button"`` element, if any."""
        _click_when_present(
            playwright_page,
            'xpath=//*[@role="button" and not(@style)]',
            timeout=2000,
        )

    def check_operation_status(
            self,
            playwright_page: Page,
    ) -> str:
        """Report whether the browser ended up on the Twitter home page.

        Tries progressively harder to get there: reload, click through a
        remaining button, and finally navigate to the home URL directly.

        :return: ``SUCCESS_STATUS`` when the page URL contains
            ``twitter.com/home``, otherwise ``FAIL_STATUS``.
        """
        if _HOME_URL_MARKER in playwright_page.url:
            return SUCCESS_STATUS

        self.close_popup_window(
            playwright_page=playwright_page,
        )

        try:
            playwright_page.reload()
        except Exception:
            logger.debug('Reload failed', exc_info=True)

        for _ in range(3):
            try:
                playwright_page.wait_for_selector(
                    selector='xpath=//*[@role="button" or self::button]',
                    timeout=2000,
                ).click()
                playwright_page.reload()
                break
            except Exception:
                logger.debug('No button to click through', exc_info=True)

        if _HOME_URL_MARKER in playwright_page.url:
            return SUCCESS_STATUS

        try:
            self.open_page(
                url='https://twitter.com/home',
                playwright_page=playwright_page,
            )
        except Exception:
            # A failed load must not abort the check; the URL test decides.
            logger.debug('Could not open the home page', exc_info=True)
        time.sleep(2)

        if _HOME_URL_MARKER in playwright_page.url:
            return SUCCESS_STATUS
        return FAIL_STATUS

    def change_the_account_name(
            self,
            playwright_page: Page,
    ) -> None:
        """Set the profile name to ``email_credentials['name']`` (best effort)."""
        try:
            # open "Edit profile" page
            self.open_page(
                url='https://twitter.com/settings/profile',
                playwright_page=playwright_page,
            )
            time.sleep(1)

            # enter the name from credentials
            playwright_page.fill(
                selector='xpath=//input[contains(@name, "Name")]',
                value=self.email_credentials['name'],
            )
            time.sleep(1)

            # confirm the name change
            playwright_page.wait_for_selector(
                selector='xpath=//*[contains(@data-testid, "Save")]',
                timeout=3000,
            ).click()
            time.sleep(2)
        except Exception:
            logger.debug('Account name was not changed', exc_info=True)

    def change_the_account_birthday(
            self,
            playwright_page: Page,
    ) -> None:
        """Set the profile birthday from the credentials (best effort)."""
        try:
            # open "Edit profile" page
            self.open_page(
                url='https://twitter.com/settings/profile',
                playwright_page=playwright_page,
            )
            time.sleep(1)

            # click "Edit birthday" button
            playwright_page.wait_for_selector(
                selector='xpath=//*[contains(@data-testid, "ProfileBirthdate")]',
                timeout=3000,
            ).click()
            time.sleep(1)

            # confirm the birthday edit intention
            playwright_page.wait_for_selector(
                selector='xpath=//*[contains(@data-testid, "SheetConfirm")]',
                timeout=3000,
            ).click()
            time.sleep(1)

            self._select_birthday(playwright_page)
            time.sleep(1)

            # confirm the birthday change
            playwright_page.wait_for_selector(
                selector='xpath=//*[contains(@data-testid, "Save")]',
                timeout=3000,
            ).click()
            time.sleep(1)

            # confirm again if required
            playwright_page.wait_for_selector(
                selector='xpath=//*[contains(@data-testid, "SheetConfirm")]',
                timeout=3000,
            ).click()
            time.sleep(2)
        except Exception:
            logger.debug('Account birthday was not changed', exc_info=True)

    def register_account(self) -> str:
        """Run one complete registration attempt in a fresh browser.

        The browser and the Playwright driver are always shut down before
        returning, whether the attempt succeeded or not.

        :return: ``SUCCESS_STATUS``.
        :raises Exception: ``'Register failed'`` when the home page was not
            reached, or any error raised by browser start-up / navigation.
        """
        playwright_client = self.create_playwright_client()
        playwright_browser = None
        try:
            playwright_browser = self.create_playwright_browser(
                playwright_client=playwright_client,
                proxies=self.proxies,
                headless=False,
            )
            playwright_browser_context = self.create_playwright_context(
                playwright_browser=playwright_browser,
                proxies=self.proxies,
            )
            playwright_page = self.create_playwright_page(
                playwright_browser_context=playwright_browser_context,
            )
            stealth_sync(playwright_page)

            # open the twitter register page
            self.open_page(
                playwright_page=playwright_page,
                url=TWITTER_DOMAIN,
            )

            # close first popup window if it appears
            self.close_popup_window(
                playwright_page=playwright_page,
            )

            # everything that talks to the Google popup happens inside the
            # expect_popup() context
            with playwright_page.expect_popup() as popup_window:
                # enter the email login and password
                self.enter_login_and_password(
                    playwright_page=playwright_page,
                    popup_window=popup_window,
                )

                # bypass email verification if needed
                self.bypass_recovery_email_confirmation(
                    popup_window=popup_window,
                )

                # skip popup offer window if needed
                self.skip_popup_offer_window(
                    popup_window=popup_window,
                )

                # confirm enter as gmail user if needed
                self.confirm_gmail_enter(
                    popup_window=popup_window,
                )

            # enter birthday information
            birthday_entered = self.enter_birthday_information(
                playwright_page=playwright_page,
            )

            # skip any offer window if needed
            self.skip_offer_window(
                playwright_page=playwright_page,
            )

            # confirm enter with default nickname
            self.accept_default_nickname(
                playwright_page=playwright_page,
            )

            # reject notifications if needed
            self.reject_notifications(
                playwright_page=playwright_page,
            )

            # check if sign up is completed
            status = self.check_operation_status(
                playwright_page=playwright_page,
            )
            if status != SUCCESS_STATUS:
                raise Exception('Register failed')

            # change the account name
            self.change_the_account_name(
                playwright_page=playwright_page,
            )

            # change the account birthday if it wasn't set during sign-up
            if not birthday_entered:
                self.change_the_account_birthday(
                    playwright_page=playwright_page,
                )
            return status
        finally:
            # Cleanup failures must not mask the outcome of the attempt.
            if playwright_browser is not None:
                try:
                    playwright_browser.close()
                except Exception:
                    logger.debug('Closing the browser failed', exc_info=True)
            try:
                playwright_client.stop()
            except Exception:
                logger.debug('Stopping Playwright failed', exc_info=True)

    @staticmethod
    def _read_verification_code(
            imap: IMAP4_SSL,
            message_id: bytes,
            not_before: datetime,
    ) -> typing.Optional[str]:
        """Return the code carried by one mailbox message, or None.

        A message qualifies when it was sent by ``noreply@google.com`` at or
        after ``not_before`` (timezone-aware) and one of its ``text/plain``
        parts contains ``verification code is:`` followed by digits.
        """
        _, fetched = imap.fetch(message_id, '(RFC822)')
        parsed_message = email.message_from_bytes(fetched[0][1])

        _, sender_email = email.utils.parseaddr(parsed_message['From'])
        if sender_email != 'noreply@google.com':
            return None

        # Compare in absolute time: the Date header carries its own UTC offset.
        message_datetime = email.utils.parsedate_to_datetime(parsed_message['Date'])
        if message_datetime.tzinfo is None:
            message_datetime = message_datetime.replace(tzinfo=timezone.utc)
        if message_datetime < not_before:
            return None

        for part in parsed_message.walk():
            if part.is_multipart() or part.get_content_type() != 'text/plain':
                continue
            # decode=True undoes the Content-Transfer-Encoding (e.g. base64)
            payload = part.get_payload(decode=True)
            if payload is None:
                continue
            charset = part.get_content_charset() or 'utf-8'
            message_text = payload.decode(charset, errors='replace')
            message_text = message_text.replace('\r', '').replace('\n', '')
            code_list = re.findall(r'verification code is:.*?([0-9]+)', message_text)
            if code_list:
                return code_list[0].strip()
        return None

    def get_authentication_code(self) -> str:
        """Poll the recovery mailbox for a fresh Google verification code.

        The mailbox is checked six times, ten seconds apart, newest message
        first. Messages older than 30 seconds before this call are ignored.

        The IMAP host is ``email_credentials['recovery_email_imap_host']``
        when that optional key is set, otherwise the domain part of the
        recovery address.

        :return: the digits of the verification code.
        :raises Exception: ``'NO CONFIRMATION CODE'`` when no matching message
            arrived in time; IMAP connection/login errors propagate as-is.
        """
        click_datetime = datetime.now(timezone.utc).replace(microsecond=0)
        not_before = click_datetime - timedelta(seconds=30)

        recovery_email = self.email_credentials['recovery_email']
        imap_host = (
            self.email_credentials.get('recovery_email_imap_host')
            or recovery_email.split('@')[-1]
        )

        imap = IMAP4_SSL(
            host=imap_host,
            port=993,
        )
        try:
            imap.login(
                user=recovery_email,
                password=self.email_credentials['recovery_email_password'],
            )

            for _ in range(6):
                time.sleep(10)
                imap.select()
                # Let the server pre-filter by sender instead of downloading
                # the whole mailbox on every poll.
                _, messages = imap.search(None, 'FROM', '"noreply@google.com"')
                message_id_list = messages[0].split() if messages and messages[0] else []

                for message_id in reversed(message_id_list):
                    try:
                        code = self._read_verification_code(
                            imap,
                            message_id,
                            not_before,
                        )
                    except Exception:
                        # One unreadable message must not stop the scan.
                        logger.debug(
                            'Could not read message %r',
                            message_id,
                            exc_info=True,
                        )
                        continue
                    if code:
                        return code
        finally:
            try:
                imap.logout()
            except Exception:
                logger.debug('IMAP logout failed', exc_info=True)

        raise Exception('NO CONFIRMATION CODE')


def _main() -> None:
    """Run one registration with settings taken from environment variables.

    Secrets are read from the environment rather than committed to the
    source file. Required: ``TWITTER_GOOGLE_EMAIL``,
    ``TWITTER_GOOGLE_PASSWORD``, ``TWITTER_RECOVERY_EMAIL``,
    ``TWITTER_RECOVERY_EMAIL_PASSWORD``. Optional: ``TWITTER_ACCOUNT_NAME``,
    ``TWITTER_ACCOUNT_BIRTHDAY`` and the ``TWITTER_PROXY_URL`` / ``_PORT`` /
    ``_USERNAME`` / ``_PASSWORD`` group (no proxy when the URL is unset).
    """
    credentials = {
        'email': os.environ['TWITTER_GOOGLE_EMAIL'],
        'recovery_email': os.environ['TWITTER_RECOVERY_EMAIL'],
        'recovery_email_password': os.environ['TWITTER_RECOVERY_EMAIL_PASSWORD'],
        'password': os.environ['TWITTER_GOOGLE_PASSWORD'],
        'name': os.environ.get('TWITTER_ACCOUNT_NAME', 'Dataox Team'),
        'birthday': os.environ.get('TWITTER_ACCOUNT_BIRTHDAY', '14-06-1993'),
    }

    proxies = None
    if os.environ.get('TWITTER_PROXY_URL'):
        proxies = {
            'url': os.environ['TWITTER_PROXY_URL'],
            'port': os.environ['TWITTER_PROXY_PORT'],
            'username': os.environ.get('TWITTER_PROXY_USERNAME', ''),
            'password': os.environ.get('TWITTER_PROXY_PASSWORD', ''),
        }

    TwitterAccountManager(
        email_credentials=credentials,
        proxies=proxies,
    )


if __name__ == '__main__':
    _main()
Editor is loading...