#!/usr/bin/env python3
"""
Frida data receiver.

Attaches Frida to an app, loads a hook script, and saves every HTTP
response reported by the hook as a JSON file in the ``captures`` directory
next to this file. When the program stops, all captures collected during
the run are also written to a single summary file.
"""

import json
import os
import re
import sys
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from urllib.parse import urlsplit

# NOTE(review): os, threading and http.server are imported but not used in
# this file; no HTTP query API is implemented here. The imports are kept in
# case other code relies on them.

try:
    import frida
except ImportError:  # Reported with a clear message in attach_to_app().
    frida = None

CAPTURE_DIR = Path(__file__).parent / "captures"
CAPTURE_DIR.mkdir(exist_ok=True)

# Every payload received from the hook during this run, in arrival order.
captured_data = []

# Anything outside this set is replaced before a value is used in a filename.
_UNSAFE_FILENAME_CHARS = re.compile(r"[^A-Za-z0-9._-]+")


def _filename_part(value):
    """Return *value* as text that is safe to embed in a filename."""
    cleaned = _UNSAFE_FILENAME_CHARS.sub("_", str(value)).strip("._")
    return cleaned or "unknown"


def _domain_of(url):
    """Return the network location of *url*, or "unknown" if it has none."""
    try:
        return urlsplit(url).netloc or "unknown"
    except ValueError:  # e.g. a malformed IPv6 literal
        return "unknown"


def _unique_capture_path(stem):
    """Return a path in CAPTURE_DIR for *stem* that does not exist yet."""
    candidate = CAPTURE_DIR / f"{stem}.json"
    counter = 0
    # Several captures can share the same second, host, method and status;
    # add a numeric suffix instead of overwriting the earlier file.
    while candidate.exists():
        counter += 1
        candidate = CAPTURE_DIR / f"{stem}_{counter}.json"
    return candidate


def _response_preview(body):
    """Return a short, printable preview of a response body."""
    if isinstance(body, str):
        try:
            return json.dumps(json.loads(body), indent=2)[:500]
        except ValueError:  # Not JSON: show the raw text instead.
            return body[:300]
    try:
        return json.dumps(body, indent=2)[:500]
    except (TypeError, ValueError):
        return str(body)[:300]


def on_message(message, data):
    """Handle one message from the Frida hook.

    "send" messages are appended to ``captured_data``, written to their own
    JSON file in ``CAPTURE_DIR`` and summarised on stdout. "error" messages
    are printed. Other message types are ignored.

    Args:
        message: Message dict delivered by Frida; its "type" key selects
            the handling.
        data: Binary data accompanying the message (unused).
    """
    kind = message.get("type")

    if kind == "error":
        # Fall back to the description when no stack trace is attached.
        detail = message.get("stack") or message.get("description", "unknown error")
        print(f"[ERROR] {detail}")
        return

    if kind != "send":
        return

    payload = message.get("payload")
    captured_data.append(payload)

    if not isinstance(payload, dict):
        # The hook can send any JSON value; keep it, but there are no
        # request fields to summarise or to build a filename from.
        print(f"[CAPTURED] non-dict payload: {str(payload)[:300]}")
        return

    url = str(payload.get("url") or "unknown")
    status = payload.get("status", "?")
    method = payload.get("method", "?")
    resp_body = payload.get("responseBody") or ""
    try:
        body_len = len(resp_body)
    except TypeError:  # e.g. a numeric body
        body_len = len(str(resp_body))

    print(f"\n{'='*60}")
    print(f"[CAPTURED] {method} {url}")
    print(f"  Status: {status} | Response: {body_len} chars")

    # Save to file. The name parts come from the hooked app, so they are
    # sanitised before being used in a path.
    ts = time.strftime("%Y%m%d_%H%M%S")
    stem = "_".join(
        [ts, _filename_part(_domain_of(url)), _filename_part(method), _filename_part(status)]
    )
    filepath = _unique_capture_path(stem)
    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2)
    print(f"  Saved: {filepath.name}")

    # Print response body preview
    if resp_body:
        print(f"  Response preview: {_response_preview(resp_body)}")


