113 lines
No EOL
4.2 KiB
Python
113 lines
No EOL
4.2 KiB
Python
"""
|
||
CalibrationService - управление съёмкой калибровочных кадров
|
||
"""
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
from typing import Optional, Callable
|
||
from PySide6.QtCore import QObject, Signal
|
||
|
||
from services.file_service import FileService
|
||
from services.watch_service import WatchService
|
||
|
||
|
||
class CalibrationService(QObject):
|
||
"""Сервис для съёмки калибровочных кадров с авто-остановкой"""
|
||
|
||
# Сигналы для UI
|
||
progress_updated = Signal(int, int) # (current, target)
|
||
capture_completed = Signal(str) # тип кадра
|
||
capture_error = Signal(str)
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self._watch_service = WatchService()
|
||
self._target_count = 0
|
||
self._current_count = 0
|
||
self._calibration_type = None
|
||
self._target_folder = None
|
||
self._stop_requested = False
|
||
|
||
def start_calibration(self, calibration_type: str, target_folder: Path,
|
||
camera_name: str, target_count: int,
|
||
on_file_received: Callable) -> bool:
|
||
"""
|
||
Начинает съёмку калибровочных кадров
|
||
|
||
calibration_type: 'bias', 'dark', 'flat'
|
||
"""
|
||
self._calibration_type = calibration_type
|
||
self._target_count = target_count
|
||
self._current_count = 0
|
||
self._stop_requested = False
|
||
|
||
# Создаём папку назначения
|
||
self._target_folder = target_folder
|
||
self._target_folder.mkdir(parents=True, exist_ok=True)
|
||
|
||
# Очищаем папку наблюдения от старых файлов
|
||
watch_folder = self._get_watch_folder()
|
||
if watch_folder:
|
||
FileService.clear_watch_folder(watch_folder)
|
||
|
||
# Запускаем отслеживание
|
||
def on_file(file_path: Path):
|
||
if self._stop_requested:
|
||
return
|
||
|
||
# Обрабатываем файл
|
||
if self._process_calibration_file(file_path, camera_name):
|
||
self._current_count += 1
|
||
self.progress_updated.emit(self._current_count, self._target_count)
|
||
|
||
if self._current_count >= self._target_count:
|
||
self.stop_calibration()
|
||
self.capture_completed.emit(calibration_type)
|
||
|
||
if on_file_received:
|
||
on_file_received(file_path)
|
||
|
||
watch_path = Path(".") # Нужно получить реальный путь
|
||
return self._watch_service.start(watch_path, on_file)
|
||
|
||
def _process_calibration_file(self, file_path: Path, camera_name: str) -> bool:
|
||
"""Обрабатывает калибровочный файл"""
|
||
if not FileService.is_photo(file_path):
|
||
return False
|
||
|
||
# Генерируем имя файла с типом кадра
|
||
timestamp = datetime.now()
|
||
suffix = file_path.suffix
|
||
|
||
type_prefix = {
|
||
'bias': 'Bias',
|
||
'dark': 'Dark',
|
||
'flat': 'Flat'
|
||
}.get(self._calibration_type, 'Calib')
|
||
|
||
new_filename = f"{type_prefix}_{camera_name}_{timestamp.strftime('%Y%m%d_%H%M%S')}{suffix}"
|
||
|
||
target_path = self._target_folder / new_filename
|
||
target_path = FileService.resolve_conflict(target_path)
|
||
|
||
try:
|
||
import shutil
|
||
shutil.move(str(file_path), str(target_path))
|
||
print(f"Калибровочный кадр сохранён: {target_path.name}")
|
||
return True
|
||
except Exception as e:
|
||
print(f"Ошибка сохранения кадра: {e}")
|
||
return False
|
||
|
||
def stop_calibration(self):
|
||
"""Останавливает съёмку калибровочных кадров"""
|
||
self._stop_requested = True
|
||
self._watch_service.stop()
|
||
|
||
def _get_watch_folder(self) -> Optional[Path]:
|
||
"""Возвращает папку наблюдения (нужно из main_window)"""
|
||
# TODO: Получить из главного окна
|
||
return None
|
||
|
||
def get_progress(self) -> tuple:
|
||
"""Возвращает прогресс (current, target)"""
|
||
return (self._current_count, self._target_count) |