"""
This module provides a flexible, schema-configured pull-to-push adaptor that finds files generated by a registered SCU and pushes them to a bucket. It supports logical organization of raw files via "elements", though it can be brute-forced with a single element and a sufficiently generic rule.
Functions
---------
:func:`get_group_name`
:func:`complain_about_file_count`
:func:`run_discovery`
:func:`send_files_from_manifest`
Classes
-------
:class:`DiscoveryRule`
:class:`FileSchemaParser`
"""
import datetime
import fnmatch
import importlib.resources as resources
import json
import logging
import os
from typing import Optional
from uuid import uuid4
import click
from google.cloud import storage
from zcloud import get_scu_manifest_handler, load_raw_schema, SCUManifestSchemaHandler
logger = logging.getLogger(__name__)
def check_depth(file:str, max_depth:Optional[int]=None, min_depth:Optional[int]=None) -> bool:
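"""
Return True if ``file`` falls within the given directory-depth bounds.

Depth is the number of path separators in the path, so (with POSIX separators)
``check_depth("a/b/c.txt", max_depth=1)`` is False because the file sits two levels deep.
"""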
our_depth = file.count(os.sep)
if max_depth is not None:
if our_depth > max_depth:
return False
if min_depth is not None:
if our_depth < min_depth:
return False
return True
def complain_about_file_count(
output_files: set,
rule_name: str,
num_files: Optional[int] = None,
required: Optional[bool] = None,
) -> None:
"""
Raise a ValueError if the number of files found does not match the expected number
The content of the error describes whether or not the rule requires the files to be found, and how many files were found.
Parameters
----------
output_files : set
The set of files found by the rule.
rule_name : str
The name of the rule.
num_files : int, optional
The number of files expected to be found. If not provided, the error message will say "at least one".
required : bool, optional
Whether the rule requires the files to be found. If not provided or False, no error is raised and a warning is logged instead.
Raises
------
ValueError
If the rule is required and the number of files found does not match the expected number.
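
Examples
--------
Illustrative rule name and file set:

>>> complain_about_file_count({"model_0.pdb"}, "my_rule", num_files=1, required=True)
>>> complain_about_file_count(set(), "my_rule", required=True)
Traceback (most recent call last):
    ...
ValueError: Found 0 files for rule my_rule, expected at least one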
"""
if len(output_files) == 0 or (
num_files is not None and len(output_files) != num_files
):
if required:
if num_files is None:
num_files_val = "at least one"
else:
num_files_val = str(num_files)
raise ValueError(
f"Found {len(output_files)} files for rule {rule_name}, expected {num_files_val}"
)
else:
if num_files is None:
logger.warning(
f"Found {len(output_files)} files for rule {rule_name}, but it is not marked as required"
)
else:
logger.warning(
f"Found {len(output_files)} files for rule {rule_name}, expected {num_files}, but it is not marked as required in the schema"
)
logger.warning(
"num files is meant to be used with required=True, this warning is the only thing that will be raised when you use num_files without required=True in the discovery schema"
)
def get_group_name(filename, group_by, pattern=None, extension=None) -> Optional[str]:
"""
Get the "group_name" for a file based on the "group_by" rules.
This is used by the "DiscoveryRule" class for the "paired_group" type of rule.
Parameters
----------
filename : str
The filename to get the group name for.
group_by : str
The group_by rule to use. Currently, only "basename" is supported.
pattern : str, optional
Reserved for future group_by modes (e.g. prefix/suffix matching); currently unused.
extension : str, optional
The extension to strip from the basename when forming the group name.
Returns
-------
str or None
The group name for the file, or None if it could not be determined.
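
Examples
--------
Hypothetical filenames for illustration:

>>> get_group_name("runs/model_0.pdb", "basename", extension=".pdb")
'model_0'
>>> get_group_name("runs/model_0.pdb", "basename", extension="pdb")
'model_0'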
"""
if group_by == "basename":
base_path = os.path.basename(filename)
if extension:
if not extension.startswith("."):
extension = f".{extension}"
return "".join(base_path.rsplit(extension, 1)[0])
return
# if group_by prefix,suffix, etc think about implementation here re: pattern parsing
def read_manifest(bucket_name, manifest_path):
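"""
Download and parse a JSON manifest from a bucket.

Parameters
----------
bucket_name : str
The bucket containing the manifest.
manifest_path : str
The object name of the manifest within the bucket.

Returns
-------
dict
The parsed manifest.
"""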
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(manifest_path)
manifest_content = blob.download_as_string()
manifest = json.loads(manifest_content)
return manifest
class DiscoveryRule:
"""
Configurable file discovery rule. This class is used to find files on a filesystem based on instructions in a meta schema.
.. note::
This class is not optimized for performance in searching through large numbers of files or convoluted directory structures.
It has O(n) complexity for each rule, where n is the number of files it is handed.
One future improvement could be to have some "rule-aware" optimization at a higher level, or have rules with a lot of overlap run inside the same loop.
Or ... just don't write 10,000 files in a single trajectory, how about that?
The rule configuration should have the following keys:
- **name**: A unique name for the rule.
- **type**: The type of rule. Currently, only 'paired_group', 'pattern', and 'all_files' are supported.
- **description** (optional): A description of the rule.
- **criteria**: A dictionary containing the criteria for the rule. The dictionary should have the following keys:
- **group_by**: A string indicating how to group files. Currently, only 'basename' is supported. Group_by is only relevant for paired_group rules, ignored otherwise.
- **extensions**: A list of strings indicating the file extensions to examine for pairing. Extensions may include the leading period. Extensions are only relevant for 'paired_group' rules, ignored otherwise.
- **max_depth**: An integer indicating the maximum depth to search for files.
- **min_depth**: An integer indicating the minimum depth to search for files.
- **fail_on_mismatch**: A boolean indicating whether to raise an error if a pair is missing. If False or omitted, the rule will ignore files that do not have all partners.
- **pattern**: A string containing a glob pattern to match files. For 'paired_group' rules the pattern should end with the placeholder ``.{ext}``, which is replaced with each of the listed extensions. The same substitution is supported for 'pattern' (single) rules.
- **required**: A boolean indicating whether finding files is required for this rule. If False or omitted, the rule will not raise an error if no files are found.
- **num_files**: An integer indicating the number of files to find; an error is raised if the count found does not match. 'paired_group' rules count groups, while 'pattern' (single) rules count files. Incomplete groups are not counted towards this number, which can create unwanted behavior if the fail_on_mismatch and pattern criteria are abused.
Parameters
----------
rule : dict
A dictionary containing the rule configuration.
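
Examples
--------
A minimal 'pattern' rule; the rule name and file paths are illustrative:

>>> rule = DiscoveryRule({
...     "name": "top_level_pdbs",
...     "type": "pattern",
...     "criteria": {"pattern": "*.pdb", "max_depth": 0},
... })
>>> rule.apply(["model.pdb", "runs/deep/model.pdb", "notes.txt"])
['model.pdb']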
"""
def __init__(self, rule,):
self.name = rule.get("name")
self.type = rule.get("type")
self.criteria = rule.get("criteria")
# TODO there's some code duplication in the apply functions which could be broken out into helper functions.
def apply(self, file_list,) -> list[str]:
if self.type == "paired_group":
return self.apply_group_search(file_list,)
elif self.type == "pattern":
return self.apply_pattern(file_list,)
elif self.type == "all_files":
return self.apply_all_files(file_list,)
return []
def apply_group_search(self, file_list: list[str],) -> list[str]:
"""
Apply a group search rule to a list of files.
Group search rules are used to find groups of files that match a common pattern but differ in extension.
For example, if you have file1.pdb and file1.trb that are intrinsically related, you can use a group search rule to find both files, and configure the rule to warn or fail when part of a group is missing.
Detailed supported criteria are in the docs for the parent class.
Parameters
----------
file_list : list
List of files to search through.
Returns
-------
list
List of files that match the group search rule.
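
Examples
--------
Criteria pairing ``.pdb``/``.trb`` outputs by basename (illustrative values)::

    {
        "group_by": "basename",
        "extensions": [".pdb", ".trb"],
        "pattern": "*.{ext}",
        "fail_on_mismatch": false,
        "required": true
    }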
"""
# parse out rules from the criteria
group_by = self.criteria.get(
"group_by"
) # group by this part of the file, string, logical
extensions = self.criteria.get(
"extensions"
) # list of extensions to examine for pairing
max_depth = self.criteria.get("max_depth")
min_depth = self.criteria.get("min_depth")
fail_on_mismatch = self.criteria.get(
"fail_on_mismatch"
) # raise an error if a pair is missing. If False, just ignores files without all partners
if fail_on_mismatch is None:
fail_on_mismatch = False
pattern = self.criteria.get(
"pattern"
) # glob pattern to match files, assumes {ext} is the extension
if pattern is None:
logger.warning("no pattern provided for group search, using *.{ext}")
pattern = "*.{ext}"
required = self.criteria.get("required")
if required is None:
required = False
num_files = self.criteria.get("num_files")
# substitute .{ext} in the pattern with each of the extensions. Do check that the extensions have a leading "."
ext_sanitized = [
ext if ext.startswith(".") else f".{ext}" for ext in extensions
]
# for each extension, substitute it in for .{ext}, if relevant
ext_subs_patterns = [
ext.join(pattern.rsplit(".{ext}", 1))
for ext in ext_sanitized
if pattern.endswith(".{ext}")
]
paired_dict = {}
for file in file_list:
logger.debug(f"Checking file {file}")
# check file organization rules
if not check_depth(file, max_depth, min_depth,):
continue
basename = os.path.basename(file)
normalized_file = os.path.normpath(file)
# check pattern rules
for subbed_pattern in ext_subs_patterns:
# support for globbing on the basename, full path, or normalized full path
# normalized is probably not needed, since we also control the file walk in the file_list, but maybe this will save a future dev some grief
# or cause grief...
basename_matches = fnmatch.fnmatch(basename, subbed_pattern)
normalized_full_path_matches = fnmatch.fnmatch(normalized_file, subbed_pattern)
full_path_matches = fnmatch.fnmatch(file, subbed_pattern)
if (
basename_matches
or full_path_matches
or normalized_full_path_matches
): # if the file matches the pattern, path globs
logger.debug(f"File {file} matches pattern {subbed_pattern}")
ext = os.path.splitext(file)[1]
group_name = get_group_name(file, group_by, extension=ext)
if group_name not in paired_dict:
paired_dict[group_name] = {"exts": [], "files": []}
paired_dict[group_name]["exts"].append(ext)
paired_dict[group_name]["files"].append(file)
logger.debug(f"Adding group {group_name}")
logger.debug(f"Extensions: {paired_dict[group_name]['exts']}")
logger.debug(f"Files: {paired_dict[group_name]['files']}")
else:
paired_dict[group_name]["exts"].append(ext)
paired_dict[group_name]["files"].append(file)
logger.debug(f"Adding file {file} to group {group_name}")
# check file quantity and pairing rules
output_files = set()
n_groups = 0
old_paired_dict = paired_dict.copy()
for group_name in old_paired_dict.keys():
if len(paired_dict[group_name]["exts"]) != len(extensions):
if fail_on_mismatch:
raise ValueError(
f"Group {group_name} does not have all partners"
)
else: # dumps groups without paired partners
logger.warning(
f"Group {group_name} does not have all partners"
)
logger.warning("Ignoring group")
del paired_dict[group_name]
else:
output_files.update(paired_dict[group_name]["files"])
n_groups += 1
if not (num_files is None) and n_groups > num_files:
raise ValueError(
f"Found more than {num_files} groups for rule {self.name}"
)
complain_about_file_count(output_files, self.name, num_files, required)
return list(output_files)
def apply_pattern(self, file_list: list[str]) -> list[str]:
"""
Apply a pattern search rule to a list of files.
Pattern search rules are used to find individual files that match a certain pattern.
Parameters
----------
file_list : list
List of files to search through.
Returns
-------
list
List of files that match the pattern search rule.
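
Examples
--------
Criteria matching a single summary CSV at most one directory deep (illustrative values)::

    {
        "pattern": "summary_*.{ext}",
        "extensions": ["csv"],
        "max_depth": 1,
        "required": true,
        "num_files": 1
    }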
"""
logger.debug(f"Applying pattern search rule {self.name}")
# parse out rules from the criteria
extensions = self.criteria.get(
"extensions"
) # list of extensions to examine for pairing
if extensions is not None:
if len(extensions) > 1:
raise ValueError(
"Single search rule cannot have more than one extension specified"
)
extension = extensions[0]
if not extension.startswith("."):
extension = f".{extension}"
else:
extension = None
logger.debug(f"Extensions: {extensions}")
max_depth = self.criteria.get("max_depth")
min_depth = self.criteria.get("min_depth")
fail_on_mismatch = self.criteria.get(
"fail_on_mismatch"
)
if fail_on_mismatch is None:
fail_on_mismatch = False
pattern = self.criteria.get(
"pattern"
) # glob pattern to match files, assumes {ext} is the extension
logger.debug(f"Pattern: {pattern}")
if pattern is None:
if extension is None:
raise ValueError(
"No pattern or extension provided for single search rule, How am I meant to find the file?"
)
else:
pattern = "*.{ext}"
subbed_pattern = (
pattern.rsplit(".{ext}", 1)[0] + extension if extension and pattern.endswith(".{ext}") else pattern
)
logger.debug(f"Subbed pattern: {subbed_pattern}")
required = self.criteria.get("required")
if required is None:
required = False
num_files = self.criteria.get("num_files")
output_files = set()
for file in file_list:
# check file organization rules
if not check_depth(file, max_depth, min_depth,):
continue
basename = os.path.basename(file)
normalized_file = os.path.normpath(file)
# check pattern rules
if (
fnmatch.fnmatch(basename, subbed_pattern)
or fnmatch.fnmatch(normalized_file, subbed_pattern)
or fnmatch.fnmatch(file, subbed_pattern)
):
output_files.add(file)
logger.debug(f"Output files: {output_files}")
complain_about_file_count(output_files, self.name, num_files, required)
return list(output_files)
def apply_all_files(self, file_list: list[str],) -> list[str]:
"""
Apply a rule to all files in the file list.
This is a catch-all rule which allows "rule criteria" without requiring patterns or extensions.
Parameters
----------
file_list : list
List of files to search through.
Raises
------
ValueError
can raise a ValueError via complain_about_file_count
Returns
-------
list
List of files that match the rule criteria.
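
Examples
--------
Criteria collecting only files in the top level of the search directory (illustrative values)::

    {"max_depth": 0, "required": false}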
"""
max_depth = self.criteria.get("max_depth")
min_depth = self.criteria.get("min_depth")
required = self.criteria.get("required")
if required is None:
required = False
num_files = self.criteria.get("num_files")
output_files = set()
for file in file_list:
# check file organization rules
if not check_depth(file, max_depth, min_depth,):
continue
output_files.add(file)
complain_about_file_count(
output_files, self.name, num_files, required
) # If you remove this, update the docstring
return list(output_files)
class FileSchemaParser:
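"""
Parse a file discovery schema and apply its rules to a directory tree.

The schema is expected to provide ``schema_version``, ``description``, ``compute_type``, a ``discovery_rules`` block with a ``rules`` list, and an ``elements`` list whose entries reference rules by name (see :meth:`discover_files` for the implied element structure).

Parameters
----------
schema : dict
The raw discovery schema, e.g. as returned by ``load_raw_schema``.
"""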
def __init__(
self,
schema: dict,
): # manifest_schema_name:str="scu_manifest",manifest_version:str="1.0" load manifest meta schema to validate our manifest ... this is really more of an integration test than a needed func
self.schema_version = schema.get("schema_version")
self.description = schema.get("description")
self.compute_type = schema.get("compute_type")
self.discovery_rules = {
rule["name"]: DiscoveryRule(rule)
for rule in schema.get("discovery_rules", {}).get("rules", [])
}
self.elements = schema.get("elements")
# raise a schema error here if an element lists a discovery rule that doesn't exist
for element in self.elements:
element_name = element.get("name")
if "discovery_rules" in element:
for rule_name in element["discovery_rules"]:
if rule_name not in self.discovery_rules:
raise ValueError(
f"Element {element_name} lists discovery rule {rule_name} which does not exist in the schema"
)
if "element" in element:
for sub_element_name, sub_element in element["elements"].items():
if "discovery_rules" in sub_element:
for rule_name in sub_element["discovery_rules"]:
if rule_name not in self.discovery_rules:
raise ValueError(
f"Element {element_name}, sub-element: {sub_element_name} lists discovery rule {rule_name} which does not exist in the schema"
)
self._path_cache = {}
def discover_files(self, directory: str) -> list:
"""
Discover files for each element in the schema.
Elements can also have "sub-elements" and each element can have any number of discovery rules applied to it.
Element structure is implied to have at least:
.. code-block:: json
{
"name": "element_name",
"description": "A brief description of the element",
"discovery_rules": ["rule1", "rule2"],
// And, optionally, with sub-elements (this can continue recursively, please avoid abusing it):
"elements": [
{
"name": "sub_element_name",
"description": "A brief description of the sub-element",
"discovery_rules": ["rule3", "rule4"],
"elements": []
}
]
// end optional sub-elements
}
Parameters
----------
directory : str
The directory to search for files.
Returns
-------
list
A list of dictionaries, one per element (and sub-element), each with a "name" key and a "files" key holding the discovered files for that element, unordered.
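
Examples
--------
Illustrative output shape (element and file names are made up):

>>> parser.discover_files("results")  # doctest: +SKIP
[{'name': 'element_name', 'files': [{'name': 'model_0.pdb'}]}]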
"""
if self.elements is None:
return []
discovered_files = {}
for element in self.elements:
element_name = element.get("name")
discovery_rules = element.get("discovery_rules")
sub_elements = element.get("elements",None)
discovered_files.update(
self._discover_element_files(element_name, discovery_rules, directory,sub_elements=sub_elements)
)
logger.debug(f"Discovered files for element {element_name}:")
element_files = [{"name": element_name, "files": files} for element_name, files in discovered_files.items()]
return element_files
def _discover_element_files(
self,
element_name: str,
discovery_rules: list,
directory: str,
parent_element: Optional[str] = None,
sub_elements: Optional[dict] = None,
) -> dict:
"""
Discover files for an element and its sub-elements, recursively.
When recursively discovering files for sub-elements, the parent element name is prepended to the sub-element name with a period.
Parameters
----------
element_name : str
The name of the element.
discovery_rules : list
The list of discovery rule names to apply to the element.
directory : str
The directory to search for files.
parent_element : str, optional
The parent element name. If not provided, the element is assumed to be a top-level element.
sub_elements : list, optional
Sub-elements of this element to recurse into, if any.
Returns
-------
dict
A dictionary containing the discovered files for the element and its sub-elements. key: element name, value: list of discovered files, unordered.
sub-elements are listed as "parent_element.sub_element", recursively. (e.g. "grandparent.parent.element")
"""
element_files = {}
element_name_output = (
element_name
if parent_element is None
else f"{parent_element}.{element_name}"
)
if not (discovery_rules is None):
all_files = self._path_cache.get(directory, None)
if all_files is None:
all_files = []
for root, _, files in os.walk(os.path.abspath(directory)):
logger.debug(f"Searching in {root}")
logger.debug(files)
for file in files:
relpath = os.path.relpath(os.path.join(root, file), start=os.path.abspath(directory))
# final_path = os.path.join(directory, relpath) if not(directory ==".") else relpath
all_files.append(relpath)
self._path_cache[directory] = all_files
logger.debug("All files:")
logger.debug(all_files)
current_list = element_files.get(element_name_output, [])
for rule_name in discovery_rules:
rule = self.discovery_rules[rule_name]
current_list.extend([{"name":path} for path in rule.apply(all_files)]) #follow the schema which is flexible enough to allow for more complex annotations
element_files[element_name_output] = current_list
if not (sub_elements is None):
for sub_element in sub_elements:
sub_element_name = sub_element.get("name")
sub_element_discovery_rules = sub_element.get("discovery_rules", None)
sub_sub_elements = sub_element.get("elements",None)
element_files.update(
self._discover_element_files(
sub_element_name,
sub_element_discovery_rules,
directory,
parent_element=element_name_output,
sub_elements=sub_sub_elements,
)
)
return element_files
def get_manifest_dict(self, element_files: list, manifest_schema_handler: Optional[SCUManifestSchemaHandler] = None, **meta) -> dict:
"""
Generates a manifest dictionary containing metadata and file information.
Parameters
----------
element_files : list
A list of element dictionaries as returned by :meth:`discover_files`, each with "name" and "files" keys.
manifest_schema_handler : SCUManifestSchemaHandler, optional
Handler providing the manifest key names; defaults to the handler returned by :func:`get_scu_manifest_handler`.
**meta
Additional metadata key/value pairs to store in the manifest's metadata block.
Returns
-------
dict
A dictionary containing the following keys:
- "schema_version": The schema version.
- "description": A description of the manifest.
- "elements": Elements associated with the manifest.
- "element_files": The input element_files dictionary.
- "uuid": A unique identifier for the manifest.
- "files": A list of normalized file paths with the UUID prepended.
"""
if manifest_schema_handler is None:
manifest_schema_handler = get_scu_manifest_handler()
manifest_dict = {}
# get schema name, version, description etc
manifest_dict["schema_version"] = self.schema_version
manifest_dict[manifest_schema_handler.description] = self.description
manifest_dict[manifest_schema_handler.elements] = self.elements
manifest_dict[manifest_schema_handler.element_files] = element_files
manifest_dict[manifest_schema_handler.meta] = {**meta}
manifest_dict[manifest_schema_handler.meta][manifest_schema_handler.compute_type] = self.compute_type
manifest_dict[manifest_schema_handler.meta][manifest_schema_handler.upload_timestamp] = datetime.datetime.now().isoformat()
# get the files recursively from the element_files dict
file_paths_set = {file[manifest_schema_handler.file_path] for element in element_files for file in element.get("files") }
files_list = [{"name":path} for path in file_paths_set]
# make a uuid for the manifest
manifest_dict[manifest_schema_handler.meta][manifest_schema_handler.upload_uuid] = str(uuid4())
manifest_dict[manifest_schema_handler.files] = files_list
logger.debug(f"Files: {files_list}")
logger.debug(f"files added to manifest")
return manifest_dict
def run_discovery(schema_name: str, directory: str,**meta) -> dict:
"""
Discover files in a directory based on a given schema and generate a manifest.
Parameters
----------
schema_name : str
The name of the schema to load.
directory : str
The directory to search for files.
**meta
Additional metadata key/value pairs forwarded to :meth:`FileSchemaParser.get_manifest_dict`.
Returns
-------
dict
A dictionary representing the manifest of discovered files.
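
Examples
--------
Illustrative call; the schema name and metadata are placeholders:

>>> manifest = run_discovery("my_schema", "results", authors=["dmitriz"])  # doctest: +SKIP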
"""
schema = load_raw_schema(schema_name)
parser = FileSchemaParser(schema)
logger.debug(f"running discovery on {directory}")
element_files = parser.discover_files(directory)
manifest = parser.get_manifest_dict(element_files,**meta)
return manifest
def send_files_from_manifest(
bucket_name: str,
project: str,
manifest: dict,
base_dir:str = ".",
):
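"""
Upload every file listed in a manifest to a bucket, then upload the manifest itself.

Each file is uploaded to ``<upload_uuid>/<relative path>`` with the manifest metadata attached as blob metadata; the manifest is then uploaded as ``<upload_uuid>/<upload_uuid>_scu_manifest.json``.

Parameters
----------
bucket_name : str
The name of the destination bucket.
project : str
The project to bill for requests (passed to the bucket as ``user_project``).
manifest : dict
A manifest as produced by :func:`run_discovery`.
base_dir : str, optional
The local directory that the manifest file paths are relative to. Defaults to ".".
"""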
mh = get_scu_manifest_handler()
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name, user_project=project)
upload_uuid = manifest[mh.meta][mh.upload_uuid]
global_meta = manifest[mh.meta]
# convert arrays to comma separated strings
# hard-cast all other values to strings, since blob metadata values must be strings
sanitized_meta = {
key: ",".join(global_meta[key]) if isinstance(global_meta[key], list) else str(global_meta[key])
for key in global_meta}
for file_obj in manifest[mh.files]:
file = file_obj[mh.file_path]
local_path = os.path.join(base_dir, file)
if not os.path.exists(local_path):
raise FileNotFoundError(f"File {file} not found in {base_dir}")
dest = os.path.join(upload_uuid, file)
blob = bucket.blob(dest)
# add metadata to the blob
blob.metadata = sanitized_meta
blob.upload_from_filename(local_path)
logger.debug(f"File {local_path} uploaded to object: {dest} in {bucket_name}.")
if not (base_dir == "."):
logger.debug("base_dir is not .")
logger.debug(f"Local path: {local_path}")
logger.debug(f"Base dir: {base_dir}")
logger.debug(f"File: {file}")
logger.debug(f"Dest: {dest}")
# upload manifest
manifest_destiny = os.path.join(
upload_uuid, f"{upload_uuid}_scu_manifest.json"
)
manifest_blob = bucket.blob(manifest_destiny)
#add metadata to the manifest:
manifest_blob.metadata = sanitized_meta
manifest_blob.upload_from_string(json.dumps(manifest))
logger.debug(f"Manifest uploaded to object: {manifest_destiny} in {bucket_name}.")
def config_from_path(path, scusch: Optional[SCUManifestSchemaHandler] = None):
with open(path, "r") as f:
raw_dict = json.load(f)
return raw_dict
def scrape_jira_ticket(ticket_dict):
jira_ticket_id = ticket_dict["id"]
email_address = None
ticket_assignee = ticket_dict["fields"].get("assignee")
if ticket_assignee is not None:
email_address = ticket_assignee.get("emailAddress")
authors = [] if email_address is None else [email_address]
config_dict_compatible = {
"authors": authors,
"jira_ticket_id": jira_ticket_id,
}
return config_dict_compatible
def sub_alias(potential_alias):
aliases = {
"dmitriz@ziptx.bio":"dmitriz",
}
return aliases.get(potential_alias,potential_alias)
def sub_aliases_and_validate_config(config_dict):
#placeholder, should ping API oracle to validate that meta fields are permissible values
if "authors" in config_dict:
config_dict["authors"] = [sub_alias(author) for author in config_dict["authors"]]
return config_dict
def merge_jira_and_config(config_dict, ticket_dict):
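"""
Merge a local config dict with metadata scraped from a Jira ticket.

Authors from both sources are combined (with alias substitution on the Jira side). Falsy values are dropped from both dicts before merging, a ValueError is raised if any non-null key appears in both, and keys that were falsy in both sources are restored with a value of None.
"""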
jira_scraped = scrape_jira_ticket(ticket_dict)
jira_config_dict = sub_aliases_and_validate_config(jira_scraped)
unify_authors = jira_config_dict.get("authors",[])
unify_authors.extend(config_dict.get("authors",[]))
config_dict["authors"] = unify_authors
if "authors" in jira_config_dict:
jira_config_dict.pop("authors")
falsy_keys_jira = []
falsy_keys_config = []
# drop all falsy keys from both configs; iterate over copies of the keys so we can pop safely
for key in list(jira_config_dict.keys()):
if not jira_config_dict[key]:
falsy_keys_jira.append(key)
jira_config_dict.pop(key)
for key in list(config_dict.keys()):
if not config_dict[key]:
falsy_keys_config.append(key)
config_dict.pop(key)
#raise error if there are still matched keys between these two
non_null_conflicting_keys = set(jira_config_dict.keys()).intersection(set(config_dict.keys()))
if len(non_null_conflicting_keys) > 0:
logger.debug(f"Non-null conflicting keys: {non_null_conflicting_keys}")
raise ValueError("There are non-null conflicting key/value pairs present in both the Jira ticket and the config file. Please resolve these conflicts")
output_dict = {**config_dict, **jira_config_dict}
#set all falsy keys that were previously in both to None
for key in set(falsy_keys_jira).intersection(set(falsy_keys_config)):
output_dict[key] = None
return output_dict