# Source code for zcloud.console_scripts.order_uploader

import logging
import os
from typing import Optional, List,  Set, Tuple, Callable,  Any, Dict, Iterable

import click
import numpy as np
import pandas as pd

from zcloud.benchling_order import (
    # Enums
    OrderUploaderColumns,
    OrderUploaderInputColumns,

    # Column and table utilities
    get_required_columns,
    get_all_indexed_input_columns,
    get_small_table_fields,
    get_small_table_rename_dict,
    get_possible_columns_names_for_metadata_table,

    # DataFrame operations
    initialize_df_with_design_name,
    pdapply_build_design_names_from_row,
    check_generated_design_names,

    # ID candidate getters
    get_program_id_candidate,
    get_target_id_candidate,
    get_binding_site_candidates,
    get_fusion_id_candidates,
    get_iteration_id_candidate,

    # ID checkers
    check_program_id,
    check_target_id,
    check_binding_site_id,
    check_fusion_id,
    check_user_id,
    
    # Benchling operations
    publish_benchling_entry,
    create_benchling_order_folder,
    register_protein_entities,

    # Schema and metadata
    check_scu_schema,
    get_full_metadata_tables,
    get_all_scu_tables,

    # Monday service
    ticket_details_from_monday_service,

    # Exceptions
    UnableToFindMetadataError,
)

from zcloud.data_enums import (
    DataBuckets,
    MetadataColumns,
    MetadataTables,
    MondayBoardFields,
    ServiceKeys,
)




from zcloud.gcp_services import get_storage_client, pandas_df_to_csv_bytes
from zcloud.rapidfuzz import fuzzy_match

logger = logging.getLogger(__name__)


def get_candidate_resolver_single_value(value: str) -> Callable:
    """
    Look up the candidate-resolver for a single-valued metadata field.

    Parameters
    ----------
    value : str
        Field name; one of 'iteration_number', 'program_id', 'target_id'.

    Returns
    -------
    Callable
        The resolver function for that field.

    Raises
    ------
    KeyError
        If ``value`` is not a recognized field name.
    """
    # Dispatch table keeps the mapping declarative; unknown keys raise KeyError.
    resolvers: Dict[str, Callable] = {
        "program_id": get_program_id_candidate,
        "target_id": get_target_id_candidate,
        "iteration_number": get_iteration_id_candidate,
    }
    return resolvers[value]
def get_candidate_resolver_set_of_values(value: str) -> Callable:
    """
    Look up the candidate-resolver for a set-valued metadata field.

    Parameters
    ----------
    value : str
        Field name; one of 'binding_site_id', 'fusion_id'.

    Returns
    -------
    Callable
        The resolver function for that field; it returns a set of values.

    Raises
    ------
    KeyError
        If ``value`` is not a recognized field name.
    """
    resolvers: Dict[str, Callable] = {
        "fusion_id": get_fusion_id_candidates,
        "binding_site_id": get_binding_site_candidates,
    }
    return resolvers[value]
def get_metadata_checker(value: str) -> Callable:
    """
    Look up the validator function for a metadata table.

    Parameters
    ----------
    value : str
        Metadata table name; one of the ``MetadataTables`` values for
        binding-site, fusion, program, or target tables.

    Returns
    -------
    Callable
        The checker function that validates a value against that table.

    Raises
    ------
    KeyError
        If ``value`` is not a recognized metadata table name.
    """
    checkers: Dict[str, Callable] = {
        MetadataTables.PROGRAM_TABLE.value: check_program_id,
        MetadataTables.TARGET_TABLE.value: check_target_id,
        MetadataTables.BINDING_SITE_TABLE.value: check_binding_site_id,
        MetadataTables.FUSION_TABLE.value: check_fusion_id,
        # "iteration_number": check_iteration_number,  # not implemented
    }
    return checkers[value]
def confirm_set_of_values_from_user(
    value_type: str, df: pd.DataFrame, cli_value: Optional[str], allow_cli_override: bool
) -> Set[str]:
    """
    Confirm and resolve a set of values from user input or DataFrame.

    Attempts to resolve values from the DataFrame first; on failure, falls
    back to the CLI-provided value (with or without confirmation, depending
    on ``allow_cli_override``) or to an interactive prompt.

    Parameters
    ----------
    value_type : str
        The type of value to resolve (e.g., 'binding_site_id', 'fusion_id').
    df : pd.DataFrame
        The input DataFrame containing the data to analyze.
    cli_value : Optional[str]
        Optional CLI-provided value to use as override.
    allow_cli_override : bool
        Whether CLI values may override DataFrame values without confirmation.

    Returns
    -------
    Set[str]
        A set of resolved values for the specified type.

    Raises
    ------
    SystemExit
        If the user declines the override or aborts the prompt.
    """
    try:
        value_queries = get_candidate_resolver_set_of_values(value_type)(
            df, cli_value, allow_cli_override
        )
    except ValueError:
        if allow_cli_override and cli_value:
            # BUG FIX: this path previously fell through with value_queries
            # unbound (UnboundLocalError). Override is explicitly allowed,
            # so use the CLI value directly.
            value_queries = {cli_value}
        elif cli_value:
            override_confirmation = click.confirm(
                f"Unable to unambiguously determine {value_type} from input CSV. Would you like to apply cli override and use {cli_value}? (y/n): "
            )
            if override_confirmation:
                value_queries = {cli_value}
            else:
                # gracefully fail and apologize
                logger.error(
                    f"Unable to unambiguously determine {value_type} from input CSV. Please provide a {value_type}: "
                )
                exit(0)
        else:
            try:
                value_candidate = click.prompt(
                    f"Please provide a {value_type}: "
                    + "\n"
                    + "ctrl+c (keyboard interrupt) to exit",
                    type=str,
                )
            except click.exceptions.Abort:
                logger.error("Exiting...")
                exit(0)
            value_queries = {value_candidate}
    return value_queries
