import logging import time from typing import List, Optional import requests from models import Plext, EventType # Configure logging logging.basicConfig( level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) logger = logging.getLogger(__name__) class IngressAPI: """Client for interacting with the Ingress Intel API. Attributes: BASE_URL: The base URL for the Ingress API. version: The API version string. headers: HTTP headers for API requests. """ BASE_URL = "https://intel.ingress.com/r" def __init__(self, version: str, cookie: str): """Initialize the IngressAPI client. Args: version: The API version string to use for requests. cookie: The authentication cookie string for the Ingress API. Should include csrftoken and sessionid. """ logger.info("=" * 80) logger.info("Initializing IngressAPI client") logger.info(f"API Version: {version}") logger.debug(f"Cookie length: {len(cookie)} characters") self.version = version self.headers = { "accept": "application/json, text/javascript, */*; q=0.01", "accept-language": "it-IT,it;q=0.9,en-US;q=0.8,en;q=0.7", "content-type": "application/json; charset=UTF-8", "cookie": cookie, "origin": "https://intel.ingress.com", "priority": "u=1, i", "referer": "https://intel.ingress.com/", "sec-ch-ua": '"Chromium";v="142", "Google Chrome";v="142", "Not_A Brand";v="99"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"macOS"', "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-origin", "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36", "x-requested-with": "XMLHttpRequest", } # Extract CSRF token from cookie and add to headers csrf_token = None for item in cookie.split(";"): if "csrftoken" in item: csrf_token = item.split("=")[1].strip() self.headers["x-csrftoken"] = csrf_token logger.debug(f"CSRF token extracted: {csrf_token[:10]}... (truncated)") break if not csrf_token: logger.warning("No CSRF token found in cookie!") logger.info(f"Headers configured with {len(self.headers)} fields") logger.debug(f"Header keys: {list(self.headers.keys())}") logger.info("IngressAPI client initialization complete") logger.info("=" * 80) def get_plexts( self, min_lat_e6: int, min_lng_e6: int, max_lat_e6: int, max_lng_e6: int, min_timestamp_ms: int = -1, max_timestamp_ms: int = -1, tab: str = "all", event_types: Optional[List[EventType]] = None, player_name: Optional[str] = None, ) -> List[Plext]: """Fetch plexts from the Ingress API. Args: min_lat_e6: Minimum latitude in microdegrees (E6 format). min_lng_e6: Minimum longitude in microdegrees (E6 format). max_lat_e6: Maximum latitude in microdegrees (E6 format). max_lng_e6: Maximum longitude in microdegrees (E6 format). min_timestamp_ms: Minimum timestamp in milliseconds since epoch. Use -1 for no minimum. max_timestamp_ms: Maximum timestamp in milliseconds since epoch. Use -1 for no maximum. tab: The tab to fetch from (default: "all"). event_types: Optional list of event types to filter by. player_name: Optional player name to filter by. Returns: A list of Plext objects matching the specified criteria. Raises: requests.HTTPError: If the API request fails. requests.exceptions.JSONDecodeError: If the response cannot be decoded as JSON. """ logger.info("-" * 80) logger.info("get_plexts method called") logger.info(f"Parameters:") logger.info(f" - min_lat_e6: {min_lat_e6}") logger.info(f" - min_lng_e6: {min_lng_e6}") logger.info(f" - max_lat_e6: {max_lat_e6}") logger.info(f" - max_lng_e6: {max_lng_e6}") logger.info(f" - min_timestamp_ms: {min_timestamp_ms}") logger.info(f" - max_timestamp_ms: {max_timestamp_ms}") logger.info(f" - tab: {tab}") logger.info(f" - event_types: {event_types}") logger.info(f" - player_name: {player_name}") payload = { "minLatE6": min_lat_e6, "minLngE6": min_lng_e6, "maxLatE6": max_lat_e6, "maxLngE6": max_lng_e6, "minTimestampMs": min_timestamp_ms, "maxTimestampMs": max_timestamp_ms, "ascendingTimestampOrder": True, "tab": tab, "v": self.version, } url = f"{self.BASE_URL}/getPlexts" logger.info(f"Preparing HTTP POST request to: {url}") logger.debug(f"Request payload: {payload}") logger.debug( f"Request headers (excluding cookie): { {k: v for k, v in self.headers.items() if k != 'cookie'} }" ) start_time = time.time() try: logger.info("Sending HTTP request...") response = requests.post(url, json=payload, headers=self.headers) elapsed_time = time.time() - start_time logger.info(f"HTTP Response received") logger.info(f" - Status Code: {response.status_code}") logger.info(f" - Status Text: {response.reason}") logger.info(f" - Response Time: {elapsed_time:.3f} seconds") logger.info(f" - Response Size: {len(response.content)} bytes") logger.debug(f" - Response Headers: {dict(response.headers)}") # Log response content for debugging (truncated if too large) response_text = response.text if len(response_text) > 500: logger.debug( f" - Response Content (first 500 chars): {response_text[:500]}..." ) else: logger.debug(f" - Response Content: {response_text}") # Check for non-200 status codes if response.status_code != 200: logger.error(f"Non-200 status code received: {response.status_code}") logger.error(f"Response body: {response.text}") # Raise exception for HTTP errors response.raise_for_status() logger.info("HTTP request successful (status code 200)") except requests.exceptions.HTTPError as e: logger.error(f"HTTP Error occurred!") logger.error(f" - Exception Type: {type(e).__name__}") logger.error(f" - Exception Message: {str(e)}") logger.error(f" - Response Status Code: {response.status_code}") logger.error(f" - Response Text: {response.text}") logger.error(f" - Request URL: {url}") logger.error(f" - Request Payload: {payload}") logger.exception("Full exception traceback:") raise except requests.exceptions.ConnectionError as e: logger.error(f"Connection Error occurred!") logger.error(f" - Exception Type: {type(e).__name__}") logger.error(f" - Exception Message: {str(e)}") logger.error(f" - Request URL: {url}") logger.exception("Full exception traceback:") raise except requests.exceptions.Timeout as e: logger.error(f"Timeout Error occurred!") logger.error(f" - Exception Type: {type(e).__name__}") logger.error(f" - Exception Message: {str(e)}") logger.error(f" - Request URL: {url}") logger.exception("Full exception traceback:") raise except requests.exceptions.RequestException as e: logger.error(f"Request Exception occurred!") logger.error(f" - Exception Type: {type(e).__name__}") logger.error(f" - Exception Message: {str(e)}") logger.error(f" - Request URL: {url}") logger.exception("Full exception traceback:") raise try: logger.info("Parsing JSON response...") data = response.json() logger.info("JSON parsing successful") logger.debug(f"JSON keys: {list(data.keys())}") if "result" in data: result_count = len(data["result"]) logger.info(f"Found {result_count} plexts in response") else: logger.warning("No 'result' key found in JSON response") logger.warning(f"Available keys: {list(data.keys())}") except requests.exceptions.JSONDecodeError as e: logger.error(f"JSON Decode Error occurred!") logger.error(f" - Exception Type: {type(e).__name__}") logger.error(f" - Exception Message: {str(e)}") logger.error(f" - Response Text: {response.text}") logger.error(f" - Response Status Code: {response.status_code}") logger.exception("Full exception traceback:") raise except Exception as e: logger.error(f"Unexpected error during JSON parsing!") logger.error(f" - Exception Type: {type(e).__name__}") logger.error(f" - Exception Message: {str(e)}") logger.exception("Full exception traceback:") raise try: logger.info("Creating Plext objects from JSON data...") plexts = [Plext.from_json(item) for item in data["result"]] logger.info(f"Successfully created {len(plexts)} Plext objects") except Exception as e: logger.error(f"Error creating Plext objects!") logger.error(f" - Exception Type: {type(e).__name__}") logger.error(f" - Exception Message: {str(e)}") logger.exception("Full exception traceback:") raise # Apply event type filter if event_types: logger.info(f"Filtering by event types: {event_types}") initial_count = len(plexts) plexts = [p for p in plexts if p.get_event_type() in event_types] filtered_count = len(plexts) logger.info( f"Event type filter: {initial_count} -> {filtered_count} plexts" ) logger.debug(f"Filtered out {initial_count - filtered_count} plexts") # Apply player name filter if player_name: logger.info(f"Filtering by player name: {player_name}") initial_count = len(plexts) plexts = [p for p in plexts if p.get_player_name() == player_name] filtered_count = len(plexts) logger.info( f"Player name filter: {initial_count} -> {filtered_count} plexts" ) logger.debug(f"Filtered out {initial_count - filtered_count} plexts") logger.info(f"Returning {len(plexts)} plexts") logger.info("-" * 80) return plexts