def attach_to_app(package_name, script_path):
    """Attach Frida to an app and load the hook script.

    Attaches to the running process if there is one; otherwise spawns the
    app. A spawned app is resumed only after the script has been loaded so
    the hooks are in place before the app starts running.

    Args:
        package_name: Identifier of the target app.
        script_path: Path to the JavaScript hook script.

    Returns:
        The Frida session attached to the app.

    Raises:
        ImportError: If the ``frida`` package is not installed.
        OSError: If the hook script cannot be read.
    """
    if frida is None:
        raise ImportError(
            "The 'frida' package is required but not installed (pip install frida)."
        )

    # Read the script before touching the device so a bad path cannot leave
    # a freshly spawned app suspended.
    script_code = Path(script_path).read_text(encoding="utf-8")

    device = frida.get_usb_device()
    print(f"[*] Connected to: {device.name}")

    spawned_pid = None
    # Try to attach to running process first
    try:
        session = device.attach(package_name)
        print(f"[*] Attached to running process: {package_name}")
    except frida.ProcessNotFoundError:
        # Spawn it
        print(f"[*] Spawning: {package_name}")
        spawned_pid = device.spawn([package_name])
        session = device.attach(spawned_pid)
        print(f"[*] Spawned and attached: PID {spawned_pid}")

    try:
        script = session.create_script(script_code)
        script.on("message", on_message)
        script.load()
    finally:
        # Resume even if loading failed so the app is not left suspended.
        if spawned_pid is not None:
            device.resume(spawned_pid)

    print(f"[*] Script loaded. Intercepting traffic for {package_name}...")
    return session


def _save_summary():
    """Write every capture from this run to one timestamped JSON file."""
    if not captured_data:
        return
    summary_path = CAPTURE_DIR / f"all_captures_{time.strftime('%Y%m%d_%H%M%S')}.json"
    with open(summary_path, "w", encoding="utf-8") as f:
        json.dump(captured_data, f, indent=2)
    print(f"[*] Saved {len(captured_data)} captures to {summary_path}")


def main():
    """Run the capture session described by the command-line arguments.

    ``sys.argv[1]`` is the target package name (required) and
    ``sys.argv[2]`` is the hook script path (defaults to ``okhttp_hook.js``
    next to this file). Blocks until stdin reaches EOF or Ctrl+C is
    pressed, then detaches and writes the summary file.
    """
    if len(sys.argv) < 2:
        print("Usage: python server.py PACKAGE_NAME [hook_script.js]")
        print("")
        print("Examples:")
        print("  python server.py com.spirit.customerapp")
        print("  python server.py com.delta.mobile.android")
        print("  python server.py com.united.mobile.android")
        print("")
        print("Default hook script: okhttp_hook.js")
        sys.exit(1)

    package_name = sys.argv[1]
    if len(sys.argv) > 2:
        script_path = sys.argv[2]
    else:
        script_path = str(Path(__file__).parent / "okhttp_hook.js")

    print(f"[*] Target: {package_name}")
    print(f"[*] Script: {script_path}")
    print(f"[*] Captures: {CAPTURE_DIR}")
    print("")

    try:
        session = attach_to_app(package_name, script_path)
    except ImportError as exc:
        sys.exit(f"[!] {exc}")

    print("")
    print("[*] Ready. Interact with the app to trigger API calls.")
    print("[*] Press Ctrl+C to stop.")
    print("")

    try:
        sys.stdin.read()
    except KeyboardInterrupt:
        print("\n[*] Stopping...")

    try:
        session.detach()
    finally:
        # Save all captured data, even if detaching failed.
        _save_summary()


if __name__ == "__main__":
    main()