"""
Functions to load and process x-ray diffraction data.
.. autosummary::
~fit_peak
~load_info
~fit_series
~load_series
~get_type
~load_mesh
~plot_2d
~plot_fit
~load_axes
~plot_data
~dbplot
"""
from enum import Enum
import numpy as np
from pandas import DataFrame
import lmfit.models
import matplotlib.pyplot as plt
import copy
from os.path import join
from spec2nexus.spec import SpecDataFile
from xarray import DataArray
from .load_data import (
load_table,
load_csv,
is_Bluesky_specfile,
collect_meta,
load_databroker,
)
plt.ion()
rng = np.random.default_rng(seed=42)
_spec_default_cols = dict(
positioner="4C Theta",
detector="APD",
monitor="IC3",
)
_bluesky_default_cols = dict(
positioner="fourc_theta",
detector="APDSector4",
monitor="Ion Ch 3",
)
[docs]
class Model(Enum):
Gaussian = lmfit.models.GaussianModel
Lorentzian = lmfit.models.LorentzianModel
PseudoVoigt = lmfit.models.PseudoVoigtModel
[docs]
def fit_peak(xdata, ydata, model=Model.Gaussian):
"""
Fit Bragg peak with model of choice: Gaussian, Lorentzian, PseudoVoigt.
Uses lmfit (https://lmfit.github.io/lmfit-py/).
Parameters
----------
xdata : iterable
List of x-axis values.
yydata : iterable
List of y-axis values.
model: enumeration
fit model: model = Model.Gaussian (default), Model.Lorentzian, Model.PseudoVoigt
Returns
-------
fit : lmfit ModelResult class
Contains the fit results. See:
https://lmfit.github.io/lmfit-py/model.html#the-modelresult-class
"""
peak_mod = model.value()
background = lmfit.models.LinearModel()
mod = peak_mod + background
pars = background.make_params(intercept=ydata.min(), slope=0)
pars += peak_mod.guess(ydata, x=xdata)
pars["sigma"].set(min=0)
pars["amplitude"].set(min=0)
fit = mod.fit(ydata, pars, x=xdata)
return fit
[docs]
def load_info(scan_id, info, source=None, **kwargs):
"""
Load metadata variable value.
Parameters
----------
scan : int
Scan_id our uid. If scan_id is passed, it will load the last scan with
that scan_id.
info : list
If SPEC, information on metadata to be read: List starting with #P, #U,
#Q for motor positions, user values or Q-position or general #xx:
- #P: ['#P', row, element_number], e.g. ['#P', 2, 0]
- #U: ['#U', Variable, element_number], e.g. ['#U', 'KepkoI', 1]
- #Q: ['#Q', None, element_number], e.g. ['#Q', None, 0]
- #xx like #UA etc.: ['#UA', row, element_number],
If CSV, #metadata_name
If db, #baseline_information (e.g. #lakeshore340_sample)
source: databroker database, name of the spec file, or 'csv'
Note that applicable kwargs depend on this selection.
kwargs :
The necessary kwargs are passed to the loading functions defined by the
`source` argument:
- csv -> possible kwargs: folder, name_format.
- spec -> possible kwargs: folder.
- databroker -> possible kwargs: stream, query, use_db_v1.
Note that a warning will be printed if the an unnecessary kwarg is
passed.
Returns
-------
value : number
Variable value.
"""
folder = kwargs.pop("folder", "")
if source == "csv":
value = load_csv(
scan_id, folder=folder, name_format="scan_{}_baseline.csv"
)[info[1:]].mean()
# to be implemented for csv
elif isinstance(source, str) or isinstance(source, SpecDataFile):
if isinstance(source, str):
path = join(folder, source)
source = SpecDataFile(path)
specscan = source.getScan(scan_id)
value = ""
if isinstance(info, str):
raise ValueError(
"expect list [#P, (#Q, #U), Variable (string or line number), "
"element number]"
)
if info[0] == "#P":
data_array = specscan.P
if isinstance(info[1], int):
value = data_array[info[1]][int(info[2])]
else:
raise ValueError(
"For #P, expect row and column integer numbers"
)
elif info[0] == "#U":
data_array = specscan.U
if isinstance(info[1], str):
for item in data_array:
ival = item.split(":")
if ival[0] == info[1]:
value = ival[1].split()[int(info[2])]
break
else:
raise ValueError("For #U, expect string and item number")
elif info[0] == "#Q":
data_array = specscan.Q
value = data_array[info[2]]
elif info[0][0] == "#":
data_array = specscan.raw.split("\n")
index = 0
for element in data_array:
if element[0 : len(info[0])] == info[0]:
if index == info[1]:
value = element.split()[info[2] + 1]
index += 1
else:
raise ValueError(
"expect list [#P, (#Q, #U), Variable (string or line number), "
"element number]"
)
if not value:
raise ValueError(
"expect list [#P, (#Q, #U, #xx), Variable (string or line number), "
"element number]"
)
else:
table = load_databroker(scan_id, db=source, stream="baseline")
value = table[info[1 : len(info)]].mean()
return value
[docs]
def fit_series(
scan_series,
source=None,
output=False,
var_series=None,
positioner=None,
detector=None,
monitor=None,
normalize=False,
xrange=None,
**kwargs,
):
"""
Fit series of scans with chosen functional and returns fit parameters.
Uses lmfit_.
.. _lmfit: https://lmfit.github.io/lmfit-py/
Parameters
----------
scan_series : int
start, stop, step, [start2, stop2, step2, ... ,startn, stopn, stepn]
source : databroker database, name of the spec file, or 'csv'
Note that applicable kwargs depend on this selection.
output : boolean, optional
Output fit parameters and plot data+fit for each scan.
var_series : string or list
If string:
- Varying variable for scan series to be read from scan\
{detector), e.g. SampK (sample temperature), optional.
- String starting with #metadata, reads metadata from CSV baseline
If list:
SPEC: Information on metadata to be read: List starting with #P, #U,
#Q for motor positions, user values or Q-position or general #xx:
- #P: ['#P', row, element_number], e.g. ['#P', 2, 0]
- #U: ['#U', Variable, element_number], e.g. ['#U', 'KepkoI', 1]
- #Q: ['#Q', None, element_number], e.g. ['#Q', None, 0]
- #xx like #UA etc.: ['#UA', row, element_number].
CSV: #metadata_name
If None, successive scans will be numbered starting from zero.
positioner : string, optional
Name of the positioner, this needs to be the same as defined in
Bluesky or SPEC. If None is passed, it defauts to '4C Theta' motor.
detector : string, optional
Detector to be read from this scan, again it needs to be the same name
as in Bluesky. If None is passed, it defaults to the APD detector.
monitor : string, optional
Name of the monitor detector. If None is passed, it defaults to the ion
chamber 3.
normalize : boolean, optional
Normalization to selected/default monitor on/off
xrange : list
Set positioner range for fitting
kwargs :
The necessary kwargs are passed to the loading functions defined by the
`source` argument:
- csv -> possible kwargs: folder, name_format.
- spec -> possible kwargs: folder.
- databroker -> possible kwargs: stream, query, use_db_v1.
Note that a warning will be printed if the an unnecessary kwarg is
passed.
model -> fit model, options:
model = Model.Gaussian (default), Model.Lorentzian, Model.PseudoVoigt.
Returns
-------
fit : lmfit ModelResult class
Contains the fit results. See:
https://lmfit.github.io/lmfit-py/model.html#the-modelresult-class
"""
# Select default parameters
model = kwargs.pop("model", Model.Gaussian)
folder = kwargs.pop("folder", "")
if isinstance(source, (str, SpecDataFile)) and source != "csv":
if isinstance(source, str):
path = join(folder, source)
source = SpecDataFile(path)
if is_Bluesky_specfile(source):
_defaults = _bluesky_default_cols
else:
_defaults = _spec_default_cols
else:
_defaults = _bluesky_default_cols
if len(scan_series) % 3:
raise ValueError(
f"expected 3*n={3 * (len(scan_series) // 3)} arguments, got "
f"{len(scan_series)}"
)
nbp = 0
for series in range(1, len(scan_series), 3):
nbp = (
nbp
+ int(scan_series[series] - scan_series[series - 1])
/ scan_series[series + 1]
+ 1
)
fit_result = [np.zeros(9) for i in range(int(nbp))]
if output:
fig = plt.figure(num="Fitting", figsize=(6, 4), clear=True)
ax = fig.add_subplot(1, 1, 1)
index = 0
for series in range(1, len(scan_series), 3):
start = scan_series[series - 1]
stop = scan_series[series]
step = scan_series[series + 1]
print("Intervals: {} to {} with step {}".format(start, stop, step))
for scan in range(start, stop + 1, step):
if var_series and var_series[0][0] == "#":
fit_result[index][1] = load_info(
scan,
source=source,
info=var_series,
folder=folder,
**kwargs,
)
fit_result[index][2] = 0
table = load_table(
scan,
source=source,
folder=folder,
**kwargs,
)
elif var_series:
table = load_table(
scan,
source=source,
folder=folder,
**kwargs,
)
fit_result[index][1] = table[var_series].mean()
fit_result[index][2] = table[var_series].std()
else:
table = load_table(
scan,
source=source,
folder=folder,
**kwargs,
)
fit_result[index][1] = index
fit_result[index][2] = 0
if (
not isinstance(source, SpecDataFile)
or isinstance(source, str)
or source == "csv"
):
positioner, detector, monitor = (
load_axes(
scan,
source=source,
positioner=positioner,
detector=detector,
monitor=monitor,
defaults=_defaults,
read=False,
**kwargs,
)
if positioner in table.columns
else load_axes(
scan,
source=source,
positioner=positioner,
detector=detector,
monitor=monitor,
defaults=_defaults,
read=True,
**kwargs,
)
)
else:
if not positioner:
positioner = _defaults["positioner"]
if not detector:
detector = _defaults["detector"]
if not monitor:
monitor = _defaults["monitor"]
table = table.set_index(positioner)
if xrange:
table = table.loc[xrange[0] : xrange[1]]
x = table.index.to_numpy()
y = table[detector].to_numpy()
if normalize:
y0 = table[monitor].to_numpy()
y = y / y0
fit = fit_peak(x, y, model=model)
if output:
print(f"Fitting scan #{scan} with {model} model")
for key in fit.params:
print(
key,
"=",
fit.params[key].value,
"+/-",
fit.params[key].stderr,
)
ax.plot(x, y, color=(f"C{index}"), label=f"#{scan}")
ax.plot(
x,
fit.best_fit,
color=(f"C{index}"),
linestyle="dotted",
)
fit_result[index][0] = scan
fit_result[index][3] = fit.params["amplitude"].value
fit_result[index][4] = fit.params["amplitude"].stderr
fit_result[index][5] = fit.params["center"].value
fit_result[index][6] = fit.params["center"].stderr
fit_result[index][7] = fit.params["fwhm"].value
fit_result[index][8] = fit.params["fwhm"].stderr
index += 1
if output:
ax.legend(loc=0)
if hasattr(__builtins__, "__IPYTHON__"):
plt.get_current_fig_manager().show()
else:
plt.show()
return DataFrame(
fit_result,
columns=[
"Scan #",
"Index",
"Std Index",
"Intensity",
"Std I",
"Position",
"Std P",
"Width",
"Std W",
],
)
[docs]
def load_series(
scan_series,
source=None,
log=False,
var_series=None,
positioner=None,
detector=None,
monitor=None,
normalize=False,
scale=None,
**kwargs,
):
"""
Load series of scans as function of variable like temperature of field
which can be spaced unequally. Generates input arrays for plot_2d.
Parameters
----------
scan_series : list, int
start, stop, step, [start2, stop2, step2, ... ,startn, stopn, stepn]
source : databroker database, name of the spec file, or 'csv'
Note that applicable kwargs depend on this selection.
var_series : string or list
string:
- Varying variable for scan series to be read from scan\
(detector), e.g. SampK (sample temperature), optional.
- String starting with #metadata, reads metadata from CSV baseline
list:
SPEC: Information on metadata to be read: List starting with #P, #U,
#Q for motor positions, user values or Q-position or general #xx:
- #P: ['#P', row, element_number], e.g. ['#P', 2, 0]
- #U: ['#U', Variable, element_number], e.g. ['#U', 'KepkoI', 1]
- #Q: ['#Q', None, element_number], e.g. ['#Q', None, 0]
- #xx like #UA etc.: ['#UA', row, element_number],
CSV: #metadata_name
None, successive scans will be numbered starting from zero.
positioner : string, optional
Name of the positioner, this needs to be the same as defined in
Bluesky or SPEC. If None is passed, it defauts to '4C Theta' motor.
detector : string, optional
Detector to be read from this scan, again it needs to be the same name
as in Bluesky. If None is passed, it defaults to the APD detector.
monitor : string, optional
Name of the monitor detector for normalization. If None is passed, data
are not normalized.
log : boolean
If True, z-axis plotted in logarithmic scale.
scale : list, int
intensity limits: [z_min,z_max]
kwargs :
The necessary kwargs are passed to the loading and fitting functions
defined by the `source` argument:
- csv -> possible kwargs: folder, name_format, e.g.\
"scan_{}_primary.csv"
- spec -> possible kwargs: folder
- databroker -> possible kwargs: stream, query
Note that a warning will be printed if the an unnecessary kwarg is
passed.
Returns
-------
data : arrays with x, y and z information for 2D plot
"""
nbp = 0
for series in range(1, len(scan_series), 3):
nbp = (
nbp
+ int(scan_series[series] - scan_series[series - 1])
/ scan_series[series + 1]
+ 1
)
folder = kwargs.pop("folder", "")
if isinstance(source, (str, SpecDataFile)) and source != "csv":
if isinstance(source, str):
path = join(folder, source)
source = SpecDataFile(path)
if is_Bluesky_specfile(source):
_defaults = _bluesky_default_cols
else:
_defaults = _spec_default_cols
else:
_defaults = _bluesky_default_cols
if len(scan_series) % 3:
raise ValueError(
f"expected 3*n={3 * (len(scan_series) // 3)} arguments, got "
f"{len(scan_series)}"
)
table = load_table(
scan_series[1],
source=source,
folder=folder,
**kwargs,
)
if (
not isinstance(source, SpecDataFile)
or isinstance(source, str)
or source == "csv"
):
positioner, detector, monitor = (
load_axes(
scan_series[1],
source=source,
positioner=positioner,
detector=detector,
monitor=monitor,
defaults=_defaults,
read=False,
**kwargs,
)
if positioner in table.columns
else load_axes(
scan_series[1],
source=source,
positioner=positioner,
detector=detector,
monitor=monitor,
defaults=_defaults,
read=True,
**kwargs,
)
)
else:
if not positioner:
positioner = _defaults["positioner"]
if not detector:
detector = _defaults["detector"]
if not monitor:
monitor = _defaults["monitor"]
data_len = len(table[detector])
datax = [np.zeros(data_len) for i in range(int(nbp))]
datay = [np.zeros(data_len) for i in range(int(nbp))]
dataz = [np.zeros(data_len) for i in range(int(nbp))]
index = 0
for series in range(1, len(scan_series), 3):
start = scan_series[series - 1]
stop = scan_series[series]
step = scan_series[series + 1]
print("Intervals: {} to {} with step {}".format(start, stop, step))
for scan in range(start, stop + 1, step):
if var_series and var_series[0][0] == "#":
table = load_table(
scan,
source=source,
folder=folder,
**kwargs,
)
y_value = load_info(
scan,
source=source,
info=var_series,
folder=folder,
**kwargs,
)
tt = np.empty(data_len)
tt.fill(y_value)
datay[index] = tt
elif var_series:
table = load_table(
scan,
source=source,
folder=folder,
**kwargs,
)
datay[index] = table[var_series]
else:
table = load_table(
scan,
source=source,
folder=folder,
**kwargs,
)
tt = np.empty(data_len)
tt.fill(index)
datay[index] = tt
datax[index] = table[positioner]
if normalize:
dataz[index] = table[detector] / table[monitor]
else:
dataz[index] = table[detector]
if log:
dataz[index].replace(0, 1, inplace=True)
dataz[index] = np.log10(dataz[index])
if scale:
if len(scale) > 1:
dataz[index].values[dataz[index] < float(scale[0])] = float(
scale[0]
)
dataz[index].values[dataz[index] > float(scale[1])] = float(
scale[1]
)
else:
dataz[index].values[dataz[index] > float(scale[0])] = float(
scale[0]
)
index += 1
return datax, datay, dataz, detector, positioner
[docs]
def get_type(scan_id, source=None, **kwargs):
"""
get_type returns type of scan and scan parameters.
Parameters
----------
scan_id : int
scan number
source : databroker database, name of the spec file, or 'csv'
Note that applicable kwargs depend on this selection.
kwargs :
The necessary kwargs are passed to the loading and fitting functions
defined by the `source` argument:
- csv -> possible kwargs: folder, name_format, e.g.\
"scan_{}_primary.csv"
- spec -> possible kwargs: folder
- databroker -> possible kwargs: stream, query
Note that a warning will be printed if the an unnecessary kwarg is
passed.
Additional parameters in kwargs:
folder: location of scan file
Returns
-------
data : type of scan and scan parameters.
"""
_kwargs = copy.deepcopy(kwargs)
folder = _kwargs.pop("folder", "")
detector = _kwargs.pop("detector", "")
scan_info = {
"scan_no": 0,
"plan_name": "rel_scan",
"scan_type": None,
"motor0": None,
"x0": 0,
"x1": 0,
"xint": 0,
"motor1": None,
"y0": 0,
"y1": 0,
"yint": 0,
"detector": None,
}
if source == "csv":
pass
# to be implemented for csv
elif isinstance(source, str) or isinstance(source, SpecDataFile):
if isinstance(source, str):
path = join(folder, source)
source = SpecDataFile(path)
specscan = source.getScan(scan_id)
if specscan is None:
raise ValueError(f'Filename "{source}" not existing!')
scan_cmd = specscan.scanCmd.split()
plan_name = scan_cmd[0]
scan_info["x0"] = scan_cmd[2]
scan_info["x1"] = scan_cmd[3]
scan_info["xint"] = scan_cmd[4]
if plan_name == "mesh" or plan_name == "hklmesh":
scan_info["plan_name"] = plan_name
scan_info["y0"] = scan_cmd[6]
scan_info["y1"] = scan_cmd[7]
scan_info["yint"] = scan_cmd[8]
elif plan_name == "dichromesh":
scan_info["plan_name"] = plan_name
scan_info["y0"] = scan_cmd[6]
scan_info["y1"] = scan_cmd[7]
scan_info["yint"] = scan_cmd[8]
scan_info["scan_type"] = "dichro"
elif plan_name != "qxscan":
scan_info["plan_name"] = plan_name
else:
pass
else:
scan_read = collect_meta(
[scan_id],
["plan_name", "plan_pattern_args", "num_points", "hints"],
db=source,
)
for scanno, plan in scan_read.items():
scan_info["scan_no"] = scanno
for key, item in plan.items():
if key == "plan_name":
scan_info["plan_name"] = item[0]
if key == "num_points":
if not scan_info["xint"]:
scan_info["xint"] = item[0]
if key == "plan_pattern_args":
if (
scan_info["plan_name"] == "grid_scan"
or scan_info["plan_name"] == "rel_grid_scan"
):
scan_info["x0"] = item[0]["args"][1]
scan_info["x1"] = item[0]["args"][2]
scan_info["xint"] = item[0]["args"][3]
scan_info["y0"] = item[0]["args"][5]
scan_info["y1"] = item[0]["args"][6]
scan_info["yint"] = item[0]["args"][7]
else:
scan_info["x0"] = item[0]["args"][-2]
scan_info["x1"] = item[0]["args"][-1]
if key == "hints":
scan_info["motor0"] = item[0]["dimensions"][0][0][0]
if (
scan_info["plan_name"] == "grid_scan"
or scan_info["plan_name"] == "rel_grid_scan"
):
scan_info["motor1"] = item[0]["dimensions"][1][0][0]
scan_info["scan_type"] = item[0].get("scan_type", None)
scan_info["detector"] = (
detector if detector else item[0]["detectors"][0]
)
return scan_info
[docs]
def load_mesh(
scan,
scan_range,
source=None,
log=False,
mrange="reduced",
detector=None,
**kwargs,
):
"""
Load mesh generates input array for plot_2d from mesh scans:
mesh, dichromesh, hklmesh (SPEC)
grid_scan, rel_grid_scan (BlueSky)
Parameters
----------
scan_series : list, int
start, stop, step, [start2, stop2, step2, ... ,startn, stopn, stepn]
scan_range : list, int
scan parameters of mesh scan [x0, x1, xinterval, y0, y1, yinterval]
source : databroker database, name of the spec file, or 'csv'
Note that applicable kwargs depend on this selection.
log: boolean
If True, z-axis plotted in logarithmic scale.
mrange: string, list
reduced, full, [xmin,ymin,xmx,ymax]
kwargs :
The necessary kwargs are passed to the loading and fitting functions
defined by the `source` argument:
- csv -> possible kwargs: folder, name_format, e.g.\
"scan_{}_primary.csv"
- spec -> possible kwargs: folder
- databroker -> possible kwargs: stream, query
Note that a warning will be printed if the an unnecessary kwarg is
passed.
Additional parameters in kwargs:
scale : list, int
intensity limits: [z_min,z_max]
Returns
-------
data : arrays with x, y and z information for 2D plot and axes names
"""
data = load_table(scan, source=source, **kwargs)
if (
scan_range["plan_name"] == "grid_scan"
or scan_range["plan_name"] == "rel_grid_scan"
):
x_label = scan_range["motor0"]
y_label = scan_range["motor1"]
if x_label == "nanopositioner_nanox":
x_label = "nanopositioner_nanox_user_setpoint"
if y_label == "nanopositioner_nanoy":
y_label = "nanopositioner_nanoy_user_setpoint"
z_label = scan_range["detector"]
xr = int(scan_range["xint"])
yr = int(scan_range["yint"])
else:
x_label = data.columns[0]
y_label = data.columns[1]
z_label = detector if detector else data.columns[-1]
yr = int(scan_range["yint"]) + 1
xr = int(scan_range["xint"]) + 1
x = data[x_label]
y = data[y_label]
zp = data[z_label]
ya = float(scan_range["y0"])
yb = float(scan_range["y1"])
ys = (yb - ya) / (yr - 1)
if log:
zp.replace(0, 1, inplace=True)
zp = np.log10(zp)
xi = x.unique()
yi = y.unique()
if xi.size > xr:
xi = x[0 : xr * yr : yr].to_numpy()
yi = y[0:yr:1].to_numpy()
if yi.size < yr and mrange == "full":
app = np.arange(yi[-1] + ys, yb, ys)
yi = np.append(yi, app)
z = np.zeros((xi.size * yi.size))
z[: zp.size] = zp
z[zp.size :] = np.nan
else:
data = data.groupby([y_label, x_label]).sum()[z_label]
zp_left = data.unstack()
yi = zp_left.index.values
xi = zp_left.columns.values
zi = zp_left.values
return xi, yi, zi, x_label, y_label, z_label
[docs]
def load_dichromesh(
scan,
scan_range,
source=None,
detector=None,
**kwargs,
):
"""
Load dichromesh generates input array for plot_2d from mesh scans with
several polarizations per scan point:
- dichromesh (SPEC)
- dichro_grid_scan (BlueSky)
Parameters
----------
scan_series : list, int
start, stop, step, [start2, stop2, step2, ... ,startn, stopn, stepn]
scan_range : list, int
scan parameters of mesh scan [x0, x1, xinterval, y0, y1, yinterval]
source : databroker database, name of the spec file, or 'csv'
Note that applicable kwargs depend on this selection.
kwargs :
The necessary kwargs are passed to the loading and fitting functions
defined by the `source` argument:
- csv -> possible kwargs: folder, name_format, e.g.\
"scan_{}_primary.csv"
- spec -> possible kwargs: folder
- databroker -> possible kwargs: stream, query
Note that a warning will be printed if the an unnecessary kwarg is
passed.
Additional parameters in kwargs:
scale : list, int
intensity limits: [z_min,z_max]
Returns
-------
data : arrays with x, y and z information for 2D plot for left and right polarizatio and axes names
"""
data = load_table(scan, source=source, **kwargs)
if (
scan_range["plan_name"] == "grid_scan"
or scan_range["plan_name"] == "rel_grid_scan"
):
x_label = scan_range["motor0"]
y_label = scan_range["motor1"]
if x_label == "nanopositioner_nanox":
x_label = "nanopositioner_nanox_user_setpoint"
if y_label == "nanopositioner_nanoy":
y_label = "nanopositioner_nanoy_user_setpoint"
z_label = scan_range["detector"]
data = (
data.groupby([x_label, y_label, "pr2_pzt_localDC"])
.sum()
.unstack("pr2_pzt_localDC")[z_label]
)
left_p = data.columns[0]
right_p = data.columns[1]
zp_left = data[left_p].unstack()
zp_right = data[right_p].unstack()
yi = zp_left.index.values
xi = zp_left.columns.values
zl = zp_left.values
zr = zp_right.values
else:
# needs to be tested for dichromesh (spec)
x_label = data.columns[0]
y_label = data.columns[1]
z_label = detector if detector else data.columns[-1]
xi = data[x_label].unique()
yi = data[y_label].unique()
zl = data[z_label]
z = np.zeros((xi.size * yi.size))
z[: zl.size] = zl
z[zl.size :] = np.nan
zi = np.reshape(z, (yi.size, xi.size))
zr = zi
return xi, yi, zi, zr, x_label, y_label, z_label
[docs]
def plot_2d(
scans,
source=None,
var_series=None,
positioner=None,
detector=None,
monitor=None,
normalize=False,
log=False,
scale=None,
mrange="reduced",
direction=[1, 1],
output=False,
xcut=None,
ycut=None,
plot2=None,
scale2=None,
**kwargs,
):
"""
Plot 2d:
Creates 2D plot from individual 1D scans as function of variable parameter
or plots a 2D mesh scan.
Supported mesh scans:
- mesh, dichromesh, hklmesh (SPEC)
- grid_scan, rel_grid_scan (BlueSky)
Parameters
----------
scans : list, int
1D-scans: start, stop, step, [start2, stop2, step2, ... ,startn, stopn, stepn]
e.g. [10,14,2,23,27,4] will use scan #10,12,14,23,27
mesh-scan: scan_number, e.g. 10 will read scan #10
source : databroker database, name of the spec file, or 'csv'
Note that applicable kwargs depend on this selection.
var_series: string or list, optional
string:
- Varying variable for scan series to be read from scan
(detector), e.g. SampK (sample temperature), optional.
- String starting with #metadata, reads metadata from CSV baseline
list:
Information on metadata to be read: List starting with #P, #U or #Q for
motor positions, user values or Q-position, or general #xx, optional:
- #P: ['#P', row, element_number], e.g. ['#P', 2, 0]
- #U: ['#U', Variable, element_number], e.g. ['#U', 'KepkoI', 1]
- #Q: ['#Q', None, element_number], e.g. ['#Q', None, 0]
- #xx like #UA etc.: ['#UA', row, element_number],
If None, successive scans will be numbered starting from zero.
positioner : string, optional
Name of the positioner, this needs to be the same as defined in
Bluesky or SPEC. If None is passed, it defauts to '4C Theta' motor.
detector : string, optional
Detector to be read from this scan, again it needs to be the same name
as in Bluesky. If None is passed, it defaults to the APD detector.
monitor : string, optional
Name of the monitor detector for normalization. If None is passed,
data are not normalized.
log: boolean, optional
If True, z-axis plotted in logarithmic scale.
scale : list, int, optional
intensity limits: [z_min,z_max]
scale2 : list, int, optional
intensity limits: [z_min,z_max] for sum if 2 plots are provided
mrange : list, optional
full, reduced, [xmin,ymin,xmax,ymax]
direction : list, int, optional
multiply axes for inversion: [1,-1]
output : string, optional
Output file for png file of plot.
xcut : list, optional
plot 1D cuts for x-axis values
ycut : list, optional
plot 1D cuts for y-axis values
plot2 : list, int
same as scans. Only z-values are taken. x and y used from scans
kwargs:
The necessary kwargs are passed to the loading and fitting functions
defined by the `source` argument:
- csv -> possible kwargs: folder, name_format, e.g.\
"scan_{}_primary.csv"
- spec -> possible kwargs: folder
- databroker -> possible kwargs: stream, query
Returns
-------
2D plot, png-file
"""
dichro = False
if isinstance(scans, int):
scan_series = [scans, scans, 1]
elif isinstance(scans, list):
scan_series = scans
else:
raise ValueError(f"expected int or list got '{scans}'")
scan_info = get_type(
scan_id=scan_series[0], source=source, detector=detector, **kwargs
)
if (
scan_info["plan_name"] == "mesh"
or scan_info["plan_name"] == "dichromesh"
or scan_info["plan_name"] == "hklmesh"
or scan_info["plan_name"] == "grid_scan"
or scan_info["plan_name"] == "rel_grid_scan"
):
if scan_info["scan_type"] == "dichro":
dichro = True
(
datax,
datay,
dataz,
dataz2,
positioner,
var_series,
detector,
) = load_dichromesh(
scan_series[0],
scan_info,
source=source,
log=log,
mrange=mrange,
detector=detector,
**kwargs,
)
else:
datax, datay, dataz, positioner, var_series, detector = load_mesh(
scan_series[0],
scan_info,
source=source,
log=log,
mrange=mrange,
detector=detector,
**kwargs,
)
if plot2:
_, _, dataz2, _, _, _ = load_mesh(
plot2,
scan_info,
source=source,
log=log,
mrange=mrange,
detector=detector,
**kwargs,
)
else:
datax, datay, dataz, detector, positioner = load_series(
scan_series=scan_series,
source=source,
log=log,
var_series=var_series,
positioner=positioner,
detector=detector,
monitor=monitor,
normalize=normalize,
**kwargs,
)
if plot2:
_, _, dataz2, _, _ = load_series(
scan_series=plot2,
source=source,
log=log,
var_series=var_series,
positioner=positioner,
detector=detector,
monitor=monitor,
normalize=normalize,
**kwargs,
)
if bool(xcut) ^ bool(ycut):
fig = (
plt.figure(
num="Plot_2d - sum/diff - xcut/ycut",
clear=True,
figsize=(15, 4),
constrained_layout=True,
)
if plot2
else plt.figure(
num="Plot_2d - xcut/ycut",
clear=True,
figsize=(10.3, 4),
constrained_layout=True,
)
)
ax = (
fig.subplots(
ncols=5, gridspec_kw={"width_ratios": [1, 0.05, 1, 0.05, 1]}
)
if plot2
else fig.subplots(
ncols=3, gridspec_kw={"width_ratios": [1, 0.05, 1]}
)
)
elif xcut and ycut:
fig = (
plt.figure(
num="Plot_2d - sum/diff - xcut and ycut",
clear=True,
figsize=(20, 4),
constrained_layout=True,
)
if plot2
else plt.figure(
num="Plot_2d - xcut and ycut",
clear=True,
figsize=(15, 4),
constrained_layout=True,
)
)
ax = (
fig.subplots(
ncols=6, gridspec_kw={"width_ratios": [1, 0.05, 1, 0.05, 1, 1]}
)
if plot2
else fig.subplots(
ncols=4, gridspec_kw={"width_ratios": [1, 0.05, 1, 1]}
)
)
elif plot2:
fig = plt.figure(
num="Plot_2d - sum/diff",
clear=True,
figsize=(11, 4),
constrained_layout=True,
)
ax = fig.subplots(
ncols=4, gridspec_kw={"width_ratios": [1, 0.05, 1, 0.05]}
)
elif dichro:
fig = plt.figure(
num="Plot_2d - dichro",
clear=True,
figsize=(20, 4),
constrained_layout=True,
)
ax = fig.subplots(
ncols=8,
gridspec_kw={"width_ratios": [1, 0.05, 1, 0.05, 1, 0.05, 1, 0.05]},
)
else:
fig = plt.figure(
num="Plot_2d",
clear=True,
figsize=(5.5, 4),
constrained_layout=True,
)
ax = fig.subplots(ncols=2, gridspec_kw={"width_ratios": [1, 0.05]})
cmap = plt.get_cmap("rainbow")
datax = np.multiply(datax, direction[0])
datay = np.multiply(datay, direction[1])
datazm = np.subtract(dataz, dataz2) if plot2 else dataz
if scale is None:
scale = (np.nanpercentile(datazm, 1), np.nanpercentile(datazm, 99))
vmin = float(scale[0])
vmax = float(scale[1])
z_label = detector
x_label = positioner
if isinstance(var_series, list):
y_label = " ".join(map(str, var_series))
else:
y_label = var_series
nlabel = ""
if (
scan_info["plan_name"] == "mesh"
or scan_info["plan_name"] == "dichromesh"
or scan_info["plan_name"] == "hklmesh"
or scan_info["plan_name"] == "grid_scan"
or scan_info["plan_name"] == "rel_grid_scan"
):
nlabel = nlabel + (", #{}".format(scan_series[0]))
else:
for series in range(1, len(scan_series), 3):
start = scan_series[series - 1]
stop = scan_series[series]
nlabel = nlabel + (", #{}-{}".format(start, stop))
c = ax[0].pcolormesh(
datax,
datay,
datazm,
vmin=vmin,
vmax=vmax,
cmap=cmap,
shading="auto",
)
if plot2:
datazp = np.add(dataz2, dataz)
if scale2 is None:
scale2 = (
np.nanpercentile(datazp, 1),
np.nanpercentile(datazp, 99),
)
vmin = float(scale2[0])
vmax = float(scale2[1])
c1 = ax[2].pcolormesh(
datax,
datay,
datazp,
vmin=vmin,
vmax=vmax,
cmap=cmap,
shading="auto",
)
plt.colorbar(c1, cax=ax[3])
ax[2].set_xlabel(x_label)
ax[2].set_ylabel(y_label)
nlabel2 = (
f", #{scan_series[0]}+#{plot2}"
if (isinstance(scan_series, list) and isinstance(plot2, int))
else f", #{scan_series}+#{plot2}"
)
nlabel = (
f", #{scan_series[0]}-#{plot2}"
if (isinstance(scan_series, list) and isinstance(plot2, int))
else f", #{scan_series}-#{plot2}"
)
if dichro:
scale = (np.nanpercentile(dataz2, 1), np.nanpercentile(dataz2, 99))
vmin = float(scale[0])
vmax = float(scale[1])
c1 = ax[2].pcolormesh(
datax,
datay,
dataz2,
vmin=vmin,
vmax=vmax,
cmap=cmap,
shading="auto",
)
plt.colorbar(c1, cax=ax[3])
datazm = np.subtract(dataz, dataz2)
scale = (np.nanpercentile(datazm, 1), np.nanpercentile(datazm, 99))
vmin = float(scale[0])
vmax = float(scale[1])
c2 = ax[4].pcolormesh(
datax,
datay,
datazm,
vmin=vmin,
vmax=vmax,
cmap=cmap,
shading="auto",
)
plt.colorbar(c2, cax=ax[5])
datazm = np.add(dataz, dataz2)
scale = (np.nanpercentile(datazm, 1), np.nanpercentile(datazm, 99))
vmin = float(scale[0])
vmax = float(scale[1])
c3 = ax[6].pcolormesh(
datax,
datay,
datazm,
vmin=vmin,
vmax=vmax,
cmap=cmap,
shading="auto",
)
plt.colorbar(c3, cax=ax[7])
ax[2].set_xlabel(x_label)
ax[4].set_xlabel(x_label)
ax[6].set_xlabel(x_label)
nlabel = f", #{scan_series[0]}: left circular"
nlabel2 = f", #{scan_series[0]}: right circular"
ax[4].set_title("difference", fontsize=12)
ax[6].set_title("sum", fontsize=12)
if len(z_label + nlabel) < 35:
ax[0].set_title("{}{}".format(z_label, nlabel), fontsize=12)
if plot2 or dichro:
ax[2].set_title("{}{}".format(z_label, nlabel2), fontsize=12)
elif len(z_label + nlabel) < 45:
ax[0].set_title("{}{}".format(z_label, nlabel), fontsize=10)
if plot2 or dichro:
ax[2].set_title("{}{}".format(z_label, nlabel2), fontsize=10)
else:
ax[0].set_title("{}{}".format(z_label, nlabel), fontsize=8)
if plot2 or dichro:
ax[2].set_title("{}{}".format(z_label, nlabel2), fontsize=8)
ax[0].set_xlabel(x_label)
ax[0].set_ylabel(y_label)
plt.colorbar(c, cax=ax[1])
SIZE = 12
plt.rc("font", size=SIZE)
plt.rc("axes", titlesize=SIZE)
plt.rc("axes", labelsize=SIZE)
plt.rc("xtick", labelsize=SIZE)
plt.rc("ytick", labelsize=SIZE)
plt.rc("legend", fontsize=6)
if datax.ndim == 2:
datax = datax[0, :]
if datay.ndim == 2:
datay = datay[:, 0]
data = DataArray(
datazm,
dims=("y_label", "x_label"),
coords={"x_label": datax, "y_label": datay},
)
if plot2:
data2 = DataArray(
datazp,
dims=("y_label", "x_label"),
coords={"x_label": datax, "y_label": datay},
)
if xcut:
num = 4 if plot2 else 2
for pos in xcut:
color = "#{:06x}".format(rng.integers(0, 16777215))
test = data.sel(x_label=pos, method="nearest")
ax[0].vlines(pos, datay[0], datay[-1], color=color)
ax[num].plot(
datay,
test,
color=color,
label=(f"{x_label}={pos}"),
)
if plot2:
color2 = "#{:06x}".format(rng.integers(0, 16777215))
test = data2.sel(x_label=pos, method="nearest")
ax[2].vlines(pos, datay[0], datay[-1], color=color2)
ax[num].plot(
datay,
test,
color=color2,
label=(f"{x_label}={pos} (sum)"),
)
ax[num].set_xlabel(y_label)
ax[num].set_ylabel(z_label)
ax[num].legend(loc=0)
if ycut:
num = 4 if plot2 else 2
if xcut:
num += 1
for pos in ycut:
color = "#{:06x}".format(rng.integers(0, 16777215))
test = data.sel(y_label=pos, method="nearest")
ax[0].hlines(pos, datax[0], datax[-1], color=color)
ax[num].plot(datax, test, color=color, label=(f"{y_label}={pos}"))
if plot2:
color2 = "#{:06x}".format(rng.integers(0, 16777215))
test = data2.sel(y_label=pos, method="nearest")
ax[2].hlines(pos, datax[0], datax[-1], color=color2)
ax[num].plot(
datax, test, color=color2, label=(f"{y_label}={pos} (sum)")
)
ax[num].set_xlabel(x_label)
ax[num].set_ylabel(z_label)
ax[num].legend(loc=0)
if hasattr(__builtins__, "__IPYTHON__"):
plt.get_current_fig_manager().show()
else:
plt.show()
if output:
plt.savefig(output, dpi=600, transparent=True, bbox_inches="tight")
[docs]
def plot_fit(
scan_series,
source=None,
output=False,
var_series=None,
positioner=None,
detector=None,
monitor=None,
normalize=False,
errorbar=True,
xrange=None,
**kwargs,
):
"""
Fit and plot series of scans with chosen functional and returns fit parameters.
Uses lmfit (https://lmfit.github.io/lmfit-py/).
Parameters
----------
scan_series : int
start, stop, step, [start2, stop2, step2, ... ,startn, stopn, stepn]
source : databroker database, name of the spec file, or 'csv'
Note that applicable kwargs depend on this selection.
output : boolean, optional
Output fit parameters and plot data+fit for each scan.
var_series : string or list
If string
- Varying variable for scan series to be read from scan (detector),
e.g. SampK (sample temperature), optional.
- String starting with #metadata, reads metadata from CSV baseline
If list, information on metadata to be read: List starting with #P, #U
or #Q for motor positions, user values or Q-position, optional:
- #P ['#P', row, element_number], e.g. ['#P', 2, 0]
- #U ['#U', Variable, element_number], e.g. ['#U', 'KepkoI', 1]
- #Q ['#Q', None, element_number], e.g. ['#Q', None, 0]
- #xx like #UA etc.: ['#UA', row, element_number]
If None, successive scans will be numbered starting from zero.
positioner : string, optional
Name of the positioner, this needs to be the same as defined in
Bluesky or SPEC. If None is passed, it defauts to '4C Theta' motor.
detector : string, optional
Detector to be read from this scan, again it needs to be the same name
as in Bluesky. If None is passed, it defaults to the APD detector.
monitor : string, optional
Name of the monitor detector. If None is passed, it defaults to the ion
chamber 3.
normalize : boolean, optional
Normalization to selected/default monitor on/off
xrange: list
Set positioner range for fitting
noerror : boolean, optional
Plotting of errorbars on/off
kwargs :
The necessary kwargs are passed to the loading functions defined by the
`source` argument:
- csv -> possible kwargs: folder, name_format.
- spec -> possible kwargs: folder.
- databroker -> possible kwargs: stream, query.
model : string, optional
fit model: Gaussian, Lorentzian, PseudoVoigt
Note that a warning will be printed if the an unnecessary kwarg is
passed.
Returns
-------
fit : lmfit ModelResult class
Contains the fit results. See:
https://lmfit.github.io/lmfit-py/model.html#the-modelresult-class
plot: plot of fitted data as function of variable changing between scans
"""
data = fit_series(
scan_series,
source=source,
output=output,
var_series=var_series,
positioner=positioner,
detector=detector,
monitor=monitor,
normalize=normalize,
xrange=xrange,
**kwargs,
)
fig = plt.figure(num="Plot_fit", figsize=(8, 8), clear=True)
ax1 = fig.add_subplot(3, 1, 1)
ax2 = fig.add_subplot(3, 1, 2)
ax3 = fig.add_subplot(3, 1, 3)
if errorbar:
ax1.errorbar(
data["Index"],
data["Intensity"],
yerr=data["Std I"],
xerr=data["Std Index"],
color="orange",
marker="o",
linewidth=2,
markersize=10,
)
ax2.errorbar(
data["Index"],
data["Position"],
yerr=data["Std P"],
xerr=data["Std Index"],
color="blue",
marker="o",
linewidth=2,
markersize=10,
)
ax3.errorbar(
data["Index"],
data["Width"],
yerr=data["Std W"],
xerr=data["Std Index"],
color="green",
marker="o",
linewidth=2,
markersize=10,
)
else:
ax1.plot(
data["Index"],
data["Intensity"],
color="orange",
marker="o",
linewidth=2,
markersize=10,
)
ax2.plot(
data["Index"],
data["Position"],
color="blue",
marker="o",
linewidth=2,
markersize=10,
)
ax3.plot(
data["Index"],
data["Width"],
color="green",
marker="o",
linewidth=2,
markersize=10,
)
ax1.set_ylabel("Intensity")
ax2.set_ylabel("Position")
ax3.set_ylabel("FWHM")
if isinstance(var_series, list):
x_label = " ".join(map(str, var_series))
else:
x_label = var_series
ax3.set_xlabel(x_label)
if hasattr(__builtins__, "__IPYTHON__"):
plt.get_current_fig_manager().show()
else:
plt.show()
return data
[docs]
def load_axes(
scan,
source=None,
positioner=None,
detector=None,
monitor=None,
defaults=None,
read=False,
**kwargs,
):
"""
Load default positioner and detector for plot
Parameters
----------
scan : int, list
single scan
or list [start, stop, step, start2, stop2, step2, ... ,startn, stopn, stepn]
source : databroker database, name of the spec file, or 'csv'
Note that applicable kwargs depend on this selection.
positioner : string, optional
Name of the positioner, this needs to be the same as defined in
Bluesky or SPEC. If None is passed, it defauts to '4C Theta' motor.
detector : string, optional
Detector to be read from this scan, again it needs to be the same name
as in Bluesky. If None is passed, it defaults to the APD detector.
monitor : string, optional
Name of the monitor detector. If None is passed, it defaults to the ion
chamber 3.
defaults : string, optional
Default values for positioner and detector
read: boolean, optional
Determines if positioner is read from metadata
Output
-------
Plot
"""
_kwargs = copy.deepcopy(kwargs)
query = _kwargs.pop("query", None)
meta = collect_meta(
[scan], meta_keys=["motors", "hints"], db=source, query=query
)
if not positioner or read:
positioner = meta[scan]["motors"][0]
det = meta[scan]["hints"] if "hints" in meta[scan] else None
if not detector:
detector = (
det[0]["detectors"][0]
if "detectors" in det[0]
else defaults["detector"]
)
if not monitor:
monitor = (
det[0]["monitor"] if "monitor" in det[0] else defaults["monitor"]
)
return positioner, detector, monitor
[docs]
def plot_data(
scan_series,
source=None,
positioner=None,
detector=None,
monitor=None,
fit=False,
normalize=False,
log=False,
deriv=False,
direction=[1, 1],
**kwargs,
):
"""
Plot and fit data.
Parameters
----------
scan_series : list
list [start, stop, step, start2, stop2, step2, ... ,startn, stopn, stepn]
source : databroker database, name of the spec file, or 'csv'
Note that applicable kwargs depend on this selection.
positioner : string, optional
Name of the positioner, this needs to be the same as defined in
Bluesky or SPEC.
detector : string, optional
Detector to be read from this scan, again it needs to be the same name
as in Bluesky.
monitor : string, optional
Monitor to be read from this scan, again it needs to be the same name
as in Bluesky.
normalize : boolean, optional
Normalization to selected/default monitor on/off
log: boolean
If True, y-axis plotted in logarithmic scale.
deriv : boolean, optional
calculated derivative
fit : boolean, optional
Fitting of peak using model on/off. In case of deriv=True, derivative is fitted
direction : list, int
multiply axes for inversion: [1,-1] inverts y-axis
kwargs :
model: enumeration
fit model: model = Model.Gaussian (default), Model.Lorentzian, Model.PseudoVoigt
`source` argument:
- csv -> possible kwargs: folder, name_format.
- spec -> possible kwargs: folder.
- databroker -> possible kwargs: stream, query, use_db_v1.
Output
-------
Plot
"""
model = kwargs.pop("model", Model.Gaussian)
folder = kwargs.pop("folder", "")
if isinstance(source, (str, SpecDataFile)) and source != "csv":
if isinstance(source, str):
path = join(folder, source)
source = SpecDataFile(path)
if is_Bluesky_specfile(source):
_defaults = _bluesky_default_cols
else:
_defaults = _spec_default_cols
else:
_defaults = _bluesky_default_cols
fig = plt.figure(
num="Plot_data", figsize=(6, 4), constrained_layout=True, clear=True
)
ax = fig.add_subplot(1, 1, 1)
if deriv:
ax2 = ax.twinx()
ax.clear()
index = 0
if isinstance(scan_series, list):
if len(scan_series) % 3:
raise ValueError(
f"expected 3*n={3 * ((len(scan_series) + 1) // 3)} arguments, got {len(scan_series)}"
)
for series in range(1, len(scan_series), 3):
start = scan_series[series - 1]
stop = scan_series[series]
step = scan_series[series + 1]
print("Intervals: {} to {} with step {}".format(start, stop, step))
for scan in range(start, stop + 1, step):
data = load_table(
scan,
source=source,
**kwargs,
)
if (
not isinstance(source, SpecDataFile)
or isinstance(source, str)
or source == "csv"
):
positioner, detector, monitor = (
load_axes(
scan,
source=source,
positioner=positioner,
detector=detector,
monitor=monitor,
defaults=_defaults,
read=False,
**kwargs,
)
if positioner in data.columns
else load_axes(
scan,
source=source,
positioner=positioner,
detector=detector,
monitor=monitor,
defaults=_defaults,
read=True,
**kwargs,
)
)
else:
if not positioner:
positioner = _defaults["positioner"]
if not detector:
detector = _defaults["detector"]
if not monitor:
monitor = _defaults["monitor"]
data[positioner] = np.multiply(data[positioner], direction[0])
data[detector] = np.multiply(data[detector], direction[1])
if normalize:
data[detector] = data[detector] / data[monitor]
if log:
data[detector].replace(0, 1, inplace=True)
data[detector] = np.log10(data[detector])
x = data[positioner].to_numpy()
y = data[detector].to_numpy()
if deriv:
y = np.diff(y) / np.diff(x)
x = (x[:-1] + x[1:]) / 2
if fit:
fit_data = fit_peak(x, y, model=model)
text1 = f"{fit_data.params['center'].value:.3f}"
text2 = f"{fit_data.params['fwhm'].value:.3f}"
if deriv and fit:
ax.errorbar(
data[positioner],
data[detector],
color=(f"C{index}"),
marker="o",
linewidth=2,
markersize=8,
label=(f"#{scan}"),
)
ax2.plot(
x,
fit_data.best_fit,
color="black",
linewidth=2,
)
ax2.plot(
x,
y,
color=("#{:06x}".format(rng.integers(0, 16777215))),
marker="o",
linewidth=2,
markersize=8,
label=(f"#{scan}_deriv [{text1}, {text2}]"),
)
elif fit:
ax.errorbar(
data[positioner],
data[detector],
color=(f"C{index}"),
marker="o",
linewidth=2,
markersize=8,
label=(f"#{scan} [{text1}, {text2}]"),
)
ax.plot(
x,
fit_data.best_fit,
color="black",
linewidth=2,
)
elif deriv:
ax.errorbar(
data[positioner],
data[detector],
color=(f"C{index}"),
marker="o",
linewidth=2,
markersize=8,
label=(f"#{scan}"),
)
ax2.plot(
x,
y,
color=("#{:06x}".format(rng.integers(0, 16777215))),
marker="o",
linewidth=2,
markersize=8,
label=(f"#{scan}_deriv"),
)
else:
ax.errorbar(
data[positioner],
data[detector],
color=(f"C{index}"),
marker="o",
linewidth=2,
markersize=8,
label=(f"#{scan}"),
)
index += 1
else:
raise ValueError(f"expected list got '{scan_series}'")
ax.set_xlabel(positioner)
ax.set_ylabel(detector)
ax.legend(loc=0)
if deriv:
ax2.legend(loc=4)
if hasattr(__builtins__, "__IPYTHON__"):
plt.get_current_fig_manager().show()
else:
plt.show()
[docs]
def dbplot(
scan,
source=None,
positioner=None,
detector=None,
monitor=None,
normalize=False,
fit=False,
deriv=False,
direction=[1, 1],
**kwargs,
):
"""
Plot and fit data.
Parameters
----------
scan_series : int, list
single scan
or list [start, stop, step, start2, stop2, step2, ... ,startn, stopn, stepn]
source : databroker database, name of the spec file, or 'csv'
Note that applicable kwargs depend on this selection.
positioner : string, optional
Name of the positioner, this needs to be the same as defined in
Bluesky or SPEC. If None is passed, it defauts to '4C Theta' motor.
detector : string, optional
Detector to be read from this scan, again it needs to be the same name
as in Bluesky. If None is passed, it defaults to the APD detector.
monitor : string, optional
Name of the monitor detector. If None is passed, it defaults to the ion
chamber 3.
normalize : boolean, optional
Normalization to selected/default monitor on/off
deriv : boolean, optional
calculated derivative
fit : boolean, optional
Fitting of peak using model on/off. In case of deriv=True, derivative is fitted
direction : list, int
multiply axes for inversion: [1,-1] inverts y-axis
kwargs :
model : string, optional
- fit model: Gaussian, Lorentzian, PseudoVoigt
`source` argument:
- csv -> possible kwargs: folder, name_format.
- spec -> possible kwargs: folder.
- databroker -> possible kwargs: stream, query, use_db_v1.
Output
-------
Plot
"""
if isinstance(scan, int):
scan_series = [scan, scan, 1]
elif isinstance(scan, list):
scan_series = scan
else:
raise ValueError(f"expected int or list got '{scan}'")
plot_data(
scan_series=scan_series,
source=source,
positioner=positioner,
detector=detector,
monitor=monitor,
fit=fit,
normalize=normalize,
deriv=deriv,
direction=direction,
**kwargs,
)