1
Fork 0
mirror of https://github.com/Steffo99/greed.git synced 2024-11-24 23:04:18 +00:00
greed/database.py

304 lines
12 KiB
Python
Raw Normal View History

2018-04-09 10:24:55 +00:00
import typing
2018-02-05 11:49:21 +00:00
from sqlalchemy import create_engine, Column, ForeignKey, UniqueConstraint
from sqlalchemy import Integer, BigInteger, String, Text, LargeBinary, DateTime, Boolean
from sqlalchemy.orm import sessionmaker, relationship, backref
from sqlalchemy.ext.declarative import declarative_base
import configloader
import telegram
2018-02-01 09:06:40 +00:00
import requests
2018-04-16 10:18:42 +00:00
import utils
2020-02-04 13:13:14 +00:00
import importlib
# Name of the language to use, read from config["Config"]["language"]
language = configloader.config["Config"]["language"]
# Import the module "strings.<language>" at runtime; the models below read their
# format strings and emojis from it (presumably one module per translation — confirm)
strings = importlib.import_module("strings." + language)
2017-12-11 09:43:37 +00:00
# Create a (lazy) database engine from the value stored in config["Database"]["engine"]
engine = create_engine(configloader.config["Database"]["engine"])
# Create a base class to define all the database subclasses; the engine above is passed as its bind
TableDeclarativeBase = declarative_base(bind=engine)
# Create a Session class able to initialize database sessions
# NOTE(review): no bind is passed to sessionmaker() — presumably sessions reach the engine
# through the bind given to the declarative base; confirm against the SQLAlchemy version in use
Session = sessionmaker()
2018-04-09 10:24:55 +00:00
# Define all the database tables using the sqlalchemy declarative base
class User(TableDeclarativeBase):
    """A Telegram user that has interacted with the bot at least once."""

    # Data copied from Telegram
    user_id = Column(BigInteger, primary_key=True)
    first_name = Column(String, nullable=False)
    last_name = Column(String)
    username = Column(String)

    # Credit currently available in the wallet
    credit = Column(Integer, nullable=False)

    # Extra table parameters
    __tablename__ = "users"

    def __init__(self, telegram_chat: telegram.Chat, **kwargs):
        """Create the record from a Telegram chat; the wallet starts empty."""
        super().__init__(**kwargs)
        # Copy the identifying data from the Telegram chat object
        self.user_id = telegram_chat.id
        self.first_name = telegram_chat.first_name
        self.last_name = telegram_chat.last_name
        self.username = telegram_chat.username
        # New users always start with no credit
        self.credit = 0

    def __str__(self):
        """Return the most specific name available: @username, full name, or first name only."""
        if self.username is not None:
            return f"@{self.username}"
        if self.last_name is None:
            return self.first_name
        return f"{self.first_name} {self.last_name}"

    def identifiable_str(self):
        """Return the display name prefixed by the user id, so the database record can be found again."""
        display_name = str(self)
        return f"user_{self.user_id} ({display_name})"

    def mention(self):
        """Return a mention of the user: @username if set, otherwise a Markdown tg://user link."""
        if self.username is None:
            return f"[{self.first_name}](tg://user?id={self.user_id})"
        return f"@{self.username}"

    def recalculate_credit(self):
        """Set the credit to the sum of the values of all the transactions that were not refunded."""
        self.credit = sum(transaction.value
                          for transaction in self.transactions
                          if not transaction.refunded)

    def __repr__(self):
        return f"<User {self} having {self.credit} credit>"
