import logging
import os
from typing import Optional, List, Set, Tuple, Callable, Any, Dict, Iterable
import click
import numpy as np
import pandas as pd
from zcloud.benchling_order import (
# Enums
OrderUploaderColumns,
OrderUploaderInputColumns,
# Column and table utilities
get_required_columns,
get_all_indexed_input_columns,
get_small_table_fields,
get_small_table_rename_dict,
get_possible_columns_names_for_metadata_table,
# DataFrame operations
initialize_df_with_design_name,
pdapply_build_design_names_from_row,
check_generated_design_names,
# ID candidate getters
get_program_id_candidate,
get_target_id_candidate,
get_binding_site_candidates,
get_fusion_id_candidates,
get_iteration_id_candidate,
# ID checkers
check_program_id,
check_target_id,
check_binding_site_id,
check_fusion_id,
check_user_id,
# Benchling operations
publish_benchling_entry,
create_benchling_order_folder,
register_protein_entities,
# Schema and metadata
check_scu_schema,
get_full_metadata_tables,
get_all_scu_tables,
# Monday service
ticket_details_from_monday_service,
# Exceptions
UnableToFindMetadataError,
)
from zcloud.data_enums import (
DataBuckets,
MetadataColumns,
MetadataTables,
MondayBoardFields,
ServiceKeys,
)
from zcloud.gcp_services import get_storage_client, pandas_df_to_csv_bytes
from zcloud.rapidfuzz import fuzzy_match
logger = logging.getLogger(__name__)
def get_candidate_resolver_single_value(value: str) -> Callable:
    """
    Look up the resolver callable for a single-valued metadata field.

    Parameters
    ----------
    value : str
        The field name to get a resolver for. Must be one of:
        'iteration_number', 'program_id', 'target_id'.

    Returns
    -------
    Callable
        A function that can be used to resolve the specified metadata field.

    Raises
    ------
    KeyError
        If the provided value is not a valid field name.
    """
    # Dispatch table keyed by field name; unknown keys raise KeyError.
    resolvers: Dict[str, Callable] = {
        "iteration_number": get_iteration_id_candidate,
        "program_id": get_program_id_candidate,
        "target_id": get_target_id_candidate,
    }
    return resolvers[value]
def get_candidate_resolver_set_of_values(value: str) -> Callable:
    """
    Look up the resolver callable for a set-valued metadata field.

    Parameters
    ----------
    value : str
        The field name to get a resolver for. Must be one of:
        'binding_site_id', 'fusion_id'.

    Returns
    -------
    Callable
        A function that can be used to resolve the specified metadata field,
        returning a set of values.

    Raises
    ------
    KeyError
        If the provided value is not a valid field name.
    """
    # Dispatch table keyed by field name; unknown keys raise KeyError.
    resolvers: Dict[str, Callable] = {
        "binding_site_id": get_binding_site_candidates,
        "fusion_id": get_fusion_id_candidates,
    }
    return resolvers[value]
def confirm_set_of_values_from_user(
    value_type: str,
    df: pd.DataFrame,
    cli_value: Optional[str],
    allow_cli_override: bool
) -> Set[str]:
    """
    Confirm and resolve a set of values from user input or DataFrame.

    This function attempts to resolve values from the DataFrame first, and if
    that fails, prompts the user for input or uses CLI override values.

    Parameters
    ----------
    value_type : str
        The type of value to resolve (e.g., 'binding_site_id', 'fusion_id').
    df : pd.DataFrame
        The input DataFrame containing the data to analyze.
    cli_value : Optional[str]
        Optional CLI-provided value to use as override.
    allow_cli_override : bool
        Whether to allow CLI values to override DataFrame values.

    Returns
    -------
    Set[str]
        A set of resolved values for the specified type.

    Raises
    ------
    ValueError
        If the resolver fails even though --allow-cli-override was set.
    SystemExit
        If user chooses to exit during the confirmation process.
    """
    try:
        value_queries = get_candidate_resolver_set_of_values(value_type)(
            df, cli_value, allow_cli_override
        )
    except ValueError:
        if allow_cli_override:
            # BUG FIX: with override allowed, the original code fell out of the
            # except block with `value_queries` unbound (UnboundLocalError at
            # the return). Surface the resolver's error instead.
            raise
        if cli_value:
            override_confirmation = click.confirm(
                f"Unable to unambiguously determine {value_type} from input CSV. Would you like to apply cli override and use {cli_value}? (y/n): "
            )
            if override_confirmation:
                value_queries = {cli_value}
            else:
                # gracefully fail and apologize
                logger.error(
                    f"Unable to unambiguously determine {value_type} from input CSV. Please provide a {value_type}: "
                )
                exit(0)
        else:
            # No CLI fallback available: ask the user interactively.
            try:
                value_candidate = click.prompt(
                    f"Please provide a {value_type}: "
                    + "\n"
                    + "ctrl+c (keyboard interrupt) to exit",
                    type=str,
                )
            except click.exceptions.Abort:
                logger.error("Exiting...")
                exit(0)
            value_queries = {value_candidate}
    return value_queries
