Huge re-org to match normal python project structure

This commit is contained in:
Marc Di Luzio 2024-08-14 22:42:53 +01:00
parent b263e20ca2
commit 92bc50396b
22 changed files with 35 additions and 47 deletions

0
matchy/__init__.py Normal file
View file

0
matchy/cogs/__init__.py Normal file
View file

206
matchy/cogs/matchy.py Normal file
View file

@ -0,0 +1,206 @@
"""
Matchy bot cog
"""
import logging
import discord
from discord import app_commands
from discord.ext import commands, tasks
from datetime import datetime, timedelta, time
import matchy.views.match as match
import matching
from matchy.files.state import State, AuthScope
import util
logger = logging.getLogger("cog")
logger.setLevel(logging.INFO)
class Cog(commands.Cog):
def __init__(self, bot: commands.Bot, state: State):
self.bot = bot
self.state = state
@commands.Cog.listener()
async def on_ready(self):
"""Bot is ready and connected"""
self.run_hourly_tasks.start()
self.bot.add_dynamic_items(match.DynamicGroupButton)
activity = discord.Game("/join")
await self.bot.change_presence(status=discord.Status.online, activity=activity)
logger.info("Bot is up and ready!")
@app_commands.command(description="Join the matchees for this channel")
@commands.guild_only()
async def join(self, interaction: discord.Interaction):
logger.info("Handling /join in %s %s from %s",
interaction.guild.name, interaction.channel, interaction.user.name)
self.state.set_user_active_in_channel(
interaction.user.id, interaction.channel.id)
await interaction.response.send_message(
f"Roger roger {interaction.user.mention}!\n"
+ f"Added you to {interaction.channel.mention}!",
ephemeral=True, silent=True)
@app_commands.command(description="Leave the matchees for this channel")
@commands.guild_only()
async def leave(self, interaction: discord.Interaction):
logger.info("Handling /leave in %s %s from %s",
interaction.guild.name, interaction.channel, interaction.user.name)
self.state.set_user_active_in_channel(
interaction.user.id, interaction.channel.id, False)
await interaction.response.send_message(
f"No worries {interaction.user.mention}. Come back soon :)", ephemeral=True, silent=True)
@app_commands.command(description="Pause your matching in this channel for a number of days")
@commands.guild_only()
@app_commands.describe(days="Days to pause for (defaults to 7)")
async def pause(self, interaction: discord.Interaction, days: int | None = None):
logger.info("Handling /pause in %s %s from %s with days=%s",
interaction.guild.name, interaction.channel, interaction.user.name, days)
if days is None: # Default to a week
days = 7
until = datetime.now() + timedelta(days=days)
self.state.set_user_paused_in_channel(
interaction.user.id, interaction.channel.id, until)
await interaction.response.send_message(
f"Sure thing {interaction.user.mention}!\n"
+ f"Paused you until {util.format_day(until)}!",
ephemeral=True, silent=True)
@app_commands.command(description="List the matchees for this channel")
@commands.guild_only()
async def list(self, interaction: discord.Interaction):
logger.info("Handling /list command in %s %s from %s",
interaction.guild.name, interaction.channel, interaction.user.name)
matchees = matching.get_matchees_in_channel(
self.state, interaction.channel)
mentions = [m.mention for m in matchees]
msg = "Current matchees in this channel:\n" + \
f"{util.format_list(mentions)}"
tasks = self.state.get_channel_match_tasks(interaction.channel.id)
for (day, hour, min) in tasks:
next_run = util.get_next_datetime(day, hour)
date_str = util.format_day(next_run)
msg += f"\nNext scheduled for {date_str} at {hour:02d}:00"
msg += f" with {min} members per group"
await interaction.response.send_message(msg, ephemeral=True, silent=True)
@app_commands.command(description="Schedule a match in this channel (UTC)")
@commands.guild_only()
@app_commands.describe(members_min="Minimum matchees per match (defaults to 3)",
weekday="Day of the week to run this (defaults 0, Monday)",
hour="Hour in the day (defaults to 9 utc)",
cancel="Cancel the scheduled match at this time")
async def schedule(self,
interaction: discord.Interaction,
members_min: int | None = None,
weekday: int | None = None,
hour: int | None = None,
cancel: bool = False):
"""Schedule a match using the input parameters"""
# Set all the defaults
if not members_min:
members_min = 3
if weekday is None:
weekday = 0
if hour is None:
hour = 9
channel_id = str(interaction.channel.id)
# Bail if not a matcher
if not self.state.get_user_has_scope(interaction.user.id, AuthScope.MATCHER):
await interaction.response.send_message("You'll need the 'matcher' scope to schedule a match",
ephemeral=True, silent=True)
return
# Add the scheduled task and save
success = self.state.set_channel_match_task(
channel_id, members_min, weekday, hour, not cancel)
# Let the user know what happened
if not cancel:
logger.info("Scheduled new match task in %s with min %s weekday %s hour %s",
channel_id, members_min, weekday, hour)
next_run = util.get_next_datetime(weekday, hour)
date_str = util.format_day(next_run)
await interaction.response.send_message(
f"Done :) Next run will be on {date_str} at {hour:02d}:00\n"
+ "Cancel this by re-sending the command with cancel=True",
ephemeral=True, silent=True)
elif success:
logger.info("Removed task in %s on weekday %s hour %s",
channel_id, weekday, hour)
await interaction.response.send_message(
f"Done :) Schedule on day {weekday} and hour {hour} removed!", ephemeral=True, silent=True)
else:
await interaction.response.send_message(
f"No schedule for this channel on day {weekday} and hour {hour} found :(", ephemeral=True, silent=True)
@app_commands.command(description="Match up matchees")
@commands.guild_only()
@app_commands.describe(members_min="Minimum matchees per match (defaults to 3)")
async def match(self, interaction: discord.Interaction, members_min: int | None = None):
"""Match groups of channel members"""
logger.info("Handling request '/match group_min=%s", members_min)
logger.info("User %s from %s in #%s", interaction.user,
interaction.guild.name, interaction.channel.name)
# Sort out the defaults, if not specified they'll come in as None
if not members_min:
members_min = 3
# Grab the groups
groups = matching.active_members_to_groups(
self.state, interaction.channel, members_min)
# Let the user know when there's nobody to match
if not groups:
await interaction.response.send_message("Nobody to match up :(", ephemeral=True, silent=True)
return
# Post about all the groups with a button to send to the channel
groups_list = '\n'.join(
", ".join([m.mention for m in g]) for g in groups)
msg = f"Roger! I've generated example groups for ya:\n\n{groups_list}"
view = discord.utils.MISSING
if self.state.get_user_has_scope(interaction.user.id, AuthScope.MATCHER):
# Otherwise set up the button
msg += "\n\nClick the button to match up groups and send them to the channel.\n"
view = discord.ui.View(timeout=None)
view.add_item(match.DynamicGroupButton(members_min))
else:
# Let a non-matcher know why they don't have the button
msg += f"\n\nYou'll need the {AuthScope.MATCHER}"
msg += " scope to post this to the channel, sorry!"
await interaction.response.send_message(msg, ephemeral=True, silent=True, view=view)
logger.info("Done.")
@tasks.loop(time=[time(hour=h) for h in range(24)])
async def run_hourly_tasks(self):
"""Run any hourly tasks we have"""
for (channel, min) in self.state.get_active_match_tasks():
logger.info("Scheduled match task triggered in %s", channel)
msg_channel = self.bot.get_channel(int(channel))
await matching.match_groups_in_channel(self.state, msg_channel, min)
for (channel, _) in self.state.get_active_match_tasks(datetime.now() + timedelta(days=1)):
logger.info("Reminding about scheduled task in %s", channel)
msg_channel = self.bot.get_channel(int(channel))
await msg_channel.send("Arf arf! just a reminder I'll be doin a matcherino in here in T-24hrs!"
+ "\nUse /join if you haven't already, or /pause if you want to skip a week :)")

