384 lines
15 KiB
Python
384 lines
15 KiB
Python
|
|
"""
|
|||
|
|
Контроллер для управления слоями изображений
|
|||
|
|
Отвечает за бизнес-логику работы со слоями: композитинг, обработку, трансформации
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
from PySide6.QtCore import QObject, Signal, Qt
|
|||
|
|
from typing import List, Dict, Optional
|
|||
|
|
from datetime import datetime # ← ДОБАВИТЬ ЭТУ СТРОКУ
|
|||
|
|
import numpy as np
|
|||
|
|
from pathlib import Path
|
|||
|
|
|
|||
|
|
from models.image_model import ImageModel, ImageLayer
|
|||
|
|
from utils.image_processor import ImageProcessor
|
|||
|
|
from utils.metadata_parser import MetadataParser
|
|||
|
|
|
|||
|
|
|
|||
|
|
class LayerController(QObject):
|
|||
|
|
"""
|
|||
|
|
Контроллер для управления слоями изображений
|
|||
|
|
Single Responsibility: только логика работы со слоями
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
# Сигналы для оповещения View
|
|||
|
|
layer_loaded = Signal(object) # ImageLayer
|
|||
|
|
layer_updated = Signal(int) # layer_id
|
|||
|
|
composite_updated = Signal(np.ndarray) # скомпозированное изображение
|
|||
|
|
processing_started = Signal(str) # сообщение
|
|||
|
|
processing_finished = Signal(bool) # успех/неудача
|
|||
|
|
|
|||
|
|
def __init__(self, image_model: ImageModel):
|
|||
|
|
super().__init__()
|
|||
|
|
self.image_model = image_model
|
|||
|
|
self._processing_thread = None
|
|||
|
|
|
|||
|
|
# Подключаемся к сигналам модели
|
|||
|
|
self.image_model.layer_added.connect(self.on_layer_added)
|
|||
|
|
self.image_model.layer_removed.connect(self.on_layer_removed)
|
|||
|
|
self.image_model.layer_visibility_changed.connect(self.on_visibility_changed)
|
|||
|
|
self.image_model.layer_opacity_changed.connect(self.on_opacity_changed)
|
|||
|
|
|
|||
|
|
def load_layer_from_file(self, filepath: Path, source_id: int = None,
|
|||
|
|
wavelength: str = None) -> Optional[int]:
|
|||
|
|
"""
|
|||
|
|
Загружает слой из файла
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
filepath: Путь к JP2 файлу
|
|||
|
|
source_id: ID источника (опционально)
|
|||
|
|
wavelength: Длина волны (опционально)
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
ID созданного слоя или None
|
|||
|
|
"""
|
|||
|
|
self.processing_started.emit(f"Загрузка: {filepath.name}")
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# Загружаем изображение
|
|||
|
|
img_data = ImageProcessor.load_jp2(str(filepath))
|
|||
|
|
if img_data is None:
|
|||
|
|
self.processing_finished.emit(False)
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
# Извлекаем метаданные
|
|||
|
|
metadata = MetadataParser.extract_metadata(str(filepath))
|
|||
|
|
|
|||
|
|
# Извлекаем информацию из метаданных если не передана
|
|||
|
|
if source_id is None and metadata:
|
|||
|
|
# Пытаемся определить инструмент из метаданных
|
|||
|
|
instrument = metadata.get('INSTRUME', '')
|
|||
|
|
wavelength_val = metadata.get('WAVELNTH', '')
|
|||
|
|
|
|||
|
|
# Можно добавить логику определения source_id по инструменту
|
|||
|
|
source_id = self._guess_source_id(instrument, wavelength_val)
|
|||
|
|
|
|||
|
|
if wavelength is None and metadata:
|
|||
|
|
wavelength = metadata.get('WAVELNTH', 'Unknown')
|
|||
|
|
|
|||
|
|
# Получаем дату из метаданных или из имени файла
|
|||
|
|
date = self._extract_date_from_metadata(metadata)
|
|||
|
|
if date is None:
|
|||
|
|
date = datetime.now()
|
|||
|
|
|
|||
|
|
# Добавляем слой в модель
|
|||
|
|
layer_id = self.image_model.add_layer(
|
|||
|
|
filepath, source_id or 0, date, wavelength or "Unknown",
|
|||
|
|
img_data, metadata
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
self.processing_finished.emit(True)
|
|||
|
|
return layer_id
|
|||
|
|
|
|||
|
|
except Exception as e:
|
|||
|
|
print(f"Ошибка загрузки слоя: {e}")
|
|||
|
|
self.processing_finished.emit(False)
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
def apply_color_map(self, layer_id: int, color_map: str):
|
|||
|
|
"""
|
|||
|
|
Применяет цветовую карту к слою
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
layer_id: ID слоя
|
|||
|
|
color_map: Название цветовой карты
|
|||
|
|
"""
|
|||
|
|
layer = self._get_layer_by_id(layer_id)
|
|||
|
|
if layer and layer.image_data is not None:
|
|||
|
|
# Применяем цветовую карту к черно-белому изображению
|
|||
|
|
if len(layer.image_data.shape) == 2 or layer.image_data.shape[2] == 1:
|
|||
|
|
colored = ImageProcessor.apply_color_map(
|
|||
|
|
layer.image_data if len(layer.image_data.shape) == 2 else layer.image_data[:, :, 0],
|
|||
|
|
color_map
|
|||
|
|
)
|
|||
|
|
layer.image_data = colored
|
|||
|
|
self.layer_updated.emit(layer_id)
|
|||
|
|
self._update_composite()
|
|||
|
|
|
|||
|
|
def adjust_brightness_contrast(self, layer_id: int, brightness: float = 0,
|
|||
|
|
contrast: float = 1.0):
|
|||
|
|
"""
|
|||
|
|
Регулирует яркость и контраст слоя
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
layer_id: ID слоя
|
|||
|
|
brightness: Смещение яркости (-127 до 127)
|
|||
|
|
contrast: Коэффициент контраста (0.5 до 2.0)
|
|||
|
|
"""
|
|||
|
|
layer = self._get_layer_by_id(layer_id)
|
|||
|
|
if layer and layer.image_data is not None:
|
|||
|
|
img = layer.image_data.astype(float)
|
|||
|
|
|
|||
|
|
# Применяем контраст и яркость
|
|||
|
|
img = img * contrast + brightness
|
|||
|
|
|
|||
|
|
# Клиппируем и конвертируем обратно в uint8
|
|||
|
|
img = np.clip(img, 0, 255).astype(np.uint8)
|
|||
|
|
layer.image_data = img
|
|||
|
|
|
|||
|
|
self.layer_updated.emit(layer_id)
|
|||
|
|
self._update_composite()
|
|||
|
|
|
|||
|
|
def auto_adjust_levels(self, layer_id: int, low_percent: float = 0.5,
|
|||
|
|
high_percent: float = 99.5):
|
|||
|
|
"""
|
|||
|
|
Автоматическая настройка уровней (перцентильная нормализация)
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
layer_id: ID слоя
|
|||
|
|
low_percent: Нижний процент отсечения
|
|||
|
|
high_percent: Верхний процент отсечения
|
|||
|
|
"""
|
|||
|
|
layer = self._get_layer_by_id(layer_id)
|
|||
|
|
if layer and layer.image_data is not None:
|
|||
|
|
# Для черно-белых изображений
|
|||
|
|
if len(layer.image_data.shape) == 2:
|
|||
|
|
img = layer.image_data
|
|||
|
|
normalized = ImageProcessor.percent_normalize(img, low_percent, high_percent)
|
|||
|
|
layer.image_data = normalized
|
|||
|
|
else:
|
|||
|
|
# Для цветных - обрабатываем каждый канал отдельно
|
|||
|
|
for c in range(layer.image_data.shape[2]):
|
|||
|
|
channel = layer.image_data[:, :, c]
|
|||
|
|
normalized = ImageProcessor.percent_normalize(channel, low_percent, high_percent)
|
|||
|
|
layer.image_data[:, :, c] = normalized
|
|||
|
|
|
|||
|
|
self.layer_updated.emit(layer_id)
|
|||
|
|
self._update_composite()
|
|||
|
|
|
|||
|
|
def merge_visible_layers(self) -> Optional[np.ndarray]:
|
|||
|
|
"""
|
|||
|
|
Объединяет все видимые слои в одно изображение
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
Скомпозированное изображение или None
|
|||
|
|
"""
|
|||
|
|
visible_layers = self.image_model.get_visible_layers()
|
|||
|
|
|
|||
|
|
if not visible_layers:
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
# Собираем данные и прозрачности
|
|||
|
|
layers_data = []
|
|||
|
|
opacities = []
|
|||
|
|
|
|||
|
|
for layer in visible_layers:
|
|||
|
|
if layer.image_data is not None:
|
|||
|
|
layers_data.append(layer.image_data)
|
|||
|
|
opacities.append(layer.opacity)
|
|||
|
|
|
|||
|
|
if layers_data:
|
|||
|
|
# Приводим все слои к одному размеру (берем минимальный)
|
|||
|
|
min_height = min(img.shape[0] for img in layers_data)
|
|||
|
|
min_width = min(img.shape[1] for img in layers_data)
|
|||
|
|
|
|||
|
|
# Обрезаем или ресайзим слои до минимального размера
|
|||
|
|
resized_layers = []
|
|||
|
|
for img in layers_data:
|
|||
|
|
if img.shape[0] != min_height or img.shape[1] != min_width:
|
|||
|
|
from PIL import Image
|
|||
|
|
pil_img = Image.fromarray(img)
|
|||
|
|
pil_img = pil_img.resize((min_width, min_height), Image.Resampling.LANCZOS)
|
|||
|
|
resized_layers.append(np.array(pil_img))
|
|||
|
|
else:
|
|||
|
|
resized_layers.append(img)
|
|||
|
|
|
|||
|
|
# Композитим слои
|
|||
|
|
composite = ImageProcessor.composite_layers(resized_layers, opacities)
|
|||
|
|
self.composite_updated.emit(composite)
|
|||
|
|
return composite
|
|||
|
|
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
def export_composite(self, filepath: Path, format: str = "PNG"):
|
|||
|
|
"""
|
|||
|
|
Экспортирует скомпозированное изображение в файл
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
filepath: Путь для сохранения
|
|||
|
|
format: Формат (PNG, JPEG, TIFF)
|
|||
|
|
"""
|
|||
|
|
composite = self.merge_visible_layers()
|
|||
|
|
if composite is not None:
|
|||
|
|
from PIL import Image
|
|||
|
|
pil_img = Image.fromarray(composite)
|
|||
|
|
pil_img.save(filepath, format=format)
|
|||
|
|
return True
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def align_layers(self, reference_layer_id: int = None):
|
|||
|
|
"""
|
|||
|
|
Выравнивает все слои относительно опорного слоя
|
|||
|
|
Использует фазовую корреляцию для сдвига изображений
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
reference_layer_id: ID опорного слоя (если None - используется первый видимый)
|
|||
|
|
"""
|
|||
|
|
visible_layers = self.image_model.get_visible_layers()
|
|||
|
|
|
|||
|
|
if len(visible_layers) < 2:
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
# Определяем опорный слой
|
|||
|
|
if reference_layer_id:
|
|||
|
|
reference = self._get_layer_by_id(reference_layer_id)
|
|||
|
|
else:
|
|||
|
|
reference = visible_layers[0]
|
|||
|
|
|
|||
|
|
if not reference or reference.image_data is None:
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
self.processing_started.emit("Выравнивание слоев...")
|
|||
|
|
|
|||
|
|
# Для каждого слоя вычисляем сдвиг относительно опорного
|
|||
|
|
for layer in visible_layers:
|
|||
|
|
if layer.id == reference.id or layer.image_data is None:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
# Вычисляем сдвиг с помощью фазовой корреляции
|
|||
|
|
shift = self._calculate_shift(reference.image_data, layer.image_data)
|
|||
|
|
|
|||
|
|
if shift != (0, 0):
|
|||
|
|
# Сдвигаем изображение
|
|||
|
|
layer.image_data = self._shift_image(layer.image_data, shift)
|
|||
|
|
self.layer_updated.emit(layer.id)
|
|||
|
|
|
|||
|
|
self.processing_finished.emit(True)
|
|||
|
|
self._update_composite()
|
|||
|
|
|
|||
|
|
def _calculate_shift(self, ref_img: np.ndarray, target_img: np.ndarray) -> tuple:
|
|||
|
|
"""
|
|||
|
|
Вычисляет сдвиг между двумя изображениями с помощью фазовой корреляции
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
ref_img: Опорное изображение
|
|||
|
|
target_img: Целевое изображение
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
Сдвиг в пикселях (dy, dx)
|
|||
|
|
"""
|
|||
|
|
# Преобразуем в grayscale если цветные
|
|||
|
|
if len(ref_img.shape) == 3:
|
|||
|
|
ref_gray = np.mean(ref_img, axis=2)
|
|||
|
|
else:
|
|||
|
|
ref_gray = ref_img
|
|||
|
|
|
|||
|
|
if len(target_img.shape) == 3:
|
|||
|
|
target_gray = np.mean(target_img, axis=2)
|
|||
|
|
else:
|
|||
|
|
target_gray = target_img
|
|||
|
|
|
|||
|
|
# Вычисляем кросс-корреляцию через FFT
|
|||
|
|
from scipy import signal
|
|||
|
|
correlation = signal.correlate2d(ref_gray, target_gray, mode='same')
|
|||
|
|
|
|||
|
|
# Находим пик корреляции
|
|||
|
|
y, x = np.unravel_index(np.argmax(correlation), correlation.shape)
|
|||
|
|
|
|||
|
|
# Центр изображения
|
|||
|
|
center_y, center_x = np.array(ref_gray.shape) // 2
|
|||
|
|
|
|||
|
|
# Сдвиг
|
|||
|
|
shift_y = center_y - y
|
|||
|
|
shift_x = center_x - x
|
|||
|
|
|
|||
|
|
return (shift_y, shift_x)
|
|||
|
|
|
|||
|
|
def _shift_image(self, img: np.ndarray, shift: tuple) -> np.ndarray:
|
|||
|
|
"""
|
|||
|
|
Сдвигает изображение на заданное количество пикселей
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
img: Исходное изображение
|
|||
|
|
shift: Сдвиг (dy, dx)
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
Сдвинутое изображение
|
|||
|
|
"""
|
|||
|
|
from scipy.ndimage import shift as shift_image
|
|||
|
|
|
|||
|
|
shifted = shift_image(img, shift, mode='constant', cval=0)
|
|||
|
|
return shifted.astype(img.dtype)
|
|||
|
|
|
|||
|
|
def _get_layer_by_id(self, layer_id: int) -> Optional[ImageLayer]:
|
|||
|
|
"""Возвращает слой по ID"""
|
|||
|
|
for layer in self.image_model.get_all_layers():
|
|||
|
|
if layer.id == layer_id:
|
|||
|
|
return layer
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
def _guess_source_id(self, instrument: str, wavelength: str) -> int:
|
|||
|
|
"""Пытается определить source_id по инструменту и длине волны"""
|
|||
|
|
from models.api_model import HelioviewerAPI
|
|||
|
|
|
|||
|
|
instrument = instrument.upper()
|
|||
|
|
wavelength = wavelength.upper()
|
|||
|
|
|
|||
|
|
for source_id, info in HelioviewerAPI.SOURCES.items():
|
|||
|
|
if info['instrument'].upper() in instrument:
|
|||
|
|
if wavelength and info['wavelength'].upper() in wavelength:
|
|||
|
|
return source_id
|
|||
|
|
|
|||
|
|
return 0
|
|||
|
|
|
|||
|
|
def _extract_date_from_metadata(self, metadata: Dict) -> Optional[datetime]:
|
|||
|
|
"""Извлекает дату из метаданных"""
|
|||
|
|
date_str = metadata.get('DATE-OBS') or metadata.get('DATE-BEG')
|
|||
|
|
if date_str:
|
|||
|
|
try:
|
|||
|
|
# Пробуем разные форматы дат
|
|||
|
|
formats = [
|
|||
|
|
"%Y-%m-%dT%H:%M:%S.%f",
|
|||
|
|
"%Y-%m-%dT%H:%M:%S",
|
|||
|
|
"%Y-%m-%d %H:%M:%S",
|
|||
|
|
"%Y-%m-%d"
|
|||
|
|
]
|
|||
|
|
for fmt in formats:
|
|||
|
|
try:
|
|||
|
|
return datetime.strptime(date_str, fmt)
|
|||
|
|
except ValueError:
|
|||
|
|
continue
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
def _update_composite(self):
|
|||
|
|
"""Обновляет композитное изображение"""
|
|||
|
|
composite = self.merge_visible_layers()
|
|||
|
|
if composite is not None:
|
|||
|
|
self.composite_updated.emit(composite)
|
|||
|
|
|
|||
|
|
def on_layer_added(self, layer: ImageLayer):
|
|||
|
|
"""Обработчик добавления слоя"""
|
|||
|
|
self._update_composite()
|
|||
|
|
|
|||
|
|
def on_layer_removed(self, layer_id: int):
|
|||
|
|
"""Обработчик удаления слоя"""
|
|||
|
|
self._update_composite()
|
|||
|
|
|
|||
|
|
def on_visibility_changed(self, layer_id: int, visible: bool):
|
|||
|
|
"""Обработчик изменения видимости"""
|
|||
|
|
self._update_composite()
|
|||
|
|
|
|||
|
|
def on_opacity_changed(self, layer_id: int, opacity: float):
|
|||
|
|
"""Обработчик изменения прозрачности"""
|
|||
|
|
self._update_composite()
|