Astro-Session-Watcher/services/calibration_service.py

113 lines
No EOL
4.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
CalibrationService - управление съёмкой калибровочных кадров
"""
from pathlib import Path
from datetime import datetime
from typing import Optional, Callable
from PySide6.QtCore import QObject, Signal
from services.file_service import FileService
from services.watch_service import WatchService
class CalibrationService(QObject):
"""Сервис для съёмки калибровочных кадров с авто-остановкой"""
# Сигналы для UI
progress_updated = Signal(int, int) # (current, target)
capture_completed = Signal(str) # тип кадра
capture_error = Signal(str)
def __init__(self):
super().__init__()
self._watch_service = WatchService()
self._target_count = 0
self._current_count = 0
self._calibration_type = None
self._target_folder = None
self._stop_requested = False
def start_calibration(self, calibration_type: str, target_folder: Path,
camera_name: str, target_count: int,
on_file_received: Callable) -> bool:
"""
Начинает съёмку калибровочных кадров
calibration_type: 'bias', 'dark', 'flat'
"""
self._calibration_type = calibration_type
self._target_count = target_count
self._current_count = 0
self._stop_requested = False
# Создаём папку назначения
self._target_folder = target_folder
self._target_folder.mkdir(parents=True, exist_ok=True)
# Очищаем папку наблюдения от старых файлов
watch_folder = self._get_watch_folder()
if watch_folder:
FileService.clear_watch_folder(watch_folder)
# Запускаем отслеживание
def on_file(file_path: Path):
if self._stop_requested:
return
# Обрабатываем файл
if self._process_calibration_file(file_path, camera_name):
self._current_count += 1
self.progress_updated.emit(self._current_count, self._target_count)
if self._current_count >= self._target_count:
self.stop_calibration()
self.capture_completed.emit(calibration_type)
if on_file_received:
on_file_received(file_path)
watch_path = Path(".") # Нужно получить реальный путь
return self._watch_service.start(watch_path, on_file)
def _process_calibration_file(self, file_path: Path, camera_name: str) -> bool:
"""Обрабатывает калибровочный файл"""
if not FileService.is_photo(file_path):
return False
# Генерируем имя файла с типом кадра
timestamp = datetime.now()
suffix = file_path.suffix
type_prefix = {
'bias': 'Bias',
'dark': 'Dark',
'flat': 'Flat'
}.get(self._calibration_type, 'Calib')
new_filename = f"{type_prefix}_{camera_name}_{timestamp.strftime('%Y%m%d_%H%M%S')}{suffix}"
target_path = self._target_folder / new_filename
target_path = FileService.resolve_conflict(target_path)
try:
import shutil
shutil.move(str(file_path), str(target_path))
print(f"Калибровочный кадр сохранён: {target_path.name}")
return True
except Exception as e:
print(f"Ошибка сохранения кадра: {e}")
return False
def stop_calibration(self):
"""Останавливает съёмку калибровочных кадров"""
self._stop_requested = True
self._watch_service.stop()
def _get_watch_folder(self) -> Optional[Path]:
"""Возвращает папку наблюдения (нужно из main_window)"""
# TODO: Получить из главного окна
return None
def get_progress(self) -> tuple:
"""Возвращает прогресс (current, target)"""
return (self._current_count, self._target_count)