def confirm_single_value_from_user(
    value_type: str,
    df: pd.DataFrame,
    cli_value: Optional[str],
    allow_cli_override: bool
) -> str:
    """
    Confirm and resolve a single value from user input or DataFrame.

    This function attempts to resolve a value from the DataFrame first, and if
    that fails, prompts the user for input or uses CLI override values.

    Parameters
    ----------
    value_type : str
        The type of value to resolve (e.g., 'program_id', 'target_id').
    df : pd.DataFrame
        The input DataFrame containing the data to analyze.
    cli_value : Optional[str]
        Optional CLI-provided value to use as override.
    allow_cli_override : bool
        Whether to allow CLI values to override DataFrame values.

    Returns
    -------
    str
        The resolved value for the specified type.

    Raises
    ------
    ValueError
        If the resolver fails even though --allow-cli-override was set.
    SystemExit
        If user chooses to exit during the confirmation process.
    """
    try:
        value_candidate = get_candidate_resolver_single_value(value_type)(
            df, cli_value, allow_cli_override
        )
    except ValueError:
        if allow_cli_override:
            # BUG FIX: with override allowed, the original code fell out of the
            # except block with `value_candidate` unbound (UnboundLocalError at
            # the return). Surface the resolver's error instead.
            raise
        if cli_value:
            override_confirmation = click.confirm(
                f"Unable to unambiguously determine {value_type} from input CSV. Would you like to apply cli override and use {cli_value}? (y/n): "
            )
            if override_confirmation:
                value_candidate = cli_value
            else:
                logger.error(
                    f"Unable to unambiguously determine {value_type} from input CSV. Please provide a {value_type}: "
                )
                exit(0)
        else:
            # No CLI fallback available: ask the user interactively.
            try:
                value_candidate = click.prompt(
                    f"Please provide a {value_type}: ", type=str
                )
            except click.exceptions.Abort:
                logger.error("Exiting...")
                exit(0)
    return value_candidate
def ask_for_new_value(query_val: str, metadata_table_id: str, max_attempts: int = 3) -> str:
    """
    Prompt the user for a new value when the original cannot be resolved.

    Parameters
    ----------
    query_val : str
        The original value that could not be resolved.
    metadata_table_id : str
        The ID of the metadata table being queried.
    max_attempts : int, optional
        Maximum number of attempts before exiting, by default 3.

    Returns
    -------
    str
        A new value provided by the user (may be "" if the user just hits
        enter, since the prompt default is the empty string).

    Raises
    ------
    SystemExit
        If max attempts are reached or user provides the same value repeatedly.
    """
    attempts = 0
    while True:
        try_again_val = click.prompt(
            f"Unable to resolve {query_val}. Provide a different value for {metadata_table_id} or press enter to continue: ",
            default="",
        )
        # Re-entering the unresolvable value is pointless; count it and retry.
        if try_again_val == query_val:
            logger.error(
                f"Your entered value {try_again_val} is the same as the value you are trying to resolve. Please provide a different value to check for in {metadata_table_id}, or exit with ctrl+c"
            )
            attempts += 1
            if attempts >= max_attempts:
                logger.error(f"Max attempts ({max_attempts}) reached. Exiting...")
                exit(0)
        else:
            return try_again_val
def ask_user_to_confirm_fuzzy_match(
    query_val: str,
    fuzzy_matches: Iterable[str],
    metadata_table_id: str
) -> str:
    """
    Ask the user to confirm a fuzzy match selection from a list of candidates.

    Parameters
    ----------
    query_val : str
        The original query value that did not match exactly.
    fuzzy_matches : Iterable[str]
        Fuzzy match candidates. NOTE(review): the body uses len() and
        indexing, so callers must pass a sequence -- confirm and consider
        tightening the annotation to Sequence[str].
    metadata_table_id : str
        The ID of the metadata table being queried.

    Returns
    -------
    str
        The value selected by the user (either from fuzzy matches or a new value).

    Raises
    ------
    SystemExit
        If user chooses to exit during the selection process.
    """
    if len(fuzzy_matches) == 1:
        # Single candidate: simple yes/no confirmation.
        fuzzy_match_val = fuzzy_matches[0]
        user_confirms = click.confirm(
            f"{query_val} does not match any registered names. Did you mean {fuzzy_match_val}? (y/n): "
        )
        if user_confirms:
            return fuzzy_match_val
        else:
            return ask_for_new_value(query_val, metadata_table_id)
    if len(fuzzy_matches) > 1:
        # The candidate menu never changes, so build it once outside the retry
        # loop (the original rebuilt it on every invalid input).
        fuzzy_match_list = list(fuzzy_matches)
        # Offer the user a numbered list of all the fuzzy matches
        fuzzy_match_choices = [
            f"{i+1}. {match}" for i, match in enumerate(fuzzy_match_list)
        ]
        fuzzy_match_choices.append(f"{len(fuzzy_match_choices)+1}. Or, enter a new value")
        fuzzy_match_str = "\n".join(fuzzy_match_choices)
        while True:
            user_choice = click.prompt(
                f"Please select the number of the fuzzy match you meant to check for in {metadata_table_id}: \n{fuzzy_match_str}\n"
            )
            if user_choice.isdigit() and 1 <= int(user_choice) <= len(fuzzy_match_list):
                return fuzzy_match_list[int(user_choice) - 1]
            elif user_choice == str(len(fuzzy_match_choices)):
                # Last menu entry: the user wants to type a brand-new value.
                return ask_for_new_value(query_val, metadata_table_id)
            else:
                # Invalid input: log and re-prompt on the next loop iteration.
                logger.error(
                    f"Invalid input. Please enter a number between 1 and {len(fuzzy_match_list)} or ctrl+c to exit"
                )
    # No fuzzy candidates at all: fall back to asking for a new value outright.
    return ask_for_new_value(query_val, metadata_table_id)
