Files
geozoner-back/app/services/geo_pipeline.py
Redsandy 3ea4fb4771 init
2026-03-14 18:48:57 +03:00

212 lines
6.3 KiB
Python

"""Geo pipeline — processes raw GPS tracks into zone polygons.
All geo math is done via Shapely (pure Python), so it works
with both SQLite (dev) and PostgreSQL (prod).
"""
import json
import math
import uuid
from datetime import datetime, timezone
from rdp import rdp
from shapely.geometry import Polygon, mapping
from shapely import wkt
from sqlmodel import Session
from app.config import (
MIN_ZONE_AREA_M2,
LOOP_CLOSURE_RADIUS_M,
DOUGLAS_PEUCKER_EPSILON_M,
)
from app.models.activity import Activity
from app.models.zone import Zone
from app.schemas.activity import GpsPoint, ActivityDetail
def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Haversine distance in meters between two lat/lon points."""
R = 6_371_000 # Earth radius in meters
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlam = math.radians(lon2 - lon1)
a = (
math.sin(dphi / 2) ** 2
+ math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
)
return 2 * R * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def compute_distance_m(points: list[GpsPoint]) -> float:
"""Total distance in meters along a GPS track."""
total = 0.0
for i in range(1, len(points)):
total += haversine_m(
points[i - 1].lat, points[i - 1].lon, points[i].lat, points[i].lon
)
return total
def simplify_track(points: list[GpsPoint], epsilon: float) -> list[GpsPoint]:
"""Douglas-Peucker simplification on lat/lon coordinates.
epsilon is in approximate degrees; we convert from meters.
~0.00001 degree ≈ 1.11 m at the equator.
"""
coords = [(p.lat, p.lon) for p in points]
eps_deg = epsilon / 111_000 # rough meters->degrees
simplified = rdp(coords, epsilon=eps_deg)
# Rebuild GpsPoint list (drop timestamps for simplified points)
return [GpsPoint(lat=c[0], lon=c[1]) for c in simplified]
def is_loop_closed(points: list[GpsPoint], radius_m: float) -> bool:
"""Check if start and end points are within radius_m meters."""
if len(points) < 4:
return False
return (
haversine_m(points[0].lat, points[0].lon, points[-1].lat, points[-1].lon)
<= radius_m
)
def build_polygon(points: list[GpsPoint]) -> Polygon | None:
"""Build a Shapely Polygon from GPS points.
Uses (lon, lat) ordering for GeoJSON/WKT compatibility.
Closes the ring if needed.
"""
coords = [(p.lon, p.lat) for p in points]
if coords[0] != coords[-1]:
coords.append(coords[0])
if len(coords) < 4:
return None
poly = Polygon(coords)
if not poly.is_valid:
poly = poly.buffer(0) # attempt to fix self-intersections
return poly if poly.is_valid and not poly.is_empty else None
def polygon_area_m2(poly: Polygon) -> float:
"""Approximate area in m² from a WGS84 polygon.
Uses a simple cos(lat) correction. Accurate enough for city-scale zones.
"""
centroid = poly.centroid
lat_rad = math.radians(centroid.y)
# Degrees to meters conversion factors
m_per_deg_lat = 111_320.0
m_per_deg_lon = 111_320.0 * math.cos(lat_rad)
# Scale polygon to meters and compute area
coords = list(poly.exterior.coords)
scaled = [(x * m_per_deg_lon, y * m_per_deg_lat) for x, y in coords]
return abs(Polygon(scaled).area)
def wkt_to_geojson(wkt_str: str) -> dict:
"""Convert WKT polygon string to GeoJSON dict."""
geom = wkt.loads(wkt_str)
return mapping(geom)
def process_activity(
user_id: uuid.UUID,
activity_type: str,
started_at: datetime,
ended_at: datetime,
gps_track: list[GpsPoint],
session: Session,
) -> ActivityDetail:
"""Full geo pipeline: filter -> simplify -> validate loop -> build polygon -> persist.
Returns an ActivityDetail with zone info if a zone was created.
"""
# 1. Create activity record
distance = compute_distance_m(gps_track)
activity = Activity(
user_id=user_id,
type=activity_type,
started_at=started_at,
ended_at=ended_at,
distance_m=distance,
raw_gpx=json.dumps([{"lat": p.lat, "lon": p.lon} for p in gps_track]),
status="pending",
)
session.add(activity)
session.flush() # get activity.id
zone_id = None
area_m2 = None
try:
# 2. Filter outliers (skip points with hdop > 4 if present)
filtered = [p for p in gps_track if p.hdop is None or p.hdop <= 4]
if len(filtered) < 10:
activity.status = "failed"
session.commit()
return _to_detail(activity, None, None)
# 3. Simplify
simplified = simplify_track(filtered, DOUGLAS_PEUCKER_EPSILON_M)
# 4. Check loop closure
if not is_loop_closed(simplified, LOOP_CLOSURE_RADIUS_M):
activity.status = "failed"
session.commit()
return _to_detail(activity, None, None)
# 5. Build polygon
poly = build_polygon(simplified)
if poly is None:
activity.status = "failed"
session.commit()
return _to_detail(activity, None, None)
# 6. Validate area
area_m2 = polygon_area_m2(poly)
if area_m2 < MIN_ZONE_AREA_M2:
activity.status = "failed"
session.commit()
return _to_detail(activity, None, None)
# 7. Persist zone
zone = Zone(
owner_id=user_id,
activity_id=activity.id,
polygon_wkt=poly.wkt,
area_m2=area_m2,
)
session.add(zone)
activity.status = "completed"
session.commit()
session.refresh(zone)
session.refresh(activity)
zone_id = zone.id
area_m2 = zone.area_m2
except Exception:
activity.status = "failed"
session.commit()
return _to_detail(activity, zone_id, area_m2)
def _to_detail(
activity: Activity,
zone_id: uuid.UUID | None,
area_m2: float | None,
) -> ActivityDetail:
return ActivityDetail(
id=activity.id,
user_id=activity.user_id,
type=activity.type,
started_at=activity.started_at,
ended_at=activity.ended_at,
distance_m=activity.distance_m,
status=activity.status,
created_at=activity.created_at,
zone_id=zone_id,
area_m2=area_m2,
)