55
matchy/cogs/owner.py Normal file
View file

@ -0,0 +1,55 @@
"""
Owner bot cog
"""
import logging
from discord.ext import commands
from matchy.files.state import State, AuthScope
logger = logging.getLogger("owner")
logger.setLevel(logging.INFO)
class Cog(commands.Cog):
def __init__(self, bot: commands.Bot, state: State):
self._bot = bot
self._state = state
@commands.command()
@commands.dm_only()
@commands.is_owner()
async def sync(self, ctx: commands.Context):
"""
Sync the bot commands
You get rate limited if you do this too often so it's better to keep it on command
"""
msg = await ctx.reply(content="Syncing commands...", ephemeral=True)
synced = await self._bot.tree.sync()
logger.info("Synced %s command(s)", len(synced))
await msg.edit(content="Done!")
@commands.command()
@commands.dm_only()
@commands.is_owner()
async def close(self, ctx: commands.Context):
"""
Handle close command
Shuts down the bot when needed
"""
await ctx.reply("Closing bot...", ephemeral=True)
logger.info("Closing down the bot")
await self._bot.close()
@commands.command()
@commands.dm_only()
@commands.is_owner()
async def grant(self, ctx: commands.Context, user: str):
"""
Handle grant command
Grant the matcher scope to a given user
"""
if user.isdigit():
self._state.set_user_scope(str(user), AuthScope.MATCHER)
logger.info("Granting user %s matcher scope", user)
await ctx.reply("Done!", ephemeral=True)
else:
await ctx.reply("Likely not a user...", ephemeral=True)

