Untitled
unknown
plain_text
3 years ago
23 kB
8
Indexable
import base64
import email
import re
import time
import typing
from datetime import datetime, timedelta
from imaplib import IMAP4_SSL
from parsel import Selector
from playwright._impl._sync_base import EventInfo
from playwright.sync_api import (
sync_playwright,
BrowserContext,
Playwright,
Browser,
Page,
)
from playwright_stealth import stealth_sync
from constants import (
TWITTER_DOMAIN,
FIREFOX_PREFS,
IN_PROGRESS_STATUS,
SUCCESS_STATUS,
FAIL_STATUS,
)
class TwitterAccountManager:
def __init__(
self,
email_credentials: dict,
proxies: typing.Union[dict, None],
):
self.email_credentials = email_credentials
self.proxies = proxies
self.status = IN_PROGRESS_STATUS
for _ in range(3):
try:
self.status = self.register_account()
except BaseException:
self.status = FAIL_STATUS
@staticmethod
def create_playwright_client() -> Playwright:
return sync_playwright().start()
@staticmethod
def create_playwright_browser(
playwright_client: Playwright,
proxies: typing.Union[dict, None],
headless: bool,
) -> Browser:
return playwright_client.firefox.launch(
headless=headless,
proxy={'server': 'pre-context'} if proxies else None,
firefox_user_prefs=FIREFOX_PREFS,
)
@staticmethod
def create_playwright_context(
playwright_browser: Browser,
proxies: typing.Union[dict, None],
) -> BrowserContext:
return playwright_browser.new_context(
proxy={
'server': f'http://{proxies["url"]}:{proxies["port"]}',
'username': proxies["username"] or '',
'password': proxies["password"] or '',
} if proxies else None,
bypass_csp=True
)
@staticmethod
def create_playwright_page(
playwright_browser_context: BrowserContext,
) -> Page:
return playwright_browser_context.new_page()
@staticmethod
def get_date_from_string(
date_as_string: str,
) -> datetime:
try:
return datetime.strptime(date_as_string, '%d-%m-%Y')
except BaseException:
pass
try:
return datetime.strptime(date_as_string, '%Y-%m-%d')
except BaseException:
return datetime.strptime('01-01-1994', '%Y-%m-%d')
@staticmethod
def open_page(
playwright_page: Page,
url: str,
) -> None:
for retry_number in range(3):
try:
playwright_page.goto(
url,
timeout=10000,
)
except BaseException as e:
time.sleep(1)
if retry_number + 1 == 10:
raise e
@staticmethod
def close_popup_window(
playwright_page: Page,
) -> None:
for _ in range(5):
try:
time.sleep(1)
playwright_page.evaluate(
"document.getElementById('credential_picker_container').remove();"
)
except BaseException:
pass
@staticmethod
def press_signup_with_google_button(
playwright_page: Page,
) -> None:
try:
sign_in_button = playwright_page.wait_for_selector(
'xpath=//*[contains(@data-testid, "google_sign_in_container")]',
timeout=3000,
)
sign_in_button.click()
time.sleep(2)
except BaseException:
pass
def enter_login_and_password(
self,
playwright_page: Page,
popup_window: EventInfo,
) -> None:
try:
# click sign up/in button
load_more_button = playwright_page.wait_for_selector(
'xpath=//*[contains(@data-testid, "google_sign_in_container")]',
timeout=3000,
)
load_more_button.click()
time.sleep(2)
# enter login
popup_window.value.fill(
selector='xpath=//input',
value=self.email_credentials['email'],
)
time.sleep(1)
#confirm login
popup_window.value.wait_for_selector(
selector='xpath=//*[contains(@id, "Next")]//button',
timeout=2000,
).click()
time.sleep(2)
except BaseException:
pass
try:
# enter password
popup_window.value.fill(
selector='xpath=//input[@type="password"]',
value=self.email_credentials['password'],
)
time.sleep(1)
# confirm password
popup_window.value.wait_for_selector(
selector='xpath=//*[contains(@id, "Next")]//button',
timeout=2000,
).click()
time.sleep(2)
except BaseException:
pass
def bypass_recovery_email_confirmation(
self,
popup_window: EventInfo,
) -> None:
try:
# chose the "confirm recovery email" test
popup_window.value.wait_for_selector(
selector='xpath=//*[@data-challengetype="12"]',
timeout=3000,
).click()
time.sleep(2)
# input the recovery email
popup_window.value.fill(
selector='xpath=//input[@aria-label]',
value=self.email_credentials['recovery_email'],
)
time.sleep(1)
# confirm the recovery email
popup_window.value.wait_for_selector(
selector='xpath=//*[contains(@data-primary-action-label, "Next") or contains(@id, "Next")]//button',
timeout=3000,
).click()
time.sleep(2)
except BaseException:
pass
def bypass_email_confirmation(
self,
playwright_page: Page,
popup_window: EventInfo,
) -> None:
try:
# chose the recover email verification code test
popup_window.value.wait_for_selector(
selector='xpath=//*[@data-challengetype="30"]',
timeout=3000,
).click()
time.sleep(2)
# get the code
code = self.get_authentication_code()
# enter the code
popup_window.value.fill(
selector='xpath=//input[@aria-label]',
value=code,
)
# confirm the code
popup_window.value.wait_for_selector(
selector='xpath=//*[contains(@id, "Next")]//button',
timeout=3000,
).click()
time.sleep(2)
except BaseException as e:
pass
@staticmethod
def confirm_gmail_enter(
popup_window: EventInfo,
) -> None:
# Confirm enter as gmail user if needed
try:
popup_window.value.wait_for_selector(
'xpath=//*[contains(@id, "yes") or contains(@id, "Yes") and @role="button"]',
timeout=3000,
).click()
time.sleep(2)
except BaseException:
pass
@staticmethod
def skip_popup_offer_window(
popup_window: EventInfo,
) -> None:
# clicking Not Now button
try:
popup_window.value.wait_for_selector(
selector='xpath=//*[contains(@data-testid, "Next")]',
timeout=2000,
).click()
time.sleep(2)
except BaseException:
pass
@staticmethod
def skip_offer_window(
playwright_page: Page,
) -> None:
# clicking Not Now button
try:
playwright_page.wait_for_selector(
selector='xpath=//*[contains(@data-testid, "Next")]',
timeout=2000,
).click()
time.sleep(2)
except BaseException:
pass
def enter_birthday_information(
self,
playwright_page: Page,
) -> bool:
try:
# get available option for month
month_area_html = playwright_page.wait_for_selector(
selector='xpath=//label[contains(@id, "SELECTOR_1")]/../select',
timeout=3000,
).inner_html()
month_area_tree = Selector(month_area_html)
month_area_options = [option.strip() for option in month_area_tree.xpath('//option/text()').extract()]
# convert the birthday string into a datetime
birthday = self.get_date_from_string(self.email_credentials['birthday'])
# enter the birthday month
playwright_page.wait_for_selector(
selector='xpath=//label[contains(@id, "SELECTOR_1")]/../select',
timeout=2000,
).select_option(month_area_options[birthday.month - 1])
time.sleep(1)
# enter the birthday day
playwright_page.wait_for_selector(
selector='xpath=//label[contains(@id, "SELECTOR_2")]/../select',
timeout=2000,
).select_option(str(birthday.day))
time.sleep(1)
# enter the birthday year
playwright_page.wait_for_selector(
selector='xpath=//label[contains(@id, "SELECTOR_3")]/../select',
timeout=2000,
).select_option(str(birthday.year))
time.sleep(2)
# confirm birthday
playwright_page.wait_for_selector(
selector='xpath=//*[contains(@data-testid, "Next")]',
timeout=2000,
).click()
time.sleep(2)
return True
except BaseException:
return False
@staticmethod
def confirm_twitter_settings(
playwright_page: Page,
) -> None:
try:
# confirm twitter settings
playwright_page.wait_for_selector(
selector='xpath=//*[contains(@id, "Next") or contains(@testid, "Next") '
'or @role="button"]/div/span/span',
timeout=2000,
).click()
time.sleep(2)
except BaseException:
pass
@staticmethod
def accept_default_nickname(
playwright_page: Page,
) -> None:
# skip the pre-nickname window if appears
try:
playwright_page.wait_for_selector(
selector='xpath=//*[contains(@data-testid, "Next")]',
timeout=2000,
).click()
time.sleep(2)
except BaseException:
pass
# confirm default nickname
try:
playwright_page.wait_for_selector(
selector='xpath=//*[contains(@data-testid, "Skip")]',
timeout=2000,
).click()
return
except BaseException:
pass
# confirm default nickname
try:
playwright_page.wait_for_selector(
selector='xpath=//*[contains(@id, "Next") or contains(@testid, "Next") '
'or contains(@aria-label, "Next") or @role="button"]/div/span/span',
timeout=2000,
).click()
time.sleep(2)
except BaseException:
pass
@staticmethod
def reject_notifications(
playwright_page: Page,
) -> None:
try:
playwright_page.wait_for_selector(
selector='xpath=//*[@role="button" and not(@style)]',
timeout=2000,
).click()
time.sleep(2)
except BaseException:
pass
def check_operation_status(
self,
playwright_page: Page,
) -> str:
if 'twitter.com/home' in playwright_page.url:
return SUCCESS_STATUS
self.close_popup_window(
playwright_page=playwright_page,
)
try:
playwright_page.reload()
except BaseException:
pass
for _ in range(3):
try:
playwright_page.wait_for_selector(
selector='xpath=//*[@role="button" or self::button]',
timeout=2000,
).click()
playwright_page.reload()
break
except BaseException:
pass
if 'twitter.com/home' in playwright_page.url:
return SUCCESS_STATUS
self.open_page(
url='https://twitter.com/home',
playwright_page=playwright_page,
)
time.sleep(2)
if 'twitter.com/home' in playwright_page.url:
return SUCCESS_STATUS
return FAIL_STATUS
def change_the_account_name(
self,
playwright_page: Page,
) -> None:
try:
# open "Edit profile" page
self.open_page(
url='https://twitter.com/settings/profile',
playwright_page=playwright_page,
)
time.sleep(1)
# enter the name from credentials
playwright_page.fill(
selector='xpath=//input[contains(@name, "Name")]',
value=self.email_credentials['name'],
)
time.sleep(1)
# confirm the name changing
playwright_page.wait_for_selector(
selector='xpath=//*[contains(@data-testid, "Save")]',
timeout=3000,
).click()
time.sleep(2)
except BaseException:
pass
def change_the_account_birthday(
self,
playwright_page: Page,
) -> None:
try:
# open "Edit profile" page
self.open_page(
url='https://twitter.com/settings/profile',
playwright_page=playwright_page,
)
time.sleep(1)
# click "Edit birthday" button
playwright_page.wait_for_selector(
selector='xpath=//*[contains(@data-testid, "ProfileBirthdate")]',
timeout=3000,
).click()
time.sleep(1)
# confirm the birthday edit intention
playwright_page.wait_for_selector(
selector='xpath=//*[contains(@data-testid, "SheetConfirm")]',
timeout=3000,
).click()
time.sleep(1)
# get available option for month
month_area_html = playwright_page.wait_for_selector(
selector='xpath=//label[contains(@id, "SELECTOR_1")]/../select',
timeout=3000,
).inner_html()
month_area_tree = Selector(month_area_html)
month_area_options = [option.strip() for option in month_area_tree.xpath('//option/text()').extract()]
# convert the birthday string into a datetime
birthday = self.get_date_from_string(self.email_credentials['birthday'])
# enter the birthday month
playwright_page.wait_for_selector(
selector='xpath=//label[contains(@id, "SELECTOR_1")]/../select',
timeout=2000,
).select_option(month_area_options[birthday.month - 1])
time.sleep(1)
# enter the birthday day
playwright_page.wait_for_selector(
selector='xpath=//label[contains(@id, "SELECTOR_2")]/../select',
timeout=2000,
).select_option(str(birthday.day))
time.sleep(1)
# enter the birthday year
playwright_page.wait_for_selector(
selector='xpath=//label[contains(@id, "SELECTOR_3")]/../select',
timeout=2000,
).select_option(str(birthday.year))
time.sleep(1)
# confirm the birthday changing
playwright_page.wait_for_selector(
selector='xpath=//*[contains(@data-testid, "Save")]',
timeout=3000,
).click()
time.sleep(1)
# confirm again if required
playwright_page.wait_for_selector(
selector='xpath=//*[contains(@data-testid, "SheetConfirm")]',
timeout=3000,
).click()
time.sleep(2)
except BaseException:
pass
def register_account(self) -> str:
playwright_client = self.create_playwright_client()
playwright_browser = self.create_playwright_browser(
playwright_client=playwright_client,
proxies=self.proxies,
headless=False,
)
playwright_browser_context = self.create_playwright_context(
playwright_browser=playwright_browser,
proxies=self.proxies,
)
playwright_page = self.create_playwright_page(
playwright_browser_context=playwright_browser_context,
)
stealth_sync(playwright_page)
# open the twitter register page
self.open_page(
playwright_page=playwright_page,
url=TWITTER_DOMAIN,
)
# close first popup window if appears
self.close_popup_window(
playwright_page=playwright_page,
)
# open browser context manager on event popup
with playwright_page.expect_popup() as popup_window:
# enter the email login and password
self.enter_login_and_password(
playwright_page=playwright_page,
popup_window=popup_window,
)
# bypass email verification if needed
self.bypass_recovery_email_confirmation(
popup_window=popup_window,
)
# skip popup offer window if needed
self.skip_popup_offer_window(
popup_window=popup_window,
)
# confirm enter as gmail user if needed
self.confirm_gmail_enter(
popup_window=popup_window,
)
# enter birthday information
birthday_entered = self.enter_birthday_information(
playwright_page=playwright_page,
)
# skip any offer window if needed
self.skip_offer_window(
playwright_page=playwright_page,
)
# confirm enter with default nickname
self.accept_default_nickname(
playwright_page=playwright_page,
)
# reject notifications if needed
self.reject_notifications(
playwright_page=playwright_page,
)
# check if sign up is completed
status = self.check_operation_status(
playwright_page=playwright_page,
)
if status != SUCCESS_STATUS:
raise Exception('Register failed')
# change the account name
self.change_the_account_name(
playwright_page=playwright_page,
)
if birthday_entered:
return status
# change the account birthday if it wasn't do
self.change_the_account_birthday(
playwright_page=playwright_page,
)
return status
def get_authentication_code(self) -> str:
click_datetime = datetime.utcnow()
imap = IMAP4_SSL(
host=self.email_credentials['recovery_email'].split('@')[-1],
port=993,
)
imap.login(
user=self.email_credentials['recovery_email'],
password=self.email_credentials['recovery_email_password'],
)
for _ in range(6):
time.sleep(10)
imap.select()
_, messages = imap.search(None, 'ALL')
message_id_list = messages[0].split()
for message_id in message_id_list[::-1]:
try:
_, message = imap.fetch(message_id, '(RFC822)')
message_bytes = message[0][1]
parsed_message = email.message_from_bytes(message_bytes)
message_date_str = parsed_message['Date']
message_from_str = parsed_message['From']
message_timestamp = time.mktime(email.utils.parsedate(message_date_str))
sender_name, sender_email = email.utils.parseaddr(message_from_str)
click_datetime = click_datetime.replace(microsecond=0)
message_datetime = datetime.fromtimestamp(message_timestamp).replace(microsecond=0)
if message_datetime >= click_datetime - timedelta(seconds=30) \
and sender_email == 'noreply@google.com':
message_text = str(parsed_message.get_payload()[0])
message_text = base64.b64decode(message_text.split('base64')[-1].strip())
message_text = message_text.decode().replace('\r', '').replace('\n', '')
code_list = re.findall(r'verification code is:.*?([0-9]+)', message_text)
if code_list:
return code_list[0].strip()
except BaseException:
pass
raise Exception('NO CONFIRMATION CODE')
if __name__ == '__main__':
credentials = {
'email': 'bogdannest071@gmail.com',
'recovery_email': 'keshabbista18@gmail.com',
'recovery_email_password': 'VjIeH6EvXWpF',
'password': 'sdkHB23d',
'name': 'Dataox Team',
'birthday': '14-06-1993',
}
twitter_account_manager = TwitterAccountManager(
email_credentials=credentials,
proxies={
'url': '91.225.13.33',
'port': '51523',
'username': 'alexandranedova0nPCH',
'password': 'FmcGqYgSvV',
},
# proxies=None,
)
Editor is loading...