def ask_user_to_decide_what_to_do_with_name_resolution(
    df: pd.DataFrame,
    generated_design_names: pd.Series,
    original_input_columns: pd.Index
) -> pd.DataFrame:
    """
    Ask the user to decide how to handle design name resolution conflicts.

    When generated design names clash with expectations, this function presents
    the user with options to either use the generated names, save them to a file,
    or exit the process.

    Parameters
    ----------
    df : pd.DataFrame
        The main DataFrame containing the order data.
    generated_design_names : pd.Series
        The generated design names that caused the conflict.
    original_input_columns : pd.Index
        The original column names from the input CSV.

    Returns
    -------
    pd.DataFrame
        The DataFrame with design names updated if user chose to use generated names.

    Raises
    ------
    ValueError
        If the user enters non-numeric input at the prompt.
    SystemExit
        If user chooses to save names to file or exit.
    """
    use_generated_num = 1
    save_generated_names_num = 2
    exit_num = 3
    user_choices = [
        f"{use_generated_num}. Use the generated design names, continue to upload",
        f"{save_generated_names_num}. Save the generated names to file, and exit",
        f"{exit_num}. Exit",
    ]
    logger.warning(
        f"Design names clash with expectations based on the input configuration"
    )
    logger.warning(
        f"WARNING: if you choose ({use_generated_num}), the 'raw' csv will be the original (mismatched) one provided at the beginning."
    )
    user_choice_list = "\n".join(user_choices)
    user_choice = click.prompt(
        f"Unable to find design names. Please select an option: \n{user_choice_list}\n"
    )
    # Parse once; non-numeric input raises ValueError exactly as the original's
    # repeated int(user_choice) calls did.
    choice = int(user_choice)
    if choice == use_generated_num:
        df[OrderUploaderInputColumns.DESIGN_NAME.value] = generated_design_names
    elif choice == save_generated_names_num:
        user_output_path = click.prompt(
            f"Please provide a path to save the generated design names: ",
            default="generated_design_names.csv",
        )
        # BUG FIX: df[original_input_columns] is a view; assigning a column to
        # it mutates the caller's frame and triggers SettingWithCopyWarning.
        # Take an explicit copy before adding the design-name column.
        output_df = df[original_input_columns].copy()
        output_df[OrderUploaderInputColumns.DESIGN_NAME.value] = generated_design_names
        output_df.to_csv(user_output_path, index=False)
        logger.warning(f"Generated design names saved to {user_output_path}")
        exit(0)
    elif choice == exit_num:
        exit(0)
    # Any other numeric choice falls through and returns df unchanged,
    # matching the original behavior.
    return df
