cogs, shardable, setup, update, say_ralf commands

Signed-off-by: Marc Ahlgrim <marc@onemarcfifty.com>
This commit is contained in:
Marc Ahlgrim
2022-07-26 11:43:33 +02:00
parent 2e25186e96
commit 455da6a9f3
17 changed files with 503 additions and 363 deletions
+269 -143
View File
@@ -1,19 +1,19 @@
import discord
import random
import numpy as np
import utils
import datetime
import config
import os
import ast
from glob import glob
from dateutil import tz
from sys import exit
from discord import app_commands
from discord.ext import tasks
from discord.ext.commands import Bot,AutoShardedBot
from classes.dis_events import DiscordEvents
from classes.subscribe import Subscribe
from classes.config import Config
from dataclasses import dataclass
# #######################################
@@ -21,80 +21,222 @@ from classes.subscribe import Subscribe
# #######################################
class OMFClient(discord.Client):
class OMFBot(AutoShardedBot):
channel_idle_timer: int
lastNotifyTimeStamp = None
theGuild : discord.Guild = None
# each guild has the following elements:
# - a list of scheduled Events (EventsList)
# - a list of Messages that will be sent at idle times
# - a timer indicating the number of scheduler runs to
# wait before a new idle message is sent
lastSentMessage:discord.Message=None
@dataclass
class GuildData:
EventsList = {}
idle_messages=[]
channel_idle_timer=0
guildEventsList = None
guildEventsClass: DiscordEvents = None
# the guildDataList contains one GuildData class per item.
# the key is the guild ID
guildDataList={}
# configData is the generic config object that reads / writes
# the config data to disk
configData:Config
# EventsClass is the interface to the restful api
# that allows us to create scheduled events
EventsClass : DiscordEvents
# #######################################
# init constructor
# #######################################
def __init__(self) -> None:
print('Init')
# Try to set all intents
intents = discord.Intents.all()
super().__init__(intents=intents)
# We need a `discord.app_commands.CommandTree` instance
# to register application commands (slash commands in this case)
self.tree = app_commands.CommandTree(self)
super().__init__(command_prefix="!",intents=intents)
self.prefix="!"
self.configData=Config('config.json')
# The subscribe command will add/remove the notification roles
# based on the scheduled events
@self.tree.command(name="subscribe", description="(un)subscribe to Events)")
async def subscribe(interaction: discord.Interaction):
# preload the menu items with the roles that the user has already
# we might move this to the init of the modal
x: Subscribe
role: discord.Role
member: discord.Member
x=Subscribe()
guildNode=self.configData.readGuild(interaction.guild.id)
member = interaction.user
for option in x.Menu.options:
role = option.value
if not (member.get_role(role) is None):
option.default=True
x=Subscribe(autoEvents=guildNode["AUTO_EVENTS"],member=member)
await interaction.response.send_modal(x)
# The setup command will ask for the guild parameters and
# read them in
@self.tree.command(name="setup", description="Define parameters for the bot")
async def setup(
interaction: discord.Interaction,
template_channel:discord.TextChannel,
idle_channel:discord.TextChannel,
idle_sleepcycles:int,
avoid_spam:int):
self.channel_idle_timer = 0
self.idle_channel = self.get_channel(config.CONFIG["IDLE_MESSAGE_CHANNEL_ID"])
if not interaction.user.guild_permissions.administrator:
await interaction.response.send_message(f'only an Administrator can do that', ephemeral=True)
else:
try:
jData={
"IDLE_MESSAGE_CHANNEL_ID" : idle_channel.id,
"CONFIG_CHANNEL_ID": template_channel.id,
"CHANNEL_IDLE_INTERVAL" : idle_sleepcycles,
"AVOID_SPAM" : avoid_spam,
"AUTO_EVENTS": []
}
self.configData.writeGuild(interaction.guild.id,jData)
await interaction.response.send_message(f'All updated\nThank you for using my services!\nyou might need to run /update', ephemeral=True)
except Exception as e:
print(f"ERROR in setup command: {e}")
await interaction.response.send_message(f'Ooops, there was a glitch!', ephemeral=True)
# The update command will read the guild configs from the
# message templates channel
@self.tree.command(name="update", description="read in the message templates and update the cache")
async def update(interaction: discord.Interaction):
if not interaction.user.guild_permissions.administrator:
await interaction.response.send_message(f'only an Administrator can do that', ephemeral=True)
else:
try:
await self.readMessageTemplates(interaction.guild)
#self.configData.writeGuild(interaction.guild.id,jData)
numMessages=len(self.guildDataList[f'{interaction.guild.id}'].idle_messages)
guildNode=self.configData.readGuild(interaction.guild.id)
eventNodes=guildNode["AUTO_EVENTS"]
numEvents=len(eventNodes)
await interaction.response.send_message(f'{numEvents} Events and {numMessages} Message templates\nThank you for using my services!', ephemeral=True)
except Exception as e:
print(f"ERROR in update command: {e}")
await interaction.response.send_message(f'Ooops, there was a glitch!', ephemeral=True)
@self.tree.command(name="say_ralf", description="admin function")
async def say_ralf(
interaction: discord.Interaction,
which_channel:discord.TextChannel,
message:str):
if not interaction.user.guild_permissions.administrator:
await interaction.response.send_message(f'only an Administrator can do that', ephemeral=True)
else:
try:
await which_channel.send(message)
await interaction.response.send_message('message sent', ephemeral=True)
except Exception as e:
print(f"ERROR in say_ralf: {e}")
await interaction.response.send_message(f'Ooops, there was a glitch!', ephemeral=True)
# ################################
# the bot run command just starts
# the bot with the token from
# the json config file
# ################################
def run(self,*args, **kwargs):
super().run(token=self.configData.getToken())
# #########################
# setup_hook waits for the
# command tree to sync
# and loads the cogs
# #########################
async def setup_hook(self) -> None:
# Sync the application command with Discord.
await self.tree.sync()
# load all cogs
try:
for file in os.listdir("cogs"):
if file.endswith(".py"):
name = file[:-3]
await self.load_extension(f"cogs.{name}")
except Exception as e:
print(f"ERROR in setup_hook: {e}")
# ######################################################
# send_random_message is called when the server is idle
# and posts a random message to the server
# ######################################################
async def send_random_message(self):
async def send_random_message(self,guildID):
guildNode=self.configData.readGuild(guildID)
print("Sending random message")
if self.idle_channel == None:
self.idle_channel = self.get_channel(config.CONFIG["IDLE_MESSAGE_CHANNEL_ID"])
print (f'The idle channel is {config.CONFIG["IDLE_MESSAGE_CHANNEL_ID"]} - {self.idle_channel}')
await self.idle_channel.send(f"{random.choice(self.idle_messages)}")
idle_channel_id=guildNode["IDLE_MESSAGE_CHANNEL_ID"]
idle_channel=self.get_channel(idle_channel_id)
gdn:self.GuildData
gdn=self.guildDataList[f'{guildID}']
idle_messages=gdn.idle_messages
# if the author of the previously last sent message and
# the new message is ourselves, then delete the
# previous message
try:
lastSentMessage = await idle_channel.fetch_message(
idle_channel.last_message_id)
if (int(f'{guildNode["AVOID_SPAM"]}') == 1) and (lastSentMessage is not None):
if (lastSentMessage.author == self.user):
await lastSentMessage.delete()
except Exception as e:
print(f"delete lastmessage error: {e}")
try:
await idle_channel.send(f'{random.choice(idle_messages)}')
except Exception as e:
print(f"send random message error: {e}")
# ######################################################
# readMessageTemplates reads all messages from
# the guildID/"config"/"CONFIG_CHANNEL_ID"] node
# of the configdata and stores it in the idleMessages dict
# in an array under the guild ID key
# ######################################################
async def readMessageTemplates(self,theGuild:discord.Guild):
# we init the guild data with a new GuildData object
self.guildDataList[f'{theGuild.id}'] = self.GuildData()
guildNode=self.configData.readGuild(theGuild.id)
if guildNode is None:
print (f"Guild {theGuild.id} has no setup")
return
theTemplateChannel:discord.TextChannel
theTemplateChannel=theGuild.get_channel(int(guildNode["CONFIG_CHANNEL_ID"]))
message:discord.Message
messages = theTemplateChannel.history(limit=50)
self.guildDataList[f'{theGuild.id}'].idle_messages=[]
eventNodes=[]
async for message in messages:
messageContent:str
messageContent=message.content
try:
someDict=ast.literal_eval(messageContent)
if isinstance(someDict, dict):
eventNodes.append(someDict)
except Exception as e:
self.guildDataList[f'{theGuild.id}'].idle_messages.append(message.content)
guildNode["AUTO_EVENTS"]=eventNodes
self.configData.writeGuild(theGuild.id,guildNode)
numMessages=len(self.guildDataList[f'{theGuild.id}'].idle_messages)
numEvents=len(eventNodes)
print(f'{numEvents} Events and {numMessages} Message templates')
# ######################################################
# on_ready is called once the client is initialized
@@ -105,37 +247,18 @@ class OMFClient(discord.Client):
async def on_ready(self):
print('Logged on as', self.user)
self.EventsClass = DiscordEvents(
discord_token=self.configData.getToken(),
client_id=self.configData.getClientID(),
bot_permissions=8,
api_version=10)
# read in the random message files
# the idle_messages array holds one element per message
# every file is read in as a whole into one element of the array
# every message is read in as a whole into one element of the array
self.idle_messages = []
for filename in glob(config.CONFIG["IDLE_MESSAGE_DIR"] + '/*.txt'):
print ("read {}",filename)
with open(filename) as f:
self.idle_messages.append(f.read())
self.idle_messages = np.array(self.idle_messages)
# store the guild for further use
guild: discord.Guild
for guild in self.guilds:
if (int(guild.id) == int(config.SECRETS["GUILD_ID"])):
print (f"GUILD MATCHES {guild.id}")
self.theGuild = guild
if (self.theGuild is None):
print("the guild (Server ID)could not be found - please check all config data")
exit()
self.guildEventsClass = DiscordEvents(
discord_token=config.SECRETS["BOT_TOKEN"],
client_id=config.SECRETS["CLIENT_ID"],
bot_permissions=8,
api_version=10,
guild_id=config.SECRETS["GUILD_ID"]
)
for theGuild in self.guilds:
await self.readMessageTemplates(theGuild)
# start the schedulers
@@ -149,11 +272,14 @@ class OMFClient(discord.Client):
# for the next sunday
# ######################################################
async def create_events (self):
async def create_events (self,theGuild):
print("Create Events")
for theEvent in config.AUTO_EVENTS:
guildNode=self.configData.readGuild(theGuild.id)
eventNodes=guildNode["AUTO_EVENTS"]
for theEvent in eventNodes:
# calculate the date of the future event
theDate:datetime.datetime = utils.onDay(datetime.date.today(),theEvent['day_of_week'])
@@ -171,16 +297,18 @@ class OMFClient(discord.Client):
# after 2 AM
strStart=theDate.strftime(f"%Y-%m-%dT{utcStartTime}")
strEnd=theDate.strftime(f"%Y-%m-%dT{utcEndTime}")
await self.guildEventsClass.create_guild_event(
await self.EventsClass.create_guild_event(
event_name=theEvent['title'],
event_description=theEvent['description'],
event_start_time=f"{strStart}",
event_end_time=f"{strEnd}",
event_metadata={},
event_privacy_level=2,
channel_id=theEvent['channel'])
channel_id=theEvent['channel'],
guild_id=theGuild.id)
# once we have created the event, we let everyone know
channel = self.get_channel(config.CONFIG["IDLE_MESSAGE_CHANNEL_ID"])
channel = theGuild.get_channel(guildNode["IDLE_MESSAGE_CHANNEL_ID"])
await channel.send(f'Hi - I have created the scheduled Event {theEvent["title"]}')
@@ -188,9 +316,10 @@ class OMFClient(discord.Client):
# get_event_list gives a list of scheduled events
# ######################################################
async def get_events_list (self):
eventList = await self.guildEventsClass.list_guild_events()
self.guildEventsList = eventList
async def get_events_list (self,theGuild: discord.Guild):
eventList = await self.EventsClass.list_guild_events(theGuild.id)
self.GuildData(self.guildDataList[f'{theGuild.id}']).EventsList = eventList
return eventList
# ######################################################
@@ -207,30 +336,16 @@ class OMFClient(discord.Client):
if message.flags.ephemeral:
return
print("{} has just sent {}".format(message.author, message.content))
# if the author of the previously last sent message and
# the new message is ourselves, then delete the
# previous message
if (int(f'{config.CONFIG["AVOID_SPAM"]}') == 1) and (self.lastSentMessage is not None):
if ((message.author == self.user) and
(self.lastSentMessage.author == self. user) and
(int(f"{self.lastSentMessage.channel.id}") == (int(config.CONFIG["IDLE_MESSAGE_CHANNEL_ID"])))):
try:
await self.lastSentMessage.delete()
except Exception as e:
print(f"delete lastmessage error: {e}")
self.lastSentMessage = message
# don't respond to ourselves
if message.author == self.user:
return
# reset the idle timer if a message has been sent or received
self.channel_idle_timer = 0
self.guildDataList[f'{message.guild.id}'].channel_idle_timer=0
await self.process_commands(message)
# ######################################################
@@ -263,10 +378,11 @@ class OMFClient(discord.Client):
if datetime.date.today().weekday() == 0:
print("create events")
try:
await self.create_events()
except Exception as e:
print(f"Daily Task create Events failed: {e}")
for theGuild in self.guilds:
try:
await self.create_events(theGuild=theGuild)
except Exception as e:
print(f"Daily Task create Events failed: {e}")
# ######################################################
@@ -281,50 +397,60 @@ class OMFClient(discord.Client):
@tasks.loop(minutes=10)
async def task_scheduler(self):
self.channel_idle_timer += 1
print("SCHEDULE")
# #####################################
# see if we need to send a random message
# if the counter is greater than CHANNEL_IDLE_INTERVAL
# then send a random message into the idle_channel
# #####################################
try:
if self.channel_idle_timer >= config.CONFIG["CHANNEL_IDLE_INTERVAL"]:
self.channel_idle_timer = 0
await self.send_random_message()
except Exception as e:
print(f"Scheduler random_message failed: {e}")
for theGuild in self.guilds:
guildNode=self.configData.readGuild(theGuild.id)
if guildNode is None:
print (f"Guild {theGuild.id} has no setup")
continue
gdn:self.GuildData
gdn=self.guildDataList[f'{theGuild.id}']
gdn.channel_idle_timer += 1
# see if we need to send out notifications for events
# The Event details are stored in config.
eventList=None
for theEvent in config.AUTO_EVENTS:
# first let's convert the String dates to datetime:
theDate=utils.onDay(datetime.date.today(),theEvent['day_of_week'])
startTime=theEvent['start_time']
eventScheduledTimeDate=datetime.datetime.fromisoformat (theDate.strftime(f"%Y-%m-%dT{startTime}"))
# now let's figure out the time deltas:
timeUntilEvent=eventScheduledTimeDate - datetime.datetime.today()
earliestPing=eventScheduledTimeDate - datetime.timedelta(minutes=theEvent["notify_minutes"]) - datetime.timedelta(minutes=10)
latestPing=eventScheduledTimeDate - datetime.timedelta(minutes=theEvent["notify_minutes"])
#print("found scheduled event")
#print(f"The event is on {theDate} at {startTime} - that is in {timeUntilEvent}")
#print (f"we ping between {earliestPing} and {latestPing}")
# If we are in the interval then let's initiate the reminder
if (earliestPing < datetime.datetime.today() <= latestPing):
# let's first check if the event is still on
# it may have been deleted or modified on the server
# we don't want to alert for non-existing events
# we'll just use the title to compare for the time being.
print("Let me check if the event is still on")
try:
if eventList == None:
eventList = await self.get_events_list()
for theScheduledEvent in eventList:
if theScheduledEvent["name"] == theEvent["title"]:
channel = self.get_channel(config.CONFIG["IDLE_MESSAGE_CHANNEL_ID"])
theMessageText=f'Hi <@&{theEvent["subscription_role_num"]}>, the event *** {theEvent["title"]} *** will start in roughly {theEvent["notify_minutes"]} minutes in the <#{theEvent["channel"]}> channel. {theEvent["description"]}'
await channel.send(f"{theMessageText}")
except Exception as e:
print(f"Scheduler event_reminder failed: {e}")
# #####################################
# see if we need to send a random message
# if the counter is greater than CHANNEL_IDLE_INTERVAL
# then send a random message into the idle_channel
# #####################################
try:
if gdn.channel_idle_timer >= guildNode["CHANNEL_IDLE_INTERVAL"]:
gdn.channel_idle_timer = 0
await self.send_random_message(guildID=theGuild.id)
except Exception as e:
print(f"Scheduler random_message failed: {e}")
# see if we need to send out notifications for events
# The Event details are stored in config.
eventList=None
for theEvent in guildNode["AUTO_EVENTS"]:
# first let's convert the String dates to datetime:
theDate=utils.onDay(datetime.date.today(),theEvent['day_of_week'])
startTime=theEvent['start_time']
eventScheduledTimeDate=datetime.datetime.fromisoformat (theDate.strftime(f"%Y-%m-%dT{startTime}"))
# now let's figure out the time deltas:
timeUntilEvent=eventScheduledTimeDate - datetime.datetime.today()
earliestPing=eventScheduledTimeDate - datetime.timedelta(minutes=theEvent["notify_minutes"]) - datetime.timedelta(minutes=10)
latestPing=eventScheduledTimeDate - datetime.timedelta(minutes=theEvent["notify_minutes"])
#print("found scheduled event")
#print(f"The event is on {theDate} at {startTime} - that is in {timeUntilEvent}")
#print (f"we ping between {earliestPing} and {latestPing}")
# If we are in the interval then let's initiate the reminder
if (earliestPing < datetime.datetime.today() <= latestPing):
# let's first check if the event is still on
# it may have been deleted or modified on the server
# we don't want to alert for non-existing events
# we'll just use the title to compare for the time being.
print("Let me check if the event is still on")
try:
if eventList == None:
eventList = await self.get_events_list(theGuild=theGuild)
for theScheduledEvent in eventList:
if theScheduledEvent["name"] == theEvent["title"]:
channel = self.get_channel(guildNode["IDLE_MESSAGE_CHANNEL_ID"])
theMessageText=f'Hi <@&{theEvent["subscription_role_num"]}>, the event *** {theEvent["title"]} *** will start in roughly {theEvent["notify_minutes"]} minutes in the <#{theEvent["channel"]}> channel. {theEvent["description"]}'
await channel.send(f"{theMessageText}")
except Exception as e:
print(f"Scheduler event_reminder failed: {e}")
+87
View File
@@ -0,0 +1,87 @@
import json
# very rudimentary implementation of a
# generic config class using a json File
# this is OK for few clients.
# for many (>100) clients we should consider file locking
# for very many (>10000) clients we should use a database
class Config():
configFileName:str
cfg:json
def __init__(self,filename:str) -> None:
self.configFileName = filename
self.readConfig()
def readConfig(self) -> json:
try:
f = open(self.configFileName)
self.cfg = json.load(f)
f.close()
except Exception as e:
print(f"Error reading Config Data: {e}")
def writeConfig(self):
try:
with open(self.configFileName, 'w', encoding='utf-8') as f:
json.dump(self.cfg, f, ensure_ascii=False, indent=5)
f.close()
except Exception as e:
print(f"Error writing Config Data: {e}")
def getNode(self,nodeID:str):
if nodeID in self.cfg:
return self.cfg[nodeID]
else:
return None
def getToken(self):
secretNode=self.getNode("secret")
return secretNode.get('BOT_TOKEN')
def getClientID(self):
secretNode=self.getNode("secret")
return secretNode.get('CLIENT_ID')
def readGuild(self,guildID) -> json:
guildNode=self.getNode("guilds")
return guildNode.get(f"{guildID}")
def writeGuild(self,guildID,nodeData):
guildNode=self.getNode("guilds")
guildNode[f"{guildID}"]=nodeData
self.writeConfig()
# the config.json contains the following main nodes:
# #######################
# "secret"
# #######################
# the secret key contains the following items:
# the BOT_TOKEN is the Oauth2 token for your bot
# example: "BOT_TOKEN" : "DFHEZRERZQRJTSUPERSECRETTTOKENUTZZH"
# #######################
# "guilds"
# #######################
# the guilds node contains all guild specific items, such as channel IDs
# that node will be created if the /setup command is used
# "CONFIG_CHANNEL_ID" is the ID of the channel where the
# idle message templates are located
# "IDLE_MESSAGE_CHANNEL_ID" is the ID of the channel where the
# bot posts a message about the new "ticket"
# QUESTION_SLEEPING_TIME (number)
# # Variable that indicates when the bot answers after a question has been asked
# (in scheduler cycles)
+12 -14
View File
@@ -10,24 +10,21 @@ from classes.restfulapi import DiscordAPI
class DiscordEvents(DiscordAPI):
def __init__(self,
discord_token: str,
client_id: str,
bot_permissions: int,
api_version: int,
guild_id:str) -> None:
super().__init__(discord_token,client_id,bot_permissions,api_version)
self.guild_id = guild_id
print (f" DiscordEvent Client {client_id} guild {guild_id}")
# def __init__(self,
# discord_token: str,
# client_id: str,
# bot_permissions: int,
# api_version: int) -> None:
#
# super().__init__(discord_token,client_id,bot_permissions,api_version)
async def list_guild_events(self) -> list:
async def list_guild_events(self,guild_id) -> list:
# Returns a list of upcoming events for the supplied guild ID
# Format of return is a list of one dictionary per event containing information.
event_retrieve_url = f'{self.base_api_url}/guilds/{self.guild_id}/scheduled-events'
event_retrieve_url = f'{self.base_api_url}/guilds/{guild_id}/scheduled-events'
response_list = await self.get_api(event_retrieve_url)
return response_list
@@ -39,7 +36,8 @@ class DiscordEvents(DiscordAPI):
event_end_time: str,
event_metadata: dict,
event_privacy_level=2,
channel_id=None
channel_id=None,
guild_id=0
) -> None:
# Creates a guild event using the supplied arguments
@@ -50,7 +48,7 @@ class DiscordEvents(DiscordAPI):
# Event times can use UTC Offsets! - if you omit, then it will be
# undefined (i.e. UTC / GMT+0)
event_create_url = f'{self.base_api_url}/guilds/{self.guild_id}/scheduled-events'
event_create_url = f'{self.base_api_url}/guilds/{guild_id}/scheduled-events'
event_data = json.dumps({
'name': event_name,
'privacy_level': event_privacy_level,
+26 -18
View File
@@ -1,6 +1,8 @@
import discord
import traceback
import config
from numpy import array
from classes.config import Config
# ############################################
# the Subscribe() class is a modal ui dialog
@@ -12,23 +14,31 @@ import config
class Subscribe(discord.ui.Modal, title='(un)subscribe to Event-Notification'):
# We need to make sure that the config is read at object definition time
# because AUTO_EVENTS might be empty otherwise
if config.AUTO_EVENTS == []:
config.readConfig()
# define the menu options (label=text, vaue=role_id)
def __init__(self, autoEvents : array,member:discord.Member) -> None:
super().__init__()
menu_options = []
for menu_option in config.AUTO_EVENTS:
menu_options.append(discord.SelectOption(label= menu_option["notify_hint"],
value=menu_option["subscription_role_num"]))
# define the UI dropdown menu Element
# create the menu items from the Event roles:
menu_options = []
for menu_option in autoEvents:
menu_options.append(discord.SelectOption(
label= menu_option["notify_hint"],
value=menu_option["subscription_role_num"]))
Menu = discord.ui.Select(
options = menu_options,
max_values=len(config.AUTO_EVENTS),
min_values=0)
# define the UI dropdown menu Element
self.Menu = discord.ui.Select(
options = menu_options,
max_values=len(autoEvents),
min_values=0)
# Make sure the existing user roles are preselected
for option in self.Menu.options:
role = option.value
if not (member.get_role(role) is None):
option.default=True
# now add the child element to the form for drawing
self.add_item(self.Menu)
# #####################################
@@ -44,9 +54,8 @@ class Subscribe(discord.ui.Modal, title='(un)subscribe to Event-Notification'):
roles = interaction.user.guild.roles
member: discord.Member
member = interaction.user
# first we remove all roles
for option in self.Menu.options:
role=member.get_role(option.value)
if not (role is None):
@@ -55,7 +64,6 @@ class Subscribe(discord.ui.Modal, title='(un)subscribe to Event-Notification'):
# then we assign all selected roles
# unfortunately we need to loop through all rolles
# maybe there is a better solution ?
for option in self.Menu._selected_values:
for role in roles:
if int(option) == int(role.id):