mirror of
https://github.com/RYGhub/nameless.git
synced 2024-11-21 21:24:18 +00:00
First commit
This commit is contained in:
commit
f895650cac
3 changed files with 171 additions and 0 deletions
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
.idea/
|
||||||
|
database.sqlite
|
||||||
|
__pycache__/
|
68
database.py
Normal file
68
database.py
Normal file
|
@ -0,0 +1,68 @@
|
||||||
|
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey
|
||||||
|
from sqlalchemy.orm import relationship, sessionmaker
|
||||||
|
from sqlalchemy.ext.declarative import declarative_base
|
||||||
|
|
||||||
|
db = create_engine("sqlite:///database.sqlite")
|
||||||
|
|
||||||
|
Base = declarative_base(bind=db)
|
||||||
|
Session = sessionmaker(bind=db)
|
||||||
|
|
||||||
|
# Using sqlite, only a single write session is available
|
||||||
|
session = Session()
|
||||||
|
|
||||||
|
class User(Base):
|
||||||
|
"""The basic data of a Telegram user"""
|
||||||
|
__tablename__ = "user"
|
||||||
|
|
||||||
|
id = Column(Integer, primary_key=True)
|
||||||
|
username = Column(String)
|
||||||
|
firstname = Column(String)
|
||||||
|
lastname = Column(String)
|
||||||
|
language = Column(String)
|
||||||
|
chapter = Column(Integer)
|
||||||
|
|
||||||
|
|
||||||
|
async def message(self, bot, text):
|
||||||
|
await bot.sendMessage(self.id, text)
|
||||||
|
|
||||||
|
def __init__(self, tid: int, firstname: str=None, lastname: str=None, username: str=None, language: str=None):
|
||||||
|
self.id = tid
|
||||||
|
self.username = username
|
||||||
|
self.firstname = firstname
|
||||||
|
self.lastname = lastname
|
||||||
|
self.language = language
|
||||||
|
self.chapter = 0
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"<User {self.id}>"
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
# Display the username
|
||||||
|
if self.username is not None:
|
||||||
|
return f"@{self.username}"
|
||||||
|
# Display first name and last name
|
||||||
|
if self.lastname is not None:
|
||||||
|
return f"{self.firstname} {self.lastname}"
|
||||||
|
# Fallback if no lastname
|
||||||
|
return f"{self.firstname}"
|
||||||
|
|
||||||
|
|
||||||
|
class FirstChapter(Base):
|
||||||
|
"""The save data of the first chapter"""
|
||||||
|
__tablename__ = "firstchapter"
|
||||||
|
|
||||||
|
user_id = Column(Integer, ForeignKey("user.id"), primary_key=True)
|
||||||
|
user = relationship("User")
|
||||||
|
|
||||||
|
current_question = Column(Integer)
|
||||||
|
game_topic = Column(String)
|
||||||
|
game_release = Column(String)
|
||||||
|
|
||||||
|
def __init__(self, user):
|
||||||
|
self.user_id = user.id
|
||||||
|
self.current_question = 0
|
||||||
|
|
||||||
|
|
||||||
|
# If the script is run as standalone, generate the database
|
||||||
|
if __name__ == "__main__":
|
||||||
|
Base.metadata.create_all()
|
100
main.py
Normal file
100
main.py
Normal file
|
@ -0,0 +1,100 @@
|
||||||
|
import os
|
||||||
|
import asyncio
|
||||||
|
import telepot.aio
|
||||||
|
import random
|
||||||
|
from telepot.aio.loop import MessageLoop
|
||||||
|
|
||||||
|
from database import session, User, FirstChapter
|
||||||
|
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
|
||||||
|
async def on_message(message):
|
||||||
|
"""Called when a telegram message is received."""
|
||||||
|
# If a private message is received, add the user to the database
|
||||||
|
if message["chat"]["type"] == "private":
|
||||||
|
# Search for the matching user in the database
|
||||||
|
user = session.query(User).filter_by(id=message["chat"]["id"]).first()
|
||||||
|
# If the user doesn't exist, create a new entry
|
||||||
|
if user is None:
|
||||||
|
user = User(message["from"]["id"], message["from"]["first_name"], message["from"].get("last_name"), message["from"].get("username"), message["from"].get("language_code"))
|
||||||
|
# Add the user to the database
|
||||||
|
session.add(user)
|
||||||
|
session.commit()
|
||||||
|
await user.message(nlessbot, "Welcome to Nameless!\nThere is no game here yet.\nOr maybe there is.")
|
||||||
|
loop.create_task(call_every_x_seconds(advance_to_chapter_one, 10, user=user))
|
||||||
|
# If the user is playing the prologue, answer appropriately
|
||||||
|
if user.chapter == 0:
|
||||||
|
if message["text"].lower() == "sas":
|
||||||
|
await user.message(nlessbot, "sus")
|
||||||
|
elif message["text"].lower() == "lol":
|
||||||
|
await user.message(nlessbot, "haha")
|
||||||
|
elif "wtf" in message["text"]:
|
||||||
|
await user.message(nlessbot, "¯\_(ツ)_/¯")
|
||||||
|
elif user.chapter == 1:
|
||||||
|
data = session.query(FirstChapter).filter_by(user_id=user.id).first()
|
||||||
|
if data.current_question == 0:
|
||||||
|
data.game_topic = message["text"]
|
||||||
|
data.current_question = 1
|
||||||
|
session.commit()
|
||||||
|
await user.message(nlessbot, "Hmmm. Interesting.")
|
||||||
|
await asyncio.sleep(5)
|
||||||
|
await user.message(nlessbot, "When do you think it will be released?")
|
||||||
|
elif data.current_question == 1:
|
||||||
|
if "half" in message["text"].lower() and "life" in message["text"].lower():
|
||||||
|
await user.message(nlessbot, "So... Never? I don't think so.")
|
||||||
|
return
|
||||||
|
data.game_release = message["text"]
|
||||||
|
data.current_question = 2
|
||||||
|
session.commit()
|
||||||
|
await user.message(nlessbot, "See you later then!")
|
||||||
|
|
||||||
|
|
||||||
|
async def call_every_x_seconds(coroutine: asyncio.coroutine, timeout: int, *args, **kwargs):
|
||||||
|
"""Call a function every x seconds, and stop calling it if it returns Ellipsis."""
|
||||||
|
while True:
|
||||||
|
# Await the coroutine
|
||||||
|
result = await coroutine(*args, **kwargs)
|
||||||
|
# Stop calling it if it returns ...
|
||||||
|
if result is ...:
|
||||||
|
break
|
||||||
|
# Wait the time specified in timeout
|
||||||
|
await asyncio.sleep(timeout)
|
||||||
|
|
||||||
|
|
||||||
|
async def advance_to_chapter_one(user):
|
||||||
|
"""Start chapter one. Maybe."""
|
||||||
|
# Generate a random number from 0 to 99
|
||||||
|
rolled_number = random.randrange(100)
|
||||||
|
# 1% chance to pass the check, it should take around 16 minutes
|
||||||
|
if rolled_number == 0:
|
||||||
|
# Advance the chapter number
|
||||||
|
user.chapter = 1
|
||||||
|
# Create the row for this chapter's data
|
||||||
|
data = FirstChapter(user)
|
||||||
|
# Add the row to the database
|
||||||
|
session.add(data)
|
||||||
|
session.commit()
|
||||||
|
# Introduce the chapter to the player
|
||||||
|
await user.message(nlessbot, "What do you think this game will be about?")
|
||||||
|
# Display that an user has advanced to a new chapter
|
||||||
|
print(f"{user} has advanced to Chapter One!")
|
||||||
|
# Stop calling the function
|
||||||
|
return ...
|
||||||
|
|
||||||
|
|
||||||
|
# Get the bot token from envvars
|
||||||
|
nlessbot_token = os.environ["nameless_token"]
|
||||||
|
# Create the bot
|
||||||
|
nlessbot = telepot.aio.Bot(nlessbot_token)
|
||||||
|
# Initialize the event loop
|
||||||
|
loop.create_task(MessageLoop(nlessbot, {"chat": on_message}).run_forever())
|
||||||
|
# Add the events that were stopped when the script was taken down
|
||||||
|
users = session.query(User).all()
|
||||||
|
for user in users:
|
||||||
|
if user.chapter == 0:
|
||||||
|
loop.create_task(call_every_x_seconds(advance_to_chapter_one, 10, user=user))
|
||||||
|
# Run the event loop
|
||||||
|
try:
|
||||||
|
loop.run_forever()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
exit(0)
|
Loading…
Reference in a new issue