Files
Ingress/schedule.py
2026-01-15 11:59:15 +01:00

226 lines
5.9 KiB
Python

import os
import requests
from datetime import datetime, timedelta, UTC
from zoneinfo import ZoneInfo
from pymongo import MongoClient
from dotenv import load_dotenv
from pymongo.errors import PyMongoError
from apscheduler.schedulers.blocking import BlockingScheduler
load_dotenv()
# Configuration
ENDPOINT_URL = os.getenv("ENDPOINT_URL")
MONGO_URI = os.getenv("MONGO_URI")
DB_NAME = os.getenv("DB_NAME")
COLLECTION_NAME = os.getenv("COLLECTION_NAME")
TIMEZONE = ZoneInfo("Europe/Rome")
def convert_e6_to_degrees(e6_value):
"""
Convert E6 format to degrees.
Args:
e6_value: Integer value in E6 format (multiply by 1e-6 to get degrees)
Returns:
Float value in degrees
"""
return e6_value / 1_000_000 if e6_value else 0.0
def create_geojson_point(lat_e6, lng_e6):
"""
Create a GeoJSON Point from E6 coordinates.
Args:
lat_e6: Latitude in E6 format
lng_e6: Longitude in E6 format
Returns:
GeoJSON Point object or None if coordinates are invalid
"""
if lat_e6 == 0 and lng_e6 == 0:
return None
lat = convert_e6_to_degrees(lat_e6)
lng = convert_e6_to_degrees(lng_e6)
# GeoJSON Point format: [longitude, latitude]
return {
"type": "Point",
"coordinates": [lng, lat]
}
def transform_plext(plext):
"""
Transform a plext to use GeoJSON format for coordinates.
Args:
plext: Plext dictionary to transform
Returns:
Transformed plext with GeoJSON coordinates
"""
transformed = plext.copy()
# Transform coordinates field
if "coordinates" in transformed and isinstance(transformed["coordinates"], dict):
coords = transformed["coordinates"]
if "lat" in coords and "lng" in coords:
geojson = create_geojson_point(coords["lat"], coords["lng"])
if geojson:
transformed["coordinates"] = geojson
# Transform markup array
if "markup" in transformed and isinstance(transformed["markup"], list):
new_markup = []
for item in transformed["markup"]:
new_item = item.copy()
if "latE6" in item and "lngE6" in item:
geojson = create_geojson_point(item["latE6"], item["lngE6"])
if geojson:
new_item["location"] = geojson
new_markup.append(new_item)
transformed["markup"] = new_markup
return transformed
def get_time_range():
"""
Calculate the time range for the current execution.
Returns a tuple of (min_timestamp, max_timestamp) in ISO 8601 format with Europe/Rome timezone.
"""
now = datetime.now(TIMEZONE)
one_minute_ago = now - timedelta(minutes=1)
# Format with colon in timezone offset (e.g., +01:00 instead of +0100)
def format_with_colon(dt):
base = dt.strftime("%Y-%m-%dT%H:%M:%S")
# Get timezone offset and format with colon
offset = dt.strftime("%z")
if offset:
sign = offset[0]
hours = offset[1:3]
minutes = offset[3:5]
return f"{base}{sign}{hours}:{minutes}"
return base
min_timestamp = format_with_colon(one_minute_ago)
max_timestamp = format_with_colon(now)
return min_timestamp, max_timestamp
def fetch_plexts(min_timestamp, max_timestamp):
"""
Fetch plexts from the endpoint.
Args:
min_timestamp: Start timestamp in ISO 8601 format
max_timestamp: End timestamp in ISO 8601 format
Returns:
List of plexts or None if error occurs
"""
try:
params = {"min_timestamp": min_timestamp, "max_timestamp": max_timestamp}
response = requests.get(ENDPOINT_URL, params=params, timeout=30)
response.raise_for_status()
data = response.json()
return data.get("plexts", [])
except requests.RequestException as e:
print(f"Error fetching plexts: {e}")
return None
def save_to_mongodb(plexts):
"""
Save plexts to MongoDB with GeoJSON coordinates.
Args:
plexts: List of plext dictionaries to save
Returns:
True if successful, False otherwise
"""
if not plexts:
print("No plexts to save")
return True
try:
client = MongoClient(MONGO_URI)
db = client[DB_NAME]
collection = db[COLLECTION_NAME]
# Transform plexts to use GeoJSON format
transformed_plexts = [transform_plext(plext) for plext in plexts]
# Insert all transformed plexts
result = collection.insert_many(transformed_plexts)
print(f"Inserted {len(result.inserted_ids)} plexts to MongoDB")
return True
except PyMongoError as e:
print(f"Error saving to MongoDB: {e}")
return False
finally:
client.close()
def job():
"""
Main job function that runs every minute.
"""
print(
f"\n[{datetime.now(TIMEZONE).strftime('%Y-%m-%d %H:%M:%S %Z')}] Starting job..."
)
# Get time range
min_timestamp, max_timestamp = get_time_range()
print(f"Time range: {min_timestamp} to {max_timestamp}")
# Fetch plexts
plexts = fetch_plexts(min_timestamp, max_timestamp)
if plexts is None:
print("Failed to fetch plexts")
return
print(f"Fetched {len(plexts)} plexts")
# Save to MongoDB
if save_to_mongodb(plexts):
print("Job completed successfully")
else:
print("Job completed with errors")
def main():
"""
Main function to schedule and run the job.
"""
print("Starting Plex monitoring scheduler...")
print("Press Ctrl+C to stop")
# Create scheduler
scheduler = BlockingScheduler()
# Schedule job to run every minute
scheduler.add_job(job, "interval", minutes=1)
# Run the job immediately on startup
job()
# Start the scheduler
try:
scheduler.start()
except (KeyboardInterrupt, SystemExit):
print("\nScheduler stopped by user")
if __name__ == "__main__":
main()