1
Fork 0
mirror of https://github.com/Steffo99/greed.git synced 2024-11-29 00:54:18 +00:00

Fix #27 by removing the character limit on regexes

This commit is contained in:
Steffo 2020-03-13 02:54:23 +01:00
parent 16e6b16fb0
commit 2b23bd5661
2 changed files with 52 additions and 51 deletions

View file

@ -28,7 +28,8 @@ else:
class Price:
"""The base class for the prices in greed.
Its int value is in minimum units, while its float and str values are in decimal format.int("""
def __init__(self, value: typing.Union[int, float, str, "Price"]=0):
def __init__(self, value: typing.Union[int, float, str, "Price"] = 0):
if isinstance(value, int):
# Keep the value as it is
self.value = int(value)
@ -47,8 +48,8 @@ class Price:
def __str__(self):
return strings.currency_format_string.format(symbol=strings.currency_symbol,
value="{0:.2f}".format(
self.value / (10 ** int(config["Payments"]["currency_exp"]))))
value="{0:.2f}".format(
self.value / (10 ** int(config["Payments"]["currency_exp"]))))
def __int__(self):
return self.value
@ -112,14 +113,15 @@ class Price:
def telegram_html_escape(string: str):
return string.replace("<", "&lt;")\
.replace(">", "&gt;")\
.replace("&", "&amp;")\
.replace('"', "&quot;")
return string.replace("<", "&lt;") \
.replace(">", "&gt;") \
.replace("&", "&amp;") \
.replace('"', "&quot;")
def catch_telegram_errors(func):
"""Decorator, can be applied to any function to retry in case of Telegram errors."""
def result_func(*args, **kwargs):
while True:
try:
@ -137,9 +139,8 @@ def catch_telegram_errors(func):
except telegram.error.NetworkError as error:
print(f"Network error while calling {func.__name__}(),"
f" retrying in {config['Telegram']['error_pause']} secs...")
# Display the full NetworkError if in debug mode
if __debug__:
print(f"Full error: {error.message}")
# Display the full NetworkError
print(f"Full error: {error.message}")
time.sleep(int(config["Telegram"]["error_pause"]))
# Unknown error
except telegram.error.TelegramError as error:
@ -154,13 +155,13 @@ def catch_telegram_errors(func):
else:
print(f"Telegram error while calling {func.__name__}(),"
f" retrying in {config['Telegram']['error_pause']} secs...")
# Display the full TelegramError if in debug mode
if __debug__:
print(f"Full error: {error.message}")
# Display the full TelegramError
print(f"Full error: {error.message}")
# Send the error to the Sentry server
elif sentry_client is not None:
if sentry_client is not None:
sentry_client.captureException(exc_info=sys.exc_info())
time.sleep(int(config["Telegram"]["error_pause"]))
return result_func

View file

