Source code for zcloud.generic_scu_transfer

"""

This module provides a flexible, schema-configured pull-to-push adaptor to find files generated by a registered SCU and push them to a bucket. It supports some logical organization of raw files via "elements", though it can be brute-forced with a single element and a sufficiently generic rule.

Functions
---------

:func:`get_group_name`

:func:`complain_about_file_count`

:func:`run_discovery`

:func:`send_files_from_manifest`

Classes
-------

:class:`DiscoveryRule`

:class:`FileSchemaParser`

"""


import datetime
import fnmatch
import importlib.resources as resources
import json
import logging
import os
from typing import Optional
from uuid import uuid4

import click
from google.cloud import storage

from zcloud import get_scu_manifest_handler, load_raw_schema, SCUManifestSchemaHandler

logger = logging.getLogger(__name__)


[docs] def check_depth(file: str, max_depth: Optional[int] = None, min_depth: Optional[int] = None) -> bool:
    """Return True if the path's directory depth falls within the optional min/max bounds."""
    our_depth = file.count(os.sep)
    if max_depth is not None and our_depth > max_depth:
        return False
    if min_depth is not None and our_depth < min_depth:
        return False
    return True

[docs] def complain_about_file_count(
    output_files: set,
    rule_name: str,
    num_files: Optional[int] = None,
    required: Optional[bool] = None,
) -> None:
    """
    Raise a ValueError if the number of files found does not match the expected number.

    The content of the error describes whether or not the rule requires the files to be
    found, and how many files were found.

    Parameters
    ----------
    output_files : set
        The set of files found by the rule.
    rule_name : str
        The name of the rule.
    num_files : int, optional
        The number of files expected to be found. If not provided, the error message
        will say "at least one".
    required : bool, optional
        Whether the rule requires the files to be found. If not provided, the rule is
        treated as not required and only warnings are emitted.

    Raises
    ------
    ValueError
        If the rule is required and the number of files found does not match the
        expected number.
    """
    if len(output_files) == 0 or (num_files is not None and len(output_files) != num_files):
        if required:
            num_files_val = "at least one" if num_files is None else str(num_files)
            raise ValueError(
                f"Found {len(output_files)} files for rule {rule_name}, expected {num_files_val}"
            )
        if num_files is None:
            logger.warning(
                f"Found {len(output_files)} files for rule {rule_name}, but it is not marked as required"
            )
        else:
            logger.warning(
                f"Found {len(output_files)} files for rule {rule_name}, expected {num_files}, "
                "but it is not marked as required in the schema"
            )
            logger.warning(
                "num_files is meant to be used with required=True; this warning is the only thing "
                "raised when num_files is used without required=True in the discovery schema"
            )

[docs] def get_group_name(filename, group_by, pattern=None, extension=None) -> Optional[str]:
    """
    Get the "group_name" for a file based on the "group_by" rules.

    This is used by the "DiscoveryRule" class for the "paired_group" type of rule.

    Parameters
    ----------
    filename : str
        The filename to get the group name for.
    group_by : str
        The group_by rule to use. Currently, only "basename" is supported.
    pattern : str, optional
        The pattern to use for the group name. If not provided, the implicit pattern is
        ``*.{ext}``.
    extension : str, optional
        The extension to strip when forming the group name. If not provided, the
        extension is not used.

    Returns
    -------
    str or None
        The group name for the file, or None if no group name could be determined.
    """
    if group_by == "basename":
        base_path = os.path.basename(filename)
        if extension:
            if not extension.startswith("."):
                extension = f".{extension}"
            return "".join(base_path.rsplit(extension, 1)[0])
    # if group_by is prefix, suffix, etc., think about implementation here re: pattern parsing
    return None

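# Illustrative example (hypothetical filenames, not part of any shipped schema):
# get_group_name("runs/day1/sample_01.bam", "basename", extension="bam") and
# get_group_name("runs/day1/sample_01.bam.bai", "basename", extension="bam.bai")
# would both return "sample_01", pairing the BAM with its index.
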
[docs] def read_manifest(bucket_name, manifest_path):
    """Download and parse a JSON manifest stored at ``manifest_path`` in ``bucket_name``."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(manifest_path)
    manifest_content = blob.download_as_string()
    manifest = json.loads(manifest_content)
    return manifest

[docs] class DiscoveryRule:
    """
    Configurable file discovery rule.

    This class is used to find files on a filesystem based on instructions in a meta
    schema.

    .. note::
        This class is not optimized for performance when searching through large
        numbers of files or convoluted directory structures. It has O(n) complexity for
        each rule, where n is the number of files it is handed. One future improvement
        could be to add some "rule-aware" optimization at a higher level, or to run
        rules with a lot of overlap inside the same loop. Or ... just don't write
        10,000 files in a single trajectory, how about that?

    The rule configuration should have the following keys:

    - **name**: A unique name for the rule.
    - **type**: The type of rule. Currently, only 'paired_group', 'pattern', and
      'all_files' are supported.
    - **description** (optional): A description of the rule.
    - **criteria**: A dictionary containing the criteria for the rule. The dictionary
      should have the following keys:

      - **group_by**: A string indicating how to group files. Currently, only
        'basename' is supported. group_by is only relevant for 'paired_group' rules
        and is ignored otherwise.
      - **extensions**: A list of strings indicating the file extensions to examine
        for pairing. Extensions may include the leading period. Extensions are only
        relevant for 'paired_group' rules and are ignored otherwise.
      - **max_depth**: An integer indicating the maximum depth to search for files.
      - **min_depth**: An integer indicating the minimum depth to search for files.
      - **fail_on_mismatch**: A boolean indicating whether to raise an error if a pair
        is missing. If False or omitted, the rule will ignore files that do not have
        all partners.
      - **pattern**: A string containing a glob pattern to match files. The pattern
        should contain a placeholder ``.{ext}`` that will be replaced with the file
        extension for 'paired_group' rules. This replacement is also supported for
        single rules.
      - **required**: A boolean indicating whether finding files is required for this
        rule. If False or omitted, the rule will not raise an error if no files are
        found.
      - **num_files**: An integer indicating the number of files to find. If the
        number of files found does not match this number, an error will be raised.
        'paired_group' rules will find this number of groups, while single rules will
        find this number of files. Incomplete groups are not counted towards this
        number, which can create unwanted behavior if the fail_on_mismatch and pattern
        criteria are abused.

    Parameters
    ----------
    rule : dict
        A dictionary containing the rule configuration.
    """

    def __init__(self, rule):
        self.name = rule.get("name")
        self.type = rule.get("type")
        self.criteria = rule.get("criteria")

    # TODO there's some code duplication in the apply functions which could be broken
    # out into helper functions.

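    # Illustrative rule configuration (names and values here are hypothetical, not
    # taken from a shipped schema):
    #
    #     {
    #         "name": "trajectory_summaries",
    #         "type": "pattern",
    #         "description": "Pick up per-trajectory CSV summaries",
    #         "criteria": {
    #             "pattern": "*_summary.csv",
    #             "max_depth": 2,
    #             "required": True,
    #         },
    #     }
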
[docs]    def apply(self, file_list) -> list[str]:
        if self.type == "paired_group":
            return self.apply_group_search(file_list)
        elif self.type == "pattern":
            return self.apply_pattern(file_list)
        elif self.type == "all_files":
            return self.apply_all_files(file_list)
        return []

[docs]    def apply_pattern(self, file_list: list[str]) -> list[str]:
        """
        Apply a pattern search rule to a list of files.

        Pattern search rules are used to find individual files that match a certain
        pattern.

        Parameters
        ----------
        file_list : list
            List of files to search through.

        Returns
        -------
        list
            List of files that match the pattern search rule.
        """
        logger.debug(f"Applying pattern search rule {self.name}")
        # parse out rules from the criteria
        extensions = self.criteria.get("extensions")  # list of extensions to examine for pairing
        if extensions is not None:
            if len(extensions) > 1:
                raise ValueError(
                    "Single search rule cannot have more than one extension specified"
                )
            extension = extensions[0]
            if not extension.startswith("."):
                extension = f".{extension}"
        else:
            extension = None
        logger.debug(f"Extensions: {extensions}")
        max_depth = self.criteria.get("max_depth")
        min_depth = self.criteria.get("min_depth")
        fail_on_mismatch = self.criteria.get("fail_on_mismatch")
        if fail_on_mismatch is None:
            fail_on_mismatch = False
        pattern = self.criteria.get("pattern")  # glob pattern to match files, assumes {ext} is the extension
        logger.debug(f"Pattern: {pattern}")
        if pattern is None:
            if extension is None:
                raise ValueError(
                    "No pattern or extension provided for single search rule, How am I meant to find the file?"
                )
            else:
                pattern = "*.{ext}"
        subbed_pattern = (
            pattern.rsplit(".{ext}", 1)[0] + extension
            if extension and pattern.endswith(".{ext}")
            else pattern
        )
        logger.debug(f"Subbed pattern: {subbed_pattern}")
        required = self.criteria.get("required")
        if required is None:
            required = False
        num_files = self.criteria.get("num_files")
        output_files = set()
        for file in file_list:
            # check file organization rules (depth limits are handled by check_depth)
            if not check_depth(file, max_depth, min_depth):
                continue
            basename = os.path.basename(file)
            normalized_file = os.path.normpath(file)
            # check pattern rules
            if (
                fnmatch.fnmatch(basename, subbed_pattern)
                or fnmatch.fnmatch(normalized_file, subbed_pattern)
                or fnmatch.fnmatch(file, subbed_pattern)
            ):
                output_files.add(file)
        logger.debug(f"Output files: {output_files}")
        complain_about_file_count(output_files, self.name, num_files, required)
        return list(output_files)

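    # Pattern substitution example (values are illustrative): with
    # criteria = {"pattern": "*_metrics.{ext}", "extensions": ["json"]}, the ".{ext}"
    # suffix is replaced to give the glob "*_metrics.json", which is then matched
    # against each file's basename, normalized path, and raw path.
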
[docs]    def apply_all_files(self, file_list: list[str]) -> list[str]:
        """
        Apply a rule to all files in the file list.

        This is a catch-all rule which allows "rule criteria" without requiring
        patterns or extensions.

        Parameters
        ----------
        file_list : list
            List of files to search through.

        Raises
        ------
        ValueError
            Can raise a ValueError via complain_about_file_count.

        Returns
        -------
        list
            List of files that match the rule criteria.
        """
        max_depth = self.criteria.get("max_depth")
        min_depth = self.criteria.get("min_depth")
        required = self.criteria.get("required")
        if required is None:
            required = False
        num_files = self.criteria.get("num_files")
        output_files = set()
        for file in file_list:
            # check file organization rules (depth limits are handled by check_depth)
            if not check_depth(file, max_depth, min_depth):
                continue
            output_files.add(file)
        # If you remove this, update the docstring
        complain_about_file_count(output_files, self.name, num_files, required)
        return list(output_files)

[docs] class FileSchemaParser:
    """Parse a discovery schema and apply its elements and rules to files on disk."""

    def __init__(
        self,
        schema: dict,
    ):
        # manifest_schema_name: str = "scu_manifest", manifest_version: str = "1.0"
        # Loading the manifest meta schema to validate our manifest here would really
        # be more of an integration test than a needed feature.
        self.schema_version = schema.get("schema_version")
        self.description = schema.get("description")
        self.compute_type = schema.get("compute_type")
        self.discovery_rules = {
            rule["name"]: DiscoveryRule(rule)
            for rule in schema.get("discovery_rules", {}).get("rules", [])
        }
        self.elements = schema.get("elements")
        # raise a schema error here if an element lists a discovery rule that doesn't exist
        for element in self.elements or []:
            element_name = element.get("name")
            if "discovery_rules" in element:
                for rule_name in element["discovery_rules"]:
                    if rule_name not in self.discovery_rules:
                        raise ValueError(
                            f"Element {element_name} lists discovery rule {rule_name} "
                            "which does not exist in the schema"
                        )
            if "elements" in element:
                for sub_element in element["elements"]:
                    sub_element_name = sub_element.get("name")
                    if "discovery_rules" in sub_element:
                        for rule_name in sub_element["discovery_rules"]:
                            if rule_name not in self.discovery_rules:
                                raise ValueError(
                                    f"Element {element_name}, sub-element: {sub_element_name} "
                                    f"lists discovery rule {rule_name} which does not exist in the schema"
                                )
        self._path_cache = {}

[docs]    def discover_files(self, directory: str) -> list:
        """
        Discover files for each element in the schema.

        Elements can also have "sub-elements", and each element can have any number of
        discovery rules applied to it. Element structure is implied to have at least:

        .. code-block:: json

            {
                "name": "element_name",
                "description": "A brief description of the element",
                "discovery_rules": ["rule1", "rule2"],
                // And, optionally, sub-elements (this can continue recursively,
                // please avoid abusing it):
                "elements": [
                    {
                        "name": "sub_element_name",
                        "description": "A brief description of the sub-element",
                        "discovery_rules": ["rule3", "rule4"],
                        "elements": []
                    }
                ]
                // end optional sub-elements
            }

        Parameters
        ----------
        directory : str
            The directory to search for files.

        Returns
        -------
        list
            A list with one entry per element of the form
            ``{"name": element_name, "files": [...]}``, where the files for each
            element are unordered.
        """
        if self.elements is None:
            return []
        discovered_files = {}
        for element in self.elements:
            element_name = element.get("name")
            discovery_rules = element.get("discovery_rules")
            sub_elements = element.get("elements", None)
            discovered_files.update(
                self._discover_element_files(
                    element_name, discovery_rules, directory, sub_elements=sub_elements
                )
            )
            logger.debug(f"Discovered files for element {element_name}:")
        element_files = [
            {"name": element_name, "files": files}
            for element_name, files in discovered_files.items()
        ]
        return element_files

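    # Illustrative return value from discover_files (element and file names are
    # hypothetical): a flat list in which sub-elements appear under dotted names, e.g.
    #
    #     [
    #         {"name": "trajectory", "files": [{"name": "run_01/summary.csv"}]},
    #         {"name": "trajectory.plots", "files": [{"name": "run_01/energy.png"}]},
    #     ]
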
    def _discover_element_files(
        self,
        element_name: str,
        discovery_rules: dict,
        directory: str,
        parent_element: Optional[str] = None,
        sub_elements: Optional[dict] = None,
    ) -> dict:
        """
        Discover files for an element and its sub-elements, recursively.

        When recursively discovering files for sub-elements, the parent element name is
        prepended to the sub-element name with a period.

        Parameters
        ----------
        element_name : str
            The name of the element.
        discovery_rules : list, optional
            The names of the discovery rules to apply to this element.
        directory : str
            The directory to search for files.
        parent_element : str, optional
            The parent element name. If not provided, the element is assumed to be a
            top-level element.
        sub_elements : list, optional
            The sub-elements of this element, if any.

        Returns
        -------
        dict
            A dictionary containing the discovered files for the element and its
            sub-elements. Key: element name, value: list of discovered files,
            unordered. Sub-elements are listed as "parent_element.sub_element",
            recursively (e.g. "grandparent.parent.element").
        """
        element_files = {}
        element_name_output = (
            element_name if parent_element is None else f"{parent_element}.{element_name}"
        )
        if discovery_rules is not None:
            all_files = self._path_cache.get(directory, None)
            if all_files is None:
                all_files = []
                for root, _, files in os.walk(os.path.abspath(directory)):
                    logger.debug(f"Searching in {root}")
                    logger.debug(files)
                    for file in files:
                        relpath = os.path.relpath(
                            os.path.join(root, file), start=os.path.abspath(directory)
                        )
                        all_files.append(relpath)
                self._path_cache[directory] = all_files
            logger.debug("All files:")
            logger.debug(all_files)
            current_list = element_files.get(element_name_output, [])
            for rule_name in discovery_rules:
                rule = self.discovery_rules[rule_name]
                # follow the schema, which is flexible enough to allow for more complex annotations
                current_list.extend([{"name": path} for path in rule.apply(all_files)])
            element_files[element_name_output] = current_list
        if sub_elements is not None:
            for sub_element in sub_elements:
                sub_element_name = sub_element.get("name")
                sub_element_discovery_rules = sub_element.get("discovery_rules", None)
                sub_sub_elements = sub_element.get("elements", None)
                element_files.update(
                    self._discover_element_files(
                        sub_element_name,
                        sub_element_discovery_rules,
                        directory,
                        parent_element=element_name_output,
                        sub_elements=sub_sub_elements,
                    )
                )
        return element_files

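    # File paths handed to rules are relative to the search directory (illustrative):
    # for directory="/data/run_0001" and an on-disk file
    # "/data/run_0001/plots/energy.png", the rules see "plots/energy.png", which is
    # also the path recorded in the manifest and joined onto base_dir at upload time.
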
[docs]    def get_manifest_dict(
        self,
        element_files: list,
        manifest_schema_handler: Optional[SCUManifestSchemaHandler] = None,
        **meta,
    ) -> dict:
        """
        Generate a manifest dictionary containing metadata and file information.

        Parameters
        ----------
        element_files : list
            A list of element entries, as produced by :meth:`discover_files`, each with
            "name" and "files" keys.
        manifest_schema_handler : SCUManifestSchemaHandler, optional
            Handler providing the manifest key names. If not provided, the default
            handler from get_scu_manifest_handler is used.
        **meta
            Additional metadata key/value pairs to record in the manifest's meta block.

        Returns
        -------
        dict
            A dictionary containing:

            - "schema_version": The schema version.
            - The handler's description, elements, and element_files keys.
            - The handler's meta key, holding the compute type, upload timestamp,
              upload UUID, and any extra metadata.
            - The handler's files key: a deduplicated list of the discovered file paths.
        """
        if manifest_schema_handler is None:
            manifest_schema_handler = get_scu_manifest_handler()
        manifest_dict = {}
        # get schema name, version, description etc
        manifest_dict["schema_version"] = self.schema_version
        manifest_dict[manifest_schema_handler.description] = self.description
        manifest_dict[manifest_schema_handler.elements] = self.elements
        manifest_dict[manifest_schema_handler.element_files] = element_files
        manifest_dict[manifest_schema_handler.meta] = {**meta}
        manifest_dict[manifest_schema_handler.meta][manifest_schema_handler.compute_type] = self.compute_type
        manifest_dict[manifest_schema_handler.meta][
            manifest_schema_handler.upload_timestamp
        ] = datetime.datetime.now().isoformat()
        # get the files recursively from the element_files dict
        file_paths_set = {
            file[manifest_schema_handler.file_path]
            for element in element_files
            for file in element.get("files")
        }
        files_list = [{"name": path} for path in file_paths_set]
        # make a uuid for the manifest
        manifest_dict[manifest_schema_handler.meta][manifest_schema_handler.upload_uuid] = str(uuid4())
        manifest_dict[manifest_schema_handler.files] = files_list
        logger.debug(f"Files: {files_list}")
        logger.debug("files added to manifest")
        return manifest_dict

[docs] def run_discovery(schema_name: str, directory: str, **meta) -> dict:
    """
    Discover files in a directory based on a given schema and generate a manifest.

    Parameters
    ----------
    schema_name : str
        The name of the schema to load.
    directory : str
        The directory to search for files.
    **meta
        Additional metadata key/value pairs to record in the manifest.

    Returns
    -------
    dict
        A dictionary representing the manifest of discovered files.
    """
    schema = load_raw_schema(schema_name)
    parser = FileSchemaParser(schema)
    logger.debug(f"running discovery on {directory}")
    element_files = parser.discover_files(directory)
    manifest = parser.get_manifest_dict(element_files, **meta)
    return manifest

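# Example (illustrative; the schema name, directory, and metadata below are
# hypothetical placeholders, not values defined by this module):
#
#     manifest = run_discovery("md_trajectory", "/data/run_0001", authors=["jdoe"])
#
# The returned manifest can then be handed to send_files_from_manifest below.
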
[docs] def send_files_from_manifest(
    bucket_name: str,
    project: str,
    manifest: dict,
    base_dir: str = ".",
):
    """Upload every file listed in the manifest, plus the manifest itself, to the bucket."""
    mh = get_scu_manifest_handler()
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name, user_project=project)
    upload_uuid = manifest[mh.meta][mh.upload_uuid]
    global_meta = manifest[mh.meta]
    # convert list values to comma-separated strings; other values are passed through as-is
    sanitized_meta = {
        key: global_meta[key] if not isinstance(global_meta[key], list) else ",".join(global_meta[key])
        for key in global_meta
    }
    for file_obj in manifest[mh.files]:
        file = file_obj[mh.file_path]
        local_path = os.path.join(base_dir, file)
        if not os.path.exists(local_path):
            raise FileNotFoundError(f"File {file} not found in {base_dir}")
        dest = os.path.join(upload_uuid, file)
        blob = bucket.blob(dest)
        # add metadata to the blob
        blob.metadata = sanitized_meta
        blob.upload_from_filename(local_path)
        logger.debug(f"File {local_path} uploaded to object: {dest} in {bucket_name}.")
        if not (base_dir == "."):
            logger.debug("base_dir is not .")
            logger.debug(f"Local path: {local_path}")
            logger.debug(f"Base dir: {base_dir}")
            logger.debug(f"File: {file}")
            logger.debug(f"Dest: {dest}")
    # upload manifest
    manifest_destiny = os.path.join(upload_uuid, f"{upload_uuid}_scu_manifest.json")
    manifest_blob = bucket.blob(manifest_destiny)
    # add metadata to the manifest:
    manifest_blob.metadata = sanitized_meta
    manifest_blob.upload_from_string(json.dumps(manifest))
    logger.debug(f"Manifest uploaded to object: {manifest_destiny} in {bucket_name}.")

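# Resulting object layout in the bucket (illustrative file names; the UUID is
# generated per manifest by get_manifest_dict):
#
#     <upload_uuid>/plots/energy.png
#     <upload_uuid>/summary.csv
#     <upload_uuid>/<upload_uuid>_scu_manifest.json
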
[docs] def config_from_path(path, scusch: Optional[SCUManifestSchemaHandler] = None):
    with open(path, "r") as f:
        raw_dict = json.load(f)
    return raw_dict

[docs] def scrape_jira_ticket(ticket_dict):
    jira_ticket_id = ticket_dict["id"]
    email_address = None
    ticket_assignee = ticket_dict["fields"].get("assignee")
    if ticket_assignee is not None:
        email_address = ticket_assignee.get("emailAddress")
    authors = [] if email_address is None else [email_address]
    config_dict_compatible = {
        "authors": authors,
        "jira_ticket_id": jira_ticket_id,
    }
    return config_dict_compatible

[docs] def sub_alias(potential_alias):
    aliases = {
        "dmitriz@ziptx.bio": "dmitriz",
    }
    return aliases.get(potential_alias, potential_alias)

[docs] def sub_aliases_and_validate_config(config_dict):
    # placeholder, should ping the API oracle to validate that meta fields are permissible values
    if "authors" in config_dict:
        config_dict["authors"] = [sub_alias(author) for author in config_dict["authors"]]
    return config_dict

[docs] def merge_jira_and_config(config_dict, ticket_dict):
    jira_scraped = scrape_jira_ticket(ticket_dict)
    jira_config_dict = sub_aliases_and_validate_config(jira_scraped)
    unify_authors = jira_config_dict.get("authors", [])
    unify_authors.extend(config_dict.get("authors", []))
    config_dict["authors"] = unify_authors
    if "authors" in jira_config_dict:
        jira_config_dict.pop("authors")
    falsy_keys_jira = []
    falsy_keys_config = []
    # drop all falsy keys from the jira config and the file config
    # (iterate over copies of the key views so we can pop while looping)
    for key in list(jira_config_dict.keys()):
        if not jira_config_dict[key]:
            falsy_keys_jira.append(key)
            jira_config_dict.pop(key)
    for key in list(config_dict.keys()):
        if not config_dict[key]:
            falsy_keys_config.append(key)
            config_dict.pop(key)
    # raise an error if there are still matched keys between the two
    non_null_conflicting_keys = set(jira_config_dict.keys()).intersection(set(config_dict.keys()))
    if len(non_null_conflicting_keys) > 0:
        logger.debug(f"Non-null conflicting keys: {non_null_conflicting_keys}")
        raise ValueError(
            "There are non-null conflicting key/value pairs present in both the Jira ticket "
            "and the config file. Please resolve these conflicts"
        )
    output_dict = {**config_dict, **jira_config_dict}
    # set all falsy keys that were previously in both to None
    for key in set(falsy_keys_jira).intersection(set(falsy_keys_config)):
        output_dict[key] = None
    return output_dict

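# Merge behavior example (illustrative values): with
# config_dict = {"authors": ["jdoe"], "jira_ticket_id": ""} and a ticket assigned to
# dmitriz@ziptx.bio with id "ZT-123", the result is
# {"authors": ["dmitriz", "jdoe"], "jira_ticket_id": "ZT-123"}: authors from both
# sources are combined, and the non-empty Jira value fills the falsy config value.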