This commit is contained in:
Redsandy
2026-03-14 18:48:57 +03:00
parent 1d1350fc13
commit 3ea4fb4771
40 changed files with 2150 additions and 0 deletions

0
app/services/__init__.py Normal file
View File

23
app/services/capture.py Normal file
View File

@@ -0,0 +1,23 @@
"""Zone capture logic — detects overlap and transfers territory.
TODO: Implement in Phase 3 (zone capture mechanics).
"""
import uuid
from sqlmodel import Session
def process_captures(
attacker_zone_id: uuid.UUID,
attacker_user_id: uuid.UUID,
session: Session,
) -> list[dict]:
"""Check if the new zone overlaps any friend zones and apply capture rules.
Returns a list of capture events (zone_id, victim_id, captured_area).
Not yet implemented — returns empty list for MVP Phase 1.
"""
# Phase 3: query friend zones, compute ST_Intersection with Shapely,
# apply defense level rules, update ownership, create ZoneHistory records.
return []

View File

@@ -0,0 +1,211 @@
"""Geo pipeline — processes raw GPS tracks into zone polygons.
All geo math is done via Shapely (pure Python), so it works
with both SQLite (dev) and PostgreSQL (prod).
"""
import json
import math
import uuid
from datetime import datetime, timezone
from rdp import rdp
from shapely.geometry import Polygon, mapping
from shapely import wkt
from sqlmodel import Session
from app.config import (
MIN_ZONE_AREA_M2,
LOOP_CLOSURE_RADIUS_M,
DOUGLAS_PEUCKER_EPSILON_M,
)
from app.models.activity import Activity
from app.models.zone import Zone
from app.schemas.activity import GpsPoint, ActivityDetail
def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Haversine distance in meters between two lat/lon points."""
R = 6_371_000 # Earth radius in meters
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlam = math.radians(lon2 - lon1)
a = (
math.sin(dphi / 2) ** 2
+ math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
)
return 2 * R * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def compute_distance_m(points: list[GpsPoint]) -> float:
"""Total distance in meters along a GPS track."""
total = 0.0
for i in range(1, len(points)):
total += haversine_m(
points[i - 1].lat, points[i - 1].lon, points[i].lat, points[i].lon
)
return total
def simplify_track(points: list[GpsPoint], epsilon: float) -> list[GpsPoint]:
"""Douglas-Peucker simplification on lat/lon coordinates.
epsilon is in approximate degrees; we convert from meters.
~0.00001 degree ≈ 1.11 m at the equator.
"""
coords = [(p.lat, p.lon) for p in points]
eps_deg = epsilon / 111_000 # rough meters->degrees
simplified = rdp(coords, epsilon=eps_deg)
# Rebuild GpsPoint list (drop timestamps for simplified points)
return [GpsPoint(lat=c[0], lon=c[1]) for c in simplified]
def is_loop_closed(points: list[GpsPoint], radius_m: float) -> bool:
"""Check if start and end points are within radius_m meters."""
if len(points) < 4:
return False
return (
haversine_m(points[0].lat, points[0].lon, points[-1].lat, points[-1].lon)
<= radius_m
)
def build_polygon(points: list[GpsPoint]) -> Polygon | None:
"""Build a Shapely Polygon from GPS points.
Uses (lon, lat) ordering for GeoJSON/WKT compatibility.
Closes the ring if needed.
"""
coords = [(p.lon, p.lat) for p in points]
if coords[0] != coords[-1]:
coords.append(coords[0])
if len(coords) < 4:
return None
poly = Polygon(coords)
if not poly.is_valid:
poly = poly.buffer(0) # attempt to fix self-intersections
return poly if poly.is_valid and not poly.is_empty else None
def polygon_area_m2(poly: Polygon) -> float:
"""Approximate area in m² from a WGS84 polygon.
Uses a simple cos(lat) correction. Accurate enough for city-scale zones.
"""
centroid = poly.centroid
lat_rad = math.radians(centroid.y)
# Degrees to meters conversion factors
m_per_deg_lat = 111_320.0
m_per_deg_lon = 111_320.0 * math.cos(lat_rad)
# Scale polygon to meters and compute area
coords = list(poly.exterior.coords)
scaled = [(x * m_per_deg_lon, y * m_per_deg_lat) for x, y in coords]
return abs(Polygon(scaled).area)
def wkt_to_geojson(wkt_str: str) -> dict:
"""Convert WKT polygon string to GeoJSON dict."""
geom = wkt.loads(wkt_str)
return mapping(geom)
def process_activity(
user_id: uuid.UUID,
activity_type: str,
started_at: datetime,
ended_at: datetime,
gps_track: list[GpsPoint],
session: Session,
) -> ActivityDetail:
"""Full geo pipeline: filter -> simplify -> validate loop -> build polygon -> persist.
Returns an ActivityDetail with zone info if a zone was created.
"""
# 1. Create activity record
distance = compute_distance_m(gps_track)
activity = Activity(
user_id=user_id,
type=activity_type,
started_at=started_at,
ended_at=ended_at,
distance_m=distance,
raw_gpx=json.dumps([{"lat": p.lat, "lon": p.lon} for p in gps_track]),
status="pending",
)
session.add(activity)
session.flush() # get activity.id
zone_id = None
area_m2 = None
try:
# 2. Filter outliers (skip points with hdop > 4 if present)
filtered = [p for p in gps_track if p.hdop is None or p.hdop <= 4]
if len(filtered) < 10:
activity.status = "failed"
session.commit()
return _to_detail(activity, None, None)
# 3. Simplify
simplified = simplify_track(filtered, DOUGLAS_PEUCKER_EPSILON_M)
# 4. Check loop closure
if not is_loop_closed(simplified, LOOP_CLOSURE_RADIUS_M):
activity.status = "failed"
session.commit()
return _to_detail(activity, None, None)
# 5. Build polygon
poly = build_polygon(simplified)
if poly is None:
activity.status = "failed"
session.commit()
return _to_detail(activity, None, None)
# 6. Validate area
area_m2 = polygon_area_m2(poly)
if area_m2 < MIN_ZONE_AREA_M2:
activity.status = "failed"
session.commit()
return _to_detail(activity, None, None)
# 7. Persist zone
zone = Zone(
owner_id=user_id,
activity_id=activity.id,
polygon_wkt=poly.wkt,
area_m2=area_m2,
)
session.add(zone)
activity.status = "completed"
session.commit()
session.refresh(zone)
session.refresh(activity)
zone_id = zone.id
area_m2 = zone.area_m2
except Exception:
activity.status = "failed"
session.commit()
return _to_detail(activity, zone_id, area_m2)
def _to_detail(
activity: Activity,
zone_id: uuid.UUID | None,
area_m2: float | None,
) -> ActivityDetail:
return ActivityDetail(
id=activity.id,
user_id=activity.user_id,
type=activity.type,
started_at=activity.started_at,
ended_at=activity.ended_at,
distance_m=activity.distance_m,
status=activity.status,
created_at=activity.created_at,
zone_id=zone_id,
area_m2=area_m2,
)

View File

@@ -0,0 +1,29 @@
"""Push notifications via Firebase Cloud Messaging.
TODO: Implement in Phase 2.
"""
import uuid
def send_zone_captured(
victim_user_id: uuid.UUID, attacker_name: str, area_m2: float
) -> None:
"""Notify a user that their zone was partially captured."""
# Phase 2: Use firebase-admin SDK to send FCM push
pass
def send_leaderboard_change(user_id: uuid.UUID, new_rank: int) -> None:
"""Notify a user that their leaderboard position changed."""
pass
def send_streak_at_risk(user_id: uuid.UUID, streak_days: int) -> None:
"""Notify a user that their streak is about to expire."""
pass
def send_friend_joined(user_id: uuid.UUID, friend_name: str) -> None:
"""Notify a user that a friend just joined GeoZoner."""
pass

22
app/services/scoring.py Normal file
View File

@@ -0,0 +1,22 @@
"""Daily scoring engine — calculates points per user.
TODO: Implement in Phase 2 (scoring system).
"""
from sqlmodel import Session
def calculate_daily_scores(session: Session) -> int:
"""Run daily score calculation for all users.
Returns the number of users scored.
Not yet implemented — will be triggered by a cron job or manual call.
"""
# Phase 2: For each user, compute:
# - base_pts: 1 pt per 1,000 m² zone area held
# - defense bonus: +20% per defense level above 1
# - capture bonus: +50 pts per zone captured that day
# - streak bonus: x(1 + streak_days * 0.05), capped at 2x
# - activity bonus: +10 pts per km covered
return 0