restapis.py
user_6242850
plain_text
2 years ago
6.2 kB
7
Indexable
"""REST helpers for the dealership app: Cloudant-backed data and Watson NLU.

The module exposes thin wrappers around ``requests`` (``get_request`` /
``post_request``) plus three domain helpers that fetch dealers, fetch dealer
reviews, and label a review's sentiment.
"""
import os

import requests
from requests.auth import HTTPBasicAuth

from .models import DealerReview

# SECURITY(review): the literal values below are live-looking credentials
# committed to source. They are kept only as backward-compatible fallbacks;
# set the corresponding environment variables and rotate/remove the literals.

# Cloudant credentials
CLOUDANT_USERNAME = os.environ.get(
    'CLOUDANT_USERNAME', '41b72835-e355-48ae-9d54-2ba6dc3c140e-bluemix'
)
CLOUDANT_API_KEY = os.environ.get(
    'CLOUDANT_API_KEY', 'AOk7Ln1k62vPK4QYt_dvblE2NKU_fFNG1wNfV6YJzcU8'
)

# Historical alias; same value as CLOUDANT_API_KEY.
API_KEY = CLOUDANT_API_KEY

# Watson NLU credentials
WATSON_NLU_API_KEY = os.environ.get(
    'WATSON_NLU_API_KEY', 'KidOOw8m-hso_lc2AgTMLdxmudJdgaJAe-dewXr62x1L'
)
WATSON_NLU_URL = os.environ.get(
    'WATSON_NLU_URL',
    'https://api.us-south.natural-language-understanding.watson.cloud.ibm.com/instances/'
    'ea601f46-3769-4375-85f1-9c79b2d0f580',
)

# Endpoint used by get_dealers_from_cf() when the caller passes no URL.
_DEFAULT_DEALERSHIPS_URL = (
    'https://kstiner101-3000.theiadockernext-0-labs-prod-theiak8s-4-tor01'
    '.proxy.cognitiveclass.ai/dealerships/get'
)

# Keys copied from each dealer document into the returned dict.
_DEALER_FIELDS = (
    'id', 'name', 'city', 'state', 'st', 'address', 'zip',
    'lat', 'long', 'short_name', 'dealer_type',
)


def get_request(url, api_key=False, **kwargs):
    """Make a GET request and return the decoded JSON body.

    Args:
        url: Target URL.
        api_key: When truthy, sent as the password of HTTP Basic auth with the
            user name ``'apikey'``; when falsy, no authentication is used.
        **kwargs: Sent verbatim as the URL query parameters.

    Returns:
        The parsed JSON response, or ``{}`` if the request fails, the server
        answers with an error status, or the body is not valid JSON.
    """
    print(f"GET from {url}")
    auth = HTTPBasicAuth('apikey', api_key) if api_key else None
    try:
        response = requests.get(
            url,
            headers={'Content-Type': 'application/json'},
            params=kwargs,
            auth=auth,
            timeout=10,
        )
        response.raise_for_status()  # Turn 4xx/5xx into an HTTPError
        return response.json()
    # ValueError covers JSON decode failures on requests versions where the
    # decode error is not a RequestException subclass.
    except (requests.exceptions.RequestException, ValueError) as e:
        print(f"An error occurred while making GET request: {e}")
        return {}


def post_request(url, json_payload, auth_needed=True, **kwargs):
    """Make a POST request with a JSON payload and return the decoded JSON body.

    Args:
        url: Target URL.
        json_payload: Object serialised as the JSON request body.
        auth_needed: When True (and no explicit ``auth`` is supplied), use
            HTTP Basic auth with the Cloudant credentials.
        **kwargs: Extra ``requests.post`` arguments. ``headers``, ``auth`` and
            ``timeout`` are honoured here and override the defaults; anything
            else (e.g. ``params``) is forwarded unchanged.

    Returns:
        The parsed JSON response, or ``{}`` on any request/decode failure.
    """
    # Pop (not get) these keys: they are also passed explicitly below, and
    # leaving them in kwargs makes requests.post raise "multiple values for
    # keyword argument".
    headers = kwargs.pop('headers', None)
    if headers is None:
        headers = {'Content-Type': 'application/json'}
    timeout = kwargs.pop('timeout', 10)
    auth = kwargs.pop('auth', None)
    if auth is None and auth_needed:
        auth = HTTPBasicAuth(CLOUDANT_USERNAME, CLOUDANT_API_KEY)

    try:
        response = requests.post(
            url,
            json=json_payload,
            headers=headers,
            auth=auth,
            timeout=timeout,
            **kwargs
        )
        response.raise_for_status()
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        print(f"Error in post_request: {e}")
        return {}


