fuck yeah!

This commit is contained in:
Vic Sergeev 2026-06-10 17:33:12 +03:00
parent ccb53d9091
commit da10f5e132
44 changed files with 3260 additions and 448 deletions

10
controllers/__init__.py Normal file
View file

@ -0,0 +1,10 @@
# controllers/__init__.py
from controllers.app_controller import AppController
from controllers.layer_controller import LayerController
from controllers.timelapse_controller import TimelapseController
__all__ = [
'AppController',
'LayerController',
'TimelapseController'
]

Binary file not shown.

Binary file not shown.

View file

@ -0,0 +1,191 @@
"""
Главный контроллер приложения - связывает модель и представление
"""
from PySide6.QtCore import QObject, Signal
from datetime import datetime
from pathlib import Path
import numpy as np
from models.api_model import HelioviewerAPI
from models.image_model import ImageModel
from views.main_window import MainWindow
from utils.image_processor import ImageProcessor
from utils.metadata_parser import MetadataParser
class AppController(QObject):
"""Главный контроллер приложения"""
def __init__(self):
super().__init__()
self.image_model = ImageModel()
self.api = HelioviewerAPI()
self.main_window = None
self.timelapse_controller = None
# Подключаем сигналы модели
self.image_model.layer_added.connect(self.on_layer_added)
self.image_model.layer_removed.connect(self.on_layer_removed)
self.image_model.layer_visibility_changed.connect(self.on_layer_visibility_changed)
self.image_model.layer_opacity_changed.connect(self.on_layer_opacity_changed)
self.image_model.layer_selected.connect(self.on_layer_selected)
# Папка для сохранения по умолчанию
self.download_folder = Path.home() / "SolarImages"
self.download_folder.mkdir(exist_ok=True)
def show_main_window(self):
"""Показывает главное окно"""
self.main_window = MainWindow(self)
self.main_window.show()
# Подключаем сигналы от панели управления
control_panel = self.main_window.get_control_panel()
control_panel.load_image_requested.connect(self.load_image_from_api)
def load_image_from_api(self, source_id: int, date: datetime):
"""Загружает изображение из API"""
self.main_window.update_status(f"Загрузка изображения: {date.strftime('%Y-%m-%d %H:%M')} UTC")
# Скачиваем изображение
filepath = self.api.download_image(source_id, date, self.download_folder)
if filepath:
# Загружаем данные изображения
img_data = ImageProcessor.load_jp2(str(filepath))
if img_data is not None:
# Извлекаем метаданные
metadata = MetadataParser.extract_metadata(str(filepath))
# Получаем информацию о спектре
source_info = self.api.SOURCES.get(source_id, {})
wavelength = source_info.get("wavelength", "Unknown")
# Добавляем слой в модель
layer_id = self.image_model.add_layer(
filepath, source_id, date, wavelength, img_data, metadata
)
self.main_window.update_status(f"✓ Загружено: {filepath.name}")
else:
self.main_window.update_status("✗ Ошибка обработки изображения")
else:
self.main_window.update_status("✗ Ошибка загрузки изображения")
def on_layer_added(self, layer):
"""Обработчик добавления слоя"""
self.main_window.update_layer_list(self.image_model.get_all_layers())
self.main_window.canvas.set_image(layer.id, layer.image_data)
def on_layer_removed(self, layer_id):
"""Обработчик удаления слоя"""
self.main_window.layer_widget.remove_layer(layer_id)
self.main_window.canvas.remove_layer(layer_id)
if not self.image_model.get_all_layers():
self.main_window.metadata_viewer.clear()
def on_layer_visibility_changed(self, layer_id, visible):
"""Обработчик изменения видимости слоя"""
self.main_window.canvas.set_layer_visibility(layer_id, visible)
def on_layer_opacity_changed(self, layer_id, opacity):
"""Обработчик изменения прозрачности слоя"""
self.main_window.canvas.set_layer_opacity(layer_id, opacity)
def on_layer_selected(self, layer_id):
"""Обработчик выбора слоя"""
for layer in self.image_model.get_all_layers():
if layer.id == layer_id:
if layer.metadata:
self.main_window.metadata_viewer.display_metadata(layer.metadata)
break
def set_layer_visibility(self, layer_id, visible):
self.image_model.set_layer_visibility(layer_id, visible)
def set_layer_opacity(self, layer_id, opacity):
self.image_model.set_layer_opacity(layer_id, opacity)
def clear_all_layers(self):
self.image_model.clear()
self.main_window.canvas.clear_all_layers()
self.main_window.metadata_viewer.clear()
# Добавьте эти методы в класс AppController:
def create_timelapse(self, source_id: int, start_date: datetime,
end_date: datetime, output_path: Path,
fps: int = 10, output_format: str = "mp4"):
"""Создает таймлапс"""
from controllers.timelapse_controller import TimelapseController
self.timelapse_controller = TimelapseController()
# Подключаем сигналы
self.timelapse_controller.progress_updated.connect(self.on_timelapse_progress)
self.timelapse_controller.log_message.connect(self.on_timelapse_log)
self.timelapse_controller.finished.connect(self.on_timelapse_finished)
# Запускаем
self.timelapse_controller.create_timelapse(
source_id, start_date, end_date, output_path, fps, output_format
)
def cancel_timelapse(self):
"""Отменяет создание таймлапса"""
if hasattr(self, 'timelapse_controller'):
self.timelapse_controller.cancel()
def on_timelapse_progress(self, current, total, message):
"""Прогресс таймлапса"""
if self.main_window:
self.main_window.update_status(f"Таймлапс: {message}")
def on_timelapse_log(self, message):
"""Лог таймлапса"""
print(f"[Timelapse] {message}")
def on_timelapse_finished(self, success, message):
"""Завершение таймлапса"""
if self.main_window:
if success:
self.main_window.update_status(f"✅ Таймлапс создан")
from PySide6.QtWidgets import QMessageBox
QMessageBox.information(self.main_window, "Готово", f"Таймлапс сохранен:\n{message}")
else:
self.main_window.update_status(f"❌ Ошибка: {message}")
# Добавьте метод:
def create_timelapse(self, source_id, start_date, end_date, output_path, fps=10, output_format="mp4"):
"""Создает таймлапс"""
print(f"[DEBUG] AppController.create_timelapse вызван")
from controllers.timelapse_controller import TimelapseController
self.timelapse_controller = TimelapseController()
# Подключаем сигналы
self.timelapse_controller.progress_updated.connect(self.on_timelapse_progress)
self.timelapse_controller.log_message.connect(self.on_timelapse_log)
self.timelapse_controller.finished.connect(self.on_timelapse_finished)
# Запускаем
self.timelapse_controller.create_timelapse(
source_id, start_date, end_date, output_path, fps, output_format
)
print(f"[DEBUG] TimelapseController создан и запущен")
def on_timelapse_log(self, message):
"""Лог таймлапса"""
print(f"[TIMELAPSE] {message}")
if self.main_window:
self.main_window.update_status(f"Таймлапс: {message}")
def create_timelapse(self, source_id, start_date, end_date, output_path, fps=10, output_format="mp4"):
"""Создает таймлапс - заглушка, реальный вызов из диалога"""
# Реальная реализация в диалоге
pass
def cancel_timelapse(self):
pass

