pinder.core.utils package#
Submodules#
pinder.core.utils.cloud module#
- exception pinder.core.utils.cloud.Error(message: str | None = None, **kwargs: str)[source][source]#
Bases: Exception
Base error class
Child classes have a title and a message format.
- title: str | None = None#
- code: int | None = None#
- message_format: str | None = None#
- exception pinder.core.utils.cloud.GCPError(message: str | None = None, **kwargs: str)[source][source]#
Bases: Error
- message_format: str | None = 'Unexpected GCP Error'#
- code: int | None = 502#
- title: str | None = 'Bad Gateway'#
- exception pinder.core.utils.cloud.GCPTimeOutError(message: str | None = None, **kwargs: str)[source][source]#
Bases: GCPError
- message_format: str | None = 'The GCP request you have made timed out after attempting to reconnect'#
- code: int | None = 504#
- title: str | None = 'Gateway Timeout'#
- pinder.core.utils.cloud.get_cpu_limit(reserve_cpu: int = 0, quota_path: Path = PosixPath('/sys/fs/cgroup/cpu/cpu.cfs_quota_us'), period_path: Path = PosixPath('/sys/fs/cgroup/cpu/cpu.cfs_period_us'), shares_path: Path = PosixPath('/sys/fs/cgroup/cpu/cpu.shares')) int [source][source]#
Attempt to correctly determine the CPU resources allocated to a given container, since os.cpu_count always reports the machine's resources rather than the container's allocation. See https://bugs.python.org/issue36054 for more details.
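For illustration, a minimal usage sketch; sizing a thread pool from the detected limit is an assumption about how the value might be used, not part of the function itself:
>>> from concurrent.futures import ThreadPoolExecutor
>>> from pinder.core.utils.cloud import get_cpu_limit
>>> n_workers = max(1, get_cpu_limit(reserve_cpu=1))  # leave one CPU for the main process
>>> with ThreadPoolExecutor(max_workers=n_workers) as pool:
...     results = list(pool.map(str.upper, ["a", "b", "c"]))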
- pinder.core.utils.cloud.retry_bucket_method(source_blob: Blob, target_blob: Blob, method: str, timeout: Tuple[int, int] = (5, 120), retries: int = 5) None [source][source]#
Retry wrapper around bucket.copy_blob to tolerate intermittent network outages. Can be generalized to other Bucket methods which are not also provided as convenience methods on the Blob object.
- Parameters:
- source_blob : Blob
blob to copy
- target_blob : Blob
new location of source blob
- timeout : tuple, default=(5, 120)
timeout forwarded to Bucket method
- retries : int, default=5
how many times to attempt the method
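A minimal sketch of a remote-to-remote copy with retries; the bucket and blob names are placeholders and an authenticated google.cloud.storage client is assumed:
>>> from google.cloud import storage
>>> from pinder.core.utils.cloud import retry_bucket_method
>>> bucket = storage.Client().bucket("my-bucket")  # hypothetical bucket
>>> src = bucket.blob("raw/structure.pdb")
>>> dst = bucket.blob("archive/structure.pdb")
>>> retry_bucket_method(src, dst, method="copy_blob", retries=3)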
- pinder.core.utils.cloud.bucket_process_many(sources: List[Blob], targets: List[Blob], method: str, global_timeout: int = 600, timeout: Tuple[int, int] = (5, 120), retries: int = 5, threads: int = 4) None [source][source]#
Use a ThreadPoolExecutor to execute multiple bucket operations concurrently using the python storage API.
- Parameters:
- sources : List[Blob]
source blobs
- targets : List[Blob]
target blobs
- method : str
name of Bucket method to call
- global_timeout : int, default=600
timeout to wait for all operations to complete
- timeout : tuple, default=(5, 120)
timeout forwarded to Bucket method
- retries : int, default=5
how many times to attempt the method
- threads : int, default=4
how many threads to use in the thread pool
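A sketch of copying several blobs concurrently, assuming the same hypothetical bucket and placeholder keys as above:
>>> from google.cloud import storage
>>> from pinder.core.utils.cloud import bucket_process_many
>>> bucket = storage.Client().bucket("my-bucket")
>>> sources = [bucket.blob(f"raw/{i}.csv") for i in range(3)]
>>> targets = [bucket.blob(f"backup/{i}.csv") for i in range(3)]
>>> bucket_process_many(sources, targets, method="copy_blob", threads=4)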
- pinder.core.utils.cloud.retry_blob_method(blob: Blob, method: str, path: str, retries: int = 5, timeout: Tuple[int, int] = (5, 120)) None [source][source]#
- pinder.core.utils.cloud.process_many(path_pairs: List[Tuple[str, Blob]], method: str, global_timeout: int = 600, timeout: Tuple[int, int] = (5, 120), retries: int = 5, threads: int = 4) None [source][source]#
- pinder.core.utils.cloud.method_context(sources: List[Path], targets: List[Path]) Tuple[List[Path], List[Path], str] [source][source]#
- pinder.core.utils.cloud.make_path_pairs(local_paths: List[Path], remote_keys: List[str] | List[Path] | List[Blob], bucket: Bucket) List[Tuple[str, Blob]] [source][source]#
Create pairs of (str, Blob) for threaded processing.
- Parameters:
- local_paths : List[Path]
local file paths
- remote_keys : List[str] | List[Path] | List[Blob]
remote key locations
- bucket : Bucket
client provider
- Returns:
- pairs : List[Tuple[str, Blob]]
destinations and blobs
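A sketch of pairing local destinations with remote blobs ahead of a threaded download; the bucket name and keys are placeholders, and passing the pairs to process_many with the Gsutil.DOWNLOAD method name is an assumption about typical usage:
>>> from pathlib import Path
>>> from google.cloud import storage
>>> from pinder.core.utils.cloud import make_path_pairs, process_many
>>> bucket = storage.Client().bucket("my-bucket")
>>> local_paths = [Path("data/a.csv"), Path("data/b.csv")]
>>> remote_keys = ["datasets/a.csv", "datasets/b.csv"]
>>> pairs = make_path_pairs(local_paths, remote_keys, bucket)
>>> process_many(pairs, method="download_to_filename")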
- class pinder.core.utils.cloud.Gsutil(client: Client | None = None)[source][source]#
Bases: object
Attempt to achieve the efficiency of “gsutil -m” commands using the python storage API. This involves using a simple ThreadPoolExecutor to circumvent blocking network I/O and retain an ergonomic high-level user interface.
- DOWNLOAD: ClassVar[str] = 'download_to_filename'#
- UPLOAD: ClassVar[str] = 'upload_from_filename'#
- REMOTE_COPY: ClassVar[str] = 'copy_blob'#
- cp_paired_paths(sources: List[str] | List[Path], targets: List[str] | List[Path], bucket: Bucket | None = None, threads: int | None = None) None [source]#
Copy a collection of sources to collection of targets.
- Parameters:
- sources : List[str | Path]
source files to copy
- targets : List[str | Path]
target destinations
- bucket : Any
optionally pass existing client
- threads : int, default=None
explicit number of threads passed to ThreadPoolExecutor
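A hedged sketch of a paired download; the URIs and local paths are placeholders and default credentials are assumed:
>>> from pinder.core.utils.cloud import Gsutil
>>> gs = Gsutil()
>>> gs.cp_paired_paths(
...     sources=["gs://my-bucket/data/a.csv", "gs://my-bucket/data/b.csv"],
...     targets=["local/a.csv", "local/b.csv"],
... )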
- cp_paths(sources: List[str] | List[Path], target: str | Path, anchor: str | Path, bucket: Bucket | None = None, threads: int | None = None) None [source]#
Copy a collection of sources to target. target must be a single path.
- Parameters:
- sources : List[str | Path]
source files to copy
- target : str | Path
target destination
- anchor : str | Path
make source paths relative to anchor
- bucket : Any
optionally pass existing client
- threads : int, default=None
explicit number of threads passed to ThreadPoolExecutor
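A sketch of an upload in which the remote keys are made relative to anchor; the bucket and directory names are placeholders:
>>> from pathlib import Path
>>> from pinder.core.utils.cloud import Gsutil
>>> sources = [Path("local_dir/sub/a.csv"), Path("local_dir/sub/b.csv")]
>>> Gsutil().cp_paths(sources, target="gs://my-bucket/upload", anchor="local_dir")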
- cp_dir(source: str | Path, target: str | Path, bucket: Any | None = None, threads: int | None = None) None [source]#
Copy an entire source directory to target. Assumes everything processed will be relative to source and target.
- Parameters:
- source : str | Path
source directory to copy
- target : str | Path
target destination
- bucket : Any
optionally pass existing client
- threads : int, default=None
explicit number of threads passed to ThreadPoolExecutor
Examples
For downloading flat directories like:

gs://{bucket}/{flat_path}
├── data.csv
├── other_data.csv
└── ...

>>> Gsutil().cp_dir(
...     f"gs://{bucket}/{flat_path}",
...     "/path/to/local_dir",
... )

# should result in
/path/to/local_dir
├── data.csv
├── other_data.csv
└── ...

For downloading nested directories like:

gs://{bucket}/{nested_path}
├── {nested_path}-0
│   └── item.json
├── {nested_path}-1
│   └── item.json
└── ...

>>> Gsutil().cp_dir(
...     f"gs://{bucket}/{nested_path}",
...     "/path/to/local_dir",
... )

# should result in
/path/to/local_dir
├── {nested_path}-0
│   └── item.json
├── {nested_path}-1
│   └── item.json
└── ...

For uploading flat directories like:

/path/to/local_dir
├── data.csv
├── other_data.csv
└── ...

>>> Gsutil().cp_dir(
...     "/path/to/local_dir",
...     f"gs://{bucket}/{target_path}",
... )

# should result in
gs://{bucket}/{target_path}
├── data.csv
├── other_data.csv
└── ...

For uploading nested directories like:

/path/to/local_dir
├── {nested_path}-0
│   └── item.json
├── {nested_path}-1
│   └── item.json
└── ...

>>> Gsutil().cp_dir(
...     "/path/to/local_dir",
...     f"gs://{bucket}/{nested_path}",
... )

# should result in
gs://{bucket}/{nested_path}
├── {nested_path}-0
│   └── item.json
├── {nested_path}-1
│   └── item.json
└── ...

For downloading files using partial matches:

gs://{bucket}/{prefix}/{partial_match}
├── {partial_match}-0.csv
├── {partial_match}-1.csv
└── ...

>>> Gsutil().cp_dir(
...     "gs://{bucket}/{prefix}/{partial_match}",
...     "test_dir/subdir",
... )

# should result in
test_dir
└── subdir
    ├── {partial_match}-0.csv
    ├── {partial_match}-1.csv
    └── ...

For uploading files using partial matches:

test_dir
└── subdir
    ├── {partial_match}-0.csv
    ├── {partial_match}-1.csv
    └── ...

>>> Gsutil().cp_dir(
...     "test_dir/subdir/{partial_match}",
...     "gs://{bucket}/{prefix}/test_upload",
... )

# should result in
gs://{bucket}/{prefix}/test_upload
├── {partial_match}-0.csv
├── {partial_match}-1.csv
└── ...
- ls(target: str | Path, bucket: Any | None = None, recursive: bool = True) List[Path] [source]#
List the contents of a remote directory, returning full paths to files including gs://{bucket_name}.
- Parameters:
- target : str | Path
root path for remote files to list
- bucket : Any
optionally pass existing client
- recursive : bool
recursively list files in sub-directories
- Returns:
- paths : List[Path]
full remote paths to listed files
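A sketch of listing a remote prefix; the URI is a placeholder:
>>> from pinder.core.utils.cloud import Gsutil
>>> paths = Gsutil().ls("gs://my-bucket/datasets", recursive=True)
>>> file_names = [p.name for p in paths]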
- ls_blobs(target: str | Path, bucket: Any | None = None, versions: bool = False) List[Blob] [source]#
Remote ls returning blob objects instead of paths.
- Parameters:
- target : str | Path
root path for remote files to list
- bucket : Any
optionally pass existing client
- versions : bool, default=False
if True, include blob versions
- Returns:
- blobs : List[Blob]
blob objects for the listed files
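A sketch showing how the returned Blob objects expose metadata that plain paths do not; the URI is a placeholder:
>>> from pinder.core.utils.cloud import Gsutil
>>> blobs = Gsutil().ls_blobs("gs://my-bucket/datasets")
>>> sizes = {b.name: b.size for b in blobs}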
- remote_cp(sources: List[str | Path], targets: List[str | Path], bucket: Any | None = None) None [source]#
Transfer remote files from one location to another without downloading them to a local context first. Assumes all source files reside in the same bucket, and all target files reside in the same bucket, but the source bucket can be different from the target bucket.
- Parameters:
- sources : List[str | Path]
full remote paths to copy from
- targets : List[str | Path]
full remote paths to copy to
- bucket : Any
optionally pass existing bucket (assumes source and target buckets are the same bucket)
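A sketch of a server-side copy between remote locations; the URIs are placeholders:
>>> from pinder.core.utils.cloud import Gsutil
>>> Gsutil().remote_cp(
...     sources=["gs://my-bucket/raw/a.csv"],
...     targets=["gs://my-bucket/archive/a.csv"],
... )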
- pinder.core.utils.cloud.gcs_read_dataframe(gcs_uri: str | Path, fs: GCSFileSystem | None = None, token: str | None = 'anon') DataFrame [source][source]#
Read remote files directly into pandas.DataFrame with anonymous client credentials.
If the gcsfs.GCSFileSystem object is not provided, one will be created with token set to anon (no authentication is performed), and you can only access data which is accessible to IAM principal allUsers.
- Parameters:
- gcs_uri : str | Path
full remote path to read. Must end with .csv, .csv.gz, or .parquet extension.
- fs : gcsfs.GCSFileSystem
optionally pass an existing authenticated GCSFileSystem object to use.
- token : str | None
optionally pass a token type to use for authenticating requests. Default is "anon" (only public objects). If an authentication error is raised, the method is retried with token=None to attempt to infer credentials in the following order: gcloud CLI default, gcsfs cached token, google compute metadata service, anonymous.
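A sketch of reading a public parquet file with the default anonymous credentials; the URI is a placeholder:
>>> from pinder.core.utils.cloud import gcs_read_dataframe
>>> df = gcs_read_dataframe("gs://my-public-bucket/metadata.parquet")
>>> df.head()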
pinder.core.utils.constants module#
pinder.core.utils.dataclass module#
- pinder.core.utils.dataclass.atom_array_summary_markdown_repr(array: AtomArray) str [source][source]#
pinder.core.utils.log module#
- pinder.core.utils.log.setup_logger(logger_name: str | None = None, log_level: int = 20, log_file: str | None = None, propagate: bool = False) Logger [source][source]#
Set up a logger, using the calling module's name as the logger name by default, for easy tracing of what's happening in the code.
- Parameters:
- logger_name : str
Name of the logger
- log_level : int
Log level
- log_file : str | None
optional log file to write to
- propagate : bool
propagate log events to parent loggers, default = False
- Returns:
- logging.Logger
logger object
Examples
>>> logger = setup_logger("some_logger_name")
>>> logger.name
'some_logger_name'
>>> logger.level
20
>>> logger = setup_logger(log_level=logging.DEBUG)
>>> logger.name
'log.py'
>>> logger.level
10
- pinder.core.utils.log.inject_logger(log_level: int = 20, log_file: str | None = None) Callable[[Callable[[P], T]], Callable[[P], T]] [source][source]#
A decorator that injects a logger into the decorated function.
- Parameters:
- log_level: integer representing the log level (default: DEFAULT_LOGGING_LEVEL)
- log_file: optional file path to write logs to
Examples
>>> @inject_logger()
... def my_function(name, log):
...     log.info(f"hello {name}")
...     return name
>>> my_function(name="pinder")
'pinder'
>>> # 2023-11-01 09:15:37,683 | __main__.my_function:3 | INFO : hello pinder

>>> @inject_logger(log_file="my_log.txt")  # this will write logs to my_log.txt
... def my_function_writing_to_file(name, log):
...     log.info(f"hello {name}")
...     return name
>>> my_function_writing_to_file(name="pinder")
'pinder'
>>> # 2023-11-01 09:15:37,683 | __main__.my_function:3 | INFO : hello pinder

>>> @inject_logger()
... def my_bad(name):
...     log.info(f"hello {name}")
...     return name
>>> my_bad(name="pinder")
Traceback (most recent call last):
...
core.utils.log.PinderLoggingError: The function 'core.utils.log.my_bad' should contain a variable named 'log'

>>> @inject_logger(log_level=logging.DEBUG)
... def my_function(name, log):
...     log.debug(f"hello {name}")
...     return name
>>> my_function(name="pinder")
'pinder'
>>> # 2023-11-01 10:23:20,456 | __main__.my_function:3 | DEBUG : hello pinder
pinder.core.utils.paths module#
Useful path utilities
See https://stackoverflow.com/questions/9727673/list-directory-tree-structure-in-python for the source of the tree function.
- pinder.core.utils.paths.expand_local_path(path: Path) list[Path] [source][source]#
Support glob path expansion from path glob shorthand.
- Parameters:
- path : Path
path, potentially including asterisks for globbing
- Returns:
- paths : list[Path]
paths that match the input path
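A sketch of glob expansion; the directory and pattern are placeholders:
>>> from pathlib import Path
>>> from pinder.core.utils.paths import expand_local_path
>>> pdb_files = expand_local_path(Path("/tmp/structures/*.pdb"))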
- pinder.core.utils.paths.tree(dir_path: Path, prefix: str = '') Generator[str, str, None] [source][source]#
Recurse a directory path and build a unix tree analog to be printed for debugging purposes.
- Parameters:
- dir_path : Path
directory to tree
- prefix : str, default=''
exposed for recursion
- Returns:
- gen : generator
the formatted directory contents
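A sketch of printing the tree for a local directory; the path is a placeholder:
>>> from pathlib import Path
>>> from pinder.core.utils.paths import tree
>>> for line in tree(Path("/tmp/structures")):
...     print(line)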
- pinder.core.utils.paths.blob_tree(gs_path: Path, gs: Any | None = None) Generator[str, str, None] [source][source]#
Like tree, but for blobs.
- pinder.core.utils.paths.blob_tree_cmd(argv: list[str] | None = None, gs: Any | None = None) None [source][source]#
- pinder.core.utils.paths.parallel_copy_files(src_files: list[Path], dest_files: list[Path], use_cache: bool = True, max_workers: int | None = None, parallel: bool = True) None [source][source]#
Safely copy list of source files to destination filepaths.
Operates in parallel and assumes that source files all exist. In case any NFS errors cause stale file stat or any other issues are encountered, the copy operation is retried up to 10 times before silently exiting.
- Parameters:
- src_files : list[Path]
List of source files to copy. Assumes source is a valid file.
- dest_files : list[Path]
List of fully-qualified destination filepaths. Assumes target directory exists.
- use_cache : bool
Whether to skip copy if destination exists.
- max_workers : int, optional
Limit number of parallel processes spawned to max_workers.
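A sketch assuming the source files exist and the destination directory has already been created; the paths are placeholders:
>>> from pathlib import Path
>>> from pinder.core.utils.paths import parallel_copy_files
>>> src_files = [Path("staging/a.pdb"), Path("staging/b.pdb")]
>>> dest_files = [Path("final/a.pdb"), Path("final/b.pdb")]
>>> parallel_copy_files(src_files, dest_files, use_cache=True, max_workers=2)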
pinder.core.utils.pdb_utils module#
Utility functions for working with pdbs
Adapted from Raptorx3DModelling/Common/PDBUtils.py
- pinder.core.utils.pdb_utils.load_fasta_file(seq_file: str | Path, as_str: bool = True) str | FastaFile [source][source]#
Load a fasta file.
- Parameters:
- seq_file : str | Path
file to read (fasta) sequence from.
- as_str : bool
whether to return string representation (default) or biotite.sequence.io.fasta.FastaFile.
- Returns:
- str | FastaFile
Sequence as string or biotite.sequence.io.fasta.FastaFile.
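A sketch with a placeholder fasta file, showing both return types:
>>> from pinder.core.utils.pdb_utils import load_fasta_file
>>> seq = load_fasta_file("example.fasta")
>>> fasta = load_fasta_file("example.fasta", as_str=False)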
- pinder.core.utils.pdb_utils.extract_pdb_seq_from_pdb_file(pdb_path: Path, chain_id: str | None = None) tuple[list[str], list[list[int]], list[str]] [source][source]#
- pinder.core.utils.pdb_utils.safe_load_sequence(seq_path: str | None = None, pdb_path: str | None = None, chain_id: str | None = None) str [source][source]#
Loads a sequence, either from a fasta file or from a given pdb file.
seq_path takes priority over pdb_path. pdb_path or seq_path must be provided.
- Parameters:
seq_path (optional) – path to sequence fasta file
pdb_path (optional) – path to pdb file
chain_id (optional) – chain to load sequence from in the provided pdb file
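A sketch of loading a chain sequence from a placeholder pdb file:
>>> from pinder.core.utils.pdb_utils import safe_load_sequence
>>> seq = safe_load_sequence(pdb_path="example.pdb", chain_id="A")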
pinder.core.utils.process module#
Helper functions for toggling serial or parallel processing via map and starmap.
pinder.core.utils.retry module#
- pinder.core.utils.retry.exponential_retry(max_retries: int, initial_delay: float = 1.0, multiplier: float = 2.0, exceptions: tuple[type[Exception], ...] = (<class 'Exception'>,)) Callable[[Callable[[P], T]], Callable[[P], T]] [source][source]#
Exponential backoff retry decorator.
Retries the wrapped function/method max_retries times if the exceptions listed in exceptions are thrown.
- Parameters:
- max_retries : int
The max number of times to repeat the wrapped function/method.
- initial_delay : float
Initial number of seconds to sleep after first failed attempt.
- multiplier : float
Amount to multiply the delay by before the next attempt.
- exceptions : tuple[type[Exception], ...]
Tuple of exceptions that trigger a retry attempt.
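A sketch of decorating a flaky function; the function body and the chosen exception type are placeholders:
>>> from pinder.core.utils.retry import exponential_retry
>>> @exponential_retry(max_retries=3, initial_delay=0.5, exceptions=(ConnectionError,))
... def flaky_download(url: str) -> bytes:
...     ...  # network call that may intermittently fail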
pinder.core.utils.string module#
- pinder.core.utils.string.natural_sort(list_to_sort: list[str]) list[str] [source][source]#
Sorts the given list in natural order.
For example, the list ['A11', 'A9', 'A10'] will be sorted as ['A9', 'A10', 'A11'].
- Parameters:
- list_to_sort : List[str]
The list to be sorted.
- Returns:
- List[str]
The sorted list.
Examples
>>> natural_sort(['A11', 'A9', 'A10'])
['A9', 'A10', 'A11']
pinder.core.utils.timer module#
pinder.core.utils.unbound module#
Assess difficulty of unbound monomer or dimer with respect to bound holo.
- pinder.core.utils.unbound.fnat_unbound_summary(bound_contacts: set[tuple[str, str, int, int]], unbound_contacts: set[tuple[str, str, int, int]]) dict[str, float | int | str] [source][source]#
- pinder.core.utils.unbound.get_corresponding_residues(res_list: list[int], chain: str, mapping: dict[str, dict[int, int]]) tuple[list[int], list[int]] [source][source]#
Get corresponding residues with default mapping.
- pinder.core.utils.unbound.filter_interface_intersect(holo_interface: AtomArray, apo_interface: AtomArray, holo2apo_seq: dict[str, dict[int, int]], apo2holo_seq: dict[str, dict[int, int]], R_chain: str = 'R', L_chain: str = 'L') tuple[AtomArray, AtomArray] [source][source]#
- pinder.core.utils.unbound.get_unbound_interface_metrics(holo_complex: Structure, apo_RL: Structure, R_chain: str, L_chain: str, holo2apo_seq: dict[str, dict[int, int]], apo2holo_seq: dict[str, dict[int, int]], contact_rad: float = 5.0) dict[str, float | int | str] [source][source]#