mirror of
https://github.com/Steffo99/greed.git
synced 2024-11-25 15:24:17 +00:00
Restructure the ChatWorker class
This commit is contained in:
parent
1ee792e9ee
commit
edefc57147
2 changed files with 42 additions and 46 deletions
2
core.py
2
core.py
|
@ -62,7 +62,7 @@ def main():
|
|||
# Otherwise, forward the update to the corresponding worker
|
||||
receiving_worker = chat_workers.get(update.message.chat.id)
|
||||
# Ensure a worker exists for the chat and is alive
|
||||
if receiving_worker is None or not receiving_worker.thread.is_alive():
|
||||
if receiving_worker is None or not receiving_worker.is_alive():
|
||||
# Suggest that the user restarts the chat with /start
|
||||
bot.send_message(update.message.chat.id, strings.error_no_worker_for_chat)
|
||||
# Skip the update
|
||||
|
|
86
worker.py
86
worker.py
|
@ -15,60 +15,56 @@ class StopSignal:
|
|||
self.reason = reason
|
||||
|
||||
|
||||
class ChatWorker:
|
||||
"""A worker for a single conversation. A new one should be created every time the /start command is sent."""
|
||||
class ChatWorker(threading.Thread):
|
||||
"""A worker for a single conversation. A new one is created every time the /start command is sent."""
|
||||
|
||||
def __init__(self, bot: telegram.Bot, chat: telegram.Chat):
|
||||
def __init__(self, bot: telegram.Bot, chat: telegram.Chat, *args, **kwargs):
|
||||
# Initialize the thread
|
||||
super().__init__(name=f"ChatThread {chat.first_name}", *args, **kwargs)
|
||||
# Store the bot and chat info inside the class
|
||||
self.bot = bot
|
||||
self.chat = chat
|
||||
# The sending pipe is stored in the ChatWorker class, allowing the forwarding of messages to the chat process
|
||||
self.queue = queuem.Queue()
|
||||
# A new thread running the conversation handler is created, and the queue is passed to its arguments to enable the receiving of messages
|
||||
self.thread = threading.Thread(target=conversation_handler, args=(bot, chat, self.queue))
|
||||
|
||||
def start(self):
|
||||
"""Start the worker process."""
|
||||
self.thread.start()
|
||||
def run(self):
|
||||
"""The conversation code."""
|
||||
# TODO: catch all the possible exceptions
|
||||
# Welcome the user to the bot
|
||||
self.bot.send_message(self.chat.id, strings.conversation_after_start)
|
||||
# TODO: Send a command list or something
|
||||
while True:
|
||||
# For now, echo the sent message
|
||||
update = self._receive_next_update()
|
||||
self.bot.send_message(self.chat.id, f"{threading.current_thread().name} {update.message.text}")
|
||||
|
||||
def stop(self, reason: str=""):
|
||||
"""Gracefully stop the worker process"""
|
||||
# Send a stop message to the thread
|
||||
self.queue.put(StopSignal(reason))
|
||||
# Wait for the thread to stop
|
||||
self.thread.join()
|
||||
self.join()
|
||||
|
||||
# TODO: maybe move these functions to a class
|
||||
def _receive_next_update(self) -> telegram.Update:
|
||||
"""Get the next update from the queue.
|
||||
If no update is found, block the process until one is received.
|
||||
If a stop signal is sent, try to gracefully stop the thread."""
|
||||
# Pop data from the queue
|
||||
try:
|
||||
data = self.queue.get(timeout=int(config["Telegram"]["conversation_timeout"]))
|
||||
except queuem.Empty:
|
||||
# If the conversation times out, gracefully stop the thread
|
||||
self._graceful_stop()
|
||||
# Check if the data is a stop signal instance
|
||||
if isinstance(data, StopSignal):
|
||||
# Gracefully stop the process
|
||||
self._graceful_stop()
|
||||
# Return the received update
|
||||
return data
|
||||
|
||||
def graceful_stop(bot: telegram.Bot, chat: telegram.Chat, queue):
|
||||
"""Handle the graceful stop of the process."""
|
||||
# Notify the user that the session has expired
|
||||
bot.send_message(chat.id, strings.conversation_expired)
|
||||
# End the process
|
||||
sys.exit(0)
|
||||
|
||||
def receive_next_update(bot: telegram.Bot, chat: telegram.Chat, queue) -> telegram.Update:
|
||||
"""Get the next update from a pipe.
|
||||
If no update is found, block the process until one is received.
|
||||
If a stop signal is sent, try to gracefully stop the process."""
|
||||
# Pop data from the queue
|
||||
try:
|
||||
data = queue.get(timeout=int(config["Telegram"]["conversation_timeout"]))
|
||||
except queuem.Empty:
|
||||
# If the conversation times out, gracefully stop the thread
|
||||
graceful_stop(bot, chat, queue)
|
||||
# Check if the data is a stop signal instance
|
||||
if isinstance(data, StopSignal):
|
||||
# Gracefully stop the process
|
||||
graceful_stop(bot, chat, queue)
|
||||
# Return the received update
|
||||
return data
|
||||
|
||||
|
||||
def conversation_handler(bot: telegram.Bot, chat: telegram.Chat, queue):
|
||||
"""This function is ran once for every conversation (/start command) by a separate process."""
|
||||
# TODO: catch all the possible exceptions
|
||||
# Welcome the user to the bot
|
||||
bot.send_message(chat.id, strings.conversation_after_start)
|
||||
# TODO: Send a command list or something
|
||||
while True:
|
||||
# For now, echo the sent message
|
||||
update = receive_next_update(bot, chat, queue)
|
||||
bot.send_message(chat.id, f"{threading.current_thread().name} {update.message.text}")
|
||||
def _graceful_stop(self):
|
||||
"""Handle the graceful stop of the thread."""
|
||||
# Notify the user that the session has expired
|
||||
self.bot.send_message(self.chat.id, strings.conversation_expired)
|
||||
# End the process
|
||||
sys.exit(0)
|
||||
|
|
Loading…
Reference in a new issue