192 lines
6.1 KiB
Python
192 lines
6.1 KiB
Python
"""Utility functions for matchy"""
|
|
import logging
|
|
import random
|
|
from datetime import datetime, timedelta
|
|
from typing import Protocol, runtime_checkable
|
|
import history
|
|
|
|
|
|
# Number of days to step forward from the start of history for each match attempt
|
|
_ATTEMPT_TIMESTEP_INCREMENT = timedelta(days=7)
|
|
|
|
# Attempts for each of those time periods
|
|
_ATTEMPTS_PER_TIMESTEP = 3
|
|
|
|
# Various eligability scoring factors for group meetups
|
|
_SCORE_CURRENT_MEMBERS = 2**1
|
|
_SCORE_REPEAT_ROLE = 2**2
|
|
_SCORE_REPEAT_MATCH = 2**3
|
|
_SCORE_EXTRA_MEMBERS = 2**4
|
|
|
|
# Scores higher than this are fully rejected
|
|
_SCORE_UPPER_THRESHOLD = 2**6
|
|
|
|
logger = logging.getLogger("matching")
|
|
logger.setLevel(logging.INFO)
|
|
|
|
|
|
@runtime_checkable
|
|
class Role(Protocol):
|
|
@property
|
|
def id(self) -> int:
|
|
pass
|
|
|
|
|
|
@runtime_checkable
|
|
class Member(Protocol):
|
|
@property
|
|
def mention(self) -> str:
|
|
pass
|
|
|
|
@property
|
|
def id(self) -> int:
|
|
pass
|
|
|
|
@property
|
|
def roles(self) -> list[Role]:
|
|
pass
|
|
|
|
|
|
@runtime_checkable
|
|
class Role(Protocol):
|
|
@property
|
|
def name(self) -> str:
|
|
pass
|
|
|
|
|
|
@runtime_checkable
|
|
class Guild(Protocol):
|
|
@property
|
|
def roles(self) -> list[Role]:
|
|
pass
|
|
|
|
|
|
def members_to_groups_simple(matchees: list[Member], per_group: int) -> tuple[bool, list[list[Member]]]:
|
|
"""Super simple group matching, literally no logic"""
|
|
num_groups = max(len(matchees)//per_group, 1)
|
|
return [matchees[i::num_groups] for i in range(num_groups)]
|
|
|
|
|
|
def get_member_group_eligibility_score(member: Member,
|
|
group: list[Member],
|
|
relevant_matches: list[int],
|
|
per_group: int) -> int:
|
|
"""Rates a member against a group"""
|
|
rating = len(group) * _SCORE_CURRENT_MEMBERS
|
|
|
|
repeat_meetings = sum(m.id in relevant_matches for m in group)
|
|
rating += repeat_meetings * _SCORE_REPEAT_MATCH
|
|
|
|
repeat_roles = sum(r in member.roles for r in (m.roles for m in group))
|
|
rating += (repeat_roles * _SCORE_REPEAT_ROLE)
|
|
|
|
extra_members = len(group) - per_group
|
|
if extra_members > 0:
|
|
rating += extra_members * _SCORE_EXTRA_MEMBERS
|
|
|
|
return rating
|
|
|
|
|
|
def attempt_create_groups(matchees: list[Member],
|
|
hist: history.History,
|
|
oldest_relevant_ts: datetime,
|
|
per_group: int) -> tuple[bool, list[list[Member]]]:
|
|
"""History aware group matching"""
|
|
num_groups = max(len(matchees)//per_group, 1)
|
|
|
|
# Set up the groups in place
|
|
groups = list([] for _ in range(num_groups))
|
|
|
|
matchees_left = matchees.copy()
|
|
|
|
# Sequentially try and fit each matchee into a group
|
|
while matchees_left:
|
|
# Get the next matchee to place
|
|
matchee = matchees_left.pop()
|
|
matchee_matches = hist.matchees.get(
|
|
str(matchee.id), {}).get("matches", {})
|
|
relevant_matches = list(int(id) for id, ts in matchee_matches.items()
|
|
if history.ts_to_datetime(ts) >= oldest_relevant_ts)
|
|
|
|
# Try every single group from the current group onwards
|
|
# Progressing through the groups like this ensures we slowly fill them up with compatible people
|
|
scores: list[tuple[int, int]] = []
|
|
for group in groups:
|
|
|
|
score = get_member_group_eligibility_score(
|
|
matchee, group, relevant_matches, num_groups)
|
|
|
|
# If the score isn't too high, consider this group
|
|
if score <= _SCORE_UPPER_THRESHOLD:
|
|
scores.append((group, score))
|
|
|
|
# Optimisation:
|
|
# A score of 0 means we've got something good enough and can skip
|
|
if score == 0:
|
|
break
|
|
|
|
if scores:
|
|
(group, _) = sorted(scores, key=lambda pair: pair[1])[0]
|
|
group.append(matchee)
|
|
else:
|
|
# If we failed to add this matchee, bail on the group creation as it could not be done
|
|
return None
|
|
|
|
return groups
|
|
|
|
|
|
def datetime_range(start_time: datetime, increment: timedelta, end: datetime):
|
|
"""Yields a datetime range with a given increment"""
|
|
current = start_time
|
|
while current <= end or end is None:
|
|
yield current
|
|
current += increment
|
|
|
|
|
|
def members_to_groups(matchees: list[Member],
|
|
hist: history.History = history.History(),
|
|
per_group: int = 3,
|
|
allow_fallback: bool = False) -> list[list[Member]]:
|
|
"""Generate the groups from the set of matchees"""
|
|
attempts = 0 # Tracking for logging purposes
|
|
rand = random.Random(117) # Some stable randomness
|
|
|
|
# Grab the oldest timestamp
|
|
history_start = hist.oldest() or datetime.now()
|
|
|
|
# Walk from the start of time until now using the timestep increment
|
|
for oldest_relevant_datetime in datetime_range(history_start, _ATTEMPT_TIMESTEP_INCREMENT, datetime.now()):
|
|
|
|
# Have a few attempts before stepping forward in time
|
|
for _ in range(_ATTEMPTS_PER_TIMESTEP):
|
|
|
|
rand.shuffle(matchees) # Shuffle the matchees each attempt
|
|
|
|
attempts += 1
|
|
groups = attempt_create_groups(
|
|
matchees, hist, oldest_relevant_datetime, per_group)
|
|
|
|
# Fail the match if our groups aren't big enough
|
|
if (len(matchees)//per_group) <= 1 or (groups and all(len(g) >= per_group for g in groups)):
|
|
logger.info("Matched groups after %s attempt(s)", attempts)
|
|
return groups
|
|
|
|
# If we've still failed, just use the simple method
|
|
if allow_fallback:
|
|
logger.info("Fell back to simple groups after %s attempt(s)", attempts)
|
|
return members_to_groups_simple(matchees, per_group)
|
|
|
|
|
|
def group_to_message(group: list[Member]) -> str:
|
|
"""Get the message to send for each group"""
|
|
mentions = [m.mention for m in group]
|
|
if len(group) > 1:
|
|
mentions = f"{', '.join(mentions[:-1])} and {mentions[-1]}"
|
|
else:
|
|
mentions = mentions[0]
|
|
return f"Matched up {mentions}!"
|
|
|
|
|
|
def get_role_from_guild(guild: Guild, role: str) -> Role:
|
|
"""Find a role in a guild"""
|
|
return next((r for r in guild.roles if r.name == role), None)
|