1
Fork 0
mirror of https://github.com/Steffo99/greed.git synced 2024-11-22 22:14:19 +00:00

Use threads instead of processes

This commit is contained in:
Steffo 2017-12-14 08:53:16 +01:00
parent 15e80ad644
commit 1ee792e9ee
2 changed files with 25 additions and 26 deletions

View file

@ -62,13 +62,13 @@ def main():
# Otherwise, forward the update to the corresponding worker # Otherwise, forward the update to the corresponding worker
receiving_worker = chat_workers.get(update.message.chat.id) receiving_worker = chat_workers.get(update.message.chat.id)
# Ensure a worker exists for the chat and is alive # Ensure a worker exists for the chat and is alive
if receiving_worker is None or not receiving_worker.process.is_alive(): if receiving_worker is None or not receiving_worker.thread.is_alive():
# Suggest that the user restarts the chat with /start # Suggest that the user restarts the chat with /start
bot.send_message(update.message.chat.id, strings.error_no_worker_for_chat) bot.send_message(update.message.chat.id, strings.error_no_worker_for_chat)
# Skip the update # Skip the update
continue continue
# Forward the update to the worker # Forward the update to the worker
receiving_worker.pipe.send(update) receiving_worker.queue.put(update)
# If the update is a inline keyboard press... # If the update is a inline keyboard press...
if update.inline_query is not None: if update.inline_query is not None:
# Forward the update to the corresponding worker # Forward the update to the corresponding worker
@ -80,7 +80,7 @@ def main():
# Skip the update # Skip the update
continue continue
# Forward the update to the worker # Forward the update to the worker
receiving_worker.pipe.send(update) receiving_worker.queue.put(update)
# If there were any updates... # If there were any updates...
if len(updates): if len(updates):
# Mark them as read by increasing the update_offset # Mark them as read by increasing the update_offset

View file

@ -1,8 +1,9 @@
import multiprocessing import threading
import telegram import telegram
import strings import strings
import configloader import configloader
import sys import sys
import queue as queuem
# Load the configuration # Load the configuration
config = configloader.load_config() config = configloader.load_config()
@ -18,52 +19,50 @@ class ChatWorker:
"""A worker for a single conversation. A new one should be created every time the /start command is sent.""" """A worker for a single conversation. A new one should be created every time the /start command is sent."""
def __init__(self, bot: telegram.Bot, chat: telegram.Chat): def __init__(self, bot: telegram.Bot, chat: telegram.Chat):
# A pipe connecting the main process to the chat process is created
out_pipe, in_pipe = multiprocessing.Pipe(duplex=False)
# The sending pipe is stored in the ChatWorker class, allowing the forwarding of messages to the chat process # The sending pipe is stored in the ChatWorker class, allowing the forwarding of messages to the chat process
self.pipe = in_pipe self.queue = queuem.Queue()
# A new process running the conversation handler is created, and the receiving pipe is passed to its arguments to enable the receiving of messages # A new thread running the conversation handler is created, and the queue is passed to its arguments to enable the receiving of messages
self.process = multiprocessing.Process(target=conversation_handler, args=(bot, chat, out_pipe)) self.thread = threading.Thread(target=conversation_handler, args=(bot, chat, self.queue))
def start(self): def start(self):
"""Start the worker process.""" """Start the worker process."""
self.process.start() self.thread.start()
def stop(self, reason: str=""): def stop(self, reason: str=""):
"""Gracefully stop the worker process""" """Gracefully stop the worker process"""
# Send a stop message to the process # Send a stop message to the thread
self.pipe.send(StopSignal(reason)) self.queue.put(StopSignal(reason))
# Wait for the process to stop # Wait for the thread to stop
self.process.join() self.thread.join()
# TODO: maybe move these functions to a class # TODO: maybe move these functions to a class
def graceful_stop(bot: telegram.Bot, chat: telegram.Chat, pipe): def graceful_stop(bot: telegram.Bot, chat: telegram.Chat, queue):
"""Handle the graceful stop of the process.""" """Handle the graceful stop of the process."""
# Notify the user that the session has expired # Notify the user that the session has expired
bot.send_message(chat.id, strings.conversation_expired) bot.send_message(chat.id, strings.conversation_expired)
# End the process # End the process
sys.exit(0) sys.exit(0)
def receive_next_update(bot: telegram.Bot, chat: telegram.Chat, pipe) -> telegram.Update: def receive_next_update(bot: telegram.Bot, chat: telegram.Chat, queue) -> telegram.Update:
"""Get the next update from a pipe. """Get the next update from a pipe.
If no update is found, block the process until one is received. If no update is found, block the process until one is received.
If a stop signal is sent, try to gracefully stop the process.""" If a stop signal is sent, try to gracefully stop the process."""
# Wait until some data is present in the pipe or the wait time runs out # Pop data from the queue
if not pipe.poll(int(config["Telegram"]["conversation_timeout"])): try:
# If the conversation times out, gracefully stop the process data = queue.get(timeout=int(config["Telegram"]["conversation_timeout"]))
graceful_stop(bot, chat, pipe) except queuem.Empty:
# Receive data from the pipe # If the conversation times out, gracefully stop the thread
data = pipe.recv() graceful_stop(bot, chat, queue)
# Check if the data is a stop signal instance # Check if the data is a stop signal instance
if isinstance(data, StopSignal): if isinstance(data, StopSignal):
# Gracefully stop the process # Gracefully stop the process
graceful_stop(bot, chat, pipe) graceful_stop(bot, chat, queue)
# Return the received update # Return the received update
return data return data
def conversation_handler(bot: telegram.Bot, chat: telegram.Chat, pipe): def conversation_handler(bot: telegram.Bot, chat: telegram.Chat, queue):
"""This function is ran once for every conversation (/start command) by a separate process.""" """This function is ran once for every conversation (/start command) by a separate process."""
# TODO: catch all the possible exceptions # TODO: catch all the possible exceptions
# Welcome the user to the bot # Welcome the user to the bot
@ -71,5 +70,5 @@ def conversation_handler(bot: telegram.Bot, chat: telegram.Chat, pipe):
# TODO: Send a command list or something # TODO: Send a command list or something
while True: while True:
# For now, echo the sent message # For now, echo the sent message
update = receive_next_update(bot, chat, pipe) update = receive_next_update(bot, chat, queue)
bot.send_message(chat.id, f"{multiprocessing.current_process().name} {update.message.text}") bot.send_message(chat.id, f"{threading.current_thread().name} {update.message.text}")