Untitled

 avatar
unknown
plain_text
2 years ago
23 kB
5
Indexable
import base64
import email
import re
import time
import typing
from datetime import datetime, timedelta
from imaplib import IMAP4_SSL

from parsel import Selector
from playwright._impl._sync_base import EventInfo
from playwright.sync_api import (
    sync_playwright,
    BrowserContext,
    Playwright,
    Browser,
    Page,
)
from playwright_stealth import stealth_sync

from constants import (
    TWITTER_DOMAIN,
    FIREFOX_PREFS,
    IN_PROGRESS_STATUS,
    SUCCESS_STATUS,
    FAIL_STATUS,
)


class TwitterAccountManager:

    def __init__(
            self,
            email_credentials: dict,
            proxies: typing.Union[dict, None],
    ):
        self.email_credentials = email_credentials
        self.proxies = proxies
        self.status = IN_PROGRESS_STATUS

        for _ in range(3):
            try:
                self.status = self.register_account()
            except BaseException:
                self.status = FAIL_STATUS

    @staticmethod
    def create_playwright_client() -> Playwright:
        return sync_playwright().start()

    @staticmethod
    def create_playwright_browser(
            playwright_client: Playwright,
            proxies: typing.Union[dict, None],
            headless: bool,
    ) -> Browser:
        return playwright_client.firefox.launch(
            headless=headless,
            proxy={'server': 'pre-context'} if proxies else None,
            firefox_user_prefs=FIREFOX_PREFS,
        )

    @staticmethod
    def create_playwright_context(
            playwright_browser: Browser,
            proxies: typing.Union[dict, None],
    ) -> BrowserContext:
        return playwright_browser.new_context(
            proxy={
                'server': f'http://{proxies["url"]}:{proxies["port"]}',
                'username': proxies["username"] or '',
                'password': proxies["password"] or '',
            } if proxies else None,
            bypass_csp=True
        )

    @staticmethod
    def create_playwright_page(
            playwright_browser_context: BrowserContext,
    ) -> Page:
        return playwright_browser_context.new_page()

    @staticmethod
    def get_date_from_string(
            date_as_string: str,
    ) -> datetime:
        try:
            return datetime.strptime(date_as_string, '%d-%m-%Y')
        except BaseException:
            pass

        try:
            return datetime.strptime(date_as_string, '%Y-%m-%d')
        except BaseException:
            return datetime.strptime('01-01-1994', '%Y-%m-%d')

    @staticmethod
    def open_page(
            playwright_page: Page,
            url: str,
    ) -> None:
        for retry_number in range(3):
            try:
                playwright_page.goto(
                    url,
                    timeout=10000,
                )
            except BaseException as e:
                time.sleep(1)

                if retry_number + 1 == 10:
                    raise e

    @staticmethod
    def close_popup_window(
            playwright_page: Page,
    ) -> None:
        for _ in range(5):
            try:
                time.sleep(1)
                playwright_page.evaluate(
                    "document.getElementById('credential_picker_container').remove();"
                )
            except BaseException:
                pass

    @staticmethod
    def press_signup_with_google_button(
            playwright_page: Page,
    ) -> None:
        try:
            sign_in_button = playwright_page.wait_for_selector(
                'xpath=//*[contains(@data-testid, "google_sign_in_container")]',
                timeout=3000,
            )
            sign_in_button.click()
            time.sleep(2)
        except BaseException:
            pass

    def enter_login_and_password(
            self,
            playwright_page: Page,
            popup_window: EventInfo,
    ) -> None:
        try:
            # click sign up/in button
            load_more_button = playwright_page.wait_for_selector(
                'xpath=//*[contains(@data-testid, "google_sign_in_container")]',
                timeout=3000,
            )
            load_more_button.click()
            time.sleep(2)

            # enter login
            popup_window.value.fill(
                selector='xpath=//input',
                value=self.email_credentials['email'],
            )
            time.sleep(1)

            #confirm login
            popup_window.value.wait_for_selector(
                selector='xpath=//*[contains(@id, "Next")]//button',
                timeout=2000,
            ).click()
            time.sleep(2)
        except BaseException:
            pass

        try:
            # enter password
            popup_window.value.fill(
                selector='xpath=//input[@type="password"]',
                value=self.email_credentials['password'],
            )
            time.sleep(1)

            # confirm password
            popup_window.value.wait_for_selector(
                selector='xpath=//*[contains(@id, "Next")]//button',
                timeout=2000,
            ).click()
            time.sleep(2)
        except BaseException:
            pass

    def bypass_recovery_email_confirmation(
            self,
            popup_window: EventInfo,
    ) -> None:
        try:
            # chose the "confirm recovery email" test
            popup_window.value.wait_for_selector(
                selector='xpath=//*[@data-challengetype="12"]',
                timeout=3000,
            ).click()
            time.sleep(2)

            # input the recovery email
            popup_window.value.fill(
                selector='xpath=//input[@aria-label]',
                value=self.email_credentials['recovery_email'],
            )
            time.sleep(1)

            # confirm the recovery email
            popup_window.value.wait_for_selector(
                selector='xpath=//*[contains(@data-primary-action-label, "Next") or contains(@id, "Next")]//button',
                timeout=3000,
            ).click()
            time.sleep(2)
        except BaseException:
            pass

    def bypass_email_confirmation(
            self,
            playwright_page: Page,
            popup_window: EventInfo,
    ) -> None:
        try:
            # chose the recover email verification code test
            popup_window.value.wait_for_selector(
                selector='xpath=//*[@data-challengetype="30"]',
                timeout=3000,
            ).click()
            time.sleep(2)

            # get the code
            code = self.get_authentication_code()

            # enter the code
            popup_window.value.fill(
                selector='xpath=//input[@aria-label]',
                value=code,
            )

            # confirm the code
            popup_window.value.wait_for_selector(
                selector='xpath=//*[contains(@id, "Next")]//button',
                timeout=3000,
            ).click()
            time.sleep(2)
        except BaseException as e:
            pass

    @staticmethod
    def confirm_gmail_enter(
            popup_window: EventInfo,
    ) -> None:
        # Confirm enter as gmail user if needed
        try:
            popup_window.value.wait_for_selector(
                'xpath=//*[contains(@id, "yes") or contains(@id, "Yes") and @role="button"]',
                timeout=3000,
            ).click()
            time.sleep(2)
        except BaseException:
            pass

    @staticmethod
    def skip_popup_offer_window(
            popup_window: EventInfo,
    ) -> None:
        # clicking Not Now button
        try:
            popup_window.value.wait_for_selector(
                selector='xpath=//*[contains(@data-testid, "Next")]',
                timeout=2000,
            ).click()
            time.sleep(2)
        except BaseException:
            pass

    @staticmethod
    def skip_offer_window(
            playwright_page: Page,
    ) -> None:
        # clicking Not Now button
        try:
            playwright_page.wait_for_selector(
                selector='xpath=//*[contains(@data-testid, "Next")]',
                timeout=2000,
            ).click()
            time.sleep(2)
        except BaseException:
            pass

    def enter_birthday_information(
            self,
            playwright_page: Page,
    ) -> bool:
        try:
            # get available option for month
            month_area_html = playwright_page.wait_for_selector(
                selector='xpath=//label[contains(@id, "SELECTOR_1")]/../select',
                timeout=3000,
            ).inner_html()
            month_area_tree = Selector(month_area_html)
            month_area_options = [option.strip() for option in month_area_tree.xpath('//option/text()').extract()]

            # convert the birthday string into a datetime
            birthday = self.get_date_from_string(self.email_credentials['birthday'])

            # enter the birthday month
            playwright_page.wait_for_selector(
                selector='xpath=//label[contains(@id, "SELECTOR_1")]/../select',
                timeout=2000,
            ).select_option(month_area_options[birthday.month - 1])
            time.sleep(1)

            # enter the birthday day
            playwright_page.wait_for_selector(
                selector='xpath=//label[contains(@id, "SELECTOR_2")]/../select',
                timeout=2000,
            ).select_option(str(birthday.day))
            time.sleep(1)

            # enter the birthday year
            playwright_page.wait_for_selector(
                selector='xpath=//label[contains(@id, "SELECTOR_3")]/../select',
                timeout=2000,
            ).select_option(str(birthday.year))
            time.sleep(2)

            # confirm birthday
            playwright_page.wait_for_selector(
                selector='xpath=//*[contains(@data-testid, "Next")]',
                timeout=2000,
            ).click()
            time.sleep(2)
            return True
        except BaseException:
            return False

    @staticmethod
    def confirm_twitter_settings(
            playwright_page: Page,
    ) -> None:
        try:
            # confirm twitter settings
            playwright_page.wait_for_selector(
                selector='xpath=//*[contains(@id, "Next") or contains(@testid, "Next") '
                         'or @role="button"]/div/span/span',
                timeout=2000,
            ).click()
            time.sleep(2)
        except BaseException:
            pass

    @staticmethod
    def accept_default_nickname(
            playwright_page: Page,
    ) -> None:
        # skip the pre-nickname window if appears
        try:
            playwright_page.wait_for_selector(
                selector='xpath=//*[contains(@data-testid, "Next")]',
                timeout=2000,
            ).click()
            time.sleep(2)
        except BaseException:
            pass

        # confirm default nickname
        try:
            playwright_page.wait_for_selector(
                selector='xpath=//*[contains(@data-testid, "Skip")]',
                timeout=2000,
            ).click()
            return
        except BaseException:
            pass

        # confirm default nickname
        try:
            playwright_page.wait_for_selector(
                selector='xpath=//*[contains(@id, "Next") or contains(@testid, "Next") '
                         'or contains(@aria-label, "Next") or @role="button"]/div/span/span',
                timeout=2000,
            ).click()
            time.sleep(2)
        except BaseException:
            pass

    @staticmethod
    def reject_notifications(
            playwright_page: Page,
    ) -> None:
        try:
            playwright_page.wait_for_selector(
                selector='xpath=//*[@role="button" and not(@style)]',
                timeout=2000,
            ).click()
            time.sleep(2)
        except BaseException:
            pass

    def check_operation_status(
            self,
            playwright_page: Page,
    ) -> str:

        if 'twitter.com/home' in playwright_page.url:
            return SUCCESS_STATUS

        self.close_popup_window(
            playwright_page=playwright_page,
        )

        try:
            playwright_page.reload()
        except BaseException:
            pass

        for _ in range(3):
            try:
                playwright_page.wait_for_selector(
                    selector='xpath=//*[@role="button" or self::button]',
                    timeout=2000,
                ).click()
                playwright_page.reload()
                break
            except BaseException:
                pass

        if 'twitter.com/home' in playwright_page.url:
            return SUCCESS_STATUS

        self.open_page(
            url='https://twitter.com/home',
            playwright_page=playwright_page,
        )
        time.sleep(2)

        if 'twitter.com/home' in playwright_page.url:
            return SUCCESS_STATUS

        return FAIL_STATUS

    def change_the_account_name(
            self,
            playwright_page: Page,
    ) -> None:
        try:
            # open "Edit profile" page
            self.open_page(
                url='https://twitter.com/settings/profile',
                playwright_page=playwright_page,
            )
            time.sleep(1)

            # enter the name from credentials
            playwright_page.fill(
                selector='xpath=//input[contains(@name, "Name")]',
                value=self.email_credentials['name'],
            )
            time.sleep(1)

            # confirm the name changing
            playwright_page.wait_for_selector(
                selector='xpath=//*[contains(@data-testid, "Save")]',
                timeout=3000,
            ).click()
            time.sleep(2)
        except BaseException:
            pass

    def change_the_account_birthday(
            self,
            playwright_page: Page,
    ) -> None:
        try:
            # open "Edit profile" page
            self.open_page(
                url='https://twitter.com/settings/profile',
                playwright_page=playwright_page,
            )
            time.sleep(1)

            # click "Edit birthday" button
            playwright_page.wait_for_selector(
                selector='xpath=//*[contains(@data-testid, "ProfileBirthdate")]',
                timeout=3000,
            ).click()
            time.sleep(1)

            # confirm the birthday edit intention
            playwright_page.wait_for_selector(
                selector='xpath=//*[contains(@data-testid, "SheetConfirm")]',
                timeout=3000,
            ).click()
            time.sleep(1)

            # get available option for month
            month_area_html = playwright_page.wait_for_selector(
                selector='xpath=//label[contains(@id, "SELECTOR_1")]/../select',
                timeout=3000,
            ).inner_html()
            month_area_tree = Selector(month_area_html)
            month_area_options = [option.strip() for option in month_area_tree.xpath('//option/text()').extract()]

            # convert the birthday string into a datetime
            birthday = self.get_date_from_string(self.email_credentials['birthday'])

            # enter the birthday month
            playwright_page.wait_for_selector(
                selector='xpath=//label[contains(@id, "SELECTOR_1")]/../select',
                timeout=2000,
            ).select_option(month_area_options[birthday.month - 1])
            time.sleep(1)

            # enter the birthday day
            playwright_page.wait_for_selector(
                selector='xpath=//label[contains(@id, "SELECTOR_2")]/../select',
                timeout=2000,
            ).select_option(str(birthday.day))
            time.sleep(1)

            # enter the birthday year
            playwright_page.wait_for_selector(
                selector='xpath=//label[contains(@id, "SELECTOR_3")]/../select',
                timeout=2000,
            ).select_option(str(birthday.year))
            time.sleep(1)

            # confirm the birthday changing
            playwright_page.wait_for_selector(
                selector='xpath=//*[contains(@data-testid, "Save")]',
                timeout=3000,
            ).click()
            time.sleep(1)

            # confirm again if required
            playwright_page.wait_for_selector(
                selector='xpath=//*[contains(@data-testid, "SheetConfirm")]',
                timeout=3000,
            ).click()
            time.sleep(2)
        except BaseException:
            pass

    def register_account(self) -> str:
        playwright_client = self.create_playwright_client()
        playwright_browser = self.create_playwright_browser(
            playwright_client=playwright_client,
            proxies=self.proxies,
            headless=False,
        )
        playwright_browser_context = self.create_playwright_context(
            playwright_browser=playwright_browser,
            proxies=self.proxies,
        )
        playwright_page = self.create_playwright_page(
            playwright_browser_context=playwright_browser_context,
        )
        stealth_sync(playwright_page)

        # open the twitter register page
        self.open_page(
            playwright_page=playwright_page,
            url=TWITTER_DOMAIN,
        )

        # close first popup window if appears
        self.close_popup_window(
            playwright_page=playwright_page,
        )

        # open browser context manager on event popup
        with playwright_page.expect_popup() as popup_window:
            # enter the email login and password
            self.enter_login_and_password(
                playwright_page=playwright_page,
                popup_window=popup_window,
            )

            # bypass email verification if needed
            self.bypass_recovery_email_confirmation(
                popup_window=popup_window,
            )

            # skip popup offer window if needed
            self.skip_popup_offer_window(
                popup_window=popup_window,
            )

            # confirm enter as gmail user if needed
            self.confirm_gmail_enter(
                popup_window=popup_window,
            )

            # enter birthday information
            birthday_entered = self.enter_birthday_information(
                playwright_page=playwright_page,
            )

            # skip any offer window if needed
            self.skip_offer_window(
                playwright_page=playwright_page,
            )

            # confirm enter with default nickname
            self.accept_default_nickname(
                playwright_page=playwright_page,
            )

            # reject notifications if needed
            self.reject_notifications(
                playwright_page=playwright_page,
            )

            # check if sign up is completed
            status = self.check_operation_status(
                playwright_page=playwright_page,
            )

            if status != SUCCESS_STATUS:
                raise Exception('Register failed')

            # change the account name
            self.change_the_account_name(
                playwright_page=playwright_page,
            )

            if birthday_entered:
                return status

            # change the account birthday if it wasn't do
            self.change_the_account_birthday(
                playwright_page=playwright_page,
            )
            return status

    def get_authentication_code(self) -> str:
        click_datetime = datetime.utcnow()
        imap = IMAP4_SSL(
            host=self.email_credentials['recovery_email'].split('@')[-1],
            port=993,
        )
        imap.login(
            user=self.email_credentials['recovery_email'],
            password=self.email_credentials['recovery_email_password'],
        )

        for _ in range(6):
            time.sleep(10)
            imap.select()
            _, messages = imap.search(None, 'ALL')
            message_id_list = messages[0].split()

            for message_id in message_id_list[::-1]:
                try:
                    _, message = imap.fetch(message_id, '(RFC822)')
                    message_bytes = message[0][1]
                    parsed_message = email.message_from_bytes(message_bytes)
                    message_date_str = parsed_message['Date']
                    message_from_str = parsed_message['From']
                    message_timestamp = time.mktime(email.utils.parsedate(message_date_str))
                    sender_name, sender_email = email.utils.parseaddr(message_from_str)
                    click_datetime = click_datetime.replace(microsecond=0)
                    message_datetime = datetime.fromtimestamp(message_timestamp).replace(microsecond=0)

                    if message_datetime >= click_datetime - timedelta(seconds=30) \
                            and sender_email == 'noreply@google.com':
                        message_text = str(parsed_message.get_payload()[0])
                        message_text = base64.b64decode(message_text.split('base64')[-1].strip())
                        message_text = message_text.decode().replace('\r', '').replace('\n', '')
                        code_list = re.findall(r'verification code is:.*?([0-9]+)', message_text)

                        if code_list:
                            return code_list[0].strip()
                except BaseException:
                    pass

        raise Exception('NO CONFIRMATION CODE')


if __name__ == '__main__':
    credentials = {
        'email': 'bogdannest071@gmail.com',
        'recovery_email': 'keshabbista18@gmail.com',
        'recovery_email_password': 'VjIeH6EvXWpF',
        'password': 'sdkHB23d',
        'name': 'Dataox Team',
        'birthday': '14-06-1993',
    }
    twitter_account_manager = TwitterAccountManager(
        email_credentials=credentials,
        proxies={
            'url': '91.225.13.33',
            'port': '51523',
            'username': 'alexandranedova0nPCH',
            'password': 'FmcGqYgSvV',
        },
        # proxies=None,
    )

Editor is loading...