Astro-Session-Watcher/services/watch_service.py

119 lines
No EOL
4.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
WatchService - сервис отслеживания файлов с очередью
"""
import time
import threading
import queue
from pathlib import Path
from typing import Callable, Optional
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from services.file_service import FileService
class PhotoHandler(FileSystemEventHandler):
"""Обработчик событий файловой системы"""
def __init__(self, callback: Callable[[Path], None]):
self.callback = callback
self._pending_files = queue.Queue()
self._processing = True
self._processor_thread = threading.Thread(target=self._process_queue, daemon=True)
self._processor_thread.start()
def on_created(self, event):
if not event.is_directory:
src_path = Path(event.src_path)
if FileService.is_photo(src_path):
print(f"[Watchdog] Обнаружен файл: {src_path}")
time.sleep(0.1)
self._pending_files.put(src_path)
def on_modified(self, event):
"""Также обрабатываем modified, так как некоторые программы сначала создают временный файл"""
if not event.is_directory:
src_path = Path(event.src_path)
if FileService.is_photo(src_path):
print(f"[Watchdog] Изменён файл: {src_path}")
time.sleep(0.1)
self._pending_files.put(src_path)
def _process_queue(self):
while self._processing:
try:
file_path = self._pending_files.get(timeout=1)
if self.callback and file_path.exists():
self.callback(file_path)
except queue.Empty:
continue
except Exception as e:
print(f"Ошибка обработки файла: {e}")
def stop(self):
self._processing = False
class WatchService:
"""Сервис для отслеживания папки на новые файлы"""
def __init__(self):
self._observer: Optional[Observer] = None
self._event_handler: Optional[PhotoHandler] = None
self._is_running = False
def start(self, watch_folder: Path, on_new_file: Callable[[Path], None]) -> bool:
"""Запускает отслеживание папки"""
if self._is_running:
print("Watcher already running")
return False
if not watch_folder.exists():
print(f"Папка не существует: {watch_folder}")
return False
try:
print(f"Запуск отслеживания папки: {watch_folder}")
self._event_handler = PhotoHandler(on_new_file)
self._observer = Observer()
self._observer.schedule(self._event_handler, str(watch_folder), recursive=False)
self._observer.start()
self._is_running = True
print(f"Отслеживание успешно запущено для: {watch_folder}")
return True
except Exception as e:
print(f"Ошибка запуска отслеживания: {e}")
import traceback
traceback.print_exc()
return False
def stop(self):
"""Останавливает отслеживание"""
print("Остановка отслеживания...")
if self._observer:
self._observer.stop()
self._observer.join()
self._observer = None
if self._event_handler:
self._event_handler.stop()
self._event_handler = None
self._is_running = False
print("Отслеживание остановлено")
def is_running(self) -> bool:
return self._is_running
def move_all_existing_files(self, watch_folder: Path, on_file_moved: Callable[[Path], None]) -> int:
"""Перемещает все существующие файлы из папки наблюдения"""
count = 0
if watch_folder.exists():
for file_path in watch_folder.iterdir():
if file_path.is_file() and FileService.is_photo(file_path):
try:
on_file_moved(file_path)
count += 1
time.sleep(0.05)
except Exception as e:
print(f"Ошибка перемещения {file_path.name}: {e}")
return count