From edefc5714762aecee3d98d209aa9d0a3b2db0d59 Mon Sep 17 00:00:00 2001 From: Stefano Pigozzi Date: Thu, 14 Dec 2017 09:40:03 +0100 Subject: [PATCH] Restructure the ChatWorker class --- core.py | 2 +- worker.py | 86 ++++++++++++++++++++++++++----------------------------- 2 files changed, 42 insertions(+), 46 deletions(-) diff --git a/core.py b/core.py index fb5f1fb..17a3ba6 100644 --- a/core.py +++ b/core.py @@ -62,7 +62,7 @@ def main(): # Otherwise, forward the update to the corresponding worker receiving_worker = chat_workers.get(update.message.chat.id) # Ensure a worker exists for the chat and is alive - if receiving_worker is None or not receiving_worker.thread.is_alive(): + if receiving_worker is None or not receiving_worker.is_alive(): # Suggest that the user restarts the chat with /start bot.send_message(update.message.chat.id, strings.error_no_worker_for_chat) # Skip the update diff --git a/worker.py b/worker.py index f2e533a..8c0f588 100644 --- a/worker.py +++ b/worker.py @@ -15,60 +15,56 @@ class StopSignal: self.reason = reason -class ChatWorker: - """A worker for a single conversation. A new one should be created every time the /start command is sent.""" +class ChatWorker(threading.Thread): + """A worker for a single conversation. A new one is created every time the /start command is sent.""" - def __init__(self, bot: telegram.Bot, chat: telegram.Chat): + def __init__(self, bot: telegram.Bot, chat: telegram.Chat, *args, **kwargs): + # Initialize the thread + super().__init__(name=f"ChatThread {chat.first_name}", *args, **kwargs) + # Store the bot and chat info inside the class + self.bot = bot + self.chat = chat # The sending pipe is stored in the ChatWorker class, allowing the forwarding of messages to the chat process self.queue = queuem.Queue() - # A new thread running the conversation handler is created, and the queue is passed to its arguments to enable the receiving of messages - self.thread = threading.Thread(target=conversation_handler, args=(bot, chat, self.queue)) - def start(self): - """Start the worker process.""" - self.thread.start() + def run(self): + """The conversation code.""" + # TODO: catch all the possible exceptions + # Welcome the user to the bot + self.bot.send_message(self.chat.id, strings.conversation_after_start) + # TODO: Send a command list or something + while True: + # For now, echo the sent message + update = self._receive_next_update() + self.bot.send_message(self.chat.id, f"{threading.current_thread().name} {update.message.text}") def stop(self, reason: str=""): """Gracefully stop the worker process""" # Send a stop message to the thread self.queue.put(StopSignal(reason)) # Wait for the thread to stop - self.thread.join() + self.join() -# TODO: maybe move these functions to a class + def _receive_next_update(self) -> telegram.Update: + """Get the next update from the queue. + If no update is found, block the process until one is received. + If a stop signal is sent, try to gracefully stop the thread.""" + # Pop data from the queue + try: + data = self.queue.get(timeout=int(config["Telegram"]["conversation_timeout"])) + except queuem.Empty: + # If the conversation times out, gracefully stop the thread + self._graceful_stop() + # Check if the data is a stop signal instance + if isinstance(data, StopSignal): + # Gracefully stop the process + self._graceful_stop() + # Return the received update + return data -def graceful_stop(bot: telegram.Bot, chat: telegram.Chat, queue): - """Handle the graceful stop of the process.""" - # Notify the user that the session has expired - bot.send_message(chat.id, strings.conversation_expired) - # End the process - sys.exit(0) - -def receive_next_update(bot: telegram.Bot, chat: telegram.Chat, queue) -> telegram.Update: - """Get the next update from a pipe. - If no update is found, block the process until one is received. - If a stop signal is sent, try to gracefully stop the process.""" - # Pop data from the queue - try: - data = queue.get(timeout=int(config["Telegram"]["conversation_timeout"])) - except queuem.Empty: - # If the conversation times out, gracefully stop the thread - graceful_stop(bot, chat, queue) - # Check if the data is a stop signal instance - if isinstance(data, StopSignal): - # Gracefully stop the process - graceful_stop(bot, chat, queue) - # Return the received update - return data - - -def conversation_handler(bot: telegram.Bot, chat: telegram.Chat, queue): - """This function is ran once for every conversation (/start command) by a separate process.""" - # TODO: catch all the possible exceptions - # Welcome the user to the bot - bot.send_message(chat.id, strings.conversation_after_start) - # TODO: Send a command list or something - while True: - # For now, echo the sent message - update = receive_next_update(bot, chat, queue) - bot.send_message(chat.id, f"{threading.current_thread().name} {update.message.text}") \ No newline at end of file + def _graceful_stop(self): + """Handle the graceful stop of the thread.""" + # Notify the user that the session has expired + self.bot.send_message(self.chat.id, strings.conversation_expired) + # End the process + sys.exit(0)