import socket import ftplib import io import time def _ts(): return time.perf_counter() class InSightCamera: def __init__(self): self._sock = None self._buf = b"" self._ip = None self._ftp: "ftplib.FTP | None" = None # 영구 FTP 세션 # ------------------------------------------------------------------ # # 연결 / 해제 # ------------------------------------------------------------------ # def connect(self, ip: str, port: int = 23): """Telnet 연결 후 FTP 영구 세션도 함께 수립.""" self._ip = ip try: self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 1024) self._sock.settimeout(5) self._sock.connect((ip, port)) self._buf = b"" banner = self._read_line() print(f"[InSight] 배너: {banner!r}") self._send("admin") pwd_prompt = self._read_line() print(f"[InSight] 프롬프트: {pwd_prompt!r}") self._sock.sendall(b"\r\n") login_result = self._read_line() print(f"[InSight] 로그인: {login_result!r}") print(f"[InSight] Telnet 연결 성공: {ip}:{port}") except Exception as e: print(f"[InSight] Telnet 연결 실패: {e}") self._sock = None return # Telnet 성공 후 FTP 영구 연결 self.connect_ftp() def connect_ftp(self): """FTP 영구 세션 수립. 실패해도 나중에 _ensure_ftp()가 재시도.""" if not self._ip: return try: ftp = ftplib.FTP() ftp.connect(self._ip, 21, timeout=10) ftp.login("admin", "") self._ftp = ftp print("[InSight FTP] 영구 세션 수립") except Exception as e: print(f"[InSight FTP] 세션 수립 실패 (자동 재연결 대기): {e}") self._ftp = None def disconnect(self): self._close_ftp() if self._sock: try: self._sock.close() except Exception: pass self._sock = None print("[InSight] 연결 종료") def _close_ftp(self): if self._ftp: try: self._ftp.quit() except Exception: pass self._ftp = None def is_connected(self) -> bool: return self._sock is not None # ------------------------------------------------------------------ # # 트리거 / 이미지 # ------------------------------------------------------------------ # def software_trigger(self) -> bool: """SE8 전송 → 응답이 '1'이면 True""" t0 = _ts() self._send("SE8") resp = self._read_line() print(f"[InSight][시간] SE8 응답: {resp!r} ({_ts()-t0:.3f}s)") return resp == "1" def trigger_and_get_image(self) -> bytes: """SE8 트리거 → 1초 대기 → FTP 이미지 수신.""" self.software_trigger() time.sleep(1.0) return self.get_image() def get_image(self) -> bytes: """영구 FTP 세션으로 이미지 수신. 끊겼으면 자동 재연결 후 1회 재시도.""" if not self._ip: return b"" ftp = self._ensure_ftp() if ftp is None: return b"" t0 = _ts() try: target = self._find_bmp_first(ftp) print(f"[InSight FTP][시간] 파일 탐색: {_ts()-t0:.3f}s → {target}") buf = io.BytesIO() ftp.retrbinary(f"RETR {target}", buf.write) data = buf.getvalue() print(f"[InSight FTP] 수신 완료: {len(data)} bytes ({_ts()-t0:.3f}s)") return data except Exception as e: print(f"[InSight FTP] 오류: {e} — 세션 재연결 후 재시도") self._ftp = None ftp2 = self._ensure_ftp() if ftp2 is None: return b"" try: target = self._find_bmp_first(ftp2) buf = io.BytesIO() ftp2.retrbinary(f"RETR {target}", buf.write) return buf.getvalue() except Exception as e2: print(f"[InSight FTP] 재시도 실패: {e2}") return b"" def _ensure_ftp(self) -> "ftplib.FTP | None": """FTP 세션이 살아있으면 재사용, 끊겼으면 재연결.""" if self._ftp is not None: try: self._ftp.voidcmd("NOOP") return self._ftp except Exception: self._ftp = None self.connect_ftp() return self._ftp # ------------------------------------------------------------------ # # 내부 헬퍼 # ------------------------------------------------------------------ # def _find_bmp_first(self, ftp: ftplib.FTP) -> str: """BMP 우선, 없으면 JPG. 기본값 /image.bmp.""" try: files = ftp.nlst() print(f"[InSight FTP] 파일 목록: {files}") for f in files: if f.lower().endswith(".bmp"): return f"/{f}" if not f.startswith("/") else f for f in files: if f.lower().endswith(".jpg") or f.lower().endswith(".jpeg"): return f"/{f}" if not f.startswith("/") else f except Exception as e: print(f"[InSight FTP] 파일 탐색 오류: {e}") return "/image.bmp" def _send(self, cmd: str): self._sock.sendall((cmd + "\r\n").encode()) def _read_line(self) -> str: while b"\r\n" not in self._buf: try: chunk = self._sock.recv(4096) except socket.timeout: break if not chunk: break self._buf += chunk if b"\r\n" in self._buf: idx = self._buf.index(b"\r\n") line = self._buf[:idx] self._buf = self._buf[idx + 2:] else: line = self._buf self._buf = b"" return line.decode(errors="ignore").strip()