Untitled
unknown
plain_text
2 years ago
19 kB
6
Indexable
import json,sys,time,logging,copy,os import boto3,psycopg2 from psycopg2.extras import Json from selenium import webdriver from datetime import datetime, timedelta from dateutil.tz import gettz from dateutil.relativedelta import relativedelta from dotenv import load_dotenv, find_dotenv load_dotenv(find_dotenv()) logger = logging.getLogger(__name__) class Scrap_Zomato: URL = "https://www.zomato.com/webroutes/reviews/loadMore?sort=dd&filter=reviews-dd&res_id={}&page={}" def send_plain_email(self,error_text): ses_client = boto3.client("ses",region_name="us-west-2",aws_access_key_id='AKIAJJMDYGYLKO7CYHBQ',aws_secret_access_key='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') CHARSET = "UTF-8" response = ses_client.send_email( Destination={ "ToAddresses": [ "anuingale34@gmail.com","rohit@famepilot.com" ], }, Message={ "Body": { "Text": { "Charset" : CHARSET, "Data" : f"Error Occur Due too the - {error_text}", } }, "Subject": { "Charset" : CHARSET, "Data" : f"Zomato Review Script Status - {datetime.now().date()}", }, }, Source="no_reply@famepilot.com", ) return response def get_connection(self): connection = psycopg2.connect( user = os.getenv('DB_USER'), password = os.getenv('DB_PASSWORD'), host = os.getenv('DB_HOST'), port = os.getenv('DB_PORT'), database = os.getenv('DB_NAME')) return connection def get_driver_proxy(self): desired_capabilities = webdriver.DesiredCapabilities.CHROME.copy() PROXY = 'http://lum-customer-famepilot-zone-static:pvam809j9oxz@zproxy.lum-superproxy.io:24000' desired_capabilities['proxy'] = { "httpProxy" : PROXY, "noProxy" : None, "proxyType" : "MANUAL", "class" : "org.openqa.selenium.Proxy", "autodetect" : False } return desired_capabilities def open_driver(self, url): desired_capabilities = self.get_driver_proxy() browser = webdriver.Remote("http://127.0.0.1:4444/wd/hub",desired_capabilities) # browser = webdriver.Chrome(executable_path="/usr/bin/chromedriver",desired_capabilities=desired_capabilities) browser.get(url) return browser def get_review_create(self, review_time): try: today = datetime.now(tz=gettz('Asia/Kolkata')) if review_time == 'today' or review_time == 'just now': review_date = today elif review_time == 'yesterday': review_date = today - relativedelta(days=1) elif review_time.split()[1] == "minute" or review_time.split()[0] == "minute": review_date = today - relativedelta(minutes=1) elif review_time.split()[1] == "minutes": minutes_count = int(review_time.split()[0]) review_date = today - relativedelta(minutes=minutes_count) elif review_time.split()[1] == "hour" or review_time.split()[0] == "hour": review_date = today - relativedelta(hours=1) elif review_time.split()[1] == "hours": hours_count = int(review_time.split()[0]) review_date = today - relativedelta(hours=hours_count) elif review_time.split()[1] == "day" or review_time.split()[0] == "day": review_date = today - relativedelta(days=1) elif review_time.split()[1] == "days": days_count = int(review_time.split()[0]) review_date = today - relativedelta(days=days_count) elif review_time.split()[1] == "month" or review_time.split()[0] == "month": review_date = today - relativedelta(months=1) elif review_time.split()[1] == "months": month_count = int(review_time.split()[0]) review_date = today - relativedelta(months=month_count) elif review_time.split()[1] == "year" or review_time.split()[0] == "year": review_date = today - relativedelta(years=1) elif review_time.split()[1] == "years": year_count = int(review_time.split()[0]) review_date = today - relativedelta(years=year_count) else: review_time = review_time.replace(",", "") print(review_time) review_date = datetime.strptime(review_time, '%b %d %Y') return review_date except Exception as error: exception_type, exception_object, exception_traceback = sys.exc_info() filename = exception_traceback.tb_frame.f_code.co_filename line_number = exception_traceback.tb_lineno error_text_line = str(error) + "LINE NO:" + str(line_number) self.send_plain_email(error_text_line) sys.exit(1) def scrapper(self, link): page_id = 1 stop = False formatted_data = list() print(link) try: connection = self.get_connection() cursor = connection.cursor() postgreSQL_select_ids = "select l.branch_id,l.web_portal_id,l.id,b.id from business_links l inner join business_branch br on br.id=l.branch_id inner join business_business b on br.business_id=b.id where l.link='{}'".format(str(link)) cursor.execute(postgreSQL_select_ids) ids = cursor.fetchall() if len(ids) > 0: print(f"Processing for branch_id :{ids[0][0]}") branch_id = ids[0][0] provider_id = ids[0][1] link_id = ids[0][2] business_id = ids[0][3] postgreSQL_select_db_reviews = "select review_id,description from reviews_review where branch_id={} and provider_id={} and link_id={}".format(branch_id, provider_id, link_id) cursor.execute(postgreSQL_select_db_reviews) db_reviews = cursor.fetchall() else: raise ValueError("No Links Available") branch_zomato_res_query = """select res_id from business_branchproviderinfo where branch_id={} and provider='zomato';""".format( branch_id) cursor.execute(branch_zomato_res_query) res_id = cursor.fetchall() if res_id: res_id = res_id[0][0] else: res_id = link.split("zrp_bid=")[1] if "zrp_bid" in link else None except (Exception, psycopg2.Error) as error: print("Error while connecting to PostgreSQL", error) exception_type, exception_object, exception_traceback = sys.exc_info() filename = exception_traceback.tb_frame.f_code.co_filename line_number = exception_traceback.tb_lineno error_text_line = str(error) + "LINE NO:" + str(line_number) self.send_plain_email(error_text_line) return None while True: try: url = self.URL.format(str(res_id), page_id) browser = self.open_driver(url) #reviews = json.loads(browser.execute_script("return document.body.innerHTML;")[59:-6]) #total_reviews = reviews["page_data"]['sections']['SECTION_REVIEWS']['pageReviewsText'].split(" reviews")[0].split("of ")[1] #total_pages = reviews["page_data"]['sections']['SECTION_REVIEWS']['pageReviewsText'].split(" reviews")[0].split("-")[1].split("of")[0] try: reviews = json.loads(browser.execute_script("return document.body.innerHTML;")[59:-6]) total_reviews = reviews["page_data"]['sections']['SECTION_REVIEWS']['pageReviewsText'].split(" reviews")[0].split("of ")[1] total_pages = reviews["page_data"]['sections']['SECTION_REVIEWS']['pageReviewsText'].split(" reviews")[0].split("-")[1].split("of")[0] all_reviews = reviews['entities']['REVIEWS'].keys() except Exception: print("No reviews present") break for key in all_reviews: try: review = reviews['entities']['REVIEWS'][key] description = review['reviewText'].replace("<br/>", ". ") data = dict() review_id = review['reviewId'] data['review_id'] = review_id data['description'] = description author_name = review['userName'] profile_pic = review['userProfilePic'] profile_url = review['userProfileUrl'] user_review_count = review['userReviewsCount'] user_id = review['reviewUserId'] review_url = review['reviewUrl'] tags = review['ratingV2Text'] data['rating'] = int(review['ratingV2']) data['review_id'] = review_id data['branch_id'] = branch_id data['provider_id'] = provider_id data['nps_score'] = 0 data['heading'] = '' data['smiley_value'] = '' data['review_type'] = 'star' data['is_thum_up'] = False data['status'] = 'not_responded' dateval = self.get_review_create(review['timestamp']) if dateval: data['review_create'] = dateval last_year_date = datetime.now(tz=gettz('Asia/Kolkata')) - timedelta(days=7) if dateval <= last_year_date: stop = True data['created_at'] = datetime.now(tz=gettz('Asia/Kolkata')) data['updated_at'] = datetime.now(tz=gettz('Asia/Kolkata')) data['reviewer'] = { 'name': author_name, "profile_pic": profile_pic, "profile_url": profile_url, "user_review_count": user_review_count, "user_id": user_id } extra_data_dic = { "review_link": review_url, "tags": tags } data['extra_data'] = extra_data_dic """checking if review already exist""" reviews_copy = copy.deepcopy(data) review_check_one = (reviews_copy["review_id"], reviews_copy['description']) review_check_two = copy.deepcopy(review_check_one)[1] if review_check_one in db_reviews or review_check_two in db_reviews: print("No more reviews") stop = True break print(data, "\n\n") formatted_data.append(data) except Exception: stop = True break if page_id % 5 == 0: time.sleep(20) print("page no: ", page_id, "\n\n") page_id += 1 if stop or page_id == int(total_pages): browser.quit() break except Exception as error: browser.quit() exception_type, exception_object, exception_traceback = sys.exc_info() filename = exception_traceback.tb_frame.f_code.co_filename line_number = exception_traceback.tb_lineno error_text_line = str(error) + "LINE NO:" + str(line_number) self.send_plain_email(error_text_line) break review_count = 1 for dic in formatted_data: try: postgres_insert_query = """ INSERT INTO reviews_review (review_id,nps_score,description,reviewer,extra_data, review_type,is_thum_up,status,heading,branch_id,provider_id,link_id,created_at,updated_at,rating,review_create) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""" values = ( dic.get("review_id"), dic.get("nps_score"), dic.get('description', ''), Json(dic.get('reviewer')), Json(dic.get("extra_data")), dic.get('review_type'), dic.get('is_thum_up'), dic.get('status'), dic.get('heading'), branch_id, provider_id, link_id, dic.get('created_at'), dic.get('updated_at'), dic.get('rating'), dic.get('review_create') ) cursor.execute(postgres_insert_query, values) print("Saving review :", review_count) print(dic) print() review_count = review_count + 1 except Exception as error: cursor.execute("rollback") connection.commit() exception_type, exception_object, exception_traceback = sys.exc_info() filename = exception_traceback.tb_frame.f_code.co_filename line_number = exception_traceback.tb_lineno error_text_line = str(error) + "LINE NO:" + str(line_number) self.send_plain_email(error_text_line) pass review_type = "TYPE_" + dic.get("extra_data").get("tags") fetch_review_tag_query = """select id from reviews_tags where name='{}'""".format(str(review_type)) cursor.execute(fetch_review_tag_query) tag_value = cursor.fetchall() if not tag_value: update_review_tag_query = """INSERT INTO reviews_tags (business_id, name, content_type) VALUES (%s,%s,%s)""" values = ( business_id, review_type, "review" ) cursor.execute(update_review_tag_query, values) fetch_review_tag_query = """select id from reviews_tags where name='{}'""".format(review_type) cursor.execute(fetch_review_tag_query) tag_value = cursor.fetchall() fetch_review_query = """select id from reviews_review where review_id='{}'""".format(str(dic['review_id'])) cursor.execute(fetch_review_query) review = cursor.fetchall() update_tag_on_review_query = """INSERT INTO reviews_review_tag (review_id, tags_id) VALUES (%s, %s)""" values = ( review[0][0], tag_value[0][0] ) try: cursor.execute(update_tag_on_review_query, values) connection.commit() except Exception as error: error_text = f'Error happens on the review_id {review[0][0]} with tag_id {tag_value[0][0]}' exception_type, exception_object, exception_traceback = sys.exc_info() filename = exception_traceback.tb_frame.f_code.co_filename line_number = exception_traceback.tb_lineno error_text_line = str(error_text) self.send_plain_email(error_text_line) pass return formatted_data def call_scraper(self, business_id=None, branch_id=None): try: connection = self.get_connection() cursor = connection.cursor() start_time = datetime.now() print(f"Start Time :{start_time}") if business_id: branches_query = "select br.id from business_branch br inner join business_business b on br.business_id=b.id where br.business_id='{}' and b.account_suspended=false".format( business_id) cursor.execute(branches_query) branch_ids = cursor.fetchall() else: branches = "select br.id from business_branch br inner join business_business b on br.business_id=b.id where b.id<350 and /*b.id not in (258,296,297,310,311,312,313,314,315,380,381,390,391,392,393,394,316,38,436,437,438,439,440,441,446,238,459,468,461,467,474) and*/ b.account_suspended=false order by br.id" cursor.execute(branches) branch_ids = cursor.fetchall() if branch_id: branch_ids = [(branch_id,)] provider_id_query = "select id from business_webportal where provider='zomato'" cursor.execute(provider_id_query) zomato_id = cursor.fetchall()[0][0] branch_list = [] for branch_id in branch_ids: branch_list.append(branch_id[0]) links_query = "select link from business_links where web_portal_id='{}'and is_active=True and branch_id::varchar= any(string_to_array({}, ','))".format( zomato_id, "'" + ",".join([str(n) for n in branch_list]) + "'") cursor.execute(links_query) all_links = cursor.fetchall() if connection: cursor.close() connection.close() print("total links to scrap =", len(all_links)) for link in all_links: self.scrapper(link[0]) end_time = datetime.now() print(f"End Time:{end_time}") final_time = end_time - start_time print(f'Final Execution Time:{final_time}') except Exception as error: print(error) exception_type, exception_object, exception_traceback = sys.exc_info() filename = exception_traceback.tb_frame.f_code.co_filename line_number = exception_traceback.tb_lineno error_text_line = str(error) + "LINE NO:" + str(line_number) self.send_plain_email(error_text_line) sys.exit(1) business_id = branch_id = '' try: business_id = sys.argv[1] branch_id = sys.argv[2] except Exception: pass Scrap_Zomato().call_scraper(business_id, branch_id)
Editor is loading...