This commit is contained in:
Vic Sergeev 2026-05-07 17:15:56 +03:00
parent f7e794774d
commit 09d181eba8
37 changed files with 1898 additions and 5 deletions

View file

@ -0,0 +1,6 @@
from services.config_service import ConfigService
from services.file_service import FileService
from services.session_service import SessionService
from services.watch_service import WatchService
__all__ = ['ConfigService', 'FileService', 'SessionService', 'WatchService']

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View file

@ -0,0 +1,161 @@
"""
ConfigService - управление настройками приложения
"""
import json
import os
from typing import List, Optional
from dataclasses import dataclass, asdict
@dataclass
class AppConfig:
"""Конфигурация приложения"""
cameras: List[str]
lenses: List[str]
celestial_bodies: List[str]
last_watch_folder: str
last_camera: str
last_lens: str
class ConfigService:
"""Сервис для работы с конфигурацией"""
SETTINGS_FILE = "astro_settings.json"
CELESTIAL_BODIES_FILE = "celestial_bodies.json"
def __init__(self):
self.config = AppConfig(
cameras=[],
lenses=[],
celestial_bodies=[],
last_watch_folder="",
last_camera="",
last_lens=""
)
self.load_all()
def load_all(self):
"""Загружает все настройки"""
self._load_settings()
self._load_celestial_bodies()
if not self.config.celestial_bodies:
self.config.celestial_bodies = [
"M31 (Andromeda Galaxy)",
"M42 (Orion Nebula)",
"M45 (Pleiades)",
"M57 (Ring Nebula)",
"Sun",
"Moon",
"Jupiter",
"Saturn"
]
self._save_celestial_bodies()
def _load_settings(self):
if os.path.exists(self.SETTINGS_FILE):
try:
with open(self.SETTINGS_FILE, 'r', encoding='utf-8') as f:
data = json.load(f)
self.config.cameras = data.get('cameras', [])
self.config.lenses = data.get('lenses', [])
self.config.last_watch_folder = data.get('last_watch_folder', '')
self.config.last_camera = data.get('last_camera', '')
self.config.last_lens = data.get('last_lens', '')
except Exception as e:
print(f"Ошибка загрузки настроек: {e}")
def save_settings(self):
try:
with open(self.SETTINGS_FILE, 'w', encoding='utf-8') as f:
json.dump({
'cameras': self.config.cameras,
'lenses': self.config.lenses,
'last_watch_folder': self.config.last_watch_folder,
'last_camera': self.config.last_camera,
'last_lens': self.config.last_lens
}, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"Ошибка сохранения настроек: {e}")
def _load_celestial_bodies(self):
if os.path.exists(self.CELESTIAL_BODIES_FILE):
try:
with open(self.CELESTIAL_BODIES_FILE, 'r', encoding='utf-8') as f:
self.config.celestial_bodies = json.load(f)
except Exception as e:
print(f"Ошибка загрузки небесных тел: {e}")
def _save_celestial_bodies(self):
try:
with open(self.CELESTIAL_BODIES_FILE, 'w', encoding='utf-8') as f:
json.dump(self.config.celestial_bodies, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"Ошибка сохранения небесных тел: {e}")
def get_cameras(self) -> List[str]:
return self.config.cameras.copy()
def add_camera(self, camera: str):
if camera and camera not in self.config.cameras:
self.config.cameras.append(camera)
self.save_settings()
def remove_camera(self, camera: str):
if camera in self.config.cameras:
self.config.cameras.remove(camera)
self.save_settings()
def get_lenses(self) -> List[str]:
return self.config.lenses.copy()
def add_lens(self, lens: str):
if lens and lens not in self.config.lenses:
self.config.lenses.append(lens)
self.save_settings()
def remove_lens(self, lens: str):
if lens in self.config.lenses:
self.config.lenses.remove(lens)
self.save_settings()
def get_celestial_bodies(self) -> List[str]:
return self.config.celestial_bodies.copy()
def add_celestial_body(self, name: str):
if name and name not in self.config.celestial_bodies:
self.config.celestial_bodies.append(name)
self._save_celestial_bodies()
def remove_celestial_body(self, name: str):
if name in self.config.celestial_bodies:
self.config.celestial_bodies.remove(name)
self._save_celestial_bodies()
def update_celestial_body(self, old_name: str, new_name: str):
if old_name in self.config.celestial_bodies:
idx = self.config.celestial_bodies.index(old_name)
self.config.celestial_bodies[idx] = new_name
self._save_celestial_bodies()
def get_last_watch_folder(self) -> str:
return self.config.last_watch_folder
def set_last_watch_folder(self, folder: str):
self.config.last_watch_folder = folder
self.save_settings()
def get_last_camera(self) -> str:
return self.config.last_camera
def set_last_camera(self, camera: str):
self.config.last_camera = camera
self.save_settings()
def get_last_lens(self) -> str:
return self.config.last_lens
def set_last_lens(self, lens: str):
self.config.last_lens = lens
self.save_settings()

