Untitled
unknown
plain_text
2 years ago
19 kB
10
Indexable
import json,sys,time,logging,copy,os
import boto3,psycopg2
from psycopg2.extras import Json
from selenium import webdriver
from datetime import datetime, timedelta
from dateutil.tz import gettz
from dateutil.relativedelta import relativedelta
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv())
logger = logging.getLogger(__name__)
class Scrap_Zomato:
    """Scrape Zomato restaurant reviews via the webroutes JSON endpoint and
    persist new reviews (with their rating tags) into Postgres.

    Failures along the way are reported by e-mail through AWS SES
    (see send_plain_email).
    """

    # Paginated review endpoint; formatted with (res_id, page_number).
    URL = "https://www.zomato.com/webroutes/reviews/loadMore?sort=dd&filter=reviews-dd&res_id={}&page={}"
def send_plain_email(self, error_text):
    """Send a plain-text failure notification e-mail via AWS SES.

    Parameters
    ----------
    error_text : str
        Description of the error (typically message text plus line number).

    Returns
    -------
    dict
        The raw ``send_email`` response from the SES client.
    """
    # SECURITY FIX: the AWS access key and secret used to be hard-coded in
    # this file. They are now read from the environment, which load_dotenv()
    # populates at import time (same pattern as get_connection).
    ses_client = boto3.client(
        "ses",
        region_name="us-west-2",
        aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    )
    CHARSET = "UTF-8"
    response = ses_client.send_email(
        Destination={
            "ToAddresses": [
                "anuingale34@gmail.com", "rohit@famepilot.com"
            ],
        },
        Message={
            "Body": {
                "Text": {
                    "Charset": CHARSET,
                    "Data": f"Error Occur Due too the - {error_text}",
                }
            },
            "Subject": {
                "Charset": CHARSET,
                "Data": f"Zomato Review Script Status - {datetime.now().date()}",
            },
        },
        Source="no_reply@famepilot.com",
    )
    return response
def get_connection(self):
    """Open and return a new psycopg2 connection.

    All connection settings come from DB_* environment variables
    (loaded from .env at import time).
    """
    settings = {
        "user": os.getenv('DB_USER'),
        "password": os.getenv('DB_PASSWORD'),
        "host": os.getenv('DB_HOST'),
        "port": os.getenv('DB_PORT'),
        "database": os.getenv('DB_NAME'),
    }
    return psycopg2.connect(**settings)
def get_driver_proxy(self):
    """Build Chrome desired-capabilities configured with a manual HTTP proxy.

    Returns
    -------
    dict
        A copy of ``DesiredCapabilities.CHROME`` with a ``proxy`` entry
        pointing at the scraping proxy.
    """
    desired_capabilities = webdriver.DesiredCapabilities.CHROME.copy()
    # SECURITY FIX: the proxy URL (which embeds the provider credentials)
    # used to be hard-coded here. It now comes from the environment, e.g.
    # SCRAPER_PROXY_URL=http://user:pass@host:port in .env.
    PROXY = os.getenv('SCRAPER_PROXY_URL')
    desired_capabilities['proxy'] = {
        "httpProxy": PROXY,
        "noProxy": None,
        "proxyType": "MANUAL",
        "class": "org.openqa.selenium.Proxy",
        "autodetect": False
    }
    return desired_capabilities
def open_driver(self, url):
    """Start a proxied Remote Chrome session and navigate it to *url*.

    Returns the live webdriver instance; the caller is responsible for
    quitting it.
    """
    capabilities = self.get_driver_proxy()
    session = webdriver.Remote("http://127.0.0.1:4444/wd/hub", capabilities)
    session.get(url)
    return session
