import numpy as np
import ndcube
import astropy.wcs
import astropy.units as u
import astropy.coordinates
from astropy.coordinates import SkyCoord, SpectralCoord
from astropy.wcs.wcsapi import SlicedLowLevelWCS, HighLevelWCSWrapper
import matplotlib.pyplot as plt
from . import plotting
from matplotlib.widgets import Slider, Button
def make_def_wcs(naxis=3, ctype=None, cunit=None):
"""
Generate a default WCS object
Parameters
-----------
naxis: `int`
Number of axes that the NDCube will have.
ctype: `tuple`
Tuple of strings containing the axes types.
cunit: `tuple`
Tuple of strings containing the units for each axes. Must have the same number of elements as ctype.
"""
wcs = astropy.wcs.WCS(naxis=naxis)
wcs.wcs.ctype = ctype
wcs.wcs.cunit = cunit
wcs.wcs.set()
return wcs
class StokesParamCube(ndcube.ndcube.NDCube):
"""Class representing a 2D map of a single Stokes profile with dimensions (wavelength, coord1, coord2)."""
def __init__(self, data, wcs, **kwargs):
# Init base NDCube with data and wcs
super(StokesParamCube, self).__init__(data, wcs=wcs, **kwargs)
self.n_spectral = None
self._spectral_axis = None
if self.wcs.pixel_n_dim == 3:
self.n_spectral = self.data.shape[0]
self._spectral_axis = self._spectral_slice().array_index_to_world_values(np.arange(self.n_spectral)) * u.Quantity(1, self.wcs.world_axis_units[2])
def _spectral_slice(self):
"""Slice of the WCS containing only the spectral axis"""
wcs_slice = [0] * self.wcs.pixel_n_dim
wcs_slice[0] = slice(0, self.n_spectral)
wcs_slice = SlicedLowLevelWCS(self.wcs.low_level_wcs, wcs_slice)
return wcs_slice
def get_wav_ind(self,wavelength):
"""Test if a wavelength is inside the wavelength axis for the object and return the array index corresponding to that wavelength """
# Test if the value is either an integer index or astropy quantity
if isinstance(wavelength, int):
ix = wavelength
if (ix < 0) or (ix > self.n_spectral-1):
ix = 0 if ix < 0 else (self.n_spectral-1)
print('Warning: Wavelength selected outside of axis range: {} {}'.\
format(self._spectral_axis[0], self._spectral_axis[-1]))
print('Defaulting to nearest wavelength at {}'.\
format(self._spectral_axis[ix]))
elif isinstance(wavelength,u.Quantity):
# Unit check the input wavelength
wav = wavelength.to(self._spectral_slice().world_axis_units[0])
ix = int(self._spectral_slice().world_to_array_index_values((wav.value,))[0])
# Check that the wavelength is inside the cube spectral
# axis range.
if (ix < 0) or (ix > self.n_spectral-1):
ix = 0 if ix < 0 else (self.n_spectral-1)
print('Warning: Wavelength selected outside of axis range: {} {}'.\
format(self._spectral_axis[0], self._spectral_axis[-1]))
print('Defaulting to nearest wavelength at {}'.\
format(self._spectral_axis[ix]))
else:
print('Warning: the quantity entered must be either an Astropy quantity with a unit of length or an integer index value.')
pass
return ix
def plot(self, plot_u=u.nm, wavelength=None, **kwargs):
if wavelength is None:
# Default to the first wavelength.
ix = 0
else:
ix = self.get_wav_ind(wavelength)
wav_slider = plotting._plot_3d_cube(self._spectral_axis, self.data, plot_u, proj=self[ix,:,:].wcs, meta=self.meta, init=ix, origin='lower', **kwargs)
return wav_slider
class StokesParamMap(ndcube.ndcube.NDCube):
"""Class representing a 2D map of bandpass intensities of a single Stokes parameter
with dimensions (coord1, coord2).
"""
def __init__(self, data, wcs, **kwargs):
# Init base NDCube with data and wcs
super().__init__(data, wcs=wcs, **kwargs)
self.wav0 = None
self.wav1 = None
if 'wav0' in self.meta:
self.wav0 = self.meta['wav0']
if 'wav1' in self.meta:
self.wav1 = self.meta['wav1']
if 'stokes' in self.meta:
self.stokes = self.meta['stokes']
def average(self):
"Return StokesProfile over a pixel area."
pass
def plot(self, plot_u=u.nm, **kwargs):
"""Plot a map of bandpass intensities"""
plot_wav0 = round(self.meta['wav0'].to(plot_u).value,3)
plot_title = ''
if 'inst' in self.meta.keys():
plot_title += self.meta['inst'] + ' '
if self.meta['wav1'] is None:
plot_title += 'Stokes ' + self.meta['stokes'] + '\n $\lambda$ = ' + str(plot_wav0) + ' ' + str(plot_u)
else:
plot_wav1 = round(self.meta['wav1'].to(plot_u).value,3)
plot_title += 'Stokes ' + self.meta['stokes'] + '\n $\lambda\in$ [' + str(plot_wav0) + ', ' + str(plot_wav1) + '] ' + str(plot_u)
plotting._plot_image(self.data, proj=self.wcs, meta=self.meta, origin='lower', plot_title=plot_title, **kwargs)
class StokesProfile(ndcube.ndcube.NDCube):
"""Class representing a profile of a single Stokes parameter with dimensions (wavelength)
"""
def __init__(self, data, wcs, **kwargs):
# Init base NDCube with data and wcs
super().__init__(data, wcs=wcs, **kwargs)
# Define spectral_axis attribute from WCS
self.n_spectral = self.data.shape[0]
self._spectral_axis = self._spectral_slice().array_index_to_world_values(np.arange(self.n_spectral)) * u.Quantity(1, self.wcs.world_axis_units[0])
#print(self.wcs)
def _spectral_slice(self):
"""Slice of the WCS containing only the spectral axis"""
# Assume only a single spectral dimension.
return self.wcs.low_level_wcs
def plot(self, plot_u=u.nm, **kwargs):
""" Single panel plot showing the dispersed spectrum"""
plotting._plot_profile(self._spectral_axis, self.data, plot_u, meta=self.meta, **kwargs)
[docs]class StokesCube(ndcube.ndcube.NDCube):
"""
Class representing a 2D map of Stokes profiles with dimensions (stokes, wavelength, coord1, coord2).
Parameters
----------
data: `numpy.ndarray`
The array holding the actual data in this object. The array index order must be
(stokes, wavelength, coord1, coord2).
wcs: `astropy.wcs.wcsapi.BaseLowLevelWCS`, `astropy.wcs.wcsapi.BaseHighLevelWCS`, optional
The WCS object containing the axes' information. If not provided, a WCS is constructed
using `wavelength_unit` and `coordinate_unit`, which default to pixels.
stokes_params: `tuple` of `str`
Tuple containing all or part of ('I', 'Q', 'U', 'V') defining the number and kind of
Stokes parameters available.
normalize: `bool`
Normalization for polarization parameters Q, U, V. If `True`, then polarization parameters
are normalized by the intensity at each wavelength, e.g. Q => Q/I. If a non-zero scalar is
provided then that will be used as the normalization, e.g. for a chosen continuum intensity.
Additional kwargs are passed to the `NDCube` base class.
"""
def __init__(self, data, wcs=None, stokes_params=('I', 'Q', 'U', 'V'), normalize=False, **kwargs):
if wcs is None:
# Define a default WCS where coordinates and wavelength axis are
# in pixel units. Note: cannot use "WAVE" ctype;
# astropy.wcs.WCS enforces length units for that name
wcs = make_def_wcs(naxis=4, ctype=["COORD2", "COORD1", "WAVEIX", "STOKES"],
cunit=['pix', 'pix', 'pix', ''])
# Init base NDCube with data and wcs
super().__init__(data, wcs=wcs, **kwargs)
self.normalize = normalize
self._frame = None
self.n_spectral = None
self._spectral_axis = None
self._stokes_axis = None
if self.wcs.pixel_n_dim == 4:
# Check and define Stokes axis.
if len(stokes_params) != self.data.shape[0]:
raise Exception(f"Data contains {self.data.shape[0]} Stokes parameters, "+
f"but {len(stokes_params)} parameters were expected: {stokes_params}")
self._stokes_axis = stokes_params
# TODO: stokes index map for N params < 4; use below
# Define spectral_axis attribute from WCS
self.n_spectral = self.data.shape[1]
self._spectral_axis = self._spectral_slice().array_index_to_world_values(np.arange(self.n_spectral)) * u.Quantity(1, self.wcs.world_axis_units[2])
# Define the observer frame if it exists.
tmp_pixel = np.zeros(self.wcs.pixel_n_dim, dtype='int')
tmp_world = self.wcs.pixel_to_world(*tmp_pixel)
if hasattr(tmp_world[0], 'frame'):
self.meta['frame'] = tmp_world[0].frame
self._frame = self.meta['frame']
@property
def stokes_axis(self):
"""The available Stokes parameters"""
return self._stokes_axis
@property
def spectral_axis(self):
"""The spectral axis in physical units"""
return self._spectral_axis
@property
def frame(self):
"The observed frame for the dat if it exists"
return self._frame
def _spectral_slice(self):
"""Slice of the WCS containing only the spectral axis"""
wcs_slice = [0] * self.wcs.pixel_n_dim
wcs_slice[1] = slice(0, self.n_spectral)
wcs_slice = SlicedLowLevelWCS(self.wcs.low_level_wcs, wcs_slice)
return wcs_slice
[docs] def coord1_axis(self, coord2):
"""The physical axis across the first spatial dimension"""
# TODO: allow coord2 to be None assuming uniform coord1, return 1D array structure
n_coord1 = self.data.shape[2]
return self[0,0,:,coord2].wcs.array_index_to_world(np.arange(n_coord1))
[docs] def coord2_axis(self, coord1):
"""The physical axis across the second spatial dimension"""
# TODO: allow coord1 to be None assuming uniform coord2, return 1D array structure
n_coord2 = self.data.shape[3]
return self[0,0,coord1,:].wcs.array_index_to_world(np.arange(n_coord2))
##############################
####### Stokes Slices ########
##############################
def _stokes_slice(self, stokes_ix, normalize=False):
"""Return a 3D NDCube (wavelength, coord1, coord2) for a given Stokes parameter"""
# Construct the WCS object for the smaller 3D cube.
# This function should only called if the
d_sh = self.data.shape
wcs_slice = [0] * self.wcs.pixel_n_dim
wcs_slice[0] = stokes_ix
wcs_slice[1] = slice(0, d_sh[1])
wcs_slice[2] = slice(0, d_sh[2])
wcs_slice[3] = slice(0, d_sh[3])
#print(wcs_slice)
wcs_slice = SlicedLowLevelWCS(self.wcs.low_level_wcs, wcs_slice)
#newcube = StokesParamCube(self.data[stokes_ix,:,:,:], HighLevelWCSWrapper(wcs_slice), self._stokes_axis[stokes_ix])
#cube_meta = {'stokes': self._stokes_axis[stokes_ix]}
cube_meta = self.meta.copy()
cube_meta['stokes'] = self._stokes_axis[stokes_ix]
newcube = StokesParamCube(self.data[stokes_ix,:,:,:], HighLevelWCSWrapper(wcs_slice), meta=cube_meta)
# Normalize the spectra if wanted.
if stokes_ix != 0:
if self.normalize is True:
# Normalize by I
I = self._stokes_slice(0)
newcube = StokesParamCube(newcube.data / I.data, newcube.wcs, self._stokes_axis[stokes_ix], meta=newcube.meta)
elif self.normalize:
# normalize by non-zero float
# TODO: sanity check input
newcube = StokesParamCube(newcube.data / self.normalize, newcube.wcs, self._stokes_axis[stokes_ix], meta=newcube.meta)
#newcube.meta = {'stokes': self._stokes_axis[stokes_ix]}
return newcube
@property
def I(self):
"""Intensity as a 3D NDCube (wavelength, coord1, coord2)"""
return self._stokes_slice(0)
@property
def Q(self):
"""Linear polarization Q as a 3D NDCube (wavelength, coord1, coord2)"""
return self._stokes_slice(1)
@property
def U(self):
"""Linear polarization U as a 3D NDCube (wavelength, coord1, coord2)"""
return self._stokes_slice(2)
@property
def V(self):
"""Circular polarization as a 3D NDCube (wavelength, coord1, coord2)"""
return self._stokes_slice(3)
@property
def P(self):
"""Total polarization P = sqrt(Q**2 + U**2 + V**2) a 3D NDCube (wavelength, coord1, coord2)"""
Q = self.Q
U = self.U
V = self.V
P = np.sqrt(Q.data**2 + U.data**2 + V.data**2)
return StokesParamCube(P, Q.wcs, meta={'stokes': 'P'})
@property
def L(self):
"""Linear polarization L = sqrt(Q**2 + U**2) a 3D NDCube (wavelength, coord1, coord2)"""
Q = self.Q
U = self.U
L = np.sqrt(Q.data**2 + U.data**2)
return StokesParamCube(L, Q.wcs, meta={'stokes': 'L'})
@property
def theta(self):
"""Linear polarization angle theta = 0.5 arctan(U/Q) a 3D NDCube (wavelength, coord1, coord2)"""
Q = self.Q
U = self.U
theta = 0.5 * np.arctan2(U.data, Q.data)
return StokesParamCube(np.degrees(theta) * u.degree, Q.wcs, meta={'stokes': 'theta'})
###########################
####### Stokes Maps #######
###########################
def _stokes_map(self, stokes_ix, wavelength, stop_wavelength=None):
"""Return a 2D NDCube (coord1, coord2) for a given Stokes parameter and wavelength selection"""
newcube = self._stokes_slice(stokes_ix)
# Starting index.
ix_0 = self.get_wav_ind(wavelength)
wav0 = self._spectral_axis[ix_0]
print('ix_0, wav0 = ', ix_0, wav0)
newcube.meta['wav0'] = wav0
# Stopping index.
ix_1 = None
wav1 = None
if stop_wavelength is not None:
if self.get_wav_ind(stop_wavelength) < ix_0:
print('Input stop_wavelength ahead of wavelength. Defaulting to the single wavelength map.')
else:
ix_1 = self.get_wav_ind(stop_wavelength)
wav1 = self._spectral_axis[ix_1]
print('ix_1, wav1 = ', ix_1, wav1)
newcube.meta['wav1'] = wav1
# Select the data to be included.
if ix_1 is None:
newcube_data = newcube.data[ix_0,:,:]
else:
# Sum over the selected wavelengths.
newcube_data = np.sum(newcube.data[ix_0:ix_1+1,:,:], axis=0)
newcube_wcs = newcube[ix_0,:,:].wcs
newmap = StokesParamMap(newcube_data, newcube_wcs, meta=newcube.meta)
return newmap
[docs] def get_wav_ind(self,wavelength):
"""Test if a wavelength is inside the wavelength axis for the object and return the array index corresponding to that wavelength """
# Test if the value is either an integer index or astropy quantity
if isinstance(wavelength, int):
ix = wavelength
if (ix < 0) or (ix > self.n_spectral-1):
ix = 0 if ix < 0 else (self.n_spectral-1)
print('Warning: Wavelength selected outside of axis range: {} {}'.\
format(self._spectral_axis[0], self._spectral_axis[-1]))
print('Defaulting to nearest wavelength at {}'.\
format(self._spectral_axis[ix]))
elif isinstance(wavelength,u.Quantity):
# Unit check the input wavelength
wav = wavelength.to(self._spectral_slice().world_axis_units[0])
ix = int(self._spectral_slice().world_to_array_index_values((wav.value,))[0])
# Check that the wavelength is inside the cube spectral
# axis range.
if (ix < 0) or (ix > self.n_spectral-1):
ix = 0 if ix < 0 else (self.n_spectral-1)
print('Warning: Wavelength selected outside of axis range: {} {}'.\
format(self._spectral_axis[0], self._spectral_axis[-1]))
print('Defaulting to nearest wavelength at {}'.\
format(self._spectral_axis[ix]))
else:
print('Warning: the quantity entered must be either an Astropy quantity with a unit of length or an integer index value.')
pass
return ix
[docs] def I_map(self, wavelength, stop_wavelength=None):
"""Intensity as a 2D NDCube (coord1, coord2)"""
return self._stokes_map(0, wavelength, stop_wavelength=stop_wavelength)
[docs] def Q_map(self, wavelength, stop_wavelength=None):
"""Linear polarization Q as a 2D NDCube (coord1, coord2)"""
return self._stokes_map(1, wavelength, stop_wavelength=stop_wavelength)
[docs] def U_map(self, wavelength, stop_wavelength=None):
"""Linear polarization U as a 2D NDCube (coord1, coord2)"""
return self._stokes_map(2, wavelength, stop_wavelength=stop_wavelength)
[docs] def V_map(self, wavelength, stop_wavelength=None):
"""Circular polarization as a 2D NDCube (coord1, coord2)"""
return self._stokes_map(3, wavelength, stop_wavelength=stop_wavelength)
[docs] def P_map(self, wavelength, stop_wavelength=None):
"""Total polarization P = sqrt(Q**2 + U**2 + V**2) as a 2D NDCube (coord1, coord2)"""
Q = self.Q_map(wavelength, stop_wavelength=stop_wavelength)
U = self.U_map(wavelength, stop_wavelength=stop_wavelength)
V = self.V_map(wavelength, stop_wavelength=stop_wavelength)
P = np.sqrt(Q.data**2 + U.data**2 + V.data**2)
meta = Q.meta
meta['stokes'] = 'P'
return StokesParamMap(P, Q.wcs, meta=meta)
[docs] def L_map(self, wavelength, stop_wavelength=None):
"""Linear polarization L = sqrt(Q**2 + U**2) as a 2D NDCube (coord1, coord2)"""
Q = self.Q_map(wavelength, stop_wavelength=stop_wavelength)
U = self.U_map(wavelength, stop_wavelength=stop_wavelength)
L = np.sqrt(Q.data**2 + U.data**2)
meta = Q.meta
meta['stokes'] = 'L'
return StokesParamMap(L, Q.wcs, meta=meta)
[docs] def theta_map(self, wavelength, stop_wavelength=None):
"""Linear polarization angle theta = 0.5 arctan(U/Q) as a 2D NDCube (coord1, coord2)"""
Q = self.Q_map(wavelength, stop_wavelength=stop_wavelength)
U = self.U_map(wavelength, stop_wavelength=stop_wavelength)
theta = np.arctan2(U.data, Q.data)
meta = Q.meta
meta['stokes'] = 'theta'
return StokesParamMap(np.degrees(theta) * u.degree, Q.wcs, meta=meta)
##############################
####### Stokes Profile #######
##############################
def _stokes_profile(self, stokes_ix, coords):
"""Return a 1D NDCube (wavelength) for a given Stokes parameter and coordinate selection"""
# Tranform input coordinates into a SkyCoord object.
coords, coords_pix = self.get_spatial_ind(coords)
newcube = self._stokes_slice(stokes_ix)
newcube = newcube[:, coords_pix[0],coords_pix[1]]
newcube.meta['x0_pix'] = coords_pix[1]
newcube.meta['y0_pix'] = coords_pix[0]
newcube.meta['x0'] = self.coord2_axis(0)[coords_pix[1]].Tx
newcube.meta['y0'] = self.coord1_axis(0)[coords_pix[0]].Ty
return StokesProfile(newcube.data, newcube.wcs, meta=newcube.meta)
[docs] def get_spatial_ind(self,coords):
"""Test if a set of coordinates fit inside the dimensions of the 2D images."""
# TODO: allow to specify coords in physical units
if (isinstance(coords, list) or isinstance(coords, tuple)) and (len(coords) == 2):
if isinstance(coords[0], u.Quantity) and isinstance(coords[1], u.Quantity):
coords = SkyCoord(Tx = coords[0], Ty= coords[1], frame=self.frame)
elif isinstance(coords, SkyCoord):
if coords.frame.__class__ is astropy.coordinates.builtin_frames.icrs.ICRS:
"""This is the default frame"""
Tx = coords.ra.to(self.wcs.world_axis_units[0])
Ty = coords.dec.to(self.wcs.world_axis_units[1])
coords = SkyCoord(Tx = Tx, Ty = Ty, frame = self.frame)
else:
print('Invalid coordinate type.')
return None
coords_pix = self[0,0,:,:].wcs.world_to_array_index(coords)
return coords, coords_pix
[docs] def I_profile(self, coords):
"""Intensity profile at a specific coordinate"""
return self._stokes_profile(0, coords)
[docs] def Q_profile(self, coords):
"""Linear polarization Q profile at a specific coordinate"""
return self._stokes_profile(1, coords)
[docs] def U_profile(self, coords):
"""Linear polarization U profile at a specific coordinate"""
return self._stokes_profile(2, coords)
[docs] def V_profile(self, coords):
"""Circular polarization profile at a specific coordinate"""
return self._stokes_profile(3, coords)
[docs] def P_profile(self, coords):
"""Total polarization P = sqrt(Q**2 + U**2 + V**2) profile at a specific coordinate"""
Q = self.Q_profile(coords)
U = self.U_profile(coords)
V = self.V_profile(coords)
P = np.sqrt(Q.data**2 + U.data**2 + V.data**2)
meta = Q.meta
meta['stokes'] = 'P'
return StokesProfile(P, Q.wcs, meta=meta)
[docs] def L_profile(self, coords):
"""Linear polarization L = sqrt(Q**2 + U**2) profile at a specific coordinate"""
Q = self.Q_profile(coords)
U = self.U_profile(coords)
L = np.sqrt(Q.data**2 + U.data**2)
meta = Q.meta
meta['stokes'] = 'L'
return StokesProfile(L, Q.wcs, meta=meta)
[docs] def theta_profile(self, coords):
"""Linear polarization angle theta = 0.5 arctan(U/Q) profile at a specific coordinate"""
Q = self.Q_profile(coords)
U = self.U_profile(coords)
theta = np.arctan2(U.data, Q.data)
meta=Q.meta
meta['stokes'] = 'theta'
return StokesProfile(np.degrees(theta) * u.degree, Q.wcs)
###############################################
##### Plotting functionality for the cube #####
###############################################
[docs] def plot(self, wavelength=None, coords=None, context=None, plot_u=u.nm, **kwargs):
"""Create a four panel plot showing I,Q,U,V maps at a specific wavelength"""
if (coords is not None):
# Tranform input coordinates into a SkyCoord object.
coords, coords_pix = self.get_spatial_ind(coords)
plt_meta = self.meta.copy()
plt_meta['x0_pix'] = coords_pix[1]
plt_meta['y0_pix'] = coords_pix[0]
plt_meta['x0'] = self.coord2_axis(0)[coords_pix[1]].Tx
plt_meta['y0'] = self.coord1_axis(0)[coords_pix[0]].Ty
if context is None:
return plotting._plot_all_profiles(self._spectral_axis,
self.data[:,:,coords_pix[0], coords_pix[1]],
self.data, plot_u, meta=plt_meta,
proj=self[0,0,:,:].wcs, **kwargs)
else:
context_ind = self._stokes_axis.index(context)
plt_meta['stokes'] = context
return plotting._plot_context_all_profiles(self._spectral_axis,
self.data[:,:,coords_pix[0],coords_pix[1]],
self.data[context_ind,:,:,:], plot_u,
proj=self[0,0,:,:].wcs, meta=plt_meta,
**kwargs)
elif coords is None:
# Default is to plot all four Stokes parameters.
return plotting._plot_all_data(self._spectral_axis, self.data, plot_u, proj=self[0,0,:,:].wcs, meta=self.meta, **kwargs)
class MagVectorMap(ndcube.ndcube.NDCube):
"""Class representing a 2D map of one magnetic field component.
with dimensions (coord1, coord2).
"""
def __init__(self, data, wcs, **kwargs):
# Init base NDCube with data and wcs
super().__init__(data, wcs=wcs, **kwargs)
def plot(self, **kwargs):
"""Plot a map of bandpass intensities"""
# Set title in keyword arguments
plot_title = 'Magnetic parameter: ' + self.meta['magnetic_param']
plotting._plot_image(self.data, proj=self.wcs, meta=self.meta, plot_title=plot_title, origin='lower', **kwargs)
[docs]class MagVectorCube(ndcube.ndcube.NDCube):
"""
Class representing a 2D map of inverted magnetic field vectors.
Parameters
----------
data: `numpy.ndarray`
The array holding the magnetic field data stored in the object. The array index order must be
(magnetic, coord1, coord2) or should this be (magnetic, coord2, coord1)?
wcs: `astropy.wcs.wcsapi.BaseLowLevelWCS`, `astropy.wcs.wcsapi.BaseHighLevelWCS`, optional
The WCS object containing the axes' information. If not provided, a WCS is constructed
using `wavelength_unit` and `coordinate_unit`, which default to pixels.
magnetic_params: `tuple` or `str`
Tuple containing all or part of the magnetic field components ('B', 'inclination', 'azimuth')
"""
def __init__(self, data, wcs=None, magnetic_params=('B', 'inclination', 'azimuth'), **kwargs):
if wcs is None:
# Define a default WCS where coordinates are defined in pixel units.
wcs = make_def_wcs(naxis=3, ctype=["COORD2", "COORD1", "Parameter"],
cunit=['pix', 'pix', ''])
# Init base NDCube with data and wcs
super().__init__(data, wcs=wcs, **kwargs)
if self.wcs.pixel_n_dim == 3:
# Check and define Stokes axis
#if len(magnetic_params) != self.data.shape[0]:
#raise Exception(f"Data contains {self.data.shape[0]} magnetic parameters, " + f"but {magnetic_params} parameters ({len(magnetic_params)} were expected")
self._magnetic_axis = magnetic_params
@property
def magnetic_axis(self):
"""The available magnetic parameters"""
return self._magnetic_axis
def _magnetic_map(self, magnetic_ix):
"""Return a 2D NDCube (coord1, coord2) for a given magnetic parameter"""
newcube_data = self.data[magnetic_ix,:,:]
newcube_wcs = self.wcs[magnetic_ix,:,:]
new_meta = {'magnetic_param': self._magnetic_axis[magnetic_ix]}
newmap = MagVectorMap(newcube_data, newcube_wcs, meta=new_meta)
#newcube = ndcube.NDCube(self.data, self.wcs)[magnetic_ix,:,:]
return newmap
@property
def B(self):
"""Magnetic field strength as a 2D NDcube (coord1, coord2)"""
return self._magnetic_map(0)
@property
def inclination(self):
"""Magnetic inclination as a 2D NDCube (coord1, coord2)"""
return self._magnetic_map(1)
@property
def azimuth(self):
"""Magnetic azimuth as 2D NDCube (coord1, coord2)"""
return self._magnetic_map(2)
[docs] def coord1_axis(self, coord2):
"""The physical axis across the first spatial dimension"""
# TODO: allow coord2 to be None assuming uniform coord1, return 1D array structure
n_coord1 = self.data.shape[1]
return self.wcs[0,:,coord2].array_index_to_world(np.arange(n_coord1))
[docs] def coord2_axis(self, coord1):
"""The physical axis across the second spatial dimension"""
# TODO: allow coord1 to be None assuming uniform coord2, return 1D array structure
n_coord2 = self.data.shape[2]
return self.wcs[0,coord1,:].array_index_to_world(np.arange(n_coord2))