View file

@ -0,0 +1,88 @@
"""
FileService - сервис для работы с файлами
"""
import shutil
from datetime import datetime
from pathlib import Path
from typing import Optional
class FileService:
"""Сервис для перемещения файлов и ведения логов"""
SUPPORTED_EXTENSIONS = {'.cr2', '.dng', '.arw', '.jpg', '.jpeg', '.png', '.raw', '.tiff'}
@classmethod
def is_photo(cls, file_path: Path) -> bool:
"""Проверяет, является ли файл фотографией"""
return file_path.suffix.lower() in cls.SUPPORTED_EXTENSIONS
@classmethod
def resolve_conflict(cls, target_path: Path) -> Path:
"""Разрешает конфликт имён файлов"""
if not target_path.exists():
return target_path
counter = 1
stem = target_path.stem
suffix = target_path.suffix
parent = target_path.parent
while True:
new_name = f"{stem}_{counter}{suffix}"
new_path = parent / new_name
if not new_path.exists():
return new_path
counter += 1
@classmethod
def write_object_log(cls, folder: Path, filename: str, camera: str, optics: str,
timestamp: Optional[datetime] = None) -> None:
"""Записывает запись в лог объекта"""
if timestamp is None:
timestamp = datetime.now()
log_file = folder / "ObjectLog.txt"
line = f"{timestamp} {camera if camera else 'Unknown'} {optics if optics else 'Unknown'} - {filename}\n"
try:
with open(log_file, 'a', encoding='utf-8') as f:
f.write(line)
except Exception as e:
print(f"Ошибка записи лога: {e}")
@classmethod
def move_file(cls, source: Path, target_folder: Path, camera: str, optics: str) -> bool:
"""Перемещает файл в целевую папку"""
if not source.exists():
return False
if not cls.is_photo(source):
return False
try:
target_folder.mkdir(parents=True, exist_ok=True)
creation_time = datetime.fromtimestamp(source.stat().st_ctime)
cls.write_object_log(target_folder, source.name, camera, optics, creation_time)
target_path = cls.resolve_conflict(target_folder / source.name)
shutil.move(str(source), str(target_path))
return True
except Exception as e:
print(f"Ошибка перемещения {source.name}: {e}")
return False
@classmethod
def clear_watch_folder(cls, folder: Path) -> int:
"""Очищает папку наблюдения"""
if not folder.exists():
return 0
count = 0
for file_path in folder.iterdir():
if file_path.is_file() and cls.is_photo(file_path):
try:
file_path.unlink()
count += 1
except Exception as e:
print(f"Ошибка удаления {file_path.name}: {e}")
return count

View file

