import os import time import json import random import logging from urllib.parse import urlparse from dotenv import load_dotenv import paho.mqtt.client as mqtt from supabase import create_client, Client # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Load environment variables from the sibling directory # Assuming this script is in server/mqtt-simulator/main.py and .env is in server/mqtt-simulator/.env current_dir = os.path.dirname(os.path.abspath(__file__)) env_path = os.path.join(current_dir, '.env') if os.path.exists(env_path): logger.info(f"Loading .env from: {env_path}") load_dotenv(env_path, override=True) else: logger.warning(f".env file not found at {env_path}") # Get MQTT configuration mqtt_url_str = os.getenv('MQTT_URL') if mqtt_url_str: mqtt_url_str = mqtt_url_str.strip() mqtt_username = os.getenv('MQTT_USERNAME') if mqtt_username: mqtt_username = mqtt_username.strip() mqtt_password = os.getenv('MQTT_PASSWORD') if mqtt_password: mqtt_password = mqtt_password.strip() if not mqtt_url_str: logger.error("MQTT_URL not found in environment variables") # Fallback or exit? Let's exit to be safe as we need a broker. exit(1) # Parse MQTT URL try: parsed_url = urlparse(mqtt_url_str) mqtt_host = parsed_url.hostname mqtt_port = parsed_url.port or 1883 mqtt_protocol = parsed_url.scheme except Exception as e: logger.error(f"Failed to parse MQTT_URL: {e}") exit(1) logger.info(f"MQTT Configuration: Host={mqtt_host}, Port={mqtt_port}, Protocol={mqtt_protocol}") # Mock Configuration def str_to_bool(v): return str(v).strip().lower() in ("true", "1", "t", "yes") enable_watch_mock = str_to_bool(os.getenv('WATCH_MOCK', 'true')) enable_ap_mock = str_to_bool(os.getenv('AP_REPORT_MOCK', 'false')) logger.info(f"Mock Configuration: WATCH_MOCK={enable_watch_mock}, AP_REPORT_MOCK={enable_ap_mock}") # Supabase Configuration & Device Fetching device_ids = [] if enable_watch_mock: supabase_url = os.getenv('SUPABASE_URL') supabase_key = os.getenv('SUPABASE_SERVICE_ROLE_KEY') if supabase_url: supabase_url = supabase_url.strip() if supabase_key: supabase_key = supabase_key.strip() if supabase_url and supabase_key: try: logger.info("Fetching devices from Supabase...") supabase: Client = create_client(supabase_url, supabase_key) # Fetch all devices and filter in Python because watch_id is varchar # '10000' < '900' in string comparison, so .gt('watch_id', 900) fails for 5-digit IDs response = supabase.table('ak_devices').select('watch_id').execute() if response.data: device_ids = [] for d in response.data: wid = d.get('watch_id') # Check if watch_id is valid number and > 900 if wid and str(wid).isdigit() and int(wid) > 900: device_ids.append(wid) logger.info(f"Fetched {len(device_ids)} devices from Supabase (filtered > 900): {device_ids}") # exit(0) else: logger.warning("No devices found in Supabase with watch_id > 900") except Exception as e: logger.error(f"Error fetching from Supabase: {e}") else: logger.warning("SUPABASE_URL or SUPABASE_SERVICE_ROLE_KEY not set in .env") if not device_ids: device_ids = ["watch_sim_001"] logger.info(f"Using default device list: {device_ids}") # Initialize MQTT Client client_id = f"python-simulator-{int(time.time())}" # Handle paho-mqtt v2.0.0+ breaking changes if hasattr(mqtt, 'CallbackAPIVersion'): client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2, client_id=client_id) else: client = mqtt.Client(client_id=client_id) if mqtt_username and mqtt_password: client.username_pw_set(mqtt_username, mqtt_password) def on_connect(client, userdata, flags, rc, properties=None): # paho-mqtt v2 passes properties as well, but we might be in v1 or v2. # The signature for v2 is (client, userdata, flags, rc, properties) # The signature for v1 is (client, userdata, flags, rc) # However, the library handles the callback signature matching if we use the correct API version enum. # If we used VERSION2, rc is a ReasonCode object, which can be compared to 0 or checked with .is_failure if hasattr(rc, 'value'): # It's an object in v2 rc_val = rc.value else: rc_val = rc if rc_val == 0: logger.info("Connected to MQTT Broker!") else: logger.error(f"Failed to connect, return code {rc}") client.on_connect = on_connect try: logger.info(f"Connecting to {mqtt_host}:{mqtt_port}...") client.connect(mqtt_host, mqtt_port, 60) client.loop_start() except Exception as e: logger.error(f"Connection failed: {e}") exit(1) # Simulation loop try: print(device_ids) # exit(0) while True: if enable_watch_mock: # Iterate over all device IDs for current_device_id in device_ids: # Simulate Watch Data # Topic: watch/watch/data watch_data = { "ap": "WDDGW20000009", "battery": random.randint(700, 800), "ch": 22, "cmd": 4101, "heartrate": random.randint(60, 100), "id": int(current_device_id), "md": 109, "recvtime": int(time.time() * 1000), "rssi": random.randint(-90, -50), "spo2": random.randint(95, 100), "steps": random.randint(0, 100), "time": int(time.time()), "ver": 258 } topic_watch = "watch/watch/data" client.publish(topic_watch, json.dumps(watch_data)) logger.info(f"Published to {topic_watch} for {len(device_ids)} devices") if enable_ap_mock: # Simulate AP Report # Topic: watch/ap/report ap_report = { "apMac": "AA:BB:CC:DD:EE:FF", "timestamp": int(time.time() * 1000), "status": "online", "clients": [ {"mac": "11:22:33:44:55:66", "rssi": random.randint(-80, -40)} ], "uptime": int(time.time()) } topic_ap = "watch/ap/report" client.publish(topic_ap, json.dumps(ap_report)) logger.info(f"Published to {topic_ap}") time.sleep(5) except KeyboardInterrupt: logger.info("Stopping simulator...") client.loop_stop() client.disconnect()