226 lines
5.9 KiB
Python
226 lines
5.9 KiB
Python
import os
|
|
import requests
|
|
from datetime import datetime, timedelta, UTC
|
|
from zoneinfo import ZoneInfo
|
|
from pymongo import MongoClient
|
|
from dotenv import load_dotenv
|
|
from pymongo.errors import PyMongoError
|
|
from apscheduler.schedulers.blocking import BlockingScheduler
|
|
|
|
# Load variables from a .env file (if any) before the settings are read.
load_dotenv()

# Configuration
# Each environment-backed setting is None when its variable is not set.
ENDPOINT_URL = os.environ.get("ENDPOINT_URL")
MONGO_URI = os.environ.get("MONGO_URI")
DB_NAME = os.environ.get("DB_NAME")
COLLECTION_NAME = os.environ.get("COLLECTION_NAME")

# Time zone used for the query window and for log timestamps.
TIMEZONE = ZoneInfo("Europe/Rome")
|
|
|
|
|
|
def convert_e6_to_degrees(e6_value):
    """
    Scale an E6-encoded coordinate down to decimal degrees.

    Args:
        e6_value: Coordinate in E6 form (degrees multiplied by 1,000,000).

    Returns:
        The coordinate in degrees as a float; 0.0 when ``e6_value`` is
        falsy (``None`` or zero).
    """
    # Falsy inputs short-circuit so a missing value never reaches the division.
    if not e6_value:
        return 0.0
    return e6_value / 1_000_000
|
|
|
|
|
|
def create_geojson_point(lat_e6, lng_e6):
    """
    Create a GeoJSON Point from E6 coordinates.

    Args:
        lat_e6: Latitude in E6 format
        lng_e6: Longitude in E6 format

    Returns:
        GeoJSON Point dict (``{"type": "Point", "coordinates": [lng, lat]}``),
        or None if the coordinates are invalid: either value is missing
        (``None``) or both values are zero.
    """
    # A missing coordinate must not reach the converter: it would be turned
    # into 0.0 and silently place the point on the equator / prime meridian.
    if lat_e6 is None or lng_e6 is None:
        return None

    # A (0, 0) pair is treated as "no usable location".
    if lat_e6 == 0 and lng_e6 == 0:
        return None

    lat = convert_e6_to_degrees(lat_e6)
    lng = convert_e6_to_degrees(lng_e6)

    # GeoJSON Point format: [longitude, latitude]
    return {
        "type": "Point",
        "coordinates": [lng, lat],
    }
|
|
|
|
|
|
def transform_plext(plext):
    """
    Build a copy of a plext whose E6 coordinates are expressed as GeoJSON.

    The input is not modified: the plext and each markup item are
    shallow-copied before anything is changed.

    Args:
        plext: Plext dictionary to transform

    Returns:
        Copy of the plext where ``coordinates`` is replaced by a GeoJSON
        Point (when it holds ``lat``/``lng`` and a point can be built) and
        each markup item carrying ``latE6``/``lngE6`` gains a ``location``
        GeoJSON Point.
    """

    def with_location(entry):
        # Copy one markup item, attaching a GeoJSON "location" when possible.
        enriched = entry.copy()
        if "latE6" in entry and "lngE6" in entry:
            point = create_geojson_point(entry["latE6"], entry["lngE6"])
            if point:
                enriched["location"] = point
        return enriched

    result = plext.copy()

    # Top-level position: swap the {"lat", "lng"} dict for a GeoJSON Point.
    if "coordinates" in result:
        position = result["coordinates"]
        if isinstance(position, dict) and "lat" in position and "lng" in position:
            point = create_geojson_point(position["lat"], position["lng"])
            if point:
                result["coordinates"] = point

    # Markup entries: rebuild the list from per-item copies.
    if "markup" in result and isinstance(result["markup"], list):
        result["markup"] = [with_location(entry) for entry in result["markup"]]

    return result
|
|
|
|
|
|
def get_time_range():
    """
    Calculate the time range for the current execution.

    The range is the one-minute window ending at the current time in
    TIMEZONE.

    Returns:
        Tuple ``(min_timestamp, max_timestamp)`` of ISO 8601 strings with
        second precision and a colon-separated UTC offset
        (e.g. ``2024-05-01T12:00:00+02:00``).
    """
    now = datetime.now(TIMEZONE)
    one_minute_ago = now - timedelta(minutes=1)

    # isoformat() already renders the offset of an aware datetime as +HH:MM,
    # so no manual slicing of strftime("%z") is needed; timespec="seconds"
    # drops the microseconds.
    return (
        one_minute_ago.isoformat(timespec="seconds"),
        now.isoformat(timespec="seconds"),
    )
