1
Fork 0
mirror of https://github.com/pds-nest/nest.git synced 2024-10-16 20:17:25 +00:00
pds-2021-g2-nest/nest_crawler/alert_trigger.py
2021-05-27 11:21:51 +02:00

66 lines
3 KiB
Python

from datetime import datetime, timedelta
from nest_backend.database import *
from authentication import authenticate
import smtplib
def is_repo_alert_triggered(repository_id):
repo = Repository.query.filter_by(id=repository_id).first()
if repo is None:
print("Non esiste una repository con questo id")
return False
alerts = [alert for alert in repo.alerts]
alerts_triggered = []
for alert in alerts:
evaluation_mode = alert.evaluation_mode
conditions = [condition.condition for condition in alert.conditions]
repo_tweets = [tweet.tweet for tweet in repo.tweets]
alert_tweets = set(repo_tweets)
if evaluation_mode == ConditionMode.all_and:
for condition in conditions:
alert_tweets.intersection(set([tweet.tweet for tweet in condition.tweets]))
elif evaluation_mode == ConditionMode.all_or:
conditions_tweet = set()
for condition in conditions:
conditions_tweet.update([tweet.tweet for tweet in condition.tweets])
alert_tweets = alert_tweets.intersection(conditions_tweet)
end_time = datetime.now()
window_size_hours = timedelta(hours = alert.window_size)
last_notification_time = min([notification.ora for notification in alert.notifications] if len(alert.notifications)>0 else [end_time - window_size_hours])
start_time = max(end_time - window_size_hours, last_notification_time)
alert_tweets = [tweet for tweet in alert_tweets if (end_time > tweet.insert_time > start_time)]
print(f"I tweet corrispondenti sono:{len(alert_tweets)}")
if len(alert_tweets) >= alert.limit:
alert_notification = Notification(ora=str(datetime.now()), alert_id=alert.id)
ext.session.add(alert_notification)
ext.session.commit()
print("alert triggered")
alerts_triggered.append(alert)
send_notification_email(alert)
send_notification_tweet(alert)
def send_notification_email(alert):
owner_repo = alert.repository.owner
conditions_string = ''
for condition in alert.conditions:
conditions_string += condition.condition.content + ','
conditions_string = conditions_string[:-1]
try:
smtpObj = smtplib.SMTP('localhost')
smtpObj.sendmail("alert@nest.com", owner_repo.email, "Alert triggered")
print("Successfully sent email")
except smtplib.SMTPException:
print("Error: unable to send email")
finally:
if smtpObj is not None:
smtpObj.close()
def send_notification_tweet(alert):
api = authenticate()
conditions_string = ''
for condition in alert.conditions:
conditions_string += condition.condition.content + ','
conditions_string = conditions_string[:-1]
print(conditions_string)
api.update_status(f"L'alert {alert.name} è stato attivato! C'è stato un incremento di popolarità negli argomenti di ricerca {conditions_string}")