@ -1,5 +1,5 @@
import threading
import typing
from typing import *
import uuid
import datetime
import telegram
@ -17,10 +17,11 @@ import importlib
language = configloader.config["Config"]["language"]
strings = importlib.import_module("strings." + language)
class StopSignal:
"""A data class that should be sent to the worker when the conversation has to be stopped abnormally."""
def __init__(self, reason: str=""):
def __init__(self, reason: str = ""):
self.reason = reason
@ -41,15 +42,15 @@ class ChatWorker(threading.Thread):
# Open a new database session
self.session = db.Session()
# Get the user db data from the users and admin tables
self.user: typing.Optional[db.User] = None
self.admin: typing.Optional[db.Admin] = None
self.user: Optional[db.User] = None
self.admin: Optional[db.Admin] = None
# The sending pipe is stored in the ChatWorker class, allowing the forwarding of messages to the chat process
self.queue = queuem.Queue()
# The current active invoice payload; reject all invoices with a different payload
self.invoice_payload = None
# The Sentry client for reporting errors encountered by the user
if configloader.config["Error Reporting"]["sentry_token"] != \
"https://00000000000000000000000000000000:00000000000000000000000000000000@sentry.io/0000000":
"https://00000000000000000000000000000000:00000000000000000000000000000000@sentry.io/0000000":
import raven
self.sentry_client = raven.Client(configloader.config["Error Reporting"]["sentry_token"],
release=raven.fetch_git_sha(os.path.dirname(__file__)),
@ -93,7 +94,7 @@ class ChatWorker(threading.Thread):
try:
# If the user is not an admin, send him to the user menu
if self.admin is None:
self.__user_menu()
self.__user_menu()
# If the user is an admin, send him to the admin menu
else:
# Clear the live orders flag
@ -112,7 +113,7 @@ class ChatWorker(threading.Thread):
if self.sentry_client is not None:
self.sentry_client.captureException()
def stop(self, reason: str=""):
def stop(self, reason: str = ""):
"""Gracefully stop the worker process"""
# Send a stop message to the thread
self.queue.put(StopSignal(reason))
@ -144,8 +145,8 @@ class ChatWorker(threading.Thread):
return data
def __wait_for_specific_message(self,
items: typing.List[str],
cancellable: bool=False) -> typing.Union[str, CancelSignal]:
items: List[str],
cancellable: bool = False) -> Union[str, CancelSignal]:
"""Continue getting updates until until one of the strings contained in the list is received as a message."""
while True:
# Get the next update
@ -170,7 +171,7 @@ class ChatWorker(threading.Thread):
# Return the message text
return update.message.text
def __wait_for_regex(self, regex: str, cancellable: bool=False) -> typing.Union[str, CancelSignal]:
def __wait_for_regex(self, regex: str, cancellable: bool = False) -> Union[str, CancelSignal]:
"""Continue getting updates until the regex finds a match in a message, then return the first capture group."""
while True:
# Get the next update
@ -194,7 +195,7 @@ class ChatWorker(threading.Thread):
return match.group(1)
def __wait_for_precheckoutquery(self,
cancellable: bool=False) -> typing.Union[telegram.PreCheckoutQuery, CancelSignal]:
cancellable: bool = False) -> Union[telegram.PreCheckoutQuery, CancelSignal]:
"""Continue getting updates until a precheckoutquery is received.
The payload is checked by the core before forwarding the message."""
while True:
@ -224,7 +225,7 @@ class ChatWorker(threading.Thread):
# Return the successfulpayment
return update.message.successful_payment
def __wait_for_photo(self, cancellable: bool=False) -> typing.Union[typing.List[telegram.PhotoSize], CancelSignal]:
def __wait_for_photo(self, cancellable: bool = False) -> Union[List[telegram.PhotoSize], CancelSignal]:
"""Continue getting updates until a photo is received, then return it."""
while True:
# Get the next update
@ -242,8 +243,8 @@ class ChatWorker(threading.Thread):
# Return the photo array
return update.message.photo
def __wait_for_inlinekeyboard_callback(self, cancellable: bool=True) \
-> typing.Union[telegram.CallbackQuery, CancelSignal]:
def __wait_for_inlinekeyboard_callback(self, cancellable: bool = True) \
-> Union[telegram.CallbackQuery, CancelSignal]:
"""Continue getting updates until an inline keyboard callback is received, then return it."""
while True:
# Get the next update
@ -260,7 +261,7 @@ class ChatWorker(threading.Thread):
# Return the callbackquery
return update.callback_query
def __user_select(self) -> typing.Union[db.User, CancelSignal]:
def __user_select(self) -> Union[db.User, CancelSignal]:
"""Select an user from the ones in the database."""
# Find all the users in the database
users = self.session.query(db.User).order_by(db.User.user_id).all()
@ -335,7 +336,7 @@ class ChatWorker(threading.Thread):
products = self.session.query(db.Product).filter_by(deleted=False).all()
# Create a dict to be used as 'cart'
# The key is the message id of the product list
cart: typing.Dict[typing.List[db.Product, int]] = {}
cart: Dict[List[db.Product, int]] = {}
# Initialize the products list
for product in products:
# If the product is not for sale, don't display it
@ -363,7 +364,7 @@ class ChatWorker(threading.Thread):
inline_keyboard = telegram.InlineKeyboardMarkup([[telegram.InlineKeyboardButton(strings.menu_cancel,
callback_data="cart_cancel")]])
# Send a message containing the button to cancel or pay
final = self.bot.send_message(self.chat.id, strings.conversation_cart_actions, reply_markup=inline_keyboard)
final_msg = self.bot.send_message(self.chat.id, strings.conversation_cart_actions, reply_markup=inline_keyboard)
# Wait for user input
while True:
callback = self.__wait_for_inlinekeyboard_callback()
@ -411,7 +412,7 @@ class ChatWorker(threading.Thread):
if cart[product_id][1] > 0:
product_list += cart[product_id][0].text(style="short", cart_qty=cart[product_id][1]) + "\n"
total_cost += cart[product_id][0].price * cart[product_id][1]
self.bot.edit_message_text(chat_id=self.chat.id, message_id=final.message_id,
self.bot.edit_message_text(chat_id=self.chat.id, message_id=final_msg.message_id,
text=strings.conversation_confirm_cart.format(product_list=product_list,
total_cost=str(total_cost)),
reply_markup=final_inline_keyboard)
@ -459,7 +460,7 @@ class ChatWorker(threading.Thread):
if cart[product_id][1] > 0:
product_list += cart[product_id][0].text(style="short", cart_qty=cart[product_id][1]) + "\n"
total_cost += cart[product_id][0].price * cart[product_id][1]
self.bot.edit_message_text(chat_id=self.chat.id, message_id=final.message_id,
self.bot.edit_message_text(chat_id=self.chat.id, message_id=final_msg.message_id,
text=strings.conversation_confirm_cart.format(product_list=product_list,
total_cost=str(total_cost)),
reply_markup=final_inline_keyboard)
@ -526,10 +527,10 @@ class ChatWorker(threading.Thread):
def __order_status(self):
"""Display the status of the sent orders."""
# Find the latest orders
orders = self.session.query(db.Order)\
.filter(db.Order.user == self.user)\
.order_by(db.Order.creation_date.desc())\
.limit(20)\
orders = self.session.query(db.Order) \
.filter(db.Order.user == self.user) \
.order_by(db.Order.creation_date.desc()) \
.limit(20) \
.all()
# Ensure there is at least one order to display
if len(orders) == 0:
@ -586,8 +587,7 @@ class ChatWorker(threading.Thread):
self.bot.send_message(self.chat.id, strings.payment_cc_amount,
reply_markup=telegram.ReplyKeyboardMarkup(keyboard, one_time_keyboard=True))
# Wait until a valid amount is sent
# TODO: check and debug the regex
selection = self.__wait_for_regex(r"([0-9]{1,3}(?:[.,][0-9]+)?|" + strings.menu_cancel + r")")
selection = self.__wait_for_regex(r"([0-9]+(?:[.,][0-9]+)?|" + strings.menu_cancel + r")")
# If the user cancelled the action
if selection == strings.menu_cancel:
# Exit the loop
@ -764,7 +764,7 @@ class ChatWorker(threading.Thread):
# Open the edit menu for that specific product
self.__edit_product_menu(product=product)
def __edit_product_menu(self, product: typing.Optional[db.Product]=None):
def __edit_product_menu(self, product: Optional[db.Product] = None):
"""Add a product to the database or edit an existing one."""
# Create an inline keyboard with a single skip button
cancel = telegram.InlineKeyboardMarkup([[telegram.InlineKeyboardButton(strings.menu_skip,
@ -780,7 +780,7 @@ class ChatWorker(threading.Thread):
# Wait for an answer
name = self.__wait_for_regex(r"(.*)", cancellable=bool(product))
# Ensure a product with that name doesn't already exist
if (product and isinstance(name, CancelSignal)) or\
if (product and isinstance(name, CancelSignal)) or \
self.session.query(db.Product).filter_by(name=name, deleted=False).one_or_none() in [None, product]:
# Exit the loop
break
@ -805,7 +805,7 @@ class ChatWorker(threading.Thread):
if product.price is not None else 'Non in vendita')),
reply_markup=cancel)
# Wait for an answer
price = self.__wait_for_regex(r"([0-9]{1,3}(?:[.,][0-9]{1,2})?|[Xx])",
price = self.__wait_for_regex(r"([0-9]+(?:[.,][0-9]{1,2})?|[Xx])",
cancellable=True)
# If the price is skipped
if isinstance(price, CancelSignal):
@ -894,10 +894,10 @@ class ChatWorker(threading.Thread):
[telegram.InlineKeyboardButton(strings.menu_refund,
callback_data="order_refund")]])
# Display the past pending orders
orders = self.session.query(db.Order)\
.filter_by(delivery_date=None, refund_date=None)\
.join(db.Transaction)\
.join(db.User)\
orders = self.session.query(db.Order) \
.filter_by(delivery_date=None, refund_date=None) \
.join(db.Transaction) \
.join(db.User) \
.all()
# Create a message for every one of them
for order in orders:
@ -1045,10 +1045,10 @@ class ChatWorker(threading.Thread):
# Loop used to move between pages
while True:
# Retrieve the 10 transactions in that page
transactions = self.session.query(db.Transaction)\
.order_by(db.Transaction.transaction_id.desc())\
.limit(10)\
.offset(10 * page)\
transactions = self.session.query(db.Transaction) \
.order_by(db.Transaction.transaction_id.desc()) \
.limit(10) \
.offset(10 * page) \
.all()
# Create a list to be converted in inline keyboard markup
inline_keyboard_list = [[]]
@ -1070,7 +1070,7 @@ class ChatWorker(threading.Thread):
inline_keyboard = telegram.InlineKeyboardMarkup(inline_keyboard_list)
# Create the message text
transactions_string = "\n".join([str(transaction) for transaction in transactions])
text = strings.transactions_page.format(page=page+1,
text = strings.transactions_page.format(page=page + 1,
transactions=transactions_string)
# Update the previously sent message
self.bot.edit_message_text(chat_id=self.chat.id, message_id=message.message_id, text=text,