|
| 1 | +from pathlib import Path |
| 2 | +from typing import TYPE_CHECKING, Any, Sequence, Union, cast |
| 3 | + |
| 4 | +from nonebot.adapters import Bot, Event |
| 5 | +from nonebot.adapters.yunhu.bot import Bot as YunHuBot |
| 6 | +from nonebot.adapters.yunhu.event import Event as YunHuEvent |
| 7 | +from nonebot.adapters.yunhu.event import MessageEvent, NoticeEvent |
| 8 | +from nonebot.adapters.yunhu.message import Message, MessageSegment |
| 9 | +from nonebot.adapters.yunhu.models import BaseNotice, ButtonBody, SendMsgResponse |
| 10 | +from tarina import lang |
| 11 | + |
| 12 | +from nonebot_plugin_alconna.uniseg.constraint import SupportScope |
| 13 | +from nonebot_plugin_alconna.uniseg.exporter import MessageExporter, SerializeFailed, SupportAdapter, Target, export |
| 14 | +from nonebot_plugin_alconna.uniseg.segment import At, Button, Emoji, File, Image, Keyboard, Reply, Segment, Text, Video |
| 15 | + |
| 16 | + |
| 17 | +class YunHuMessageExporter(MessageExporter[Message]): |
| 18 | + @classmethod |
| 19 | + def get_adapter(cls) -> SupportAdapter: |
| 20 | + return SupportAdapter.yunhu |
| 21 | + |
| 22 | + def get_message_type(self): |
| 23 | + return Message |
| 24 | + |
| 25 | + def get_target(self, event: Event, bot: Union[Bot, None] = None) -> Target: |
| 26 | + if isinstance(event, MessageEvent): |
| 27 | + return Target( |
| 28 | + event.event.sender.senderId, |
| 29 | + event.event.chat.chatId if event.event.chat.chatType == "group" else "", |
| 30 | + private=(event.event.chat.chatType == "bot"), |
| 31 | + source=event.event.message.msgId, |
| 32 | + adapter=self.get_adapter(), |
| 33 | + self_id=bot.self_id if bot else None, |
| 34 | + scope=SupportScope.yunhu, |
| 35 | + ) |
| 36 | + if isinstance(event, NoticeEvent): |
| 37 | + if TYPE_CHECKING: |
| 38 | + assert isinstance(event.event, BaseNotice) |
| 39 | + return Target( |
| 40 | + event.get_user_id(), |
| 41 | + event.event.chatId, |
| 42 | + private=(event.event.chatType == "user"), |
| 43 | + source=event.header.eventId, |
| 44 | + adapter=self.get_adapter(), |
| 45 | + self_id=bot.self_id if bot else None, |
| 46 | + scope=SupportScope.yunhu, |
| 47 | + ) |
| 48 | + raise NotImplementedError |
| 49 | + |
| 50 | + def get_message_id(self, event: Event) -> str: |
| 51 | + assert isinstance( |
| 52 | + event, |
| 53 | + MessageEvent, |
| 54 | + ) |
| 55 | + return event.event.message.msgId |
| 56 | + |
| 57 | + @export |
| 58 | + async def text(self, seg: Text, bot: Union[Bot, None]) -> "MessageSegment": |
| 59 | + if seg.extract_most_style() == "markdown": |
| 60 | + return MessageSegment.markdown(seg.text) |
| 61 | + if seg.extract_most_style() == "html": |
| 62 | + return MessageSegment.html(seg.text) |
| 63 | + if seg.styles: |
| 64 | + return MessageSegment.markdown(str(seg)) |
| 65 | + return MessageSegment.text(seg.text) |
| 66 | + |
| 67 | + @export |
| 68 | + async def at(self, seg: At, bot: Union[Bot, None]) -> "MessageSegment": |
| 69 | + if seg.flag == "user": |
| 70 | + return MessageSegment.at(seg.target, seg.display or "") |
| 71 | + raise SerializeFailed( |
| 72 | + lang.require("nbp-uniseg", "failed_segment").format(adapter="yunhu", seg=seg, target="at") |
| 73 | + ) |
| 74 | + |
| 75 | + @export |
| 76 | + async def face(self, seg: Emoji, bot: Union[Bot, None]) -> "MessageSegment": |
| 77 | + return MessageSegment.face(seg.id, seg.name or "") |
| 78 | + |
| 79 | + @export |
| 80 | + async def media(self, seg: Union[Image, Video, File], bot: Union[Bot, None]) -> "MessageSegment": |
| 81 | + name = seg.__class__.__name__.lower() |
| 82 | + method = { |
| 83 | + "image": MessageSegment.image, |
| 84 | + "video": MessageSegment.video, |
| 85 | + "file": MessageSegment.file, |
| 86 | + }[name] |
| 87 | + |
| 88 | + if seg.url: |
| 89 | + return method(url=seg.url) |
| 90 | + if seg.raw: |
| 91 | + return method(raw=seg.raw_bytes) |
| 92 | + if seg.path: |
| 93 | + return method(raw=Path(seg.path).read_bytes()) |
| 94 | + raise SerializeFailed(lang.require("nbp-uniseg", "invalid_segment").format(type="image", seg=seg)) |
| 95 | + |
| 96 | + @export |
| 97 | + async def reply(self, seg: Reply, bot: Union[Bot, None]) -> "MessageSegment": |
| 98 | + return MessageSegment("$yunhu:reply", {"message_id": seg.id}) |
| 99 | + |
| 100 | + def _button(self, seg: Button, bot: Union[Bot, None]) -> ButtonBody: |
| 101 | + label = str(seg.label) |
| 102 | + if seg.flag == "link": |
| 103 | + return {"text": label, "actionType": 1, "url": seg.url} # pyright: ignore[reportReturnType] |
| 104 | + if seg.flag == "action": |
| 105 | + return {"text": label, "actionType": 3} # pyright: ignore[reportReturnType] |
| 106 | + return {"text": label, "actionType": 2, "value": seg.text} # pyright: ignore[reportReturnType] |
| 107 | + |
| 108 | + @export |
| 109 | + async def button(self, seg: Button, bot: Union[Bot, None]): |
| 110 | + return MessageSegment("$yunhu:button", {"button": self._button(seg, bot)}) |
| 111 | + |
| 112 | + @export |
| 113 | + async def keyboard(self, seg: Keyboard, bot: Union[Bot, None]): |
| 114 | + if not seg.children: |
| 115 | + raise SerializeFailed(lang.require("nbp-uniseg", "invalid_segment").format(type="keyboard", seg=seg)) |
| 116 | + buttons = [self._button(but, bot) for but in seg.children] |
| 117 | + if not seg.row: |
| 118 | + return MessageSegment("$yunhu:button_row", {"buttons": buttons}) |
| 119 | + rows = [buttons[i : i + (seg.row or 9)] for i in range(0, len(buttons), seg.row or 9)] |
| 120 | + return MessageSegment("$yunhu:keyboard", {"buttons": rows}) |
| 121 | + |
| 122 | + async def send_to(self, target: Union[Target, YunHuEvent], bot: Bot, message: Message, **kwargs): |
| 123 | + assert isinstance(bot, YunHuBot) |
| 124 | + if TYPE_CHECKING: |
| 125 | + assert isinstance(message, self.get_message_type()) |
| 126 | + |
| 127 | + kb = None |
| 128 | + message_id: str | None = None |
| 129 | + |
| 130 | + # 处理 button |
| 131 | + if buttons := message.get("$yunhu:button"): |
| 132 | + message = message.exclude("$yunhu:button") |
| 133 | + buts = [but.data["button"] for but in buttons] |
| 134 | + kb = [buts[i : i + 9] for i in range(0, len(buts), 9)] |
| 135 | + |
| 136 | + # 处理 button_row |
| 137 | + if rows := message.get("$yunhu:button_row"): |
| 138 | + message = message.exclude("$yunhu:button_row") |
| 139 | + but_rows = [row.data["buttons"] for row in rows] |
| 140 | + if not kb: |
| 141 | + kb = but_rows |
| 142 | + else: |
| 143 | + kb.extend(but_rows) |
| 144 | + |
| 145 | + # 处理 keyboard |
| 146 | + if keyboard := message.get("$yunhu:keyboard"): |
| 147 | + message = message.exclude("$yunhu:keyboard") |
| 148 | + keyboard_buttons = keyboard[0].data["buttons"] |
| 149 | + if not kb: |
| 150 | + kb = keyboard_buttons |
| 151 | + else: |
| 152 | + kb.extend(keyboard_buttons) |
| 153 | + |
| 154 | + if kb: |
| 155 | + message.append(MessageSegment.buttons(kb)) |
| 156 | + |
| 157 | + # 处理 reply |
| 158 | + if reply_segments := message.get("$yunhu:reply"): |
| 159 | + message = message.exclude("$yunhu:reply") |
| 160 | + raw_inner_id = reply_segments[0].data.get("message_id") |
| 161 | + message_id = str(raw_inner_id) if raw_inner_id else None |
| 162 | + else: |
| 163 | + message_id = None |
| 164 | + |
| 165 | + if isinstance(target, YunHuEvent): |
| 166 | + if message_id: |
| 167 | + return await bot.send(event=target, message=message, reply_to=message_id) |
| 168 | + return await bot.send(event=target, message=message) |
| 169 | + |
| 170 | + content, content_type = message.serialize() |
| 171 | + return await bot.send_msg( |
| 172 | + receive_type=("user" if target.private else "group"), |
| 173 | + receive_id=target.id, |
| 174 | + content=content, |
| 175 | + content_type=content_type, |
| 176 | + parent_id=message_id, |
| 177 | + ) |
| 178 | + |
| 179 | + async def recall(self, mid: Any, bot: Bot, context: Union[Target, Event]): |
| 180 | + assert isinstance(bot, YunHuBot) |
| 181 | + if isinstance(mid, (str, int)) and isinstance(context, MessageEvent): |
| 182 | + if context.event.message.chatType == "bot": |
| 183 | + chat_id = context.event.sender.senderId |
| 184 | + chat_type = "user" |
| 185 | + else: |
| 186 | + chat_id = context.event.message.chatId |
| 187 | + chat_type = "group" |
| 188 | + await bot.delete_msg(message_id=str(mid), chat_id=chat_id, chat_type=chat_type) |
| 189 | + else: |
| 190 | + _mid: SendMsgResponse = cast(SendMsgResponse, mid) |
| 191 | + assert _mid.data |
| 192 | + await bot.delete_msg( |
| 193 | + message_id=_mid.data.messageInfo.msgId, |
| 194 | + chat_id=_mid.data.messageInfo.recvId, |
| 195 | + chat_type=_mid.data.messageInfo.recvType, |
| 196 | + ) |
| 197 | + |
| 198 | + async def edit(self, new: Sequence[Segment], mid: Any, bot: Bot, context: Union[Target, Event]): |
| 199 | + assert isinstance(bot, YunHuBot) |
| 200 | + new_msg = await self.export(new, bot, True) |
| 201 | + content, _type = new_msg.serialize() |
| 202 | + if isinstance(mid, (str, int)) and isinstance(context, MessageEvent): |
| 203 | + if context.event.message.chatType == "bot": |
| 204 | + chat_id = context.event.sender.senderId |
| 205 | + chat_type = "user" |
| 206 | + else: |
| 207 | + chat_id = context.event.message.chatId |
| 208 | + chat_type = "group" |
| 209 | + await bot.edit_msg( |
| 210 | + message_id=str(mid), |
| 211 | + recvId=chat_id, |
| 212 | + recvType=chat_type, |
| 213 | + content=content, # pyright: ignore[reportArgumentType] |
| 214 | + content_type=_type, # pyright: ignore[reportArgumentType] |
| 215 | + ) |
| 216 | + else: |
| 217 | + _mid: SendMsgResponse = cast(SendMsgResponse, mid) |
| 218 | + assert _mid.data |
| 219 | + await bot.edit_msg( |
| 220 | + message_id=_mid.data.messageInfo.msgId, |
| 221 | + recvId=_mid.data.messageInfo.recvId, |
| 222 | + recvType=_mid.data.messageInfo.recvType, |
| 223 | + content=content, # pyright: ignore[reportArgumentType] |
| 224 | + content_type=_type, # pyright: ignore[reportArgumentType] |
| 225 | + ) |
| 226 | + |
| 227 | + def get_reply(self, mid: Any): |
| 228 | + if isinstance(mid, MessageEvent): |
| 229 | + return Reply(mid.event.message.msgId) |
| 230 | + raise NotImplementedError |
0 commit comments