working logic+working watching files+added calibration feature+instructions

This commit is contained in:
Vic Sergeev 2026-05-07 21:13:00 +03:00
parent 09d181eba8
commit 97ed8217bf
25 changed files with 1743 additions and 192 deletions

View file

@ -0,0 +1,113 @@
"""
CalibrationService - управление съёмкой калибровочных кадров
"""
from pathlib import Path
from datetime import datetime
from typing import Optional, Callable
from PySide6.QtCore import QObject, Signal
from services.file_service import FileService
from services.watch_service import WatchService
class CalibrationService(QObject):
"""Сервис для съёмки калибровочных кадров с авто-остановкой"""
# Сигналы для UI
progress_updated = Signal(int, int) # (current, target)
capture_completed = Signal(str) # тип кадра
capture_error = Signal(str)
def __init__(self):
super().__init__()
self._watch_service = WatchService()
self._target_count = 0
self._current_count = 0
self._calibration_type = None
self._target_folder = None
self._stop_requested = False
def start_calibration(self, calibration_type: str, target_folder: Path,
camera_name: str, target_count: int,
on_file_received: Callable) -> bool:
"""
Начинает съёмку калибровочных кадров
calibration_type: 'bias', 'dark', 'flat'
"""
self._calibration_type = calibration_type
self._target_count = target_count
self._current_count = 0
self._stop_requested = False
# Создаём папку назначения
self._target_folder = target_folder
self._target_folder.mkdir(parents=True, exist_ok=True)
# Очищаем папку наблюдения от старых файлов
watch_folder = self._get_watch_folder()
if watch_folder:
FileService.clear_watch_folder(watch_folder)
# Запускаем отслеживание
def on_file(file_path: Path):
if self._stop_requested:
return
# Обрабатываем файл
if self._process_calibration_file(file_path, camera_name):
self._current_count += 1
self.progress_updated.emit(self._current_count, self._target_count)
if self._current_count >= self._target_count:
self.stop_calibration()
self.capture_completed.emit(calibration_type)
if on_file_received:
on_file_received(file_path)
watch_path = Path(".") # Нужно получить реальный путь
return self._watch_service.start(watch_path, on_file)
def _process_calibration_file(self, file_path: Path, camera_name: str) -> bool:
"""Обрабатывает калибровочный файл"""
if not FileService.is_photo(file_path):
return False
# Генерируем имя файла с типом кадра
timestamp = datetime.now()
suffix = file_path.suffix
type_prefix = {
'bias': 'Bias',
'dark': 'Dark',
'flat': 'Flat'
}.get(self._calibration_type, 'Calib')
new_filename = f"{type_prefix}_{camera_name}_{timestamp.strftime('%Y%m%d_%H%M%S')}{suffix}"
target_path = self._target_folder / new_filename
target_path = FileService.resolve_conflict(target_path)
try:
import shutil
shutil.move(str(file_path), str(target_path))
print(f"Калибровочный кадр сохранён: {target_path.name}")
return True
except Exception as e:
print(f"Ошибка сохранения кадра: {e}")
return False
def stop_calibration(self):
"""Останавливает съёмку калибровочных кадров"""
self._stop_requested = True
self._watch_service.stop()
def _get_watch_folder(self) -> Optional[Path]:
"""Возвращает папку наблюдения (нужно из main_window)"""
# TODO: Получить из главного окна
return None
def get_progress(self) -> tuple:
"""Возвращает прогресс (current, target)"""
return (self._current_count, self._target_count)

View file