0
matchy/files/__init__.py Normal file
View file

152
matchy/files/config.py Normal file
View file

@ -0,0 +1,152 @@
"""Very simple config loading library"""
from schema import Schema, Use, Optional
import matchy.files.ops as ops
import os
import logging
import json
logger = logging.getLogger("config")
logger.setLevel(logging.INFO)
# Envar takes precedent
_ENVAR = "MATCHY_CONFIG"
_FILE = ".matchy/config.json"
# Warning: Changing any of the below needs proper thought to ensure backwards compatibility
_VERSION = 2
class _Key():
VERSION = "version"
MATCH = "match"
SCORE_FACTORS = "score_factors"
REPEAT_ROLE = "repeat_role"
REPEAT_MATCH = "repeat_match"
EXTRA_MEMBER = "extra_member"
UPPER_THRESHOLD = "upper_threshold"
# Removed
_OWNERS = "owners"
_TOKEN = "token"
_SCHEMA = Schema(
{
# The current version
_Key.VERSION: Use(int),
# Settings for the match algorithmn, see matching.py for explanations on usage
Optional(_Key.MATCH): {
Optional(_Key.SCORE_FACTORS): {
Optional(_Key.REPEAT_ROLE): Use(int),
Optional(_Key.REPEAT_MATCH): Use(int),
Optional(_Key.EXTRA_MEMBER): Use(int),
Optional(_Key.UPPER_THRESHOLD): Use(int),
}
}
}
)
_EMPTY_DICT = {
_Key.VERSION: _VERSION
}
def _migrate_to_v1(d: dict):
# Owners moved to History in v1
# Note: owners will be required to be re-added to the state.json
if _Key._OWNERS in d:
owners = d.pop(_Key._OWNERS)
logger.warning(
"Migration removed owners from config, these must be re-added to the state.json")
logger.warning("Owners: %s", owners)
def _migrate_to_v2(d: dict):
# Token moved to the environment
if _Key._TOKEN in d:
del d[_Key._TOKEN]
# Set of migration functions to apply
_MIGRATIONS = [
_migrate_to_v1,
_migrate_to_v2
]
class _ScoreFactors():
def __init__(self, data: dict):
"""Initialise and validate the config"""
self._dict = data
@property
def repeat_role(self) -> int:
return self._dict.get(_Key.REPEAT_ROLE, None)
@property
def repeat_match(self) -> int:
return self._dict.get(_Key.REPEAT_MATCH, None)
@property
def extra_member(self) -> int:
return self._dict.get(_Key.EXTRA_MEMBER, None)
@property
def upper_threshold(self) -> int:
return self._dict.get(_Key.UPPER_THRESHOLD, None)
class _Config():
def __init__(self, data: dict):
"""Initialise and validate the config"""
_SCHEMA.validate(data)
self._dict = data
@property
def token(self) -> str:
return self._dict["token"]
@property
def score_factors(self) -> _ScoreFactors:
return _ScoreFactors(self._dict.get(_Key.SCORE_FACTORS, {}))
def _migrate(dict: dict):
"""Migrate a dict through versions"""
version = dict.get("version", 0)
for i in range(version, _VERSION):
_MIGRATIONS[i](dict)
dict["version"] = _VERSION
def _load() -> _Config:
"""
Load the state from an envar or file
Apply any required migrations
"""
# Try the envar first
envar = os.environ.get(_ENVAR)
if envar:
loaded = json.loads(envar)
logger.info("Config loaded from $%s", _ENVAR)
else:
# Otherwise try the file
if os.path.isfile(_FILE):
loaded = ops.load(_FILE)
logger.info("Config loaded from %s", _FILE)
else:
loaded = _EMPTY_DICT
logger.warning("No %s file found, using defaults", _FILE)
_migrate(loaded)
return _Config(loaded)
# Core config for users to use
# Singleton as there should only be one, it's static, and global
Config = _load()

