1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-24 03:54:20 +00:00
royalnet/royalnet/audio/playmodes.py

214 lines
7.3 KiB
Python
Raw Normal View History

2019-04-12 13:19:28 +00:00
import math
import random
import typing
2019-07-31 15:28:21 +00:00
from collections import namedtuple
2019-07-31 00:10:27 +00:00
from .ytdldiscord import YtdlDiscord
from .fileaudiosource import FileAudioSource
2019-04-12 13:19:28 +00:00
class PlayMode:
    """The base class for a PlayMode, such as :py:class:`royalnet.audio.Playlist`.

    Subclass it and override every method raising :py:exc:`NotImplementedError` to create a custom PlayMode."""

    def __init__(self):
        """Create a new PlayMode, with nothing playing, and instantiate its internal async generator."""
        # The item currently being played; the generator is responsible for updating it.
        self.now_playing: typing.Optional[YtdlDiscord] = None
        # Calling the async generator function only creates the generator object: no code runs until next() is awaited.
        self.generator: typing.AsyncGenerator = self._generate_generator()

    async def next(self) -> typing.Optional[FileAudioSource]:
        """Advance the internal generator by one step and return what it produced.

        Returns:
            The next :py:class:`royalnet.audio.FileAudioSource`, or whatever else the generator yielded."""
        upcoming = await self.generator.__anext__()
        return upcoming

    def videos_left(self) -> typing.Union[int, float]:
        """Return the number of videos left in the PlayMode.

        Returns:
            Usually a :py:class:`int`, but may return also :py:obj:`math.inf` if the PlayMode is infinite.

        Raises:
            NotImplementedError: Always, in this base class."""
        raise NotImplementedError()

    async def _generate_generator(self):
        """Async generator that sets ``now_playing`` either to an item or to ``None``, then yields the matching :py:class:`royalnet.audio.FileAudioSource` (or ``None``).

        Yields:
            The :py:class:`royalnet.audio.FileAudioSource` to be played next.

        Raises:
            NotImplementedError: On the first advance, in this base class."""
        raise NotImplementedError()
        # The unreachable yield below is what makes this coroutine function an async generator function.
        # noinspection PyUnreachableCode
        yield NotImplemented

    def add(self, item: YtdlDiscord) -> None:
        """Add a new :py:class:`royalnet.audio.YtdlDiscord` to the PlayMode.

        Args:
            item: The item to add to the PlayMode.

        Raises:
            NotImplementedError: Always, in this base class."""
        raise NotImplementedError()

    def delete(self) -> None:
        """Delete all :py:class:`royalnet.audio.YtdlDiscord` contained inside this PlayMode.

        Raises:
            NotImplementedError: Always, in this base class."""
        raise NotImplementedError()

    def queue_preview(self) -> typing.List[YtdlDiscord]:
        """Display all the videos in the PlayMode as a list, if possible.

        To be used with ``queue`` commands, for example.

        Raises:
            NotImplementedError: If a preview can't be generated.

        Returns:
            A list of videos contained in the queue."""
        raise NotImplementedError()
2019-04-12 13:19:28 +00:00
class Playlist(PlayMode):
    """A video list. :py:class:`royalnet.audio.YtdlDiscord` played are removed from the list."""

    def __init__(self, starting_list: typing.Optional[typing.List[YtdlDiscord]] = None):
        """Create a new Playlist.

        Args:
            starting_list: A list of items with which the Playlist will be created. The list object is used as-is
                (not copied), so items popped by the Playlist disappear from it too. Defaults to an empty list."""
        super().__init__()
        if starting_list is None:
            starting_list = []
        self.list: typing.List[YtdlDiscord] = starting_list

    def videos_left(self) -> typing.Union[int, float]:
        """Return the number of items still waiting in the list (the one being played is not counted)."""
        return len(self.list)

    async def _generate_generator(self):
        """Pop items from the front of the list one at a time.

        Each step sets ``now_playing`` to the popped item and yields its freshly spawned audio source; when the list
        is empty, ``now_playing`` is set to ``None`` and ``None`` is yielded instead.

        Yields:
            The audio source spawned from the next item, or ``None`` if the list is empty."""
        while True:
            try:
                next_video = self.list.pop(0)
            except IndexError:
                self.now_playing = None
                yield None
            else:
                self.now_playing = next_video
                yield next_video.spawn_audiosource()
            # Execution resumes here when the following item is requested: the previous one is no longer needed.
            # If delete() ran in the meantime, now_playing is already None and the item is not deleted twice.
            if self.now_playing is not None:
                self.now_playing.delete()

    def add(self, item) -> None:
        """Append an item to the end of the list.

        Args:
            item: The item to add to the Playlist."""
        self.list.append(item)

    def delete(self) -> None:
        """Delete the item being played and every item still in the list, leaving the Playlist empty."""
        if self.now_playing is not None:
            self.now_playing.delete()
            # Drop the reference, so that the generator doesn't delete the same item again on the next advance.
            self.now_playing = None
        while self.list:
            self.list.pop(0).delete()

    def queue_preview(self) -> typing.List[YtdlDiscord]:
        """Return the list of items waiting to be played, in play order.

        Returns:
            The internal list itself, not a copy."""
        return self.list
