Source code for polartools.manage_database

"""
Functions to import and export Bluesky data.

.. autosummary::
   ~to_databroker
   ~to_csv_json
   ~from_databroker_inplace
   ~remove_catalog
"""

# Copyright (c) 2021, UChicago Argonne, LLC.
# See LICENSE file for details.

from databroker_pack import (
    export_catalog,
    write_documents_manifest,
    write_msgpack_catalog_file,
    unpack_inplace,
    copy_external_files,
    write_external_files_manifest,
)

from databroker import catalog_search_path
from .load_data import db_query
from os import makedirs, remove
from os.path import exists, join
from itertools import tee
from suitcase.utils import MultiFileManager
from suitcase.csv import export as csv_export
from suitcase.json_metadata import export as json_export
from warnings import warn
from pathlib import Path


def to_databroker(db, folder, query=None, external=False):
    """
    Exports databroker database into msgpack files.

    WARNING: While you can pass a query dictionary here, it is advised to run
    the query and check the results before running this function as you may
    inadvertently export a very large number of scans. See
    :func:`polartools.load_data.db_query`.

    This is a narrow usage of the `databroker-pack` package_. Note that this
    package includes a convenient command line tool.

    .. _package: https://blueskyproject.io/databroker-pack/index.html

    Parameters
    ----------
    db :
        Databroker database.
    folder : str
        Destination directory. It is created if it does not exist.
    query : dict, optional
        Search parameters to select a subsection of `db`. See
        :func:`polartools.load_data.db_query` for more details. If it is
        not provided (or is empty), `db.v2` is exported as a whole.
    external : bool, optional
        If True, the external files referenced by the exported scans are
        copied using `folder/external_files` as the target directory, and
        a manifest of the copied files is written for each group of files.
        Defaults to False.

    Warns
    -----
    UserWarning
        If `external` is True and `copy_external_files` reports files that
        could not be copied.

    Notes
    ------
    - The scans are saved in msgpack files placed in the `folder/documents`
      folder.
    - `catalog.yml` and `documents_manifest.txt` are located in `folder`.

    See also
    --------
    :func:`polartools.load_data.db_query`
    :func:`databroker-pack.export_catalog`
    :func:`databroker-pack.write_documents_manifest`
    :func:`databroker-pack.write_msgpack_catalog_file`
    """
    results = db_query(db, query) if query else db.v2
    makedirs(folder, exist_ok=True)
    manager = MultiFileManager(folder)
    artifacts, external_files, _, _ = export_catalog(results, manager)
    write_documents_manifest(manager, folder, artifacts["all"])

    # Stays empty unless external files are copied into the pack directory.
    root_map = {}
    if external:
        target_directory = Path(folder, "external_files")
        copying_failures = []
        for (_, root, unique_id), files in external_files.items():
            new_root, new_files, failures = copy_external_files(
                target_directory, root, unique_id, files
            )
            # Keep track of what could not be copied so that the user is
            # told about it instead of getting a silently incomplete export.
            copying_failures.extend(failures or [])

            # The root_map value will be the relative path to
            # the data within the pack directory.
            relative_root = new_root.relative_to(folder)
            root_map.update({unique_id: relative_root})
            rel_paths = [Path(f).relative_to(folder) for f in new_files]
            write_external_files_manifest(manager, unique_id, rel_paths)

        if copying_failures:
            warn(
                f"{len(copying_failures)} external file(s) could not be "
                f"copied into {target_directory!s}."
            )

    write_msgpack_catalog_file(
        manager, folder, ["./documents/*.msgpack"], root_map
    )