def confirm_single_value_from_user(
    value_type: str, df: pd.DataFrame, cli_value: Optional[str], allow_cli_override: bool
) -> str:
    """
    Confirm and resolve a single value from user input or DataFrame.

    Attempts to resolve the value from the DataFrame first; on failure, falls
    back to the CLI-provided value (with or without confirmation, depending
    on ``allow_cli_override``) or to an interactive prompt.

    Parameters
    ----------
    value_type : str
        The type of value to resolve (e.g., 'program_id', 'target_id').
    df : pd.DataFrame
        The input DataFrame containing the data to analyze.
    cli_value : Optional[str]
        Optional CLI-provided value to use as override.
    allow_cli_override : bool
        Whether CLI values may override DataFrame values without confirmation.

    Returns
    -------
    str
        The resolved value for the specified type.

    Raises
    ------
    SystemExit
        If the user declines the override or aborts the prompt.
    """
    try:
        value_candidate = get_candidate_resolver_single_value(value_type)(
            df, cli_value, allow_cli_override
        )
    except ValueError:
        if allow_cli_override and cli_value:
            # BUG FIX: this path previously fell through with value_candidate
            # unbound (UnboundLocalError). Override is explicitly allowed,
            # so use the CLI value directly.
            value_candidate = cli_value
        elif cli_value:
            override_confirmation = click.confirm(
                f"Unable to unambiguously determine {value_type} from input CSV. Would you like to apply cli override and use {cli_value}? (y/n): "
            )
            if override_confirmation:
                value_candidate = cli_value
            else:
                logger.error(
                    f"Unable to unambiguously determine {value_type} from input CSV. Please provide a {value_type}: "
                )
                exit(0)
        else:
            try:
                value_candidate = click.prompt(
                    f"Please provide a {value_type}: ", type=str
                )
            except click.exceptions.Abort:
                logger.error("Exiting...")
                exit(0)
    return value_candidate
def ask_for_new_value(query_val: str, metadata_table_id: str, max_attempts: int = 3) -> str:
    """
    Prompt the user for a replacement value for one that could not be resolved.

    Parameters
    ----------
    query_val : str
        The value that failed to resolve.
    metadata_table_id : str
        The metadata table being queried (used only in prompt text).
    max_attempts : int, optional
        How many times the user may re-enter the same unresolved value
        before the process exits, by default 3.

    Returns
    -------
    str
        Any value the user enters that differs from ``query_val``
        (pressing enter returns the empty string).

    Raises
    ------
    SystemExit
        After ``max_attempts`` identical re-entries.
    """
    attempts = 0
    prompt_text = (
        f"Unable to resolve {query_val}. Provide a different value for "
        f"{metadata_table_id} or press enter to continue: "
    )
    while True:
        candidate = click.prompt(prompt_text, default="")
        if candidate != query_val:
            return candidate
        # Same value again: warn, count it, and bail out after the limit.
        logger.error(
            f"Your entered value {candidate} is the same as the value you are trying to resolve. Please provide a different value to check for in {metadata_table_id}, or exit with ctrl+c"
        )
        attempts += 1
        if attempts >= max_attempts:
            logger.error(f"Max attempts ({max_attempts}) reached. Exiting...")
            exit(0)
def ask_user_to_confirm_fuzzy_match(
    query_val: str, fuzzy_matches: Iterable[str], metadata_table_id: str
) -> str:
    """
    Ask the user to pick a fuzzy-match candidate (or supply a new value).

    Parameters
    ----------
    query_val : str
        The query value that had no exact match.
    fuzzy_matches : Iterable[str]
        Candidate matches (callers pass a list).
    metadata_table_id : str
        The metadata table being queried (used in prompt text).

    Returns
    -------
    str
        The confirmed candidate, or a fresh value from ``ask_for_new_value``.

    Raises
    ------
    SystemExit
        If the user exits during the interaction.
    """
    candidates = list(fuzzy_matches)

    # Exactly one candidate: simple yes/no confirmation.
    if len(candidates) == 1:
        only_match = candidates[0]
        if click.confirm(
            f"{query_val} does not match any registered names. Did you mean {only_match}? (y/n): "
        ):
            return only_match
        return ask_for_new_value(query_val, metadata_table_id)

    # Several candidates: numbered menu, repeated until a valid choice.
    if len(candidates) > 1:
        while True:
            menu = [f"{idx + 1}. {option}" for idx, option in enumerate(candidates)]
            menu.append(f"{len(menu) + 1}. Or, enter a new value")
            menu_text = "\n".join(menu)
            selection = click.prompt(
                f"Please select the number of the fuzzy match you meant to check for in {metadata_table_id}: \n{menu_text}\n"
            )
            if selection.isdigit() and 1 <= int(selection) <= len(candidates):
                return candidates[int(selection) - 1]
            if selection == str(len(menu)):
                return ask_for_new_value(query_val, metadata_table_id)
            logger.error(
                f"Invalid input. Please enter a number between 1 and {len(candidates)} or ctrl+c to exit"
            )

    # No candidates at all: fall straight through to manual entry.
    return ask_for_new_value(query_val, metadata_table_id)
