Source code for hs_process.spec_mod

# -*- coding: utf-8 -*-
from copy import deepcopy
from findiff import FinDiff
import itertools
import math
from math import factorial
import numpy as np
import os
import pandas as pd
from scipy import interpolate
import spectral.io.envi as envi
import spectral.io.spyfile as SpyFile

from hs_process.utilities import hstools


[docs]class spec_mod(object): ''' Class for manipulating data within the spectral domain, which is usually pixel-based. ''' def __init__(self, spyfile): self.spyfile = spyfile self.tools = hstools(spyfile) def _metadata_clip(self, wl_bands, meta_bands): '''Modifies metadata for spectral_clip() function.''' metadata = deepcopy(self.tools.spyfile.metadata) hist_str = (" -> hs_process.spectral_clip[<SpecPyFloatText label: " "'wl_bands?' value:{0}>]".format(wl_bands)) metadata['history'] += hist_str metadata['bands'] = len(meta_bands) band = [] wavelength = [] for idx, (key, val) in enumerate(meta_bands.items()): band.append(idx + 1) wavelength.append(val) band_str = '{' + ', '.join(str(b) for b in band) + '}' wavelength.sort() wavelength_str = '{' + ', '.join(str(wl) for wl in wavelength) + '}' metadata['band names'] = band_str metadata['wavelength'] = wavelength_str # self.tools.spyfile.metadata = metadata return metadata def _metadata_derivative(self, metadata, order=1): '''Modifies metadata for spec_derivative() function.''' metadata_dydx = deepcopy(metadata) hist_str = (" -> hs_process.spec_derivative[<" "SpecPyFloatText label: 'order?' value:{0}>]" "".format(order)) metadata_dydx['history'] += hist_str # if stdev_dydx is None: # if 'stdev' in metadata_dydx: # metadata_dydx.pop('stdev', None) # else: # metadata_dydx['stdev'] = stdev_dydx return metadata_dydx def _metadata_mimic(self, sensor, meta_bands_sensor, fwhm_dict): '''Modifies metadata for spectral_mimic() function.''' metadata = deepcopy(self.tools.spyfile.metadata.copy()) band_names = list(meta_bands_sensor.keys()) hist_str = (" -> hs_process.spectral_mimic[<" "SpecPyFloatText label: 'sensor?' value:{0}; " "SpecPyFloatText label: 'band_names?' value:{1}>]" "".format(sensor, band_names)) metadata['history'] += hist_str metadata['bands'] = len(band_names) wavelength = list(meta_bands_sensor.values()) band_str = '{' + ', '.join(str(b) for b in band_names) + '}' wavelength_str = '{' + ', '.join(str(wl) for wl in wavelength) + '}' fwhm_str = '{' + ', '.join(str(fwhm_dict[b]) for b in band_names) + '}' metadata['band names'] = band_str metadata['wavelength'] = wavelength_str metadata['fwhm'] = fwhm_str # self.tools.spyfile.metadata = metadata return metadata def _metadata_resample(self, bandwidth, meta_bands): '''Modifies metadata for spectral_resample() function.''' metadata = deepcopy(self.tools.spyfile.metadata) hist_str = (" -> hs_process.spectral_resample[<SpecPyFloatText label: " "'bandwidth?' value:{0}>]".format(bandwidth)) metadata['history'] += hist_str metadata['bands'] = len(meta_bands) band = [] wavelength = [] fwhm = [] for idx, (key, val) in enumerate(meta_bands.items()): band.append(idx + 1) wavelength.append(val) fwhm.append(bandwidth) band_str = '{' + ', '.join(str(b) for b in band) + '}' wavelength.sort() wavelength_str = '{' + ', '.join('{0:.2f}'.format(wl) for wl in wavelength) + '}' fwhm_str = '{' + ', '.join(str(fw) for fw in fwhm) + '}' metadata['band names'] = band_str metadata['wavelength'] = wavelength_str metadata['fwhm'] = fwhm_str return metadata def _metadata_smooth(self, window_size, order): '''Modifies metadata for spectral_smooth() function.''' metadata = deepcopy(self.tools.spyfile.metadata.copy()) hist_str = (" -> hs_process.spectral_smooth[<" "SpecPyFloatText label: 'window_size?' value:{0}; " "SpecPyFloatText label: 'polynomial_order?' value:{1}>" "]".format(window_size, order)) metadata['history'] += hist_str metadata['bands'] = self.spyfile.nbands # self.tools.spyfile.metadata = metadata return metadata def _mimic_center_wl(self, center_wl, wl_hs, spec_response): '''Finds the center wavelength based on the method of ``center_wl``.''' if center_wl == 'weighted': wl = np.average(wl_hs, weights=spec_response) elif center_wl == 'peak': wl = wl_hs[np.argmax(spec_response)] return wl def _mimic_get_response(self, sensor, df_band_response, col_wl): '''Gets ``df_band_response`` based on the ``sensor`` passed.''' if sensor == 'custom': msg = ('``col_wl`` ({0}) must be a column in ``df_band_response``.' ''.format(col_wl)) assert col_wl in df_band_response.columns, msg else: dir_data = self.tools.dir_data() name_response = sensor + '_band_response.csv' fname_response = os.path.join(dir_data, name_response) df_band_response = pd.read_csv(fname_response) col_wl = 'wl_nm' df = df_band_response.rename(columns = {col_wl:'wl_nm'}) wl_min = min(self.tools.meta_bands.values())-100 wl_max = max(self.tools.meta_bands.values())+100 # adding because interp needs some room df = df[df['wl_nm'] >= wl_min] df = df[df['wl_nm'] <= wl_max] s = (df==0).all(0) # Checks if columns are all zero or not df = df.drop(columns=s[s].index) # Removes all zero cols band_names = list(df.columns) band_names.remove('wl_nm') return df, band_names def _mimic_get_fwhm(self, df): ''' Gets the FWHM off the band responses in ``df``. Parameters: df: The band response dataframe (first column should be "wl_nm" indicating the wavelength). ''' fwhm_dict = {} for c in df.columns: if c == 'wl_nm': continue df_half_max = df[df[c] >= 0.5] fwhm_dict[c] = df_half_max['wl_nm'].max() - df_half_max['wl_nm'].min() return fwhm_dict def _resample_get_bin_indices(self, bandwidth, bins_n): ''' Gets an array of indices that indicate which bands should be averaged/ binned. Returns: inds (``numpy.array``): The band number (or index) that each original (non-binned) band should be consolidated into. ''' # get wavelength as list of float values wl_array = np.array( [float(i) for i in self.spyfile.metadata['wavelength']]) if bandwidth is None: _, bandwidth = np.linspace( int(wl_array.min()), int(math.ceil(wl_array.max())), num=bins_n, retstep=True, dtype=int) bandwidth = int(np.round(bandwidth)) bins = np.array(range(int(wl_array.min()), int(math.ceil(wl_array.max())+bandwidth), bandwidth)) inds = np.digitize(wl_array, bins) # assign an idx to each bin group return wl_array, bandwidth, inds # now, inds means band name (group_id) def _savitzky_golay(self, y, window_size=5, order=2, deriv=0, rate=1): ''' Smooth (and optionally differentiate) data with a Savitzky-Golay filter. The Savitzky-Golay filter removes high frequency noise from data. It has the advantage of preserving the original shape and features of the signal better than other types of filtering approaches, such as moving averages techniques. Parameters: y (``numpy.array``; shape (N,)): the values of the time history of the signal. window_size (``int``): the length of the window; must be an odd integer number (default: 5). order (``int``): the order of the polynomial used in the filtering; must be less than ``window_size`` - 1 (default: 2). deriv (``int``): the order of the derivative to compute (default: 0, means only smoothing). Returns: ys (``ndarray``; shape (N)): the smoothed signal (or it's n-th derivative). Notes: The Savitzky-Golay is a type of low-pass filter, particularly suited for smoothing noisy data. The main idea behind this approach is to make for each point a least-square fit with a polynomial of high order over a odd-sized window centered at the point. Examples: t = np.linspace(-4, 4, 500) y = np.exp( -t**2 ) + np.random.normal(0, 0.05, t.shape) ysg = savitzky_golay(y, window_size=31, order=4) import matplotlib.pyplot as plt plt.plot(t, y, label='Noisy signal') plt.plot(t, np.exp(-t**2), 'k', lw=1.5, label='Original signal') plt.plot(t, ysg, 'r', label='Filtered signal') plt.legend() plt.show() ''' try: window_size = np.abs(np.int(window_size)) order = np.abs(np.int(order)) except ValueError as msg: raise ValueError('Window_size/order have to be of type int') if window_size % 2 != 1 or window_size < 1: raise TypeError('Window_size must be a positive odd number') if window_size < order + 2: raise TypeError('Window_size is too small for the polynomials ' 'order') order_range = range(order+1) half_window = (window_size -1) // 2 # precompute coefficients b = np.mat([[k**i for i in order_range] for k in range(-half_window, half_window+1)]) m = np.linalg.pinv(b).A[deriv] * rate**deriv * factorial(deriv) # pad the signal at the extremes with values taken from the signal firstvals = y[0] - np.abs( y[1:half_window+1][::-1] - y[0] ) lastvals = y[-1] + np.abs(y[-half_window-1:-1][::-1] - y[-1]) y = np.concatenate((firstvals, y, lastvals)) return np.convolve( m[::-1], y, mode='valid') def _smooth_image(self, array=None, window_size=19, order=2): ''' Applies the Savitzky Golay smoothing algorithm to the spectral domain of each image pixel ''' if array is None: array = self.spyfile.open_memmap() array_2d = array.reshape(array.shape[0]*array.shape[1], array.shape[2]) array_2d_temp = array_2d.copy() for idx, row in enumerate(array_2d): array_2d_temp[idx, :] = self._savitzky_golay( row, window_size=window_size, order=order) # sns.lineplot(list(hs.meta_bands.keys()), array_2d[1000]) # sns.lineplot(list(hs.meta_bands.keys()), array_2d_temp[1000]) return array_2d_temp.reshape((array.shape))
[docs] def load_spyfile(self, spyfile): ''' Loads a ``SpyFile`` (Spectral Python object) for data access and/or manipulation by the ``hstools`` class. Parameters: spyfile (``SpyFile`` object): The datacube being accessed and/or manipulated. Example: Load and initialize the ``hsio`` and ``spec_mod`` modules >>> from hs_process import hsio >>> from hs_process import spec_mod >>> fname_in = r'F:\\nigo0024\Documents\hs_process_demo\Wells_rep2_20180628_16h56m_pika_gige_7-Convert Radiance Cube to Reflectance from Measured Reference Spectrum.bip.hdr' >>> io = hsio(fname_in) >>> my_spec_mod = spec_mod(io.spyfile) Load datacube >>> my_spec_mod.load_spyfile(io.spyfile) >>> my_spec_mod.spyfile Data Source: 'F:\\nigo0024\Documents\hs_process_demo\Wells_rep2_20180628_16h56m_pika_gige_7-Convert Radiance Cube to Reflectance from Measured Reference Spectrum.bip' # Rows: 617 # Samples: 1300 # Bands: 240 Interleave: BIP Quantization: 32 bits Data format: float32 ''' self.spyfile = spyfile self.tools = hstools(spyfile) try: self.spy_ul_e_srs = float(self.spyfile.metadata['map info'][3]) self.spy_ul_n_srs = float(self.spyfile.metadata['map info'][4]) self.spy_ps_e = float(self.spyfile.metadata['map info'][5]) self.spy_ps_n = float(self.spyfile.metadata['map info'][6]) except KeyError as e: print('Map information was not able to be loaded from the ' '``SpyFile``. Please be sure the metadata contains the "map ' 'info" tag with accurate geometric information.\n') self.spy_ul_e_srs = None self.spy_ul_n_srs = None self.spy_ps_e = None self.spy_ps_n = None
[docs] def spec_derivative(self, spyfile_spec=None, order=1): ''' Calculates the numeric derivative spectra from spyfile_spec. The derivavative spectra is calculated as the slope (rise over run) of the input spectra, and is normalized by the wavelength unit. Parameters: spyfile_spec: The spectral spyfile object to calculate the derivative for. order (``int``): The order of the derivative (default: 1). Example: Load and initialize ``hsio`` >>> import os >>> from hs_process import hsio >>> from hs_process import spec_mod >>> data_dir = r'F:\\nigo0024\Documents\hs_process_demo' >>> fname_hdr_spec = os.path.join(data_dir, 'Wells_rep2_20180628_16h56m_pika_gige_7_plot_611-cube-to-spec-mean.spec.hdr') >>> io = hsio() >>> io.read_spec(fname_hdr_spec) >>> my_spec_mod = spec_mod(io.spyfile_spec) Calculate the numeric derivative. >>> spec_dydx, metadata_dydx = my_spec_mod.spec_derivative(order=1) >>> io.write_spec('spec_derivative_order-1.spec.hdr', spec_dydx, df_std=None, metadata=metadata_dydx) Plot the numeric derivative spectra and compare against the original spectra. >>> import numpy as np >>> import seaborn as sns >>> sns.set_style("ticks") >>> wl_x = np.array([float(i) for i in metadata_dydx['wavelength']]) >>> y_ref = io.spyfile_spec.open_memmap()[0,0,:]*100 >>> ax1 = sns.lineplot(wl_x, y_ref) >>> ax2 = ax1.twinx() >>> ax2 = sns.lineplot(wl_x, 0, ax=ax2, color='gray') >>> ax2 = sns.lineplot(wl_x, spec_dydx[0,0,:]*100, ax=ax2, color=sns.color_palette()[1]) >>> ax2.set(ylim=(-0.8, 1.5)) >>> ax1.set_xlabel('Wavelength (nm)', weight='bold') >>> ax1.set_ylabel('Reflectance (%)', weight='bold') >>> ax2.set_ylabel('Reflectance derivative (%)', weight='bold') >>> ax1.set_title(r'API Example: `hstools.spec_derivative`', weight='bold') ''' msg0 = ('A numpy array was passed under the ``spyfile`` parameter, ' 'so therefore metadata must be retrieved from ' '``spec_mod.spyfile.metadata``. However, ``spec_mod.spyfile`` ' 'is not set. Please set via ``spec_mod.load_spyfile()``.') msg1 = ('``spyfile_spec`` was not passed and is not set; please set ' 'via ``spec_mod.load_spyfile()``.') msg2 = ('The passed ``Spyfile`` is not a valid "spec" file. A valid ' '"spec" file must have 3 dimensions with each of the first ' 'two dimensions (x and y) equal to 1 (e.g., shape = ' '(1, 1, n_bands)). Please set ``spyfile`` to a valid "spec" ' 'or pass a valid "spec" ``spyfile`` to ``spec_derivative()``.') if isinstance(spyfile_spec, SpyFile.SpyFile): spec = spyfile_spec.open_memmap() metadata = spyfile_spec.metadata elif isinstance(spyfile_spec, np.ndarray): assert self.spyfile is not None, msg0 spec = spyfile_spec.copy() metadata = self.spyfile.metadata else: assert self.spyfile is not None, msg1 spec = self.spyfile.open_memmap() metadata = self.spyfile.metadata assert spec.shape[:2] == (1, 1), msg2 # First two dimensions must be 1 wl_x = np.array([float(i) for i in metadata['wavelength']]) dydx = FinDiff(0, wl_x, order) # spec_dydx = np.empty_like(spec) # spec_dydx = np.empty(spec.shape) # spec_dydx[0,0,:] = np.gradient(spec[0,0,:], wl_x) spec_dydx = np.empty(spec.shape) spec_dydx[0,0,:] = dydx(spec[0,0,:]) # if 'stdev' in metadata: # stdev = np.array([float(i) for i in metadata['stdev']]) # stdev_dydx = np.gradient(stdev, wl_x) metadata_dydx = self._metadata_derivative(metadata, order) return spec_dydx, metadata_dydx
[docs] def spectral_clip(self, wl_bands=[[0, 420], [760, 776], [813, 827], [880, 1000]], spyfile=None): ''' Removes/clips designated wavelength bands from the hyperspectral datacube. Parameters: wl_bands (``list`` or ``list`` of ``lists``): minimum and maximum wavelenths to clip from image; if multiple groups of wavelengths should be cut, this should be a list of lists. For example, wl_bands=[760, 776] will clip all bands greater than 760.0 nm and less than 776.0 nm; wl_bands = [[0, 420], [760, 776], [813, 827], [880, 1000]] will clip all band less than 420.0 nm, bands greater than 760.0 nm and less than 776.0 nm, bands greater than 813.0 nm and less than 827.0 nm, and bands greater than 880 nm (default). spyfile (``SpyFile`` object or ``numpy.ndarray``): The data cube to clip; if ``numpy.ndarray`` or ``None``, loads band information from ``spec_mod.spyfile`` (default: ``None``). Returns: 2-element ``tuple`` containing - **array_clip** (``numpy.ndarray``): Clipped datacube. - **metadata** (``dict``): Modified metadata describing the clipped hyperspectral datacube (``array_clip``). Example: Load and initialize ``hsio`` and ``spec_mod`` >>> import os >>> from hs_process import hsio >>> from hs_process import spec_mod >>> data_dir = r'F:\\nigo0024\Documents\hs_process_demo' >>> fname_hdr = os.path.join(data_dir, 'Wells_rep2_20180628_16h56m_pika_gige_7-Radiance Conversion-Georectify Airborne Datacube-Convert Radiance Cube to Reflectance from Measured Reference Spectrum.bip.hdr') >>> io = hsio() >>> io.read_cube(fname_hdr) >>> my_spec_mod = spec_mod(io.spyfile) Using ``spec_mod.spectral_clip``, clip all spectral bands below *420 nm* and above *880 nm*, as well as the bands near the oxygen absorption (i.e., *760-776 nm*) and water absorption (i.e., *813-827 nm*) regions. >>> array_clip, metadata_clip = my_spec_mod.spectral_clip( wl_bands=[[0, 420], [760, 776], [813, 827], [880, 1000]]) Plot the spectra of the unclippe hyperspectral image and compare to that of the clipped image for a single pixel. >>> import seaborn as sns >>> from ast import literal_eval >>> spy_hs = my_spec_mod.spyfile.open_memmap() # datacube before smoothing >>> meta_bands = list(io.tools.meta_bands.values()) >>> meta_bands_clip = sorted([float(i) for i in literal_eval(metadata_clip['wavelength'])]) >>> ax = sns.lineplot(x=meta_bands, y=spy_hs[200][800]*100, label='Before spectral clipping', linewidth=3) >>> ax = sns.lineplot(x=meta_bands_clip, y=array_clip[200][800]*100, label='After spectral clipping', ax=ax) >>> ax.set_xlabel('Wavelength (nm)', weight='bold') >>> ax.set_ylabel('Reflectance (%)', weight='bold') >>> ax.set_title(r'API Example: `spec_mod.spectral_clip`', weight='bold') .. image:: img/spec_mod/spectral_clip.png ''' if spyfile is None: spyfile = self.spyfile array = self.spyfile.open_memmap() elif isinstance(spyfile, SpyFile.SpyFile): self.load_spyfile(spyfile) array = self.spyfile.open_memmap() elif isinstance(spyfile, np.ndarray): array = spyfile.copy() spyfile = self.spyfile if isinstance(wl_bands[0], list): spec_clip_groups = [self.tools.get_band_range( grp, index=True) for grp in wl_bands] spec_clip = list(itertools.chain(*spec_clip_groups)) else: spec_clip = self.tools.get_band_range(wl_bands, index=True) meta_bands = self.tools.meta_bands.copy() for k in self.tools.get_band_num(spec_clip): meta_bands.pop(k, None) # tools.meta_bands = meta_bands array_clip = np.delete(array, spec_clip, axis=2) metadata = self._metadata_clip(wl_bands, meta_bands) return array_clip, metadata
[docs] def spectral_mimic(self, sensor='sentinel-2a', df_band_response=None, col_wl='wl_nm', center_wl='peak', spyfile=None): ''' Mimics the response of a multispectral sensor based on transmissivity of sensor bands across a range of wavelength values by calculating its weighted average response and interpolating the hyperspectral response. Parameters: sensor (``str``): Should be one of ["sentera_6x", "micasense_rededge_3", "sentinel-2a", "sentinel-2b", "custom"]; if "custom", ``df_band_response`` and ``col_wl`` must be passed. df_band_response (``pd.DataFrame``): A DataFrame that contains the transmissivity (%) for each sensor band (as columns) mapped to the continuous wavelength values (as rows). Required if ``sensor`` is "custom", ignored otherwise. col_wl (``str``): The column of ``df_band_response`` denoting the wavlengths (default: 'wl_nm'). center_wl (``str``): Indicates how the center wavelength of each band is determined. If ``center_wl`` is "peak", the point at which transmissivity is at its maximum is used as the center wavelength. If ``center_wl`` is "weighted", the weighted average is used to compute the center wavelength. Must be one of ["peak", "weighted"] (``default: "peak"``). spyfile (``SpyFile`` object): The datacube being accessed and/or manipulated. Returns: 2-element ``tuple`` containing - **array_multi** (``numpy.ndarray``): Mimicked datacube. - **metadata** (``dict``): Modified metadata describing the mimicked spectral array (``array_multi``). Example: Load and initialize ``hsio`` and ``spec_mod`` >>> import os >>> from hs_process import hsio >>> from hs_process import spec_mod >>> data_dir = r'F:\\nigo0024\Documents\hs_process_demo' >>> fname_hdr = os.path.join(data_dir, 'Wells_rep2_20180628_16h56m_pika_gige_7-Radiance Conversion-Georectify Airborne Datacube-Convert Radiance Cube to Reflectance from Measured Reference Spectrum.bip.hdr') >>> data_dir2 = r'G:\\BBE\AGROBOT\Shared Work\hs_process_results\data\ref_closest_panel\crop_plot' >>> fname_hdr = os.path.join(data_dir2, 'study_aerffield_date_20190708_plot_5110-crop-plot.bip.hdr') >>> array = io.spyfile.open_memmap() >>> io = hsio() >>> io.read_cube(fname_hdr) >>> my_spec_mod = spec_mod(io.spyfile) Use spec_mod.spectral_mimic to mimic the Sentinel-2A spectral response function. >>> array_s2a, metadata_s2a = my_spec_mod.spectral_mimic(sensor='sentinel-2a', center_wl='weighted') Plot the mean spectral response of the hyperspectral image to that of the mimicked Sentinel-2A image bands (mean calculated across the entire image). >>> import seaborn as sns >>> from ast import literal_eval >>> spy_hs = my_spec_mod.spyfile.open_memmap() # datacube before smoothing >>> meta_bands = list(io.tools.meta_bands.values()) >>> meta_bands_s2a = sorted([float(i) for i in literal_eval(metadata_s2a['wavelength'])]) >>> ax = sns.lineplot(x=meta_bands, y=spy_hs[200][800]*100, label='Hyperspectral (Pika II)', linewidth=3) >>> ax = sns.lineplot(x=meta_bands_s2a, y=array_s2a[200][800]*100, label='Sentinel-2A "mimic"', marker='o', ms=6, ax=ax) >>> ax.set_xlabel('Wavelength (nm)', weight='bold') >>> ax.set_ylabel('Reflectance (%)', weight='bold') >>> ax.set_title(r'API Example: `spec_mod.spectral_mimic`', weight='bold') .. image:: img/spec_mod/spectral_mimic_sentinel-2a.png Use spec_mod.spectral_mimic to mimic the Sentera 6x spectral configuration and compare to both hyperspectral and Sentinel-2A. >>> array_6x, metadata_6x = my_spec_mod.spectral_mimic(sensor='sentera_6x', center_wl='peak') >>> meta_bands_6x = sorted([float(i) for i in literal_eval(metadata_6x['wavelength'])]) >>> ax = sns.lineplot(x=meta_bands, y=spy_hs[200][800]*100, label='Hyperspectral (Pika II)', linewidth=3) >>> ax = sns.lineplot(x=meta_bands_s2a, y=array_s2a[200][800]*100, label='Sentinel-2A "mimic"', marker='o', ms=6, ax=ax) >>> ax = sns.lineplot(x=meta_bands_6x, y=array_6x[200][800]*100, label='Sentera 6X "mimic"', marker='o', ms=8, ax=ax) >>> ax.set_xlabel('Wavelength (nm)', weight='bold') >>> ax.set_ylabel('Reflectance (%)', weight='bold') >>> ax.set_title(r'API Example: `spec_mod.spectral_mimic`', weight='bold') .. image:: img/spec_mod/spectral_mimic_6x.png And finally, mimic the Micasense RedEdge-MX and compare to hyperspectral, Sentinel-2A, and Sentera 6X. >>> array_re3, metadata_re3 = my_spec_mod.spectral_mimic(sensor='micasense_rededge_3', center_wl='peak') >>> meta_bands_re3 = sorted([float(i) for i in literal_eval(metadata_re3['wavelength'])]) >>> ax = sns.lineplot(x=meta_bands, y=spy_hs[200][800]*100, label='Hyperspectral (Pika II)', linewidth=3) >>> ax = sns.lineplot(x=meta_bands_s2a, y=array_s2a[200][800]*100, label='Sentinel-2A "mimic"', marker='o', ms=6, ax=ax) >>> ax = sns.lineplot(x=meta_bands_6x, y=array_6x[200][800]*100, label='Sentera 6X "mimic"', marker='o', ms=8, ax=ax) >>> ax = sns.lineplot(x=meta_bands_re3, y=array_re3[200][800]*100, label='Micasense RedEdge 3 "mimic"', marker='o', ms=8, ax=ax) >>> ax.set_xlabel('Wavelength (nm)', weight='bold') >>> ax.set_ylabel('Reflectance (%)', weight='bold') >>> ax.set_title(r'API Example: `spec_mod.spectral_mimic`', weight='bold') .. image:: img/spec_mod/spectral_mimic_re.png ''' if spyfile is None: spyfile = self.spyfile array = self.spyfile.open_memmap() elif isinstance(spyfile, SpyFile.SpyFile): self.load_spyfile(spyfile) array = self.spyfile.open_memmap() elif isinstance(spyfile, np.ndarray): array = spyfile.copy() spyfile = self.spyfile msg1 = ('`sensor`` ({0}) must be one of ["sentera_6x", ' '"micasense_rededge_3", "sentinel-2a", "sentinel-2b", ' '"custom"].'.format(sensor)) msg2 = ('``center_wl`` ({0}) must be one of ["peak", "weighted"].' ''.format(center_wl)) assert sensor in ['sentera_6x', 'micasense_rededge_3', 'sentinel-2a', 'sentinel-2b', 'custom'], msg1 assert center_wl in ['peak', 'weighted'], msg2 df, band_names = self._mimic_get_response(sensor, df_band_response, col_wl) # Normalizes cells along columns so column sum equals 1.0 df_norm = df.loc[:,:].div(df.sum(axis=0).drop('wl_nm')) df_norm['wl_nm'] = df['wl_nm'] # df_temp = df_norm.cumsum(axis=0) # df_temp['wl_nm'] = df['wl_nm'] array_multi = np.empty((self.spyfile.nrows, self.spyfile.ncols, len(band_names))) # interpolate and sum each pixel to get mimicked value meta_bands = self.tools.meta_bands.copy() meta_bands_sensor = {} wl_sensor = df_norm['wl_nm'].values wl_hs = list(meta_bands.values()) bands_remove = {} for idx, band_name in enumerate(band_names): y_sensor = df_norm[band_name].values f = interpolate.interp1d(wl_sensor, y_sensor) spec_response = f(wl_hs) # this is the response that can be multiplied by every pixel if np.count_nonzero(spec_response) > 0: array_multi[:,:,idx] = np.average(array, weights=spec_response, axis=2) meta_bands_sensor[band_name] = self._mimic_center_wl(center_wl, wl_hs, spec_response) else: bands_remove[idx] = band_name for idx, band_name in bands_remove.items(): array_multi = np.delete(array_multi, idx, axis=2) fwhm_dict = self._mimic_get_fwhm(df) metadata = self._metadata_mimic(sensor, meta_bands_sensor, fwhm_dict) return array_multi, metadata
[docs] def spectral_resample(self, bandwidth=None, bins_n=None, spyfile=None): ''' Performs pixel-wise resampling of spectral bands via binning (calculates the mean across all bands within each ``bandwidth`` region for each image pixel). Parameters: bandwidth (``float`` or ``int``): The bandwidth of the bands after spectral resampling is complete (units should be consistent with that of the .hdr file). Setting ``bandwidth`` to 10 will consolidate bands that fall within every 10 nm interval. bins_n (``int``): The number of bins (i.e., "bands") to achieve after spectral resampling is complete. Ignored if ``bandwidth`` is not ``None``. spyfile (``SpyFile`` object): The datacube being accessed and/or manipulated. Returns: 2-element ``tuple`` containing - **array_bin** (``numpy.ndarray``): Binned datacube. - **metadata** (``dict``): Modified metadata describing the binned spectral array (``array_bin``). Example: Load and initialize ``hsio`` and ``spec_mod`` >>> import os >>> from hs_process import hsio >>> from hs_process import spec_mod >>> data_dir = r'F:\\nigo0024\Documents\hs_process_demo' >>> fname_hdr = os.path.join(data_dir, 'Wells_rep2_20180628_16h56m_pika_gige_7-Radiance Conversion-Georectify Airborne Datacube-Convert Radiance Cube to Reflectance from Measured Reference Spectrum.bip.hdr') >>> io = hsio() >>> io.read_cube(fname_hdr) >>> my_spec_mod = spec_mod(io.spyfile) Use spec_mod.spectral_resample to "bin" the datacube to bands with 20 nm bandwidths. >>> array_bin, metadata_bin = my_spec_mod.spectral_resample(bandwidth=20) Plot the mean spectral response of the hyperspectral image to that of the binned image bands (mean calculated across the entire image). >>> import seaborn as sns >>> from ast import literal_eval >>> spy_hs = my_spec_mod.spyfile.open_memmap() # datacube before smoothing >>> meta_bands = list(io.tools.meta_bands.values()) >>> meta_bands_bin = sorted([float(i) for i in literal_eval(metadata_bin['wavelength'])]) >>> ax = sns.lineplot(x=meta_bands, y=spy_hs[200][800]*100, label='Hyperspectral (Pika II)', linewidth=3) >>> ax = sns.lineplot(x=meta_bands_bin, y=array_bin[200][800]*100, label='Spectral resample (20 nm)', marker='o', ms=6, ax=ax) >>> ax.set_xlabel('Wavelength (nm)', weight='bold') >>> ax.set_ylabel('Reflectance (%)', weight='bold') >>> ax.set_title(r'API Example: `spec_mod.spectral_resample`', weight='bold') .. image:: img/spec_mod/spectral_resample.png ''' if spyfile is None: spyfile = self.spyfile array = self.spyfile.open_memmap() elif isinstance(spyfile, SpyFile.SpyFile): self.load_spyfile(spyfile) array = self.spyfile.open_memmap() elif isinstance(spyfile, np.ndarray): array = spyfile.copy() spyfile = self.spyfile msg = ('Either ``bandwidth`` or ``bins_n`` must be set and be greater ' 'than zero.') if bandwidth is None: assert bins_n > 0, msg bins_n = int(bins_n) else: assert bandwidth > 1, msg bins_n = None wl_array, bandwidth, inds = self._resample_get_bin_indices( bandwidth, bins_n) # get empty array with array_bin = np.empty((self.spyfile.nrows, self.spyfile.ncols, len(np.unique(inds)))) # find the first and last index for each group meta_bands = {} for uid in np.unique(inds): i_low = np.argmax(inds == uid) i_high = np.argmax(inds > uid) - 1 if i_high == -1: i_high = i_low + 1 array_bin[:,:,uid-1] = np.average(array[:,:,i_low:i_high], axis=2) meta_bands[int(uid)] = wl_array[i_low:i_high].mean() # get band_names and center wavelengths metadata = self._metadata_resample(bandwidth, meta_bands) return array_bin, metadata
[docs] def spectral_smooth(self, window_size=11, order=2, spyfile=None): ''' Performs Savitzky-Golay smoothing on the spectral domain. Parameters: window_size (``int``): the length of the window; must be an odd integer number (default: 11). order (``int``): the order of the polynomial used in the filtering; must be less than ``window_size`` - 1 (default: 2). spyfile (``SpyFile`` object or ``numpy.ndarray``): The data cube to clip; if ``numpy.ndarray`` or ``None``, loads band information from ``spec_mod.spyfile`` (default: ``None``). Returns: 2-element ``tuple`` containing - **array_smooth** (``numpy.ndarray``): Clipped datacube. - **metadata** (``dict``): Modified metadata describing the smoothed hyperspectral datacube (``array_smooth``). Note: Because the smoothing operation is performed for every pixel individually, this function may take several minutes for large images. Example: Load and initialize ``hsio`` and ``spec_mod`` >>> import os >>> from hs_process import hsio >>> from hs_process import spec_mod >>> data_dir = r'F:\\nigo0024\Documents\hs_process_demo' >>> fname_hdr = os.path.join(data_dir, 'Wells_rep2_20180628_16h56m_pika_gige_7-Radiance Conversion-Georectify Airborne Datacube-Convert Radiance Cube to Reflectance from Measured Reference Spectrum.bip.hdr') >>> io = hsio() >>> io.read_cube(fname_hdr) >>> my_spec_mod = spec_mod(io.spyfile) Use ``spec_mod.spectral_smooth`` to perform a *Savitzky-Golay* smoothing operation across the hyperspectral spectral signature. >>> array_smooth, metadata_smooth = my_spec_mod.spectral_smooth( window_size=11, order=2) Plot the spectra of an individual pixel to visualize the result of the smoothing procedure. >>> import seaborn as sns >>> spy_hs = my_spec_mod.spyfile.open_memmap() # datacube before smoothing >>> meta_bands = list(io.tools.meta_bands.values()) >>> meta_bands_smooth = sorted([float(i) for i in metadata_smooth['wavelength']]) >>> ax = sns.lineplot(x=meta_bands, y=spy_hs[200][800]*100, label='Before spectral smoothing', linewidth=3) >>> ax = sns.lineplot(x=meta_bands_smooth, y=array_smooth[200][800]*100, label='After spectral smoothing', ax=ax) >>> ax.set_xlabel('Wavelength (nm)', weight='bold') >>> ax.set_ylabel('Reflectance (%)', weight='bold') >>> ax.set_title(r'API Example: `spec_mod.spectral_smooth`', weight='bold') .. image:: img/spec_mod/spectral_smooth.png ''' if spyfile is None: spyfile = self.spyfile array = self.spyfile.open_memmap() elif isinstance(spyfile, SpyFile.SpyFile): self.load_spyfile(spyfile) array = self.spyfile.open_memmap() elif isinstance(spyfile, np.ndarray): array = spyfile.copy() spyfile = self.spyfile array_smooth = self._smooth_image(array, window_size, order) metadata = self._metadata_smooth(window_size, order) return array_smooth, metadata
# TODO: Add normalization function for light scattering # https://link.springer.com/chapter/10.1007/978-1-4939-2836-1_4#Sec12