def check_scu_against_oracle_with_fuzzy_find_on_fail(
    score_columns_to_check: Set[str],
    _recursion_depth: int = 0,
    all_tables: Optional[Dict[str, List[Dict[str, str]]]] = None,
    keymap: Optional[Dict[str, str]] = None,
) -> Tuple[Dict[str, List[str]], Set[str], Dict[str, str]]:
    """
    Check score columns against SCU oracle with fuzzy matching fallback.

    Validates score column names against the SCU (Score Column Unit) schema.
    If exact matches fail, provides fuzzy matching and user interaction to
    resolve column names. Handles orphaned columns that cannot be matched.
    Recursive invocations (``_recursion_depth > 0``) never prompt the user;
    they simply report orphans back to the caller.

    Parameters
    ----------
    score_columns_to_check : Set[str]
        Set of score column names to validate.
    _recursion_depth : int, optional
        Internal recursion depth counter, by default 0.
    all_tables : Optional[Dict[str, List[Dict[str, str]]]], optional
        Cached SCU tables data to avoid repeated API calls, by default None.
    keymap : Optional[Dict[str, str]], optional
        Mapping of original column names to corrected names, by default None.

    Returns
    -------
    Tuple[Dict[str, List[str]], Set[str], Dict[str, str]]
        A tuple containing:
        - Dictionary mapping table IDs to lists of found field names
        - Set of orphaned score columns that couldn't be matched
        - Dictionary mapping original column names to corrected names

    Raises
    ------
    SystemExit
        If maximum recursion depth is reached.
    ValueError
        If there are errors retrieving SCU tables.
    """
    if _recursion_depth > 10:
        logger.error(
            f"Max recursion depth ({_recursion_depth}) reached, you will need to start from the beginning. Exiting..."
        )
        exit(0)
    score_column_results = check_scu_schema(
        {
            MetadataColumns.FIELD.value: list(score_columns_to_check),
        },
        all_tables=all_tables,
    )
    found_columns_set = set()
    for table_id, found_fields in score_column_results.items():
        found_columns_set.update(found_fields)
    orphaned_score_columns = score_columns_to_check - found_columns_set
    if len(orphaned_score_columns) == 0:
        return score_column_results, set(), keymap or {}
    if _recursion_depth == 0:
        click.echo(
            f"Unable to find these columns in the registered SCU values: {orphaned_score_columns}"
        )
        click.echo(
            f"WARNING:If you choose a fuzzy match, your score tables uploaded will not match the raw uploaded csv"
        )
        check_for_fuzzy_matches = click.confirm(
            f"Would you like to check for fuzzy matches? (y/n): "
        )
        if not check_for_fuzzy_matches:
            return score_column_results, orphaned_score_columns, keymap or {}
    else:
        # Recursive invocations never prompt; return the orphans to the caller.
        return score_column_results, orphaned_score_columns, keymap or {}
    all_scu_tables = get_all_scu_tables()
    if all_scu_tables.get("success") == False:
        raise ValueError(f"Error getting all SCU tables: {all_scu_tables.get('error')}")
    # NOTE(review): iterating items() presumably also visits the 'success'
    # status key checked above -- confirm the service response shape.
    for table_id, table_data in all_scu_tables.items():
        # Hoisted out of the per-column loop: the field set depends only on the
        # table, not on the column being checked.
        scu_fields = set(
            row.get(MetadataColumns.FIELD.value) for row in table_data
        )  # table is list of dicts
        cols_to_check = list(
            orphaned_score_columns
        )  # Don't make new orphans! It breaks our keymapping
        for check_val in cols_to_check:
            fuzzy_match_results = fuzzy_match(
                check_val,
                scu_fields,
                threshold=70,
            )
            if fuzzy_match_results:
                new_value = ask_user_to_confirm_fuzzy_match(
                    check_val,
                    [match[0] for match in fuzzy_match_results],
                    table_id,
                )
                if not new_value:
                    # Empty string means the user skipped this column.
                    continue
                found_score_results, _, keymap = (
                    check_scu_against_oracle_with_fuzzy_find_on_fail(
                        {new_value},
                        _recursion_depth=_recursion_depth + 1,
                        all_tables=all_scu_tables,
                        keymap=keymap,
                    )
                )
                if found_score_results:
                    for found_table_id, found_fields in found_score_results.items():
                        bucket = score_column_results.get(found_table_id, [])
                        for found_field in found_fields:  # should only ever be one
                            if found_field not in bucket:
                                bucket.append(found_field)
                            score_column_results[found_table_id] = bucket
                            found_columns_set.add(found_field)
                            # discard (not remove): safe if check_val was
                            # already cleared by a previous found field.
                            orphaned_score_columns.discard(check_val)
                            # map the original value to the new value; we're ok
                            # with overwriting in a loop, we never put anything
                            # new in check_val
                            keymap[check_val] = found_field
    if _recursion_depth == 0 and orphaned_score_columns:
        click.echo(
            f"Unable to find these columns in the registered SCU values: {orphaned_score_columns}"
        )
        rename_manually = click.confirm(
            f"Would you like to rename the columns manually (and check them against the oracle) (y), or keep non-indexed score columns (n)? (y/n): "
        )
        if rename_manually:
            remaining_orphaned_columns_list = list(orphaned_score_columns)
            for non_found_score in remaining_orphaned_columns_list:
                new_score_name = click.prompt(
                    f"Please provide a new name for {non_found_score}: "
                )
                found_score_results, _, keymap = (
                    check_scu_against_oracle_with_fuzzy_find_on_fail(
                        {new_score_name},
                        _recursion_depth=_recursion_depth + 1,
                        all_tables=all_scu_tables,
                        keymap=keymap,
                    )
                )
                if found_score_results:
                    for found_table_id, found_fields in found_score_results.items():
                        bucket = score_column_results.get(found_table_id, [])
                        for found_field in found_fields:  # should only ever be one
                            if found_field not in bucket:
                                bucket.append(found_field)
                            score_column_results[found_table_id] = bucket
                            found_columns_set.add(found_field)
                            # BUG FIX: the original removed the *renamed* field
                            # from the orphan set; the member actually present
                            # is the original column name `non_found_score`.
                            orphaned_score_columns.discard(non_found_score)
                            # map the original value to the new value
                            keymap[non_found_score] = found_field
    if orphaned_score_columns:
        click.echo(
            f"These values were never found in the registered SCU values, they will be uploaded as non-indexed score columns: {orphaned_score_columns}"
        )
    return score_column_results, orphaned_score_columns, keymap if keymap else {}
