Astro-Session-Watcher/services/file_service.py

126 lines
5.1 KiB
Python
Raw Normal View History

2026-05-07 17:15:56 +03:00
"""
FileService - сервис для работы с файлами
"""
import shutil
from datetime import datetime
from pathlib import Path
from typing import Optional
class FileService:
"""Сервис для перемещения файлов и ведения логов"""
SUPPORTED_EXTENSIONS = {'.cr2', '.dng', '.arw', '.jpg', '.jpeg', '.png', '.raw', '.tiff'}
@classmethod
def is_photo(cls, file_path: Path) -> bool:
"""Проверяет, является ли файл фотографией"""
return file_path.suffix.lower() in cls.SUPPORTED_EXTENSIONS
@classmethod
def resolve_conflict(cls, target_path: Path) -> Path:
"""Разрешает конфликт имён файлов"""
if not target_path.exists():
return target_path
counter = 1
stem = target_path.stem
suffix = target_path.suffix
parent = target_path.parent
while True:
new_name = f"{stem}_{counter}{suffix}"
new_path = parent / new_name
if not new_path.exists():
return new_path
counter += 1
@classmethod
def generate_new_filename(cls, object_name: str, timestamp: datetime, original_suffix: str) -> str:
"""
Генерирует новое имя файла в формате:
ИмяОбъекта_ГГГГ-ММ-ДД_ЧЧ-ММ-СС.расширение
"""
# Очищаем имя объекта от недопустимых символов
safe_object_name = "".join(c for c in object_name if c.isalnum() or c in (' ', '-', '_')).strip()
safe_object_name = safe_object_name.replace(' ', '_')
# Форматируем дату и время
date_str = timestamp.strftime("%Y-%m-%d")
time_str = timestamp.strftime("%H-%M-%S")
return f"{safe_object_name}_{date_str}_{time_str}{original_suffix}"
2026-05-07 17:15:56 +03:00
@classmethod
def write_object_log(cls, folder: Path, filename: str, camera: str, optics: str,
timestamp: Optional[datetime] = None) -> None:
"""Записывает запись в лог объекта"""
if timestamp is None:
timestamp = datetime.now()
log_file = folder / "ObjectLog.txt"
line = f"{timestamp} {camera if camera else 'Unknown'} {optics if optics else 'Unknown'} - {filename}\n"
try:
with open(log_file, 'a', encoding='utf-8') as f:
f.write(line)
except Exception as e:
print(f"Ошибка записи лога: {e}")
@classmethod
def move_file(cls, source: Path, target_folder: Path, camera: str, optics: str,
object_name: str = None) -> bool:
"""
Перемещает файл в целевую папку с переименованием
Если object_name указан, файл переименовывается в формат: ИмяОбъектаатаремя.расширение
"""
2026-05-07 17:15:56 +03:00
if not source.exists():
print(f"Файл не существует: {source}")
2026-05-07 17:15:56 +03:00
return False
if not cls.is_photo(source):
print(f"Неподдерживаемый формат: {source}")
2026-05-07 17:15:56 +03:00
return False
try:
target_folder.mkdir(parents=True, exist_ok=True)
# Получаем время создания файла
2026-05-07 17:15:56 +03:00
creation_time = datetime.fromtimestamp(source.stat().st_ctime)
# Генерируем новое имя файла, если указано имя объекта
if object_name:
new_filename = cls.generate_new_filename(object_name, creation_time, source.suffix)
else:
new_filename = source.name
# Записываем в лог (сохраняем оригинальное имя в логе для отслеживания)
cls.write_object_log(target_folder, f"{source.name} -> {new_filename}", camera, optics, creation_time)
# Формируем целевой путь
target_path = cls.resolve_conflict(target_folder / new_filename)
# Перемещаем файл
2026-05-07 17:15:56 +03:00
shutil.move(str(source), str(target_path))
print(f"Файл перемещён и переименован: {source.name} -> {target_path.name}")
2026-05-07 17:15:56 +03:00
return True
2026-05-07 17:15:56 +03:00
except Exception as e:
print(f"Ошибка перемещения файла {source.name}: {e}")
2026-05-07 17:15:56 +03:00
return False
@classmethod
def clear_watch_folder(cls, folder: Path) -> int:
"""Очищает папку наблюдения"""
if not folder.exists():
return 0
count = 0
for file_path in folder.iterdir():
if file_path.is_file() and cls.is_photo(file_path):
try:
file_path.unlink()
count += 1
except Exception as e:
print(f"Ошибка удаления {file_path.name}: {e}")
return count