2019-04-12 13:19:28 +00:00
class Pool(PlayMode):
    """A random pool. :py:class:`royalnet.audio.YtdlDiscord` are selected in random order and are not repeated until every song has been played at least once."""

    def __init__(self, starting_pool: typing.Optional[typing.List[YtdlDiscord]] = None):
        """Create a new Pool.

        Args:
            starting_pool: A list of items the Pool will be created from. The list object is used as-is (not copied).
                Defaults to an empty list."""
        super().__init__()
        if starting_pool is None:
            starting_pool = []
        # Every item belonging to the Pool.
        self.pool: typing.List[YtdlDiscord] = starting_pool
        # The shuffled items that still have to be played in the current round.
        self._pool_copy: typing.List[YtdlDiscord] = []

    def videos_left(self) -> typing.Union[int, float]:
        """Return :py:obj:`math.inf`, as the Pool is reshuffled and restarted every time it is exhausted."""
        return math.inf

    async def _generate_generator(self):
        """Play the pool in rounds: every round is a shuffled copy of the pool, consumed from the front.

        Each step sets ``now_playing`` to the selected item and yields its freshly spawned audio source; while the
        pool is empty, ``now_playing`` is set to ``None`` and ``None`` is yielded instead.

        Yields:
            The audio source spawned from the next item, or ``None`` if the pool is empty."""
        while True:
            if not self.pool:
                self.now_playing = None
                yield None
                continue
            # Start a new round.
            self._pool_copy = self.pool.copy()
            random.shuffle(self._pool_copy)
            while self._pool_copy:
                next_video = self._pool_copy.pop(0)
                self.now_playing = next_video
                yield next_video.spawn_audiosource()

    def add(self, item) -> None:
        """Add an item to the pool and to the round in progress, then reshuffle the rest of the round.

        Args:
            item: The item to add to the Pool."""
        self.pool.append(item)
        self._pool_copy.append(item)
        random.shuffle(self._pool_copy)

    def delete(self) -> None:
        """Delete every item of the pool, leaving the Pool empty but still usable."""
        for item in self.pool:
            item.delete()
        # Reset to empty lists rather than None, so that add(), queue_preview() and the generator keep working.
        self.pool = []
        self._pool_copy = []
        # The item that was playing has just been deleted with the others.
        self.now_playing = None

    def queue_preview(self) -> typing.List[YtdlDiscord]:
        """Return the items of the pool in a random order.

        The order is generated on the spot at every call; it is not the order of the round in progress.

        Returns:
            A shuffled copy of the pool."""
        preview_pool = self.pool.copy()
        random.shuffle(preview_pool)
        return preview_pool
2019-07-31 15:28:21 +00:00
class Layers(PlayMode):
    """A playmode for playing a single song with multiple layers."""

    # A layer couples an added item (``dfile``) with the audio source spawned from it when it was added (``source``).
    Layer = namedtuple("Layer", ["dfile", "source"])

    def __init__(self, starting_layers: typing.Optional[typing.List[YtdlDiscord]] = None):
        """Create a new Layers playmode.

        Args:
            starting_layers: A list of items, each of which is added as a layer through :py:meth:`add`.
                Defaults to no layers."""
        super().__init__()
        if starting_layers is None:
            starting_layers = []
        self.layers: typing.List[Layers.Layer] = []
        for item in starting_layers:
            self.add(item)

    def videos_left(self) -> typing.Union[int, float]:
        """Return ``1`` if there is at least one layer, ``0`` otherwise: all layers count as a single song."""
        return 1 if self.layers else 0

    async def _generate_generator(self):
        """Cycle through the layers, carrying the file position from one layer's source over to the next.

        The first step yields the source of the first layer. Every following step reads the position of the current
        source's file, moves to the next layer (wrapping around after the last one), seeks that layer's file to the
        same position and yields its source. If the current source's file has been closed, all layers are dropped
        and ``None`` is yielded. ``now_playing`` always follows the item of the yielded layer.

        Yields:
            The audio source of the selected layer, or ``None`` if there is nothing to play."""
        current_layer = None
        current_source = None
        while True:
            if not self.layers:
                # Nothing to play: forget any previous cursor, so that layers added later start from the first one
                # instead of being compared against a stale source.
                self.now_playing = None
                current_layer = None
                current_source = None
                yield None
                continue
            if self.now_playing is None or current_source is None:
                # Nothing is playing yet: start from the first layer, without touching its file position.
                current_layer = 0
                current_source = self.layers[0].source
                self.now_playing = self.layers[0].dfile
                yield current_source
                continue
            if current_source.file.closed:
                # NOTE(review): the layers are dropped here without calling delete() on their items - confirm that
                # whatever closed the file also takes care of cleaning them up.
                self.now_playing = None
                self.layers = []
                current_layer = None
                current_source = None
                yield None
                continue
            # Switch to the next layer, resuming from the position the current one has reached.
            current_position = current_source.file.tell()
            current_layer += 1
            if current_layer >= len(self.layers):
                current_layer = 0
            selected = self.layers[current_layer]
            self.now_playing = selected.dfile
            current_source = selected.source
            current_source.file.seek(current_position)
            yield current_source

    def add(self, item) -> None:
        """Add an item as a new layer, spawning its audio source immediately.

        Args:
            item: The item to add as the last layer."""
        self.layers.append(self.Layer(dfile=item, source=item.spawn_audiosource()))

    def delete(self) -> None:
        """Delete the items of all layers, leaving the playmode empty but still usable."""
        for item in self.layers:
            item.dfile.delete()
        # Reset to an empty list rather than None, so that the generator, videos_left(), add() and queue_preview()
        # keep working after a delete.
        self.layers = []
        self.now_playing = None

    def queue_preview(self) -> typing.List[YtdlDiscord]:
        """Return the items of all layers, in layer order.

        Returns:
            A new list containing the item of every layer."""
        return [layer.dfile for layer in self.layers]