def get_dealers_from_cf(url=None):
    """Get the list of dealers.

    Args:
        url: Endpoint returning a JSON object with a ``docs`` list. Defaults
            to the built-in dealerships endpoint when omitted.

    Returns:
        A list of dicts, one per dealer document, holding the keys listed in
        ``_DEALER_FIELDS``. Empty when the request fails or there is no
        ``docs`` entry (``get_request`` already reports and swallows request
        errors).

    Raises:
        KeyError: If a dealer document lacks one of the expected keys.
    """
    data = get_request(_DEFAULT_DEALERSHIPS_URL if url is None else url)
    docs = data.get('docs', []) if isinstance(data, dict) else []
    return [
        {field: dealer_doc[field] for field in _DEALER_FIELDS}
        for dealer_doc in docs
    ]


def analyze_review_sentiments(dealerreview):
    """Label the sentiment of a review using Watson NLU.

    Args:
        dealerreview: Either an object exposing the text as ``.review`` or
            the review text itself.

    Returns:
        The sentiment label reported by the service, or ``'Unknown'`` when
        there is no text or the service call fails.
    """
    # Accept both a DealerReview-like object and a bare string; the reviews
    # loader in this module passes the raw text.
    text = getattr(dealerreview, 'review', dealerreview)
    if not text:
        return 'Unknown'

    payload = {
        'text': text,
        'features': {'sentiment': {}},  # NLU expects an object per feature
        'return_analyzed_text': True,
    }
    response = post_request(
        WATSON_NLU_URL + '/v1/analyze',
        json_payload=payload,
        auth_needed=False,  # do not send the Cloudant credentials to Watson
        auth=HTTPBasicAuth('apikey', WATSON_NLU_API_KEY),
        params={'version': '2021-08-01'},  # version is a query parameter
    )
    if not isinstance(response, dict):
        return 'Unknown'

    sentiment = response.get('sentiment')
    if not isinstance(sentiment, dict):
        return 'Unknown'
    document = sentiment.get('document')
    if isinstance(document, dict) and document.get('label'):
        return document['label']
    # Fall back to the flat shape the previous implementation looked for.
    return sentiment.get('label', 'Unknown')


def get_dealer_reviews_from_cf(url, dealer_id):
    """Get the reviews for a specific dealer.

    Args:
        url: Endpoint returning a JSON object with a ``docs`` list.
        dealer_id: Sent as the ``dealerId`` query parameter.

    Returns:
        A list of ``DealerReview`` objects, each with its sentiment filled in
        via ``analyze_review_sentiments``. Empty when the request fails or
        there is no ``docs`` entry.

    Raises:
        KeyError: If a review document lacks one of the expected keys.
    """
    # get_request forwards its keyword arguments as the query string, so the
    # filter must be passed as dealerId=..., not wrapped in params={...}.
    data = get_request(url, dealerId=dealer_id)
    docs = data.get('docs', []) if isinstance(data, dict) else []
    return [
        DealerReview(
            dealership=doc['dealership'],
            name=doc['name'],
            purchase=doc['purchase'],
            review=doc['review'],
            purchase_date=doc['purchase_date'],
            car_make=doc['car_make'],
            car_model=doc['car_model'],
            car_year=doc['car_year'],
            sentiment=analyze_review_sentiments(doc['review']),
        )
        for doc in docs
    ]


# Example usage
DEALERS_URL = (
    "https://41b72835-e355-48ae-9d54-2ba6dc3c140e-bluemix.cloudantnosqldb.appdomain.cloud/"
    "dealers/_all_docs"
)
REVIEWS_URL = (
    "https://41b72835-e355-48ae-9d54-2ba6dc3c140e-bluemix.cloudantnosqldb.appdomain.cloud/"
    "reviews/_find"
)
DEALER_ID = "your-dealer-id"

# Guarded so that importing this module (e.g. from views) performs no network
# calls; run the module directly to exercise the helpers.
if __name__ == "__main__":
    # Get dealers from Cloudant
    dealers_cf = get_dealers_from_cf(DEALERS_URL)
    for dealer_cf in dealers_cf:
        print(dealer_cf)

    # Get reviews for a specific dealer from Cloudant
    reviews_cf = get_dealer_reviews_from_cf(REVIEWS_URL, DEALER_ID)
    for review_cf in reviews_cf:
        print(review_cf)
Editor is loading...
Leave a Comment