def ask_user_to_decide_what_to_do_with_name_resolution(
    df: pd.DataFrame, generated_design_names: pd.Series, original_input_columns: pd.Index
) -> pd.DataFrame:
    """
    Ask the user how to handle design-name resolution conflicts.

    When generated design names clash with expectations, presents three
    options: use the generated names and continue, save them to a file and
    exit, or just exit.

    Parameters
    ----------
    df : pd.DataFrame
        The main DataFrame containing the order data.
    generated_design_names : pd.Series
        The generated design names that caused the conflict.
    original_input_columns : pd.Index
        The original column names from the input CSV.

    Returns
    -------
    pd.DataFrame
        The DataFrame, with design names replaced if the user chose option 1.

    Raises
    ------
    SystemExit
        If the user chooses to save the names to file, or to exit.
    ValueError
        If the user enters something that is not an integer.
    """
    use_generated_num = 1
    save_generated_names_num = 2
    exit_num = 3
    user_choices = [
        f"{use_generated_num}. Use the generated design names, continue to upload",
        f"{save_generated_names_num}. Save the generated names to file, and exit",
        f"{exit_num}. Exit",
    ]
    logger.warning(
        f"Design names clash with expectations based on the input configuration"
    )
    logger.warning(
        f"WARNING: if you choose ({use_generated_num}), the 'raw' csv will be the original (mismatched) one provided at the beginning."
    )
    user_choice_list = "\n".join(user_choices)
    user_choice = click.prompt(
        f"Unable to find design names. Please select an option: \n{user_choice_list}\n"
    )
    if int(user_choice) == use_generated_num:
        df[OrderUploaderInputColumns.DESIGN_NAME.value] = generated_design_names
    elif int(user_choice) == save_generated_names_num:
        user_output_path = click.prompt(
            f"Please provide a path to save the generated design names: ",
            default="generated_design_names.csv",
        )
        # BUG FIX: copy() so the new column is written to an independent frame
        # rather than a view of df (chained-assignment warning / lost write).
        output_df = df[original_input_columns].copy()
        output_df[OrderUploaderInputColumns.DESIGN_NAME.value] = generated_design_names
        output_df.to_csv(user_output_path, index=False)
        logger.warning(f"Generated design names saved to {user_output_path}")
        exit(0)
    elif int(user_choice) == exit_num:
        exit(0)
    # Any other integer falls through and returns df unchanged.
    return df
def check_metadata_against_oracle_with_fuzzy_find_on_fail(
    metadata_table_id: str,
    query_value: str,
    allowed_other_ids: Optional[List[str]] = None,
    try_to_find_monday_id: Optional[str] = None,
    fuzzy_match_threshold: int = 70,
    _recursion_depth: int = 0,
) -> Tuple[Any, ...]:
    """
    Check a metadata value against the oracle, with fuzzy-match fallback.

    Validates ``query_value`` against the oracle. If the exact lookup fails,
    fetches the full metadata table, fuzzy-matches against all allowed column
    values, asks the user to confirm a candidate, and retries recursively.

    Parameters
    ----------
    metadata_table_id : str
        The ID of the metadata table to check against.
    query_value : str
        The value to validate.
    allowed_other_ids : Optional[List[str]], optional
        Additional IDs that are allowed for validation, by default None.
    try_to_find_monday_id : Optional[str], optional
        Monday ID to try to match against, by default None.
    fuzzy_match_threshold : int, optional
        Threshold for fuzzy matching (0-100), by default 70.
    _recursion_depth : int, optional
        Internal recursion depth counter, by default 0.

    Returns
    -------
    Tuple[Any, ...]
        The tuple returned by the table-specific checker function; its exact
        shape depends on ``metadata_table_id``.

    Raises
    ------
    SystemExit
        If maximum recursion depth is reached.
    ValueError
        If the metadata-table fetch returns an error status.
    """
    # Hard stop: at most 3 user-driven retries before giving up entirely.
    if _recursion_depth > 3:
        logger.error(
            f"Max recursion depth ({_recursion_depth}) reached, you will need to start from the beginning. Exiting..."
        )
        exit(0)
    metadata_checker = get_metadata_checker(metadata_table_id)
    try:
        # Only forward the optional kwargs that were actually supplied, so
        # checkers that don't accept them aren't called with extras.
        metadata_checker_kwargs = {}
        if try_to_find_monday_id:
            metadata_checker_kwargs["try_to_find_monday_id"] = try_to_find_monday_id
        if allowed_other_ids:
            metadata_checker_kwargs["allowed_other_ids"] = allowed_other_ids
        oracle_tuple = metadata_checker(query_value, **metadata_checker_kwargs)
    except UnableToFindMetadataError as e:
        logger.error(e)
        # Exact lookup failed: pull the whole table and fuzzy-match against
        # every value in the columns that are allowed to carry this ID.
        metadata_tables_response = get_full_metadata_tables(metadata_table_id)
        response_code = metadata_tables_response.status_code
        response_json = metadata_tables_response.json()
        if response_code >= 300:
            raise ValueError(response_json.get("error"))
        response_records = response_json[metadata_table_id]
        metadata_table = pd.DataFrame(response_records)
        allowed_column_names = get_possible_columns_names_for_metadata_table(
            metadata_table_id
        )
        # Flatten all candidate columns into one de-duplicated search pool.
        vals_to_search = list(
            set(metadata_table[allowed_column_names].values.flatten())
        )
        fuzzy_match_results = fuzzy_match(
            query_value,
            vals_to_search,
            threshold=fuzzy_match_threshold,
        )
        # fuzzy_match returns (value, score) pairs; only the values are shown.
        new_value = ask_user_to_confirm_fuzzy_match(
            query_value, [match[0] for match in fuzzy_match_results], metadata_table_id
        )
        # Retry with the user-confirmed value. metadata_checker_kwargs was
        # built before the failing call above, so it is defined here; the
        # kwargs keys match this function's own parameter names.
        oracle_tuple = check_metadata_against_oracle_with_fuzzy_find_on_fail(
            metadata_table_id,
            new_value,
            _recursion_depth=_recursion_depth + 1,
            **metadata_checker_kwargs,
        )
    return oracle_tuple
