Astro-Session-Watcher/services/session_service.py

159 lines
No EOL
6.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
SessionService - сервис управления сессией
"""
from datetime import datetime
from pathlib import Path
from typing import Optional, Callable
from models.astro_object import AstroObject
from models.session import Session
from services.file_service import FileService
class SessionService:
    """Manage the lifecycle of one observation session.

    The service owns at most one ``Session`` at a time. It creates the
    session and per-object folders on disk, routes incoming photo files to
    the folder of the object currently being shot, and appends a
    human-readable event log (``SessionLog.txt``) inside the session folder.
    """

    def __init__(self):
        """Create a service with no active session."""
        self._current_session: Optional[Session] = None
        self._file_service = FileService()

    def start_session(self, watch_folder: Path, object_name: str, camera: str, optics: str) -> Session:
        """Start a new session and create its first object.

        Args:
            watch_folder: Folder under which the session folder is created.
            object_name: Name of the first object to shoot.
            camera: Camera description stored on the session.
            optics: Optics description stored on the session.

        Returns:
            The newly created session, which is now the current one.

        Raises:
            OSError: If the session or object folder cannot be created.
        """
        start_time = datetime.now()
        session = Session(
            camera=camera,
            optics=optics,
            start_time=start_time,
        )
        session_folder = watch_folder / session.get_session_name()
        session_folder.mkdir(parents=True, exist_ok=True)
        session.session_folder = session_folder

        # Publish the session only once its folder exists, so a failed mkdir
        # never leaves the service holding a session without a folder.
        self._current_session = session

        self._log_session_event(f"Session started at {start_time}")
        self._log_session_event(f"Camera: {camera}")
        self._log_session_event(f"Optics: {optics}")
        self.create_new_object(object_name)
        return session

    def create_new_object(self, object_name: str) -> AstroObject:
        """Create a new object, make it part of the session and return it.

        If the session already has a current object, its final photo count
        is written to the session log first.

        Args:
            object_name: Name of the object; also used as its folder name.

        Raises:
            ValueError: If no session has been started.
            OSError: If the object folder cannot be created.
        """
        if not self._current_session:
            raise ValueError("Session not started")

        previous = self._current_session.get_current_object()
        if previous:
            self._log_session_event(f"Object finished: {previous.name}, photos: {previous.photo_count}")

        object_folder = self._current_session.session_folder / object_name
        object_folder.mkdir(parents=True, exist_ok=True)
        astro_object = AstroObject(
            name=object_name,
            folder=object_folder,
            photo_count=0,
        )
        self._current_session.add_object(astro_object)
        self._log_session_event(f"New object: {object_name}")
        return astro_object

    def handle_file(self, file_path: Path) -> bool:
        """Move one new file into the current object's folder.

        The file is handed to ``FileService.move_file`` together with the
        target folder, the session's camera and optics, and the object name
        (passed so the file service can use it when renaming). The object's
        photo counter is incremented when ``move_file`` reports success.

        Returns:
            The result of ``move_file``, or ``False`` when there is no
            session or no current object.
        """
        if not self._current_session:
            return False
        current_object = self._current_session.get_current_object()
        if not current_object:
            return False

        success = self._file_service.move_file(
            file_path,
            current_object.folder,
            self._current_session.camera,
            self._current_session.optics,
            current_object.name,
        )
        if success:
            current_object.increment_photo_count()
        return success

    def move_remaining_files(self, watch_folder: Path, on_file_moved: Optional[Callable] = None) -> int:
        """Move every photo already present in ``watch_folder``.

        Only direct children that are regular files and that
        ``FileService.is_photo`` accepts are processed; each one goes through
        :meth:`handle_file`.

        Args:
            watch_folder: Folder to sweep. A missing path or a path that is
                not a directory yields ``0``.
            on_file_moved: Optional callback invoked with the original path
                of each file that was moved successfully.

        Returns:
            Number of files moved.
        """
        if not self._current_session:
            return 0
        if not self._current_session.get_current_object():
            return 0
        if not watch_folder.is_dir():
            return 0

        count = 0
        # Snapshot the listing first: files are moved out of this directory
        # while we loop over it.
        for file_path in list(watch_folder.iterdir()):
            if not (file_path.is_file() and self._file_service.is_photo(file_path)):
                continue
            if self.handle_file(file_path):
                count += 1
                if on_file_moved is not None:
                    on_file_moved(file_path)
        return count

    def finish_session(self) -> Session:
        """Finish the current session and log a per-object summary.

        Returns:
            The finished session (it stays available as the current session).

        Raises:
            ValueError: If there is no session to finish.
        """
        if not self._current_session:
            raise ValueError("No active session")

        self._current_session.finish()
        self._log_session_event(f"Session finished at {self._current_session.end_time}")
        self._log_session_event("=== SESSION SUMMARY ===")
        for obj in self._current_session.objects:
            self._log_session_event(f"Object: {obj.name}, Photos: {obj.photo_count}")
        return self._current_session

    def get_current_session(self) -> Optional[Session]:
        """Return the current session, or ``None`` if none was started."""
        return self._current_session

    def get_current_object(self) -> Optional[AstroObject]:
        """Return the session's current object, or ``None`` without a session."""
        if self._current_session:
            return self._current_session.get_current_object()
        return None

    def get_current_object_folder(self) -> Optional[Path]:
        """Return the folder of the current object, or ``None`` if there is none."""
        current_obj = self.get_current_object()
        return current_obj.folder if current_obj else None

    def change_camera(self, camera: str) -> None:
        """Set the session's camera and log the change; no-op without a session."""
        if self._current_session:
            self._current_session.camera = camera
            self._log_session_event(f"Camera changed to: {camera}")

    def change_optics(self, optics: str) -> None:
        """Set the session's optics and log the change; no-op without a session."""
        if self._current_session:
            self._current_session.optics = optics
            self._log_session_event(f"Optics changed to: {optics}")

    def _log_session_event(self, message: str) -> None:
        """Append a timestamped line to ``SessionLog.txt`` in the session folder.

        Logging is best-effort: nothing is written when there is no session
        or the session has no folder, and write failures are reported on
        stdout instead of being raised.
        """
        if not (self._current_session and self._current_session.session_folder):
            return

        log_file = self._current_session.session_folder / "SessionLog.txt"
        timestamp = datetime.now()
        line = f"{timestamp} - {message}\n"
        try:
            with open(log_file, 'a', encoding='utf-8') as f:
                f.write(line)
        except (OSError, ValueError) as e:
            # OSError: filesystem problems; ValueError: unencodable text or
            # an invalid path. A logging failure must not abort the session.
            print(f"Ошибка записи лога: {e}")

    def is_active(self) -> bool:
        """Return ``True`` when a session exists and reports itself active."""
        return self._current_session is not None and self._current_session.is_active()