Untitled
unknown
plain_text
2 years ago
96 kB
10
Indexable
from selenium.common.exceptions import StaleElementReferenceException, ElementClickInterceptedException
from selenium.webdriver.common.keys import Keys
from selenium.webdriver import ActionChains, Keys
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException
import os
import sys
import time
import random
import csv
from dotenv import load_dotenv
from typing import List
import pytz
import datetime
from datetime import datetime
from datetime import date
from datetime import timedelta
import re
from selenium import webdriver
import numpy
from .html_parser import strip_tags
from .skutils import SkUtils
# from slack_automation.praiser_util import get_praise_statement
from .utils import SlackMessage
from .utils import PostTypes
from .utils import detect_skills
from .utils import join_strings,format_sorted_skills,sanitize_skills
from .open_ai import sort_skills
load_dotenv()
JOB_CHANNEL_FILTER_KEYS = os.getenv("CHANNEL_FILTER_KEYS")
INTRO_CHANNEL_FILTER_KEYS = os.getenv("INTRO_CHANNEL_KEYS")
FEEDBACK_CHANNEL_FILTER_KEYS = os.getenv("FEEDBACK_CHANNEL_KEYS")
PRODUCT_CHANNEL_FILTER_KEYS = os.getenv("PRODUCT_CHANNEL_KEYS")
MATERIAL_CHANNEL_KEYS = os.getenv("MATERIAL_CHANNEL_KEYS")
TALENT_LIST_URL = os.getenv("TALENT_PAGE_URL")
MESSAGE_FILTER_KEYS = os.getenv("MESSAGE_FILTER_KEYS")
class DMUtil:
def __init__(self, driver, ldUtil, dbManager):
self.driver = driver
self.ldUtil = ldUtil
self.dbManager = dbManager
self.dmCount = 0
async def sendDMToJobPosters(self):
try:
"""Getting messages"""
active_channel = None
while True:
channel_elements = self.get_channels()
if active_channel is None:
active_channel = channel_elements[0]
if active_channel == channel_elements[-1]:
break
for i in range(channel_elements.index(active_channel), len(channel_elements)):
channel_element = channel_elements[i]
active_channel = channel_element
try:
self.driver.execute_script("arguments[0].scrollIntoView(true);", channel_element)
channel_name = channel_element.find_element(by=By.CSS_SELECTOR, value="span[class*=p-channel_sidebar__name]").text
print("################################# " + channel_name + " #################################")
await self.get_messages_from_channel(channel_element, channel_name)
except Exception as e:
continue
except Exception as e:
exc_type, exc_obj, exc_tb = sys.exc_info()
filename = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
print(exc_type, filename, exc_tb.tb_lineno)
async def sendDMToChannels(self, workspace):
try:
"""Getting messages"""
active_channel = None
while True:
channel_elements = self.get_channels()
if active_channel is None:
active_channel = channel_elements[0]
if active_channel == channel_elements[-1]:
return
for i in range(channel_elements.index(active_channel), len(channel_elements)):
channel_element = channel_elements[i]
active_channel = channel_element
try:
self.driver.execute_script("arguments[0].scrollIntoView(true);", channel_element)
channel_name = channel_element.find_element(by=By.CSS_SELECTOR, value="span[class*=p-channel_sidebar__name]").text
if any(key.strip() in channel_name for key in INTRO_CHANNEL_FILTER_KEYS.split(",")):
self.driver.execute_script("arguments[0].scrollIntoView(true);", channel_element)
time.sleep(0.2)
self.driver.execute_script("arguments[0].click();", channel_element)
time.sleep(1)
if not self.dbManager.check_intro_exists(workspace):
print('################################# Sending intro to "' + channel_name + '" Channel #################################')
self.dbManager.add_intro_history(workspace)
await self.send_intro_dm()
time.sleep(5 + random.uniform(0, 0.5))
# Create a new directory with the name of the current date
now = datetime.now()
date_dir_name = now.strftime("%Y-%m-%d")+"-sending-script-run"
date_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name)
os.makedirs(date_dir_path, exist_ok=True)
# Create a see the action directory
see_the_action_dir = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name, 'see-the-action')
os.makedirs(see_the_action_dir, exist_ok=True)
# Create a new directory for the current Slack env file
env_dir_name = self.driver.find_element(by=By.CSS_SELECTOR, value='.p-ia__sidebar_header__team_name_text, .p-ia4_home_header_menu__team_name').text
env_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name, env_dir_name)
# env_dir_path = os.path.join(date_dir_name, env_dir_name)
os.makedirs(env_dir_path, exist_ok=True)
# Create additional directories for each activity
send_intro_dir = os.path.join(env_dir_path, 'sending-messages-to-channels')
os.makedirs(send_intro_dir, exist_ok=True)
time.sleep(5 + random.uniform(0, 0.5))
intro_element = self.driver.find_element(By.CLASS_NAME, "p-message_pane_message__message--last")
sender_id = intro_element.find_element(By.CLASS_NAME, "c-message__sender").get_attribute("data-stringify-text")
print("Sender ID: " + sender_id)
message_id = intro_element.find_element(By.CLASS_NAME, "c-timestamp").get_attribute("href")
print("Message ID: " + message_id)
timestamp_id = intro_element.find_element(By.CSS_SELECTOR, 'a.c-link.c-timestamp').get_attribute('aria-label')
print("Timestamp ID: " + timestamp_id)
time.sleep(5 + random.uniform(0, 0.5))
if not self.dbManager.check_intro_data_exists(workspace, channel_name, timestamp_id, sender_id, message_id):
self.dbManager.add_intro_data(workspace, channel_name, timestamp_id, sender_id, message_id)
# Scroll the message element into view and unfurl any links
actions = ActionChains(self.driver)
actions.move_to_element(intro_element).perform()
while True:
collapse_buttons = intro_element.find_elements(By.XPATH, "//button[@class='c-button-unstyled c-message_attachment__delete']")
if len(collapse_buttons) == 1:
break
# Click each collapse button
for button in collapse_buttons:
try:
# Check if the link preview is for a YouTube video
link_preview_element = button.find_element(By.XPATH, "../..")
youtube_link = link_preview_element.find_elements(By.XPATH, ".//a[contains(@href, 'youtube.com')]")
if youtube_link:
continue
button.click()
# Wait for the prompt to appear and click the "Remove" button
remove_button = WebDriverWait(self.driver, 10).until(EC.element_to_be_clickable((By.XPATH, "//button[@data-qa='dialog_go']")))
remove_button.click()
except:
continue
screenshot_name = f'send-intro-message-{now.strftime("%Y-%m-%d-%H-%M-%S")}.png'
screenshot_path = os.path.join(send_intro_dir, screenshot_name)
self.driver.save_screenshot(screenshot_path)
# Take a first screenshot and save it in the see-the-action directory
screenshot_name = f'send-intro-message-{now.strftime("%Y-%m-%d-%H-%M-%S")}.png'
screenshot_path = os.path.join(see_the_action_dir, screenshot_name)
self.driver.save_screenshot(screenshot_path)
time.sleep(5 + random.uniform(0, 0.5))
# screenshot_name = f'screenshot_send_intro_dm_{ time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())}.png'
# screenshot_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/sending-messages-to-channels/', screenshot_name)
# self.driver.save_screenshot(screenshot_path)
elif any(key.strip() in channel_name for key in FEEDBACK_CHANNEL_FILTER_KEYS.split(",")):
self.driver.execute_script("arguments[0].scrollIntoView(true);", channel_element)
time.sleep(0.2)
self.driver.execute_script("arguments[0].click();", channel_element)
time.sleep(1)
if not self.dbManager.check_feedback_exists(workspace):
print('################################# Sending Feedback dm to "' + channel_name + '" Channel #################################')
self.dbManager.add_feedback_history(workspace)
await self.send_feedback_dm()
time.sleep(3 + random.uniform(0, 2))
# screenshot_name = f'screenshot_send_feedback_dm_{ time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())}.png'
# screenshot_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/sending-messages-to-channels/', screenshot_name)
# self.driver.save_screenshot(screenshot_path)
elif "product" in channel_name.lower() and "hunt" in channel_name.lower():
self.driver.execute_script("arguments[0].scrollIntoView(true);", channel_element)
time.sleep(0.2)
self.driver.execute_script("arguments[0].click();", channel_element)
time.sleep(1)
if not self.dbManager.check_product_exists(workspace):
print('################################# Sending Product dm to "' + channel_name + '" Channel #################################')
self.dbManager.add_product_history(workspace)
await self.send_product_dm()
time.sleep(3 + random.uniform(0, 2))
# Create a new directory with the name of the current date
now = datetime.now()
date_dir_name = now.strftime("%Y-%m-%d")+"-sending-script-run"
date_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name)
os.makedirs(date_dir_path, exist_ok=True)
# Create a see-the-action directory
see_the_action_dir = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name, 'see-the-action')
os.makedirs(see_the_action_dir, exist_ok=True)
# Create a new directory for the current Slack env file
env_dir_name = self.driver.find_element(by=By.CSS_SELECTOR, value='.p-ia__sidebar_header__team_name_text, .p-ia4_home_header_menu__team_name').text
env_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name, env_dir_name)
# env_dir_path = os.path.join(date_dir_name, env_dir_name)
os.makedirs(env_dir_path, exist_ok=True)
# Create additional directories for each activity
send_product_dir = os.path.join(env_dir_path, 'sending-messages-to-channels')
os.makedirs(send_product_dir, exist_ok=True)
# get the feedback message in the channel and save it to the database
product_element = self.driver.find_element(By.CLASS_NAME, "p-message_pane_message__message--last")
sender_id = product_element.find_element(By.CLASS_NAME, "c-message__sender").get_attribute("data-stringify-text")
print("Sender ID: " + sender_id)
message_id = product_element.find_element(By.CLASS_NAME, "c-timestamp").get_attribute("href")
print("Message ID: " + message_id)
timestamp_id = product_element.find_element(By.CSS_SELECTOR, 'a.c-link.c-timestamp').get_attribute('aria-label')
print("Timestamp ID: " + timestamp_id)
time.sleep(5 + random.uniform(0, 0.5))
if not self.dbManager.check_product_data_exists(workspace, channel_name, timestamp_id, sender_id, message_id):
self.dbManager.add_product_data(workspace, channel_name, timestamp_id, sender_id, message_id)
screenshot_name = f'send-product-message-{now.strftime("%Y-%m-%d-%H-%M-%S")}.png'
screenshot_path = os.path.join(send_product_dir, screenshot_name)
self.driver.save_screenshot(screenshot_path)
screenshot_name = f'send-product-message-{now.strftime("%Y-%m-%d-%H-%M-%S")}.png'
screenshot_path = os.path.join(see_the_action_dir, screenshot_name)
self.driver.save_screenshot(screenshot_path)
# screenshot_name = f'screenshot_send_product_dm_{ time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())}.png'
# screenshot_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/sending-messages-to-channels/', screenshot_name)
# self.driver.save_screenshot(screenshot_
# path)
elif any(key.strip() in channel_name for key in JOB_CHANNEL_FILTER_KEYS.split(",")):
# return
# pass
self.driver.execute_script("arguments[0].scrollIntoView(true);", channel_element)
time.sleep(0.2)
self.driver.execute_script("arguments[0].click();", channel_element)
time.sleep(1)
if self.dmCount <= 10:
print('################################# Searching for Client or Talent posts in "' + channel_name + '" Channel #################################')
await self.get_messages_from_channel(channel_element, channel_name)
time.sleep(3 + random.uniform(0, 2))
# Create a new directory with the name of the current date
now = datetime.now()
date_dir_name = now.strftime("%Y-%m-%d")+"-sending-script-run"
date_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name)
os.makedirs(date_dir_path, exist_ok=True)
# Create a new directory for the current Slack env file
env_dir_name = self.driver.find_element(by=By.CSS_SELECTOR, value='.p-ia__sidebar_header__team_name_text, .p-ia4_home_header_menu__team_name').text
env_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name, env_dir_name)
# env_dir_path = os.path.join(date_dir_name, env_dir_name)
os.makedirs(env_dir_path, exist_ok=True)
# Create additional directories for each activity
get_messages_from_channel_dir = os.path.join(env_dir_path, 'channel-read')
os.makedirs(get_messages_from_channel_dir, exist_ok=True)
screenshot_name = f'channel-read-{now.strftime("%Y-%m-%d-%H-%M-%S")}.png'
screenshot_path = os.path.join(get_messages_from_channel_dir, screenshot_name)
self.driver.save_screenshot(screenshot_path)
# slackbot_element = self.driver.find_element(by=By.XPATH, value="//span[@data-qa='channel_sidebar_name_slackbot']")
# slackbot_element.click()
# screenshot_name = f'screenshot_get_messages_from_channel_{ time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())}.png'
# screenshot_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/channel-read-screenshots/', screenshot_name)
# self.driver.save_screenshot(screenshot_path)
elif any(key.strip() in channel_name for key in MATERIAL_CHANNEL_KEYS.split(",")):
self.driver.execute_script("arguments[0].scrollIntoView(true);", channel_element)
time.sleep(0.2)
self.driver.execute_script("arguments[0].click();", channel_element)
time.sleep(1)
choice = numpy.random.choice(numpy.arange(0, 3), p=[0.5, 0.25, 0.25])
if not self.dbManager.check_random_exists(workspace):
print('################################# Sending Material dm to "' + channel_name + '" Channel #################################')
self.dbManager.add_random_history(workspace)
if choice == 0:
# pass
await self.send_dm_random_culture()
time.sleep(3 + random.uniform(0, 2))
elif choice == 1:
# pass
await self.send_dm_random_blog()
time.sleep(3 + random.uniform(0, 2))
elif choice == 2:
# pass
await self.send_dm_random_video()
time.sleep(3 + random.uniform(0, 2))
# get the random message in the channel and save it to the database
random_element = self.driver.find_element(By.CLASS_NAME, "p-message_pane_message__message--last")
sender_id = random_element.find_element(By.CLASS_NAME, "c-message__sender").get_attribute("data-stringify-text")
print("Sender ID: " + sender_id)
message_id = random_element.find_element(By.CLASS_NAME, "c-timestamp").get_attribute("href")
print("Message ID: " + message_id)
timestamp_id = random_element.find_element(By.CSS_SELECTOR, 'a.c-link.c-timestamp').get_attribute('aria-label')
print("Timestamp ID: " + timestamp_id)
time.sleep(5 + random.uniform(0, 0.5))
if not self.dbManager.check_random_data_exists(workspace, channel_name, timestamp_id, sender_id, message_id):
self.dbManager.add_random_data(workspace, channel_name, timestamp_id, sender_id, message_id)
else:
pass
except Exception as e:
continue
except Exception as e:
exc_type, exc_obj, exc_tb = sys.exc_info()
filename = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
print(exc_type, filename, exc_tb.tb_lineno)
def checkDMResponse(self):
channels = self.get_dm_channels()
active_channel = None
with open("dm_data.csv", "a", newline="", encoding="utf-8") as csvfile:
writer = csv.writer(csvfile)
writer.writerow(["Channel Name", "Message Content", "Reaction Types", "Link"])
for channel in channels:
active_channel = channel
try:
self.driver.execute_script("arguments[0].scrollIntoView(true);", channel)
channel_name = channel.find_element(by=By.CSS_SELECTOR, value="span[class*=p-channel_sidebar__name]").text
print("****************** Channel name:" + channel_name + " ***************************")
webdriver.ActionChains(self.driver).move_to_element(channel).click(channel).perform()
# time.sleep(1)
WebDriverWait(self.driver, 10).until(EC.element_to_be_clickable((By.XPATH, "//div[@data-qa='message_input']")))
time.sleep(5 + random.uniform(0, 0.5))
time_tags = self.get_time_tags()
if len(time_tags) > 0:
time_tags[-1].click()
time.sleep(1 + random.uniform(0, 0.5))
jump_to_dropdown = self.driver.find_element(by=By.XPATH, value="//div[@data-qa='menu_items']")
jump_elements = jump_to_dropdown.find_elements(by=By.CLASS_NAME, value="c-menu_item__li")
jump_to_specific_date = jump_elements[-2]
webdriver.ActionChains(self.driver).move_to_element(jump_to_specific_date).click(jump_to_specific_date).perform()
time.sleep(1 + random.uniform(0, 0.5))
message_panel = self.driver.find_element(by=By.CLASS_NAME, value="c-message_list")
focus_message = None
messages = message_panel.find_elements(by=By.XPATH, value="//div[(@class='c-virtual_list__item') and (@tabindex='-1')]")
existing_responses = []
with open("dm_data.csv", "r", newline="", encoding="utf-8") as csvfile:
reader = csv.reader(csvfile)
next(reader) # skip the header row
for row in reader:
existing_responses.append(row[1]) # assume that message content is stored in the second column
for message in messages:
# message_links = []
avatar = message.find_elements(by=By.CLASS_NAME, value="c-avatar")
if len(avatar) > 0:
focus_message = message
break
print("********************** Checking Messages ************************")
filter_completed = False
dm_detected = False
while not filter_completed:
messages = message_panel.find_elements(by=By.XPATH, value="//div[(@class='c-virtual_list__item') and (@tabindex='-1')]")
start_index = messages.index(focus_message) if focus_message is not None else 0
message_element = messages[start_index]
reaction_detected = False
reaction_type = []
self.driver.execute_script("arguments[0].scrollIntoView(true);", messages[start_index])
time.sleep(0.2)
# message_links = []
message_links = []
message_content = ""
message_content += messages[start_index].text
# message_content += strip_tags(messages[start_index].get_attribute("innerHTML"))
for link in messages[start_index].find_elements(by=By.TAG_NAME, value="a"):
message_links.append(link.get_attribute("href"))
if len(message_element.find_elements(by=By.XPATH, value=".//div[@data-qa='reaction_bar']")) > 0:
reaction_detected = True
reactions = message_element.find_element(by=By.XPATH, value=".//div[@data-qa='reaction_bar']").find_elements(by=By.TAG_NAME, value="img")
for reaction in reactions:
reaction_type.append(reaction.get_attribute("aria-label"))
time.sleep(0.2 + random.uniform(0, 0.5))
if message_element == messages[-1]:
filter_completed = True
for message in messages[start_index + 1:]:
avatar = message.find_elements(by=By.CLASS_NAME, value="c-avatar")
if len(avatar) == 0:
# strip_tags(message.get_attribute('innerHTML'))
message_content += message.text
# message_content += strip_tags(message.get_attribute("innerHTML"))
for link in message.find_elements(by=By.TAG_NAME, value="a"):
message_links.append(link.get_attribute("href"))
if message == messages[-1]:
filter_completed = True
else:
focus_message = message
break
if len(message.find_elements(by=By.XPATH, value=".//div[@data-qa='reaction_bar']")) > 0:
reaction_detected = True
reactions = message_element.find_element(by=By.XPATH, value=".//div[@data-qa='reaction_bar']").find_elements(by=By.TAG_NAME, value="img")
for reaction in reactions:
reaction_type.append(reaction.get_attribute("aria-label"))
# print('############################### Message Content: ' + message_content + ' #####################################')
if dm_detected:
# self.driver.save_screenshot("screenshot.png")
# if not self.dbManager.check_dm_response_history(",".join(message_links)):
# self.dbManager.add_dm_response_history(",".join(message_links)) # add response to response history
print(
"******************************** \n CMO/CEO/Marketer should respond to this potential client who has written back: \n{message}\n *************************************".format(
message=message_content
)
)
# filter_completed = True
# break
# if dm_detected:
if message_content not in existing_responses:
with open("dm_data.csv", "a", newline="") as csvfile:
writer = csv.writer(csvfile)
writer.writerow([channel_name, message_content, ", ".join(reaction_type), ",".join(message_links)])
existing_responses.append(message_content)
filter_completed = True
break
# Create a new directory with the name of the current date
now = datetime.now()
date_dir_name = now.strftime("%Y-%m-%d")+"-reading-script-run"
date_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name)
os.makedirs(date_dir_path, exist_ok=True)
# Create a see-the-action directory
see_the_action_dir = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name, 'see-the-action')
os.makedirs(see_the_action_dir, exist_ok=True)
# Create a new directory for the current Slack env file
env_dir_name = self.driver.find_element(by=By.CSS_SELECTOR, value='.p-ia__sidebar_header__team_name_text, .p-ia4_home_header_menu__team_name').text
env_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name, env_dir_name)
# env_dir_path = os.path.join(date_dir_name, env_dir_name)
os.makedirs(env_dir_path, exist_ok=True)
# Create additional directories for each activity
response_parse_dir = os.path.join(env_dir_path, 'response-parse')
os.makedirs(response_parse_dir, exist_ok=True)
# if "P.S. If instead you are looking for work you can also join" in message_content:
if "New" in message_content:
time.sleep(5 + random.uniform(0, 0.5))
# workspace = self.driver.find_element(by=By.XPATH, value="//span[@class='p-ia__sidebar_header__team_name_text']").text
workspace = self.driver.find_element(by=By.XPATH, value="//span[contains(@class, 'p-ia__sidebar_header__team_name_text') or contains(@class, 'p-ia4_home_header_menu__team_name')]").text
print("Workspace: " + workspace)
dm_element = self.driver.find_element(By.CLASS_NAME, "c-message_kit__gutter__right")
sender_id = dm_element.find_element(By.CSS_SELECTOR, "button[data-qa='message_sender_name']").text
print("Sender ID: " + sender_id)
message_id = dm_element.find_element(By.CLASS_NAME, "c-link").get_attribute("href")
print("Message ID: " + message_id)
timestamp_id = dm_element.find_element(By.CSS_SELECTOR, 'a.c-link.c-timestamp').get_attribute('aria-label')
print("Day / Time: " + timestamp_id)
if not self.dbManager.check_dm_response_history(workspace, sender_id, message_id, timestamp_id):
self.dbManager.add_dm_response_history(workspace, sender_id, message_id, timestamp_id) # add response to response history
screenshot_name = f'response-parse-{now.strftime("%Y-%m-%d-%H-%M-%S")}.png'
screenshot_path = os.path.join(response_parse_dir, screenshot_name)
self.driver.save_screenshot(screenshot_path)
screenshot_name = f'response-parse-{now.strftime("%Y-%m-%d-%H-%M-%S")}.png'
screenshot_path = os.path.join(see_the_action_dir, screenshot_name)
self.driver.save_screenshot(screenshot_path)
# screenshot_name = f'screenshot_dm_response_{ time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())}.png'
# screenshot_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/responseparse-script-screenshots/', screenshot_name)
# self.driver.save_screenshot(screenshot_path)
dm_detected = True
if reaction_detected:
print(
"******************************** \n CMO/CEO/Marketer should note that this potential client gave an emoji response: \n{types}\n *************************************".format(
types=", ".join(reaction_type)
)
)
message_timestamp = int(message_element.get_attribute("data-item-key").split(".")[0])
message_date = datetime.fromtimestamp(message_timestamp).date()
today = date.today()
week_ago = today - timedelta(days=7)
if message_date <= week_ago and message_element == messages[-1]:
self.send_reminder_dm(channel_name)
time.sleep(0.5 + random.uniform(0, 0.5))
except Exception as e:
continue
def add_dm_link(self, text, url):
link_composer_button = self.driver.find_element(by=By.XPATH, value="//button[@data-qa='link-composer-button']")
self.driver.execute_script("arguments[0].click();", link_composer_button)
time.sleep(0.1 + random.uniform(0, 0.5))
link_text = self.driver.find_element(by=By.ID, value="composer-link-modal-text")
link_text.send_keys(text)
link_value = self.driver.find_element(by=By.ID, value="composer-link-modal-url")
link_value.send_keys(url)
time.sleep(0.1 + random.uniform(0, 0.5))
save_button = self.driver.find_element(by=By.XPATH, value="//button[@data-qa='dialog_go']")
self.driver.execute_script("arguments[0].click();", save_button)
def add_bold_text(self, message_input_field, text):
bold_composer_button = self.driver.find_element(by=By.XPATH, value="//button[@data-qa='bold-composer-button']")
self.driver.execute_script("arguments[0].click();", bold_composer_button)
message_input_field.send_keys(text)
self.driver.execute_script("arguments[0].click();", bold_composer_button)
def add_italic_text(self, message_input_field, text):
italic_composer_button = self.driver.find_element(by=By.XPATH, value="//button[@data-qa='italic-composer-button']")
self.driver.execute_script("arguments[0].click();", italic_composer_button)
message_input_field.send_keys(text)
self.driver.execute_script("arguments[0].click();", italic_composer_button)
def get_channels(self) -> List[WebElement]:
channel_elements = self.driver.find_elements(by=By.CSS_SELECTOR, value="div[data-qa-channel-sidebar-channel-type*=channel]")
return channel_elements
def get_dm_channels(self) -> List[WebElement]:
channel_elements = self.driver.find_elements(by=By.XPATH, value="//div[@data-qa-channel-sidebar-channel-type='im']")
channel_elements = list(filter(lambda channel: channel.get_attribute("data-section-channel-index") != "0", channel_elements))
return channel_elements
async def send_intro_dm(self):
try:
WebDriverWait(self.driver, 10).until(EC.visibility_of_element_located((By.XPATH, "//div[@contenteditable='true']")))
time.sleep(0.5)
message_input_field = self.driver.find_element(by=By.XPATH, value="//div[@contenteditable='true']")
message_input_field.clear()
message_input_field.send_keys("Hi, I am Castro, and I look forward to learning a lot from this group. I’m a developer at ")
self.add_dm_link(text="ldtalentwork.com", url="http://ldtalentwork.com/")
time.sleep(0.1 + random.uniform(0, 0.5))
message_input_field.send_keys("— ")
message_input_field.send_keys("a project born out of Stanford University. It’s a tool that helps you find diverse vetted developers and helps you manage them in an efficient and transparent way.")
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
self.add_bold_text(message_input_field=message_input_field, text="Free Trial: ")
message_input_field.send_keys("It gives you ~5 free hours to try them before hiring them for contract or full-time roles. To see how incredibly unique LD is, check out this video:")
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
message_input_field.send_keys("https://www.youtube.com/watch?v=EVAo8LPCZmA.")
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
self.add_bold_text(message_input_field=message_input_field, text="Mission: ")
message_input_field.send_keys("To strengthen remote teams by helping them drop location constraints and include qualified but overlooked talent from the majority world: ")
self.add_italic_text(message_input_field=message_input_field, text="diversity meets capitalism.")
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
message_input_field.send_keys("Here is our one pager: https://bit.ly/3ShMLJZ. We are always looking for brutally honest feedback :)")
time.sleep(0.5 + random.uniform(0, 0.5))
message_input_field.send_keys(Keys.ENTER)
time.sleep(0.5 + random.uniform(0, 0.5))
message_input_field.clear()
except Exception as e:
exc_type, exc_obj, exc_tb = sys.exc_info()
filename = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
pass
async def send_feedback_dm(self):
try:
WebDriverWait(self.driver, float("inf")).until(EC.visibility_of_element_located((By.XPATH, "//div[@contenteditable='true']")))
time.sleep(0.5)
message_input_field = self.driver.find_element(by=By.XPATH, value="//div[@contenteditable='true']")
message_input_field.clear()
message_input_field.send_keys("Seeking feedback from teams who want to try ")
self.add_dm_link(text="LD Talent", url="http://ldtalentwork.com/")
message_input_field.send_keys("— ")
message_input_field.send_keys(
"a project born out of Stanford University. It’s a tool that helps you find vetted, diverse, and highly productive developers. The platform gives you ~5 free hours to try them before hiring them for contract or full-time roles. Build your MVP, v2, etc. ("
)
self.add_dm_link(text="appointments calendar", url="http://ldtalentwork.com/contact/")
message_input_field.send_keys(Keys.BACKSPACE)
message_input_field.send_keys("). To see how incredibly unique LD is, check out this ")
# message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
self.add_dm_link(text="video", url="http://youtube.com/watch?v=EVAo8LPCZmA")
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
message_input_field.send_keys("Diversity meets Capitalism: Our mission is to help teams drop location constraints that unknowingly and implicitly overlook qualified, motivated, and ")
self.add_dm_link(text="efficient", url="http://ldtalentwork.com/")
message_input_field.send_keys(
"but underrepresented people from roles they’re perfect for. Given the rise of remote distributed teams in the new normal, we believe that global collaborations are the future."
)
time.sleep(0.5 + random.uniform(0, 0.5))
message_input_field.send_keys(Keys.ENTER)
time.sleep(0.5 + random.uniform(0, 0.5))
message_input_field.clear()
# Create a new directory with the name of the current date
now = datetime.now()
date_dir_name = now.strftime("%Y-%m-%d")+"-sending-script-run"
date_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name)
os.makedirs(date_dir_path, exist_ok=True)
# Create a see the action directory
see_the_action_dir = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name, 'see-the-action')
os.makedirs(see_the_action_dir, exist_ok=True)
# Create a new directory for the current Slack env file
env_dir_name = self.driver.find_element(by=By.CSS_SELECTOR, value='.p-ia__sidebar_header__team_name_text, .p-ia4_home_header_menu__team_name').text
env_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name, env_dir_name)
# env_dir_path = os.path.join(date_dir_name, env_dir_name)
os.makedirs(env_dir_path, exist_ok=True)
# Create additional directories for each activity
send_feedback_dir = os.path.join(env_dir_path, 'sending-messages-to-channels')
os.makedirs(send_feedback_dir, exist_ok=True)
# Find the message element
# workspace = self.driver.find_element(by=By.XPATH, value="//span[@class='p-ia__sidebar_header__team_name_text']").text
workspace = self.driver.find_element(by=By.XPATH, value="//span[contains(@class, 'p-ia__sidebar_header__team_name_text') or contains(@class, 'p-ia4_home_header_menu__team_name')]").text
channel_name = self.driver.find_element(by=By.CSS_SELECTOR, value="span[class*='p-view_header__channel_title p-view_header__truncated_text']").text
# get the feedback message in the channel and save it to the database
feedback_element = self.driver.find_element(By.CLASS_NAME, "p-message_pane_message__message--last")
timestamp_id = feedback_element.find_element(By.CSS_SELECTOR, 'a.c-link.c-timestamp').get_attribute('aria-label')
print("Timestamp ID: " + timestamp_id)
sender_id = feedback_element.find_element(By.CLASS_NAME, "c-message__sender").get_attribute("data-stringify-text")
print("Sender ID: " + sender_id)
message_id = feedback_element.find_element(By.CLASS_NAME, "c-timestamp").get_attribute("href")
print("Message ID: " + message_id)
time.sleep(5 + random.uniform(0, 0.5))
if not self.dbManager.check_feedback_data_exists(workspace, channel_name, timestamp_id, sender_id, message_id):
self.dbManager.add_feedback_data(workspace, channel_name, timestamp_id, sender_id, message_id)
# Scroll the message element into view and unfurl any links
actions = ActionChains(self.driver)
actions.move_to_element(feedback_element).perform()
while True:
collapse_buttons = feedback_element.find_elements(By.XPATH, "//button[@class='c-button-unstyled c-message_attachment__delete']")
if len(collapse_buttons) == 1:
break
# Click each collapse button
for button in collapse_buttons:
try:
# Check if the link preview is for a YouTube video
link_preview_element = button.find_element(By.XPATH, "../..")
youtube_link = link_preview_element.find_elements(By.XPATH, ".//a[contains(@href, 'youtube.com')]")
if youtube_link:
continue
button.click()
# Wait for the prompt to appear and click the "Remove" button
remove_button = WebDriverWait(self.driver, 10).until(EC.element_to_be_clickable((By.XPATH, "//button[@data-qa='dialog_go']")))
remove_button.click()
except:
continue
screenshot_name = f'send-feedback-message-{now.strftime("%Y-%m-%d-%H-%M-%S")}.png'
screenshot_path = os.path.join(send_feedback_dir, screenshot_name)
self.driver.save_screenshot(screenshot_path)
screenshot_name = f'send-feedback-message-{now.strftime("%Y-%m-%d-%H-%M-%S")}.png'
screenshot_path = os.path.join(see_the_action_dir, screenshot_name)
self.driver.save_screenshot(screenshot_path)
self.driver.back()
except Exception as e:
exc_type, exc_obj, exc_tb = sys.exc_info()
filename = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
print(exc_type, filename, exc_tb.tb_lineno)
async def send_product_dm(self):
try:
WebDriverWait(self.driver, float("inf")).until(EC.visibility_of_element_located((By.XPATH, "//div[@contenteditable='true']")))
time.sleep(0.5)
message_input_field = self.driver.find_element(by=By.XPATH, value="//div[@contenteditable='true']")
message_input_field.clear()
message_input_field.send_keys("We launched ")
self.add_dm_link(text="https://www.producthunt.com/posts/ld-talent", url="https://www.producthunt.com/posts/ld-talent")
message_input_field.send_keys(". ")
message_input_field.send_keys("We would love all of your votes and feedback.")
time.sleep(0.5 + random.uniform(0, 0.5))
message_input_field.send_keys(Keys.ENTER)
time.sleep(0.5 + random.uniform(0, 0.5))
message_input_field.clear()
except Exception as e:
exc_type, exc_obj, exc_tb = sys.exc_info()
filename = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
print(exc_type, filename, exc_tb.tb_lineno)
async def send_dm(self, channel_name, sender_name, _skills, current_url,msg:SlackMessage):
try:
# open talent pick page
self.driver.execute_script("window.open('');")
self.driver.switch_to.window(self.driver.window_handles[1])
self.driver.get(TALENT_LIST_URL)
time.sleep(0.1)
# skills = SkUtils.sort_skills_by_rate(_skills,self.ldUtil.skillRateList)
jb = dict()
jb['skills'] = ",".join(_skills)
jb['description'] = msg.body
str_sorted_skills = sort_skills(jb)
# Get skills from comma separated list
sorted_skills_unf = [sk for sk in str_sorted_skills.split(",")][:5]
# Some skills like nodejs are inadvently renamed to node.js, handle this
sorted_skills_fmt = sanitize_skills(sorted_skills_unf,self.ldUtil.skillList)
# Format for legacy find developers
sorted_skills = [{'skill':sk} for sk in sorted_skills_fmt]
(top_3_developers,shareable_link) = SkUtils.find_top3_devs(self.driver,sorted_skills)
time.sleep(0.1)
# driver.get(current_url)
time.sleep(1)
self.driver.switch_to.window(self.driver.window_handles[-1])
self.driver.close()
self.driver.switch_to.window(self.driver.window_handles[0])
time.sleep(1)
# send_message_button = self.driver.find_element(by=By.XPATH, value="//button[@data-qa='member_profile_message_btn']")
# self.driver.execute_script("arguments[0].click();", send_message_button)
# self.driver.back()
WebDriverWait(self.driver, float("inf")).until(EC.visibility_of_element_located((By.XPATH, "//div[@contenteditable='true']")))
time.sleep(0.5)
message_input_field = self.driver.find_element(by=By.XPATH, value="//div[@contenteditable='true']")
message_input_field.clear()
message_input_field.send_keys("Hi ")
message_input_field.send_keys("@" + sender_name)
message_input_field.send_keys(" "+msg.statement_of_praise+" ")
# poster_element_list = self.driver.find_element(by=By.ID, value="chat_input_tab_ui")
# poster_element = poster_element_list.find_elements(by=By.TAG_NAME, value="li")[0]
# self.driver.execute_script("arguments[0].click();", poster_element)
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
if msg.type == PostTypes.HIRING:
message_input_field.send_keys("Regarding your post in #" + channel_name + ", ")
time.sleep(0.1 + random.uniform(0, 0.5))
self.add_dm_link(text="these candidates", url=shareable_link)
message_input_field.send_keys("may be relevant to your search for ")
if _skills == ["ux design", "aws", "django", "react", "python"]:
message_input_field.send_keys("collaborators")
else:
message_input_field.send_keys("collaborators who know / do ")
# ld_skills = []
# for skill_item in self.ldUtil.skillList:
# if skill_item["key"] in _skills[:5]:
# ld_skills.append(skill_item["text"])
# message_input_field.send_keys(", ".join(ld_skills))
rskills = format_sorted_skills(sorted_skills,self.ldUtil.skillList)
ld_skills = [sk['skill'] for sk in rskills]
message_input_field.send_keys(join_strings(ld_skills))
message_input_field.send_keys(".")
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
message_input_field.send_keys("I would recommend inviting ")
for top_developer in top_3_developers:
time.sleep(0.5 + random.uniform(0, 0.5))
self.add_dm_link(text=top_developer["name"].strip(), url=top_developer["link"])
message_input_field.send_keys(Keys.BACKSPACE)
if top_developer["name"] == top_3_developers[-1]["name"]:
message_input_field.send_keys(".")
elif top_developer["name"] == top_3_developers[-2]["name"]:
message_input_field.send_keys(" and ")
else:
message_input_field.send_keys(", ")
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
if msg.type == PostTypes.SEEKING:
message_input_field.send_keys("If you are looking for work, send your resume to info@ldtalentwork.com")
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
message_input_field.send_keys("LD Talent is a future of work platform that has helped 100+ remote teams quickly find vetted, motivated, and productive talent.")
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
if msg.type == PostTypes.HIRING:
message_input_field.send_keys("Teams hiring on LD Talent include startups funded by Y Combinator and 500 Startups, tech giants like Baidu and Infosys, and universities like Yale and Northwestern.")
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
if msg.type == PostTypes.SEEKING:
self.add_bold_text(message_input_field=message_input_field, text="P.S. ")
message_input_field.send_keys("If instead you are looking for collaborators, ")
self.add_dm_link(text="these candidates", url=shareable_link)
message_input_field.send_keys(" may be relevant to your search. ")
message_input_field.send_keys("I would recommend inviting ")
for top_developer in top_3_developers:
time.sleep(0.5 + random.uniform(0, 0.5))
self.add_dm_link(text=top_developer["name"].strip(), url=top_developer["link"])
message_input_field.send_keys(Keys.BACKSPACE)
if top_developer == top_3_developers[-1]:
message_input_field.send_keys(".")
elif top_developer == top_3_developers[-2]:
message_input_field.send_keys(" and ")
else:
message_input_field.send_keys(", ")
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
self.add_bold_text(message_input_field=message_input_field, text="Free Trial: ")
message_input_field.send_keys("It gives you ")
self.add_dm_link(text="~5 free hours", url="http://ldtalent.org/pricing/")
message_input_field.send_keys(" to try them before hiring them for contract or full-time roles. To see how incredibly unique LD is, check out this ")
self.add_dm_link(text="video", url="https://www.youtube.com/watch?v=EVAo8LPCZmA")
message_input_field.send_keys(Keys.BACKSPACE)
message_input_field.send_keys(".")
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
self.add_bold_text(message_input_field=message_input_field, text="Mission: ")
message_input_field.send_keys("To strengthen remote teams by helping them drop location constraints and include qualified but overlooked talent from the majority world: diversity meets capitalism.")
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
self.add_bold_text(message_input_field=message_input_field, text="Contact: ")
message_input_field.send_keys("If you have questions, you can schedule a call with our founders, a team out of Stanford University in California - ")
time.sleep(0.1 + random.uniform(0, 0.5))
self.add_dm_link(text="ldtalentwork.com/contact/", url="http://ldtalentwork.com/contact/")
message_input_field.send_keys(Keys.BACKSPACE)
message_input_field.send_keys(".")
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
if msg.type == PostTypes.HIRING:
self.add_bold_text(message_input_field=message_input_field, text="P.S. ")
message_input_field.send_keys("If instead you are looking for work you can also join ")
self.add_dm_link(text="ldtalentwork.com/talent/", url="http://ldtalentwork.com/talent/")
message_input_field.send_keys(
"for remote full-time / part-time work, lifelong learning incentives, peer feedback, and venture funding for your own ideas. For this, you can send your resumé to info@ldtalentwork.com"
)
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
self.add_dm_link(text="PM Service", url='https://www.ldtalentwork.com/freelancer/profile/?erid=gobid')
message_input_field.send_keys(" · ")
self.add_dm_link(text="Join our Investors", url='https://wefunder.com/ldtalentwork/')
time.sleep(0.5 + random.uniform(0, 0.5))
# Create a new directory with the name of the current date
now = datetime.now()
date_dir_name = now.strftime("%Y-%m-%d")+"-sending-script-run"
date_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name)
os.makedirs(date_dir_path, exist_ok=True)
# Create a see-the-action directory
see_the_action_dir = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name, 'see-the-action')
os.makedirs(see_the_action_dir, exist_ok=True)
# Create a new directory for the current Slack env file
env_dir_name = self.driver.find_element(by=By.CSS_SELECTOR, value='.p-ia__sidebar_header__team_name_text, .p-ia4_home_header_menu__team_name').text
env_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name, env_dir_name)
# env_dir_path = os.path.join(date_dir_name, env_dir_name)
os.makedirs(env_dir_path, exist_ok=True)
# Create additional directories for each activity
send_dm_dir = os.path.join(env_dir_path, 'sender-script')
os.makedirs(send_dm_dir, exist_ok=True)
screenshot_name = f'screenshot_senddm_to_posters_{ time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())}.png'
screenshot_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/sender-script-screenshots/', screenshot_name)
self.driver.save_screenshot(screenshot_path)
message_input_field.send_keys(Keys.ENTER)
time.sleep(0.5 + random.uniform(0, 0.5))
message_input_field.clear()
time.sleep(0.5 + random.uniform(0, 0.5))
dm_element = self.driver.find_element(By.CLASS_NAME, "c-message_kit__gutter__right")
time.sleep(0.5 + random.uniform(0, 0.5))
# Find the message element
# workspace = self.driver.find_element(by=By.XPATH, value="//span[@class='p-ia__sidebar_header__team_name_text']").text
workspace = self.driver.find_element(by=By.XPATH, value="//span[contains(@class, 'p-ia__sidebar_header__team_name_text') or contains(@class, 'p-ia4_home_header_menu__team_name')]").text
# Extract the name
# receiver_id = dm_element.find_element(By.CSS_SELECTOR, "a[data-member-label*='@']").text
receiver_id = sender_name
# Extract the link
message_id = dm_element.find_element(By.CLASS_NAME, "c-link").get_attribute("href")
# Extract the time
timestamp_elem = dm_element.find_element(By.CLASS_NAME, "c-timestamp__label")
timestamp_str = timestamp_elem.text
# # format the time string with today's date
now = datetime.now()
date_str = now.strftime("%Y-%m-%d")
timestamp_id = f"{date_str} {timestamp_str}"
time.sleep(0.5 + random.uniform(0, 0.5))
if not self.dbManager.check_dm_exists(workspace, receiver_id, message_id, timestamp_id):
self.dbManager.add_dm_history(workspace, receiver_id, message_id, timestamp_id)
# Scroll the message element into view
actions = ActionChains(self.driver)
actions.move_to_element(dm_element).perform()
time.sleep(8)
while True:
collapse_buttons = dm_element.find_elements(By.XPATH, "//button[@class='c-button-unstyled c-message_attachment__delete']")
if len(collapse_buttons) == 0:
break
# Click each collapse button
# for button in collapse_buttons:
# try:
# button.click()
button = collapse_buttons[0]
self.driver.execute_script("arguments[0].click();", button)
# Wait for the prompt to appear and click the "Remove" button
remove_button = WebDriverWait(self.driver, 10).until(EC.element_to_be_clickable((By.XPATH, "//button[@data-qa='dialog_go']")))
remove_button.click()
time.sleep(3)
# except:
# # pass
# continue
screenshot_name = f'sender-script-{now.strftime("%Y-%m-%d-%H-%M-%S")}.png'
screenshot_path = os.path.join(send_dm_dir, screenshot_name)
self.driver.save_screenshot(screenshot_path)
screenshot_name = f'sender-script-{now.strftime("%Y-%m-%d-%H-%M-%S")}.png'
screenshot_path = os.path.join(see_the_action_dir, screenshot_name)
self.driver.save_screenshot(screenshot_path)
time.sleep(0.5 + random.uniform(0, 0.5))
self.driver.back()
time.sleep(0.5 + random.uniform(0, 0.5))
except Exception as e:
exc_type, exc_obj, exc_tb = sys.exc_info()
filename = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
print(exc_type, filename, exc_tb.tb_lineno)
async def send_dm_random_culture(self):
try:
random_culture_piece = self.ldUtil.getRandomCulturePiece()
WebDriverWait(self.driver, float("inf")).until(EC.visibility_of_element_located((By.XPATH, "//div[@contenteditable='true']")))
time.sleep(0.5)
message_input_field = self.driver.find_element(by=By.XPATH, value="//div[@contenteditable='true']")
message_input_field.clear()
message_input_field.send_keys("I wanted to share and get feedback on this cultural piece we wrote: ")
self.add_dm_link(text=random_culture_piece, url=random_culture_piece)
message_input_field.send_keys(". ")
time.sleep(0.5 + random.uniform(0, 0.5))
# Create a new directory with the name of the current date
now = datetime.now()
date_dir_name = now.strftime("%Y-%m-%d")+"-sending-script-run"
date_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name)
os.makedirs(date_dir_path, exist_ok=True)
# Create a new directory for the "see-the-action" screenshots
see_the_action_dir = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name, 'see-the-action')
os.makedirs(see_the_action_dir, exist_ok=True)
# Create a new directory for the current Slack env file
env_dir_name = self.driver.find_element(by=By.CSS_SELECTOR, value='.p-ia__sidebar_header__team_name_text, .p-ia4_home_header_menu__team_name').text
env_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name, env_dir_name)
# env_dir_path = os.path.join(date_dir_name, env_dir_name)
os.makedirs(env_dir_path, exist_ok=True)
# Create additional directories for each activity
send_random_dir = os.path.join(env_dir_path, 'sending-messages-to-channels')
os.makedirs(send_random_dir, exist_ok=True)
# screenshot_name = f'screenshot_send_random_culture_{ time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())}.png'
# screenshot_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/sending-messages-to-channels/', screenshot_name)
# self.driver.save_screenshot(screenshot_path)
message_input_field.send_keys(Keys.ENTER)
time.sleep(0.5 + random.uniform(0, 0.5))
message_input_field.clear()
time.sleep(3)
screenshot_name = f'send-random-message-{now.strftime("%Y-%m-%d-%H-%M-%S")}.png'
screenshot_path = os.path.join(send_random_dir, screenshot_name)
self.driver.save_screenshot(screenshot_path)
# Take a first screenshot and save it in the see-the-action directory
screenshot_name = f'send-random-message-{now.strftime("%Y-%m-%d-%H-%M-%S")}.png'
screenshot_path = os.path.join(see_the_action_dir, screenshot_name)
self.driver.save_screenshot(screenshot_path)
except Exception as e:
exc_type, exc_obj, exc_tb = sys.exc_info()
filename = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
print(exc_type, filename, exc_tb.tb_lineno)
async def send_dm_random_blog(self):
try:
random_blog_piece = self.ldUtil.getRandomBlogPiece()
WebDriverWait(self.driver, float("inf")).until(EC.visibility_of_element_located((By.XPATH, "//div[@contenteditable='true']")))
time.sleep(0.5)
message_input_field = self.driver.find_element(by=By.XPATH, value="//div[@contenteditable='true']")
message_input_field.clear()
message_input_field.send_keys("I wanted to share this and get some feedback: ")
self.add_dm_link(text=random_blog_piece, url=random_blog_piece)
message_input_field.send_keys(". ")
time.sleep(0.5 + random.uniform(0, 0.5))
# Create a new directory with the name of the current date
now = datetime.now()
date_dir_name = now.strftime("%Y-%m-%d")+"-sending-script-run"
date_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name)
os.makedirs(date_dir_path, exist_ok=True)
# Create a new directory for the "see-the-action" screenshots
see_the_action_dir = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name, 'see-the-action')
os.makedirs(see_the_action_dir, exist_ok=True)
# Create a new directory for the current Slack env file
env_dir_name = self.driver.find_element(by=By.CSS_SELECTOR, value='.p-ia__sidebar_header__team_name_text, .p-ia4_home_header_menu__team_name').text
env_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name, env_dir_name)
# env_dir_path = os.path.join(date_dir_name, env_dir_name)
os.makedirs(env_dir_path, exist_ok=True)
# Create additional directories for each activity
send_blog_dir = os.path.join(env_dir_path, 'sending-messages-to-channels')
os.makedirs(send_blog_dir, exist_ok=True)
# screenshot_name = f'screenshot_send_random_blog_{ time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())}.png'
# screenshot_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/sending-messages-to-channels/', screenshot_name)
# self.driver.save_screenshot(screenshot_path)
message_input_field.send_keys(Keys.ENTER)
time.sleep(0.5 + random.uniform(0, 0.5))
message_input_field.clear()
time.sleep(3)
screenshot_name = f'send-random-blog-{now.strftime("%Y-%m-%d-%H-%M-%S")}.png'
screenshot_path = os.path.join(send_blog_dir, screenshot_name)
self.driver.save_screenshot(screenshot_path)
# Take a first screenshot and save it in the see-the-action directory
screenshot_name = f'send-random-blog-{now.strftime("%Y-%m-%d-%H-%M-%S")}.png'
screenshot_path = os.path.join(see_the_action_dir, screenshot_name)
self.driver.save_screenshot(screenshot_path)
except Exception as e:
exc_type, exc_obj, exc_tb = sys.exc_info()
filename = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
print(exc_type, filename, exc_tb.tb_lineno)
async def send_dm_random_video(self):
try:
random_video = self.ldUtil.getRandomVideo()
WebDriverWait(self.driver, float("inf")).until(EC.visibility_of_element_located((By.XPATH, "//div[@contenteditable='true']")))
time.sleep(0.5)
message_input_field = self.driver.find_element(by=By.XPATH, value="//div[@contenteditable='true']")
message_input_field.clear()
message_input_field.send_keys("I wanted to share this video and get some feedback: ")
self.add_dm_link(text=random_video, url=random_video)
message_input_field.send_keys(". ")
time.sleep(0.5 + random.uniform(0, 0.5))
# Create a new directory with the name of the current date
now = datetime.now()
date_dir_name = now.strftime("%Y-%m-%d")+"-sending-script-run"
date_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name)
os.makedirs(date_dir_path, exist_ok=True)
# Create a new directory for the "see-the-action" screenshots
see_the_action_dir = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name, 'see-the-action')
os.makedirs(see_the_action_dir, exist_ok=True)
# Create a new directory for the current Slack env file
env_dir_name = self.driver.find_element(by=By.CSS_SELECTOR, value='.p-ia__sidebar_header__team_name_text, .p-ia4_home_header_menu__team_name').text
env_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name, env_dir_name)
# env_dir_path = os.path.join(date_dir_name, env_dir_name)
os.makedirs(env_dir_path, exist_ok=True)
# Create additional directories for each activity
send_video_dir = os.path.join(env_dir_path, 'sending-messages-to-channels')
os.makedirs(send_video_dir, exist_ok=True)
# screenshot_name = f'screenshot_send_random_video_{ time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())}.png'
# screenshot_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/sending-messages-to-channels/', screenshot_name)
# self.driver.save_screenshot(screenshot_path)
message_input_field.send_keys(Keys.ENTER)
time.sleep(0.5 + random.uniform(0, 0.5))
message_input_field.clear()
time.sleep(1)
screenshot_name = f'send-random-video-{now.strftime("%Y-%m-%d-%H-%M-%S")}.png'
screenshot_path = os.path.join(send_video_dir, screenshot_name)
self.driver.save_screenshot(screenshot_path)
# Take a first screenshot and save it in the see-the-action directory
screenshot_name = f'send-random-video-{now.strftime("%Y-%m-%d-%H-%M-%S")}.png'
screenshot_path = os.path.join(see_the_action_dir, screenshot_name)
self.driver.save_screenshot(screenshot_path)
except Exception as e:
exc_type, exc_obj, exc_tb = sys.exc_info()
filename = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
print(exc_type, filename, exc_tb.tb_lineno)
def send_reminder_dm(self, name):
try:
message_input_field = self.driver.find_element(by=By.XPATH, value="//div[@contenteditable='true']")
message_input_field.clear()
message_input_field.send_keys("Hi " + name + ", when you get the chance, I would love your honest feedback on the suggested collaborators above. i.e. Is the UX clear? Is the data sufficient?")
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
message_input_field.send_keys("Given your background, we want to build something that could be useful to you. If not you, could it be useful to a friend?")
message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
# message_input_field.send_keys(Keys.CONTROL, Keys.ENTER)
message_input_field.send_keys("In general, I’m interested in hearing about the pain points you face when looking for relevant collaborators.")
time.sleep(0.5 + random.uniform(0, 0.5))
message_input_field.send_keys(Keys.ENTER)
time.sleep(0.5 + random.uniform(0, 0.5))
message_input_field.clear()
except Exception as e:
exc_type, exc_obj, exc_tb = sys.exc_info()
filename = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
print(exc_type, filename, exc_tb.tb_lineno)
def get_time_tags(self) -> List[WebElement]:
time_tags = self.driver.find_elements(by=By.XPATH, value="//button[@aria-label='Jump to date']")
return time_tags
def get_date_buttons_on_calendar(self) -> List[WebElement]:
date_buttons = self.driver.find_elements(by=By.CLASS_NAME, value="c-date_picker_calendar__date")
return date_buttons
def get_timestamp_from_time_tag(self, time_tag) -> int:
timestamp = int(time_tag.find_element(by=By.XPATH, value="..").find_element(by=By.XPATH, value="..").get_attribute("data-item-key").split(".")[0])
return int(timestamp / 1000)
def check_timezone(self, local_time):
# check US time zone
us_timezones = pytz.country_timezones["us"]
for timezone_name in us_timezones:
timezone = pytz.timezone(timezone_name)
current_time = datetime.now(timezone)
local_time = current_time.replace(hour=local_time.hour, minute=local_time.minute, second=local_time.second)
minus_5_min = current_time - timedelta(minutes=5)
plus_5_min = current_time + timedelta(minutes=5)
if minus_5_min.timestamp() < local_time.timestamp() < plus_5_min.timestamp():
print("####################### US Timezone ###########################")
return True
# check UK time zone
uk_timezones = pytz.country_timezones["gb"]
for timezone_name in uk_timezones:
timezone = pytz.timezone(timezone_name)
current_time = datetime.now(timezone)
local_time = current_time.replace(hour=local_time.hour, minute=local_time.minute, second=local_time.second)
minus_5_min = current_time - timedelta(minutes=5)
plus_5_min = current_time + timedelta(minutes=5)
if minus_5_min.timestamp() < local_time.timestamp() < plus_5_min.timestamp():
print("####################### UK Timezone ###########################")
return True
# check Canada time zone
canada_timezones = pytz.country_timezones["ca"]
for timezone_name in canada_timezones:
timezone = pytz.timezone(timezone_name)
current_time = datetime.now(timezone)
local_time = current_time.replace(hour=local_time.hour, minute=local_time.minute, second=local_time.second)
minus_5_min = current_time - timedelta(minutes=5)
plus_5_min = current_time + timedelta(minutes=5)
if minus_5_min.timestamp() < local_time.timestamp() < plus_5_min.timestamp():
print("####################### Canada Timezone ###########################")
return True
# check Australia time zone
australia_timezones = pytz.country_timezones["au"]
for timezone_name in australia_timezones:
timezone = pytz.timezone(timezone_name)
current_time = datetime.now(timezone)
local_time = current_time.replace(hour=local_time.hour, minute=local_time.minute, second=local_time.second)
minus_5_min = current_time - timedelta(minutes=5)
plus_5_min = current_time + timedelta(minutes=5)
if minus_5_min.timestamp() < local_time.timestamp() < plus_5_min.timestamp():
print("####################### Australia Timezone ###########################")
return True
# check New Zealand time zone
new_zealand_timezones = pytz.country_timezones["nz"]
for timezone_name in new_zealand_timezones:
timezone = pytz.timezone(timezone_name)
current_time = datetime.now(timezone)
local_time = current_time.replace(hour=local_time.hour, minute=local_time.minute, second=local_time.second)
minus_5_min = current_time - timedelta(minutes=5)
plus_5_min = current_time + timedelta(minutes=5)
if minus_5_min.timestamp() < local_time.timestamp() < plus_5_min.timestamp():
print("####################### New Zealand Timezone ###########################")
return True
return False
async def filter_message(self, channel_name, message_element, message, post_links):
try:
#if re.search("[0-9]+[:][0-9]+ [AP]Mjoined #", message):
# return False
#if "U.S" in message or "US Only" in message or "US-based" in message or "United States" in message or "United Kingdom" in message or "UK" in message or "Australia" in message or "Canada" in message or "New Zealand" in message or "local" in message:
# return False
#if (
# "FTE" in message
# or "employ" in message
# or "PTO" in message
# or "paid time off" in message
# or "401K" in message
# or "recruit" in message
# or "benefit" in message
# or "health" in message
# or "dental" in message
# or "insurance" in message
# or "vision" in message
#):
# return False
#if "agency" in message or "third party" in message:
# return False
#if "firm" in message:
# return False
#if "office" in message or "on-site" in message or "on site" in message or "relocate" in message or "in person" in message or "in-person" in message:
# return False
message_filter_keys = (_key.strip() for _key in MESSAGE_FILTER_KEYS.split(","))
if any(key in message.lower() for key in message_filter_keys):
# topic = classify_topic(message)
# if "yes" in topic.lower():
sender_name_element = message_element.find_element(by=By.XPATH, value=".//button[@data-qa='message_sender_name']")
time.sleep(0.5)
sender_id = sender_name_element.get_attribute("data-message-sender")
message_id = message_element.get_attribute("id")
# check if post already exists in the database
if self.dbManager.check_post_exists(sender_id, message_id):
return False
self.driver.execute_script("arguments[0].click();", sender_name_element)
time.sleep(0.5)
sender_name = sender_name_element.text
time.sleep(2)
sender_local_time_str_container = self.driver.find_element(by=By.XPATH, value="//span[@data-qa='member_profile_field__local_time']")
sender_local_time_str = sender_local_time_str_container.find_element(by=By.XPATH, value=".//span[@class='p-local_time__text']")
sender_local_time_str = sender_local_time_str.text.replace("local time", "").strip()
sender_local_time_str_match = re.search("[0-9]+[:][0-9]+ [AP]M", sender_local_time_str)
sender_local_time_str = sender_local_time_str_match.group(0)
sender_local_time = datetime.strptime(sender_local_time_str, "%I:%M %p")
# send_message_button = self.driver.find_element(by=By.XPATH, value="//button[@data-qa='member_profile_message_btn']")
slack_message = SlackMessage(message)
for link in post_links:
try:
op1 = webdriver.ChromeOptions()
op1.add_argument("headless")
op1.add_argument("--start-maximized")
link_driver = webdriver.Chrome(options=op1)
link_driver.get(link)
slack_message.body += "\n"
slack_message.body += link_driver.find_element(by=By.TAG_NAME, value="body").text
slack_message.body += "\n"
link_driver.quit()
except Exception as ex:
print(ex)
continue
if len(slack_message.body.split()) > 1500:
return False
# use all text to find type
slack_message.type
print(f"=========== This post is of type========={slack_message.type}")
now = datetime.now()
date_dir_name = now.strftime("%Y-%m-%d")+"-sending-script-run"
date_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name)
os.makedirs(date_dir_path, exist_ok=True)
# Create a see the action directory
see_all_posts_dir = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name, 'see-all-posts')
os.makedirs(see_all_posts_dir, exist_ok=True)
# Create a new directory for the current Slack env file
env_dir_name = self.driver.find_element(by=By.CSS_SELECTOR, value='.p-ia__sidebar_header__team_name_text, .p-ia4_home_header_menu__team_name').text
env_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name, env_dir_name)
# env_dir_path = os.path.join(date_dir_name, env_dir_name)
os.makedirs(env_dir_path, exist_ok=True)
# Create additional directories for each activity
actual_job_post_dir = os.path.join(env_dir_path, 'all-job-posts')
os.makedirs(actual_job_post_dir, exist_ok=True)
screenshot_name = f'sender-script-{now.strftime("%Y-%m-%d-%H-%M-%S")}.png'
screenshot_path = os.path.join(actual_job_post_dir, screenshot_name)
self.driver.save_screenshot(screenshot_path)
screenshot_name = f'sender-script-{now.strftime("%Y-%m-%d-%H-%M-%S")}.png'
screenshot_path = os.path.join(see_all_posts_dir, screenshot_name)
self.driver.save_screenshot(screenshot_path)
if slack_message.type==PostTypes.NEITHER:
return False
# if self.check_timezone(sender_local_time) or slack_message.type==PostTypes.SEEKING:
if self.check_timezone(sender_local_time) or slack_message.type in [PostTypes.HIRING, PostTypes.SEEKING]:
# trigger fetch statement of praise
slack_message.statement_of_praise
# detected_skills = SkUtils.detect_skills(slack_message.body,self.ldUtil.skillList)
detected_skills = detect_skills(slack_message.body,self.ldUtil.skillList)
if not detected_skills and slack_message.type == PostTypes.HIRING:
return False
print("******* Local Time *********: ", sender_local_time_str)
print("******* Message *********: ", slack_message.body)
# Create a new directory with the name of the current date
now = datetime.now()
date_dir_name = now.strftime("%Y-%m-%d")+"-sending-script-run"
date_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name)
os.makedirs(date_dir_path, exist_ok=True)
# Create a see the action directory
see_the_action_dir = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name, 'see-the-action')
os.makedirs(see_the_action_dir, exist_ok=True)
# Create a new directory for the current Slack env file
env_dir_name = self.driver.find_element(by=By.CSS_SELECTOR, value='.p-ia__sidebar_header__team_name_text, .p-ia4_home_header_menu__team_name').text
env_dir_path = os.path.join(os.getcwd(), '/home/castro-mbithi/Box/slack-discord-pbc-auto-screenshots/slack/', date_dir_name, env_dir_name)
# env_dir_path = os.path.join(date_dir_name, env_dir_name)
os.makedirs(env_dir_path, exist_ok=True)
# Create additional directories for each activity
actual_job_post_dir = os.path.join(env_dir_path, 'all-job-posts')
os.makedirs(actual_job_post_dir, exist_ok=True)
screenshot_name = f'sender-script-{now.strftime("%Y-%m-%d-%H-%M-%S")}.png'
screenshot_path = os.path.join(actual_job_post_dir, screenshot_name)
self.driver.save_screenshot(screenshot_path)
screenshot_name = f'sender-script-{now.strftime("%Y-%m-%d-%H-%M-%S")}.png'
screenshot_path = os.path.join(see_the_action_dir, screenshot_name)
self.driver.save_screenshot(screenshot_path)
# default_skills = ["ux design", "aws", "django", "react", "python"]
# if len(detected_skills) == 0:
# detected_skills = default_skills
print("******* Skills *********: ", ", ".join(detected_skills))
current_url = self.driver.current_url
send_message_button = self.driver.find_element(by=By.XPATH, value="//button[@data-qa='member_profile_message_btn']")
self.driver.execute_script("arguments[0].click();", send_message_button)
if not self.dbManager.check_post_exists(sender_id, message_id):
if self.dmCount < 10:
self.dbManager.add_post_history(sender_id, message_id)
await self.send_dm(channel_name, sender_name, detected_skills, current_url,slack_message)
self.dmCount += 1
elif self.dmCount == 10:
print("10 DMs are complete, rest will be done in the next run")
self.dmCount += 1
return True
return False
except Exception as e:
exc_type, exc_obj, exc_tb = sys.exc_info()
filename = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
print(exc_type, filename, exc_tb.tb_lineno)
return False
async def get_messages_from_channel(self, channel, channel_name):
try:
WebDriverWait(self.driver, 10).until(EC.element_to_be_clickable((By.XPATH, "//button[@aria-label='Jump to date']")))
time.sleep(5 + random.uniform(0, 0.5))
time_tags = self.get_time_tags()
if len(time_tags) > 0:
time_tags[len(time_tags) - 1].click()
time.sleep(1 + random.uniform(0, 0.5))
jump_to_dropdown = self.driver.find_element(by=By.XPATH, value="//div[@data-qa='menu_items']")
jump_elements = jump_to_dropdown.find_elements(by=By.CLASS_NAME, value="c-menu_item__li")
jump_to_specific_date = jump_elements[len(jump_elements) - 1]
jump_to_specific_date.click()
time.sleep(1 + random.uniform(0, 0.5))
today_button = self.driver.find_element(by=By.CLASS_NAME, value="c-date_picker_calendar__date--is_today")
date_buttons = self.get_date_buttons_on_calendar()
today_index = date_buttons.index(today_button)
print("********************** Going To 45 Days Ago ************************")
if today_index > 44:
date_buttons[today_index - 45].click()
else:
prev_month_button = self.driver.find_element(by=By.XPATH, value="//button[@aria-label='Previous month']")
prev_month_button.click()
time.sleep(1 + random.uniform(0, 0.5))
date_buttons = self.get_date_buttons_on_calendar()
date_buttons[today_index - 45].click()
time.sleep(10 + random.uniform(0, 2))
time_tags = self.get_time_tags()
first_timestamp = self.get_timestamp_from_time_tag(time_tags[0])
last_timestamp = self.get_timestamp_from_time_tag(time_tags[-1])
first_date = datetime.fromtimestamp(first_timestamp).date()
last_date = datetime.fromtimestamp(last_timestamp).date()
today = date.today()
days_45ago = today - timedelta(days=45)
if last_date < days_45ago:
print("********************** No New Message ************************")
return
message_panel = self.driver.find_element(by=By.CLASS_NAME, value="c-message_list")
last_message = ""
focus_message = None
messages = message_panel.find_elements(by=By.XPATH, value="//div[(@class='c-virtual_list__item') and (@role='listitem')]")
for message in messages:
avatar = message.find_elements(by=By.CLASS_NAME, value="c-avatar")
if len(avatar) > 0:
focus_message = message
break
print("********************** Checking Messages ************************")
filter_completed = False
final_block = False
dm_message_id = None
# ... existing code ...
while not filter_completed:
messages = message_panel.find_elements(by=By.XPATH, value="//div[(@class='c-virtual_list__item') and (@role='listitem')]")
start_index = messages.index(focus_message) if focus_message is not None else 0
message_element = messages[start_index]
if message_element.get_attribute("id") == dm_message_id:
if len(messages) > start_index + 1:
start_index = start_index + 1
message_element = messages[start_index]
else:
filter_completed = True
continue
post_links = []
self.driver.execute_script("arguments[0].scrollIntoView(true);", messages[start_index])
time.sleep(0.2)
message_content = ""
message_content += strip_tags(messages[start_index].get_attribute("innerHTML"))
for link in messages[start_index].find_elements(by=By.TAG_NAME, value="a"):
post_links.append(link.get_attribute("href"))
time.sleep(0.2 + random.uniform(0, 0.5))
if message_element == messages[-1] and focus_message is None:
filter_completed = True
for message in messages[start_index + 1:]:
avatar = message.find_elements(by=By.CLASS_NAME, value="c-avatar")
if len(avatar) == 0:
message_content += strip_tags(message.get_attribute("innerHTML"))
for link in message.find_elements(by=By.TAG_NAME, value="a"):
post_links.append(link.get_attribute("href"))
if message == messages[-1] and focus_message is None:
filter_completed = True
else:
focus_message = message
break
temp_dm_message_id = message_element.get_attribute("id")
result = await self.filter_message(channel_name, message_element, message_content, post_links)
if result:
focus_message = None
dm_message_id = temp_dm_message_id
time.sleep(0.5 + random.uniform(0, 0.5))
# else:
# return
# time.sleep(1 + random.uniform(0
else:
return
time.sleep(1 + random.uniform(0, 0.5))
except Exception as e:
exc_type, exc_obj, exc_tb = sys.exc_info()
filename = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
# print(exc_type, filename, exc_tb.tb_lineno)
#Table_dict={'post_links': postEditor is loading...
Leave a Comment