def get_review_create(self, review_time):
try:
today = datetime.now(tz=gettz('Asia/Kolkata'))
if review_time == 'today' or review_time == 'just now':
review_date = today
elif review_time == 'yesterday':
review_date = today - relativedelta(days=1)
elif review_time.split()[1] == "minute" or review_time.split()[0] == "minute":
review_date = today - relativedelta(minutes=1)
elif review_time.split()[1] == "minutes":
minutes_count = int(review_time.split()[0])
review_date = today - relativedelta(minutes=minutes_count)
elif review_time.split()[1] == "hour" or review_time.split()[0] == "hour":
review_date = today - relativedelta(hours=1)
elif review_time.split()[1] == "hours":
hours_count = int(review_time.split()[0])
review_date = today - relativedelta(hours=hours_count)
elif review_time.split()[1] == "day" or review_time.split()[0] == "day":
review_date = today - relativedelta(days=1)
elif review_time.split()[1] == "days":
days_count = int(review_time.split()[0])
review_date = today - relativedelta(days=days_count)
elif review_time.split()[1] == "month" or review_time.split()[0] == "month":
review_date = today - relativedelta(months=1)
elif review_time.split()[1] == "months":
month_count = int(review_time.split()[0])
review_date = today - relativedelta(months=month_count)
elif review_time.split()[1] == "year" or review_time.split()[0] == "year":
review_date = today - relativedelta(years=1)
elif review_time.split()[1] == "years":
year_count = int(review_time.split()[0])
review_date = today - relativedelta(years=year_count)
else:
review_time = review_time.replace(",", "")
print(review_time)
review_date = datetime.strptime(review_time, '%b %d %Y')
return review_date
except Exception as error:
exception_type, exception_object, exception_traceback = sys.exc_info()
filename = exception_traceback.tb_frame.f_code.co_filename
line_number = exception_traceback.tb_lineno
error_text_line = str(error) + "LINE NO:" + str(line_number)
self.send_plain_email(error_text_line)
sys.exit(1)
def scrapper(self, link):
    """Scrape all recent reviews for one Zomato business link and insert
    the new ones (plus their rating tag) into Postgres.

    Parameters:
        link (str): the Zomato URL stored in business_links.link.

    Returns:
        list[dict] | None: the review dicts collected on this run, or None
        when the initial database lookup fails.
    """
    page_id = 1
    # Flips to True once we hit an already-stored review, a review older
    # than the cutoff, or a malformed payload.
    stop = False
    formatted_data = list()
    print(link)
    try:
        connection = self.get_connection()
        cursor = connection.cursor()
        # Resolve branch / provider / link / business ids for this link.
        # NOTE(review): query built with str.format — assumes 'link' comes
        # from our own DB; parameterized queries would be safer.
        postgreSQL_select_ids = "select l.branch_id,l.web_portal_id,l.id,b.id from business_links l inner join business_branch br on br.id=l.branch_id inner join business_business b on br.business_id=b.id where l.link='{}'".format(str(link))
        cursor.execute(postgreSQL_select_ids)
        ids = cursor.fetchall()
        if len(ids) > 0:
            print(f"Processing for branch_id :{ids[0][0]}")
            branch_id = ids[0][0]
            provider_id = ids[0][1]
            link_id = ids[0][2]
            business_id = ids[0][3]
            # (review_id, description) pairs already stored — used below to
            # detect reviews we have seen before.
            postgreSQL_select_db_reviews = "select review_id,description from reviews_review where branch_id={} and provider_id={} and link_id={}".format(branch_id, provider_id, link_id)
            cursor.execute(postgreSQL_select_db_reviews)
            db_reviews = cursor.fetchall()
        else:
            raise ValueError("No Links Available")
        # Zomato res_id: prefer the stored provider info; otherwise parse it
        # out of the link's 'zrp_bid' query parameter.
        branch_zomato_res_query = """select res_id from business_branchproviderinfo where branch_id={} and provider='zomato';""".format(
            branch_id)
        cursor.execute(branch_zomato_res_query)
        res_id = cursor.fetchall()
        if res_id:
            res_id = res_id[0][0]
        else:
            res_id = link.split("zrp_bid=")[1] if "zrp_bid" in link else None
    except (Exception, psycopg2.Error) as error:
        print("Error while connecting to PostgreSQL", error)
        exception_type, exception_object, exception_traceback = sys.exc_info()
        filename = exception_traceback.tb_frame.f_code.co_filename
        line_number = exception_traceback.tb_lineno
        error_text_line = str(error) + "LINE NO:" + str(line_number)
        self.send_plain_email(error_text_line)
        return None
    # Page through the loadMore endpoint until we run out of pages, hit a
    # stop condition, or an error occurs.
    while True:
        try:
            url = self.URL.format(str(res_id), page_id)
            browser = self.open_driver(url)
            #reviews = json.loads(browser.execute_script("return document.body.innerHTML;")[59:-6])
            #total_reviews = reviews["page_data"]['sections']['SECTION_REVIEWS']['pageReviewsText'].split(" reviews")[0].split("of ")[1]
            #total_pages = reviews["page_data"]['sections']['SECTION_REVIEWS']['pageReviewsText'].split(" reviews")[0].split("-")[1].split("of")[0]
            try:
                # The endpoint returns JSON wrapped in HTML; the [59:-6]
                # slice strips the surrounding markup before parsing.
                reviews = json.loads(browser.execute_script("return document.body.innerHTML;")[59:-6])
                total_reviews = reviews["page_data"]['sections']['SECTION_REVIEWS']['pageReviewsText'].split(" reviews")[0].split("of ")[1]
                total_pages = reviews["page_data"]['sections']['SECTION_REVIEWS']['pageReviewsText'].split(" reviews")[0].split("-")[1].split("of")[0]
                all_reviews = reviews['entities']['REVIEWS'].keys()
            except Exception:
                print("No reviews present")
                break
            for key in all_reviews:
                try:
                    # Flatten one review payload into the row dict used by
                    # the INSERT below.
                    review = reviews['entities']['REVIEWS'][key]
                    description = review['reviewText'].replace("<br/>", ". ")
                    data = dict()
                    review_id = review['reviewId']
                    data['review_id'] = review_id
                    data['description'] = description
                    author_name = review['userName']
                    profile_pic = review['userProfilePic']
                    profile_url = review['userProfileUrl']
                    user_review_count = review['userReviewsCount']
                    user_id = review['reviewUserId']
                    review_url = review['reviewUrl']
                    tags = review['ratingV2Text']
                    data['rating'] = int(review['ratingV2'])
                    data['review_id'] = review_id
                    data['branch_id'] = branch_id
                    data['provider_id'] = provider_id
                    data['nps_score'] = 0
                    data['heading'] = ''
                    data['smiley_value'] = ''
                    data['review_type'] = 'star'
                    data['is_thum_up'] = False
                    data['status'] = 'not_responded'
                    dateval = self.get_review_create(review['timestamp'])
                    if dateval:
                        data['review_create'] = dateval
                        # Stop paging once reviews are older than 7 days
                        # (variable name says "year" but the window is 7 days).
                        last_year_date = datetime.now(tz=gettz('Asia/Kolkata')) - timedelta(days=7)
                        if dateval <= last_year_date:
                            stop = True
                    data['created_at'] = datetime.now(tz=gettz('Asia/Kolkata'))
                    data['updated_at'] = datetime.now(tz=gettz('Asia/Kolkata'))
                    data['reviewer'] = {
                        'name': author_name, "profile_pic": profile_pic, "profile_url": profile_url,
                        "user_review_count": user_review_count, "user_id": user_id
                    }
                    extra_data_dic = {
                        "review_link": review_url,
                        "tags": tags
                    }
                    data['extra_data'] = extra_data_dic
                    """checking if review already exist"""
                    reviews_copy = copy.deepcopy(data)
                    review_check_one = (reviews_copy["review_id"], reviews_copy['description'])
                    review_check_two = copy.deepcopy(review_check_one)[1]
                    # Match either the (id, description) pair or the bare
                    # description against rows already in the DB.
                    if review_check_one in db_reviews or review_check_two in db_reviews:
                        print("No more reviews")
                        stop = True
                        break
                    print(data, "\n\n")
                    formatted_data.append(data)
                except Exception:
                    # Any malformed review payload ends the scrape for this
                    # link rather than risking bad data.
                    stop = True
                    break
            # Throttle: pause 20s every 5 pages to avoid hammering Zomato.
            if page_id % 5 == 0:
                time.sleep(20)
            print("page no: ", page_id, "\n\n")
            page_id += 1
            if stop or page_id == int(total_pages):
                browser.quit()
                break
        except Exception as error:
            browser.quit()
            exception_type, exception_object, exception_traceback = sys.exc_info()
            filename = exception_traceback.tb_frame.f_code.co_filename
            line_number = exception_traceback.tb_lineno
            error_text_line = str(error) + "LINE NO:" + str(line_number)
            self.send_plain_email(error_text_line)
            break
    # Persist each collected review, then attach its rating tag.
    review_count = 1
    for dic in formatted_data:
        try:
            postgres_insert_query = """ INSERT INTO reviews_review (review_id,nps_score,description,reviewer,extra_data, review_type,is_thum_up,status,heading,branch_id,provider_id,link_id,created_at,updated_at,rating,review_create) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"""
            values = (
                dic.get("review_id"),
                dic.get("nps_score"),
                dic.get('description', ''),
                Json(dic.get('reviewer')),
                Json(dic.get("extra_data")),
                dic.get('review_type'),
                dic.get('is_thum_up'),
                dic.get('status'),
                dic.get('heading'), branch_id, provider_id, link_id,
                dic.get('created_at'),
                dic.get('updated_at'),
                dic.get('rating'),
                dic.get('review_create')
            )
            cursor.execute(postgres_insert_query, values)
            print("Saving review :", review_count)
            print(dic)
            print()
            review_count = review_count + 1
        except Exception as error:
            # Roll back the failed insert so later statements can proceed
            # on the same connection, then report and continue.
            cursor.execute("rollback")
            connection.commit()
            exception_type, exception_object, exception_traceback = sys.exc_info()
            filename = exception_traceback.tb_frame.f_code.co_filename
            line_number = exception_traceback.tb_lineno
            error_text_line = str(error) + "LINE NO:" + str(line_number)
            self.send_plain_email(error_text_line)
            pass
        # Ensure a reviews_tags row exists for this rating text, creating it
        # on first sight, then link it to the review via the join table.
        review_type = "TYPE_" + dic.get("extra_data").get("tags")
        fetch_review_tag_query = """select id from reviews_tags where name='{}'""".format(str(review_type))
        cursor.execute(fetch_review_tag_query)
        tag_value = cursor.fetchall()
        if not tag_value:
            update_review_tag_query = """INSERT INTO reviews_tags (business_id, name, content_type) VALUES (%s,%s,%s)"""
            values = (
                business_id,
                review_type,
                "review"
            )
            cursor.execute(update_review_tag_query, values)
            fetch_review_tag_query = """select id from reviews_tags where name='{}'""".format(review_type)
            cursor.execute(fetch_review_tag_query)
            tag_value = cursor.fetchall()
        fetch_review_query = """select id from reviews_review where review_id='{}'""".format(str(dic['review_id']))
        cursor.execute(fetch_review_query)
        review = cursor.fetchall()
        update_tag_on_review_query = """INSERT INTO reviews_review_tag (review_id, tags_id) VALUES (%s, %s)"""
        values = (
            review[0][0],
            tag_value[0][0]
        )
        try:
            cursor.execute(update_tag_on_review_query, values)
            connection.commit()
        except Exception as error:
            error_text = f'Error happens on the review_id {review[0][0]} with tag_id {tag_value[0][0]}'
            exception_type, exception_object, exception_traceback = sys.exc_info()
            filename = exception_traceback.tb_frame.f_code.co_filename
            line_number = exception_traceback.tb_lineno
            error_text_line = str(error_text)
            self.send_plain_email(error_text_line)
            pass
    return formatted_data