class Product(TableDeclarativeBase):
    """A product that can be purchased."""

    # Product id
    id = Column(Integer, primary_key=True)
    # Product name
    name = Column(String)
    # Product description
    description = Column(Text)
    # Product price; a null price means the product is not for sale
    price = Column(Integer)
    # Raw image data
    image = Column(LargeBinary)
    # Whether the product has been deleted
    deleted = Column(Boolean, nullable=False)

    # Extra table parameters
    __tablename__ = "products"

    # The default declarative constructor is enough, so no __init__ is defined

    def __str__(self):
        return self.text()

    def text(self, style: str="full", cart_qty: int=None):
        """Return the product details formatted with Telegram HTML, without the image.

        style "short" gives a one-line "<qty>x <name> - <total>" summary and uses cart_qty;
        style "full" fills strings.product_format_string and mentions the cart only if cart_qty is set.
        Any other style raises ValueError."""
        if style == "short":
            escaped_name = utils.telegram_html_escape(self.name)
            total = str(utils.Price(self.price) * cart_qty)
            return f"{cart_qty}x {escaped_name} - {total}"
        if style != "full":
            raise ValueError("style is not an accepted value")
        # The cart line is left empty when no quantity is given
        cart = '' if cart_qty is None else strings.in_cart_format_string.format(quantity=cart_qty)
        return strings.product_format_string.format(
            name=utils.telegram_html_escape(self.name),
            description=utils.telegram_html_escape(self.description),
            price=str(utils.Price(self.price)),
            cart=cart,
        )

    def __repr__(self):
        return f"<Product {self.name}>"

    def send_as_message(self, chat_id: int) -> dict:
        """Send the product to a chat through the Telegram Bot API and return the decoded JSON response.

        A photo with caption is sent if the product has an image, a plain text message otherwise."""
        api_root = f"https://api.telegram.org/bot{configloader.config['Telegram']['token']}"
        if self.image is not None:
            response = requests.post(f"{api_root}/sendPhoto",
                                     files={"photo": self.image},
                                     params={"chat_id": chat_id,
                                             "caption": self.text(),
                                             "parse_mode": "HTML"})
        else:
            response = requests.get(f"{api_root}/sendMessage",
                                    params={"chat_id": chat_id,
                                            "text": self.text(),
                                            "parse_mode": "HTML"})
        return response.json()

    def set_image(self, file: telegram.File):
        """Download an image from Telegram and store its bytes in the image column.

        This call blocks while downloading: avoid calling it directly, run it in a thread if possible."""
        # Fetch the file with a GET request and keep the raw response body
        self.image = requests.get(file.file_path).content
2017-12-20 10:18:06 +00:00
class Transaction(TableDeclarativeBase):
    """A greed wallet transaction.

    The wallet credit ISN'T computed from these rows, but they can be used to recalculate it."""
    # TODO: split this into multiple tables

    # Internal transaction id
    transaction_id = Column(Integer, primary_key=True)
    # User whose credit is affected by this transaction
    user_id = Column(BigInteger, ForeignKey("users.user_id"), nullable=False)
    user = relationship("User", backref=backref("transactions"))
    # Value of the transaction, which can be either negative or positive
    value = Column(Integer, nullable=False)
    # Refunded status: if True, the value is ignored when the credit is recalculated
    refunded = Column(Boolean, default=False)
    # Extra notes on the transaction
    notes = Column(Text)

    # Payment provider
    provider = Column(String)
    # Transaction id supplied by Telegram
    telegram_charge_id = Column(String)
    # Transaction id supplied by the payment provider
    provider_charge_id = Column(String)
    # Extra payment data, which the payment provider may require in case of a dispute
    payment_name = Column(String)
    payment_phone = Column(String)
    payment_email = Column(String)

    # Order linked to this transaction
    order_id = Column(Integer, ForeignKey("orders.order_id"))
    order = relationship("Order")

    # Extra table parameters
    __tablename__ = "transactions"
    __table_args__ = (UniqueConstraint("provider", "provider_charge_id"),)

    def __str__(self):
        """Return a " | "-separated summary: id, user and value, then the optional refunded marker, provider and notes."""
        fields = [
            f"<b>T{self.transaction_id}</b>",
            str(self.user),
            f"{utils.Price(self.value)}",
        ]
        # Optional fields are appended only when they are set
        if self.refunded:
            fields.append(f"{strings.emoji_refunded}")
        if self.provider:
            fields.append(f"{self.provider}")
        if self.notes:
            fields.append(f"{self.notes}")
        return " | ".join(fields)

    def __repr__(self):
        summary = str(self)
        return f"<Transaction {self.transaction_id} for User {self.user_id} {summary}>"
2017-12-20 10:18:06 +00:00
class Admin(TableDeclarativeBase):
    """A greed administrator, together with the permissions granted to them."""

    # Telegram id of the administrator; also the primary key of the table
    user_id = Column(BigInteger, ForeignKey("users.user_id"), primary_key=True)
    user = relationship("User")

    # Permission flags, all defaulting to False
    edit_products = Column(Boolean, default=False)
    receive_orders = Column(Boolean, default=False)
    create_transactions = Column(Boolean, default=False)
    display_on_help = Column(Boolean, default=False)
    is_owner = Column(Boolean, default=False)

    # Whether live mode is enabled
    live_mode = Column(Boolean, default=False)

    # Extra table parameters
    __tablename__ = "admins"

    def __repr__(self):
        return "<Admin {}>".format(self.user_id)
