Astro-Session-Watcher/services/calibration_service.py

113 lines
4.2 KiB
Python
Raw Permalink Normal View History

"""
CalibrationService - управление съёмкой калибровочных кадров
"""
from pathlib import Path
from datetime import datetime
from typing import Optional, Callable
from PySide6.QtCore import QObject, Signal
from services.file_service import FileService
from services.watch_service import WatchService
class CalibrationService(QObject):
"""Сервис для съёмки калибровочных кадров с авто-остановкой"""
# Сигналы для UI
progress_updated = Signal(int, int) # (current, target)
capture_completed = Signal(str) # тип кадра
capture_error = Signal(str)
def __init__(self):
super().__init__()
self._watch_service = WatchService()
self._target_count = 0
self._current_count = 0
self._calibration_type = None
self._target_folder = None
self._stop_requested = False
def start_calibration(self, calibration_type: str, target_folder: Path,
camera_name: str, target_count: int,
on_file_received: Callable) -> bool:
"""
Начинает съёмку калибровочных кадров
calibration_type: 'bias', 'dark', 'flat'
"""
self._calibration_type = calibration_type
self._target_count = target_count
self._current_count = 0
self._stop_requested = False
# Создаём папку назначения
self._target_folder = target_folder
self._target_folder.mkdir(parents=True, exist_ok=True)
# Очищаем папку наблюдения от старых файлов
watch_folder = self._get_watch_folder()
if watch_folder:
FileService.clear_watch_folder(watch_folder)
# Запускаем отслеживание
def on_file(file_path: Path):
if self._stop_requested:
return
# Обрабатываем файл
if self._process_calibration_file(file_path, camera_name):
self._current_count += 1
self.progress_updated.emit(self._current_count, self._target_count)
if self._current_count >= self._target_count:
self.stop_calibration()
self.capture_completed.emit(calibration_type)
if on_file_received:
on_file_received(file_path)
watch_path = Path(".") # Нужно получить реальный путь
return self._watch_service.start(watch_path, on_file)
def _process_calibration_file(self, file_path: Path, camera_name: str) -> bool:
"""Обрабатывает калибровочный файл"""
if not FileService.is_photo(file_path):
return False
# Генерируем имя файла с типом кадра
timestamp = datetime.now()
suffix = file_path.suffix
type_prefix = {
'bias': 'Bias',
'dark': 'Dark',
'flat': 'Flat'
}.get(self._calibration_type, 'Calib')
new_filename = f"{type_prefix}_{camera_name}_{timestamp.strftime('%Y%m%d_%H%M%S')}{suffix}"
target_path = self._target_folder / new_filename
target_path = FileService.resolve_conflict(target_path)
try:
import shutil
shutil.move(str(file_path), str(target_path))
print(f"Калибровочный кадр сохранён: {target_path.name}")
return True
except Exception as e:
print(f"Ошибка сохранения кадра: {e}")
return False
def stop_calibration(self):
"""Останавливает съёмку калибровочных кадров"""
self._stop_requested = True
self._watch_service.stop()
def _get_watch_folder(self) -> Optional[Path]:
"""Возвращает папку наблюдения (нужно из main_window)"""
# TODO: Получить из главного окна
return None
def get_progress(self) -> tuple:
"""Возвращает прогресс (current, target)"""
return (self._current_count, self._target_count)