def check_scu_against_oracle_with_fuzzy_find_on_fail(
    score_columns_to_check: Set[str],
    _recursion_depth: int = 0,
    all_tables: Optional[Dict[str, List[Dict[str, str]]]] = None,
    keymap: Optional[Dict[str, str]] = None,
) -> Tuple[Dict[str, List[str]], Set[str], Dict[str, str]]:
    """
    Check score columns against the SCU oracle, with fuzzy-match fallback.

    Validates score column names against the SCU (Score Column Unit) schema.
    Columns that fail exact matching can be fuzzy-matched or manually renamed
    by the user; anything still unmatched is returned as "orphaned".

    Parameters
    ----------
    score_columns_to_check : Set[str]
        Set of score column names to validate.
    _recursion_depth : int, optional
        Internal recursion depth counter, by default 0. Recursive calls are
        used to re-validate user-chosen replacement names.
    all_tables : Optional[Dict[str, List[Dict[str, str]]]], optional
        Cached SCU tables data to avoid repeated API calls, by default None.
    keymap : Optional[Dict[str, str]], optional
        Mapping of original column names to corrected names, by default None.

    Returns
    -------
    Tuple[Dict[str, List[str]], Set[str], Dict[str, str]]
        - Dictionary mapping table IDs to lists of found field names
        - Set of orphaned score columns that couldn't be matched
        - Dictionary mapping original column names to corrected names

    Raises
    ------
    SystemExit
        If maximum recursion depth is reached.
    ValueError
        If there are errors retrieving SCU tables.
    """
    if _recursion_depth > 10:
        logger.error(
            f"Max recursion depth ({_recursion_depth}) reached, you will need to start from the beginning. Exiting..."
        )
        exit(0)
    score_column_results = check_scu_schema(
        {
            MetadataColumns.FIELD.value: list(score_columns_to_check),
        },
        all_tables=all_tables,
    )
    # Everything the schema check recognized, across all tables.
    found_columns_set = set()
    for table_id, found_fields in score_column_results.items():
        found_columns_set.update(found_fields)
    orphaned_score_columns = score_columns_to_check - found_columns_set
    if len(orphaned_score_columns) == 0:
        return score_column_results, set(), keymap or {}
    else:
        if _recursion_depth == 0:
            click.echo(
                f"Unable to find these columns in the registered SCU values: {orphaned_score_columns}"
            )
            click.echo(
                f"WARNING:If you choose a fuzzy match, your score tables uploaded will not match the raw uploaded csv"
            )
            check_for_fuzzy_matches = click.confirm(
                f"Would you like to check for fuzzy matches? (y/n): "
            )
            if not check_for_fuzzy_matches:
                return score_column_results, orphaned_score_columns, keymap or {}
        else:
            # Recursive calls never prompt; they just report their orphans
            # back to the depth-0 caller.
            return score_column_results, orphaned_score_columns, keymap or {}
    all_scu_tables = get_all_scu_tables()
    if all_scu_tables.get("success") == False:
        raise ValueError(f"Error getting all SCU tables: {all_scu_tables.get('error')}")
    for table_id, table_data in all_scu_tables.items():
        cols_to_check = list(
            orphaned_score_columns
        )  # Don't make new orphans! It breaks our keymapping
        for check_val in cols_to_check:
            # table is list of dicts
            scu_fields = set(row.get(MetadataColumns.FIELD.value) for row in table_data)
            fuzzy_match_results = fuzzy_match(
                check_val,
                scu_fields,
                threshold=70,
            )
            if fuzzy_match_results:
                new_value = ask_user_to_confirm_fuzzy_match(
                    check_val,
                    [match[0] for match in fuzzy_match_results],
                    table_id,
                )
                if not new_value:
                    continue
                # Re-validate the user's choice through the same pipeline.
                found_score_results, _, keymap = (
                    check_scu_against_oracle_with_fuzzy_find_on_fail(
                        {new_value},
                        _recursion_depth=_recursion_depth + 1,
                        all_tables=all_scu_tables,
                        keymap=keymap,
                    )
                )
                if found_score_results:
                    for table_id, found_fields in found_score_results.items():
                        score_column_results_list_at_table_id = (
                            score_column_results.get(table_id, [])
                        )
                        for new_value in found_fields:  # should only ever be one
                            if new_value not in score_column_results_list_at_table_id:
                                score_column_results_list_at_table_id.append(new_value)
                            score_column_results[table_id] = (
                                score_column_results_list_at_table_id
                            )
                            found_columns_set.add(new_value)
                            orphaned_score_columns.remove(check_val)
                            keymap[check_val] = (
                                new_value  # map the original value to the new value, we're ok with overwriting in a loop, we never put anything new in check_val
                            )
    if _recursion_depth == 0 and orphaned_score_columns:
        click.echo(
            f"Unable to find these columns in the registered SCU values: {orphaned_score_columns}"
        )
        rename_manually = click.confirm(
            f"Would you like to rename the columns manually (and check them against the oracle) (y), or keep non-indexed score columns (n)? (y/n): "
        )
        if rename_manually:
            remaining_orphaned_columns_list = list(orphaned_score_columns)
            for non_found_score in remaining_orphaned_columns_list:
                new_score_name = click.prompt(
                    f"Please provide a new name for {non_found_score}: "
                )
                found_score_results, _, keymap = (
                    check_scu_against_oracle_with_fuzzy_find_on_fail(
                        {new_score_name},
                        _recursion_depth=_recursion_depth + 1,
                        all_tables=all_scu_tables,
                        keymap=keymap,
                    )
                )
                if found_score_results:
                    for table_id, found_fields in found_score_results.items():
                        score_column_results_list_at_table_id = (
                            score_column_results.get(table_id, [])
                        )
                        for new_value in found_fields:  # should only ever be one
                            if new_value not in score_column_results_list_at_table_id:
                                score_column_results_list_at_table_id.append(new_value)
                            score_column_results[table_id] = (
                                score_column_results_list_at_table_id
                            )
                            found_columns_set.add(new_value)
                            # NOTE(review): this removes new_value (the
                            # oracle-side field) from the orphan set, unlike
                            # the fuzzy branch above which removes check_val
                            # (the original column). If new_value is not in
                            # the set this raises KeyError — verify whether
                            # non_found_score was intended here.
                            orphaned_score_columns.remove(new_value)
                            keymap[non_found_score] = (
                                new_value  # map the original value to the new value, we're ok with overwriting in a loop, we never put anything new in check_val
                            )
        if orphaned_score_columns:
            click.echo(
                f"These values were never found in the registered SCU values, they will be uploaded as non-indexed score columns: {orphaned_score_columns}"
            )
    return score_column_results, orphaned_score_columns, keymap if keymap else {}