def call_scraper(self, business_id=None, branch_id=None):
    """Collect the active Zomato links for the selected branches and run
    scrapper() on each.

    Parameters:
        business_id: restrict to one business's branches when truthy.
        branch_id: restrict to a single branch when truthy (overrides the
            branch list computed from business_id).

    On any failure the error is e-mailed and the process exits with 1.
    """
    try:
        connection = self.get_connection()
        cursor = connection.cursor()
        start_time = datetime.now()
        print(f"Start Time :{start_time}")
        if business_id:
            # All branches of one (non-suspended) business.
            branches_query = "select br.id from business_branch br inner join business_business b on br.business_id=b.id where br.business_id='{}' and b.account_suspended=false".format(
                business_id)
            cursor.execute(branches_query)
            branch_ids = cursor.fetchall()
        else:
            # Default sweep: every branch of businesses with id < 350.
            branches = "select br.id from business_branch br inner join business_business b on br.business_id=b.id where b.id<350 and /*b.id not in (258,296,297,310,311,312,313,314,315,380,381,390,391,392,393,394,316,38,436,437,438,439,440,441,446,238,459,468,461,467,474) and*/ b.account_suspended=false order by br.id"
            cursor.execute(branches)
            branch_ids = cursor.fetchall()
        if branch_id:
            # Explicit branch_id wins over whatever was selected above.
            branch_ids = [(branch_id,)]
        provider_id_query = "select id from business_webportal where provider='zomato'"
        cursor.execute(provider_id_query)
        zomato_id = cursor.fetchall()[0][0]
        branch_list = []
        for branch_id in branch_ids:
            branch_list.append(branch_id[0])
        # Active Zomato links for the selected branches; the branch ids are
        # passed as one quoted comma-separated string to string_to_array.
        links_query = "select link from business_links where web_portal_id='{}'and is_active=True and branch_id::varchar= any(string_to_array({}, ','))".format(
            zomato_id, "'" + ",".join([str(n) for n in branch_list]) + "'")
        cursor.execute(links_query)
        all_links = cursor.fetchall()
        # Close the connection before scraping: scrapper() opens its own.
        if connection:
            cursor.close()
            connection.close()
        print("total links to scrap =", len(all_links))
        for link in all_links:
            self.scrapper(link[0])
        end_time = datetime.now()
        print(f"End Time:{end_time}")
        final_time = end_time - start_time
        print(f'Final Execution Time:{final_time}')
    except Exception as error:
        print(error)
        exception_type, exception_object, exception_traceback = sys.exc_info()
        filename = exception_traceback.tb_frame.f_code.co_filename
        line_number = exception_traceback.tb_lineno
        error_text_line = str(error) + "LINE NO:" + str(line_number)
        self.send_plain_email(error_text_line)
        sys.exit(1)
if __name__ == "__main__":
    # Optional CLI arguments: business id then branch id. Missing arguments
    # fall back to '' (falsy), which call_scraper treats as "not supplied".
    # Explicit length checks replace the old bare try/except-pass, and the
    # __main__ guard makes the module importable without side effects.
    business_id = sys.argv[1] if len(sys.argv) > 1 else ''
    branch_id = sys.argv[2] if len(sys.argv) > 2 else ''
    Scrap_Zomato().call_scraper(business_id, branch_id)
Editor is loading...