@ -4,18 +4,6 @@ ConfigService - управление настройками приложения
import json
import os
from typing import List, Optional
from dataclasses import dataclass, asdict
@dataclass
class AppConfig:
"""Конфигурация приложения"""
cameras: List[str]
lenses: List[str]
celestial_bodies: List[str]
last_watch_folder: str
last_camera: str
last_lens: str
class ConfigService:
@ -25,14 +13,15 @@ class ConfigService:
CELESTIAL_BODIES_FILE = "celestial_bodies.json"
def __init__(self):
self.config = AppConfig(
cameras=[],
lenses=[],
celestial_bodies=[],
last_watch_folder="",
last_camera="",
last_lens=""
)
self.config = {
'cameras': [],
'lenses': [],
'telescopes': [],
'celestial_bodies': [],
'last_watch_folder': '',
'last_camera': '',
'last_lens': ''
}
self.load_all()
def load_all(self):
@ -40,8 +29,9 @@ class ConfigService:
self._load_settings()
self._load_celestial_bodies()
if not self.config.celestial_bodies:
self.config.celestial_bodies = [
# Если список небесных тел пуст - добавляем стандартные
if not self.config['celestial_bodies']:
self.config['celestial_bodies'] = [
"M31 (Andromeda Galaxy)",
"M42 (Orion Nebula)",
"M45 (Pleiades)",
@ -54,108 +44,151 @@ class ConfigService:
self._save_celestial_bodies()
def _load_settings(self):
"""Загружает основные настройки (камеры, объективы, телескопы, последнюю папку)"""
if os.path.exists(self.SETTINGS_FILE):
try:
with open(self.SETTINGS_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
self.config.cameras = data.get('cameras', [])
self.config.lenses = data.get('lenses', [])
self.config.last_watch_folder = data.get('last_watch_folder', '')
self.config.last_camera = data.get('last_camera', '')
self.config.last_lens = data.get('last_lens', '')
self.config['cameras'] = data.get('cameras', [])
self.config['lenses'] = data.get('lenses', [])
self.config['telescopes'] = data.get('telescopes', [])
self.config['last_watch_folder'] = data.get('last_watch_folder', '')
self.config['last_camera'] = data.get('last_camera', '')
self.config['last_lens'] = data.get('last_lens', '')
except Exception as e:
print(f"Ошибка загрузки настроек: {e}")
def save_settings(self):
"""Сохраняет основные настройки"""
try:
with open(self.SETTINGS_FILE, 'w', encoding='utf-8') as f:
json.dump({
'cameras': self.config.cameras,
'lenses': self.config.lenses,
'last_watch_folder': self.config.last_watch_folder,
'last_camera': self.config.last_camera,
'last_lens': self.config.last_lens
'cameras': self.config['cameras'],
'lenses': self.config['lenses'],
'telescopes': self.config['telescopes'],
'last_watch_folder': self.config['last_watch_folder'],
'last_camera': self.config['last_camera'],
'last_lens': self.config['last_lens']
}, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"Ошибка сохранения настроек: {e}")
def _load_celestial_bodies(self):
"""Загружает список небесных тел"""
if os.path.exists(self.CELESTIAL_BODIES_FILE):
try:
with open(self.CELESTIAL_BODIES_FILE, 'r', encoding='utf-8') as f:
self.config.celestial_bodies = json.load(f)
self.config['celestial_bodies'] = json.load(f)
except Exception as e:
print(f"Ошибка загрузки небесных тел: {e}")
def _save_celestial_bodies(self):
"""Сохраняет список небесных тел"""
try:
with open(self.CELESTIAL_BODIES_FILE, 'w', encoding='utf-8') as f:
json.dump(self.config.celestial_bodies, f, ensure_ascii=False, indent=2)
json.dump(self.config['celestial_bodies'], f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"Ошибка сохранения небесных тел: {e}")
# ===== Методы для работы с камерами =====
def get_cameras(self) -> List[str]:
return self.config.cameras.copy()
return self.config['cameras'].copy()
def add_camera(self, camera: str):
if camera and camera not in self.config.cameras:
self.config.cameras.append(camera)
if camera and camera not in self.config['cameras']:
self.config['cameras'].append(camera)
self.save_settings()
def remove_camera(self, camera: str):
if camera in self.config.cameras:
self.config.cameras.remove(camera)
if camera in self.config['cameras']:
self.config['cameras'].remove(camera)
self.save_settings()
# ===== Методы для работы с объективами =====
def get_lenses(self) -> List[str]:
return self.config.lenses.copy()
return self.config['lenses'].copy()
def add_lens(self, lens: str):
if lens and lens not in self.config.lenses:
self.config.lenses.append(lens)
if lens and lens not in self.config['lenses']:
self.config['lenses'].append(lens)
self.save_settings()
def remove_lens(self, lens: str):
if lens in self.config.lenses:
self.config.lenses.remove(lens)
if lens in self.config['lenses']:
self.config['lenses'].remove(lens)
self.save_settings()
def update_lens(self, old_name: str, new_name: str):
if old_name in self.config['lenses']:
idx = self.config['lenses'].index(old_name)
self.config['lenses'][idx] = new_name
self.save_settings()
# ===== Методы для работы с телескопами =====
def get_telescopes(self) -> List[str]:
return self.config.get('telescopes', []).copy()
def add_telescope(self, telescope: str):
if 'telescopes' not in self.config:
self.config['telescopes'] = []
if telescope and telescope not in self.config['telescopes']:
self.config['telescopes'].append(telescope)
self.save_settings()
def remove_telescope(self, telescope: str):
if 'telescopes' in self.config and telescope in self.config['telescopes']:
self.config['telescopes'].remove(telescope)
self.save_settings()
def update_telescope(self, old_name: str, new_name: str):
if 'telescopes' in self.config and old_name in self.config['telescopes']:
idx = self.config['telescopes'].index(old_name)
self.config['telescopes'][idx] = new_name
self.save_settings()
# ===== Методы для работы с небесными телами =====
def get_celestial_bodies(self) -> List[str]:
return self.config.celestial_bodies.copy()
return self.config['celestial_bodies'].copy()
def add_celestial_body(self, name: str):
if name and name not in self.config.celestial_bodies:
self.config.celestial_bodies.append(name)
if name and name not in self.config['celestial_bodies']:
self.config['celestial_bodies'].append(name)
self._save_celestial_bodies()
def remove_celestial_body(self, name: str):
if name in self.config.celestial_bodies:
self.config.celestial_bodies.remove(name)
if name in self.config['celestial_bodies']:
self.config['celestial_bodies'].remove(name)
self._save_celestial_bodies()
def update_celestial_body(self, old_name: str, new_name: str):
if old_name in self.config.celestial_bodies:
idx = self.config.celestial_bodies.index(old_name)
self.config.celestial_bodies[idx] = new_name
if old_name in self.config['celestial_bodies']:
idx = self.config['celestial_bodies'].index(old_name)
self.config['celestial_bodies'][idx] = new_name
self._save_celestial_bodies()
# ===== Методы для работы с последней папкой и оборудованием =====
def get_last_watch_folder(self) -> str:
return self.config.last_watch_folder
return self.config.get('last_watch_folder', '')
def set_last_watch_folder(self, folder: str):
self.config.last_watch_folder = folder
self.config['last_watch_folder'] = folder
self.save_settings()
def get_last_camera(self) -> str:
return self.config.last_camera
return self.config.get('last_camera', '')
def set_last_camera(self, camera: str):
self.config.last_camera = camera
self.config['last_camera'] = camera
self.save_settings()
def get_last_lens(self) -> str:
return self.config.last_lens
return self.config.get('last_lens', '')
def set_last_lens(self, lens: str):
self.config.last_lens = lens
self.config['last_lens'] = lens
self.save_settings()

View file

@ -35,6 +35,22 @@ class FileService:
return new_path
counter += 1
@classmethod
def generate_new_filename(cls, object_name: str, timestamp: datetime, original_suffix: str) -> str:
"""
Генерирует новое имя файла в формате:
ИмяОбъекта_ГГГГ-ММ-ДД_ЧЧ-ММ-СС.расширение
"""
# Очищаем имя объекта от недопустимых символов
safe_object_name = "".join(c for c in object_name if c.isalnum() or c in (' ', '-', '_')).strip()
safe_object_name = safe_object_name.replace(' ', '_')
# Форматируем дату и время
date_str = timestamp.strftime("%Y-%m-%d")
time_str = timestamp.strftime("%H-%M-%S")
return f"{safe_object_name}_{date_str}_{time_str}{original_suffix}"
@classmethod
def write_object_log(cls, folder: Path, filename: str, camera: str, optics: str,
timestamp: Optional[datetime] = None) -> None:
@ -52,23 +68,45 @@ class FileService:
print(f"Ошибка записи лога: {e}")
@classmethod
def move_file(cls, source: Path, target_folder: Path, camera: str, optics: str) -> bool:
"""Перемещает файл в целевую папку"""
def move_file(cls, source: Path, target_folder: Path, camera: str, optics: str,
object_name: str = None) -> bool:
"""
Перемещает файл в целевую папку с переименованием
Если object_name указан, файл переименовывается в формат: ИмяОбъектаатаремя.расширение
"""
if not source.exists():
print(f"Файл не существует: {source}")
return False
if not cls.is_photo(source):
print(f"Неподдерживаемый формат: {source}")
return False
try:
target_folder.mkdir(parents=True, exist_ok=True)
# Получаем время создания файла
creation_time = datetime.fromtimestamp(source.stat().st_ctime)
cls.write_object_log(target_folder, source.name, camera, optics, creation_time)
target_path = cls.resolve_conflict(target_folder / source.name)
# Генерируем новое имя файла, если указано имя объекта
if object_name:
new_filename = cls.generate_new_filename(object_name, creation_time, source.suffix)
else:
new_filename = source.name
# Записываем в лог (сохраняем оригинальное имя в логе для отслеживания)
cls.write_object_log(target_folder, f"{source.name} -> {new_filename}", camera, optics, creation_time)
# Формируем целевой путь
target_path = cls.resolve_conflict(target_folder / new_filename)
# Перемещаем файл
shutil.move(str(source), str(target_path))
print(f"Файл перемещён и переименован: {source.name} -> {target_path.name}")
return True
except Exception as e:
print(f"Ошибка перемещения {source.name}: {e}")
print(f"Ошибка перемещения файла {source.name}: {e}")
return False
@classmethod

View file

@ -59,7 +59,7 @@ class SessionService:
return astro_object
def handle_file(self, file_path: Path) -> bool:
"""Обрабатывает новый файл"""
"""Обрабатывает новый файл: перемещает в папку текущего объекта с переименованием"""
if not self._current_session:
return False
@ -67,11 +67,13 @@ class SessionService:
if not current_object:
return False
# Передаём имя объекта для переименования файла
success = self._file_service.move_file(
file_path,
current_object.folder,
self._current_session.camera,
self._current_session.optics
self._current_session.optics,
current_object.name # Добавляем имя объекта для переименования
)
if success:
@ -92,12 +94,14 @@ class SessionService:
if watch_folder.exists():
for file_path in watch_folder.iterdir():
if file_path.is_file() and self._file_service.is_photo(file_path):
if self._file_service.move_file(
success = self._file_service.move_file(
file_path,
current_object.folder,
self._current_session.camera,
self._current_session.optics
):
self._current_session.optics,
current_object.name # Добавляем имя объекта для переименования
)
if success:
current_object.increment_photo_count()
count += 1
if on_file_moved:

View file

@ -25,7 +25,17 @@ class PhotoHandler(FileSystemEventHandler):
if not event.is_directory:
src_path = Path(event.src_path)
if FileService.is_photo(src_path):
time.sleep(0.1) # Даём время на запись файла
print(f"[Watchdog] Обнаружен файл: {src_path}")
time.sleep(0.1)
self._pending_files.put(src_path)
def on_modified(self, event):
"""Также обрабатываем modified, так как некоторые программы сначала создают временный файл"""
if not event.is_directory:
src_path = Path(event.src_path)
if FileService.is_photo(src_path):
print(f"[Watchdog] Изменён файл: {src_path}")
time.sleep(0.1)
self._pending_files.put(src_path)
def _process_queue(self):
@ -52,24 +62,33 @@ class WatchService:
self._is_running = False
def start(self, watch_folder: Path, on_new_file: Callable[[Path], None]) -> bool:
"""Запускает отслеживание папки"""
if self._is_running:
print("Watcher already running")
return False
if not watch_folder.exists():
print(f"Папка не существует: {watch_folder}")
return False
try:
print(f"Запуск отслеживания папки: {watch_folder}")
self._event_handler = PhotoHandler(on_new_file)
self._observer = Observer()
self._observer.schedule(self._event_handler, str(watch_folder), recursive=False)
self._observer.start()
self._is_running = True
print(f"Отслеживание успешно запущено для: {watch_folder}")
return True
except Exception as e:
print(f"Ошибка запуска отслеживания: {e}")
import traceback
traceback.print_exc()
return False
def stop(self):
"""Останавливает отслеживание"""
print("Остановка отслеживания...")
if self._observer:
self._observer.stop()
self._observer.join()
@ -80,12 +99,13 @@ class WatchService:
self._event_handler = None
self._is_running = False
print("Отслеживание остановлено")
def is_running(self) -> bool:
return self._is_running
def move_all_existing_files(self, watch_folder: Path, on_file_moved: Callable[[Path], None]) -> int:
"""Перемещает все существующие файлы"""
"""Перемещает все существующие файлы из папки наблюдения"""
count = 0
if watch_folder.exists():
for file_path in watch_folder.iterdir():