def to_csv_json(
    db,
    folder,
    query=None,
    fname_format="scan_{}_",
    overwrite=False,
    max_attempts=100,
):
    """
    Exports scans into .csv and .json files.

    The scans will be labeled by their `scan_id` metadata. If two or more
    scans have the same `scan_id`, it will write the new scan with a
    `-number` suffix where number will be the first available integer
    starting with 2.

    WARNING: While you can pass a query dictionary here, it is advised to run
    the query and check the results before running this function as you may
    inadvertently export a very large number of scans. See
    :func:`polartools.load_data.db_query`.

    Parameters
    ----------
    db :
        Databroker database.
    folder : str
        Destination directory.
    query : dict, optional
        Search parameters to select a subsection of `db`. See
        :func:`polartools.load_data.db_query` for more details. If it is
        not provided (or is empty), every entry of `db.v2` is exported.
    fname_format : str, optional
        Format of the string to be used for the file names. Note that one has
        to be able to add the scan numbers into this string by doing:
        `fname_format.format(scan_number)`.
    overwrite : bool, optional
        Flag that allows writing into a `folder` that already exists. If
        False (default) and `folder` exists, `FileExistsError` is raised.
    max_attempts : int, optional
        Bounds the number of suffixed file name prefixes that are tried when
        looking for an unused one. Once the limit is reached a warning is
        issued and the last generated prefix is used.

    Raises
    ------
    FileExistsError
        If `folder` already exists and `overwrite` is False.

    Notes
    -----
    - Each scan is exported through both :func:`suitcase.csv.export` and
      :func:`suitcase.json_metadata.export` using the same file prefix.
    - A file prefix is considered to be in use if any file in `folder` has
      a name that starts with it.

    See also
    --------
    :func:`polartools.load_data.db_query`
    :func:`suitcase.csv.export`
    :func:`suitcase.json_metadata.export`
    """
    # Local import: only needed by the collision check below.
    from glob import escape, iglob

    def my_exporter(docs, directory, file_prefix):
        # Each exporter consumes the document stream, so it is duplicated.
        docs1, docs2 = tee(docs, 2)
        csv_export(docs1, directory, file_prefix)
        json_export(docs2, directory, file_prefix)

    def prefix_in_use(folder, prefix):
        # Any file starting with the prefix counts as a collision. Looking
        # for one specific stream (e.g. "baseline") misses scans that do
        # not have that stream.
        pattern = escape(join(folder, prefix)) + "*"
        return next(iglob(pattern), None) is not None

    def next_available_fname(folder, fname, max_attempts):
        new_fname = fname
        for i in range(2, max_attempts + 3):
            if not prefix_in_use(folder, new_fname):
                return new_fname
            new_fname = fname + f"{i}-"

        warn(
            f"Reached maximum number of attempts ({max_attempts})"
            f', the files with "{new_fname}" prefix will be overwritten.'
        )
        return new_fname

    if not overwrite and exists(folder):
        raise FileExistsError(
            f"{folder} already exists. Either select "
            "`overwrite=True` or enter a another folder."
        )

    results = db_query(db, query) if query else db.v2

    for uid in list(results):
        scanno = results[uid].metadata["start"]["scan_id"]
        fname = next_available_fname(
            folder, fname_format.format(scanno), max_attempts
        )
        print(
            "Exporting uid #{}, scan_id #{}".format(uid[:8], scanno),
            end="... "
        )
        my_exporter(results[uid].documents(fill="yes"), folder, fname)
        print("Done!")
def from_databroker_inplace(folder, name, catalog, merge=False):
    """
    Register an exported databroker database without moving its files.

    This is a narrow usage of the `databroker-pack` package_. Note that this
    package includes a convenient command line tool.

    .. _package: https://blueskyproject.io/databroker-pack/index.html

    Parameters
    ----------
    folder : str
        Folder with files exported by
        :func:`polartools.manage_database.to_databroker`.
    name : str
        Unique name that will be set in the databroker catalog.
    catalog :
        Catalog object that is reloaded (through its `force_reload` method)
        once the files are unpacked, for instance `databroker.catalog`.
    merge : bool, optional
        Flag to decide if this data will be merged into an existing catalog.
        It is forwarded to `databroker_pack.unpack_inplace`.

    Example
    -------
    ::

        from databroker import catalog
        from polartools.manage_database import from_databroker_inplace

        from_databroker_inplace('folder/to/files', 'my_data', catalog)
        db = catalog['my_data']
    """
    config_path = unpack_inplace(folder, name, merge=merge)

    # Reload the catalog right after the configuration file is written.
    catalog.force_reload()

    message = f"Placed configuration file at {config_path!s}"
    print(message)
def remove_catalog(name, catalog=None):
    """
    Delete the configuration file of a catalog created by `to_databroker`.

    Every directory returned by `databroker.catalog_search_path` is checked
    for a file named 'databroker_unpack_NAME.yml', and each one found is
    deleted.

    Parameters
    ----------
    name : str
        Catalog name.
    catalog : optional
        Catalog object to be reloaded (through its `force_reload` method)
        after a configuration file is deleted. If None (default), nothing is
        reloaded.

    Notes
    -----
    - This will not remove the data files, only the catalog, which will not
      be discoverable using `databroker.catalog`.
    - It assumes that the catalog was created by `to_databroker` or
      `databroker_pack`.
    - A message is printed for each configuration file removed, or a single
      message if none was found.
    """
    config_name = f"databroker_unpack_{name}.yml"
    removed_any = False

    for search_dir in catalog_search_path():
        candidate = join(search_dir, config_name)
        if not exists(candidate):
            continue

        remove(candidate)
        if catalog is not None:
            catalog.force_reload()
        print(f"The {name} catalog was removed.")
        removed_any = True

    if not removed_any:
        print(f"The catalog {name} was not found.")