This repository was archived by the owner on Sep 2, 2023. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathrun.py
297 lines (251 loc) · 9.22 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# shenhe-bot by seria
import argparse
import asyncio
import json
import os
import platform
import sys
from pathlib import Path
from typing import Dict, List, Optional
import aiofiles
import aiohttp
import asyncpg
import discord
import sentry_sdk
from discord import app_commands
from discord.ext import commands
from dotenv import load_dotenv
import dev.models as models
from apps.db.main import Database
from apps.genshin import launch_browsers, launch_debug_browser
from apps.genshin_data.text_maps import load_text_maps
from apps.text_map import text_map
from dev.base_ui import (
get_error_handle_embed,
global_error_handler,
support_server_view,
)
from dev.exceptions import FeatureDisabled, Maintenance
from dev.models import BotModel
from utils import log, sentry_logging
# Load variables from a .env file before any of them are read below.
load_dotenv()

parser = argparse.ArgumentParser()
parser.add_argument(
    "--env",
    default="development",
    required=False,
    choices=("production", "testing", "development"),
    help="The environment to run the application in",
)
args = parser.parse_args()

# Per-environment settings, in the order:
#   (token env var, debug, launch_browser_in_debug, database URL env var)
_ENV_SETTINGS = {
    "production": ("SHENHE_BOT_TOKEN", False, False, "SHENHE_BOT_DATABASE_URL"),
    "testing": ("SHENHE_TEST_TOKEN", True, True, "SHENHE_BOT_DATABASE_URL"),
    "development": ("YAE_TOKEN", True, False, "YAE_DATABASE_URL"),
}

_settings = _ENV_SETTINGS.get(args.env)
if _settings is None:
    # argparse's ``choices`` should already reject anything else; this exit
    # is kept as a last-resort guard.
    print("Invalid environment specified")
    sys.exit(1)

_token_var, debug, launch_browser_in_debug, _database_url_var = _settings
token = os.getenv(_token_var)
database_url = os.getenv(_database_url_var)

# Sentry is initialised for production runs only.
if args.env == "production":
    sentry_sdk.init(
        dsn=os.getenv("SENTRY_DSN"),
        integrations=[sentry_logging],
        traces_sample_rate=1.0,
    )
class Translator(app_commands.Translator):
    """Translate app-command strings by looking them up in ``text_map``."""

    async def translate(
        self,
        string: app_commands.locale_str,
        lang: discord.Locale,
        _: app_commands.TranslationContext,
    ) -> Optional[str]:
        """Return the localized text for ``string`` in ``lang``.

        The text is fetched with ``text_map.get`` using the ``"hash"`` entry
        of ``string.extras``.

        Args:
            string: The locale string; its ``extras["hash"]`` is the lookup key.
            lang: The locale to translate into.
            _: Translation context (unused).

        Returns:
            ``None`` when ``string`` has no ``"hash"`` extra, when the lookup
            raises ``KeyError``, or when the looked-up text is empty.
            The lower-cased text when it is a single word. Otherwise the text
            unchanged, apart from one hard-coded Vietnamese override.
        """
        # Only the two lookups can raise KeyError, so keep the try narrow.
        try:
            text_hash = string.extras["hash"]
            text = text_map.get(text_hash, lang)
        except KeyError:
            return None

        # The empty check must come first: "".split(" ") is [""] (length 1),
        # so testing for a single word first would return "" instead of None.
        if text == "":
            return None

        # Single-word texts are lower-cased.
        # NOTE(review): presumably because command names must be lowercase
        # -- confirm.
        if len(text.split(" ")) == 1:
            return text.lower()

        # hard code stuff
        if str(lang) == "vi" and text_hash == 105:
            return "nhân-vật"
        return text