20
matchy/files/ops.py Normal file
View file

@ -0,0 +1,20 @@
"""File operation helpers"""
import json
import shutil
def load(file: str) -> dict:
"""Load a json file directly as a dict"""
with open(file) as f:
return json.load(f)
def save(file: str, content: dict):
"""
Save out a content dictionary to a file
Stores it in an intermediary file first incase the dump fails
"""
intermediate = file + ".nxt"
with open(intermediate, "w") as f:
json.dump(content, f, indent=4)
shutil.move(intermediate, file)

402
matchy/files/state.py Normal file
View file

@ -0,0 +1,402 @@
"""Store bot state"""
import os
from datetime import datetime
from schema import Schema, Use, Optional
from collections.abc import Generator
from typing import Protocol
import matchy.files.ops as ops
import copy
import logging
from contextlib import contextmanager
logger = logging.getLogger("state")
logger.setLevel(logging.INFO)
# Warning: Changing any of the below needs proper thought to ensure backwards compatibility
_VERSION = 4
def _migrate_to_v1(d: dict):
"""v1 simply renamed matchees to users"""
logger.info("Renaming %s to %s", _Key._MATCHEES, _Key.USERS)
d[_Key.USERS] = d[_Key._MATCHEES]
del d[_Key._MATCHEES]
def _migrate_to_v2(d: dict):
"""v2 swapped the date over to a less silly format"""
logger.info("Fixing up date format from %s to %s",
_TIME_FORMAT_OLD, _TIME_FORMAT)
def old_to_new_ts(ts: str) -> str:
return datetime.strftime(datetime.strptime(ts, _TIME_FORMAT_OLD), _TIME_FORMAT)
# Adjust all the history keys
d[_Key._HISTORY] = {
old_to_new_ts(ts): entry
for ts, entry in d[_Key._HISTORY].items()
}
# Adjust all the user parts
for user in d[_Key.USERS].values():
# Update the match dates
matches = user.get(_Key.MATCHES, {})
for id, ts in matches.items():
matches[id] = old_to_new_ts(ts)
# Update any reactivation dates
channels = user.get(_Key.CHANNELS, {})
for id, channel in channels.items():
old_ts = channel.get(_Key.REACTIVATE, None)
if old_ts:
channel[_Key.REACTIVATE] = old_to_new_ts(old_ts)
def _migrate_to_v3(d: dict):
"""v3 simply added the tasks entry"""
d[_Key.TASKS] = {}
def _migrate_to_v4(d: dict):
"""v4 removed verbose history tracking"""
del d[_Key._HISTORY]
# Set of migration functions to apply
_MIGRATIONS = [
_migrate_to_v1,
_migrate_to_v2,
_migrate_to_v3,
_migrate_to_v4,
]
class AuthScope(str):
"""Various auth scopes"""
MATCHER = "matcher"
class _Key(str):
"""Various keys used in the schema"""
VERSION = "version"
USERS = "users"
SCOPES = "scopes"
MATCHES = "matches"
ACTIVE = "active"
CHANNELS = "channels"
REACTIVATE = "reactivate"
TASKS = "tasks"
MATCH_TASKS = "match_tasks"
MEMBERS_MIN = "members_min"
WEEKDAY = "weekdays"
HOUR = "hours"
# Unused
_MATCHEES = "matchees"
_HISTORY = "history"
_GROUPS = "groups"
_MEMBERS = "members"
_TIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
_TIME_FORMAT_OLD = "%a %b %d %H:%M:%S %Y"
_SCHEMA = Schema(
{
# The current version
_Key.VERSION: Use(int),
_Key.USERS: {
# User ID as string
Optional(str): {
Optional(_Key.SCOPES): Use(list[str]),
Optional(_Key.MATCHES): {
# Matchee ID and Datetime pair
Optional(str): Use(str)
},
Optional(_Key.CHANNELS): {
# The channel ID
Optional(str): {
# Whether the user is signed up in this channel
_Key.ACTIVE: Use(bool),
# A timestamp for when to re-activate the user
Optional(_Key.REACTIVATE): Use(str),
}
}
}
},
_Key.TASKS: {
# Channel ID as string
Optional(str): {
Optional(_Key.MATCH_TASKS): [
{
_Key.MEMBERS_MIN: Use(int),
_Key.WEEKDAY: Use(int),
_Key.HOUR: Use(int),
}
]
}
}
}
)
# Empty but schema-valid internal dict
_EMPTY_DICT = {
_Key.USERS: {},
_Key.TASKS: {},
_Key.VERSION: _VERSION
}
assert _SCHEMA.validate(_EMPTY_DICT)
class Member(Protocol):
@property
def id(self) -> int:
pass
def ts_to_datetime(ts: str) -> datetime:
"""Convert a string ts to datetime using the internal format"""
return datetime.strptime(ts, _TIME_FORMAT)
def datetime_to_ts(ts: datetime) -> str:
"""Convert a datetime to a string ts using the internal format"""
return datetime.strftime(ts, _TIME_FORMAT)
class State():
def __init__(self, data: dict, file: str | None = None):
"""Initialise and validate the state"""
self.validate(data)
self._dict = copy.deepcopy(data)
self._file = file
def validate(self, dict: dict = None):
"""Initialise and validate a state dict"""
if not dict:
dict = self._dict
_SCHEMA.validate(dict)
def get_history_timestamps(self, users: list[Member]) -> list[datetime]:
"""Grab all timestamps in the history"""
others = [m.id for m in users]
# Fetch all the interaction times in history
# But only for interactions in the given user group
times = set()
for data in (data for id, data in self._users.items() if int(id) in others):
matches = data.get(_Key.MATCHES, {})
for ts in (ts for id, ts in matches.items() if int(id) in others):
times.add(ts)
# Convert to datetimes and sort
datetimes = [ts_to_datetime(ts) for ts in times]
datetimes.sort()
return datetimes
def get_user_matches(self, id: int) -> list[int]:
return self._users.get(str(id), {}).get(_Key.MATCHES, {})
def log_groups(self, groups: list[list[Member]], ts: datetime = None) -> None:
"""Log the groups"""
ts = datetime_to_ts(ts or datetime.now())
with self._safe_wrap_write() as safe_state:
for group in groups:
# Update the matchee data with the matches
for m in group:
matchee = safe_state._users.setdefault(str(m.id), {})
matchee_matches = matchee.setdefault(_Key.MATCHES, {})
for o in (o for o in group if o.id != m.id):
matchee_matches[str(o.id)] = ts
def set_user_scope(self, id: str, scope: str, value: bool = True):
"""Add an auth scope to a user"""
with self._safe_wrap_write() as safe_state:
# Dive in
user = safe_state._users.setdefault(str(id), {})
scopes = user.setdefault(_Key.SCOPES, [])
# Set the value
if value and scope not in scopes:
scopes.append(scope)
elif not value and scope in scopes:
scopes.remove(scope)
def get_user_has_scope(self, id: str, scope: str) -> bool:
"""
Check if a user has an auth scope
"owner" users have all scopes
"""
user = self._users.get(str(id), {})
scopes = user.get(_Key.SCOPES, [])
return scope in scopes
def set_user_active_in_channel(self, id: str, channel_id: str, active: bool = True):
"""Set a user as active (or not) on a given channel"""
self._set_user_channel_prop(id, channel_id, _Key.ACTIVE, active)
def get_user_active_in_channel(self, id: str, channel_id: str) -> bool:
"""Get a users active channels"""
user = self._users.get(str(id), {})
channels = user.get(_Key.CHANNELS, {})
return str(channel_id) in [channel for (channel, props) in channels.items() if props.get(_Key.ACTIVE, False)]
def set_user_paused_in_channel(self, id: str, channel_id: str, until: datetime):
"""Sets a user as paused in a channel"""
# Deactivate the user in the channel first
self.set_user_active_in_channel(id, channel_id, False)
self._set_user_channel_prop(
id, channel_id, _Key.REACTIVATE, datetime_to_ts(until))
def reactivate_users(self, channel_id: str):
"""Reactivate any users who've passed their reactivation time on this channel"""
with self._safe_wrap_write() as safe_state:
for user in safe_state._users.values():
channels = user.get(_Key.CHANNELS, {})
channel = channels.get(str(channel_id), {})
if channel and not channel[_Key.ACTIVE]:
reactivate = channel.get(_Key.REACTIVATE, None)
# Check if we've gone past the reactivation time and re-activate
if reactivate and datetime.now() > ts_to_datetime(reactivate):
channel[_Key.ACTIVE] = True
def get_active_match_tasks(self, time: datetime | None = None) -> Generator[str, int]:
"""
Get any active match tasks at the given time
returns list of channel,members_min pairs
"""
if not time:
time = datetime.now()
weekday = time.weekday()
hour = time.hour
for channel, tasks in self._tasks.items():
for match in tasks.get(_Key.MATCH_TASKS, []):
if match[_Key.WEEKDAY] == weekday and match[_Key.HOUR] == hour:
yield (channel, match[_Key.MEMBERS_MIN])
def get_channel_match_tasks(self, channel_id: str) -> Generator[int, int, int]:
"""
Get all match tasks for the channel
"""
all_tasks = (
tasks.get(_Key.MATCH_TASKS, [])
for channel, tasks in self._tasks.items()
if str(channel) == str(channel_id)
)
for tasks in all_tasks:
for task in tasks:
yield (task[_Key.WEEKDAY], task[_Key.HOUR], task[_Key.MEMBERS_MIN])
def set_channel_match_task(self, channel_id: str, members_min: int, weekday: int, hour: int, set: bool) -> bool:
"""Set up a match task on a channel"""
with self._safe_wrap_write() as safe_state:
channel = safe_state._tasks.setdefault(str(channel_id), {})
matches = channel.setdefault(_Key.MATCH_TASKS, [])
found = False
for match in matches:
# Specifically check for the combination of weekday and hour
if match[_Key.WEEKDAY] == weekday and match[_Key.HOUR] == hour:
found = True
if set:
match[_Key.MEMBERS_MIN] = members_min
else:
matches.remove(match)
# Return true as we've successfully changed the data in place
return True
# If we didn't find it, add it to the schedule
if not found and set:
matches.append({
_Key.MEMBERS_MIN: members_min,
_Key.WEEKDAY: weekday,
_Key.HOUR: hour,
})
return True
# We did not manage to remove the schedule (or add it? though that should be impossible)
return False
@property
def dict_internal_copy(self) -> dict:
"""Only to be used to get the internal dict as a copy"""
return copy.deepcopy(self._dict)
@property
def _users(self) -> dict[str]:
return self._dict[_Key.USERS]
@property
def _tasks(self) -> dict[str]:
return self._dict[_Key.TASKS]
def _set_user_channel_prop(self, id: str, channel_id: str, key: str, value):
"""Set a user channel property helper"""
with self._safe_wrap_write() as safe_state:
# Dive in
user = safe_state._users.setdefault(str(id), {})
channels = user.setdefault(_Key.CHANNELS, {})
channel = channels.setdefault(str(channel_id), {})
# Set the value
channel[key] = value
# TODO: Make this a decorator?
@contextmanager
def _safe_wrap_write(self):
"""Safely run any function wrapped in a validate"""
# Wrap in a temporary state to validate first to prevent corruption
tmp_state = State(self._dict)
try:
yield tmp_state
finally:
# Validate and then overwrite our dict with the new one
tmp_state.validate()
self._dict = tmp_state._dict
# Write this change out if we have a file
if self._file:
self._save_to_file()
def _save_to_file(self):
"""Saves the state out to the chosen file"""
ops.save(self._file, self.dict_internal_copy)
def _migrate(dict: dict):
"""Migrate a dict through versions"""
version = dict.get("version", 0)
for i in range(version, _VERSION):
logger.info("Migrating from v%s to v%s", version, version+1)
_MIGRATIONS[i](dict)
dict[_Key.VERSION] = _VERSION
def load_from_file(file: str) -> State:
"""
Load the state from a files
Apply any required migrations
"""
loaded = _EMPTY_DICT
# If there's a file load it and try to migrate
if os.path.isfile(file):
loaded = ops.load(file)
_migrate(loaded)
st = State(loaded, file)
# Save out the migrated (or new) file
ops.save(file, st._dict)
return st

