"""
Base functions to load data from various sources.
.. autosummary::
~load_spec
~load_csv
~load_hdf5_data
~hdf5_to_dataframe
~load_hdf5_master
~load_databroker
~load_table
~is_Bluesky_specfile
~db_query
~show_meta
~collect_meta
~lookup_position
"""
# Copyright (c) 2020-2021, UChicago Argonne, LLC.
# See LICENSE file for details.
from pandas import read_csv, DataFrame
from os.path import join
from spec2nexus.spec import SpecDataFile
from warnings import warn
import copy
from datetime import datetime
from collections import OrderedDict
from pyRestTable import Table
from h5py import File
from apstools.utils import getDatabase
from tiled.client import from_profile
from tiled.profiles import ProfileNotFound
from .area_detector_handlers import (
EigerHandler,
LambdaHDF5Handler,
SPEHandler,
)
# TODO: This should be just temp fix
LambdaHDF5Handler.specs = {"AD_HDF5_lambda"} | LambdaHDF5Handler.specs
HDF_DEFAULT_FNAME_FORMAT = "scan_{:06d}_master.hdf"
BLUESKY_DEFAULT_LOCATION = "entry/instrument/bluesky/streams/primary"
def _is_tiled(obj):
"""Return True if obj is a tiled catalog client, False if databroker."""
return "tiled" in type(obj).__module__
[docs]
def load_catalog(name=None, query=None, handlers=None, tiled_path="/raw"):
"""
Loads a databroker catalog and register data handlers.
Parameters
----------
name : str, optional
Name of the database. Defaults to 4-ID-D name.
query : dict, optional
Dictionary with search parameters for the database.
handlers : dict, optional
Dictionary organized as {handler_name: handler_class}. If None,
defaults to handlers used at 4-ID-D.
Returns
-------
cat : databroker catalog
Catalog after running the query, and registering the handler.
"""
try:
cat = getDatabase(catalog_name=name)
except KeyError:
cat = from_profile(name)[tiled_path]
except ProfileNotFound:
raise ValueError(f"No database {name} was not found!")
if query is not None:
cat = db_query(cat, query)
if not _is_tiled(cat):
if handlers is None:
handlers = dict(
AD_HDF5_Lambda250k_APSPolar=LambdaHDF5Handler,
AD_HDF5_lambda=LambdaHDF5Handler, # Temporary fix
AD_EIGER_APSPolar=EigerHandler,
AD_SPE_APSPolar=SPEHandler,
)
for hname, handler in handlers.items():
cat.register_handler(hname, handler, overwrite=True)
# For tiled: handler registration is server-side.
return cat
[docs]
def load_spec(scan_id, spec_file, folder=""):
"""
Load data from spec file.
If `spec_file` is the file name, it will load the spec file internally
which is time consuming.
Parameters
----------
scan_id : int
Scan_id of the scan to be retrieved.
spec_file : string or spec2nexus.spec.SpecDataFile
Either the spec file name or a SpecDataFile instance.
folder : string, optional
Folder where spec file is located.
Returns
-------
data : pandas.DataFrame
Table with the data from scan.
See also
--------
:func:`spec2nexus.spec.SpecDataFile`
"""
if isinstance(spec_file, str):
path = join(folder, spec_file)
spec_file = SpecDataFile(path)
return DataFrame(spec_file.getScan(scan_id).data)
[docs]
def load_csv(scan_id, folder="", name_format="scan_{}_primary.csv"):
"""
Load data from the 'primary' stream from exported csv files.
Parameters
----------
scan_id : int
Scan_id of the scan to be retrieved.
folder : string, optional
Folder where csv files are located.
name_format : string, optional
General format of file name. The correct name must be
retrievable through: `file_name_format.format(scan_id)`
Returns
-------
data : pandas.DataFrame
Table with the data from the primary stream.
See also
--------
:func:`pandas.read_csv`
"""
return read_csv(join(folder, name_format.format(scan_id)))
[docs]
def load_databroker(
scan_id, db=None, stream="primary", query=None, use_db_v1=True
):
"""
Load data of the first scan with the provided scan_id.
Currently defaults to databroker.v1 because it is faster. See issue #28.
For further details, refer to the `databroker` `documentation`_.
.. _documentation: https://blueskyproject.io/databroker/
Parameters
----------
scan_id : int
Scan_id of the scan to be retrieved
db :
`databroker` database
stream : string, optional
Selects the stream from which data will be loaded.
query : dict, optional
Dictionary with search parameters for the database.
use_db_v1 : bool, optional
Chooses databroker API version between 'v1' or 'v2', defaults to 'v1'.
Returns
-------
data : pandas.DataFrame
Table with the data from the primary stream.
"""
if db is None or not _is_tiled(db):
db = getDatabase(db=db)
_db = db_query(db, query) if query else db
if use_db_v1:
if stream in _db.v1[scan_id].stream_names:
return _db.v1[scan_id].table(stream_name=stream)
else:
raise ValueError(
f"The stream {stream} does not exist in scan {scan_id}."
)
else:
try:
return getattr(_db.v2[scan_id], stream).read().to_dataframe()
except AttributeError:
raise ValueError(
f"The stream {stream} does not exist in scan {scan_id}."
)
else:
_db = db_query(db, query) if query else db
run = _db[scan_id]
if stream not in run:
raise ValueError(
f"The stream {stream} does not exist in scan {scan_id}."
)
return run[stream].read()
[docs]
def hdf5_to_dataframe(data):
"""
Converts h5py object into dataframe
WARNING: it assumes a very specific format. For each item in `data` it will
get the data in `data["key/value"]
Parameters
----------
data : h5py object
Object with the data. Each key needs to have a "value" subkey.
Returns
-------
data : pandas.DataFrame
Table with the data.
See also
--------
:func:`polartools.load_data.load_hdf5_master`
:func:`h5py.File`
"""
output = {}
for key in data.keys():
value = data[key].get("value", None)
if value is not None:
output[key] = value[()]
return DataFrame(output)
[docs]
def load_hdf5_master(scan, folder, fname_format=HDF_DEFAULT_FNAME_FORMAT):
"""
Wrapper that loads HDF files using `h5py`.
Parameters
----------
scan_id : int
Scan_id of the scan to be retrieved.
folder : string, optional
Folder where the master files are located.
fname_format : string, optional
General format of file name. The correct name must be retrievable
through: `file_name_format.format(scan_id)`
Returns
-------
data : h5py.File
Loaded HDF file.
See also
--------
:func:`h5py.File`
"""
return File(join(folder, fname_format.format(scan)))
[docs]
def load_hdf5_data(
scan,
folder,
fname_format=HDF_DEFAULT_FNAME_FORMAT,
h5_location=BLUESKY_DEFAULT_LOCATION,
):
"""
Wrapper that loads HDF files using `h5py`.
Parameters
----------
scan_id : int
Scan_id of the scan to be retrieved.
folder : string, optional
Folder where the master files are located.
fname_format : string, optional
General format of file name. The correct name must be retrievable
through: `file_name_format.format(scan_id)`
h5_location : string, optional
Location of the Bluesky data stream.
Returns
-------
data : pandas.DataFrame
Table with the data from the primary stream.
See also
--------
:func:`polartools.load_data.load_hdf5_master`
:func:`polartools.load_data.hdf5_to_dataframe`
:func:`h5py.File`
"""
return hdf5_to_dataframe(
load_hdf5_master(scan, folder, fname_format=fname_format)[h5_location]
)
[docs]
def load_table(scan, source=None, **kwargs):
"""
Automated scan table loader.
The automation is based on the source argument.
- if source == 'csv' -> uses `load_csv`.
- else if source is a string or nexus2spec.spec.SpecDataFile -> uses\
`load_spec`.
- else -> uses `load_databroker`.
Parameters
----------
scan : int
Scan_id our uid. If scan_id is passed, it will load the last scan with
that scan_id. See kwargs for search options.
source : databroker database, name of the spec file, or 'csv'
Note that applicable kwargs depend on this selection.
kwargs :
The necessary kwargs are passed to the loading functions defined by the
`source` argument:
- csv -> possible kwargs: folder, name_format.
- spec -> possible kwargs: folder.
- databroker -> possible kwargs: stream, query, use_db_v1.
Note that a warning will be printed if the an unnecessary kwarg is
passed.
Returns
-------
table : pandas.DataFrame
Table with the scan data.
See also
--------
:func:`polartools.load_data.load_databroker`
:func:`polartools.load_data.load_csv`
:func:`polartools.load_data.load_spec`
"""
_kwargs = copy.deepcopy(kwargs)
folder = _kwargs.pop("folder", "")
if source == "csv":
name_format = kwargs.pop("name_format", "scan_{}_primary.csv")
table = load_csv(scan, folder=folder, name_format=name_format)
elif source in ("hdf5", "h5", "hdf"):
fname_format = kwargs.pop("fname_format", HDF_DEFAULT_FNAME_FORMAT)
h5_location = kwargs.pop("h5_location", BLUESKY_DEFAULT_LOCATION)
table = load_hdf5_data(
scan, folder, fname_format=fname_format, h5_location=h5_location
)
elif isinstance(source, str) or isinstance(source, SpecDataFile):
table = load_spec(scan, source, folder=folder)
else:
stream = _kwargs.pop("stream", "primary")
query = _kwargs.pop("query", None)
use_db_v1 = _kwargs.pop("use_db_v1", True)
table = load_databroker(
scan, db=source, stream=stream, query=query, use_db_v1=use_db_v1
)
if len(_kwargs) != 0:
warn(f"The following kwargs were not used! {list(_kwargs.keys())}")
return table
[docs]
def is_Bluesky_specfile(source, folder=""):
"""
Check if the specfile was created by Bluesky.
It looks for a "Bluesky" comment in the file header.
Parameters
----------
source : string or spec2nexus.spec.SpecDataFile
Either the spec file name or a SpecDataFile instance.
folder : string, optional
Folder where spec file is located.
Returns
-------
y : bool
True if spec_file was generated by Bluesky, False otherwise.
See also
--------
:func:`spec2nexus.spec.SpecDataFile`
"""
if isinstance(source, str):
path = join(folder, source)
source = SpecDataFile(path)
if len(source.headers[0].comments):
return source.headers[0].comments[0].startswith("Bluesky")
else:
return False
[docs]
def db_query(db, query):
"""
Searches the databroker v2 database.
Parameters
----------
db :
`databroker` database.
query: dict
Search parameters.
Returns
-------
_db :
Subset of db that satisfy the search parameters. Note that it has the
same format as db.
See also
--------
:func:`databroker.catalog.search`
"""
since = query.pop("since", None)
until = query.pop("until", None)
if _is_tiled(db):
from tiled.queries import Key, TimeRange
if since or until:
if not since:
since = "2010"
if not until:
until = "2050"
_db = db.search(TimeRange(since=since, until=until))
else:
_db = db
for key, value in query.items():
_db = _db.search(Key(key) == value)
else:
from databroker.queries import TimeRange
if since or until:
if not since:
since = "2010"
if not until:
until = "2050"
_db = db.v2.search(TimeRange(since=since, until=until))
else:
_db = db
if len(query) != 0:
_db = _db.v2.search(query)
return _db
def _flatten_list(inp):
"""Only handles lists to second level"""
output = []
for item in inp:
if isinstance(item, list):
output += [i for i in item]
else:
output.append(item)
return output
[docs]
def lookup_position(db, scan, search_string="", query=None):
"""
Lookup positioner values in past scans.
Parameters
----------
db : databroker database
Searcheable database
scan : integer
Scan numbers or uids.
search_string : string
Full or part of positioner name.
query: dict
Search parameters.
Returns
-------
output: list
"""
db_range = db_query(db, query=query) if query else db
baseline = load_databroker(scan, db_range, "baseline", use_db_v1=True)
if len(baseline["time"]) == 2:
date1 = baseline["time"][1].strftime("%m/%d/%y %H:%M:%S")
date2 = baseline["time"][2].strftime("%m/%d/%y %H:%M:%S")
print("=".center(100, "="))
print(f"{'Positioner':>50}{date1:>25}{date2:>25}")
print("-".center(100, "-"))
for key in baseline.keys():
if search_string in key:
if isinstance(baseline[key][1], list):
print(f"{key:>50}{baseline[key][1]}{baseline[key][2]}")
else:
print(
f"{key:>50}{baseline[key][1]:>25}{baseline[key][2]:>25}"
)
else:
date1 = baseline["time"][1].strftime("%m/%d/%y %H:%M:%S")
print("=".center(100, "="))
print(f"{'Positioner':>50}{date1:>25}")
print("-".center(100, "-"))
for key in baseline.keys():
if search_string in key:
if isinstance(baseline[key][1], list):
print(f"{key:>50}{baseline[key][1]}")
else:
print(f"{key:>50}{baseline[key][1]:>25}")
print("-".center(100, "-"))