dockerize!

This commit is contained in:
Matteo Rosati
2026-01-14 23:54:51 +01:00
parent 1e76403565
commit 46ddcc16da
14 changed files with 1362 additions and 18 deletions

View File

@@ -1,12 +1,42 @@
import logging
import time
from typing import List, Optional
import requests
from models import Plext, EventType
# Configure logging
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)
class IngressAPI:
"""Client for interacting with the Ingress Intel API.
Attributes:
BASE_URL: The base URL for the Ingress API.
version: The API version string.
headers: HTTP headers for API requests.
"""
BASE_URL = "https://intel.ingress.com/r"
def __init__(self, version: str, cookie: str):
"""Initialize the IngressAPI client.
Args:
version: The API version string to use for requests.
cookie: The authentication cookie string for the Ingress API.
Should include csrftoken and sessionid.
"""
logger.info("=" * 80)
logger.info("Initializing IngressAPI client")
logger.info(f"API Version: {version}")
logger.debug(f"Cookie length: {len(cookie)} characters")
self.version = version
self.headers = {
"accept": "application/json, text/javascript, */*; q=0.01",
@@ -27,11 +57,22 @@ class IngressAPI:
}
# Extract CSRF token from cookie and add to headers
csrf_token = None
for item in cookie.split(";"):
if "csrftoken" in item:
self.headers["x-csrftoken"] = item.split("=")[1].strip()
csrf_token = item.split("=")[1].strip()
self.headers["x-csrftoken"] = csrf_token
logger.debug(f"CSRF token extracted: {csrf_token[:10]}... (truncated)")
break
if not csrf_token:
logger.warning("No CSRF token found in cookie!")
logger.info(f"Headers configured with {len(self.headers)} fields")
logger.debug(f"Header keys: {list(self.headers.keys())}")
logger.info("IngressAPI client initialization complete")
logger.info("=" * 80)
def get_plexts(
self,
min_lat_e6: int,
@@ -44,9 +85,42 @@ class IngressAPI:
event_types: Optional[List[EventType]] = None,
player_name: Optional[str] = None,
) -> List[Plext]:
"""Fetch plexts from the Ingress API.
Args:
min_lat_e6: Minimum latitude in microdegrees (E6 format).
min_lng_e6: Minimum longitude in microdegrees (E6 format).
max_lat_e6: Maximum latitude in microdegrees (E6 format).
max_lng_e6: Maximum longitude in microdegrees (E6 format).
min_timestamp_ms: Minimum timestamp in milliseconds since epoch.
Use -1 for no minimum.
max_timestamp_ms: Maximum timestamp in milliseconds since epoch.
Use -1 for no maximum.
tab: The tab to fetch from (default: "all").
event_types: Optional list of event types to filter by.
player_name: Optional player name to filter by.
Returns:
A list of Plext objects matching the specified criteria.
Raises:
requests.HTTPError: If the API request fails.
requests.exceptions.JSONDecodeError: If the response cannot be
decoded as JSON.
"""
Fetches plexts from the Ingress API.
"""
logger.info("-" * 80)
logger.info("get_plexts method called")
logger.info(f"Parameters:")
logger.info(f" - min_lat_e6: {min_lat_e6}")
logger.info(f" - min_lng_e6: {min_lng_e6}")
logger.info(f" - max_lat_e6: {max_lat_e6}")
logger.info(f" - max_lng_e6: {max_lng_e6}")
logger.info(f" - min_timestamp_ms: {min_timestamp_ms}")
logger.info(f" - max_timestamp_ms: {max_timestamp_ms}")
logger.info(f" - tab: {tab}")
logger.info(f" - event_types: {event_types}")
logger.info(f" - player_name: {player_name}")
payload = {
"minLatE6": min_lat_e6,
"minLngE6": min_lng_e6,
@@ -54,27 +128,147 @@ class IngressAPI:
"maxLngE6": max_lng_e6,
"minTimestampMs": min_timestamp_ms,
"maxTimestampMs": max_timestamp_ms,
"ascendingTimestampOrder": True,
"tab": tab,
"v": self.version,
}
response = requests.post(
f"{self.BASE_URL}/getPlexts", json=payload, headers=self.headers
url = f"{self.BASE_URL}/getPlexts"
logger.info(f"Preparing HTTP POST request to: {url}")
logger.debug(f"Request payload: {payload}")
logger.debug(
f"Request headers (excluding cookie): { {k: v for k, v in self.headers.items() if k != 'cookie'} }"
)
response.raise_for_status()
start_time = time.time()
try:
data = response.json()
except requests.exceptions.JSONDecodeError:
print(f"Error decoding JSON: {response.text}")
logger.info("Sending HTTP request...")
response = requests.post(url, json=payload, headers=self.headers)
elapsed_time = time.time() - start_time
logger.info(f"HTTP Response received")
logger.info(f" - Status Code: {response.status_code}")
logger.info(f" - Status Text: {response.reason}")
logger.info(f" - Response Time: {elapsed_time:.3f} seconds")
logger.info(f" - Response Size: {len(response.content)} bytes")
logger.debug(f" - Response Headers: {dict(response.headers)}")
# Log response content for debugging (truncated if too large)
response_text = response.text
if len(response_text) > 500:
logger.debug(
f" - Response Content (first 500 chars): {response_text[:500]}..."
)
else:
logger.debug(f" - Response Content: {response_text}")
# Check for non-200 status codes
if response.status_code != 200:
logger.error(f"Non-200 status code received: {response.status_code}")
logger.error(f"Response body: {response.text}")
# Raise exception for HTTP errors
response.raise_for_status()
logger.info("HTTP request successful (status code 200)")
except requests.exceptions.HTTPError as e:
logger.error(f"HTTP Error occurred!")
logger.error(f" - Exception Type: {type(e).__name__}")
logger.error(f" - Exception Message: {str(e)}")
logger.error(f" - Response Status Code: {response.status_code}")
logger.error(f" - Response Text: {response.text}")
logger.error(f" - Request URL: {url}")
logger.error(f" - Request Payload: {payload}")
logger.exception("Full exception traceback:")
raise
plexts = [Plext.from_json(item) for item in data["result"]]
except requests.exceptions.ConnectionError as e:
logger.error(f"Connection Error occurred!")
logger.error(f" - Exception Type: {type(e).__name__}")
logger.error(f" - Exception Message: {str(e)}")
logger.error(f" - Request URL: {url}")
logger.exception("Full exception traceback:")
raise
except requests.exceptions.Timeout as e:
logger.error(f"Timeout Error occurred!")
logger.error(f" - Exception Type: {type(e).__name__}")
logger.error(f" - Exception Message: {str(e)}")
logger.error(f" - Request URL: {url}")
logger.exception("Full exception traceback:")
raise
except requests.exceptions.RequestException as e:
logger.error(f"Request Exception occurred!")
logger.error(f" - Exception Type: {type(e).__name__}")
logger.error(f" - Exception Message: {str(e)}")
logger.error(f" - Request URL: {url}")
logger.exception("Full exception traceback:")
raise
try:
logger.info("Parsing JSON response...")
data = response.json()
logger.info("JSON parsing successful")
logger.debug(f"JSON keys: {list(data.keys())}")
if "result" in data:
result_count = len(data["result"])
logger.info(f"Found {result_count} plexts in response")
else:
logger.warning("No 'result' key found in JSON response")
logger.warning(f"Available keys: {list(data.keys())}")
except requests.exceptions.JSONDecodeError as e:
logger.error(f"JSON Decode Error occurred!")
logger.error(f" - Exception Type: {type(e).__name__}")
logger.error(f" - Exception Message: {str(e)}")
logger.error(f" - Response Text: {response.text}")
logger.error(f" - Response Status Code: {response.status_code}")
logger.exception("Full exception traceback:")
raise
except Exception as e:
logger.error(f"Unexpected error during JSON parsing!")
logger.error(f" - Exception Type: {type(e).__name__}")
logger.error(f" - Exception Message: {str(e)}")
logger.exception("Full exception traceback:")
raise
try:
logger.info("Creating Plext objects from JSON data...")
plexts = [Plext.from_json(item) for item in data["result"]]
logger.info(f"Successfully created {len(plexts)} Plext objects")
except Exception as e:
logger.error(f"Error creating Plext objects!")
logger.error(f" - Exception Type: {type(e).__name__}")
logger.error(f" - Exception Message: {str(e)}")
logger.exception("Full exception traceback:")
raise
# Apply event type filter
if event_types:
logger.info(f"Filtering by event types: {event_types}")
initial_count = len(plexts)
plexts = [p for p in plexts if p.get_event_type() in event_types]
filtered_count = len(plexts)
logger.info(
f"Event type filter: {initial_count} -> {filtered_count} plexts"
)
logger.debug(f"Filtered out {initial_count - filtered_count} plexts")
# Apply player name filter
if player_name:
logger.info(f"Filtering by player name: {player_name}")
initial_count = len(plexts)
plexts = [p for p in plexts if p.get_player_name() == player_name]
filtered_count = len(plexts)
logger.info(
f"Player name filter: {initial_count} -> {filtered_count} plexts"
)
logger.debug(f"Filtered out {initial_count - filtered_count} plexts")
logger.info(f"Returning {len(plexts)} plexts")
logger.info("-" * 80)
return plexts