146 lines
4.3 KiB
Python
146 lines
4.3 KiB
Python
import os
|
|
import argparse
|
|
from typing import List
|
|
from datetime import datetime
|
|
from dotenv import load_dotenv
|
|
from ingress import IngressAPI
|
|
from models import EventType, Plext
|
|
|
|
|
|
# Load variables from a .env file (if one is found) into the process
# environment before main() reads INGRESS_COOKIE and V via os.getenv.
load_dotenv()
|
|
|
|
|
|
def parse_timestamp(value: str) -> int:
    """Parse a timestamp given as epoch milliseconds or an ISO 8601 string.

    Args:
        value: Either integer milliseconds since the epoch (as text) or an
            ISO 8601 date-time such as ``2026-01-12T10:00:00Z``. A date-time
            without a UTC offset is interpreted in the system's local time
            zone (the behaviour of ``datetime.timestamp`` for naive values).

    Returns:
        Timestamp in milliseconds since the epoch. Digits below millisecond
        resolution are dropped.

    Raises:
        ValueError: If ``value`` is neither an integer nor a valid ISO 8601
            date-time.
    """
    # A plain integer is taken verbatim as milliseconds.
    try:
        return int(value)
    except ValueError:
        pass

    # Older ``fromisoformat`` implementations reject the "Z" (UTC) designator,
    # so translate a trailing one into an explicit offset. Only the suffix is
    # touched; a "Z" anywhere else is left for the parser to reject.
    iso_value = value[:-1] + "+00:00" if value.endswith("Z") else value

    try:
        dt = datetime.fromisoformat(iso_value)
    except ValueError as err:
        raise ValueError(
            f"Invalid timestamp format: {value}. "
            "Use milliseconds (e.g., 1736659200000) or "
            "ISO 8601 format (e.g., '2026-01-12T10:00:00Z')"
        ) from err

    # Build the result from integers: multiplying the float returned by
    # ``timestamp()`` by 1000 and truncating can land one millisecond low
    # when the input carries fractional seconds.
    whole_seconds = int(dt.replace(microsecond=0).timestamp())
    return whole_seconds * 1000 + dt.microsecond // 1000
|
|
|
|
|
|
def print_plexts(plexts: List[Plext]):
    """Write the given plexts to stdout, one line each, oldest first.

    Each line shows the plext's time (formatted via
    ``datetime.fromtimestamp``), its event type name and its text. When
    ``get_event_coordinates()`` returns a truthy value, the latitude and
    longitude are appended.

    Args:
        plexts: List of Plext objects to print.
    """
    for entry in sorted(plexts, key=lambda item: item.timestamp):
        # Plext timestamps are divided by 1000 before conversion, i.e. they
        # are handled as milliseconds.
        moment = datetime.fromtimestamp(entry.timestamp / 1000.0)
        stamp = moment.strftime("%Y-%m-%d %H:%M:%S")

        location = entry.get_event_coordinates()
        if location:
            lat, lng = location
            suffix = f" - Coords: {lat}, {lng}"
        else:
            suffix = ""

        print(f"[{stamp}] [{entry.get_event_type().name}] {entry.text}{suffix}")
|
|
|
|
|
|
def _timestamp_arg(raw: str) -> int:
    """Adapt ``parse_timestamp`` for use as an argparse ``type`` callable.

    argparse replaces the message of a ``ValueError`` raised by a ``type``
    callable with a generic one; re-raising as ``ArgumentTypeError`` keeps
    the format hint produced by ``parse_timestamp`` visible to the user.
    """
    try:
        return parse_timestamp(raw)
    except ValueError as err:
        raise argparse.ArgumentTypeError(str(err)) from err


def main():
    """Main entry point for the Ingress Intel Report CLI.

    Parses command-line arguments, initializes the IngressAPI client from the
    ``V`` and ``INGRESS_COOKIE`` environment variables, fetches plexts based
    on the specified filters, and prints them.

    If fetching or printing fails, the error is written to stderr and the
    process exits with status 1 so that calling scripts can detect it.
    """
    parser = argparse.ArgumentParser(description="Ingress Intel Report")
    parser.add_argument(
        "--event-types",
        nargs="+",
        type=str,
        choices=[e.name for e in EventType],
        help="List of event types to filter by.",
    )
    parser.add_argument("--player-name", type=str, help="Player name to filter by.")

    # The bounding box is passed to the client's *_e6 parameters, so the
    # values are integers (degrees * 1e6) rather than decimal degrees.
    parser.add_argument(
        "--min-lat",
        type=int,
        default=45470259,
        help="Minimum latitude as an E6 integer (degrees * 1e6).",
    )
    parser.add_argument(
        "--min-lng",
        type=int,
        default=12244155,
        help="Minimum longitude as an E6 integer (degrees * 1e6).",
    )
    parser.add_argument(
        "--max-lat",
        type=int,
        default=45480370,
        help="Maximum latitude as an E6 integer (degrees * 1e6).",
    )
    parser.add_argument(
        "--max-lng",
        type=int,
        default=12298207,
        help="Maximum longitude as an E6 integer (degrees * 1e6).",
    )

    # -1 is forwarded unchanged to the client when no bound is given.
    parser.add_argument(
        "--min-timestamp",
        type=_timestamp_arg,
        default=-1,
        help="Minimum timestamp (milliseconds since epoch or ISO 8601 format, e.g., '2026-01-12T10:00:00Z')",
    )
    parser.add_argument(
        "--max-timestamp",
        type=_timestamp_arg,
        default=-1,
        help="Maximum timestamp (milliseconds since epoch or ISO 8601 format, e.g., '2026-01-12T10:00:00Z')",
    )
    args = parser.parse_args()

    # Credentials come from the environment (optionally populated from a .env
    # file) rather than being hardcoded in the script.
    # NOTE(review): neither variable is validated here; when one is unset,
    # os.getenv returns None and that is passed to IngressAPI as-is.
    client = IngressAPI(
        version=os.getenv("V"),
        cookie=os.getenv("INGRESS_COOKIE"),
    )

    # Translate the validated names back into EventType members; None means
    # no event-type filter was requested.
    event_types = (
        [EventType[et] for et in args.event_types] if args.event_types else None
    )

    try:
        plexts = client.get_plexts(
            min_lat_e6=args.min_lat,
            min_lng_e6=args.min_lng,
            max_lat_e6=args.max_lat,
            max_lng_e6=args.max_lng,
            min_timestamp_ms=args.min_timestamp,
            max_timestamp_ms=args.max_timestamp,
            event_types=event_types,
            player_name=args.player_name,
        )
        print_plexts(plexts)
    except Exception as e:
        # Top-level boundary: report on stderr and signal failure through the
        # exit status instead of printing to stdout and exiting with 0.
        parser.exit(1, f"An error occurred: {e}\n")
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|