Source code for voltage.messageable

from __future__ import annotations

from asyncio import Task, sleep
from typing import TYPE_CHECKING, List, Optional, Union

# Internal imports
from .enums import SortType
from .errors import HTTPError
from .message import Message, MessageInteractions

if TYPE_CHECKING:
    from .embed import SendableEmbed
    from .file import File
    from .internals import CacheHandler
    from .message import MessageMasquerade, MessageReply
    from .types import MessagePayload, MessageReplyPayload, SendableEmbedPayload


class Typing:
    """
    A simple context manager for typing.
    """

    def __init__(self, channel: Messageable):
        self.channel = channel
        self.ws = channel.cache.ws
        self.loop = channel.cache.loop
        self.task: Task

    async def keep_typing(self):
        while True:
            await self.channel.start_typing()
            await sleep(2)

    def __enter__(self):
        self.task = self.loop.create_task(self.keep_typing())
        return self

    def __exit__(self, *_):
        self.task.cancel()
        self.loop.create_task(self.channel.end_typing())

    async def __aenter__(self):
        self.task = self.loop.create_task(self.keep_typing())
        return self

    async def __aexit__(self, *_):
        self.task.cancel()
        await self.channel.end_typing()


class MessageIterator:
    def __init__(self, data: list[MessagePayload], channel: Messageable):
        self.data = data
        self.processed: list[Message] = []
        self.channel = channel

    def _process_next(self):
        if len(self.data) == 0:
            if len(self.processed) == 0:
                return None
            else:
                self.processed.append(Message(self.data.pop(0), self.channel.cache))
                return self.processed[-1]

    def _get_next_message(self):
        return self.processed.pop(0)

    def __iter__(self):
        return self

    def __next__(self):
        if len(self.data) == 0:
            raise StopIteration
        return Message(self.data.pop(0), self.channel.cache)

    def __len__(self):
        return len(self.data) + len(self.processed)

    def __getitem__(self, index: int):
        while len(self.processed) <= index:
            self._process_next()
        return Message(self.data[index], self.channel.cache)

    def __reversed__(self):
        return MessageIterator(list(reversed(self.data)), self.channel)


[docs]class Messageable: # Really missing rust traits rn :( """ A class which all messageable have to inhertit from. Attributes ---------- channel_id: :class:`str` The ID of the messageable object's channel. cache: :class:`CacheHandler` The cache handler of the messageable object. """ __slots__ = () cache: CacheHandler
[docs] async def get_id(self) -> str: """ Get the ID of the messageable object's channel. Returns ------- :class:`str` The ID of the messageable object's channel. """ return NotImplemented # TIL: NotImplemented is a thing, thank you mr random stackoverflow user.
[docs] async def send( self, content: Optional[str] = None, *, embed: Optional[Union[SendableEmbed, SendableEmbedPayload]] = None, embeds: Optional[List[Union[SendableEmbed, SendableEmbedPayload]]] = None, attachment: Optional[Union[File, str]] = None, attachments: Optional[List[Union[File, str]]] = None, reply: Optional[MessageReply] = None, replies: Optional[List[Union[MessageReply, MessageReplyPayload]]] = None, masquerade: Optional[MessageMasquerade] = None, interactions: Optional[MessageInteractions] = None, delete_after: Optional[float] = None, ) -> Message: # YEAH BABY, THAT'S WHAT WE'VE BEEN WAITING FOR, THAT'S WHAT IT'S ALL ABOUT, WOOOOOOOOOOOOOOOO """ Send a message to the messageable object's channel. Parameters ---------- content: Optional[:class:`str`] The content of the message. embed: Optional[:class:`Embed`] The embed of the message. embeds: Optional[List[:class:`Embed`]] The embeds of the message. attachment: Optional[:class:`File`] The attachment of the message. attachments: Optional[List[:class:`File`]] The attachments of the message. reply: Optional[:class:`MessageReply`] The reply of the message. replies: Optional[List[:class:`MessageReply`]] The replies of the message. masquerade: Optional[:class:`MessageMasquerade`] The masquerade of the message. interactions: Optional[:class:`MessageInteractions`] The interactions of the message. Returns ------- :class:`Message` The message that got sent. """ embeds = [embed] if embed else embeds replies = [reply] if reply else replies attachments = [attachment] if attachment else attachments content = str(content) if content else None message = await self.cache.http.send_message( await self.get_id(), content=content, embeds=embeds, attachments=attachments, replies=replies, masquerade=masquerade, interactions=interactions, ) msg = self.cache.add_message(message) if delete_after is not None: self.cache.loop.create_task(msg.delete(delay=delete_after)) return msg
[docs] async def fetch_message(self, message_id: str) -> Message: """ Fetch a message from the messageable object's channel. Parameters ---------- message_id: :class:`str` The ID of the message to fetch. Returns ------- :class:`Message` The message that got fetched. """ return await self.cache.fetch_message(await self.get_id(), message_id)
[docs] async def history( self, limit: int = 100, *, sort: SortType = SortType.latest, before: Optional[str] = None, after: Optional[str] = None, nearby: Optional[str] = None, ) -> MessageIterator: """ Fetch the messageable object's channel's history. Parameters ---------- limit: Optional[:class:`int`] The limit of the history. sort: Optional[:class:`SortType`] The sort type of the history. before: Optional[:class:`str`] The ID of the message to fetch before. after: Optional[:class:`str`] The ID of the message to fetch after. nearby: Optional[:class:`str`] The ID of the message to fetch nearby. Returns ------- List[:class:`Message`] The messages that got fetched. """ messages = await self.cache.http.fetch_messages(await self.get_id(), sort.value, limit=limit, before=before, after=after, nearby=nearby, include_users=False) # type: ignore returned = [] for i in messages: if i["author"] != "00000000000000000000000000": returned.append(i) return MessageIterator(returned, self)
[docs] async def search( self, query: str, *, sort: SortType = SortType.latest, # type: ignore limit: int = 100, before: Optional[str] = None, after: Optional[str] = None, ) -> MessageIterator: """ Search for messages in the messageable object's channel. Parameters ---------- query: :class:`str` The query to search for. sort: Optional[:class:`SortType`] The sort type of the search. limit: Optional[:class:`int`] The limit of the search. before: Optional[:class:`str`] The ID of the message to fetch before. after: Optional[:class:`str`] The ID of the message to fetch after. Returns ------- List[:class:`Message`] The messages that got found. """ messages = await self.cache.http.search_for_message( await self.get_id(), query, sort=sort.value, limit=limit, before=before, after=after, include_users=False, ) return MessageIterator(messages, self)
[docs] async def purge(self, amount: int): """ Purge messages from the messageable object's channel. Parameters ---------- amount: :class:`int` The amount of messages to purge. """ channel_id = await self.get_id() for i in await self.cache.http.fetch_messages(channel_id, "Latest", limit=amount): try: await self.cache.http.delete_message(channel_id, i["_id"]) except HTTPError as e: status = e.response.status if status == 404: pass else: raise
[docs] def typing(self) -> Typing: """ A context manager that sends a typing indicator to the messageable object's channel. """ return Typing(self)
[docs] async def start_typing(self): """ Send a typing indicator to the messageable object's channel. """ await self.cache.ws.begin_typing(await self.get_id())
[docs] async def end_typing(self): """ Stop sending a typing indicator to the messageable object's channel. """ await self.cache.ws.end_typing(await self.get_id())