190 lines
6.2 KiB
Python
190 lines
6.2 KiB
Python
import socket
|
|
import ftplib
|
|
import io
|
|
import time
|
|
|
|
|
|
def _ts():
|
|
return time.perf_counter()
|
|
|
|
|
|
class InSightCamera:
|
|
def __init__(self):
|
|
self._sock = None
|
|
self._buf = b""
|
|
self._ip = None
|
|
self._ftp: "ftplib.FTP | None" = None # 영구 FTP 세션
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# 연결 / 해제
|
|
# ------------------------------------------------------------------ #
|
|
|
|
def connect(self, ip: str, port: int = 23):
|
|
"""Telnet 연결 후 FTP 영구 세션도 함께 수립."""
|
|
self._ip = ip
|
|
try:
|
|
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 1024)
|
|
self._sock.settimeout(5)
|
|
self._sock.connect((ip, port))
|
|
self._buf = b""
|
|
|
|
banner = self._read_line()
|
|
print(f"[InSight] 배너: {banner!r}")
|
|
|
|
self._send("admin")
|
|
pwd_prompt = self._read_line()
|
|
print(f"[InSight] 프롬프트: {pwd_prompt!r}")
|
|
|
|
self._sock.sendall(b"\r\n")
|
|
login_result = self._read_line()
|
|
print(f"[InSight] 로그인: {login_result!r}")
|
|
|
|
print(f"[InSight] Telnet 연결 성공: {ip}:{port}")
|
|
except Exception as e:
|
|
print(f"[InSight] Telnet 연결 실패: {e}")
|
|
self._sock = None
|
|
return
|
|
|
|
# Telnet 성공 후 FTP 영구 연결
|
|
self.connect_ftp()
|
|
|
|
def connect_ftp(self):
|
|
"""FTP 영구 세션 수립. 실패해도 나중에 _ensure_ftp()가 재시도."""
|
|
if not self._ip:
|
|
return
|
|
try:
|
|
ftp = ftplib.FTP()
|
|
ftp.connect(self._ip, 21, timeout=10)
|
|
ftp.login("admin", "")
|
|
self._ftp = ftp
|
|
print("[InSight FTP] 영구 세션 수립")
|
|
except Exception as e:
|
|
print(f"[InSight FTP] 세션 수립 실패 (자동 재연결 대기): {e}")
|
|
self._ftp = None
|
|
|
|
def disconnect(self):
|
|
self._close_ftp()
|
|
if self._sock:
|
|
try:
|
|
self._sock.close()
|
|
except Exception:
|
|
pass
|
|
self._sock = None
|
|
print("[InSight] 연결 종료")
|
|
|
|
def _close_ftp(self):
|
|
if self._ftp:
|
|
try:
|
|
self._ftp.quit()
|
|
except Exception:
|
|
pass
|
|
self._ftp = None
|
|
|
|
def is_connected(self) -> bool:
|
|
return self._sock is not None
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# 트리거 / 이미지
|
|
# ------------------------------------------------------------------ #
|
|
|
|
def software_trigger(self) -> bool:
|
|
"""SE8 전송 → 응답이 '1'이면 True"""
|
|
t0 = _ts()
|
|
self._send("SE8")
|
|
resp = self._read_line()
|
|
print(f"[InSight][시간] SE8 응답: {resp!r} ({_ts()-t0:.3f}s)")
|
|
return resp == "1"
|
|
|
|
def trigger_and_get_image(self) -> bytes:
|
|
"""SE8 트리거 → 1초 대기 → FTP 이미지 수신."""
|
|
self.software_trigger()
|
|
time.sleep(1.0)
|
|
return self.get_image()
|
|
|
|
def get_image(self) -> bytes:
|
|
"""영구 FTP 세션으로 이미지 수신. 끊겼으면 자동 재연결 후 1회 재시도."""
|
|
if not self._ip:
|
|
return b""
|
|
|
|
ftp = self._ensure_ftp()
|
|
if ftp is None:
|
|
return b""
|
|
|
|
t0 = _ts()
|
|
try:
|
|
target = self._find_bmp_first(ftp)
|
|
print(f"[InSight FTP][시간] 파일 탐색: {_ts()-t0:.3f}s → {target}")
|
|
buf = io.BytesIO()
|
|
ftp.retrbinary(f"RETR {target}", buf.write)
|
|
data = buf.getvalue()
|
|
print(f"[InSight FTP] 수신 완료: {len(data)} bytes ({_ts()-t0:.3f}s)")
|
|
return data
|
|
except Exception as e:
|
|
print(f"[InSight FTP] 오류: {e} — 세션 재연결 후 재시도")
|
|
self._ftp = None
|
|
ftp2 = self._ensure_ftp()
|
|
if ftp2 is None:
|
|
return b""
|
|
try:
|
|
target = self._find_bmp_first(ftp2)
|
|
buf = io.BytesIO()
|
|
ftp2.retrbinary(f"RETR {target}", buf.write)
|
|
return buf.getvalue()
|
|
except Exception as e2:
|
|
print(f"[InSight FTP] 재시도 실패: {e2}")
|
|
return b""
|
|
|
|
def _ensure_ftp(self) -> "ftplib.FTP | None":
|
|
"""FTP 세션이 살아있으면 재사용, 끊겼으면 재연결."""
|
|
if self._ftp is not None:
|
|
try:
|
|
self._ftp.voidcmd("NOOP")
|
|
return self._ftp
|
|
except Exception:
|
|
self._ftp = None
|
|
self.connect_ftp()
|
|
return self._ftp
|
|
|
|
# ------------------------------------------------------------------ #
|
|
# 내부 헬퍼
|
|
# ------------------------------------------------------------------ #
|
|
|
|
def _find_bmp_first(self, ftp: ftplib.FTP) -> str:
|
|
"""BMP 우선, 없으면 JPG. 기본값 /image.bmp."""
|
|
try:
|
|
files = ftp.nlst()
|
|
print(f"[InSight FTP] 파일 목록: {files}")
|
|
for f in files:
|
|
if f.lower().endswith(".bmp"):
|
|
return f"/{f}" if not f.startswith("/") else f
|
|
for f in files:
|
|
if f.lower().endswith(".jpg") or f.lower().endswith(".jpeg"):
|
|
return f"/{f}" if not f.startswith("/") else f
|
|
except Exception as e:
|
|
print(f"[InSight FTP] 파일 탐색 오류: {e}")
|
|
return "/image.bmp"
|
|
|
|
def _send(self, cmd: str):
|
|
self._sock.sendall((cmd + "\r\n").encode())
|
|
|
|
def _read_line(self) -> str:
|
|
while b"\r\n" not in self._buf:
|
|
try:
|
|
chunk = self._sock.recv(4096)
|
|
except socket.timeout:
|
|
break
|
|
if not chunk:
|
|
break
|
|
self._buf += chunk
|
|
|
|
if b"\r\n" in self._buf:
|
|
idx = self._buf.index(b"\r\n")
|
|
line = self._buf[:idx]
|
|
self._buf = self._buf[idx + 2:]
|
|
else:
|
|
line = self._buf
|
|
self._buf = b""
|
|
|
|
return line.decode(errors="ignore").strip()
|