239
matchy/matching.py Normal file
View file

@ -0,0 +1,239 @@
"""Utility functions for matchy"""
import logging
import discord
from datetime import datetime
from typing import Protocol, runtime_checkable
from matchy.files.state import State, ts_to_datetime
import util
import matchy.files.config as config
class _ScoreFactors(int):
"""
Score factors used when trying to build up "best fit" groups
Matchees are sequentially placed into the lowest scoring available group
"""
# Added for each role the matchee has that another group member has
REPEAT_ROLE = config.Config.score_factors.repeat_role or 2**2
# Added for each member in the group that the matchee has already matched with
REPEAT_MATCH = config.Config.score_factors.repeat_match or 2**3
# Added for each additional member over the set "per group" value
EXTRA_MEMBER = config.Config.score_factors.extra_member or 2**5
# Upper threshold, if the user scores higher than this they will not be placed in that group
UPPER_THRESHOLD = config.Config.score_factors.upper_threshold or 2**6
logger = logging.getLogger("matching")
logger.setLevel(logging.INFO)
@runtime_checkable
class Role(Protocol):
@property
def id(self) -> int:
pass
@property
def name(self) -> str:
pass
@runtime_checkable
class Member(Protocol):
@property
def mention(self) -> str:
pass
@property
def id(self) -> int:
pass
@property
def display_name(self) -> str:
pass
@property
def roles(self) -> list[Role]:
pass
@runtime_checkable
class Guild(Protocol):
@property
def roles(self) -> list[Role]:
pass
def members_to_groups_simple(matchees: list[Member], per_group: int) -> tuple[bool, list[list[Member]]]:
"""Super simple group matching, literally no logic"""
num_groups = max(len(matchees)//per_group, 1)
return [matchees[i::num_groups] for i in range(num_groups)]
def get_member_group_eligibility_score(member: Member,
group: list[Member],
prior_matches: list[int],
per_group: int) -> float:
"""Rates a member against a group"""
# An empty group is a "perfect" score atomatically
rating = 0
if not group:
return rating
# Add score based on prior matchups of this user
num_prior = sum(m.id in prior_matches for m in group)
rating += num_prior * _ScoreFactors.REPEAT_MATCH
# Calculate the number of roles that match
all_role_ids = set(r.id for mr in [r.roles for r in group] for r in mr)
member_role_ids = [r.id for r in member.roles]
repeat_roles = sum(id in member_role_ids for id in all_role_ids)
rating += repeat_roles * _ScoreFactors.REPEAT_ROLE
# Add score based on the number of extra members
# Calculate the member offset (+1 for this user)
extra_members = (len(group) - per_group) + 1
if extra_members >= 0:
rating += extra_members * _ScoreFactors.EXTRA_MEMBER
return rating
def attempt_create_groups(matchees: list[Member],
state: State,
oldest_relevant_ts: datetime,
per_group: int) -> tuple[bool, list[list[Member]]]:
"""History aware group matching"""
num_groups = max(len(matchees)//per_group, 1)
# Set up the groups in place
groups = [[] for _ in range(num_groups)]
matchees_left = matchees.copy()
# Sequentially try and fit each matchee into a group
while matchees_left:
# Get the next matchee to place
matchee = matchees_left.pop()
matchee_matches = state.get_user_matches(matchee.id)
relevant_matches = [int(id) for id, ts
in matchee_matches.items()
if ts_to_datetime(ts) >= oldest_relevant_ts]
# Try every single group from the current group onwards
# Progressing through the groups like this ensures we slowly fill them up with compatible people
scores: list[tuple[int, float]] = []
for group in groups:
score = get_member_group_eligibility_score(
matchee, group, relevant_matches, per_group)
# If the score isn't too high, consider this group
if score <= _ScoreFactors.UPPER_THRESHOLD:
scores.append((group, score))
# Optimisation:
# A score of 0 means we've got something good enough and can skip
if score == 0:
break
if scores:
(group, _) = sorted(scores, key=lambda pair: pair[1])[0]
group.append(matchee)
else:
# If we failed to add this matchee, bail on the group creation as it could not be done
return None
return groups
def iterate_all_shifts(list: list):
"""Yields each shifted variation of the input list"""
yield list
for _ in range(len(list)-1):
list = list[1:] + [list[0]]
yield list
def members_to_groups(matchees: list[Member],
state: State,
per_group: int = 3,
allow_fallback: bool = False) -> list[list[Member]]:
"""Generate the groups from the set of matchees"""
attempts = 0 # Tracking for logging purposes
num_groups = len(matchees)//per_group
# Bail early if there's no-one to match
if not matchees:
return []
# Walk from the start of history until now trying to match up groups
for oldest_relevant_datetime in state.get_history_timestamps(matchees) + [datetime.now()]:
# Attempt with each starting matchee
for shifted_matchees in iterate_all_shifts(matchees):
attempts += 1
groups = attempt_create_groups(
shifted_matchees, state, oldest_relevant_datetime, per_group)
# Fail the match if our groups aren't big enough
if num_groups <= 1 or (groups and all(len(g) >= per_group for g in groups)):
logger.info("Matched groups after %s attempt(s)", attempts)
return groups
# If we've still failed, just use the simple method
if allow_fallback:
logger.info("Fell back to simple groups after %s attempt(s)", attempts)
return members_to_groups_simple(matchees, per_group)
# Simply assert false, this should never happen
# And should be caught by tests
assert False
async def match_groups_in_channel(state: State, channel: discord.channel, min: int):
"""Match up the groups in a given channel"""
groups = active_members_to_groups(state, channel, min)
# Send the groups
for group in groups:
message = await channel.send(
f"Matched up {util.format_list([m.mention for m in group])}!")
# Set up a thread for this match if the bot has permissions to do so
if channel.permissions_for(channel.guild.me).create_public_threads:
await channel.create_thread(
name=util.format_list([m.display_name for m in group]),
message=message,
reason="Creating a matching thread")
# Close off with a message
await channel.send("That's all folks, happy matching and remember - DFTBA!")
# Save the groups to the history
state.log_groups(groups)
logger.info("Done! Matched into %s groups.", len(groups))
def get_matchees_in_channel(state: State, channel: discord.channel):
"""Fetches the matchees in a channel"""
# Reactivate any unpaused users
state.reactivate_users(channel.id)
# Gather up the prospective matchees
return [m for m in channel.members if state.get_user_active_in_channel(m.id, channel.id)]
def active_members_to_groups(state: State, channel: discord.channel, min_members: int):
"""Helper to create groups from channel members"""
# Gather up the prospective matchees
matchees = get_matchees_in_channel(state, channel)
# Create our groups!
return members_to_groups(matchees, state, min_members, allow_fallback=True)

39
matchy/util.py Normal file
View file

@ -0,0 +1,39 @@
from datetime import datetime, timedelta
def get_day_with_suffix(day):
"""Get the suffix for a day of the month"""
if 11 <= day <= 13:
return str(day) + 'th'
else:
return str(day) + {1: 'st', 2: 'nd', 3: 'rd'}.get(day % 10, 'th')
def format_day(time: datetime) -> str:
"""Format the a given datetime"""
num = get_day_with_suffix(time.day)
day = time.strftime("%a")
return f"{day} {num}"
def format_list(list: list) -> str:
"""Format a list into a human readable format of foo, bar and bob"""
if len(list) > 1:
return f"{', '.join(list[:-1])} and {list[-1]}"
return list[0] if list else ''
def get_next_datetime(weekday, hour) -> datetime:
"""Get the next datetime for the given weekday and hour"""
now = datetime.now()
days_until_next_week = (weekday - now.weekday() + 7) % 7
# Account for when we're already beyond the time now
if days_until_next_week == 0 and now.hour >= hour:
days_until_next_week = 7
# Calculate the next datetime
next_date = now + timedelta(days=days_until_next_week)
next_date.replace(hour=hour)
return next_date

0
matchy/views/__init__.py Normal file
View file

53
matchy/views/match.py Normal file
View file

@ -0,0 +1,53 @@
"""
Class for a button that matches groups in a channel
"""
import logging
import discord
import re
import matchy.files.state as state
import matching
logger = logging.getLogger("match_button")
logger.setLevel(logging.INFO)
# Increment when adjusting the custom_id so we don't confuse old users
_MATCH_BUTTON_CUSTOM_ID_VERSION = 1
_MATCH_BUTTON_CUSTOM_ID_PREFIX = f'match:v{_MATCH_BUTTON_CUSTOM_ID_VERSION}:'
class DynamicGroupButton(discord.ui.DynamicItem[discord.ui.Button],
template=_MATCH_BUTTON_CUSTOM_ID_PREFIX + r'min:(?P<min>[0-9]+)'):
"""
Describes a simple button that lets the user trigger a match
"""
def __init__(self, min: int) -> None:
super().__init__(
discord.ui.Button(
label='Match Groups!',
style=discord.ButtonStyle.blurple,
custom_id=_MATCH_BUTTON_CUSTOM_ID_PREFIX + f'min:{min}',
)
)
self.min: int = min
self.state = state.load_from_file()
# This is called when the button is clicked and the custom_id matches the template.
@classmethod
async def from_custom_id(cls, intrctn: discord.Interaction, item: discord.ui.Button, match: re.Match[str], /):
min = int(match['min'])
return cls(min)
async def callback(self, intrctn: discord.Interaction) -> None:
"""Match up people when the button is pressed"""
logger.info("Handling button press min=%s", self.min)
logger.info("User %s from %s in #%s", intrctn.user,
intrctn.guild.name, intrctn.channel.name)
# Let the user know we've recieved the message
await intrctn.response.send_message(content="Matchy is matching matchees...", ephemeral=True)
# Perform the match
await matching.match_groups_in_channel(self.state, intrctn.channel, self.min)