def check_tag_location_with_fuzzy_find_on_fail(
    df: pd.DataFrame,
    tag_column_name: str = OrderUploaderInputColumns.TAG_LOCATION.value,
) -> pd.DataFrame:
    """
    Check tag location with fuzzy matching fallback.

    Validates tag location as one of two values: "N-term" or "C-term". No others are allowed.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame containing the tag location column.
    tag_column_name : str
        The name of the tag location column.

    Returns
    -------
    pd.DataFrame
        The input DataFrame with the tag location column fixed.

    Raises
    ------
    SystemExit
        If the user fails to supply a valid tag location after several tries.
    """
    # TODO maybe make this an enum or call oracle? Seems silly, but it is schema enforced as an enum on benchling, and will fail cryptically if we change that end.
    allowed_vals = ["N-term", "C-term"]
    n_term_mask = df[tag_column_name] == allowed_vals[0]
    c_term_mask = df[tag_column_name] == allowed_vals[1]
    either_or_mask = n_term_mask | c_term_mask
    # Identity mapping for already-valid values; wrong values get corrections added.
    correction_dict = {val: val for val in allowed_vals}
    if not either_or_mask.all():
        unique_wrong_vals = df[tag_column_name][~either_or_mask].unique()
        for original_wrong_val in unique_wrong_vals:
            logger.warning(
                f"Tag location {original_wrong_val} is not one of the allowed values: N-term or C-term."
            )
            wrong_val = original_wrong_val
            val_is_wrong = True
            wrong_entry_count = 0
            while val_is_wrong:
                fuzzy_match_results = fuzzy_match(wrong_val, allowed_vals, threshold=70)
                result_list = [match[0] for match in fuzzy_match_results]
                new_val = ask_user_to_confirm_fuzzy_match(
                    wrong_val, result_list, "tag_locations: N-term or C-term"
                )
                if not new_val:
                    continue
                if new_val in allowed_vals:
                    val_is_wrong = False
                    # BUG FIX: key the correction by the ORIGINAL CSV value.
                    # The original keyed by `wrong_val`, which is reassigned
                    # below when the user types another bad value, leaving the
                    # CSV's value unmapped and producing NaN after .map().
                    correction_dict[original_wrong_val] = new_val
                else:
                    wrong_val = new_val
                    wrong_entry_count += 1
                    if wrong_entry_count > 3:
                        logger.error(
                            f"Tries to infer a valid tag location for {wrong_val} exceeded. Please check the input CSV and try again."
                        )
                        exit(0)
        # .map() replaces every value via correction_dict; every unique value in
        # the column now has an entry (identity or user-confirmed correction).
        df[tag_column_name] = df[tag_column_name].map(correction_dict)
    return df