def check_tag_location_with_fuzzy_find_on_fail(
    df: pd.DataFrame,
    tag_column_name: str = OrderUploaderInputColumns.TAG_LOCATION.value,
) -> pd.DataFrame:
    """
    Check tag location values, with fuzzy matching fallback.

    The tag location column must contain only "N-term" or "C-term". Any other
    value triggers fuzzy matching plus interactive correction.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame containing the tag location column.
    tag_column_name : str
        The name of the tag location column.

    Returns
    -------
    pd.DataFrame
        The input DataFrame with the tag location column corrected.

    Raises
    ------
    SystemExit
        If a value cannot be resolved within the retry budget.
    """
    # TODO maybe make this an enum or call oracle? Seems silly, but it is schema enforced as an enum on benchling, and will fail cryptically if we change that end.
    allowed_vals = ["N-term", "C-term"]
    n_term_mask = df[tag_column_name] == allowed_vals[0]
    c_term_mask = df[tag_column_name] == allowed_vals[1]
    either_or_mask = n_term_mask | c_term_mask
    # Identity mapping for the already-valid values; corrections added below.
    correction_dict = {val: val for val in allowed_vals}
    if not either_or_mask.all():
        unique_wrong_vals = df[tag_column_name][~either_or_mask].unique()
        for wrong_val in unique_wrong_vals:
            logger.warning(
                f"Tag location {wrong_val} is not one of the allowed values: N-term or C-term."
            )
            # BUG FIX: remember the original cell value so the correction is
            # keyed on it; previously a chain of retries keyed the correction
            # on the intermediate guess, leaving the original unmapped (NaN).
            original_val = wrong_val
            val_is_wrong = True
            wrong_entry_count = 0
            while val_is_wrong:
                fuzzy_match_results = fuzzy_match(wrong_val, allowed_vals, threshold=70)
                result_list = [match[0] for match in fuzzy_match_results]
                new_val = ask_user_to_confirm_fuzzy_match(
                    wrong_val, result_list, "tag_locations: N-term or C-term"
                )
                if not new_val:
                    # BUG FIX: count empty responses toward the retry budget;
                    # previously this looped forever without incrementing.
                    wrong_entry_count += 1
                    if wrong_entry_count > 3:
                        logger.error(
                            f"Tries to infer a valid tag location for {wrong_val} exceeded. Please check the input CSV and try again."
                        )
                        exit(0)
                    continue
                if new_val in allowed_vals:
                    val_is_wrong = False
                    correction_dict[original_val] = new_val
                else:
                    wrong_val = new_val
                    wrong_entry_count += 1
                    if wrong_entry_count > 3:
                        logger.error(
                            f"Tries to infer a valid tag location for {wrong_val} exceeded. Please check the input CSV and try again."
                        )
                        exit(0)
    df[tag_column_name] = df[tag_column_name].map(correction_dict)
    return df
