Astro-Session-Watcher/services/file_service.py

126 lines
No EOL
5.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
FileService - сервис для работы с файлами
"""
import shutil
from datetime import datetime
from pathlib import Path
from typing import Optional
class FileService:
"""Сервис для перемещения файлов и ведения логов"""
SUPPORTED_EXTENSIONS = {'.cr2', '.dng', '.arw', '.jpg', '.jpeg', '.png', '.raw', '.tiff'}
@classmethod
def is_photo(cls, file_path: Path) -> bool:
"""Проверяет, является ли файл фотографией"""
return file_path.suffix.lower() in cls.SUPPORTED_EXTENSIONS
@classmethod
def resolve_conflict(cls, target_path: Path) -> Path:
"""Разрешает конфликт имён файлов"""
if not target_path.exists():
return target_path
counter = 1
stem = target_path.stem
suffix = target_path.suffix
parent = target_path.parent
while True:
new_name = f"{stem}_{counter}{suffix}"
new_path = parent / new_name
if not new_path.exists():
return new_path
counter += 1
@classmethod
def generate_new_filename(cls, object_name: str, timestamp: datetime, original_suffix: str) -> str:
"""
Генерирует новое имя файла в формате:
ИмяОбъекта_ГГГГ-ММ-ДД_ЧЧ-ММ-СС.расширение
"""
# Очищаем имя объекта от недопустимых символов
safe_object_name = "".join(c for c in object_name if c.isalnum() or c in (' ', '-', '_')).strip()
safe_object_name = safe_object_name.replace(' ', '_')
# Форматируем дату и время
date_str = timestamp.strftime("%Y-%m-%d")
time_str = timestamp.strftime("%H-%M-%S")
return f"{safe_object_name}_{date_str}_{time_str}{original_suffix}"
@classmethod
def write_object_log(cls, folder: Path, filename: str, camera: str, optics: str,
timestamp: Optional[datetime] = None) -> None:
"""Записывает запись в лог объекта"""
if timestamp is None:
timestamp = datetime.now()
log_file = folder / "ObjectLog.txt"
line = f"{timestamp} {camera if camera else 'Unknown'} {optics if optics else 'Unknown'} - {filename}\n"
try:
with open(log_file, 'a', encoding='utf-8') as f:
f.write(line)
except Exception as e:
print(f"Ошибка записи лога: {e}")
@classmethod
def move_file(cls, source: Path, target_folder: Path, camera: str, optics: str,
object_name: str = None) -> bool:
"""
Перемещает файл в целевую папку с переименованием
Если object_name указан, файл переименовывается в формат: ИмяОбъектаатаремя.расширение
"""
if not source.exists():
print(f"Файл не существует: {source}")
return False
if not cls.is_photo(source):
print(f"Неподдерживаемый формат: {source}")
return False
try:
target_folder.mkdir(parents=True, exist_ok=True)
# Получаем время создания файла
creation_time = datetime.fromtimestamp(source.stat().st_ctime)
# Генерируем новое имя файла, если указано имя объекта
if object_name:
new_filename = cls.generate_new_filename(object_name, creation_time, source.suffix)
else:
new_filename = source.name
# Записываем в лог (сохраняем оригинальное имя в логе для отслеживания)
cls.write_object_log(target_folder, f"{source.name} -> {new_filename}", camera, optics, creation_time)
# Формируем целевой путь
target_path = cls.resolve_conflict(target_folder / new_filename)
# Перемещаем файл
shutil.move(str(source), str(target_path))
print(f"Файл перемещён и переименован: {source.name} -> {target_path.name}")
return True
except Exception as e:
print(f"Ошибка перемещения файла {source.name}: {e}")
return False
@classmethod
def clear_watch_folder(cls, folder: Path) -> int:
"""Очищает папку наблюдения"""
if not folder.exists():
return 0
count = 0
for file_path in folder.iterdir():
if file_path.is_file() and cls.is_photo(file_path):
try:
file_path.unlink()
count += 1
except Exception as e:
print(f"Ошибка удаления {file_path.name}: {e}")
return count