class ShenheCommandTree(app_commands.CommandTree):
    """Command tree that records synced command IDs and gates interactions."""

    def __init__(self, bot: commands.AutoShardedBot):
        super().__init__(bot)

    async def sync(
        self, *, guild: Optional[discord.abc.Snowflake] = None
    ) -> List[app_commands.AppCommand]:
        """Sync commands and write a name -> ID map to ``command_map.json``.

        The file is only written when at least one command was synced.

        Returns:
            The list returned by the parent ``sync``.
        """
        synced = await super().sync(guild=guild)
        log.info(f"[System]sync: Synced {len(synced)} commands")
        if not synced:
            return synced

        command_map: Dict[str, int] = {cmd.name: cmd.id for cmd in synced}
        async with aiofiles.open("command_map.json", "w") as f:
            await f.write(json.dumps(command_map))
        return synced

    async def interaction_check(self, i: discord.Interaction, /) -> bool:
        """Decide whether interaction ``i`` may proceed.

        Chunks the guild if it is not chunked yet, then applies, in order:
        an always-allow user list, an always-deny user list, the client's
        ``disabled_commands`` and the client's ``maintenance`` flag. For the
        last two an ephemeral error embed is sent (best effort) before the
        interaction is refused.
        """
        client: BotModel = i.client  # type: ignore
        guild = i.guild
        if guild is not None and not guild.chunked:
            await guild.chunk()

        always_allowed = (410036441129943050,)
        always_denied = (188109365671100416, 738362958253522976, 753937213032628327)
        user_id = i.user.id
        if user_id in always_allowed:
            return True
        if user_id in always_denied:
            return False

        # Work out which error (if any) blocks this interaction. A disabled
        # command takes precedence; the maintenance flag is only consulted
        # when the command itself is not disabled.
        blocker = None
        if isinstance(i.command, app_commands.Command):
            parent = i.command.parent
            command_name = (
                f"{parent.name} {i.command.name}" if parent else i.command.name
            )
            if command_name in client.disabled_commands:
                blocker = FeatureDisabled()
        if blocker is None and client.maintenance:
            blocker = Maintenance()

        if blocker is None:
            return True

        embed = get_error_handle_embed(i.user, blocker, i.locale)
        try:
            await i.response.send_message(
                embed=embed, ephemeral=True, view=support_server_view(i.locale)
            )
        except Exception:  # skipcq: PYL-W0703
            # Best effort: the interaction is refused even if the notice
            # could not be delivered.
            pass
        return False

    async def on_error(
        self, i: discord.Interaction, e: app_commands.AppCommandError, /
    ) -> None:
        """Forward app-command errors to ``global_error_handler``."""
        return await global_error_handler(i, e)  # type: ignore
class Shenhe(BotModel):
    """The bot client: owns the HTTP session, database pool and browsers."""

    def __init__(self, session: aiohttp.ClientSession, pool: asyncpg.Pool):
        """Configure intents and store the shared session and database pool.

        Args:
            session: HTTP session stored on the bot; closed in ``close``.
            pool: Database pool stored on the bot and wrapped in ``Database``.
        """
        intents = discord.Intents.default()
        intents.members = True
        super().__init__(
            command_prefix=commands.when_mentioned,
            intents=intents,
            chunk_guilds_at_startup=False,
            activity=discord.Game(name="/help | shenhe.bot.nu"),
            tree_cls=ShenheCommandTree,
        )
        self.session = session
        self.pool = pool
        self.debug = debug
        # ``on_ready`` reads this flag from the instance, so copy the
        # per-environment module-level value here (same as ``debug`` above).
        # NOTE(review): assumes BotModel does not set this flag some other
        # way -- confirm.
        self.launch_browser_in_debug = launch_browser_in_debug
        self.db = Database(self.pool)

    async def setup_hook(self) -> None:
        """Create database tables, load extensions and the text maps."""
        # create database tables
        await self.db.create()

        # load jishaku
        await self.load_extension("jishaku")

        # load cogs
        # Only the file stem is used, so a file found in a sub-directory is
        # still loaded as ``cogs.<stem>``.
        for filepath in Path("./cogs").glob("**/*.py"):
            cog_name = Path(filepath).stem
            # The "grafana" cog is skipped when running in debug mode.
            if self.debug and cog_name == "grafana":
                continue
            try:
                await self.load_extension(f"cogs.{cog_name}")
            except Exception as e:  # skipcq: PYL-W0703
                # One failing cog must not prevent the others from loading.
                log.warning(
                    f"[Cog Load Error]: [Cog name]{cog_name} [Exception]{e}", exc_info=e
                )

        self.gd_text_map = load_text_maps()
        self.owner_id = 410036441129943050

    async def on_ready(self):
        """Install the translator, log connection info and launch browsers."""
        tree = self.tree
        await tree.set_translator(Translator())
        log.info(f"[System]on_ready: Logged in as {self.user}")
        log.info(f"[System]on_ready: Total {len(self.guilds)} servers connected")

        if not self.debug:
            try:
                self.browsers = await launch_browsers()
            except Exception as e:  # skipcq: PYL-W0703
                log.warning("[System]on_ready: Launch browsers failed", exc_info=e)
        if self.debug and self.launch_browser_in_debug:
            self.browsers = {"en-US": await launch_debug_browser()}

    async def on_message(self, message: discord.Message):
        """Process prefix commands, ignoring the bot's own messages."""
        if self.user is None:
            return
        if message.author.id == self.user.id:
            return
        await self.process_commands(message)

    async def on_command_error(self, ctx, error) -> None:
        """Log prefix-command errors that are not handled elsewhere.

        Commands with their own ``on_error`` attribute are skipped, and
        ``CommandNotFound`` / ``NotOwner`` are ignored.
        """
        if hasattr(ctx.command, "on_error"):
            return
        ignored = (
            commands.CommandNotFound,
            commands.NotOwner,
        )
        # Unwrap the underlying exception when the error carries one.
        error = getattr(error, "original", error)
        if isinstance(error, ignored):
            return
        log.warning(f"[{ctx.author.id}]on_command_error: {error}", exc_info=error)

    async def close(self) -> None:
        """Close the HTTP session, any launched browsers, then the client.

        The parent ``close`` is always awaited, even if releasing the
        session or a browser raises, so the Discord connection is shut down
        as well.
        """
        try:
            await self.session.close()
            # ``browsers`` only exists if ``on_ready`` managed to launch them.
            if hasattr(self, "browsers"):
                for browser in self.browsers.values():
                    await browser.close()
        finally:
            await super().close()