@ -0,0 +1,155 @@
"""
SessionService - сервис управления сессией
"""
from datetime import datetime
from pathlib import Path
from typing import Optional, Callable
from models.astro_object import AstroObject
from models.session import Session
from services.file_service import FileService
class SessionService:
"""Сервис для управления сессией наблюдения"""
def __init__(self):
self._current_session: Optional[Session] = None
self._file_service = FileService()
def start_session(self, watch_folder: Path, object_name: str, camera: str, optics: str) -> Session:
"""Начинает новую сессию"""
start_time = datetime.now()
self._current_session = Session(
camera=camera,
optics=optics,
start_time=start_time
)
session_folder = watch_folder / self._current_session.get_session_name()
session_folder.mkdir(parents=True, exist_ok=True)
self._current_session.session_folder = session_folder
self._log_session_event(f"Session started at {start_time}")
self._log_session_event(f"Camera: {camera}")
self._log_session_event(f"Optics: {optics}")
self.create_new_object(object_name)
return self._current_session
def create_new_object(self, object_name: str) -> AstroObject:
"""Создаёт новый объект съёмки"""
if not self._current_session:
raise ValueError("Session not started")
if self._current_session.get_current_object():
current_obj = self._current_session.get_current_object()
self._log_session_event(f"Object finished: {current_obj.name}, photos: {current_obj.photo_count}")
object_folder = self._current_session.session_folder / object_name
object_folder.mkdir(parents=True, exist_ok=True)
astro_object = AstroObject(
name=object_name,
folder=object_folder,
photo_count=0
)
self._current_session.add_object(astro_object)
self._log_session_event(f"New object: {object_name}")
return astro_object
def handle_file(self, file_path: Path) -> bool:
"""Обрабатывает новый файл"""
if not self._current_session:
return False
current_object = self._current_session.get_current_object()
if not current_object:
return False
success = self._file_service.move_file(
file_path,
current_object.folder,
self._current_session.camera,
self._current_session.optics
)
if success:
current_object.increment_photo_count()
return success
def move_remaining_files(self, watch_folder: Path, on_file_moved: Optional[Callable] = None) -> int:
"""Перемещает все существующие файлы из папки наблюдения"""
if not self._current_session:
return 0
current_object = self._current_session.get_current_object()
if not current_object:
return 0
count = 0
if watch_folder.exists():
for file_path in watch_folder.iterdir():
if file_path.is_file() and self._file_service.is_photo(file_path):
if self._file_service.move_file(
file_path,
current_object.folder,
self._current_session.camera,
self._current_session.optics
):
current_object.increment_photo_count()
count += 1
if on_file_moved:
on_file_moved(file_path)
return count
def finish_session(self) -> Session:
"""Завершает сессию"""
if not self._current_session:
raise ValueError("No active session")
self._current_session.finish()
self._log_session_event(f"Session finished at {self._current_session.end_time}")
self._log_session_event("=== SESSION SUMMARY ===")
for obj in self._current_session.objects:
self._log_session_event(f"Object: {obj.name}, Photos: {obj.photo_count}")
return self._current_session
def get_current_session(self) -> Optional[Session]:
return self._current_session
def get_current_object(self) -> Optional[AstroObject]:
if self._current_session:
return self._current_session.get_current_object()
return None
def get_current_object_folder(self) -> Optional[Path]:
current_obj = self.get_current_object()
return current_obj.folder if current_obj else None
def change_camera(self, camera: str):
if self._current_session:
self._current_session.camera = camera
self._log_session_event(f"Camera changed to: {camera}")
def change_optics(self, optics: str):
if self._current_session:
self._current_session.optics = optics
self._log_session_event(f"Optics changed to: {optics}")
def _log_session_event(self, message: str):
if self._current_session and self._current_session.session_folder:
log_file = self._current_session.session_folder / "SessionLog.txt"
timestamp = datetime.now()
line = f"{timestamp} - {message}\n"
try:
with open(log_file, 'a', encoding='utf-8') as f:
f.write(line)
except Exception as e:
print(f"Ошибка записи лога: {e}")
def is_active(self) -> bool:
return self._current_session is not None and self._current_session.is_active()

View file

@ -0,0 +1,99 @@
"""
WatchService - сервис отслеживания файлов с очередью
"""
import time
import threading
import queue
from pathlib import Path
from typing import Callable, Optional
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from services.file_service import FileService
class PhotoHandler(FileSystemEventHandler):
"""Обработчик событий файловой системы"""
def __init__(self, callback: Callable[[Path], None]):
self.callback = callback
self._pending_files = queue.Queue()
self._processing = True
self._processor_thread = threading.Thread(target=self._process_queue, daemon=True)
self._processor_thread.start()
def on_created(self, event):
if not event.is_directory:
src_path = Path(event.src_path)
if FileService.is_photo(src_path):
time.sleep(0.1) # Даём время на запись файла
self._pending_files.put(src_path)
def _process_queue(self):
while self._processing:
try:
file_path = self._pending_files.get(timeout=1)
if self.callback and file_path.exists():
self.callback(file_path)
except queue.Empty:
continue
except Exception as e:
print(f"Ошибка обработки файла: {e}")
def stop(self):
self._processing = False
class WatchService:
"""Сервис для отслеживания папки на новые файлы"""
def __init__(self):
self._observer: Optional[Observer] = None
self._event_handler: Optional[PhotoHandler] = None
self._is_running = False
def start(self, watch_folder: Path, on_new_file: Callable[[Path], None]) -> bool:
if self._is_running:
return False
if not watch_folder.exists():
return False
try:
self._event_handler = PhotoHandler(on_new_file)
self._observer = Observer()
self._observer.schedule(self._event_handler, str(watch_folder), recursive=False)
self._observer.start()
self._is_running = True
return True
except Exception as e:
print(f"Ошибка запуска отслеживания: {e}")
return False
def stop(self):
if self._observer:
self._observer.stop()
self._observer.join()
self._observer = None
if self._event_handler:
self._event_handler.stop()
self._event_handler = None
self._is_running = False
def is_running(self) -> bool:
return self._is_running
def move_all_existing_files(self, watch_folder: Path, on_file_moved: Callable[[Path], None]) -> int:
"""Перемещает все существующие файлы"""
count = 0
if watch_folder.exists():
for file_path in watch_folder.iterdir():
if file_path.is_file() and FileService.is_photo(file_path):
try:
on_file_moved(file_path)
count += 1
time.sleep(0.05)
except Exception as e:
print(f"Ошибка перемещения {file_path.name}: {e}")
return count