Files
ant-vision-inspector/camera/insight.py
2026-06-10 16:18:41 +09:00

190 lines
6.2 KiB
Python

import socket
import ftplib
import io
import time
def _ts():
return time.perf_counter()
class InSightCamera:
def __init__(self):
self._sock = None
self._buf = b""
self._ip = None
self._ftp: "ftplib.FTP | None" = None # 영구 FTP 세션
# ------------------------------------------------------------------ #
# 연결 / 해제
# ------------------------------------------------------------------ #
def connect(self, ip: str, port: int = 23):
"""Telnet 연결 후 FTP 영구 세션도 함께 수립."""
self._ip = ip
try:
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024 * 1024)
self._sock.settimeout(5)
self._sock.connect((ip, port))
self._buf = b""
banner = self._read_line()
print(f"[InSight] 배너: {banner!r}")
self._send("admin")
pwd_prompt = self._read_line()
print(f"[InSight] 프롬프트: {pwd_prompt!r}")
self._sock.sendall(b"\r\n")
login_result = self._read_line()
print(f"[InSight] 로그인: {login_result!r}")
print(f"[InSight] Telnet 연결 성공: {ip}:{port}")
except Exception as e:
print(f"[InSight] Telnet 연결 실패: {e}")
self._sock = None
return
# Telnet 성공 후 FTP 영구 연결
self.connect_ftp()
def connect_ftp(self):
"""FTP 영구 세션 수립. 실패해도 나중에 _ensure_ftp()가 재시도."""
if not self._ip:
return
try:
ftp = ftplib.FTP()
ftp.connect(self._ip, 21, timeout=10)
ftp.login("admin", "")
self._ftp = ftp
print("[InSight FTP] 영구 세션 수립")
except Exception as e:
print(f"[InSight FTP] 세션 수립 실패 (자동 재연결 대기): {e}")
self._ftp = None
def disconnect(self):
self._close_ftp()
if self._sock:
try:
self._sock.close()
except Exception:
pass
self._sock = None
print("[InSight] 연결 종료")
def _close_ftp(self):
if self._ftp:
try:
self._ftp.quit()
except Exception:
pass
self._ftp = None
def is_connected(self) -> bool:
return self._sock is not None
# ------------------------------------------------------------------ #
# 트리거 / 이미지
# ------------------------------------------------------------------ #
def software_trigger(self) -> bool:
"""SE8 전송 → 응답이 '1'이면 True"""
t0 = _ts()
self._send("SE8")
resp = self._read_line()
print(f"[InSight][시간] SE8 응답: {resp!r} ({_ts()-t0:.3f}s)")
return resp == "1"
def trigger_and_get_image(self) -> bytes:
"""SE8 트리거 → 1초 대기 → FTP 이미지 수신."""
self.software_trigger()
time.sleep(1.0)
return self.get_image()
def get_image(self) -> bytes:
"""영구 FTP 세션으로 이미지 수신. 끊겼으면 자동 재연결 후 1회 재시도."""
if not self._ip:
return b""
ftp = self._ensure_ftp()
if ftp is None:
return b""
t0 = _ts()
try:
target = self._find_bmp_first(ftp)
print(f"[InSight FTP][시간] 파일 탐색: {_ts()-t0:.3f}s → {target}")
buf = io.BytesIO()
ftp.retrbinary(f"RETR {target}", buf.write)
data = buf.getvalue()
print(f"[InSight FTP] 수신 완료: {len(data)} bytes ({_ts()-t0:.3f}s)")
return data
except Exception as e:
print(f"[InSight FTP] 오류: {e} — 세션 재연결 후 재시도")
self._ftp = None
ftp2 = self._ensure_ftp()
if ftp2 is None:
return b""
try:
target = self._find_bmp_first(ftp2)
buf = io.BytesIO()
ftp2.retrbinary(f"RETR {target}", buf.write)
return buf.getvalue()
except Exception as e2:
print(f"[InSight FTP] 재시도 실패: {e2}")
return b""
def _ensure_ftp(self) -> "ftplib.FTP | None":
"""FTP 세션이 살아있으면 재사용, 끊겼으면 재연결."""
if self._ftp is not None:
try:
self._ftp.voidcmd("NOOP")
return self._ftp
except Exception:
self._ftp = None
self.connect_ftp()
return self._ftp
# ------------------------------------------------------------------ #
# 내부 헬퍼
# ------------------------------------------------------------------ #
def _find_bmp_first(self, ftp: ftplib.FTP) -> str:
"""BMP 우선, 없으면 JPG. 기본값 /image.bmp."""
try:
files = ftp.nlst()
print(f"[InSight FTP] 파일 목록: {files}")
for f in files:
if f.lower().endswith(".bmp"):
return f"/{f}" if not f.startswith("/") else f
for f in files:
if f.lower().endswith(".jpg") or f.lower().endswith(".jpeg"):
return f"/{f}" if not f.startswith("/") else f
except Exception as e:
print(f"[InSight FTP] 파일 탐색 오류: {e}")
return "/image.bmp"
def _send(self, cmd: str):
self._sock.sendall((cmd + "\r\n").encode())
def _read_line(self) -> str:
while b"\r\n" not in self._buf:
try:
chunk = self._sock.recv(4096)
except socket.timeout:
break
if not chunk:
break
self._buf += chunk
if b"\r\n" in self._buf:
idx = self._buf.index(b"\r\n")
line = self._buf[:idx]
self._buf = self._buf[idx + 2:]
else:
line = self._buf
self._buf = b""
return line.decode(errors="ignore").strip()