diff --git a/lib.py b/lib.py new file mode 100644 index 0000000..d384703 --- /dev/null +++ b/lib.py @@ -0,0 +1,147 @@ +import os +import logging +from datetime import datetime +from zoneinfo import ZoneInfo +from functools import wraps +from flask import request, Response +from models import Plext + +# Timezone configuration +TIMEZONE = ZoneInfo("Europe/Rome") + +# Configure logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) +logger = logging.getLogger(__name__) + + +def check_basic_auth(username: str, password: str) -> bool: + """ + Check if the provided username and password match the configured credentials. + + Args: + username: Username from the request + password: Password from the request + + Returns: + True if credentials match, False otherwise + """ + expected_username = os.getenv("BASIC_AUTH_USER") + expected_password = os.getenv("BASIC_AUTH_PASSWORD") + + if not expected_username or not expected_password: + logger.warning("BASIC_AUTH_USER or BASIC_AUTH_PASSWORD not configured") + return False + + return username == expected_username and password == expected_password + + +def basic_auth_required(f): + """ + Decorator to require basic authentication for an endpoint. + + Returns: + 401 Unauthorized if authentication fails or is missing + """ + + @wraps(f) + def decorated(*args, **kwargs): + auth = request.authorization + + if not auth or not check_basic_auth(auth.username, auth.password): + return Response( + "Could not verify your access level for that URL.\n" + "You have to login with proper credentials", + 401, + {"WWW-Authenticate": 'Basic realm="Login Required"'}, + ) + + return f(*args, **kwargs) + + return decorated + + +def parse_timestamp(value: str) -> int: + """ + Parse timestamp from either milliseconds (int) or ISO 8601 string. + + Args: + value: Either integer milliseconds or ISO 8601 string + + Returns: + Timestamp in milliseconds since epoch + + Raises: + ValueError: If format is invalid + """ + # Try parsing as integer (milliseconds) + try: + return int(value) + except ValueError: + pass + + # Try parsing as ISO 8601 datetime + try: + # Handle 'Z' suffix (UTC) + iso_value = value.replace("Z", "+00:00") + + # Handle timezone offset without colon (e.g., +0100 -> +01:00) + # Match pattern like +0100 or -0100 at the end of the string + import re + + match = re.search(r"([+-])(\d{2})(\d{2})$", iso_value) + if match: + sign, hours, minutes = match.groups() + iso_value = re.sub( + r"([+-])(\d{2})(\d{2})$", f"{sign}{hours}:{minutes}", iso_value + ) + + dt = datetime.fromisoformat(iso_value) + return int(dt.timestamp() * 1000) + except ValueError: + raise ValueError( + f"Invalid timestamp format: {value}. " + "Use milliseconds (e.g., 1736659200000) or " + "ISO 8601 format (e.g., '2026-01-12T10:00:00Z' or '2026-01-12T10:00:00+01:00')" + ) + + +def plext_to_dict(plext: Plext) -> dict: + """ + Convert a Plext object to a dictionary for JSON serialization. + + Args: + plext: Plext object to convert + + Returns: + Dictionary representation of the plext + """ + coords = plext.get_event_coordinates() + # Convert timestamp from milliseconds to datetime in Europe/Rome timezone + dt = datetime.fromtimestamp(plext.timestamp / 1000.0, tz=TIMEZONE) + return { + "id": plext.id, + "timestamp": plext.timestamp, + "timestamp_formatted": dt.strftime("%Y-%m-%d %H:%M:%S"), + "text": plext.text, + "team": plext.team, + "plext_type": plext.plext_type, + "categories": plext.categories, + "event_type": plext.get_event_type().name, + "player_name": plext.get_player_name(), + "portal_name": plext.get_portal_name(), + "coordinates": {"lat": coords[0], "lng": coords[1]} if coords else None, + "markup": [ + { + "type": m.type, + "plain": m.plain, + "team": m.team, + "name": m.name, + "address": m.address, + "latE6": m.latE6, + "lngE6": m.lngE6, + } + for m in plext.markup + ], + }