@click.command()
@click.option(
    "--input-csv-path",
    type=str,
    required=True,
    help="The path to the input CSV file. If the sequence_name column is well formatted and follows schema, then all you need is the csv of: design_name, sequence, tag_location",
)
@click.option("--program-id", type=str, default=None)
@click.option("--target-id", type=str, default=None)
@click.option("--binding-site-id", type=str, default=None)
@click.option("--user-id", type=str, default=None)
@click.option("--monday-ticket-link", type=str, default=None)
@click.option("--iteration-number", type=str, default=None)
# NOTE(review): underscore flag is inconsistent with the dashed flags above,
# but changing it would break existing invocations — left as-is.
@click.option("--fusion_id", type=str, default=None)
@click.option(
    "--additional-upload-paths",
    default=None,
    type=click.Path(exists=True),
    multiple=True,
    help="Additional paths to upload, can be used to upload any files or directories recursively to the order bucket. Useful for if you used an unusual config and want to record it",
)
@click.option(
    "--allow-cli-override",
    is_flag=True,
    default=False,
    help="Allow CLI arguments to override the data in the input CSV, default is to fail and complain",
)
def upload_order(
    input_csv_path: str,
    program_id: Optional[str],
    target_id: Optional[str],
    binding_site_id: Optional[str],
    user_id: Optional[str],
    monday_ticket_link: Optional[str],
    iteration_number: Optional[str],
    fusion_id: Optional[str],
    additional_upload_paths: Tuple[str, ...],
    allow_cli_override: bool,
) -> None:
    """
    Upload an order to Benchling, validating/generating sequence names and score columns.

    If the input CSV sequence_name column is well formatted and follows
    schema, then all you need is the csv of: design_name, sequence,
    tag_location. You can also omit the name, as long as you provide either a
    column with the appropriate program id, binder id, binding site id, etc.
    You may also provide missing data as an argument, which will override the
    data in the csv (and will regenerate the names).

    Parameters
    ----------
    input_csv_path : str
        The path to the input CSV file.
    program_id : Optional[str]
        The ID of the program.
    target_id : Optional[str]
        The ID of the target.
    binding_site_id : Optional[str]
        The ID of the binding site.
    user_id : Optional[str]
        The ID of the user.
    monday_ticket_link : Optional[str]
        The link to the Monday ticket.
    iteration_number : Optional[str]
        The iteration number.
    fusion_id : Optional[str]
        The ID of the fusion.
    additional_upload_paths : Tuple[str, ...]
        Additional paths (files or directories, uploaded recursively) to copy
        into the order bucket.
    allow_cli_override : bool
        Allow CLI arguments to override the data in the input CSV; default is
        to fail and complain.

    Raises
    ------
    FileNotFoundError
        If the input CSV file is not found.
    ValueError
        If required columns are missing or if there are validation errors.
    SystemExit
        If the user chooses to exit during interactive prompts.
    """
    level = os.getenv("PYTHON_LOG_LEVEL", "INFO").upper()
    logging.basicConfig(level=level)

    # Load input CSV data
    if not os.path.exists(input_csv_path):
        raise FileNotFoundError(f"Input CSV file not found: {input_csv_path}")
    df = pd.read_csv(input_csv_path)
    original_input_columns = df.columns
    df.index = range(1, len(df) + 1)  # 1-indexed row numbering

    # Validate required columns
    required_columns = get_required_columns()
    all_indexed_input_columns = get_all_indexed_input_columns()
    potential_score_columns = {
        col for col in df.columns if col not in all_indexed_input_columns
    }
    if potential_score_columns:
        logger.warning(
            f"Potential score columns found in input CSV: {potential_score_columns}. These will be uploaded to the order bucket."
        )
    missing_columns = [col for col in required_columns if col not in df.columns]
    # we actually want to know this regardless of missing required columns
    have_design_column = OrderUploaderInputColumns.DESIGN_NAME.value in df.columns
    if missing_columns:
        # Check if tag_location or sequence are missing - these are mandatory
        mandatory_missing = [col for col in required_columns if col in missing_columns]
        if mandatory_missing:
            raise ValueError(
                f"Required columns missing from input CSV: {mandatory_missing}. Tag location and sequence MUST be provided."
            )
    # If only design_name is missing, we can construct it later if we have the necessary parameters
    if not have_design_column:
        print(
            f"Warning: {OrderUploaderInputColumns.DESIGN_NAME.value} column is missing. Will construct from other parameters if possible."
        )
    else:
        df = initialize_df_with_design_name(df)
    df = check_tag_location_with_fuzzy_find_on_fail(df)

    # BUG FIX: initialize the Monday-derived IDs so runs WITHOUT a
    # --monday-ticket-link no longer crash with NameError when these are
    # referenced below.
    monday_program_id_from_api = None
    monday_target_id_from_api = None
    monday_iteration_code_from_api = None
    if monday_ticket_link:
        ticket_details = ticket_details_from_monday_service(monday_ticket_link)
        if ticket_details.get("success") == False:
            logger.error(
                f"Error getting Monday ticket details: {ticket_details.get('error')}"
            )
            continue_execution = click.confirm(f"Continue execution? (y/n): ")
            if not continue_execution:
                logger.error("Exiting...")
                exit(0)
        logger.info(f"Ticket details: {ticket_details}")
        monday_program_id_from_api = ticket_details.get(MondayBoardFields.PROJECT.value)
        monday_target_id_from_api = ticket_details.get(MondayBoardFields.TARGET.value)
        monday_iteration_code_from_api = ticket_details.get(
            MondayBoardFields.ITERATION_CODE.value
        )
        logger.info(f"Monday program id from api: {monday_program_id_from_api}")
        logger.info(f"Monday target id from api: {monday_target_id_from_api}")
        logger.info(f"Monday iteration code from api: {monday_iteration_code_from_api}")

    # Resolve and validate the program id (check None, check fuzzy, prompt user).
    program_id_query = confirm_single_value_from_user(
        "program_id", df, program_id, allow_cli_override
    )
    program_id_benchling, program_id_design, program_id_monday = (
        check_metadata_against_oracle_with_fuzzy_find_on_fail(
            MetadataTables.PROGRAM_TABLE.value,
            program_id_query,
            try_to_find_monday_id=monday_program_id_from_api,
        )
    )
    # Comparison only meaningful when a ticket supplied a Monday program id.
    if (
        monday_program_id_from_api is not None
        and program_id_monday != monday_program_id_from_api
    ):
        logger.warning(
            f"Registered Monday Program ID {program_id_monday} does not match the actual Monday ID {monday_program_id_from_api} on the ticket"
        )
        continue_execution = click.confirm(f"Continue execution? (y/n): ")
        if not continue_execution:
            logger.error("Exiting...")
            exit(0)

    # Resolve and validate the target id.
    target_id_query = confirm_single_value_from_user(
        "target_id", df, target_id, allow_cli_override
    )
    (
        matching_program_id,
        matching_target_id_benchling,
        matching_target_id_design,
        matching_target_id_monday,
        matching_target_id_internal,
    ) = check_metadata_against_oracle_with_fuzzy_find_on_fail(
        MetadataTables.TARGET_TABLE.value,
        target_id_query,
        allowed_other_ids=[
            program_id_benchling,
            program_id_design,
            program_id_monday,
            monday_program_id_from_api,
        ],
        try_to_find_monday_id=monday_target_id_from_api,
    )
    if (
        monday_target_id_from_api is not None
        and matching_target_id_monday != monday_target_id_from_api
    ):
        logger.warning(
            f"Registered Monday Target ID {matching_target_id_monday} does not match the actual Monday ID {monday_target_id_from_api} on the ticket"
        )
        continue_execution = click.confirm(f"Continue execution? (y/n): ")
        if not continue_execution:
            logger.error("Exiting...")
            exit(0)

    # Resolve and validate binding sites (there can be several per order).
    binding_site_queries = confirm_set_of_values_from_user(
        "binding_site_id", df, binding_site_id, allow_cli_override
    )
    binding_site_id_to_binding_site_oracle_data = {}
    for binding_site_query in binding_site_queries:
        matching_target_id, binding_site_id_benchling, binding_site_id_design = (
            check_metadata_against_oracle_with_fuzzy_find_on_fail(
                MetadataTables.BINDING_SITE_TABLE.value,
                binding_site_query,
                allowed_other_ids=[
                    matching_target_id_benchling,
                    matching_target_id_design,
                ],
            )
        )
        binding_site_id_to_binding_site_oracle_data[binding_site_query] = {
            "target_id": matching_target_id,
            "binding_site_id_benchling": binding_site_id_benchling,
            "binding_site_id_design": binding_site_id_design,
        }

    # Resolve and validate fusion ids.
    fusion_id_queries = confirm_set_of_values_from_user(
        "fusion_id", df, fusion_id, allow_cli_override
    )
    fusion_id_to_fusion_oracle_data = {}
    for fusion_id_query in fusion_id_queries:
        matching_target_id, fusion_id_benchling, fusion_id_design = (
            check_metadata_against_oracle_with_fuzzy_find_on_fail(
                MetadataTables.FUSION_TABLE.value,
                fusion_id_query,
            )
        )
        fusion_id_to_fusion_oracle_data[fusion_id_query] = {
            "target_id": matching_target_id,
            "fusion_id_benchling": fusion_id_benchling,
            "fusion_id_design": fusion_id_design,
        }

    iteration = confirm_single_value_from_user(
        "iteration_number", df, iteration_number, allow_cli_override
    )
    if (
        monday_iteration_code_from_api is not None
        and iteration != monday_iteration_code_from_api
    ):
        logger.warning(
            f"Registered Monday Iteration Code {iteration} does not match the actual Monday Iteration Code {monday_iteration_code_from_api} on the ticket"
        )
        continue_execution = click.confirm(f"Continue execution? (y/n): ")
        if not continue_execution:
            logger.error("Exiting...")
            exit(0)

    logger.info(f"Iteration: {iteration}")
    logger.info(f"Matching target id design: {matching_target_id_design}")
    logger.info(
        f"Number of binding sites: {len(binding_site_id_to_binding_site_oracle_data)}"
    )
    for (
        binding_site_query,
        binding_site_oracle_data,
    ) in binding_site_id_to_binding_site_oracle_data.items():
        logger.info(f"Binding site query: {binding_site_query}")
        logger.info(f"Binding site oracle data: {binding_site_oracle_data}")
    logger.info(f"Number of fusion ids: {len(fusion_id_to_fusion_oracle_data)}")
    for (
        fusion_id_query,
        fusion_id_oracle_data,
    ) in fusion_id_to_fusion_oracle_data.items():
        logger.info(f"Fusion id query: {fusion_id_query}")
        logger.info(f"Fusion id oracle data: {fusion_id_oracle_data}")

    # Build (or rebuild) the design names and verify them against expectations.
    generated_design_names = None
    try:
        generated_design_names = df.apply(
            pdapply_build_design_names_from_row,
            args=(
                iteration,
                matching_target_id_design,
                binding_site_id,
                fusion_id,
                allow_cli_override,
            ),
            axis=1,
        )
        check_generated_design_names(
            df,
            generated_design_names,
            allow_cli_override,
        )
    except UnableToFindMetadataError as e:
        logger.error(e)
        # NOTE(review): if df.apply itself raised, generated_design_names is
        # None here — confirm ask_user_to_decide_what_to_do_with_name_resolution
        # tolerates that, or restrict the try to check_generated_design_names.
        df = ask_user_to_decide_what_to_do_with_name_resolution(
            df, generated_design_names, original_input_columns
        )

    # BUG FIX: copy() so the in-place rename below operates on an independent
    # frame instead of a selection of df (chained-assignment warning).
    small_table_data = df[get_small_table_fields()].copy()
    # rename from input schema to the schema the service expects (ideally no change)
    small_table_data.rename(
        columns=get_small_table_rename_dict(),
        inplace=True,
    )
    score_column_results, orphaned_score_columns, keymap = (
        check_scu_against_oracle_with_fuzzy_find_on_fail(
            set(potential_score_columns),
        )
    )
    order_folder_response = create_benchling_order_folder(
        program_id_benchling, matching_target_id_benchling, iteration
    )
    protein_registry_folder_id = order_folder_response.get("registry_folder_id")
    if not protein_registry_folder_id:
        raise ValueError(
            f"Failed to create protein registry folder for {program_id_benchling}, {matching_target_id_benchling}, {iteration}"
        )
    small_table_as_serializable_dict = small_table_data.to_dict("records")
    register_protein_entities_response = register_protein_entities(
        protein_registry_folder_id, small_table_as_serializable_dict
    )
    error = register_protein_entities_response.get("error")
    if error:
        raise ValueError(error)
    aa_sequences = register_protein_entities_response["aaSequences"]
    design_name2registry_id = {
        aa_sequence["name"]: aa_sequence["entityRegistryId"]
        for aa_sequence in aa_sequences
    }
    small_table_data[OrderUploaderColumns.ENTITY_REGISTRY_ID.value] = small_table_data[
        OrderUploaderColumns.DESIGN_NAME.value
    ].map(design_name2registry_id)

    # Upload data to GCS bucket
    bucket_path = f"{program_id_design}/{matching_target_id_benchling}/{iteration}"
    gcs_bucket = DataBuckets.ORDER_UPLOADER_GCS_BUCKET.value
    # Get storage client from utilities
    storage_client = get_storage_client()
    bucket = storage_client.bucket(gcs_bucket)

    # Upload the small table as a csv to GCS
    csv_bytes = pandas_df_to_csv_bytes(small_table_data)
    blob = bucket.blob(f"{bucket_path}/benchling_sequence_table.csv")
    blob.upload_from_file(csv_bytes)

    # One CSV per SCU table, with columns renamed via the keymap.
    for table_id, found_fields in score_column_results.items():
        col_remap = {col: keymap[col] for col in df.columns if col in keymap}
        new_df = df.copy()
        new_df.rename(columns=col_remap, inplace=True)
        just_these_fields = new_df[found_fields]
        csv_bytes = pandas_df_to_csv_bytes(just_these_fields)
        blob = bucket.blob(f"{bucket_path}/{table_id}.csv")
        blob.upload_from_file(csv_bytes)
    if orphaned_score_columns:
        logger.warning(
            f"Score columns {orphaned_score_columns} not found in SCU schema"
        )
        orphaned_score_cols_list = list(orphaned_score_columns)
        csv_bytes = pandas_df_to_csv_bytes(df[orphaned_score_cols_list])
        blob = bucket.blob(f"{bucket_path}/orphaned_score_columns.csv")
        blob.upload_from_file(csv_bytes)

    for path in additional_upload_paths:
        # check if path is a file or a directory
        if os.path.isfile(path):
            blob = bucket.blob(f"{bucket_path}/{path}")
            blob.upload_from_filename(path)
        elif os.path.isdir(path):
            for root, dirs, files in os.walk(path):
                for file in files:
                    blob = bucket.blob(f"{bucket_path}/{os.path.join(root, file)}")
                    blob.upload_from_filename(os.path.join(root, file))

    entry_name = f"{program_id_benchling}_{matching_target_id_benchling}_{iteration}"
    try:
        user_id, user_email, user_full_name, cluster_id = check_user_id(user_id)
    except Exception as e:
        logger.error(f"Error getting user info: {e}")
        raise  # bare raise preserves the original traceback
    iteration_folder_id = order_folder_response.get("iteration_folder_id")

    # Create the benchling entry
    benchling_entry_query_dict = {
        ServiceKeys.SEQUENCE_RECORDS.value: small_table_data.to_dict("records"),
        ServiceKeys.BIG_CSV_DATA.value: pd.read_csv(input_csv_path).to_dict("records"),
        ServiceKeys.ENTRY_NAME.value: entry_name,
        ServiceKeys.GCS_BUCKET.value: f"{gcs_bucket}/{bucket_path}",
        ServiceKeys.AUTHOR_ZIPTX_EMAIL.value: user_email,
        ServiceKeys.MONDAY_TICKET_URL.value: monday_ticket_link,
        ServiceKeys.ITERATION_FOLDER_ID.value: iteration_folder_id,
    }
    entry_creation_response = publish_benchling_entry(benchling_entry_query_dict)
    if entry_creation_response.get("error"):
        raise ValueError(
            "error publishing benchling entry: "
            + "\n"
            + str(entry_creation_response.get("error"))
        )


if __name__ == "__main__":
    upload_order()