# Untitled
#
# mail@pastecode.io avatar
# unknown
# python
# a year ago
# 2.9 kB
# 1
# Indexable
# Never
from flask import current_app, Request
from google.cloud import datastore
import feedparser
import tweepy
import time
import html
import re

# Query parameters that must be present on every request.
_REQUIRED_PARAMS = (
    'rss_url',
    'consumer_key',
    'consumer_secret',
    'access_token',
    'access_token_secret',
)

# At most this many new feed entries are tweeted per invocation.
_MAX_NEW_ENTRIES = 3

# Pause between two consecutive tweets, in seconds.
_TWEET_INTERVAL_SECONDS = 60

# Tweet length budget. Twitter counts every URL as a fixed 23 characters
# because links are wrapped by t.co.
# NOTE(review): CJK characters and emoji are weighted as two characters by
# Twitter; that weighting is not modelled here -- confirm whether the feed
# can contain such titles.
_TWEET_LIMIT = 280
_URL_WEIGHT = 23
_ELLIPSIS = '...'

# Strips anything that looks like an HTML/XML tag.
_TAG_RE = re.compile(r'<[^<]+?>')


def _clean_title(raw_title):
    """Return *raw_title* with HTML tags removed and HTML entities unescaped."""
    return html.unescape(_TAG_RE.sub('', raw_title))


def _build_status(title, link):
    """Return the tweet text ``"<title>\\n<link>"``, shortening the title to fit."""
    # Reserve room for the link and for the newline separating it from the title.
    title_budget = _TWEET_LIMIT - _URL_WEIGHT - 1
    if len(title) > title_budget:
        title = title[:title_budget - len(_ELLIPSIS)].rstrip() + _ELLIPSIS
    return f"{title}\n{link}"


def _collect_new_entries(feed_entries, latest_entry_id, limit):
    """Return ``(entry_id, clean_title, link)`` tuples for entries not yet tweeted.

    Entries are walked in feed order. The walk stops when *limit* entries
    have been collected or when the entry whose id equals *latest_entry_id*
    is reached. Entries lacking an ``id`` or a ``link`` are skipped.
    """
    fresh = []
    for entry in feed_entries:
        if len(fresh) >= limit:
            break
        if not (hasattr(entry, 'id') and hasattr(entry, 'link')):
            continue
        if entry.id == latest_entry_id:
            break
        # Prefer the title; fall back to the description, then to an empty
        # string so an entry with neither does not raise AttributeError.
        if hasattr(entry, 'title'):
            raw_title = entry.title
        else:
            raw_title = getattr(entry, 'description', '')
        fresh.append((entry.id, _clean_title(raw_title), entry.link))
    return fresh


def tweet_rss(request: Request):
    """Tweet the newest entries of an RSS feed that have not been tweeted yet.

    Configuration is read from the query string of *request*: ``rss_url``,
    ``consumer_key``, ``consumer_secret``, ``access_token`` and
    ``access_token_secret``. The id of the most recently tweeted entry is
    stored in Datastore under kind ``LatestEntry``, name ``latest``.

    SECURITY NOTE(review): the API credentials arrive as URL query
    parameters, where they can end up in access logs; consider moving them
    to environment variables or a secret store.

    Args:
        request: The incoming Flask request.

    Returns:
        A ``(message, status_code)`` tuple: status 200 with a summary on
        success, or status 400 when a required query parameter is missing.

    Raises:
        Exception: Any error raised while reading Datastore, fetching the
            feed or posting a tweet is logged and re-raised unchanged.
    """
    # Reject incomplete requests up front instead of passing placeholder
    # values on to the feed parser and the Twitter client.
    params = {name: request.args.get(name) for name in _REQUIRED_PARAMS}
    missing = [name for name, value in params.items() if not value]
    if missing:
        return f"Missing required query parameter(s): {', '.join(missing)}", 400

    try:
        client = tweepy.Client(
            consumer_key=params['consumer_key'],
            consumer_secret=params['consumer_secret'],
            access_token=params['access_token'],
            access_token_secret=params['access_token_secret'],
        )

        # Datastore client
        db = datastore.Client()

        # Fetch the id of the latest entry that was posted
        key = db.key('LatestEntry', 'latest')
        latest_entry = db.get(key)
        latest_entry_id = latest_entry.get('entry_id', '') if latest_entry else ''

        # Fetch the feed and pick the entries that are new since the last run
        feed = feedparser.parse(params['rss_url'])
        new_entries = _collect_new_entries(
            feed.entries, latest_entry_id, _MAX_NEW_ENTRIES
        )

        if not new_entries:
            return "No new entries to tweet. The feed was already up to date.", 200

        # Post to Twitter, pausing between consecutive tweets (not before the
        # first one).
        for index, (_, title, link) in enumerate(new_entries):
            if index:
                time.sleep(_TWEET_INTERVAL_SECONDS)
            client.create_tweet(text=_build_status(title, link))

        # Record the first collected entry as the latest one posted.
        # NOTE(review): the marker is only written after every tweet
        # succeeded, so a failure part-way through leaves it untouched and
        # the already-posted entries are attempted again on the next run.
        latest_entry = datastore.Entity(key=key)
        latest_entry.update({'entry_id': new_entries[0][0]})
        db.put(latest_entry)

        tweeted_titles = [title for _, title, _ in new_entries]
        return (
            f"Successfully tweeted {len(tweeted_titles)} entries: "
            f"{', '.join(tweeted_titles)}",
            200,
        )

    except Exception as e:
        # Top-level boundary: log with traceback, then re-raise unchanged.
        current_app.logger.exception('Error: %s', e)
        raise