View file

@ -0,0 +1,384 @@
"""
Контроллер для управления слоями изображений
Отвечает за бизнес-логику работы со слоями: композитинг, обработку, трансформации
"""
from PySide6.QtCore import QObject, Signal, Qt
from typing import List, Dict, Optional
from datetime import datetime # ← ДОБАВИТЬ ЭТУ СТРОКУ
import numpy as np
from pathlib import Path
from models.image_model import ImageModel, ImageLayer
from utils.image_processor import ImageProcessor
from utils.metadata_parser import MetadataParser
class LayerController(QObject):
"""
Контроллер для управления слоями изображений
Single Responsibility: только логика работы со слоями
"""
# Сигналы для оповещения View
layer_loaded = Signal(object) # ImageLayer
layer_updated = Signal(int) # layer_id
composite_updated = Signal(np.ndarray) # скомпозированное изображение
processing_started = Signal(str) # сообщение
processing_finished = Signal(bool) # успех/неудача
def __init__(self, image_model: ImageModel):
super().__init__()
self.image_model = image_model
self._processing_thread = None
# Подключаемся к сигналам модели
self.image_model.layer_added.connect(self.on_layer_added)
self.image_model.layer_removed.connect(self.on_layer_removed)
self.image_model.layer_visibility_changed.connect(self.on_visibility_changed)
self.image_model.layer_opacity_changed.connect(self.on_opacity_changed)
def load_layer_from_file(self, filepath: Path, source_id: int = None,
wavelength: str = None) -> Optional[int]:
"""
Загружает слой из файла
Args:
filepath: Путь к JP2 файлу
source_id: ID источника (опционально)
wavelength: Длина волны (опционально)
Returns:
ID созданного слоя или None
"""
self.processing_started.emit(f"Загрузка: {filepath.name}")
try:
# Загружаем изображение
img_data = ImageProcessor.load_jp2(str(filepath))
if img_data is None:
self.processing_finished.emit(False)
return None
# Извлекаем метаданные
metadata = MetadataParser.extract_metadata(str(filepath))
# Извлекаем информацию из метаданных если не передана
if source_id is None and metadata:
# Пытаемся определить инструмент из метаданных
instrument = metadata.get('INSTRUME', '')
wavelength_val = metadata.get('WAVELNTH', '')
# Можно добавить логику определения source_id по инструменту
source_id = self._guess_source_id(instrument, wavelength_val)
if wavelength is None and metadata:
wavelength = metadata.get('WAVELNTH', 'Unknown')
# Получаем дату из метаданных или из имени файла
date = self._extract_date_from_metadata(metadata)
if date is None:
date = datetime.now()
# Добавляем слой в модель
layer_id = self.image_model.add_layer(
filepath, source_id or 0, date, wavelength or "Unknown",
img_data, metadata
)
self.processing_finished.emit(True)
return layer_id
except Exception as e:
print(f"Ошибка загрузки слоя: {e}")
self.processing_finished.emit(False)
return None
def apply_color_map(self, layer_id: int, color_map: str):
"""
Применяет цветовую карту к слою
Args:
layer_id: ID слоя
color_map: Название цветовой карты
"""
layer = self._get_layer_by_id(layer_id)
if layer and layer.image_data is not None:
# Применяем цветовую карту к черно-белому изображению
if len(layer.image_data.shape) == 2 or layer.image_data.shape[2] == 1:
colored = ImageProcessor.apply_color_map(
layer.image_data if len(layer.image_data.shape) == 2 else layer.image_data[:, :, 0],
color_map
)
layer.image_data = colored
self.layer_updated.emit(layer_id)
self._update_composite()
def adjust_brightness_contrast(self, layer_id: int, brightness: float = 0,
contrast: float = 1.0):
"""
Регулирует яркость и контраст слоя
Args:
layer_id: ID слоя
brightness: Смещение яркости (-127 до 127)
contrast: Коэффициент контраста (0.5 до 2.0)
"""
layer = self._get_layer_by_id(layer_id)
if layer and layer.image_data is not None:
img = layer.image_data.astype(float)
# Применяем контраст и яркость
img = img * contrast + brightness
# Клиппируем и конвертируем обратно в uint8
img = np.clip(img, 0, 255).astype(np.uint8)
layer.image_data = img
self.layer_updated.emit(layer_id)
self._update_composite()
def auto_adjust_levels(self, layer_id: int, low_percent: float = 0.5,
high_percent: float = 99.5):
"""
Автоматическая настройка уровней (перцентильная нормализация)
Args:
layer_id: ID слоя
low_percent: Нижний процент отсечения
high_percent: Верхний процент отсечения
"""
layer = self._get_layer_by_id(layer_id)
if layer and layer.image_data is not None:
# Для черно-белых изображений
if len(layer.image_data.shape) == 2:
img = layer.image_data
normalized = ImageProcessor.percent_normalize(img, low_percent, high_percent)
layer.image_data = normalized
else:
# Для цветных - обрабатываем каждый канал отдельно
for c in range(layer.image_data.shape[2]):
channel = layer.image_data[:, :, c]
normalized = ImageProcessor.percent_normalize(channel, low_percent, high_percent)
layer.image_data[:, :, c] = normalized
self.layer_updated.emit(layer_id)
self._update_composite()
def merge_visible_layers(self) -> Optional[np.ndarray]:
"""
Объединяет все видимые слои в одно изображение
Returns:
Скомпозированное изображение или None
"""
visible_layers = self.image_model.get_visible_layers()
if not visible_layers:
return None
# Собираем данные и прозрачности
layers_data = []
opacities = []
for layer in visible_layers:
if layer.image_data is not None:
layers_data.append(layer.image_data)
opacities.append(layer.opacity)
if layers_data:
# Приводим все слои к одному размеру (берем минимальный)
min_height = min(img.shape[0] for img in layers_data)
min_width = min(img.shape[1] for img in layers_data)
# Обрезаем или ресайзим слои до минимального размера
resized_layers = []
for img in layers_data:
if img.shape[0] != min_height or img.shape[1] != min_width:
from PIL import Image
pil_img = Image.fromarray(img)
pil_img = pil_img.resize((min_width, min_height), Image.Resampling.LANCZOS)
resized_layers.append(np.array(pil_img))
else:
resized_layers.append(img)
# Композитим слои
composite = ImageProcessor.composite_layers(resized_layers, opacities)
self.composite_updated.emit(composite)
return composite
return None
def export_composite(self, filepath: Path, format: str = "PNG"):
"""
Экспортирует скомпозированное изображение в файл
Args:
filepath: Путь для сохранения
format: Формат (PNG, JPEG, TIFF)
"""
composite = self.merge_visible_layers()
if composite is not None:
from PIL import Image
pil_img = Image.fromarray(composite)
pil_img.save(filepath, format=format)
return True
return False
def align_layers(self, reference_layer_id: int = None):
"""
Выравнивает все слои относительно опорного слоя
Использует фазовую корреляцию для сдвига изображений
Args:
reference_layer_id: ID опорного слоя (если None - используется первый видимый)
"""
visible_layers = self.image_model.get_visible_layers()
if len(visible_layers) < 2:
return
# Определяем опорный слой
if reference_layer_id:
reference = self._get_layer_by_id(reference_layer_id)
else:
reference = visible_layers[0]
if not reference or reference.image_data is None:
return
self.processing_started.emit("Выравнивание слоев...")
# Для каждого слоя вычисляем сдвиг относительно опорного
for layer in visible_layers:
if layer.id == reference.id or layer.image_data is None:
continue
# Вычисляем сдвиг с помощью фазовой корреляции
shift = self._calculate_shift(reference.image_data, layer.image_data)
if shift != (0, 0):
# Сдвигаем изображение
layer.image_data = self._shift_image(layer.image_data, shift)
self.layer_updated.emit(layer.id)
self.processing_finished.emit(True)
self._update_composite()
def _calculate_shift(self, ref_img: np.ndarray, target_img: np.ndarray) -> tuple:
"""
Вычисляет сдвиг между двумя изображениями с помощью фазовой корреляции
Args:
ref_img: Опорное изображение
target_img: Целевое изображение
Returns:
Сдвиг в пикселях (dy, dx)
"""
# Преобразуем в grayscale если цветные
if len(ref_img.shape) == 3:
ref_gray = np.mean(ref_img, axis=2)
else:
ref_gray = ref_img
if len(target_img.shape) == 3:
target_gray = np.mean(target_img, axis=2)
else:
target_gray = target_img
# Вычисляем кросс-корреляцию через FFT
from scipy import signal
correlation = signal.correlate2d(ref_gray, target_gray, mode='same')
# Находим пик корреляции
y, x = np.unravel_index(np.argmax(correlation), correlation.shape)
# Центр изображения
center_y, center_x = np.array(ref_gray.shape) // 2
# Сдвиг
shift_y = center_y - y
shift_x = center_x - x
return (shift_y, shift_x)
def _shift_image(self, img: np.ndarray, shift: tuple) -> np.ndarray:
"""
Сдвигает изображение на заданное количество пикселей
Args:
img: Исходное изображение
shift: Сдвиг (dy, dx)
Returns:
Сдвинутое изображение
"""
from scipy.ndimage import shift as shift_image
shifted = shift_image(img, shift, mode='constant', cval=0)
return shifted.astype(img.dtype)
def _get_layer_by_id(self, layer_id: int) -> Optional[ImageLayer]:
"""Возвращает слой по ID"""
for layer in self.image_model.get_all_layers():
if layer.id == layer_id:
return layer
return None
def _guess_source_id(self, instrument: str, wavelength: str) -> int:
"""Пытается определить source_id по инструменту и длине волны"""
from models.api_model import HelioviewerAPI
instrument = instrument.upper()
wavelength = wavelength.upper()
for source_id, info in HelioviewerAPI.SOURCES.items():
if info['instrument'].upper() in instrument:
if wavelength and info['wavelength'].upper() in wavelength:
return source_id
return 0
def _extract_date_from_metadata(self, metadata: Dict) -> Optional[datetime]:
"""Извлекает дату из метаданных"""
date_str = metadata.get('DATE-OBS') or metadata.get('DATE-BEG')
if date_str:
try:
# Пробуем разные форматы дат
formats = [
"%Y-%m-%dT%H:%M:%S.%f",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%d"
]
for fmt in formats:
try:
return datetime.strptime(date_str, fmt)
except ValueError:
continue
except:
pass
return None
def _update_composite(self):
"""Обновляет композитное изображение"""
composite = self.merge_visible_layers()
if composite is not None:
self.composite_updated.emit(composite)
def on_layer_added(self, layer: ImageLayer):
"""Обработчик добавления слоя"""
self._update_composite()
def on_layer_removed(self, layer_id: int):
"""Обработчик удаления слоя"""
self._update_composite()
def on_visibility_changed(self, layer_id: int, visible: bool):
"""Обработчик изменения видимости"""
self._update_composite()
def on_opacity_changed(self, layer_id: int, opacity: float):
"""Обработчик изменения прозрачности"""
self._update_composite()