|
|
|
|
|
|
def fetch_plexts(min_timestamp, max_timestamp):
    """
    Fetch plexts from the endpoint.

    Sends a GET request to ENDPOINT_URL with the time range as query
    parameters and a 30 second timeout.

    Args:
        min_timestamp: Start timestamp in ISO 8601 format
        max_timestamp: End timestamp in ISO 8601 format

    Returns:
        The value of the ``plexts`` key of the JSON response (an empty list
        when the key is absent), or None if the request fails, the body is
        not valid JSON, or the JSON payload is not an object.
    """
    params = {"min_timestamp": min_timestamp, "max_timestamp": max_timestamp}

    try:
        response = requests.get(ENDPOINT_URL, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError is caught as well: depending on the installed requests
        # version, a non-JSON body may surface as a plain ValueError subclass
        # rather than a RequestException.
        print(f"Error fetching plexts: {e}")
        return None

    # A JSON array/scalar has no .get(); report it instead of crashing the job.
    if not isinstance(data, dict):
        print(
            "Error fetching plexts: unexpected response payload "
            f"({type(data).__name__})"
        )
        return None

    return data.get("plexts", [])
|
|
|
|
|
|
def save_to_mongodb(plexts):
    """
    Save plexts to MongoDB with GeoJSON coordinates.

    Each plext is passed through transform_plext() and the results are
    inserted in one batch into COLLECTION_NAME of DB_NAME.

    Args:
        plexts: List of plext dictionaries to save

    Returns:
        True if the insert succeeded or there was nothing to save,
        False if a PyMongoError occurred.
    """
    if not plexts:
        print("No plexts to save")
        return True

    try:
        # The client is managed by ``with`` so it is closed on every exit
        # path. If the constructor itself raises (e.g. an invalid URI) there
        # is no client yet, and nothing is closed -- a ``finally`` calling
        # client.close() would fail on the unbound name in that case.
        with MongoClient(MONGO_URI) as client:
            collection = client[DB_NAME][COLLECTION_NAME]

            # Transform plexts to use GeoJSON format
            transformed_plexts = [transform_plext(plext) for plext in plexts]

            # Insert all transformed plexts
            result = collection.insert_many(transformed_plexts)
            print(f"Inserted {len(result.inserted_ids)} plexts to MongoDB")
            return True
    except PyMongoError as e:
        print(f"Error saving to MongoDB: {e}")
        return False
|
|
|
|
|
|
def job():
    """
    Run one collection cycle: compute the window, fetch, then store.

    Progress and the outcome are reported on stdout. When fetching fails
    the cycle ends early and nothing is written.
    """
    started_at = datetime.now(TIMEZONE)
    print(f"\n[{started_at.strftime('%Y-%m-%d %H:%M:%S %Z')}] Starting job...")

    # Window of timestamps to query for this cycle.
    window_start, window_end = get_time_range()
    print(f"Time range: {window_start} to {window_end}")

    fetched = fetch_plexts(window_start, window_end)
    if fetched is None:
        # None signals a fetch error (an empty list is a valid result).
        print("Failed to fetch plexts")
        return

    print(f"Fetched {len(fetched)} plexts")

    saved = save_to_mongodb(fetched)
    print("Job completed successfully" if saved else "Job completed with errors")
|
|
|
|
|
|
def main():
    """
    Schedule the job and run it.

    Registers job() on a one-minute interval, executes it once right away,
    then starts the scheduler. A KeyboardInterrupt or SystemExit raised
    while the scheduler runs is reported as a user-requested stop.
    """
    print("Starting Plex monitoring scheduler...")
    print("Press Ctrl+C to stop")

    # Register the recurring run (the scheduler is not started yet).
    sched = BlockingScheduler()
    sched.add_job(job, trigger="interval", minutes=1)

    # Initial run on startup, before handing control to the scheduler.
    job()

    try:
        sched.start()
    except (KeyboardInterrupt, SystemExit):
        print("\nScheduler stopped by user")
|
|
|
|
|
|
# Start the scheduler only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|