Source code for polartools.absorption

"""
Functions to load and process x-ray absorption data.

.. autosummary::
   ~load_absorption
   ~load_dichro
   ~load_lockin
   ~load_multi_xas
   ~load_multi_dichro
   ~load_multi_lockin
   ~normalize_absorption
   ~pre_edge_background
   ~post_edge_background
   ~post_edge_flatten
   ~fluo_corr
   ~process_xmcd
   ~plot_xmcd
   ~save_xmcd
"""

# Copyright (c) 2020-2021, UChicago Argonne, LLC.
# See LICENSE file for details.

from .load_data import load_table, is_Bluesky_specfile
import numpy as np
from scipy.interpolate import interp1d
from spec2nexus.spec import SpecDataFile, SpecDataFileNotFound, NotASpecDataFile
from ._larch import finde0, index_nearest
from xraydb import xray_line, xray_edge, material_mu
from lmfit.models import PolynomialModel
from warnings import warn
import matplotlib.pyplot as plt

_spec_default_cols = dict(
    positioner="Energy",
    detector="IC5",
    monitor="IC4",
    dc_col="Lock DC",
    ac_col="Lock AC",
    acoff_col="Lock ACoff",
)

_bluesky_default_cols = dict(
    positioner="energy",
    detector="4idhI0",
    monitor="4idhI",
    dc_col="LockDC",
    ac_col="LockAC",
    acoff_col="LockACoff",
)


def _sort_and_deduplicate(x, *ys):
    """Sort arrays by x and remove duplicate x values."""
    _, idx = np.unique(x, return_index=True)
    return (x[idx],) + tuple(y[idx] for y in ys)


def _select_default_names(source, **kwargs):
    # Select default parameters
    if isinstance(source, (str, SpecDataFile)) and source not in (
        "csv",
        "hdf5",
        "h5",
        "hdf",
    ):
        # It is spec — check if it was written by Bluesky.
        try:
            check = is_Bluesky_specfile(source, folder=kwargs.get("folder", ""))
            _defaults = _bluesky_default_cols if check else _spec_default_cols
        except (NotASpecDataFile, SpecDataFileNotFound):
            _defaults = _bluesky_default_cols
    else:
        # csv, h5/hdf5/hdf, or databroker — all use Bluesky column names.
        _defaults = _bluesky_default_cols
    return _defaults


