HelioParser/main.py
2026-06-10 12:25:03 +03:00

449 lines
No EOL
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import tkinter as tk
from tkinter import ttk, messagebox, filedialog
import requests
from datetime import datetime
import os
import json
# --- Модель (Model) ---
class HelioviewerModel:
"""
Модель отвечает за взаимодействие с Helioviewer API.
Она не зависит от интерфейса и содержит только бизнес-логику.
"""
BASE_URL = "https://api.helioviewer.org/v1/"
def __init__(self):
self.datasources = {} # Словарь для хранения доступных источников: {source_id: описание}
# Используем предустановленный список популярных каналов вместо динамической загрузки
self.init_default_sources()
def init_default_sources(self):
"""Инициализирует список популярных источников данных."""
self.datasources = {
# SDO (Solar Dynamics Observatory)
14: "🌞 SDO - AIA 335 (Fe XVI, корона, 2 млн K)",
13: "🔥 SDO - AIA 304 (He II, хромосфера, протуберанцы)",
12: "🌊 SDO - AIA 211 (Fe XIV, активные области)",
11: "💥 SDO - AIA 193 (Fe XII, корональные выбросы массы)",
10: "🌀 SDO - AIA 171 (Fe IX, спокойная корона, петли)",
9: "⚡ SDO - AIA 131 (Fe VIII, вспышечная плазма)",
8: "❄️ SDO - AIA 94 (Fe XVIII, горячие вспышки)",
# SOHO (Solar and Heliospheric Observatory)
0: "👁️ SOHO - EIT 171 (Fe IX/X)",
2: "🟡 SOHO - EIT 284 (Fe XV)",
4: "🌑 SOHO - LASCO C2 (коронограф, видимый свет)",
5: "🌘 SOHO - LASCO C3 (коронограф, широкое поле)",
# STEREO A и B
16: "⭐ STEREO A - EUVI 195",
17: "⭐ STEREO A - EUVI 171",
18: "⭐ STEREO A - EUVI 304",
19: "⭐ STEREO A - COR1",
20: "⭐ STEREO A - COR2",
# Дополнительные каналы
1: "📊 SOHO - MDI (магнитограмма)",
3: "🌡️ SOHO - EIT 304 (He II)",
}
def load_datasources_from_api(self):
"""Пытается загрузить актуальный список источников из API (экспериментально)."""
url = f"{self.BASE_URL}getDataSources/"
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
# API может вернуть JSON или строку
if response.headers.get('content-type', '').startswith('application/json'):
data = response.json()
if isinstance(data, list):
new_sources = {}
for source in data:
if isinstance(source, dict) and 'sourceId' in source:
source_id = source.get('sourceId')
# Формируем читаемое описание
name = f"{source.get('observatory', '?')} - {source.get('instrument', '?')}"
measurement = source.get('measurement', '')
if measurement:
name += f" - {measurement}"
new_sources[source_id] = name
if new_sources:
self.datasources = new_sources
return True
except Exception as e:
print(f"Не удалось загрузить источники из API: {e}")
return False
def get_jp2_image(self, source_id, date, jpip_link=False):
"""
Получает JP2 изображение или JPIP ссылку на него.
Args:
source_id (int): ID источника данных.
date (datetime): Желаемая дата и время снимка.
jpip_link (bool): Если True, возвращает JPIP-ссылку (строку).
Если False, возвращает бинарные данные изображения.
Returns:
bytes or str: Данные изображения или JPIP-ссылка. None в случае ошибки.
"""
# Форматируем дату в ISO 8601 UTC, как требует API
formatted_date = date.strftime("%Y-%m-%dT%H:%M:%SZ")
if jpip_link:
# Для JPIP ссылки используем отдельный эндпоинт
url = f"{self.BASE_URL}getJPIPClosest/"
params = {
'sourceId': source_id,
'date': formatted_date
}
else:
# Для скачивания изображения
url = f"{self.BASE_URL}getJP2Image/"
params = {
'sourceId': source_id,
'date': formatted_date
}
try:
response = requests.get(url, params=params, timeout=30)
response.raise_for_status()
if jpip_link:
# Возвращаем текст (JPIP ссылку)
return response.text.strip()
else:
# Возвращаем бинарные данные изображения
return response.content
except requests.exceptions.RequestException as e:
print(f"Ошибка при загрузке: {e}")
return None
def get_available_sources(self):
"""Возвращает словарь доступных источников данных."""
return self.datasources
# --- Представление (View) ---
class HelioviewerView:
"""
Представление отвечает за графический интерфейс пользователя (GUI).
"""
def __init__(self, root, controller):
self.controller = controller
self.root = root
self.root.title("Helioviewer Солнечный загрузчик")
self.root.geometry("900x700")
self.root.resizable(True, True)
# Устанавливаем иконку (опционально)
try:
self.root.iconbitmap(default='icon.ico')
except:
pass
# Стили
style = ttk.Style()
style.theme_use('clam')
# Настройка цветов (темная тема для астрономического приложения)
self.root.configure(bg='#2b2b2b')
style.configure('TLabel', background='#2b2b2b', foreground='white')
style.configure('TFrame', background='#2b2b2b')
style.configure('TLabelframe', background='#2b2b2b', foreground='white')
style.configure('TLabelframe.Label', background='#2b2b2b', foreground='white')
style.configure('TButton', background='#3c3c3c', foreground='white')
style.configure('TCombobox', fieldbackground='#3c3c3c', foreground='white')
# Основной фрейм с прокруткой
self.canvas = tk.Canvas(root, bg='#2b2b2b', highlightthickness=0)
scrollbar = ttk.Scrollbar(root, orient="vertical", command=self.canvas.yview)
self.scrollable_frame = ttk.Frame(self.canvas)
self.scrollable_frame.bind(
"<Configure>",
lambda e: self.canvas.configure(scrollregion=self.canvas.bbox("all"))
)
self.canvas.create_window((0, 0), window=self.scrollable_frame, anchor="nw")
self.canvas.configure(yscrollcommand=scrollbar.set)
# Показываем прокрутку только если нужно
self.canvas.pack(side="left", fill="both", expand=True)
scrollbar.pack(side="right", fill="y")
self.canvas.bind_all("<MouseWheel>", self._on_mousewheel)
main_frame = self.scrollable_frame
# Заголовок
title_label = ttk.Label(main_frame, text="🌞 Helioviewer Солнечный загрузчик",
font=('Arial', 16, 'bold'))
title_label.pack(pady=10)
# 1. Фрейм для выбора источника данных
source_frame = ttk.LabelFrame(main_frame, text="📡 1. Выбор спектрального канала", padding="10")
source_frame.pack(fill=tk.X, pady=(0, 10), padx=10)
# Создаем фрейм с прокруткой для списка каналов
source_list_frame = ttk.Frame(source_frame)
source_list_frame.pack(fill=tk.BOTH, expand=True)
# Текстовая метка
ttk.Label(source_list_frame, text="Доступные инструменты и спектры:").pack(anchor=tk.W, pady=(0, 5))
# Список с прокруткой для выбора канала
list_frame = ttk.Frame(source_list_frame)
list_frame.pack(fill=tk.BOTH, expand=True)
scrollbar_list = ttk.Scrollbar(list_frame)
scrollbar_list.pack(side=tk.RIGHT, fill=tk.Y)
self.source_listbox = tk.Listbox(list_frame, yscrollcommand=scrollbar_list.set,
height=10, bg='#3c3c3c', fg='white',
selectmode=tk.SINGLE, font=('Consolas', 9))
self.source_listbox.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar_list.config(command=self.source_listbox.yview)
# Привязываем выбор
self.source_listbox.bind('<<ListboxSelect>>', self.on_source_selected)
# Кнопка обновления
refresh_button = ttk.Button(source_frame, text="🔄 Обновить список",
command=self.controller.refresh_sources)
refresh_button.pack(pady=(10, 0))
# 2. Фрейм для выбора даты
date_frame = ttk.LabelFrame(main_frame, text="📅 2. Выбор даты и времени (UTC)", padding="10")
date_frame.pack(fill=tk.X, pady=(0, 10), padx=10)
# Дата
date_inner_frame = ttk.Frame(date_frame)
date_inner_frame.pack(fill=tk.X, pady=5)
ttk.Label(date_inner_frame, text="Дата (ГГГГ-ММ-ДД):").pack(side=tk.LEFT, padx=(0, 5))
self.date_entry = ttk.Entry(date_inner_frame, width=15)
self.date_entry.pack(side=tk.LEFT, padx=(0, 10))
self.date_entry.insert(0, datetime.now().strftime("%Y-%m-%d"))
# Кнопка "Сегодня"
today_button = ttk.Button(date_inner_frame, text="Сегодня",
command=self.set_today_date)
today_button.pack(side=tk.LEFT)
# Время
time_inner_frame = ttk.Frame(date_frame)
time_inner_frame.pack(fill=tk.X, pady=5)
ttk.Label(time_inner_frame, text="Время (ЧЧ:ММ:СС):").pack(side=tk.LEFT, padx=(0, 5))
self.time_entry = ttk.Entry(time_inner_frame, width=15)
self.time_entry.pack(side=tk.LEFT)
self.time_entry.insert(0, "12:00:00")
# Кнопка "Сейчас"
now_button = ttk.Button(time_inner_frame, text="Сейчас",
command=self.set_now_time)
now_button.pack(side=tk.LEFT, padx=(10, 0))
ttk.Label(time_inner_frame, text=" (Время в UTC)",
font=('TkDefaultFont', 8, 'italic')).pack(side=tk.LEFT, padx=(5, 0))
# 3. Фрейм для кнопок действий
action_frame = ttk.LabelFrame(main_frame, text="⚡ 3. Действия", padding="10")
action_frame.pack(fill=tk.X, pady=(0, 10), padx=10)
button_frame = ttk.Frame(action_frame)
button_frame.pack()
self.download_button = ttk.Button(button_frame, text="💾 Скачать изображение (JP2)",
command=self.controller.download_image,
width=25)
self.download_button.pack(side=tk.LEFT, padx=5)
self.get_jpip_button = ttk.Button(button_frame, text="🔗 Получить JPIP ссылку",
command=self.controller.get_jpip_link,
width=25)
self.get_jpip_button.pack(side=tk.LEFT, padx=5)
# 4. Текстовая область для вывода информации
info_frame = ttk.LabelFrame(main_frame, text="📝 4. Информация / JPIP ссылка", padding="10")
info_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 10), padx=10)
self.info_text = tk.Text(info_frame, height=10, wrap=tk.WORD,
bg='#1e1e1e', fg='#00ff00',
selectbackground='#004400')
scrollbar_info = ttk.Scrollbar(info_frame, orient=tk.VERTICAL, command=self.info_text.yview)
self.info_text.configure(yscrollcommand=scrollbar_info.set)
scrollbar_info.pack(side=tk.RIGHT, fill=tk.Y)
self.info_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
# Статус бар
self.status_var = tk.StringVar()
self.status_var.set("🌙 Готов. Выберите спектральный канал и дату.")
status_bar = ttk.Label(root, textvariable=self.status_var, relief=tk.SUNKEN,
anchor=tk.W, padding=(5, 3))
status_bar.pack(side=tk.BOTTOM, fill=tk.X)
# Выделенный ID источника
self.selected_source_id = None
def _on_mousewheel(self, event):
self.canvas.yview_scroll(int(-1 * (event.delta / 120)), "units")
def on_source_selected(self, event):
selection = self.source_listbox.curselection()
if selection:
item_text = self.source_listbox.get(selection[0])
if ":" in item_text:
try:
self.selected_source_id = int(item_text.split(":", 1)[0])
self.update_status(f"Выбран канал: {item_text}")
except ValueError:
self.selected_source_id = None
def set_sources(self, sources_dict):
"""Обновляет список источников."""
self.source_listbox.delete(0, tk.END)
if not sources_dict:
self.source_listbox.insert(tk.END, "Нет доступных источников")
return
# Сортируем по ID
for src_id in sorted(sources_dict.keys()):
display_text = f"{src_id}: {sources_dict[src_id]}"
self.source_listbox.insert(tk.END, display_text)
if self.source_listbox.size() > 0:
self.source_listbox.selection_set(0)
self.on_source_selected(None)
def set_today_date(self):
self.date_entry.delete(0, tk.END)
self.date_entry.insert(0, datetime.now().strftime("%Y-%m-%d"))
def set_now_time(self):
self.time_entry.delete(0, tk.END)
self.time_entry.insert(0, datetime.now().strftime("%H:%M:%S"))
def get_selected_source_id(self):
return self.selected_source_id
def get_selected_datetime(self):
"""Возвращает datetime объект из введенных пользователем даты и времени."""
date_str = self.date_entry.get()
time_str = self.time_entry.get()
datetime_str = f"{date_str} {time_str}"
try:
return datetime.strptime(datetime_str, "%Y-%m-%d %H:%M:%S")
except ValueError:
messagebox.showerror("Ошибка ввода",
"Неверный формат даты или времени.\nИспользуйте ГГГГ-ММ-ДД и ЧЧ:ММ:СС.")
return None
def show_info(self, message, is_error=False):
"""Показывает сообщение в текстовой области."""
timestamp = datetime.now().strftime("%H:%M:%S")
if is_error:
self.info_text.insert(tk.END, f"[{timestamp}] ❌ {message}\n", 'error')
self.info_text.tag_config('error', foreground='red')
else:
self.info_text.insert(tk.END, f"[{timestamp}] ✅ {message}\n", 'success')
self.info_text.tag_config('success', foreground='#00ff00')
self.info_text.see(tk.END)
def clear_info(self):
"""Очищает текстовую область."""
self.info_text.delete(1.0, tk.END)
def update_status(self, message):
"""Обновляет текст в статус-баре."""
self.status_var.set(message)
def ask_save_filename(self, default_name="helioviewer_image.jp2"):
"""Открывает диалог для выбора пути сохранения файла."""
filetypes = [("JPEG2000 files", "*.jp2"), ("All files", "*.*")]
return filedialog.asksaveasfilename(defaultextension=".jp2",
filetypes=filetypes,
initialfile=default_name)
# --- Контроллер (Controller) ---
class HelioviewerController:
"""
Контроллер связывает Model и View.
"""
def __init__(self, root):
self.model = HelioviewerModel()
self.view = HelioviewerView(root, self)
self.refresh_sources() # Инициализируем список источников
def refresh_sources(self):
"""Обновляет список источников."""
self.view.update_status("🔄 Загрузка списка источников...")
sources = self.model.get_available_sources()
self.view.set_sources(sources)
self.view.update_status(f"✅ Загружено {len(sources)} спектральных каналов")
self.view.show_info(f"Загружено {len(sources)} источников данных")
def _perform_action(self, get_jpip):
"""Общая логика для скачивания или получения ссылки."""
source_id = self.view.get_selected_source_id()
if source_id is None:
messagebox.showwarning("Нет источника", "Пожалуйста, выберите спектральный канал.")
return
date_time = self.view.get_selected_datetime()
if date_time is None:
return
action_name = "JPIP ссылки" if get_jpip else "изображения"
self.view.update_status(f"📡 Запрос {action_name} для канала {source_id}...")
self.view.show_info(f"Запрашиваю {action_name} для {date_time} (канал {source_id})")
result = self.model.get_jp2_image(source_id, date_time, jpip_link=get_jpip)
if result is None:
self.view.update_status(f"Не удалось получить {action_name}.")
self.view.show_info(f"Не удалось получить {action_name}", is_error=True)
return
if get_jpip:
# Показываем JPIP ссылку
self.view.show_info(f"JPIP ссылка получена:\n{result}")
self.view.update_status("✅ JPIP ссылка получена. Можно вставить в JHelioviewer")
else:
# Сохраняем изображение
default_name = f"solar_{source_id}_{date_time.strftime('%Y%m%d_%H%M%S')}.jp2"
filename = self.view.ask_save_filename(default_name)
if filename:
try:
with open(filename, 'wb') as f:
f.write(result)
file_size = len(result) / 1024 # размер в КБ
self.view.show_info(f"Изображение сохранено: {filename}\nРазмер: {file_size:.1f} KB")
self.view.update_status(f"✅ Изображение сохранено: {os.path.basename(filename)}")
except IOError as e:
self.view.show_info(f"Ошибка сохранения: {e}", is_error=True)
self.view.update_status("❌ Ошибка сохранения")
else:
self.view.update_status("Сохранение отменено")
def download_image(self):
"""Скачивает изображение (JP2)"""
self._perform_action(get_jpip=False)
def get_jpip_link(self):
"""Получает JPIP ссылку"""
self._perform_action(get_jpip=True)
# --- Точка входа в программу ---
if __name__ == "__main__":
root = tk.Tk()
app = HelioviewerController(root)
root.mainloop()