# Untitled
# unknown
# python
# 2 years ago
# 2.9 kB
# 4
# Indexable
import html
import re
import time

import feedparser
import tweepy
from flask import current_app, Request
from google.cloud import datastore

# Query-string parameters that every invocation must supply.
_REQUIRED_PARAMS = (
    'rss_url',
    'consumer_key',
    'consumer_secret',
    'access_token',
    'access_token_secret',
)
# Upper bound on how many feed entries are tweeted per invocation.
_MAX_ENTRIES_PER_RUN = 3
# Pause between two consecutive tweets, in seconds.
_TWEET_INTERVAL_SECONDS = 60
# Maximum tweet length; every URL is counted as a fixed-size t.co link.
# NOTE(review): Twitter weights some code points (e.g. CJK, emoji) as two
# characters; this simple budget counts plain ``len`` -- confirm it is
# sufficient for the feeds in use.
_TWEET_MAX_LENGTH = 280
_TCO_URL_LENGTH = 23
# Matches a single HTML/XML tag so it can be stripped from titles.
_TAG_RE = re.compile(r'<[^<]+?>')


def _clean_title(entry):
    """Return the entry's title (or description) without tags or HTML entities."""
    raw = getattr(entry, 'title', None) or getattr(entry, 'description', None) or ''
    return html.unescape(_TAG_RE.sub('', raw))


def _build_status(title, link):
    """Return ``title`` + newline + ``link``, shortening the title to fit a tweet."""
    # Reserve room for the link and the newline separating it from the title.
    budget = _TWEET_MAX_LENGTH - _TCO_URL_LENGTH - 1
    if len(title) > budget:
        title = title[:budget - 1].rstrip() + '…'
    return f"{title}\n{link}"


def tweet_rss(request: Request):
    """Tweet the newest entries of an RSS feed that have not been tweeted yet.

    The feed URL and the Twitter credentials are read from the request's
    query string (``rss_url``, ``consumer_key``, ``consumer_secret``,
    ``access_token``, ``access_token_secret``). The id of the most recently
    tweeted entry is kept in Datastore under kind ``LatestEntry`` / name
    ``latest`` and is used to decide where to stop reading the feed.

    Args:
        request: Incoming flask request carrying the query parameters.

    Returns:
        A ``(message, status_code)`` tuple: 200 with a summary of what was
        tweeted (or that nothing was new), 400 when a required query
        parameter is missing or empty.

    Raises:
        Exception: Any error raised while talking to Datastore, the feed or
            Twitter is logged and re-raised unchanged.
    """
    # NOTE(review): credentials arrive in the URL query string, where they
    # may be captured by request logs; consider a secret store instead.
    params = {name: request.args.get(name) for name in _REQUIRED_PARAMS}
    missing = [name for name, value in params.items() if not value]
    if missing:
        return f"Missing required query parameter(s): {', '.join(missing)}", 400

    try:
        client = tweepy.Client(
            consumer_key=params['consumer_key'],
            consumer_secret=params['consumer_secret'],
            access_token=params['access_token'],
            access_token_secret=params['access_token_secret'],
        )

        # Datastore client and the id of the last entry that was tweeted.
        db = datastore.Client()
        key = db.key('LatestEntry', 'latest')
        latest_entry = db.get(key)
        latest_entry_id = latest_entry['entry_id'] if latest_entry else ''

        feed = feedparser.parse(params['rss_url'])

        # Walk the feed in its own order until the previously tweeted entry
        # (or the per-run cap) is reached.
        new_entries = []
        for entry in feed.entries:
            if len(new_entries) >= _MAX_ENTRIES_PER_RUN:
                break
            # Entries without an id or a link cannot be tracked or tweeted.
            if not (hasattr(entry, 'id') and hasattr(entry, 'link')):
                continue
            if entry.id == latest_entry_id:
                break
            entry.title = _clean_title(entry)
            new_entries.append(entry)

        if not new_entries:
            return "No new entries to tweet. The feed was already up to date.", 200

        for index, entry in enumerate(new_entries):
            # Space tweets out by a constant interval; no wait before the first.
            if index:
                time.sleep(_TWEET_INTERVAL_SECONDS)
            client.create_tweet(text=_build_status(entry.title, entry.link))

        # Remember the first collected entry so the next run stops there.
        # NOTE(review): the marker is written only after every tweet succeeded,
        # so a failure part-way leaves it untouched and the next run will try
        # the same entries again -- confirm that is the intended retry policy.
        latest_entry = datastore.Entity(key=key)
        latest_entry.update({'entry_id': new_entries[0].id})
        db.put(latest_entry)

        tweeted_entries = [entry.title for entry in new_entries]
        return (
            f"Successfully tweeted {len(tweeted_entries)} entries: "
            f"{', '.join(tweeted_entries)}",
            200,
        )
    except Exception as e:
        # Log with traceback, then re-raise without rewriting the traceback.
        current_app.logger.exception('Error: %s', e)
        raise
# Editor is loading...