Untitled
unknown
python
2 years ago
20 kB
15
Indexable
import json
import sys
import time
import logging
import copy
import os
from datetime import datetime, timedelta

import boto3
import psycopg2
from psycopg2.extras import Json
from selenium import webdriver
from dateutil.tz import gettz
from dateutil.relativedelta import relativedelta
from dotenv import load_dotenv, find_dotenv

load_dotenv(find_dotenv())
logger = logging.getLogger(__name__)


class Scrap_Zomato:
    """Scraper for Zomato restaurant reviews.

    Pages through Zomato's ``loadMore`` web route via a proxied Selenium
    session and persists new reviews into the PostgreSQL database configured
    through environment variables (.env).
    """

    # Template for the review-pagination endpoint: filled with (res_id, page).
    URL = "https://www.zomato.com/webroutes/reviews/loadMore?sort=dd&filter=reviews-dd&res_id={}&page={}"

    def send_plain_email(self, error_text):
        """Send a plain-text failure notification via AWS SES.

        :param error_text: human-readable error description for the mail body.
        :returns: the SES ``send_email`` response dict.
        """
        # SECURITY FIX: an AWS access key pair was previously hard-coded here,
        # leaking it through source control. That pair must be rotated; the
        # credentials now come from the environment, like the DB settings in
        # get_connection().
        ses_client = boto3.client(
            "ses",
            region_name="us-west-2",
            aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
            aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
        )
        charset = "UTF-8"
        response = ses_client.send_email(
            Destination={
                "ToAddresses": ["anuingale34@gmail.com", "rohit@famepilot.com"],
            },
            Message={
                "Body": {
                    "Text": {
                        "Charset": charset,
                        # Typo fix: was "Error Occur Due too the - ..."
                        "Data": f"Error occurred due to - {error_text}",
                    }
                },
                "Subject": {
                    "Charset": charset,
                    "Data": f"Zomato Review Script Status - {datetime.now().date()}",
                },
            },
            Source="no_reply@famepilot.com",
        )
        return response

    def get_connection(self):
        """Open a new PostgreSQL connection from the DB_* environment variables."""
        return psycopg2.connect(
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            host=os.getenv('DB_HOST'),
            port=os.getenv('DB_PORT'),
            database=os.getenv('DB_NAME'),
        )

    def get_driver_proxy(self):
        """Return Chrome DesiredCapabilities configured with the scraping proxy."""
        desired_capabilities = webdriver.DesiredCapabilities.CHROME.copy()
        # SECURITY FIX: the proxy URL (with embedded username/password) was
        # hard-coded here and is therefore compromised — rotate it. Set e.g.
        # SCRAPER_PROXY_URL=http://user:pass@zproxy.lum-superproxy.io:24000
        proxy = os.getenv('SCRAPER_PROXY_URL')
        desired_capabilities['proxy'] = {
            "httpProxy": proxy,
            "noProxy": None,
            "proxyType": "MANUAL",
            "class": "org.openqa.selenium.Proxy",
            "autodetect": False,
        }
        return desired_capabilities

    def open_driver(self, url):
        """Open ``url`` in a remote (Selenium Grid) Chrome session.

        :returns: the live WebDriver; the caller is responsible for quit().
        """
        desired_capabilities = self.get_driver_proxy()
        browser = webdriver.Remote("http://127.0.0.1:4444/wd/hub", desired_capabilities)
        browser.get(url)
        return browser

    def get_review_create(self, review_time):
        """Convert Zomato's human timestamp into a datetime.

        Handles 'today' / 'just now' / 'yesterday', '<n> minutes|hours|days|
        months|years ...' (singular or bare unit means 1), and absolute dates
        such as 'Mar 03, 2021'.

        :returns: a timezone-aware datetime (Asia/Kolkata) for relative inputs,
            or a naive datetime for absolute dates (original behaviour kept).
        On an unparseable value the error is e-mailed and the process exits
        (original contract, kept).
        """
        try:
            now = datetime.now(tz=gettz('Asia/Kolkata'))
            if review_time in ('today', 'just now'):
                return now
            if review_time == 'yesterday':
                return now - relativedelta(days=1)
            # Unit word (singular or plural) -> relativedelta keyword.
            units = {
                'minute': 'minutes', 'minutes': 'minutes',
                'hour': 'hours', 'hours': 'hours',
                'day': 'days', 'days': 'days',
                'month': 'months', 'months': 'months',
                'year': 'years', 'years': 'years',
            }
            tokens = review_time.split()
            # BUG FIX: the original tested split()[1] before split()[0], so a
            # single-word value like "minute" raised IndexError instead of
            # matching its own fallback branch.
            unit = next((t for t in tokens[:2] if t in units), None)
            if unit is not None:
                # "5 days ago" -> 5; a bare or singular unit means 1.
                count = int(tokens[0]) if tokens[0].isdigit() else 1
                return now - relativedelta(**{units[unit]: count})
            # Absolute date, e.g. "Mar 03, 2021".
            return datetime.strptime(review_time.replace(",", ""), '%b %d %Y')
        except Exception as error:
            # Original contract: notify by e-mail and abort the whole run.
            _, _, tb = sys.exc_info()
            self.send_plain_email(f"{error}LINE NO:{tb.tb_lineno}")
            sys.exit(1)