@click.command()
@click.option(
    "--input-csv-path",
    type=str,
    required=True,
    help="The path to the input CSV file. If the sequence_name column is well formatted and follows schema, then all you need is the csv of: design_name, sequence, tag_location",
)
@click.option("--program-id", type=str, default=None)
@click.option("--target-id", type=str, default=None)
@click.option("--binding-site-id", type=str, default=None)
@click.option("--user-id", type=str, default=None)
@click.option("--monday-ticket-link", type=str, default=None)
@click.option("--iteration-number", type=str, default=None)
@click.option("--fusion_id", type=str, default=None)
@click.option(
    "--additional-upload-paths",
    default=None,
    type=click.Path(exists=True),
    multiple=True,
    help="Additional paths to upload, can be used to upload any files or directories recursively to the order bucket. Useful for if you used an unusual config and want to record it",
)
@click.option(
    "--allow-cli-override",
    is_flag=True,
    default=False,
    help="Allow CLI arguments to override the data in the input CSV, default is to fail and complain",
)
def upload_order(
    input_csv_path: str,
    program_id: Optional[str],
    target_id: Optional[str],
    binding_site_id: Optional[str],
    user_id: Optional[str],
    monday_ticket_link: Optional[str],
    iteration_number: Optional[str],
    fusion_id: Optional[str],
    additional_upload_paths: Tuple[str, ...],
    allow_cli_override: bool,
) -> None:
    """
    Upload an order to Benchling, validate and/or generate the sequence names and score columns.

    If the input CSV sequence_name column is well formatted and follows schema, then all you need
    is the csv of: design_name, sequence, tag_location

    You can also omit the name, as long as you provide either a column with the appropriate
    program id, binder id, binding site id, etc. You may also provide missing data as an argument,
    which will override the data in the csv (and will regenerate the names).

    Parameters
    ----------
    input_csv_path : str
        The path to the input CSV file.
    program_id : Optional[str]
        The ID of the program.
    target_id : Optional[str]
        The ID of the target.
    binding_site_id : Optional[str]
        The ID of the binding site.
    user_id : Optional[str]
        The ID of the user.
    monday_ticket_link : Optional[str]
        The link to the Monday ticket.
    iteration_number : Optional[str]
        The iteration number.
    fusion_id : Optional[str]
        The ID of the fusion.
    additional_upload_paths : Tuple[str, ...]
        Additional paths to upload, can be used to upload any files or directories
        recursively to the order bucket. Useful for if you used an unusual config
        and want to record it.
    allow_cli_override : bool
        Allow CLI arguments to override the data in the input CSV, default is to
        fail and complain.

    Raises
    ------
    FileNotFoundError
        If the input CSV file is not found.
    ValueError
        If required columns are missing or if there are validation errors.
    SystemExit
        If user chooses to exit during interactive prompts.
    """
    level = os.getenv("PYTHON_LOG_LEVEL", "INFO").upper()
    logging.basicConfig(level=level)
    # Load input CSV data
    if not os.path.exists(input_csv_path):
        raise FileNotFoundError(f"Input CSV file not found: {input_csv_path}")
    df = pd.read_csv(input_csv_path)
    original_input_columns = df.columns
    df.index = range(1, len(df) + 1)  # 1-based row index for user-facing messages
    # Validate required columns
    required_columns = get_required_columns()
    all_indexed_input_columns = get_all_indexed_input_columns()
    # Anything not in the indexed input schema is treated as a score column.
    potential_score_columns = set(
        col for col in df.columns if col not in all_indexed_input_columns
    )
    if potential_score_columns:
        logger.warning(
            f"Potential score columns found in input CSV: {potential_score_columns}. These will be uploaded to the order bucket."
        )
    missing_columns = [col for col in required_columns if col not in df.columns]
    have_design_column = OrderUploaderInputColumns.DESIGN_NAME.value in df.columns
    if missing_columns:
        # Check if tag_location or sequence are missing - these are mandatory
        # NOTE(review): mandatory_missing always equals missing_columns here
        # (both are derived from required_columns), so the design-name branch
        # below is unreachable unless get_required_columns excludes design_name
        # -- confirm intent.
        mandatory_missing = [col for col in required_columns if col in missing_columns]
        if mandatory_missing:
            raise ValueError(
                f"Required columns missing from input CSV: {mandatory_missing}. Tag location and sequence MUST be provided."
            )
        # If only design_name is missing, we can construct it later if we have the necessary parameters
        if not have_design_column:
            print(
                f"Warning: {OrderUploaderInputColumns.DESIGN_NAME.value} column is missing. Will construct from other parameters if possible."
            )
    else:
        df = initialize_df_with_design_name(df)
    df = check_tag_location_with_fuzzy_find_on_fail(df)
    # BUG FIX: these were only assigned inside the `if monday_ticket_link:`
    # block but read unconditionally below, raising NameError when no ticket
    # link was supplied. Default them to None and guard the mismatch checks.
    monday_program_id_from_api = None
    monday_target_id_from_api = None
    monday_iteration_code_from_api = None
    if monday_ticket_link:
        ticket_details = ticket_details_from_monday_service(monday_ticket_link)
        if ticket_details.get("success") == False:
            logger.error(
                f"Error getting Monday ticket details: {ticket_details.get('error')}"
            )
            continue_execution = click.confirm(f"Continue execution? (y/n): ")
            if not continue_execution:
                logger.error("Exiting...")
                exit(0)
        logger.info(f"Ticket details: {ticket_details}")
        monday_program_id_from_api = ticket_details.get(MondayBoardFields.PROJECT.value)
        monday_target_id_from_api = ticket_details.get(MondayBoardFields.TARGET.value)
        monday_iteration_code_from_api = ticket_details.get(
            MondayBoardFields.ITERATION_CODE.value
        )
        logger.info(f"Monday program id from api: {monday_program_id_from_api}")
        logger.info(f"Monday target id from api: {monday_target_id_from_api}")
        logger.info(f"Monday iteration code from api: {monday_iteration_code_from_api}")
    program_id_query = confirm_single_value_from_user(
        "program_id", df, program_id, allow_cli_override
    )
    # NOTE(review): check_metadata_against_oracle_with_fuzzy_find_on_fail is not
    # imported or defined in this module -- confirm where it comes from.
    program_id_benchling, program_id_design, program_id_monday = (
        check_metadata_against_oracle_with_fuzzy_find_on_fail(
            MetadataTables.PROGRAM_TABLE.value,
            program_id_query,
            try_to_find_monday_id=monday_program_id_from_api,
        )
    )
    # Only cross-check against Monday when a ticket link was actually supplied.
    if monday_ticket_link and program_id_monday != monday_program_id_from_api:
        logger.warning(
            f"Registered Monday Program ID {program_id_monday} does not match the actual Monday ID {monday_program_id_from_api} on the ticket"
        )
        continue_execution = click.confirm(f"Continue execution? (y/n): ")
        if not continue_execution:
            logger.error("Exiting...")
            exit(0)
    # check None, check fuzzy, prompt user
    target_id_query = confirm_single_value_from_user(
        "target_id", df, target_id, allow_cli_override
    )
    (
        matching_program_id,
        matching_target_id_benchling,
        matching_target_id_design,
        matching_target_id_monday,
        matching_target_id_internal,
    ) = check_metadata_against_oracle_with_fuzzy_find_on_fail(
        MetadataTables.TARGET_TABLE.value,
        target_id_query,
        allowed_other_ids=[
            program_id_benchling,
            program_id_design,
            program_id_monday,
            monday_program_id_from_api,
        ],
        try_to_find_monday_id=monday_target_id_from_api,
    )
    if monday_ticket_link and matching_target_id_monday != monday_target_id_from_api:
        logger.warning(
            f"Registered Monday Target ID {matching_target_id_monday} does not match the actual Monday ID {monday_target_id_from_api} on the ticket"
        )
        continue_execution = click.confirm(f"Continue execution? (y/n): ")
        if not continue_execution:
            logger.error("Exiting...")
            exit(0)
    binding_site_queries = confirm_set_of_values_from_user(
        "binding_site_id", df, binding_site_id, allow_cli_override
    )
    binding_site_id_to_binding_site_oracle_data = {}
    for binding_site_query in binding_site_queries:
        matching_target_id, binding_site_id_benchling, binding_site_id_design = (
            check_metadata_against_oracle_with_fuzzy_find_on_fail(
                MetadataTables.BINDING_SITE_TABLE.value,
                binding_site_query,
                allowed_other_ids=[
                    matching_target_id_benchling,
                    matching_target_id_design,
                ],
            )
        )
        binding_site_id_to_binding_site_oracle_data[binding_site_query] = {
            "target_id": matching_target_id,
            "binding_site_id_benchling": binding_site_id_benchling,
            "binding_site_id_design": binding_site_id_design,
        }
    fusion_id_queries = confirm_set_of_values_from_user(
        "fusion_id", df, fusion_id, allow_cli_override
    )
    fusion_id_to_fusion_oracle_data = {}
    for fusion_id_query in fusion_id_queries:
        matching_target_id, fusion_id_benchling, fusion_id_design = (
            check_metadata_against_oracle_with_fuzzy_find_on_fail(
                MetadataTables.FUSION_TABLE.value,
                fusion_id_query,
            )
        )
        fusion_id_to_fusion_oracle_data[fusion_id_query] = {
            "target_id": matching_target_id,
            "fusion_id_benchling": fusion_id_benchling,
            "fusion_id_design": fusion_id_design,
        }
    iteration = confirm_single_value_from_user(
        "iteration_number", df, iteration_number, allow_cli_override
    )
    if monday_ticket_link and iteration != monday_iteration_code_from_api:
        logger.warning(
            f"Registered Monday Iteration Code {iteration} does not match the actual Monday Iteration Code {monday_iteration_code_from_api} on the ticket"
        )
        continue_execution = click.confirm(f"Continue execution? (y/n): ")
        if not continue_execution:
            logger.error("Exiting...")
            exit(0)
    logger.info(f"Iteration: {iteration}")
    logger.info(f"Matching target id design: {matching_target_id_design}")
    logger.info(
        f"Number of binding sites: {len(binding_site_id_to_binding_site_oracle_data)}"
    )
    for (
        binding_site_query,
        binding_site_oracle_data,
    ) in binding_site_id_to_binding_site_oracle_data.items():
        logger.info(f"Binding site query: {binding_site_query}")
        logger.info(f"Binding site oracle data: {binding_site_oracle_data}")
    logger.info(f"Number of fusion ids: {len(fusion_id_to_fusion_oracle_data)}")
    for (
        fusion_id_query,
        fusion_id_oracle_data,
    ) in fusion_id_to_fusion_oracle_data.items():
        logger.info(f"Fusion id query: {fusion_id_query}")
        logger.info(f"Fusion id oracle data: {fusion_id_oracle_data}")
    try:
        generated_design_names = df.apply(
            pdapply_build_design_names_from_row,
            args=(
                iteration,
                matching_target_id_design,
                binding_site_id,
                fusion_id,
                allow_cli_override,
            ),
            axis=1,
        )
        check_generated_design_names(
            df,
            generated_design_names,
            allow_cli_override,
        )
    except UnableToFindMetadataError as e:
        logger.error(e)
        # NOTE(review): if df.apply itself raised, generated_design_names is
        # unbound here and this call fails -- confirm the error can only come
        # from check_generated_design_names.
        df = ask_user_to_decide_what_to_do_with_name_resolution(
            df, generated_design_names, original_input_columns
        )
    # .copy(): df[...] returns a view; the in-place rename and column
    # assignment below must not mutate df or trigger SettingWithCopyWarning.
    small_table_data = df[get_small_table_fields()].copy()
    # rename from input schema to the schema the service expects (ideally no change)
    small_table_data.rename(
        columns=get_small_table_rename_dict(),
        inplace=True,
    )
    score_column_results, orphaned_score_columns, keymap = (
        check_scu_against_oracle_with_fuzzy_find_on_fail(
            set(potential_score_columns),
        )
    )
    order_folder_response = create_benchling_order_folder(
        program_id_benchling, matching_target_id_benchling, iteration
    )
    protein_registry_folder_id = order_folder_response.get("registry_folder_id")
    if not protein_registry_folder_id:
        raise ValueError(
            f"Failed to create protein registry folder for {program_id_benchling}, {matching_target_id_benchling}, {iteration}"
        )
    small_table_as_serializable_dict = small_table_data.to_dict("records")
    register_protein_entities_response = register_protein_entities(
        protein_registry_folder_id, small_table_as_serializable_dict
    )
    error = register_protein_entities_response.get("error")
    if error:
        raise ValueError(error)
    aa_sequences = register_protein_entities_response["aaSequences"]
    design_name2registry_id = {
        aa_sequence["name"]: aa_sequence["entityRegistryId"]
        for aa_sequence in aa_sequences
    }
    small_table_data[OrderUploaderColumns.ENTITY_REGISTRY_ID.value] = small_table_data[
        OrderUploaderColumns.DESIGN_NAME.value
    ].map(design_name2registry_id)
    # Upload data to GCS bucket
    bucket_path = f"{program_id_design}/{matching_target_id_benchling}/{iteration}"
    gcs_bucket = DataBuckets.ORDER_UPLOADER_GCS_BUCKET.value
    # Get storage client from utilities
    storage_client = get_storage_client()
    bucket = storage_client.bucket(gcs_bucket)
    # Upload the small table as a csv to GCS
    csv_bytes = pandas_df_to_csv_bytes(small_table_data)
    blob = bucket.blob(f"{bucket_path}/benchling_sequence_table.csv")
    blob.upload_from_file(csv_bytes)
    # Apply the column keymap once (it does not change per table) instead of
    # copying and renaming the full DataFrame on every loop iteration.
    col_remap = {col: keymap[col] for col in df.columns if col in keymap}
    renamed_df = df.rename(columns=col_remap)
    for table_id, found_fields in score_column_results.items():
        just_these_fields = renamed_df[found_fields]
        csv_bytes = pandas_df_to_csv_bytes(just_these_fields)
        blob = bucket.blob(f"{bucket_path}/{table_id}.csv")
        blob.upload_from_file(csv_bytes)
    if orphaned_score_columns:
        logger.warning(
            f"Score columns {orphaned_score_columns} not found in SCU schema"
        )
        orphaned_score_cols_list = list(orphaned_score_columns)
        csv_bytes = pandas_df_to_csv_bytes(df[orphaned_score_cols_list])
        blob = bucket.blob(f"{bucket_path}/orphaned_score_columns.csv")
        blob.upload_from_file(csv_bytes)
    for path in additional_upload_paths:
        # check if path is a file or a directory
        if os.path.isfile(path):
            blob = bucket.blob(f"{bucket_path}/{path}")
            blob.upload_from_filename(path)
        elif os.path.isdir(path):
            for root, dirs, files in os.walk(path):
                for file in files:
                    blob = bucket.blob(f"{bucket_path}/{os.path.join(root, file)}")
                    blob.upload_from_filename(os.path.join(root, file))
    entry_name = f"{program_id_benchling}_{matching_target_id_benchling}_{iteration}"
    try:
        user_id, user_email, user_full_name, cluster_id = check_user_id(user_id)
    except Exception as e:
        logger.error(f"Error getting user info: {e}")
        raise
    iteration_folder_id = order_folder_response.get("iteration_folder_id")
    # Create the benchling entry
    benchling_entry_query_dict = {
        ServiceKeys.SEQUENCE_RECORDS.value: small_table_data.to_dict("records"),
        # Re-read from disk so the 'raw' CSV is the untouched user input.
        ServiceKeys.BIG_CSV_DATA.value: pd.read_csv(input_csv_path).to_dict("records"),
        ServiceKeys.ENTRY_NAME.value: entry_name,
        ServiceKeys.GCS_BUCKET.value: f"{gcs_bucket}/{bucket_path}",
        ServiceKeys.AUTHOR_ZIPTX_EMAIL.value: user_email,
        ServiceKeys.MONDAY_TICKET_URL.value: monday_ticket_link,
        ServiceKeys.ITERATION_FOLDER_ID.value: iteration_folder_id,
    }
    entry_creation_response = publish_benchling_entry(benchling_entry_query_dict)
    if entry_creation_response.get("error"):
        raise ValueError(
            "error publishing benchling entry: "
            + "\n"
            + str(entry_creation_response.get("error"))
        )
# CLI entry point: invoke the Click command when executed as a script.
if __name__ == "__main__":
    upload_order()