#!/usr/bin/env python3 """ EagleEye Local Scanner Agent v1.0 사용자 PC에서 실행되어 공유기 정보를 수집하고 분석 서버로 전송합니다. 표준 라이브러리만 사용 — 별도 설치 불필요. """ from __future__ import annotations import json import base64 import socket import platform import subprocess import sys import os import webbrowser from datetime import datetime import urllib.request import urllib.error import urllib.parse from concurrent.futures import ThreadPoolExecutor, as_completed # ── 설정 ────────────────────────────────────────────────────────────────────── DEFAULT_SERVER_URL = os.environ.get("EAGLEEYE_SERVER", "http://localhost:8000/api/diagnose") SESSION_ID = os.environ.get("EAGLEEYE_SESSION_ID", "default") ADMIN_PORTS = [80, 8080, 443, 8443] SCAN_TIMEOUT = 2 # 포트 연결 타임아웃 (초) HTTP_TIMEOUT = 3 # HTTP 배너 수집 타임아웃 (초) AGENT_VERSION = "1.0.0" # ────────────────────────────────────────────────────────────────────────────── def get_default_gateway() -> str | None: """OS별 기본 게이트웨이(공유기 IP) 탐색""" system = platform.system() try: if system == "Windows": out = subprocess.check_output( ["ipconfig"], encoding="cp949", errors="replace" ) for line in out.splitlines(): if "기본 게이트웨이" in line or "Default Gateway" in line: ip = line.split(":")[-1].strip() if ip and ip != "": return ip elif system == "Darwin": out = subprocess.check_output( ["netstat", "-nr"], encoding="utf-8", errors="replace" ) for line in out.splitlines(): cols = line.split() if cols and cols[0] in ("default", "0.0.0.0"): if len(cols) >= 2: return cols[1] elif system == "Linux": out = subprocess.check_output( ["ip", "route", "show", "default"], encoding="utf-8", errors="replace" ) parts = out.split() if "via" in parts: return parts[parts.index("via") + 1] except Exception: pass # Fallback: 소켓 연결로 로컬 IP 확인 후 마지막 옥텟 → .1 추정 try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) local_ip = s.getsockname()[0] s.close() octets = local_ip.split(".") octets[-1] = "1" return ".".join(octets) except Exception: return None def get_local_ip() -> str: """현재 PC의 로컬 IP 주소 탐색""" try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] s.close() return ip except Exception: return "unknown" def check_port(host: str, port: int) -> bool: """TCP 포트 개방 여부 확인""" try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(SCAN_TIMEOUT) result = sock.connect_ex((host, port)) sock.close() return result == 0 except Exception: return False def check_http_banner(host: str, port: int) -> dict | None: """열린 포트에서 HTTP 응답 헤더 및 본문 일부 수집 (공유기 모델 힌트)""" scheme = "https" if port in (443, 8443) else "http" url = f"{scheme}://{host}:{port}/" try: req = urllib.request.Request( url, headers={"User-Agent": f"EagleEye/{AGENT_VERSION}"}, ) # HTTPS 인증서 무시 (내부망이므로 자체서명 인증서 허용) if scheme == "https": import ssl ctx = ssl.create_default_context() ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE resp = urllib.request.urlopen(req, timeout=HTTP_TIMEOUT, context=ctx) else: resp = urllib.request.urlopen(req, timeout=HTTP_TIMEOUT) headers = dict(resp.headers) content = resp.read(512).decode("utf-8", errors="ignore") return { "status_code": resp.status, "server": headers.get("Server", ""), "www_authenticate": headers.get("WWW-Authenticate", ""), "content_preview": content[:300], } except urllib.error.HTTPError as e: return { "status_code": e.code, "server": dict(e.headers).get("Server", ""), "www_authenticate": dict(e.headers).get("WWW-Authenticate", ""), "content_preview": "", } except Exception: return None def detect_router_brand(port_details: dict) -> str: """배너 정보로 공유기 브랜드 추정""" keywords = { "ipTIME": ["iptime", "ipTIME"], "ASUS": ["asus", "ASUS", "RT-"], "TP-Link": ["tp-link", "TP-LINK", "Archer"], "KT": ["kt.com", "KT 허브", "olleh"], "SK": ["sk broadband", "SK브로드밴드"], "Linksys": ["linksys", "Linksys"], "Netgear": ["netgear", "NETGEAR"], "D-Link": ["d-link", "D-Link"], } combined = json.dumps(port_details, ensure_ascii=False).lower() for brand, hints in keywords.items(): for hint in hints: if hint.lower() in combined: return brand return "알 수 없음" def check_upnp(gateway_ip: str) -> dict: """SSDP 패킷으로 UPnP 활성화 여부 직접 탐지""" msg = ( "M-SEARCH * HTTP/1.1\r\n" "HOST: 239.255.255.250:1900\r\n" 'MAN: "ssdp:discover"\r\n' "ST: urn:schemas-upnp-org:device:InternetGatewayDevice:1\r\n" "MX: 2\r\n\r\n" ) sock = None try: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2) sock.settimeout(3) sock.sendto(msg.encode(), ("239.255.255.250", 1900)) sock.sendto(msg.encode(), (gateway_ip, 1900)) data, addr = sock.recvfrom(2048) return { "enabled": True, "from": addr[0], "response": data.decode("utf-8", errors="ignore")[:300], } except socket.timeout: return {"enabled": False} except Exception as e: return {"enabled": None, "error": str(e)[:60]} finally: if sock: sock.close() def check_default_credentials(gateway_ip: str, open_ports: list) -> dict: """알려진 기본 관리자 계정으로 로그인 취약점 탐지""" DEFAULTS = [ ("admin", "admin"), ("admin", ""), ("admin", "password"), ("admin", "1234"), ("root", "admin"), ("root", ""), ("user", "user"), ] # ipTIME 등 폼 기반 인증 공유기는 Basic Auth 헤더를 무시하고 로그인 페이지를 200으로 반환함 # → 200 응답이어도 이 키워드가 있으면 로그인 페이지로 판단 (취약 아님) LOGIN_KEYWORDS = ( "login", "password", "비밀번호", "로그인", "passwd", "sign in", "username", "id=", "type=\"password\"", "type='password'", ) result = {"tested": False, "no_auth_required": False, "vulnerable": False, "credentials": None} http_ports = [p for p in open_ports if p in (80, 8080)] if not http_ports: return result result["tested"] = True for port in http_ports: url = f"http://{gateway_ip}:{port}/" # 1. 인증 없이 접근 가능 여부 — 2KB 읽어 로그인 폼 키워드 확인 unauth_content = "" try: req = urllib.request.Request(url, headers={"User-Agent": f"EagleEye/{AGENT_VERSION}"}) resp = urllib.request.urlopen(req, timeout=3) if resp.status == 200: unauth_content = resp.read(2048).decode("utf-8", errors="ignore").lower() if not any(k in unauth_content for k in LOGIN_KEYWORDS): result["no_auth_required"] = True result["vulnerable"] = True return result except Exception: pass # 2. HTTP Basic Auth 기본값 시도 # 200이어도 여전히 로그인 페이지 키워드 → 폼 인증 방식 공유기 (ipTIME 등) → 취약 아님 for user, pwd in DEFAULTS: try: creds = base64.b64encode(f"{user}:{pwd}".encode()).decode() req = urllib.request.Request(url, headers={ "User-Agent": f"EagleEye/{AGENT_VERSION}", "Authorization": f"Basic {creds}", }) resp = urllib.request.urlopen(req, timeout=3) if resp.status == 200: content = resp.read(2048).decode("utf-8", errors="ignore").lower() if any(k in content for k in LOGIN_KEYWORDS): # 여전히 로그인 페이지 → Basic Auth 무시하는 방식 → 오탐 방지 continue result["vulnerable"] = True result["credentials"] = f"{user}/{pwd if pwd else '(빈 비밀번호)'}" return result except urllib.error.HTTPError as e: if e.code == 401: continue except Exception: pass return result def _ping_once(ip: str, system: str) -> None: """단일 IP에 ping 1회 — ARP 캐시를 채우기 위한 용도. 결과값은 사용하지 않습니다.""" try: if system == "Windows": subprocess.call( ["ping", "-n", "1", "-w", "800", ip], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=2, ) else: subprocess.call( ["ping", "-c", "1", "-W", "1", ip], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=2, ) except Exception: pass def scan_local_devices(gateway_ip: str) -> dict: """병렬 ping sweep → ARP 캐시 읽기로 서브넷 전체 기기 탐지. MAC 주소는 수집하지 않습니다.""" prefix = ".".join(gateway_ip.split(".")[:3]) + "." system = platform.system() # 서브넷 전체에 병렬 ping — ARP 캐시를 채우는 것이 목적 targets = [f"{prefix}{i}" for i in range(1, 255)] with ThreadPoolExecutor(max_workers=100) as pool: futures = [pool.submit(_ping_once, ip, system) for ip in targets] for f in as_completed(futures): pass # 결과 불필요, 완료 대기만 # ARP 캐시 읽기 devices = [] try: if system in ("Darwin", "Linux"): out = subprocess.check_output( ["arp", "-a"], encoding="utf-8", errors="replace", timeout=5 ) for line in out.splitlines(): if "(" in line and ")" in line: ip = line[line.find("(")+1:line.find(")")] if ip.startswith(prefix) and ip != gateway_ip and not ip.endswith(".255"): if "ff:ff:ff:ff:ff:ff" in line.lower() or "(incomplete)" in line.lower(): continue devices.append({"ip": ip}) elif system == "Windows": out = subprocess.check_output( ["arp", "-a"], encoding="cp949", errors="replace", timeout=5 ) for line in out.splitlines(): parts = line.split() if parts and parts[0].startswith(prefix) and parts[0] != gateway_ip: devices.append({"ip": parts[0]}) except Exception as e: return {"device_count": -1, "devices": [], "error": str(e)[:60]} return {"device_count": len(devices), "devices": devices[:50]} def get_wifi_ssid() -> str: """현재 연결된 Wi-Fi 이름(SSID) 확인""" system = platform.system() if system == "Darwin": # 1) networksetup — en0 ~ en5 순서로 시도 for iface in ("en0", "en1", "en2", "en3", "en4", "en5"): try: out = subprocess.check_output( ["networksetup", "-getairportnetwork", iface], encoding="utf-8", errors="replace", timeout=5, stderr=subprocess.DEVNULL, ) if "Current Wi-Fi Network:" in out: ssid = out.split("Current Wi-Fi Network:")[-1].strip() if ssid: return ssid except Exception: continue # 2) airport 유틸리티 airport = ("/System/Library/PrivateFrameworks/Apple80211.framework" "/Versions/Current/Resources/airport") try: out = subprocess.check_output( [airport, "-I"], encoding="utf-8", errors="replace", timeout=5, stderr=subprocess.DEVNULL, ) for line in out.splitlines(): if " SSID:" in line and "BSSID:" not in line: ssid = line.split("SSID:")[-1].strip() if ssid: return ssid except Exception: pass # 3) system_profiler (느리지만 확실) try: out = subprocess.check_output( ["system_profiler", "SPAirPortDataType"], encoding="utf-8", errors="replace", timeout=10, stderr=subprocess.DEVNULL, ) in_current = False for line in out.splitlines(): stripped = line.strip() if "Current Network Information:" in stripped: in_current = True elif in_current and stripped and not stripped.startswith("PHY Mode"): # 첫 번째 들여쓰기 항목이 SSID 이름 ssid = stripped.rstrip(":") if ssid: return ssid except Exception: pass elif system == "Windows": try: out = subprocess.check_output( ["netsh", "wlan", "show", "interfaces"], encoding="cp949", errors="replace", timeout=5, ) for line in out.splitlines(): if "SSID" in line and "BSSID" not in line and ":" in line: ssid = line.split(":", 1)[-1].strip() if ssid: return ssid except Exception: pass elif system == "Linux": try: out = subprocess.check_output( ["iwconfig"], encoding="utf-8", errors="replace", timeout=5, stderr=subprocess.DEVNULL, ) for line in out.splitlines(): if "ESSID:" in line: ssid = line.split("ESSID:")[-1].strip().strip('"') if ssid and ssid != "off/any": return ssid except Exception: pass return "알 수 없음" def detect_network_type(gateway_ip: str, wifi_ssid: str) -> str: """게이트웨이 IP 대역으로 네트워크 유형 판별""" if gateway_ip.startswith("172.20.10."): return "iPhone 핫스팟" if gateway_ip.startswith("192.168.43."): return "Android 모바일 핫스팟" if gateway_ip.startswith("192.168.") or gateway_ip.startswith("10."): return "홈/오피스 네트워크" return "네트워크" def check_dns() -> dict: """시스템 DNS 서버 설정 확인 및 이상 탐지""" KNOWN_SUSPICIOUS = {"85.255.112.0", "85.255.120.0", "194.165.16.11", "91.108.4.0"} dns_servers = [] try: system = platform.system() if system == "Darwin": out = subprocess.check_output( ["scutil", "--dns"], encoding="utf-8", errors="replace", timeout=5 ) seen: set[str] = set() for line in out.splitlines(): line = line.strip() if line.startswith("nameserver["): ip = line.split(":")[-1].strip() if ip and ip not in seen: seen.add(ip) dns_servers.append(ip) elif system == "Linux": with open("/etc/resolv.conf") as f: for line in f: if line.startswith("nameserver"): parts = line.split() if len(parts) >= 2: dns_servers.append(parts[1]) elif system == "Windows": out = subprocess.check_output( ["ipconfig", "/all"], encoding="cp949", errors="replace", timeout=5 ) for line in out.splitlines(): if "DNS" in line and ":" in line: ip = line.split(":")[-1].strip() if ip and ip[0].isdigit(): dns_servers.append(ip) except Exception: pass suspicious = [d for d in dns_servers if d in KNOWN_SUSPICIOUS] return { "servers": list(dict.fromkeys(dns_servers))[:5], "suspicious": suspicious, "is_suspicious": len(suspicious) > 0, } def scan_router(gateway_ip: str) -> tuple[list[int], dict]: """공유기 관리 포트 스캔 및 HTTP 배너 수집""" open_ports = [] port_details = {} for port in ADMIN_PORTS: label = f"포트 {port:5d}" if check_port(gateway_ip, port): open_ports.append(port) banner = check_http_banner(gateway_ip, port) port_details[str(port)] = banner or {} print(f" {label} ▶ 열림 [!]") else: print(f" {label} ▶ 닫힘") return open_ports, port_details # ── 사전 설문 ───────────────────────────────────────────────────────────────── SURVEY_QUESTIONS = [ { "key": "housing_type", "question": "주거 형태가 어떻게 되시나요?", "options": ["아파트", "빌라 / 연립주택", "단독주택", "오피스텔 / 기타"], "multi": False, }, { "key": "password_changed", "question": "공유기 관리자 비밀번호를 변경하신 적 있나요?\n (초기값: admin / admin)", "options": ["예, 변경했습니다", "아니오, 그대로입니다", "잘 모르겠습니다"], "multi": False, }, { "key": "firmware_updated", "question": "공유기 펌웨어(소프트웨어)를 최근에 업데이트하셨나요?", "options": ["예 (6개월 이내)", "아니오 / 해본 적 없습니다", "잘 모르겠습니다"], "multi": False, }, { "key": "remote_management", "question": "외부에서 공유기에 접속하는 '원격 관리' 기능을 켜두셨나요?", "options": ["예, 사용 중입니다", "아니오 / 꺼져 있습니다", "잘 모르겠습니다"], "multi": False, }, { "key": "smart_devices", "question": "집에서 사용 중인 스마트 기기를 모두 선택하세요.\n (쉼표로 구분, 예: 1,3,4)", "options": [ "아파트 월패드", "스마트 도어락", "홈 CCTV / IP 카메라", "스마트 TV", "스마트 콘센트 / 플러그", "없음", ], "multi": True, }, { "key": "guest_wifi", "question": "방문객(손님, 택배기사 등)에게 Wi-Fi 비밀번호를 알려주시나요?", "options": ["자주 공유합니다", "가끔 공유합니다", "거의 안 합니다"], "multi": False, }, ] def _ask_single(q: dict) -> str: """단일 선택 질문""" for i, opt in enumerate(q["options"], 1): print(f" {i}. {opt}") while True: try: raw = input(" → 번호 입력: ").strip() if raw == "": return "응답 없음" idx = int(raw) - 1 if 0 <= idx < len(q["options"]): return q["options"][idx] print(" 올바른 번호를 입력해 주세요.") except (ValueError, EOFError): return "응답 없음" def _ask_multi(q: dict) -> list[str]: """복수 선택 질문""" for i, opt in enumerate(q["options"], 1): print(f" {i}. {opt}") while True: try: raw = input(" → 번호 입력 (쉼표 구분, Enter=없음): ").strip() if raw == "": return ["응답 없음"] indices = [int(x.strip()) - 1 for x in raw.split(",")] selected = [q["options"][i] for i in indices if 0 <= i < len(q["options"])] if selected: return selected print(" 올바른 번호를 입력해 주세요.") except (ValueError, EOFError): return ["응답 없음"] def run_survey() -> dict: """스캔 전 사전 설문 실행 — 분석 정확도를 높이는 사용자 정보 수집""" print("\n" + "=" * 56) print(" 📋 사전 설문 (분석 정확도를 높이기 위한 질문입니다)") print(" 모르시면 Enter를 눌러 건너뛰어도 됩니다.") print("=" * 56) answers = {} total = len(SURVEY_QUESTIONS) for i, q in enumerate(SURVEY_QUESTIONS, 1): print(f"\n [{i}/{total}] {q['question']}") if q["multi"]: answers[q["key"]] = _ask_multi(q) else: answers[q["key"]] = _ask_single(q) print("\n 설문 완료. 스캔을 시작합니다.") return answers # ── 공유기 브랜드 ────────────────────────────────────────────────────────────── KNOWN_BRANDS = [ "ipTIME", "ASUS", "TP-Link", "KT 홈허브", "SK 브로드밴드", "Linksys", "Netgear", "D-Link", "기타 (직접 입력)", "모름 (건너뜀)", ] def ask_router_brand() -> str: """브랜드 자동 탐지 실패 시 사용자에게 직접 입력 받기""" print("\n ┌─────────────────────────────────────────────┐") print(" │ 공유기 브랜드를 자동으로 확인하지 못했습니다. │") print(" │ 공유기 본체 스티커를 확인한 뒤 선택해 주세요. │") print(" └─────────────────────────────────────────────┘") print() for i, brand in enumerate(KNOWN_BRANDS, 1): print(f" {i:2d}. {brand}") print() while True: try: raw = input(" 번호를 입력하세요 (Enter = 모름): ").strip() if raw == "": return "알 수 없음" idx = int(raw) - 1 if 0 <= idx < len(KNOWN_BRANDS): selected = KNOWN_BRANDS[idx] if selected == "기타 (직접 입력)": custom = input(" 브랜드명을 직접 입력하세요: ").strip() return custom if custom else "알 수 없음" if selected == "모름 (건너뜀)": return "알 수 없음" print(f" → '{selected}' 선택됨") return selected print(" 올바른 번호를 입력해 주세요.") except (ValueError, EOFError): return "알 수 없음" def collect_system_info() -> dict: """시스템 환경 정보 수집""" return { "os": platform.system(), "os_version": platform.version()[:80], "architecture": platform.machine(), "hostname": socket.gethostname(), "python_version": platform.python_version(), } def build_payload( gateway_ip: str, local_ip: str, open_ports: list[int], port_details: dict, router_brand: str, system_info: dict, survey: dict, upnp: dict | None = None, default_creds: dict | None = None, local_devices: dict | None = None, dns: dict | None = None, wifi_ssid: str = "알 수 없음", network_type: str = "네트워크", ) -> dict: """서버 전송용 JSON 페이로드 구성""" return { "scan_timestamp": datetime.now().isoformat(), "agent_version": AGENT_VERSION, "session_id": SESSION_ID, "network": { "gateway_ip": gateway_ip, "local_ip": local_ip, "open_admin_ports": open_ports, "port_details": port_details, "detected_brand": router_brand, "upnp": upnp or {}, "default_creds": default_creds or {}, "local_devices": local_devices or {}, "dns": dns or {}, "wifi_ssid": wifi_ssid, "network_type": network_type, }, "system": system_info, "survey": survey, "scan_config": { "checked_ports": ADMIN_PORTS, "timeout_seconds": SCAN_TIMEOUT, }, } def fetch_survey(server_url: str) -> dict: """서버에서 웹 설문 답변을 가져옵니다. 실패 시 빈 딕셔너리 반환.""" base = server_url.replace("/api/diagnose", "") try: query = urllib.parse.urlencode({"session_id": SESSION_ID}) req = urllib.request.Request( f"{base}/api/survey/pending?{query}", headers={"User-Agent": f"EagleEye-Agent/{AGENT_VERSION}"}, ) with urllib.request.urlopen(req, timeout=5) as resp: data = json.loads(resp.read().decode("utf-8")) if data: print(" 웹 설문 답변을 가져왔습니다.") return data except Exception: return {} def send_to_server(payload: dict, server_url: str) -> dict: """수집 데이터를 백엔드 서버로 POST 전송""" data = json.dumps(payload, ensure_ascii=False).encode("utf-8") req = urllib.request.Request( server_url, data=data, headers={ "Content-Type": "application/json; charset=utf-8", "User-Agent": f"EagleEye-Agent/{AGENT_VERSION}", }, method="POST", ) with urllib.request.urlopen(req, timeout=120) as resp: return json.loads(resp.read().decode("utf-8")) def save_offline(payload: dict) -> str: """서버 미연결 시 JSON을 로컬 파일로 저장""" ts = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"eagleeye_scan_{ts}.json" with open(filename, "w", encoding="utf-8") as f: json.dump(payload, f, ensure_ascii=False, indent=2) return filename # ── 메인 ────────────────────────────────────────────────────────────────────── def main(): server_url = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_SERVER_URL print("=" * 56) print(" EagleEye 공유기 보안 스캐너 v1.0") print(" 가정용 와이파이 보안점검 에이전트") print("=" * 56) # 1. 게이트웨이 탐색 print("\n[1/5] 공유기(게이트웨이) IP 탐색 중...") gateway_ip = get_default_gateway() if not gateway_ip: print(" ✗ 공유기 IP를 찾을 수 없습니다. 네트워크 연결을 확인하세요.") sys.exit(1) local_ip = get_local_ip() print(f" 공유기 IP : {gateway_ip}") print(f" 현재 PC IP: {local_ip}") # 2. 포트 스캔 print(f"\n[2/5] 관리자 포트 스캔 중... ({', '.join(str(p) for p in ADMIN_PORTS)})") open_ports, port_details = scan_router(gateway_ip) router_brand = detect_router_brand(port_details) print(f"\n 탐지된 공유기 브랜드: {router_brand}") print(f" 열린 포트: {open_ports if open_ports else '없음'}") # 3. 보안 설정 직접 탐지 print("\n[3/5] 보안 설정 직접 탐지 중...") wifi_ssid = get_wifi_ssid() network_type = detect_network_type(gateway_ip, wifi_ssid) print(f" Wi-Fi 이름 ▶ {wifi_ssid} ({network_type})") if "핫스팟" in network_type: print(" ⚠️ 핫스팟 연결이 감지되었습니다.") print(" 집 Wi-Fi에 연결한 뒤 다시 실행하면 정확한 공유기 점검이 가능합니다.") upnp_result = check_upnp(gateway_ip) if upnp_result.get("enabled") is True: print(" UPnP ▶ 활성화됨 [!]") elif upnp_result.get("enabled") is False: print(" UPnP ▶ 비활성화 (안전)") else: print(" UPnP ▶ 탐지 불가") creds_result = check_default_credentials(gateway_ip, open_ports) if creds_result.get("no_auth_required"): print(" 기본 비밀번호 ▶ 인증 없이 접근 가능! [!!]") elif creds_result.get("vulnerable"): print(f" 기본 비밀번호 ▶ 기본값으로 로그인됨! [{creds_result.get('credentials')}] [!!]") elif creds_result.get("tested"): print(" 기본 비밀번호 ▶ 기본값 로그인 차단 (양호)") else: print(" 기본 비밀번호 ▶ HTTP 포트 없어 테스트 생략") print(" 로컬 기기 ▶ 서브넷 스캔 중... (최대 3초)") devices_result = scan_local_devices(gateway_ip) count = devices_result.get("device_count", -1) if count >= 0: print(f" 로컬 기기 ▶ {count}대 발견") for d in devices_result.get("devices", []): print(f" - {d['ip']}") else: print(" 로컬 기기 ▶ 탐지 불가") dns_result = check_dns() servers = ", ".join(dns_result.get("servers", [])) or "알 수 없음" if dns_result.get("is_suspicious"): print(f" DNS 서버 ▶ 의심 서버 탐지! [{servers}] [!!]") else: print(f" DNS 서버 ▶ {servers}") # 4. 시스템 정보 수집 print("\n[4/5] 시스템 환경 수집 중...") system_info = collect_system_info() print(f" OS: {system_info['os']} / {system_info['architecture']}") # 5. 웹 설문 수신 + 페이로드 구성 + 서버 전송 print(f"\n[5/5] 분석 서버로 전송 중...\n → {server_url}") survey = fetch_survey(server_url) # 브랜드 미탐지 시 — 웹 설문 답변만 사용합니다. # 웹 플로우에서는 이미 질문한 내용을 터미널에서 다시 묻지 않습니다. if router_brand == "알 수 없음": survey_brand = survey.get("router_brand", "") if survey_brand and survey_brand not in ("기타/모름", "기타 / 모름", "알 수 없음", ""): router_brand = survey_brand print(f" 웹 설문 브랜드 사용: {router_brand}") else: print(" 공유기 브랜드: 웹 설문 값 없음 — 알 수 없음으로 진행") payload = build_payload( gateway_ip, local_ip, open_ports, port_details, router_brand, system_info, survey, upnp=upnp_result, default_creds=creds_result, local_devices=devices_result, dns=dns_result, wifi_ssid=wifi_ssid, network_type=network_type, ) try: result = send_to_server(payload, server_url) risk = result.get("risk_level", "확인불가") # 웹 리포트 URL이 있으면 브라우저로 열기 result_url = result.get("result_url") if result_url: print(f"\n 브라우저에서 리포트 열기...\n → {result_url}") webbrowser.open(result_url) else: # 브라우저 없으면 터미널 출력 fallback print("\n" + "=" * 56) print(result.get("report", "리포트를 받지 못했습니다.")) print("=" * 56) print(f"\n 종합 위험도: {risk}") print(" 스캔이 완료되었습니다.\n") except urllib.error.URLError as e: print(f"\n ✗ 서버 연결 실패: {e}") print(" 서버가 실행 중인지 확인하거나, EAGLEEYE_SERVER 환경변수를 설정하세요.") saved = save_offline(payload) print(f" → 수집 데이터를 '{saved}' 파일로 저장했습니다.") sys.exit(1) if __name__ == "__main__": main()