scrapper(self, link): page_id = 1 stop = False formatted_data = list() print(link) try: connection = self.get_connection() cursor = connection.cursor() postgreSQL_select_ids = "select l.branch_id,l.web_portal_id,l.id,b.id from business_links l inner join business_branch br on br.id=l.branch_id inner join business_business b on br.business_id=b.id where l.link='{}'".format(str(link)) cursor.execute(postgreSQL_select_ids) ids = cursor.fetchall() if len(ids) > 0: print(f"Processing for branch_id :{ids[0][0]}") branch_id = ids[0][0] provider_id = ids[0][1] link_id = ids[0][2] business_id = ids[0][3] postgreSQL_select_db_reviews = "select review_id,description from reviews_review where branch_id={} and provider_id={} and link_id={}".format(branch_id, provider_id, link_id) cursor.execute(postgreSQL_select_db_reviews) db_reviews = cursor.fetchall() else: raise ValueError("No Links Available") branch_zomato_res_query = """select res_id from business_branchproviderinfo where branch_id={} and provider='zomato';""".format( branch_id) cursor.execute(branch_zomato_res_query) res_id = cursor.fetchall() if res_id: res_id = res_id[0][0] else: res_id = link.split("zrp_bid=")[1] if "zrp_bid" in link else None except (Exception, psycopg2.Error) as error: print("Error while connecting to PostgreSQL", error) exception_type, exception_object, exception_traceback = sys.exc_info() filename = exception_traceback.tb_frame.f_code.co_filename line_number = exception_traceback.tb_lineno error_text_line = str(error) + "LINE NO:" + str(line_number) self.send_plain_email(error_text_line) return None while True: try: url = self.URL.format(str(res_id), page_id) browser = self.open_driver(url) #reviews = json.loads(browser.execute_script("return document.body.innerHTML;")[59:-6]) #total_reviews = reviews["page_data"]['sections']['SECTION_REVIEWS']['pageReviewsText'].split(" reviews")[0].split("of ")[1] #total_pages = reviews["page_data"]['sections']['SECTION_REVIEWS']['pageReviewsText'].split(" 
reviews")[0].split("-")[1].split("of")[0] try: reviews = json.loads(browser.execute_script("return document.body.innerHTML;")[59:-6]) total_reviews = reviews["page_data"]['sections']['SECTION_REVIEWS']['pageReviewsText'].split(" reviews")[0].split("of ")[1] total_pages = reviews["page_data"]['sections']['SECTION_REVIEWS']['pageReviewsText'].split(" reviews")[0].split("-")[1].split("of")[0] all_reviews = reviews['entities']['REVIEWS'].keys() except Exception: print("No reviews present") break for key in all_reviews: try: review = reviews['entities']['REVIEWS'][key] description = review['reviewText'].replace("<br/>", ". ") data = dict() review_id = review['reviewId'] data['review_id'] = review_id data['description'] = description author_name = review['userName'] profile_pic = review['userProfilePic'] profile_url = review['userProfileUrl'] user_review_count = review['userReviewsCount'] user_id = review['reviewUserId'] review_url = review['reviewUrl'] tags = review['ratingV2Text'] data['rating'] = int(review['ratingV2']) data['review_id'] = review_id data['branch_id'] = branch_id data['provider_id'] = provider_id data['nps_score'] = 0 data['heading'] = '' data['smiley_value'] = '' data['review_type'] = 'star' data['is_thum_up'] = False data['status'] = 'not_responded' dateval = self.get_review_create(review['timestamp']) if dateval: data['review_create'] = dateval last_year_date = datetime.now(tz=gettz('Asia/Kolkata')) - timedelta(days=7) if dateval <= last_year_date: stop = True data['created_at'] = datetime.now(tz=gettz('Asia/Kolkata')) data['updated_at'] = datetime.now(tz=gettz('Asia/Kolkata')) data['reviewer'] = { 'name': author_name, "profile_pic": profile_pic, "profile_url": profile_url, "user_review_count": user_review_count, "user_id": user_id } extra_data_dic = { "review_link": review_url, "tags": tags } images_array = [] if len(review['reviewPhotos']) > 0: for image_entity in review['reviewPhotos']['entities'][0]['entity_ids']: image_obj = 
reviews['entities']['IMAGES'][image_entity] images_array.append(str(image_obj['url'])) extra_data_dic['image_links'] = images_array data['extra_data'] = extra_data_dic """checking if review already exist""" reviews_copy = copy.deepcopy(data) review_check_one = (reviews_copy["review_id"], reviews_copy['description']) review_check_two = copy.deepcopy(review_check_one)[1] if review_check_one in db_reviews or review_check_two in db_reviews: print("No more reviews") stop = True break print(data, "\n\n") formatted_data.append(data) except Exception: stop = True break if page_id % 5 == 0: time.sleep(20) print("page no: ", page_id, "\n\n") page_id += 1 if stop or page_id == int(total_pages): browser.quit() break except Exception as error: browser.quit() exception_type, exception_object, exception_traceback = sys.exc_info() filename = exception_traceback.tb_frame.f_code.co_filename line_number = exception_traceback.tb_lineno error_text_line = str(error) + "LINE NO:" + str(line_number) self.send_plain_email(error_text_line) break review_count = 1 for dic in formatted_data: try: postgres_insert_query = """ INSERT INTO reviews_review (review_id,nps_score,description,reviewer,extra_data, review_type,is_thum_up,status,heading,branch_id,provider_id,link_id,created_at,updated_at,rating,review_create) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""" values = ( dic.get("review_id"), dic.get("nps_score"), dic.get('description', ''), Json(dic.get('reviewer')), Json(dic.get("extra_data")), dic.get('review_type'), dic.get('is_thum_up'), dic.get('status'), dic.get('heading'), branch_id, provider_id, link_id, dic.get('created_at'), dic.get('updated_at'), dic.get('rating'), dic.get('review_create') ) cursor.execute(postgres_insert_query, values) print("Saving review :", review_count) print(dic) print() review_count = review_count + 1 except Exception as error: cursor.execute("rollback") connection.commit() exception_type, exception_object, exception_traceback = sys.exc_info() 
filename = exception_traceback.tb_frame.f_code.co_filename line_number = exception_traceback.tb_lineno error_text_line = str(error) + "LINE NO:" + str(line_number) self.send_plain_email(error_text_line) pass review_type = "TYPE_" + dic.get("extra_data").get("tags") fetch_review_tag_query = """select id from reviews_tags where name='{}'""".format(str(review_type)) cursor.execute(fetch_review_tag_query) tag_value = cursor.fetchall() if not tag_value: update_review_tag_query = """INSERT INTO reviews_tags (business_id, name, content_type) VALUES (%s,%s,%s)""" values = ( business_id, review_type, "review" ) cursor.execute(update_review_tag_query, values) fetch_review_tag_query = """select id from reviews_tags where name='{}'""".format(review_type) cursor.execute(fetch_review_tag_query) tag_value = cursor.fetchall() fetch_review_query = """select id from reviews_review where review_id='{}'""".format(str(dic['review_id'])) cursor.execute(fetch_review_query) review = cursor.fetchall() update_tag_on_review_query = """INSERT INTO reviews_review_tag (review_id, tags_id) VALUES (%s, %s)""" values = ( review[0][0], tag_value[0][0] ) try: cursor.execute(update_tag_on_review_query, values) connection.commit() except Exception as error: error_text = f'Error happens on the review_id {review[0][0]} with tag_id {tag_value[0][0]}' exception_type, exception_object, exception_traceback = sys.exc_info() filename = exception_traceback.tb_frame.f_code.co_filename line_number = exception_traceback.tb_lineno error_text_line = str(error_text) self.send_plain_email(error_text_line) pass return formatted_data def call_scraper(self, business_id=None, branch_id=None): try: connection = self.get_connection() cursor = connection.cursor() start_time = datetime.now() print(f"Start Time :{start_time}") if business_id: branches_query = "select br.id from business_branch br inner join business_business b on br.business_id=b.id where br.business_id='{}' and b.account_suspended=false".format( business_id) 
cursor.execute(branches_query) branch_ids = cursor.fetchall() else: branches = "select br.id from business_branch br inner join business_business b on br.business_id=b.id where b.id<350 and /*b.id not in (258,296,297,310,311,312,313,314,315,380,381,390,391,392,393,394,316,38,436,437,438,439,440,441,446,238,459,468,461,467,474) and*/ b.account_suspended=false order by br.id" cursor.execute(branches) branch_ids = cursor.fetchall() if branch_id: branch_ids = [(branch_id,)] provider_id_query = "select id from business_webportal where provider='zomato'" cursor.execute(provider_id_query) zomato_id = cursor.fetchall()[0][0] branch_list = [] for branch_id in branch_ids: branch_list.append(branch_id[0]) links_query = "select link from business_links where web_portal_id='{}'and is_active=True and branch_id::varchar= any(string_to_array({}, ','))".format( zomato_id, "'" + ",".join([str(n) for n in branch_list]) + "'") cursor.execute(links_query) all_links = cursor.fetchall() if connection: cursor.close() connection.close() print("total links to scrap =", len(all_links)) for link in all_links: self.scrapper(link[0]) end_time = datetime.now() print(f"End Time:{end_time}") final_time = end_time - start_time print(f'Final Execution Time:{final_time}') except Exception as error: print(error) exception_type, exception_object, exception_traceback = sys.exc_info() filename = exception_traceback.tb_frame.f_code.co_filename line_number = exception_traceback.tb_lineno error_text_line = str(error) + "LINE NO:" + str(line_number) self.send_plain_email(error_text_line) sys.exit(1) business_id = branch_id = '' try: business_id = sys.argv[1] branch_id = sys.argv[2] except Exception: pass Scrap_Zomato().call_scraper(business_id, branch_id)
Editor is loading...