# Paste-site header (not code): Untitled | unknown | python | 2 years ago | 2.4 kB | 13 | Indexable
from flask import current_app, Request
from google.cloud import datastore
import feedparser
import tweepy
import time
import html
import re
# Pause between consecutive tweets, in seconds.
_TWEET_INTERVAL_SECONDS = 5 * 60
# Per-tweet character limit, and the fixed length Twitter counts for any URL
# once it has been wrapped by t.co.
_TWEET_MAX_CHARS = 280
_TCO_URL_CHARS = 23
# Matches a single HTML tag so it can be stripped from feed titles.
_HTML_TAG_RE = re.compile(r'<[^<]+?>')


def _build_status(title, link):
    """Return the tweet text (title, newline, link), shortening the title to fit."""
    # Room left for the title after reserving the newline and the link.
    # NOTE(review): len() is an approximation -- Twitter weights some
    # characters (e.g. CJK, emoji) as two; confirm if feeds contain them.
    budget = _TWEET_MAX_CHARS - 1 - _TCO_URL_CHARS
    if len(title) > budget:
        title = title[:budget - 1].rstrip() + '…'
    return f"{title}\n{link}"


def tweet_rss(request: Request):
    """Tweet every feed entry that is newer than the last one recorded.

    Reads ``rss_url``, ``consumer_key``, ``consumer_secret``,
    ``access_token`` and ``access_token_secret`` from the request's query
    string, walks the feed from its first entry until it reaches the entry id
    stored in Datastore (kind ``LatestEntry``, name ``latest``), tweets each
    entry found before that point, and then stores the id of the first feed
    entry as the new marker.

    Args:
        request: Incoming HTTP request carrying the configuration as query
            parameters.

    Returns:
        A short plain-text summary of how many tweets were posted.

    Raises:
        Exception: Any error from the feed, Twitter or Datastore calls is
            logged through the Flask application logger and re-raised.
    """
    try:
        # Get config from HTTP request
        # NOTE(review): credentials travel in the query string and default to
        # the literal 'Fail' when absent -- consider a secret store instead.
        rss_url = request.args.get('rss_url', 'Fail')
        consumer_api_key = request.args.get('consumer_key', 'Fail')
        consumer_api_secret = request.args.get('consumer_secret', 'Fail')
        api_access_token = request.args.get('access_token', 'Fail')
        api_access_token_secret = request.args.get('access_token_secret', 'Fail')

        client = tweepy.Client(
            consumer_key=consumer_api_key,
            consumer_secret=consumer_api_secret,
            access_token=api_access_token,
            access_token_secret=api_access_token_secret,
        )

        # Datastore client
        db = datastore.Client()

        # Fetch latest entry posted
        key = db.key('LatestEntry', 'latest')
        latest_entry = db.get(key)
        latest_entry_id = latest_entry.get('entry_id', '') if latest_entry else ''

        # Fetch the feed
        feed = feedparser.parse(rss_url)

        # Collect (entry id, tweet text) for entries ahead of the stored marker.
        new_entries = []
        for entry in feed.entries:
            # Entries without an id or link cannot be tracked or tweeted.
            if not (hasattr(entry, 'id') and hasattr(entry, 'link')):
                continue
            # Everything from the stored marker onwards was already handled.
            if entry.id == latest_entry_id:
                break
            # Use the title if present, otherwise the description; fall back
            # to an empty string when the entry has neither.
            raw_title = (
                getattr(entry, 'title', None)
                or getattr(entry, 'description', '')
            )
            # Clean up HTML tags and unescape HTML entities
            clean_title = html.unescape(_HTML_TAG_RE.sub('', raw_title))
            new_entries.append((entry.id, _build_status(clean_title, entry.link)))

        # Post to Twitter, pausing a fixed interval between tweets (no pause
        # before the first one).
        # NOTE(review): the marker is only stored after every tweet succeeds,
        # so a failure part-way through leaves already-posted entries
        # unrecorded.
        for i, (_, status) in enumerate(new_entries):
            if i:
                time.sleep(_TWEET_INTERVAL_SECONDS)
            client.create_tweet(text=status)

        # Update the latest entry posted (the first entry in feed order).
        if new_entries:
            latest_entry = datastore.Entity(key=key)
            latest_entry.update({'entry_id': new_entries[0][0]})
            db.put(latest_entry)

        # An HTTP handler must return a response body; None is rejected by Flask.
        return f'Posted {len(new_entries)} tweet(s)'
    except Exception as e:
        current_app.logger.exception(f'Error: {e}')
        raise