[docs] def load_absorption( scan, source, positioner=None, detector=None, monitor=None, transmission=True, **kwargs, ): """ Load the x-ray absorption from one scan. Parameters ---------- scan : int Scan_id our uid. If scan_id is passed, it will load the last scan with that scan_id. See kwargs for search options. source : databroker database, name of the spec file, or 'csv' Note that applicable kwargs depend on this selection. positioner : string, optional Name of the positioner, this needs to be the same as defined in Bluesky or SPEC. If None is passed, it defauts to the x-ray energy. detector : string, optional Detector to be read from this scan, again it needs to be the same name as in Bluesky. If None is passed, it defaults to the ion chamber 5. monitor : string, optional Name of the monitor detector. If None is passed, it defaults to the ion chamber 4. transmission : bool, optional Flag to select between transmission mode -> ln(monitor/detector) or fluorescence mode -> detector/monitor kwargs : The necessary kwargs are passed to the loading functions defined by the `source` argument: - csv -> possible kwargs: folder, name_format. - spec -> possible kwargs: folder. - databroker -> possible kwargs: stream, query, use_db_v1. Note that a warning will be printed if the an unnecessary kwarg is passed. Returns ------- x : numpy.array Positioner data. y : numpy.array X-ray absorption. See also -------- :func:`polartools.load_data.load_table` """ _defaults = _select_default_names(source, **kwargs) if not positioner: positioner = _defaults["positioner"] if not detector: detector = _defaults["detector"] if not monitor: monitor = _defaults["monitor"] # Load data table = load_table(scan, source, **kwargs) if transmission: return ( table[positioner].values, np.log(table[monitor] / table[detector]).values, ) else: return ( table[positioner].values, (table[detector] / table[monitor]).values, )
[docs] def load_lockin( scan, source, positioner=None, dc_col=None, ac_col=None, acoff_col=None, **kwargs, ): """ Load the x-ray magnetic dichroism from one scan taken in lock-in mode. Parameters ---------- scan : int Scan_id our uid. If scan_id is passed, it will load the last scan with that scan_id. See kwargs for search options. source : databroker database, name of the spec file, or 'csv' Note that applicable kwargs depend on this selection. positioner : string, optional Name of the positioner, this needs to be the same as defined in Bluesky or SPEC. If None is passed, it defaults to the x-ray energy. dc_col : string, optional Name of the DC scaler, again it needs to be the same name as in Bluesky or SPEC. If None is passed, it defaults to 'Lock DC'. ac_col : string, optional Name of the AC scaler. If None is passed, it defaults to 'Lock AC'. acoff_col : string, optional Name of the AC offset scaler. If None is passed, it defaults to 'Lock AC off'. kwargs : The necessary kwargs are passed to the loading functions defined by the `source` argument: - csv -> possible kwargs: folder, name_format. - spec -> possible kwargs: folder. - databroker -> possible kwargs: stream, query, use_db_v1. Note that a warning will be printed if the an unnecessary kwarg is passed. Returns ------- x : numpy.array Positioner data. dc : numpy.array DC response, normally corresponds to the x-ray absorption. ac-ac_off : numpy.array AC response minus the offset, normally corresponds to the x-ray magnetic dichroism. See also -------- :func:`polartools.load_data.load_table` """ _defaults = _select_default_names(source, **kwargs) if not positioner: positioner = _defaults["positioner"] if not dc_col: dc_col = _defaults["dc_col"] if not ac_col: ac_col = _defaults["ac_col"] if not acoff_col: acoff_col = _defaults["acoff_col"] # Load data table = load_table(scan, source, **kwargs) return ( table[positioner].values, table[dc_col].values, (table[ac_col] - table[acoff_col]).values, )
[docs] def load_dichro( scan, source, positioner=None, detector=None, monitor=None, transmission=True, **kwargs, ): """ Load the x-ray magnetic dichroism from one scan taken in non-lock-in mode ('dichro'). Parameters ---------- scan : int Scan_id our uid. If scan_id is passed, it will load the last scan with that scan_id. See kwargs for search options. source : databroker database, name of the spec file, or 'csv' Note that applicable kwargs depend on this selection. positioner : string, optional Name of the positioner, this needs to be the same as defined in Bluesky. Defauts to the x-ray energy. detector : string, optional Detector to be read from this scan, again it needs to be the same name as in Bluesky. Defaults to the ion chamber 5. monitor : string, optional Name of the monitor detector. Defaults to the ion chamber 4. transmission: bool, optional Flag to select between transmission mode -> ln(monitor/detector) or fluorescence mode -> detector/monitor. kwargs : The necessary kwargs are passed to the loading functions defined by the `source` argument: - csv -> possible kwargs: folder, name_format. - spec -> possible kwargs: folder. - databroker -> possible kwargs: stream, query, use_db_v1. Note that a warning will be printed if the an unnecessary kwarg is passed. Returns ------- x : numpy.array Positioner data. xanes : numpy.array X-ray absorption. xmcd : numpy.array X-ray magnetic dichroism. See also -------- :func:`polartools.load_data.load_absorption` :func:`polartools.load_data.load_table` """ # In SPEC the columns are different. if ( isinstance(source, (str, SpecDataFile)) and source not in ("csv", "hdf5", "h5", "hdf") and not is_Bluesky_specfile(source, **kwargs) ): if not positioner: positioner = _spec_default_cols["positioner"] if not monitor: monitor = _spec_default_cols["monitor"] if not detector: detector = _spec_default_cols["detector"] table = load_table(scan, source, **kwargs) x = table[positioner].values monp = table[monitor + "(+)"].values detp = table[detector + "(+)"].values monm = table[monitor + "(-)"].values detm = table[detector + "(-)"].values if transmission: xanes = (np.log(monp / detp) + np.log(monm / detm)) / 2.0 xmcd = np.log(monp / detp) - np.log(monm / detm) else: xanes = (detp / monp + detm / monm) / 2.0 xmcd = detp / monp - detm / monm else: x0, y0 = load_absorption( scan, source, positioner, detector, monitor, transmission, **kwargs ) size = x0.size // 4 x0 = x0.reshape(size, 4) y0 = y0.reshape(size, 4) x = x0.mean(axis=1) xanes = y0.mean(axis=1) xmcd = y0[:, [0, 3]].mean(axis=1) - y0[:, [1, 2]].mean(axis=1) return x, xanes, xmcd
[docs] def load_multi_xas( scans, source, return_mean=True, positioner=None, detector=None, monitor=None, transmission=True, **kwargs, ): """ Load multiple x-ray absorption energy scans. Parameters ---------- scans : iterable Sequence of scan_ids our uids. If scan_id is passed, it will load the last scan with that scan_id. Use kwargs for search options. source : databroker database, name of the spec file, or 'csv' Note that applicable kwargs depend on this selection. return_mean : boolean, optional Flag to indicate if the averaging of multiple scans will be performed. Note that if True three outputs are generated, otherwise two. positioner : string, optional Name of the positioner, this needs to be the same as defined in Bluesky or SPEC. If None is passed, it defauts to the x-ray energy. detector : string, optional Detector to be read from this scan, again it needs to be the same name as in Bluesky. If None is passed, it defaults to the ion chamber 5. monitor : string, optional Name of the monitor detector. If None is passed, it defaults to the ion chamber 4. transmission: bool, optional Flag to select between transmission mode -> ln(monitor/detector) or fluorescence mode -> detector/monitor kwargs : The necessary kwargs are passed to the loading functions defined by the `source` argument: - csv -> possible kwargs: folder, name_format. - spec -> possible kwargs: folder. - databroker -> possible kwargs: stream, query, use_db_v1. Note that a warning will be printed if the an unnecessary kwarg is passed. Returns ------- energy : numpy.array X-ray energy. xanes : numpy.array X-ray absorption. If return_mean = True: xanes.shape = (len(scans), len(energy)), otherwise: xanes.shape = (len(energy)) xanes_std : numpy.array, optional Error of the mean of x-ray absorption. This will only be returned if return_mean = True. See also -------- :func:`polartools.load_data.load_absorption` """ for scan in scans: if "energy" not in locals(): energy, xanes = load_absorption( scan, source, positioner, detector, monitor, transmission, **kwargs, ) energy, xanes = _sort_and_deduplicate(energy, xanes) else: energy_tmp, xanes_tmp = load_absorption( scan, source, positioner, detector, monitor, transmission, **kwargs, ) energy_tmp, xanes_tmp = _sort_and_deduplicate(energy_tmp, xanes_tmp) xanes = np.vstack( ( xanes, interp1d( energy_tmp, xanes_tmp, kind="linear", fill_value=np.nan, bounds_error=False, )(energy), ) ) if return_mean: if len(xanes.shape) == 2: xanes_std = xanes.std(axis=0) / np.sqrt(len(scans)) xanes = xanes.mean(axis=0) return energy, xanes, xanes_std else: return energy, xanes, np.zeros((xanes.size)) else: return energy, xanes
[docs] def load_multi_dichro( scans, source, return_mean=True, positioner=None, detector=None, monitor=None, transmission=True, **kwargs, ): """ Load multiple x-ray magnetic dichroism energy "dichro" scans. Parameters ---------- scans : iterable Sequence of scan_ids our uids. If scan_id is passed, it will load the last scan with that scan_id. Use kwargs for search options. source : databroker database, name of the spec file, or 'csv' Note that applicable kwargs depend on this selection. return_mean : boolean, optional Flag to indicate if the averaging of multiple scans will be performed. Note that if True three outputs are generated, otherwise two. positioner : string, optional Name of the positioner, this needs to be the same as defined in Bluesky or SPEC. If None is passed, it defauts to the x-ray energy. detector : string, optional Detector to be read from this scan, again it needs to be the same name as in Bluesky. If None is passed, it defaults to the ion chamber 5. monitor : string, optional Name of the monitor detector. If None is passed, it defaults to the ion chamber 4. transmission: bool, optional Flag to select between transmission mode -> ln(monitor/detector) or fluorescence mode -> detector/monitor kwargs : The necessary kwargs are passed to the loading functions defined by the `source` argument: - csv -> possible kwargs: folder, name_format. - spec -> possible kwargs: folder. - databroker -> possible kwargs: stream, query, use_db_v1. Note that a warning will be printed if the an unnecessary kwarg is passed. Returns ------- energy : numpy.array X-ray energy. xanes : numpy.array X-ray absorption. If return_mean = True: xanes.shape = (len(scans), len(energy)), otherwise: xanes.shape = (len(energy)) xmcd : numpy.array X-ray magnetic dichroism. It has the same shape as the xanes. xanes_std : numpy.array, optional Error of the mean of x-ray absorption. This will only be returned if return_mean = True. xmcd_std : numpy.array, optional Error of the mean of x-ray magnetic dichroism. This will only be returned if return_mean = True. See also -------- :func:`polartools.load_data.load_dichro` """ for scan in scans: if "energy" not in locals(): energy, xanes, xmcd = load_dichro( scan, source, positioner, detector, monitor, transmission, **kwargs, ) energy, xanes, xmcd = _sort_and_deduplicate(energy, xanes, xmcd) else: energy_tmp, xanes_tmp, xmcd_tmp = load_dichro( scan, source, positioner, detector, monitor, transmission, **kwargs, ) energy_tmp, xanes_tmp, xmcd_tmp = _sort_and_deduplicate( energy_tmp, xanes_tmp, xmcd_tmp ) xanes = np.vstack( ( xanes, interp1d( energy_tmp, xanes_tmp, kind="linear", fill_value=np.nan, bounds_error=False, )(energy), ) ) xmcd = np.vstack( ( xmcd, interp1d( energy_tmp, xmcd_tmp, kind="linear", fill_value=np.nan, bounds_error=False, )(energy), ) ) if return_mean: if len(xanes.shape) == 2: xanes_std = xanes.std(axis=0) / np.sqrt(len(scans)) xanes = xanes.mean(axis=0) xmcd_std = xmcd.std(axis=0) / np.sqrt(len(scans)) xmcd = xmcd.mean(axis=0) return energy, xanes, xmcd, xanes_std, xmcd_std else: return ( energy, xanes, xmcd, np.zeros((xanes.size)), np.zeros((xanes.size)), ) else: return energy, xanes, xmcd
[docs] def load_multi_lockin( scans, source, return_mean=True, positioner=None, dc_col=None, ac_col=None, acoff_col=None, **kwargs, ): """ Load multiple x-ray magnetic dichroism energy "lockin" scans. Parameters ---------- scans : iterable Sequence of scan_ids our uids. If scan_id is passed, it will load the last scan with that scan_id. Use kwargs for search options. source : databroker database, name of the spec file, or 'csv' Note that applicable kwargs depend on this selection. return_mean : boolean, optional Flag to indicate if the averaging of multiple scans will be performed. Note that if True three outputs are generated, otherwise two. positioner : string, optional Name of the positioner, this needs to be the same as defined in Bluesky or SPEC. If None is passed, it defauts to the x-ray energy. dc_col : string, optional Name of the DC scaler, again it needs to be the same name as in Bluesky or SPEC. If None is passed, it defaults to 'Lock DC'. ac_col : string, optional Name of the AC scaler. If None is passed, it defaults to 'Lock AC'. acoff_col : string, optional Name of the AC offset scaler. If None is passed, it defaults to 'Lock AC off'. kwargs : The necessary kwargs are passed to the loading functions defined by the `source` argument: - csv -> possible kwargs: folder, name_format. - spec -> possible kwargs: folder. - databroker -> possible kwargs: stream, query, use_db_v1. Note that a warning will be printed if the an unnecessary kwarg is passed. Returns ------- energy : numpy.array X-ray energy. xanes : numpy.array X-ray absorption. If return_mean = True: xanes.shape = (len(scans), len(energy)), otherwise: xanes.shape = (len(energy)) xmcd : numpy.array X-ray magnetic dichroism. It has the same shape as the xanes. xanes_std : numpy.array, optional Error of the mean of x-ray absorption. This will only be returned if return_mean = True. xmcd_std : numpy.array, optional Error of the mean of x-ray magnetic dichroism. This will only be returned if return_mean = True. See also -------- :func:`polartools.load_data.load_lockin` """ for scan in scans: if "energy" not in locals(): energy, xanes, xmcd = load_lockin( scan, source, positioner, dc_col, ac_col, acoff_col, **kwargs ) energy, xanes, xmcd = _sort_and_deduplicate(energy, xanes, xmcd) else: energy_tmp, xanes_tmp, xmcd_tmp = load_lockin( scan, source, positioner, dc_col, ac_col, acoff_col, **kwargs ) energy_tmp, xanes_tmp, xmcd_tmp = _sort_and_deduplicate( energy_tmp, xanes_tmp, xmcd_tmp ) xanes = np.vstack( ( xanes, interp1d( energy_tmp, xanes_tmp, kind="linear", fill_value=np.nan, bounds_error=False, )(energy), ) ) xmcd = np.vstack( ( xmcd, interp1d( energy_tmp, xmcd_tmp, kind="linear", fill_value=np.nan, bounds_error=False, )(energy), ) ) if return_mean: if len(xanes.shape) == 2: xanes_std = xanes.std(axis=0) / np.sqrt(len(scans)) xanes = xanes.mean(axis=0) xmcd_std = xmcd.std(axis=0) / np.sqrt(len(scans)) xmcd = xmcd.mean(axis=0) return energy, xanes, xmcd, xanes_std, xmcd_std else: return ( energy, xanes, xmcd, np.zeros((xanes.size)), np.zeros((xanes.size)), ) else: return energy, xanes, xmcd
[docs] def normalize_absorption( energy, mu, *, e0=None, edge_step=None, pre_range=None, pre_order=1, nvict=0, post_range=None, post_order=None, flat_range=None, flat_order=None, pre_pars=None, post_pars=None, flat_pars=None, ): """ Extract pre- and post-edge normalization curves by fitting polynomials. This is a wrapper of for the `pre_edge_background`, `post_edge_background`, and `post_edge_flatten`. The overall process is largely based on `larch.xafs.preedge`, but it was modified so that a polynomial of any order can be applied to the pre-edge, and the initial parameters for the pre- and post-edge polynomials can be given by the user. Parameters ---------- energy : iterable Incident energy. Must be in eV. It will raise an error if it notices the maximum of this list is below 100. mu : iterable Raw x-ray absorption. e0 : float or int, optional Absorption edge energy. If None, it is extracted from the data. edge_step : float, optional Size of edge step of the raw data. If None, it is calculated based by postedge[e0] - preedge[e0]. pre_range : list, optional List with the energy ranges [initial, final] of the pre-edge region **relative** to the absorption edge. If None is passed to either pre_range or one of the elements, a guessed value will be used. For example: - pre_range = None -> guess initial and final points. - pre_range = [None, -20] -> guess only initial point. - pre_range = [-40, None] -> guess only final point. - pre_range = [-40, -20] -> guess neither. pre_order : int, optional Order of the polynomial to be used in the pre-edge. Defaults to 1. nvict : int, optional Energy exponent to use. The pre-edge background is modelled by a line that is fit to xanes(energy)*energy**pre_exponent. Defaults to 0. post_range : list, optional List with the energy ranges [initial, final] of the post-edge region **relative** to the absorption edge. post_order : int, optional Order of the polynomial to be used in the post-edge. If None, it will be determined by `larch.xafs.preedge`: - If post_range[1]-post_range[0]>350 -> post_order = 2; - if 50 < post_range[1]-post_range[0] < 350 -> post_order = 1; - otherwise -> post_order = 0 flat_range : list, optional List with the energy ranges [initial, final] of the post-edge region **relative** to the absorption edge. If None, it will be set as the same as `post_range`. flat_order : int, optional Order of the polynomial to be used in the post-edge. If None, it will be set as the same as `post_order`. pre_pars, post_pars, flat_pars : lmfit.Parameters, optional Option to input the initial parameters to the polynomial used in the data flattening. These will be labelled 'c0', 'c1', ..., depending on `post_order`. See lmfit.models.PolynomialModel_ for details. .. _lmfit.models.PolynomialModel:\ https://lmfit.github.io/lmfit-py/builtin_models.html#polynomialmodel Returns ------- results : dict Dictionary with the results and parameters of the normalization and flattening. The most important items are: - 'energy' -> incident energy. - 'mu' -> raw xanes. - 'norm' -> normalized xanes. - 'flat' -> flattened xanes. See also -------- :func:`larch.xafs.preedge` :func:`polartools.absorption.pre_edge_background` :func:`polartools.absorption.post_edge_background` :func:`polartools.absorption.post_edge_flatten` """ # Start output dictionary results = {} sort = np.argsort(energy) results["energy"] = np.array(energy)[sort] results["mu"] = np.array(mu)[sort] # Process pre-edge pre1, pre2 = (None, None) if not pre_range else tuple(pre_range) pre_results = pre_edge_background( results["energy"], results["mu"], e0=e0, pre1=pre1, pre2=pre2, pre_order=pre_order, nvict=nvict, pre_pars=pre_pars, ) results.update(pre_results) # Process post-edge post1, post2 = (None, None) if not post_range else tuple(post_range) post_results = post_edge_background( results["energy"], results["mu"], preedge=results["preedge"], e0=results["e0"], edge_step=edge_step, post1=post1, post2=post2, post_order=post_order, post_pars=post_pars, ) results.update(post_results) # Flatten post-edge use_post = not bool(flat_order or flat_range or flat_pars) postedge_results = post_results if use_post else None flat1, flat2 = (None, None) if not flat_range else tuple(flat_range) flat_results = post_edge_flatten( results["energy"], results["norm"], e0=results["e0"], flat1=flat1, flat2=flat2, flat_order=flat_order, flat_pars=flat_pars, bkg_results=postedge_results, ) results.update(flat_results) return results
[docs] def pre_edge_background( energy, mu, e0=None, pre1=None, pre2=None, pre_order=1, nvict=0, pre_pars=None, ): """ Extracts the pre-edge background by fitting a polynomial. Based on `larch.xafs.preedge`. Modified so that a polynomial of any order can be applied to the pre-edge, and its initial parameters can be given by the user. Parameters ---------- energy : iterable Incident energy. Must be in eV. It will raise an error if it notices the maximum of this list is below 100. mu : iterable Raw x-ray absorption. e0 : float or int, optional Absorption edge energy. pre1, pre2 : float, optional Low, high energy limit of pre-edge range with respect to e0. If None, it will try to guess it based on mu(energy). pre_order : int, optional Order of the polynomial to be used in the pre-edge. Defaults to 1. nvict : int, optional Energy exponent to use. The pre-edge background is modelled by a line that is fit to xanes(energy)*energy**pre_exponent. Defaults to 0. pre_pars : lmfit.Parameters, optional Option to input the initial parameters to the polynomial used in the data flattening. These will be labelled 'c0', 'c1', ..., depending on `post_order`. See lmfit.models.PolynomialModel_ for details. .. _lmfit.models.PolynomialModel:\ https://lmfit.github.io/lmfit-py/builtin_models.html#polynomialmodel Returns ------- results : dict Dictionary with the results and parameters of the normalization and flattening. The most important items are: - 'energy' -> incident energy. - 'mu' -> raw xanes. - 'preedge' -> preedge polynomial. See also -------- :func:`larch.xafs.preedge` :func:`larch.absorption.normalize_absorption` """ # Edge energy e0, unit = _process_e0(energy, mu, e0) # Process pre-edge range pre1, pre2 = _process_preedge_params(energy, e0, pre1, pre2) # Find pre-edge background index = np.logical_and(energy > pre1 + e0, energy < pre2 + e0) mu_fit = mu[index] * energy[index] ** nvict energy_fit = energy[index] fit = _fit_polynomial(energy_fit, mu_fit, pre_order, pars=pre_pars) precoefs = fit.best_values preedge = fit.eval(x=energy) * energy ** (-nvict) return dict( energy=energy, mu=mu, preedge=preedge, pre1=pre1, pre2=pre2, pre_order=pre_order, nvict=nvict, e0=e0, precoefs=precoefs, energy_unit=unit, )
[docs] def post_edge_background( energy, mu, preedge=None, e0=None, edge_step=None, post1=None, post2=None, post_order=None, post_pars=None, ): """ Extracts the post-edge background by fitting a polynomial. Based on `larch.xafs.preedge`. Modified so that a polynomial of any order can be applied to the post-edge, and its initial parameters can be given by the user. Parameters ---------- energy : iterable Incident energy. Must be in eV. It will raise an error if it notices the maximum of this list is below 100. mu : iterable Raw x-ray absorption. e0 : float or int, optional Absorption edge energy. edge_step : float, optional Size of edge step of the raw data. If None, it is calculated based by postedge[e0] - preedge[e0]. preedge : iterable, optional Pre-edge polynomial. If None, it will be set to zero. post1, post2 : float, optional Low, high energy limit of post-edge range with respect to e0. If None, it will try to guess it based on mu(energy). post_order : int, optional Order of the polynomial to be used in the post-edge. If None, it will be determined using: - If post2-post1>350 -> post_order = 2; - if 50 < post2-post1 < 350 -> post_order = 1; - otherwise -> post_order = 0 post_pars : lmfit.Parameters, optional Option to input the initial parameters to the polynomial used in the data flattening. These will be labelled 'c0', 'c1', ..., depending on `post_order`. See lmfit.models.PolynomialModel_ for details. .. _lmfit.models.PolynomialModel:\ https://lmfit.github.io/lmfit-py/builtin_models.html#polynomialmodel Returns ------- results : dict Dictionary with the results and parameters of the normalization and flattening. The most important items are: - 'energy' -> incident energy. - 'mu' -> raw xanes. - 'norm' -> normalized XAS. - 'post-edge' -> post-edge background. - 'edge-step' -> Absorption jump size. See also -------- :func:`larch.xafs.preedge` :func:`larch.absorption.normalize_absorption` """ # Edge energy e0, unit = _process_e0(energy, mu, e0) # Post-edge params post1, post2, post_order = _process_postedge_params( energy, e0, post1, post2, post_order ) # Pre-edge if preedge is None: preedge = np.zeros(len(energy)) else: if len(preedge) != len(energy): raise TypeError("preedge must have the same dimensions as energy.") # Normalization index = np.logical_and(energy > post1 + e0, energy < post2 + e0) energy_fit = energy[index] mu_fit = (mu - preedge)[index] fit = _fit_polynomial(energy_fit, mu_fit, post_order, pars=post_pars) postcoefs = fit.best_values postedge = preedge + fit.eval(x=energy) if not edge_step: ie0 = index_nearest(energy, e0) edge_step = postedge[ie0] - preedge[ie0] norm = (mu - preedge) / edge_step return dict( energy=energy, mu=mu, postedge=postedge, preedge=preedge, postcoefs=postcoefs, post1=post1, post2=post2, post_order=post_order, energy_unit=unit, norm=norm, edge_step=edge_step, )
[docs] def post_edge_flatten( energy, norm, bkg_results=None, e0=None, flat1=None, flat2=None, flat_order=None, flat_pars=None, ): """ Flattens the normalized absorption. This is a slightly modified version from that in the larch_ package. .. _larch: https://xraypy.github.io/xraylarch/index.html The energies inputs must have the same units. Parameters ---------- energy : iterable Incident energy in eV. norm : iterable Normalized x-ray absorption. bkg_results : dict, optional Results of the data processing done by `post_edge_background`. If None, the remaining keyword arguments will be ignored. e0 : float or int Absorption edge energy. flat1, flat2 : float or int Low, high energy limit of flattening range with respect to e0. flat_order : int, optional Degree of polynomial to be used. If None, it will be determined using: - If post2-post1>350 -> post_order = 2; - if 50 < post2-post1 < 350 -> post_order = 1; - otherwise -> post_order = 0 fpars : lmfit.Parameters, optional Option to input the initial parameters. These will be labelled 'c0', 'c1', ..., depending on `nnorm`. See lmfit.models.PolynomialModel_ for details. .. _lmfit.models.PolynomialModel:\ https://lmfit.github.io/lmfit-py/builtin_models.html#polynomialmodel Returns ------- flat : numpy.array Flattened x-ray absorption. See also -------- :func:`larch.xafs.pre_edge` :func:`lmfit.models.PolynomialModel` """ # Edge energy e0, unit = _process_e0(energy, norm, e0) ie0 = index_nearest(energy, e0) # Runs the post edge fit with flat parameters if needed. if not bkg_results: bkg_results = post_edge_background( energy, norm, e0=e0, post1=flat1, post2=flat2, post_order=flat_order, post_pars=flat_pars, ) flat_function = bkg_results["postedge"] else: flat_function = bkg_results["postedge"] - bkg_results["preedge"] flat_function /= bkg_results["edge_step"] flat1 = bkg_results["post1"] flat2 = bkg_results["post2"] flat_order = bkg_results["post_order"] flatcoefs = bkg_results["postcoefs"] flat = norm - (flat_function - flat_function[ie0]) flat[:ie0] = norm[:ie0] return dict( energy=energy, norm=norm, flat=flat, flat1=flat1, flat2=flat2, flat_order=flat_order, flatcoefs=flatcoefs, flat_function=flat_function, energy_unit=unit, )
def _process_e0(energy, mu, e0): """Internal function to extract the e0 and energy unit.""" # Energy unit unit = "keV" if np.nanmax(energy) < 100 else "eV" # Edge energy if e0 is None: e0 = finde0(energy, mu) elif e0 < energy.min() or e0 > energy.max(): e0 = finde0(energy, mu) warn( "e0 is not contained in the provided energy range, " "using e0 = {:0.2f} {}".format(e0, unit) ) else: e0 = energy[index_nearest(energy, e0)] return e0, unit def _process_preedge_params(energy, e0, pre1, pre2): """Internal function to process the pre-edge parameters.""" # Process pre-edge range if pre1 is None: pre1 = np.nanmin(energy[1:-1]) - e0 # avoid first/last points if pre2 is None: pre2 = 5.0 * round(pre1 / 15.0) # from larch, don't understand logic. if pre1 > pre2: pre1, pre2 = pre2, pre1 return pre1, pre2 def _process_postedge_params(energy, e0, post1, post2, post_order): """Internal function to process the post-edge parameters.""" # Post-edge params if post2 is None: post2 = np.nanmax(energy[1:-1]) - e0 # avoid first/last two points if post1 is None: post1 = min(150, 5.0 * round(post2 / 15.0)) if post1 > post2: post1, post2 = post2, post1 if post_order is None: if post2 - post1 < 50: post_order = 0 elif post2 - post1 < 350: post_order = 1 else: post_order = 2 return post1, post2, post_order def _fit_polynomial(x, y, order, pars=None): """Internal function to fit a polynomial using `lmfit`.""" model = PolynomialModel(order) if not pars: pars = model.guess(y, x=x) return model.fit(y, pars, x=x)
[docs] def fluo_corr(norm, formula, elem, edge, line, anginp, angout): """ Correct over-absorption (self-absorption) for fluorescene XAFS using the based on the `larch` implementation of the FLUO alogrithm of D. Haskel. See FLUO manual_ for details. .. _manual: https://www3.aps.anl.gov/haskel/fluo.html Parameters ---------- norm : iter Normalized fluorescence formula : str Chemical formula of the compound. For example: 'EuO'. elem : str Element of interest. For example: 'Eu'. edge : str Absorption edge of interest. For example: 'L3'. line : str Fluorescence line measured. For example: 'La'. anginp : float Input angle with respect to the sample surface. See FLUO manual. anginp : float Output angle with respect to the sample surface. See FLUO manual. Returns -------- norm_corr : numpy.array Corrected normalized fluorescence. """ # Angular correction. Avoid divide by zero. ang_corr = np.sin(np.deg2rad(anginp)) / np.sin( max(1.0e-7, np.deg2rad(angout)) ) # Find edge energies and fluorescence line energy e_edge = xray_edge(elem, edge).energy e_fluor = xray_line(elem, line).energy # Calculate mu(E) for fluorescence energy, above, below edge # alpha ends up independent of density! muvals = material_mu( formula, np.array([e_fluor, e_edge - 10.0, e_edge + 10.0]), density=1 ) # Do the correction alpha = (muvals[0] * ang_corr + muvals[1]) / (muvals[2] - muvals[1]) norm_corr = np.array(norm) * alpha / (alpha + 1 - np.array(norm)) return norm_corr
[docs] def process_xmcd( scans_plus, scans_minus, source, xmcd_kind="dichro", load_parameters=dict(), normalization_parameters=dict(), normalization_parameters_minus=None, ): """ Process the XMCD scans of +/- magnetic fields. Parameters ---------- scans_plus : iterable List of scan number or scan_id of XMCD taken using the "plus" magnetic field. scans_minus : iterable List of scan number or scan_id of XMCD taken using the "minus" magnetic field. source : databroker database, name of the spec file, or 'csv' Note that applicable `load_parameters` depend on this selection. xmcd_kind : "dichro" or "lockin", optional Type of XMCD scan used, defaults to "dichro". load_parameters : dictionary Parameters used to load data. Passed as kwargs to `polartools.absorption.load_multi_dichro` or `polartools.absorption.load_multi_lockin`. normalization_parameters : dictionary Parameters used to normalize the "plus" data (and the "minus" data when ``normalization_parameters_minus`` is None). Passed as kwargs to `polartools.absorption.normalize_absorption`. normalization_parameters_minus : dictionary, optional If provided, used to normalize the "minus" data independently. When None (default), the "minus" data is normalized with the same parameters as the "plus" data. Returns -------- plus : dictionary XANES/XMCD taken using the "plus" magnetic field. Output of `polartools.absorption.normalize_absorption` with the XMCD data added. minus : dictionary XANES/XMCD taken using the "minus" magnetic field. Output of `polartools.absorption.normalize_absorption` with the XMCD data added. See also -------- :func:`polartools.absorption.load_multi_dichro` :func:`polartools.absorption.load_multi_lockin` :func:`polartools.absorption.normalize_absorption` """ if xmcd_kind == "dichro": _load_func = load_multi_dichro elif xmcd_kind == "lockin": _load_func = load_multi_lockin else: raise ValueError( "The parameter 'xmcd_kind' must be either 'lockin' or 'dichro', " f"but {xmcd_kind} was entered." ) minus_parameters = ( normalization_parameters if normalization_parameters_minus is None else normalization_parameters_minus ) x, y, z, _, _ = _load_func(scans_plus, source, **load_parameters) plus = normalize_absorption(x * 1000, y, **normalization_parameters) plus["xmcd"] = z[np.argsort(x)] / plus["edge_step"] x, y, z, _, _ = _load_func(scans_minus, source, **load_parameters) minus = normalize_absorption(x * 1000, y, **minus_parameters) minus["xmcd"] = z[np.argsort(x)] / minus["edge_step"] return plus, minus
[docs] def plot_xmcd(plus, minus): """ Plots a new figure with the combined XMCD of plus and minus fields. Parameters ---------- plus : dictionary XANES/XMCD taken using the "plus" magnetic field. Output of the normalize_absorption function. minus : dictionary XANES/XMCD taken using the "minus" magnetic field. Output of the normalize_absorption function. Returns -------- fig : matplotlib.pyplot.figure Figure instance. axs : list List of the figure axes. See also -------- :func:`polartools.absorption.process_xmcd` :func:`polartools.absorption.normalize_absorption` """ fig, ((ax1, ax2), (ax3, ax4), (ax5, ax6)) = plt.subplots( 3, 2, figsize=(8, 9) ) plt.sca(ax1) plt.plot(plus["energy"], plus["mu"]) plt.plot(plus["energy"], plus["preedge"]) plt.plot(plus["energy"], plus["postedge"]) plt.axvline(x=plus["e0"] + plus["pre1"], ls="--", alpha=0.5, color="C1") plt.axvline(x=plus["e0"] + plus["pre2"], ls="--", alpha=0.5, color="C1") plt.axvline(x=plus["e0"] + plus["post1"], ls="--", alpha=0.5, color="C2") plt.axvline(x=plus["e0"] + plus["post2"], ls="--", alpha=0.5, color="C2") plt.xlabel("Energy (eV)") plt.ylabel("Raw XANES") plt.sca(ax2) plt.plot(minus["energy"], minus["mu"]) plt.plot(minus["energy"], minus["preedge"]) plt.plot(minus["energy"], minus["postedge"]) plt.axvline(x=minus["e0"] + minus["pre1"], ls="--", alpha=0.5, color="C1") plt.axvline(x=minus["e0"] + minus["pre2"], ls="--", alpha=0.5, color="C1") plt.axvline(x=minus["e0"] + minus["post1"], ls="--", alpha=0.5, color="C2") plt.axvline(x=minus["e0"] + minus["post2"], ls="--", alpha=0.5, color="C2") plt.xlabel("Energy (eV)") plt.ylabel("Raw XANES") plt.sca(ax3) plt.plot(plus["energy"], plus["norm"], label="H+") plt.plot(minus["energy"], minus["norm"], label="H-") plt.legend() plt.xlabel("Energy (eV)") plt.ylabel("Normalized XANES") plt.sca(ax4) plt.plot(plus["energy"], plus["xmcd"] * 100, label="H+") plt.plot(minus["energy"], minus["xmcd"] * 100, label="H-") plt.legend() plt.xlabel("Energy (eV)") plt.ylabel("Normalized XMCD (%)") plt.sca(ax5) plt.plot(plus["energy"], (plus["xmcd"] - minus["xmcd"]) / 2 * 100) plt.xlabel("Energy (eV)") plt.ylabel("Normalized XMCD (%)") plt.sca(ax6) plt.plot(plus["energy"], (plus["xmcd"] + minus["xmcd"]) / 2 * 100) plt.xlabel("Energy (eV)") plt.ylabel("Normalized Artifact (%)") plt.tight_layout() return fig, [ax1, ax2, ax3, ax4, ax5, ax6]
[docs] def save_xmcd(plus, minus, file_name, header="XMCD\n", fmt="%0.5e"): """ Saves processed XANES/XMCD data into a file. Parameters ---------- plus : dictionary XMCD taken using the "plus" magnetic field. It needs at least three keys: "energy", "norm", and "xmcd". minus : dictionary XMCD taken using the "minus" magnetic field. It needs at least three keys: "energy", "norm", and "xmcd". file_name : string File name (including folder if not current). header : string, optional File header. Note that each line has to be finished with the newline character. fmt : string, optional Format of the data, defaults to %0.5e See also -------- :func:`polartools.absorption.process_xmcd` :func:`polartools.absorption.normalize_absorption` """ combined = np.vstack( ( plus["energy"], (plus["norm"] + minus["norm"]) / 2, (plus["xmcd"] - minus["xmcd"]) / 2, (plus["xmcd"] + minus["xmcd"]) / 2, ) ).transpose() header += "Energy\tXANES\tXMCD\tArtifact" np.savetxt(file_name, combined, header=header, fmt=fmt)