diff --git a/core.py b/core.py index b7cf678..fb5f1fb 100644 --- a/core.py +++ b/core.py @@ -62,13 +62,13 @@ def main(): # Otherwise, forward the update to the corresponding worker receiving_worker = chat_workers.get(update.message.chat.id) # Ensure a worker exists for the chat and is alive - if receiving_worker is None or not receiving_worker.process.is_alive(): + if receiving_worker is None or not receiving_worker.thread.is_alive(): # Suggest that the user restarts the chat with /start bot.send_message(update.message.chat.id, strings.error_no_worker_for_chat) # Skip the update continue # Forward the update to the worker - receiving_worker.pipe.send(update) + receiving_worker.queue.put(update) # If the update is a inline keyboard press... if update.inline_query is not None: # Forward the update to the corresponding worker @@ -80,7 +80,7 @@ def main(): # Skip the update continue # Forward the update to the worker - receiving_worker.pipe.send(update) + receiving_worker.queue.put(update) # If there were any updates... if len(updates): # Mark them as read by increasing the update_offset diff --git a/worker.py b/worker.py index 2b8ed34..f2e533a 100644 --- a/worker.py +++ b/worker.py @@ -1,8 +1,9 @@ -import multiprocessing +import threading import telegram import strings import configloader import sys +import queue as queuem # Load the configuration config = configloader.load_config() @@ -18,52 +19,50 @@ class ChatWorker: """A worker for a single conversation. A new one should be created every time the /start command is sent.""" def __init__(self, bot: telegram.Bot, chat: telegram.Chat): - # A pipe connecting the main process to the chat process is created - out_pipe, in_pipe = multiprocessing.Pipe(duplex=False) # The sending pipe is stored in the ChatWorker class, allowing the forwarding of messages to the chat process - self.pipe = in_pipe - # A new process running the conversation handler is created, and the receiving pipe is passed to its arguments to enable the receiving of messages - self.process = multiprocessing.Process(target=conversation_handler, args=(bot, chat, out_pipe)) + self.queue = queuem.Queue() + # A new thread running the conversation handler is created, and the queue is passed to its arguments to enable the receiving of messages + self.thread = threading.Thread(target=conversation_handler, args=(bot, chat, self.queue)) def start(self): """Start the worker process.""" - self.process.start() + self.thread.start() def stop(self, reason: str=""): """Gracefully stop the worker process""" - # Send a stop message to the process - self.pipe.send(StopSignal(reason)) - # Wait for the process to stop - self.process.join() + # Send a stop message to the thread + self.queue.put(StopSignal(reason)) + # Wait for the thread to stop + self.thread.join() # TODO: maybe move these functions to a class -def graceful_stop(bot: telegram.Bot, chat: telegram.Chat, pipe): +def graceful_stop(bot: telegram.Bot, chat: telegram.Chat, queue): """Handle the graceful stop of the process.""" # Notify the user that the session has expired bot.send_message(chat.id, strings.conversation_expired) # End the process sys.exit(0) -def receive_next_update(bot: telegram.Bot, chat: telegram.Chat, pipe) -> telegram.Update: +def receive_next_update(bot: telegram.Bot, chat: telegram.Chat, queue) -> telegram.Update: """Get the next update from a pipe. If no update is found, block the process until one is received. If a stop signal is sent, try to gracefully stop the process.""" - # Wait until some data is present in the pipe or the wait time runs out - if not pipe.poll(int(config["Telegram"]["conversation_timeout"])): - # If the conversation times out, gracefully stop the process - graceful_stop(bot, chat, pipe) - # Receive data from the pipe - data = pipe.recv() + # Pop data from the queue + try: + data = queue.get(timeout=int(config["Telegram"]["conversation_timeout"])) + except queuem.Empty: + # If the conversation times out, gracefully stop the thread + graceful_stop(bot, chat, queue) # Check if the data is a stop signal instance if isinstance(data, StopSignal): # Gracefully stop the process - graceful_stop(bot, chat, pipe) + graceful_stop(bot, chat, queue) # Return the received update return data -def conversation_handler(bot: telegram.Bot, chat: telegram.Chat, pipe): +def conversation_handler(bot: telegram.Bot, chat: telegram.Chat, queue): """This function is ran once for every conversation (/start command) by a separate process.""" # TODO: catch all the possible exceptions # Welcome the user to the bot @@ -71,5 +70,5 @@ def conversation_handler(bot: telegram.Bot, chat: telegram.Chat, pipe): # TODO: Send a command list or something while True: # For now, echo the sent message - update = receive_next_update(bot, chat, pipe) - bot.send_message(chat.id, f"{multiprocessing.current_process().name} {update.message.text}") \ No newline at end of file + update = receive_next_update(bot, chat, queue) + bot.send_message(chat.id, f"{threading.current_thread().name} {update.message.text}") \ No newline at end of file