Source code for polartools.process_images

"""
Functions to process image files.

.. autosummary::
   ~clean_threshold
   ~load_images
   ~get_curvature
   ~get_spectrum
   ~get_spectra
"""

import dask.array as da
import numpy as np
import matplotlib.pyplot as plt
from ._pyrixs import (
    image_to_photon_events,
    plot_curvature,
    fit_curvature,
    extract,
)


[docs] def clean_threshold(images, threshold): """ Cleans image stack using a fixed threshold value. Sets all value above the threshold to nan. Parameters ---------- images : stack of images Images to be cleaned. threshold : float, integer Value of the threshold to be applied. Returns ------- clean_images : stack of images Clean images. """ clean_images = images.copy() clean_images[clean_images > threshold] = np.nan return clean_images
def _cleanup_images(images, parameters): """ Clean up images. Parameters ---------- images : stack of images Images to be cleaned. parameters : dictionary Clean up functions and arguments. Default options: - {'threshold': threshold_value} For custom functions, use: - {'function': (myfunc, (arg1, arg2, ...))} where myfunc is a function with call: myfunc(images, arg1, arg2, ...) These function can be stacked, but keys have to be different. For example, the call: {'threshold1': 100, 'threshold2': 10, 'function': (myfunc, (arg1, arg2))} will run the threshold function twice with 100 and 10 as argument, then myfunc with (arg1, arg2). Returns ------- images : dask array Processed images. """ for function, args in parameters.items(): if "threshold" in function.lower(): images = clean_threshold(images, *args) elif "function" in function.lower(): images = args[0](images, *args[1:]) else: raise ValueError( "function must be either 'threshold' for preset clean method, " "or 'function' for a custom clean method" ) return images
[docs] def load_images( scans, cat, detector_key, cleanup=None, normalize=None, positioner=None ): """ Load scans with 2D images. Note that this function forces explicity outputs: - If multiple scans are passed, it will return the average of the scans. - If positioner = None, it will average all images. The resulting image \ will have shape = (pixels_horizontal, pixels_vertical). - If positioner not None, it will not average all images. The resulting \ image will have \ shape = (positioner_size, pixels_horizontal, pixels_vertical). PARAMETERS ---------- scans : iterable List of scan_id or uids. cat : databroker catalog Catalog. detector_key : string Name of item that holds the images cleanup : dictionary, optional Clean up functions and arguments. Available functions: - {'threshold': threshold_value} For custom functions, use: - {'function': (myfunc, (arg1, arg2, ...))} \ where myfunc is a function with call: myfunc(images, arg1, arg2, ...) \ These function can be stacked. For example, the call: \ {'threshold': 100, 'threshold': 10, 'function': (myfunc, (arg1, arg2))} \ will run the threshold function twice with 100 and 10 as argument, then \ myfunc with (arg1, arg2). normalize : string, optional Name of detector that will be used to normalize data. Default is None. positioner : string, optional Name of positioner to be read. Defaults to None. Returns ------- images : dask array Processed images. positioner_values : numpy ndarray, optional Values of the positioner. It is only returned if positioner is not None. """ output = [] for scan in scans: data = cat[scan].primary.to_dask() images = da.array(data[detector_key]).compute().astype(np.float64) if cleanup is not None: if not isinstance(cleanup, dict): raise TypeError( f"cleanup must be a dictionary, but a {type(cleanup)} " "was entered." ) images = _cleanup_images(images, cleanup) if normalize is not None: images[:] /= data[normalize].broadcast_like(data[detector_key]) output.append(images) if positioner is None: return np.nanmean(da.stack(output), axis=(0, 1, 2)) else: return ( np.nanmean(da.stack(output), axis=(0, 2)), data[positioner].values, )
def _cleanup_photon_events(photon_events): index = np.where(np.isfinite(photon_events[:, 2])) return photon_events[index[0], :] def _get_constant_offset(image, rng=10): timg = image.transpose() pos = int(timg.shape[1] / 2) col = np.nanmean(timg[:, pos - rng : pos + rng], axis=1) return np.where(col == np.nanmax(col))[0][0]
[docs] def get_curvature(image, binx=10, biny=1, constant_offset=None, plot=False): """ Get the curvature of an image. A second order polynomial is fit to the cross correlation of each slice using the `pyrixs` algorithm. See pyrixs_ for more details. Ideally used with an image with large intensity and a single peak. .. _pyrixs: https://github.com/mpmdean/pyrixs/blob/master/pyrixs/process2d.py Parameters ---------- image : numpy.array or dask.array Image to be used. binx : integer, optional Bin size in the horizontal direction in pixels. Defaults to 10. biny : integer, optional Bin size in the vertical direction in pixels. Defaults to 1. constant_offset : integer Offset for the curvature. This is only relevant for the plot, does not affect the use of the curvature to extract the spectra. Defaults to None, which triggers an automatic offset calculation. plot : boolean Flag to plot image and curvature. Returns ------- curvature : list List of polynomial coefficients [c0, c1, c2] where: y = c0 + c1.x + c2.x^2. See also -------- :func:`pyrixs.fit_curvature` :func:`pyrixs.get_curvature_offsets` """ im = image.compute() ph = _cleanup_photon_events(image_to_photon_events(im.transpose())) if constant_offset is None: constant_offset = _get_constant_offset(im) curvature = fit_curvature( ph, binx=binx, biny=biny, CONSTANT_OFFSET=constant_offset ) if plot: vmax = np.nanpercentile(im, 99.99) _, ax = plt.subplots() plt.pcolor( image.transpose(), vmin=0, vmax=vmax if vmax > 0 else np.nanmax(im), cmap="plasma", ) plt.colorbar() plot_curvature(ax, curvature, ph) print(f"curvature = {curvature}") return curvature
[docs] def get_spectrum(image, curvature, biny=1): """ Extract the spectrum of a single image. Uses the `pyrixs` extract_ function. .. _extract: \ https://github.com/mpmdean/pyrixs/blob/e94531f325478ce18b88d9fb3d42068\ f269bd118/pyrixs/process2d.py#L325 Parameters ---------- image : numpy.array or dask.array Image to be processed. curvature : iterable List of polynomial coefficients [c0, c1, c2] where: y = c0 + c1.x + c2.x^2. biny : integer, optional Bin size in the vertical direction in pixels. Defaults to 1. Returns ------- spectrum : numpy array Extracted 1D spectrum. See also -------- :func:`pyrixs.extract` """ if isinstance(image, da.core.Array): image = image.compute() ph = _cleanup_photon_events(image_to_photon_events(image.transpose())) return extract(ph, curvature, biny=biny)
[docs] def get_spectra(images, curvature, biny=1): """ Extract several spectrum. Uses the `pyrixs` extract_ function. .. _extract: \ https://github.com/mpmdean/pyrixs/blob/e94531f325478ce18b88d9fb3d42068\ f269bd118/pyrixs/process2d.py#L325 Parameters ---------- images : iterable of numpy.array or dask.array Images to be processed. curvature : iterable List of polynomial coefficients [c0, c1, c2] where: y = c0 + c1.x + c2.x^2. biny : integer, optional Bin size in the vertical direction in pixels. Defaults to 1. Returns ------- spectra : numpy.array List of 1D spectrum. See also -------- :func:`polartools.process_images.get_spectrum` :func:`pyrixs.extract` """ spectra = [] for image in images: if isinstance(image, da.core.Array): image = image.compute() spectra.append(get_spectrum(image, curvature, biny=biny)) return np.array(spectra)
[docs] def process_rxes( scans, cat, detector_key, curvature, cleanup=None, normalize=None, positioner=None, biny=1, ): """ Wrapper with typical RXES data processing. PARAMETERS ---------- scans : iterable List of scan_id or uids. cat : databroker catalog Catalog. detector_key : string Name of item that holds the images curvature : iterable List of polynomial coefficients [c0, c1, c2] where: y = c0 + c1.x + c2.x^2. cleanup : dictionary, optional Clean up functions and arguments. Available functions: - {'threshold': threshold_value} For custom functions, use: - {'function': (myfunc, (arg1, arg2, ...))} \ where myfunc is a function with call: myfunc(images, arg1, arg2, ...) \ These function can be stacked. For example, the call: \ {'threshold': 100, 'threshold': 10, 'function': (myfunc, (arg1, arg2))} \ will run the threshold function twice with 100 and 10 as argument, then \ myfunc with (arg1, arg2). normalize : string, optional Name of detector that will be used to normalize data. Default is None. positioner : string, optional Name of positioner to be read. Defaults to None. biny : integer, optional Bin size in the vertical direction in pixels. Defaults to 1. Returns ------- spectrum or spectra : array Processed spectrum (if positioner is None) or spectra (if positioner is not None). positioner_values : numpy ndarray, optional Values of the positioner. It is only returned if positioner is not None. """ data = load_images( scans, cat, detector_key, cleanup=cleanup, normalize=normalize, positioner=positioner, ) if positioner is None: # "Count" scans return get_spectrum(data.compute(), curvature, biny=biny) else: # Scans with a positioner return (get_spectra(data[0].compute(), curvature, biny=biny), data[1])
[docs] def process_rxes_mcd( scans, cat, detector_key, curvature, cleanup=None, normalize=None, positioner=None, biny=1, ): """ Wrapper with typical RXES-MCD data processing. PARAMETERS ---------- scans : iterable List of scan_id or uids. cat : databroker catalog Catalog. detector_key : string Name of item that holds the images curvature : iterable List of polynomial coefficients [c0, c1, c2] where: y = c0 + c1.x + c2.x^2. cleanup : dictionary, optional Clean up functions and arguments. Available functions: - {'threshold': threshold_value} For custom functions, use: - {'function': (myfunc, (arg1, arg2, ...))} \ where myfunc is a function with call: myfunc(images, arg1, arg2, ...) \ These function can be stacked. For example, the call: \ {'threshold': 100, 'threshold': 10, 'function': (myfunc, (arg1, arg2))} \ will run the threshold function twice with 100 and 10 as argument, then \ myfunc with (arg1, arg2). normalize : string, optional Name of detector that will be used to normalize data. Default is None. positioner : string, optional Name of positioner to be read. Defaults to None. biny : integer, optional Bin size in the vertical direction in pixels. Defaults to 1. Returns ------- rxes : array Processed x-ray emission spectrum(if positioner is None) or spectra (if positioner is not None). mcd : array Processed x-ray emission dichroism spectrum (if positioner is None) or spectra (if positioner is not None). positioner_values : numpy ndarray, optional Values of the positioner. It is only returned if positioner is not None. """ # Need to get all images... if positioner is None: positioner = "Time" images, positions = load_images( scans, cat, detector_key, cleanup=cleanup, normalize=normalize, positioner=positioner, ) ims = images.reshape((-1, 4, images.shape[1], images.shape[2])) ims_plus = ims[:, [0, 3], :, :].mean(axis=1) ims_minus = ims[:, [1, 2], :, :].mean(axis=1) specs_plus = get_spectra(ims_plus.compute(), curvature, biny=biny) specs_minus = get_spectra(ims_minus.compute(), curvature, biny=biny) rxes = (specs_plus + specs_minus) / 2.0 mcd = specs_plus.copy() mcd[:, :, 1] -= specs_minus[:, :, 1] if positioner == "Time": return rxes.mean(axis=0), mcd.mean(axis=0) else: return rxes, mcd, positions.reshape(-1, 4).mean(axis=1)