2018-02-05 11:49:21 +00:00
class Order(TableDeclarativeBase):
    """An order which has been placed by an user.

    It may include multiple products, available in the OrderItem table."""

    # The unique order id
    order_id = Column(Integer, primary_key=True)
    # The user who placed the order
    user_id = Column(BigInteger, ForeignKey("users.user_id"))
    user = relationship("User")
    # Date of creation
    creation_date = Column(DateTime, nullable=False)
    # Date of delivery
    delivery_date = Column(DateTime)
    # Date of refund: if null, the order hasn't been refunded
    refund_date = Column(DateTime)
    # Refund reason: if null, the order hasn't been refunded
    refund_reason = Column(Text)
    # List of items in the order
    items: typing.List["OrderItem"] = relationship("OrderItem")
    # Extra details specified by the purchasing user
    notes = Column(Text)
    # Linked transaction
    transaction = relationship("Transaction", uselist=False)

    # Extra table parameters
    __tablename__ = "orders"

    def __repr__(self):
        return f"<Order {self.order_id} placed by User {self.user_id}>"

    def get_text(self, session, user=False):
        """Return the text describing this order.

        session is the database session used to load the order joined with its transaction.
        If user is true and config["Appearance"]["full_order_info"] is "no", the reduced
        strings.user_order_format_string layout is used; otherwise the full layout with the
        order number, the user mention and the creation date is returned.
        The refund reason is appended whenever the order has a refund date."""
        # Load this order joined with its transaction; exactly one row is expected
        joined_self = session.query(Order).filter_by(order_id=self.order_id).join(Transaction).one()
        # One line per ordered item
        items = "".join(f"{item}\n" for item in self.items)
        # A delivery date takes precedence over a refund date when picking the status
        if self.delivery_date is not None:
            status_emoji = strings.emoji_completed
            status_text = strings.text_completed
        elif self.refund_date is not None:
            status_emoji = strings.emoji_refunded
            status_text = strings.text_refunded
        else:
            status_emoji = strings.emoji_not_processed
            status_text = strings.text_not_processed
        # Missing notes are shown as an empty string in both layouts,
        # instead of leaking the literal "None" into the user-facing text
        notes = self.notes if self.notes is not None else ""
        # The transaction value is negated before being displayed as the order price
        value = str(utils.Price(-joined_self.transaction.value))
        if self.refund_date is not None:
            refund = strings.refund_reason.format(reason=self.refund_reason)
        else:
            refund = ""
        if user and configloader.config["Appearance"]["full_order_info"] == "no":
            return strings.user_order_format_string.format(status_emoji=status_emoji,
                                                           status_text=status_text,
                                                           items=items,
                                                           notes=notes,
                                                           value=value) + refund
        return status_emoji + " " + \
            strings.order_number.format(id=self.order_id) + "\n" + \
            strings.order_format_string.format(user=self.user.mention(),
                                               date=self.creation_date.isoformat(),
                                               items=items,
                                               notes=notes,
                                               value=value) + refund
2018-03-07 09:48:30 +00:00
2018-02-05 11:49:21 +00:00
class OrderItem(TableDeclarativeBase):
    """A single product purchased as part of an order."""

    # Unique id of the item
    item_id = Column(Integer, primary_key=True)
    # Product being ordered
    product_id = Column(Integer, ForeignKey("products.id"), nullable=False)
    product = relationship("Product")
    # Order this item belongs to
    order_id = Column(Integer, ForeignKey("orders.order_id"), nullable=False)

    # Extra table parameters
    __tablename__ = "orderitems"

    def __str__(self):
        """Return the product name followed by its formatted price."""
        product = self.product
        price_label = str(utils.Price(product.price))
        return f"{product.name} - {price_label}"

    def __repr__(self):
        return "<OrderItem {}>".format(self.item_id)
2018-05-02 14:32:16 +00:00
# Create the tables declared above at import time; no bind is passed here,
# so this relies on the engine given as bind to the declarative base
TableDeclarativeBase.metadata.create_all()