feat: 초기 프로젝트 구조 추가
This commit is contained in:
3
logic/__init__.py
Normal file
3
logic/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
# logic 패키지 — Inspector, GroupManager 노출
|
||||
from .inspector import Inspector
|
||||
from .group_manager import GroupManager
|
||||
25
logic/group_manager.py
Normal file
25
logic/group_manager.py
Normal file
@@ -0,0 +1,25 @@
|
||||
# 그룹 관리 — A/B 모델 그룹 수동 전환 (최대 4종 per group)
|
||||
class GroupManager:
|
||||
MAX_PER_GROUP = 4
|
||||
|
||||
def __init__(self):
|
||||
self._group_a: list = []
|
||||
self._group_b: list = []
|
||||
self._active: str = "A"
|
||||
|
||||
def set_group_a(self, model_list: list):
|
||||
self._group_a = model_list[: self.MAX_PER_GROUP]
|
||||
|
||||
def set_group_b(self, model_list: list):
|
||||
self._group_b = model_list[: self.MAX_PER_GROUP]
|
||||
|
||||
def get_active_group(self) -> list:
|
||||
return self._group_a if self._active == "A" else self._group_b
|
||||
|
||||
def get_active_name(self) -> str:
|
||||
return self._active
|
||||
|
||||
def switch_group(self) -> str:
|
||||
"""A↔B 전환 후 활성 그룹 이름 반환"""
|
||||
self._active = "B" if self._active == "A" else "A"
|
||||
return self._active
|
||||
120
logic/inspector.py
Normal file
120
logic/inspector.py
Normal file
@@ -0,0 +1,120 @@
|
||||
# 검사 판별 로직 — PatMax 결과 판독 + 모델 판별 + Pass/Fail 판정
|
||||
import cv2
|
||||
import numpy as np
|
||||
|
||||
# Cognex 카메라 셀 매핑 (GV 방식 fallback용으로 유지)
|
||||
PATTERN_RESULT_CELLS = {
|
||||
"A27": {"id": 1, "name": "LOW REF", "model": "LX3", "type": "RH"},
|
||||
"A77": {"id": 2, "name": "LOW REF", "model": "LX3", "type": "LH"},
|
||||
"A127": {"id": 3, "name": "LOW REF NAS", "model": "LX3", "type": "RH"},
|
||||
"A177": {"id": 4, "name": "LOW REF NAS", "model": "LX3", "type": "LH"},
|
||||
}
|
||||
|
||||
|
||||
class Inspector:
|
||||
|
||||
# ── Python PatMax 매칭 (주 경로) ─────────────────────────────────── #
|
||||
|
||||
def match_image(self, image_bytes: bytes, matcher: "PatternMatcher") -> dict:
|
||||
"""
|
||||
FTP로 받은 이미지 바이트를 Python PatternMatcher로 매칭.
|
||||
반환 형식은 read_patmax_results와 동일하여 identify_model에서 그대로 사용.
|
||||
"""
|
||||
if not image_bytes:
|
||||
return {}
|
||||
|
||||
arr = np.frombuffer(image_bytes, dtype=np.uint8)
|
||||
img = cv2.imdecode(arr, cv2.IMREAD_UNCHANGED)
|
||||
if img is None:
|
||||
return {}
|
||||
|
||||
all_scores = matcher.match_all(img)
|
||||
results = {}
|
||||
|
||||
for pid, score in all_scores.items():
|
||||
info = matcher.get_product_info(pid)
|
||||
if info is None:
|
||||
continue
|
||||
results[f"PY_{pid}"] = {
|
||||
"matched": score >= matcher.score_threshold,
|
||||
"score": score,
|
||||
"model": info,
|
||||
"raw": f"python_match={score:.1f}",
|
||||
}
|
||||
|
||||
return results
|
||||
|
||||
# ── Cognex GV 셀 방식 (fallback) ────────────────────────────────── #
|
||||
|
||||
def read_patmax_results(self, insight) -> dict:
|
||||
"""A27/A77/A127/A177 셀 조회 → #ERR이면 실패, 그 외 점수 파싱."""
|
||||
results = {}
|
||||
for cell, model_info in PATTERN_RESULT_CELLS.items():
|
||||
try:
|
||||
insight._send(f"GV{cell}")
|
||||
code = insight._read_line()
|
||||
if code != "1":
|
||||
results[cell] = {
|
||||
"matched": False, "score": 0.0,
|
||||
"model": model_info, "raw": ""
|
||||
}
|
||||
continue
|
||||
value = insight._read_line()
|
||||
if "#ERR" in value or value.strip() == "":
|
||||
results[cell] = {
|
||||
"matched": False, "score": 0.0,
|
||||
"model": model_info, "raw": value
|
||||
}
|
||||
else:
|
||||
# "(736.1,742.0) -1.8 = 82.9" 형식에서 = 뒤 값 추출
|
||||
try:
|
||||
score = float(value.split("=")[-1].strip())
|
||||
except Exception:
|
||||
score = 0.0
|
||||
results[cell] = {
|
||||
"matched": True, "score": score,
|
||||
"model": model_info, "raw": value
|
||||
}
|
||||
except Exception as e:
|
||||
print(f"[PatMax] {cell} 읽기 오류: {e}")
|
||||
results[cell] = {
|
||||
"matched": False, "score": 0.0,
|
||||
"model": model_info, "raw": ""
|
||||
}
|
||||
return results
|
||||
|
||||
# ── 공통: 모델 판별 + 판정 ──────────────────────────────────────── #
|
||||
|
||||
def identify_model(self, results: dict, allowed_model_ids: list) -> dict:
|
||||
"""매칭된 패턴 중 점수가 가장 높은 것을 선택해 허용 모델 여부 판별."""
|
||||
matched_patterns = [
|
||||
(cell, info) for cell, info in results.items()
|
||||
if info["matched"]
|
||||
]
|
||||
|
||||
if not matched_patterns:
|
||||
return {
|
||||
"matched": False, "in_allowed": False,
|
||||
"model": None, "score": 0.0,
|
||||
"cognex_pass": False, "status": "인식 불가"
|
||||
}
|
||||
|
||||
_best_cell, best_info = max(matched_patterns, key=lambda x: x[1]["score"])
|
||||
model = best_info["model"]
|
||||
in_allowed = model["id"] in allowed_model_ids
|
||||
|
||||
return {
|
||||
"matched": True,
|
||||
"in_allowed": in_allowed,
|
||||
"model": model,
|
||||
"score": best_info["score"],
|
||||
"cognex_pass": in_allowed,
|
||||
"status": (
|
||||
f"{model['name']} {model['model']} {model['type']} ({best_info['score']:.1f}점)"
|
||||
if in_allowed
|
||||
else f"허용 외 모델: {model['name']} {model['model']} {model['type']}"
|
||||
),
|
||||
}
|
||||
|
||||
def judge(self, cognex_pass: bool, basler_pass: bool) -> str:
|
||||
return "PASS" if cognex_pass and basler_pass else "FAIL"
|
||||
222
logic/pattern_matcher.py
Normal file
222
logic/pattern_matcher.py
Normal file
@@ -0,0 +1,222 @@
|
||||
# Python PatMax 대체 구현
|
||||
# ORB 특징점 매칭 (위치·회전 불변) + 엣지 NCC fallback (특징점 부족 시)
|
||||
import os
|
||||
import pickle
|
||||
import cv2
|
||||
import numpy as np
|
||||
from typing import Optional
|
||||
|
||||
_PATTERNS_PATH = os.path.join("assets", "patterns.pkl")
|
||||
SCORE_THRESHOLD = 60.0
|
||||
_MAX_IMG_SIZE = 1200 # ORB 처리 전 긴 변 최대 픽셀 (속도·메모리 제한)
|
||||
_GOOD_MATCH_REF = 20 # good match 이 수 이상 → 100점 기준
|
||||
|
||||
|
||||
class PatternMatcher:
|
||||
|
||||
def __init__(self, threshold: float = SCORE_THRESHOLD):
|
||||
# {product_id: {"method": "orb"|"ncc", "des": ndarray|None, ...}}
|
||||
self._patterns: dict = {}
|
||||
self._threshold = threshold
|
||||
|
||||
# ── 학습 ──────────────────────────────────────────────────────────── #
|
||||
|
||||
def train(self, image: np.ndarray, product_id: int, product_info: dict,
|
||||
roi=None):
|
||||
"""
|
||||
roi: (x, y, w, h) 픽셀 좌표, None이면 전체 이미지.
|
||||
ORB로 특징점을 자동 검출해 등록. 특징점 10개 미만이면 엣지 NCC 방식으로 전환.
|
||||
"""
|
||||
gray = _to_gray(image)
|
||||
if roi is not None:
|
||||
x, y, rw, rh = roi
|
||||
x, y = max(0, x), max(0, y)
|
||||
rw = min(rw, gray.shape[1] - x)
|
||||
rh = min(rh, gray.shape[0] - y)
|
||||
gray = gray[y:y + rh, x:x + rw].copy()
|
||||
|
||||
gray = _resize_if_large(gray)
|
||||
kp, des = cv2.ORB_create(nfeatures=1000).detectAndCompute(gray, None)
|
||||
|
||||
if des is not None and len(kp) >= 10:
|
||||
method = "orb"
|
||||
n_kp = len(kp)
|
||||
edges = None
|
||||
else:
|
||||
method = "ncc"
|
||||
n_kp = 0
|
||||
des = None
|
||||
edges = _to_edges(gray)
|
||||
|
||||
name = (f"{product_info.get('name')} "
|
||||
f"{product_info.get('model')} {product_info.get('type')}")
|
||||
suffix = f" ORB 특징점 {n_kp}개" if method == "orb" else " 엣지 NCC (특징점 부족)"
|
||||
print(f"[PatternMatcher] 등록: id={product_id} {name}{suffix}")
|
||||
|
||||
self._patterns[product_id] = {
|
||||
"method": method,
|
||||
"gray": gray, # 축소된 그레이 (시각화·fallback용)
|
||||
"des": des, # ORB 디스크립터 (np.ndarray) or None
|
||||
"n_kp": n_kp,
|
||||
"edges": edges, # Canny 엣지 or None
|
||||
"info": product_info,
|
||||
"roi": roi,
|
||||
}
|
||||
|
||||
# ── 매칭 ──────────────────────────────────────────────────────────── #
|
||||
|
||||
def match_all(self, image: np.ndarray) -> dict:
|
||||
"""모든 등록 패턴에 대한 점수 반환 {product_id: score(0~100)}"""
|
||||
gray = _resize_if_large(_to_gray(image))
|
||||
ncc_edges = None # NCC용 엣지는 필요할 때만 계산
|
||||
scores = {}
|
||||
for pid, data in self._patterns.items():
|
||||
if data.get("method") == "orb" and data.get("des") is not None:
|
||||
scores[pid] = _orb_score(gray, data["des"])
|
||||
else:
|
||||
if ncc_edges is None:
|
||||
ncc_edges = _to_edges(gray)
|
||||
tmpl = data.get("edges") or _to_edges(data["gray"])
|
||||
scores[pid] = _best_rotated_score(ncc_edges, tmpl)
|
||||
return scores
|
||||
|
||||
# ── 저장 / 로드 ────────────────────────────────────────────────────── #
|
||||
|
||||
def save(self, path: str = _PATTERNS_PATH):
|
||||
os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
|
||||
with open(path, "wb") as f:
|
||||
pickle.dump({"patterns": self._patterns, "threshold": self._threshold}, f)
|
||||
print(f"[PatternMatcher] 저장: {len(self._patterns)}개 → {path}")
|
||||
|
||||
def load(self, path: str = _PATTERNS_PATH) -> bool:
|
||||
if not os.path.exists(path):
|
||||
return False
|
||||
try:
|
||||
with open(path, "rb") as f:
|
||||
data = pickle.load(f)
|
||||
self._patterns = data.get("patterns", {})
|
||||
self._threshold = data.get("threshold", SCORE_THRESHOLD)
|
||||
# 구버전 호환: method 키 없으면 NCC로 처리
|
||||
for pat in self._patterns.values():
|
||||
if "method" not in pat:
|
||||
pat["method"] = "ncc"
|
||||
if not pat.get("edges"):
|
||||
pat["edges"] = _to_edges(pat["gray"])
|
||||
print(f"[PatternMatcher] 로드: {len(self._patterns)}개 ← {path}")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"[PatternMatcher] 로드 실패: {e}")
|
||||
return False
|
||||
|
||||
# ── 조회 ──────────────────────────────────────────────────────────── #
|
||||
|
||||
def has_pattern(self, product_id: int) -> bool:
|
||||
return product_id in self._patterns
|
||||
|
||||
def remove_pattern(self, product_id: int):
|
||||
self._patterns.pop(product_id, None)
|
||||
|
||||
def get_product_info(self, product_id: int) -> Optional[dict]:
|
||||
data = self._patterns.get(product_id)
|
||||
return data["info"] if data else None
|
||||
|
||||
def get_pattern_summary(self, product_id: int) -> str:
|
||||
"""패턴 등록 방식 및 특징점 수 요약 문자열."""
|
||||
data = self._patterns.get(product_id)
|
||||
if not data:
|
||||
return ""
|
||||
if data.get("method") == "orb":
|
||||
return f"ORB 특징점 {data.get('n_kp', '?')}개 검출됨"
|
||||
return "엣지 NCC 방식 (특징점 부족)"
|
||||
|
||||
@property
|
||||
def registered_ids(self) -> list:
|
||||
return list(self._patterns.keys())
|
||||
|
||||
@property
|
||||
def score_threshold(self) -> float:
|
||||
return self._threshold
|
||||
|
||||
|
||||
# ── 공통 유틸 ──────────────────────────────────────────────────────────── #
|
||||
|
||||
def _to_gray(image: np.ndarray) -> np.ndarray:
|
||||
if len(image.shape) == 3:
|
||||
return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
|
||||
return image.copy()
|
||||
|
||||
|
||||
def _resize_if_large(image: np.ndarray) -> np.ndarray:
|
||||
"""긴 변이 _MAX_IMG_SIZE 초과 시 비율 유지 축소."""
|
||||
h, w = image.shape[:2]
|
||||
if max(h, w) <= _MAX_IMG_SIZE:
|
||||
return image
|
||||
scale = _MAX_IMG_SIZE / max(h, w)
|
||||
return cv2.resize(image, (int(w * scale), int(h * scale)),
|
||||
interpolation=cv2.INTER_AREA)
|
||||
|
||||
|
||||
# ── ORB 특징점 매칭 ────────────────────────────────────────────────────── #
|
||||
|
||||
def _orb_score(image: np.ndarray, template_des: np.ndarray) -> float:
|
||||
"""
|
||||
ORB 디스크립터로 유사도 점수 계산 (0~100).
|
||||
Lowe's ratio test(0.75)로 good match 필터링.
|
||||
good match _GOOD_MATCH_REF개 이상이면 100점.
|
||||
"""
|
||||
orb = cv2.ORB_create(nfeatures=1000)
|
||||
bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=False)
|
||||
kp2, des2 = orb.detectAndCompute(image, None)
|
||||
if des2 is None or len(kp2) < 2:
|
||||
return 0.0
|
||||
try:
|
||||
matches = bf.knnMatch(template_des, des2, k=2)
|
||||
except cv2.error:
|
||||
return 0.0
|
||||
good = [p[0] for p in matches
|
||||
if len(p) == 2 and p[0].distance < 0.75 * p[1].distance]
|
||||
return min(len(good) / _GOOD_MATCH_REF * 100.0, 100.0)
|
||||
|
||||
|
||||
# ── 엣지 NCC fallback (특징점 없는 매끄러운 제품용) ───────────────────── #
|
||||
|
||||
_ROTATION_ANGLES = list(range(-15, 16, 5))
|
||||
|
||||
|
||||
def _to_edges(image: np.ndarray) -> np.ndarray:
|
||||
return cv2.Canny(cv2.GaussianBlur(image, (3, 3), 0), 50, 150)
|
||||
|
||||
|
||||
def _rotate_template(image: np.ndarray, angle_deg: float) -> np.ndarray:
|
||||
if abs(angle_deg) < 0.1:
|
||||
return image
|
||||
h, w = image.shape[:2]
|
||||
cx, cy = w / 2.0, h / 2.0
|
||||
M = cv2.getRotationMatrix2D((cx, cy), angle_deg, 1.0)
|
||||
cos_a, sin_a = abs(M[0, 0]), abs(M[0, 1])
|
||||
new_w = int(h * sin_a + w * cos_a)
|
||||
new_h = int(h * cos_a + w * sin_a)
|
||||
M[0, 2] += (new_w - w) / 2.0
|
||||
M[1, 2] += (new_h - h) / 2.0
|
||||
return cv2.warpAffine(image, M, (new_w, new_h))
|
||||
|
||||
|
||||
def _best_rotated_score(search: np.ndarray, template: np.ndarray) -> float:
|
||||
return max(_ncc_score(search, _rotate_template(template, float(a)))
|
||||
for a in _ROTATION_ANGLES)
|
||||
|
||||
|
||||
def _ncc_score(image: np.ndarray, template: np.ndarray) -> float:
|
||||
h, w = image.shape[:2]
|
||||
th, tw = template.shape[:2]
|
||||
if th > h or tw > w:
|
||||
scale = 0.8 * min(h / th, w / tw)
|
||||
template = cv2.resize(template,
|
||||
(max(1, int(tw * scale)), max(1, int(th * scale))),
|
||||
interpolation=cv2.INTER_AREA)
|
||||
th, tw = template.shape[:2]
|
||||
if th > h or tw > w:
|
||||
return 0.0
|
||||
result = cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED)
|
||||
_, max_val, _, _ = cv2.minMaxLoc(result)
|
||||
return float(max(0.0, max_val)) * 100.0
|
||||
Reference in New Issue
Block a user