restapis.py

 avatar
user_6242850
plain_text
2 years ago
6.2 kB
7
Indexable
import requests
from requests.auth import HTTPBasicAuth
from .models import DealerReview

# Define the missing variable
API_KEY = "AOk7Ln1k62vPK4QYt_dvblE2NKU_fFNG1wNfV6YJzcU8"

# Define the get_request function
# Cloudant credentials
CLOUDANT_USERNAME = '41b72835-e355-48ae-9d54-2ba6dc3c140e-bluemix'
CLOUDANT_API_KEY = 'AOk7Ln1k62vPK4QYt_dvblE2NKU_fFNG1wNfV6YJzcU8'

# Watson NLU credentials
WATSON_NLU_API_KEY = 'KidOOw8m-hso_lc2AgTMLdxmudJdgaJAe-dewXr62x1L'
WATSON_NLU_URL = (
    'https://api.us-south.natural-language-understanding.watson.cloud.ibm.com/instances/'
    'ea601f46-3769-4375-85f1-9c79b2d0f580'
)

def get_request(url, api_key=False, **kwargs):
    """
    Make a GET request to the specified URL.
    """
    print(f"GET from {url}")
    try:
        headers = {'Content-Type': 'application/json'}
        if api_key:
            # Basic authentication GET
            response = requests.get(
                url,
                headers=headers,
                params=kwargs,
                auth=HTTPBasicAuth('apikey', api_key),
                timeout=10
            )
        else:
            # No authentication GET
            response = requests.get(
                url,
                headers=headers,
                params=kwargs,
                timeout=10
            )
        response.raise_for_status()  # If the request failed, this will raise a HTTPError
        return response.json()  # Return the JSON response from the server
    except requests.exceptions.RequestException as e:
        print(f"An error occurred while making GET request: {e}")
        return {}  # Return an empty dictionary when an exception occurs

def post_request(url, json_payload, auth_needed=True, **kwargs):
    """
    Make a POST request to the specified URL with the given JSON payload.
    - If auth_needed is True, use HTTP Basic Auth with Cloudant credentials.
    - kwargs can be used to pass additional parameters like headers.
    """
    headers = kwargs.get('headers', {'Content-Type': 'application/json'})

    # If authentication is needed, use HTTPBasicAuth
    if auth_needed:
        auth = HTTPBasicAuth(CLOUDANT_USERNAME, CLOUDANT_API_KEY)
    else:
        auth = None

    try:
        response = requests.post(
            url,
            json=json_payload,
            headers=headers,
            auth=auth,
            timeout=10,
            **kwargs
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        print(f"Error in post_request: {e}")
        return {}

# ... [The rest of your existing methods]

def get_dealers_from_cf():
    """
    Get the list of dealers from the specified URL.
    """
    url = 'https://kstiner101-3000.theiadockernext-0-labs-prod-theiak8s-4-tor01.proxy.cognitiveclass.ai/dealerships/get'
    try:
        data = get_request(url)
        dealers = []
        if 'docs' in data:
            for dealer in data['docs']:  # This line might be different based on your data structure
                dealer_doc = dealer  # Updated line as per my instruction
                # The rest of your code that processes dealer_doc
                dealers.append({
                    'id': dealer_doc['id'],
                    'name': dealer_doc['name'],
                    'city': dealer_doc['city'],
                    'state': dealer_doc['state'],
                    'st': dealer_doc['st'],
                    'address': dealer_doc['address'],
                    'zip': dealer_doc['zip'],
                    'lat': dealer_doc['lat'],
                    'long': dealer_doc['long'],
                    'short_name': dealer_doc['short_name'],
                    'dealer_type': dealer_doc['dealer_type'],
                })
        return dealers
    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
        return []


def analyze_review_sentiments(dealerreview):
    """
    Analyze the sentiment of a review text using Watson NLU.
    """
    text = dealerreview.review
    url = WATSON_NLU_URL + '/v1/analyze'
    params = {
        'text': text,
        'version': '2021-08-01',
        'features': 'sentiment',
        'return_analyzed_text': True
    }
    headers = {
        'Content-Type': 'application/json',
        'Authorization': 'Basic ' + WATSON_NLU_API_KEY
    }
    try:
        response = post_request(url, json_payload=params, headers=headers)
        sentiment = response.get('sentiment', {}).get('label', 'Unknown')
        return sentiment
    except Exception as e:
        print(f"Error in analyze_review_sentiments: {e}")
        return 'Unknown'

def get_dealer_reviews_from_cf(url, dealer_id):
    """
    Get the reviews for a specific dealer from Cloudant.
    """
    try:
        data = get_request(url, params={'dealerId': dealer_id})
        reviews = []
        if 'docs' in data:
            for doc in data['docs']:
                review = DealerReview(
                    dealership=doc['dealership'],
                    name=doc['name'],
                    purchase=doc['purchase'],
                    review=doc['review'],
                    purchase_date=doc['purchase_date'],
                    car_make=doc['car_make'],
                    car_model=doc['car_model'],
                    car_year=doc['car_year'],
                    sentiment=analyze_review_sentiments(doc['review'])  # Analyze sentiment using Watson NLU
                )
                reviews.append(review)
        return reviews
    except requests.exceptions.RequestException as e:
        print(f"Error: {e}")
        return []

# Example usage
DEALERS_URL = (
    "https://41b72835-e355-48ae-9d54-2ba6dc3c140e-bluemix.cloudantnosqldb.appdomain.cloud/"
    "dealers/_all_docs"
)
REVIEWS_URL = (
    "https://41b72835-e355-48ae-9d54-2ba6dc3c140e-bluemix.cloudantnosqldb.appdomain.cloud/"
    "reviews/_find"
)

# Get dealers from Cloudant
dealers_cf = get_dealers_from_cf(DEALERS_URL)
for dealer_cf in dealers_cf:
    print(dealer_cf)

# Get reviews for a specific dealer from Cloudant
DEALER_ID = "your-dealer-id"
reviews_cf = get_dealer_reviews_from_cf(REVIEWS_URL, DEALER_ID)
for review_cf in reviews_cf:
    print(review_cf)
Editor is loading...
Leave a Comment