View file

@ -0,0 +1,117 @@
from PySide6.QtCore import QObject, QThread, Signal
from datetime import datetime, timedelta
from pathlib import Path
import shutil
import tempfile
from models.api_model import HelioviewerAPI
from utils.video_creator import VideoCreator
class TimelapseWorker(QThread):
log = Signal(str)
progress = Signal(int, int)
finished = Signal(bool, str)
def __init__(self, source_id, start_date, end_date, output_path, fps=10):
super().__init__()
self.source_id = source_id
self.start_date = start_date
self.end_date = end_date
self.output_path = output_path
self.fps = fps
self.cancelled = False
self.temp_dir = None
def cancel(self):
self.cancelled = True
self.log.emit("⏹️ Отмена процесса...")
def run(self):
try:
# Создаем временную папку
self.temp_dir = Path(tempfile.mkdtemp(prefix="timelapse_"))
self.log.emit(f"📁 Временная папка: {self.temp_dir}")
# Генерируем даты
dates = []
current = self.start_date.replace(hour=12, minute=0, second=0)
while current <= self.end_date:
dates.append(current)
current += timedelta(days=1)
total = len(dates)
self.log.emit(f"📊 Всего файлов: {total}")
downloaded = []
# Скачиваем
for i, date in enumerate(dates):
if self.cancelled:
self.cleanup()
self.finished.emit(False, "Отменено")
return
percent = int((i + 1) / total * 100)
self.progress.emit(i + 1, total)
self.log.emit(f"📥 [{i + 1}/{total}] {percent}% - {date.strftime('%Y-%m-%d')}")
filepath = HelioviewerAPI.download_image(self.source_id, date, self.temp_dir)
if filepath:
downloaded.append(filepath)
self.log.emit(f"✅ [{i + 1}/{total}] Успешно")
else:
self.log.emit(f"❌ [{i + 1}/{total}] Ошибка")
if self.cancelled:
self.cleanup()
self.finished.emit(False, "Отменено")
return
if not downloaded:
self.cleanup()
self.finished.emit(False, "Нет файлов")
return
# Создаем видео
self.log.emit("🎬 Создание видео...")
self.progress.emit(total, total)
video_path = VideoCreator.create_timelapse(downloaded, self.output_path, self.fps)
# Очистка
self.cleanup()
if video_path and video_path.exists():
self.finished.emit(True, str(video_path))
else:
self.finished.emit(False, "Ошибка создания видео")
except Exception as e:
self.cleanup()
self.finished.emit(False, str(e))
def cleanup(self):
if self.temp_dir and self.temp_dir.exists():
shutil.rmtree(self.temp_dir)
self.log.emit("🗑️ Временные файлы удалены")
class TimelapseController(QObject):
def __init__(self):
super().__init__()
self.worker = None
self.log = Signal(str)
self.progress = Signal(int, int)
self.finished = Signal(bool, str)
def create(self, source_id, start_date, end_date, output_path, fps=10):
self.worker = TimelapseWorker(source_id, start_date, end_date, output_path, fps)
self.worker.log.connect(self.log)
self.worker.progress.connect(self.progress)
self.worker.finished.connect(self.finished)
self.worker.start()
return self.worker
def cancel(self):
if self.worker:
self.worker.cancel()