# On Linux, call uvloop.install() before the event loop is started. The
# import lives inside the branch, so uvloop is only required on that platform.
if platform.system() == "Linux":
    import uvloop  # type: ignore

    uvloop.install()
async def main() -> None:
    """Create the database pool, HTTP session and bot, then run the bot.

    Raises:
        AssertionError: If no token is configured or no pool was created.
    """
    if not token:
        raise AssertionError

    try:
        pool = await asyncpg.create_pool(database_url)
    except Exception as e:  # skipcq: PYL-W0703
        log.warning("Failed to connect to database", exc_info=e)
        return
    if not pool:
        raise AssertionError

    session = aiohttp.ClientSession()
    bot = Shenhe(session=session, pool=pool)

    @bot.before_invoke
    async def before_invoke(ctx: commands.Context):
        # Chunk the guild before a prefix command runs if it is not chunked.
        guild = ctx.guild
        if guild is not None and not guild.chunked:
            await guild.chunk()

    @bot.listen()
    async def on_message_edit(before: discord.Message, after: discord.Message):
        # Re-run commands only when the content changed and the author is
        # the bot owner.
        if before.content == after.content or before.author.id != bot.owner_id:
            return
        return await bot.process_commands(after)

    @bot.listen()
    async def on_interaction(i: models.Inter):
        # Log every command interaction together with its arguments.
        command = i.command
        if command is None:
            return

        if not isinstance(command, app_commands.Command):
            log.info(f"[Context Menu Command][{i.user.id}][{command.name}]")
            return

        options = i.namespace.__dict__
        namespace_str = ": " if options else ""
        namespace_str += "".join(
            f"[{key}] {value} " for key, value in options.items()
        )

        if command.parent is None:
            log.info(f"[Command][{i.user.id}][{command.name}]{namespace_str}")
        else:
            log.info(
                f"[Command][{i.user.id}][{command.parent.name} {command.name}]{namespace_str}"
            )

    # All three are released on exit, whether the bot stops cleanly or not.
    async with session, bot, pool:
        try:
            await bot.start(token)
        except KeyboardInterrupt:
            return
        except Exception as e:  # skipcq: PYL-W0703
            log.warning("Failed to start bot", exc_info=e)
            return
# Script entry point: run main() to completion on a fresh event loop.
asyncio.run(main())