pinder.core.utils package#

Submodules#

pinder.core.utils.cloud module#

exception pinder.core.utils.cloud.Error(message: str | None = None, **kwargs: str)[source][source]#

Bases: Exception

Base error class

Child classes have a title and a message format.

title: str | None = None#
code: int | None = None#
message_format: str | None = None#
exception pinder.core.utils.cloud.GCPError(message: str | None = None, **kwargs: str)[source][source]#

Bases: Error

message_format: str | None = 'Unexpected GCP Error'#
code: int | None = 502#
title: str | None = 'Bad Gateway'#
exception pinder.core.utils.cloud.GCPTimeOutError(message: str | None = None, **kwargs: str)[source][source]#

Bases: GCPError

message_format: str | None = 'The GCP request you have made timed out after attempting to reconnect'#
code: int | None = 504#
title: str | None = 'Gateway Timeout'#
pinder.core.utils.cloud.get_cpu_limit(reserve_cpu: int = 0, quota_path: Path = PosixPath('/sys/fs/cgroup/cpu/cpu.cfs_quota_us'), period_path: Path = PosixPath('/sys/fs/cgroup/cpu/cpu.cfs_period_us'), shares_path: Path = PosixPath('/sys/fs/cgroup/cpu/cpu.shares')) int[source][source]#

Attempt to correctly obtain the available CPU resources allocated to a given container, since os.cpu_count always returns the machine resources rather than the resources allocated to the container. See https://bugs.python.org/issue36054 for more details.
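
Examples

A minimal sketch; the value returned depends on the cgroup limits of the container (or machine) the code runs in.

>>> from pinder.core.utils.cloud import get_cpu_limit
>>> n_cpu = get_cpu_limit()
>>> # use the container-aware value, e.g. to size a ThreadPoolExecutor,
>>> # instead of os.cpu_count()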

pinder.core.utils.cloud.get_container_cpu_frac(max_cpu_fraction: float = 0.9) int[source][source]#
pinder.core.utils.cloud.retry_bucket_method(source_blob: Blob, target_blob: Blob, method: str, timeout: Tuple[int, int] = (5, 120), retries: int = 5) None[source][source]#

Retry wrapper around bucket.copy_blob to tolerate intermittent network outages. Can be generalized to other Bucket methods which are not also provided as convenience methods on the Blob object.

Parameters:
source_blob : Blob

blob to copy

target_blob : Blob

new location of source blob

timeout : tuple, default=(5, 120)

timeout forwarded to Bucket method

retries : int, default=5

how many times to attempt the method

pinder.core.utils.cloud.bucket_process_many(sources: List[Blob], targets: List[Blob], method: str, global_timeout: int = 600, timeout: Tuple[int, int] = (5, 120), retries: int = 5, threads: int = 4) None[source][source]#

Use a ThreadPoolExecutor to execute multiple bucket operations concurrently using the python storage API.

Parameters:
sources : List[Blob]

source blobs

targets : List[Blob]

target blobs

method : str

name of Bucket method to call

global_timeout : int, default=600

timeout to wait for all operations to complete

timeout : tuple, default=(5, 120)

timeout forwarded to Bucket method

retries : int, default=5

how many times to attempt the method

threads : int, default=4

how many threads to use in the thread pool

pinder.core.utils.cloud.retry_blob_method(blob: Blob, method: str, path: str, retries: int = 5, timeout: Tuple[int, int] = (5, 120)) None[source][source]#
pinder.core.utils.cloud.process_many(path_pairs: List[Tuple[str, Blob]], method: str, global_timeout: int = 600, timeout: Tuple[int, int] = (5, 120), retries: int = 5, threads: int = 4) None[source][source]#
pinder.core.utils.cloud.method_context(sources: List[Path], targets: List[Path]) Tuple[List[Path], List[Path], str][source][source]#
pinder.core.utils.cloud.make_path_pairs(local_paths: List[Path], remote_keys: List[str] | List[Path] | List[Blob], bucket: Bucket) List[Tuple[str, Blob]][source][source]#

Create pairs of (str, Blob) for threaded processing.

Parameters:
local_paths : List[Path]

local file paths

remote_keys : List[str] | List[Path] | List[Blob]

remote key locations

bucket : Bucket

client provider

Returns:
pairs : List[Tuple[str, Blob]]

destinations and blobs

class pinder.core.utils.cloud.Gsutil(client: Client | None = None)[source][source]#

Bases: object

Attempt to achieve the efficiency of “gsutil -m” commands using the python storage API. This involves using a simple ThreadPoolExecutor to circumvent blocking network I/O and retain an ergonomic high-level user interface.
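
Examples

A minimal usage sketch; the gs:// paths below are placeholders for a bucket you have access to.

>>> from pinder.core.utils.cloud import Gsutil
>>> gs = Gsutil()
>>> gs.cp_dir("gs://my-bucket/some/prefix", "/tmp/local_dir")
>>> remote_files = gs.ls("gs://my-bucket/some/prefix")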

DOWNLOAD: ClassVar[str] = 'download_to_filename'#
UPLOAD: ClassVar[str] = 'upload_from_filename'#
REMOTE_COPY: ClassVar[str] = 'copy_blob'#
static process_many(*args: List[Tuple[str, Blob]], **kwargs: int | tuple[int, int]) None[source]#
static bucket_process_many(*args: list[Blob] | str, **kwargs: int | tuple[int, int]) None[source]#
get_bucket(value: str | Path, bucket: Bucket | None = None) Bucket[source]#
cp_paired_paths(sources: List[str] | List[Path], targets: List[str] | List[Path], bucket: Bucket | None = None, threads: int | None = None) None[source]#

Copy a collection of sources to collection of targets.

Parameters:
sources : List[str | Path]

source files to copy

targets : List[str | Path]

target destinations

bucket : Any

optionally pass existing client

threads : int, default=None

explicit number of threads passed to ThreadPoolExecutor

cp_paths(sources: List[str] | List[Path], target: str | Path, anchor: str | Path, bucket: Bucket | None = None, threads: int | None = None) None[source]#

Copy a collection of sources to target. target must be a single path.

Parameters:
sources : List[str | Path]

source files to copy

target : str | Path

target destination

anchor : str | Path

make source paths relative to anchor

bucket : Any

optionally pass existing client

threads : int, default=None

explicit number of threads passed to ThreadPoolExecutor
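
Examples

A sketch with hypothetical paths; each source is uploaded under target at its path relative to anchor, so /tmp/data/sub/a.csv would presumably land at gs://my-bucket/dest/sub/a.csv.

>>> Gsutil().cp_paths(
...     sources=["/tmp/data/sub/a.csv", "/tmp/data/sub/b.csv"],
...     target="gs://my-bucket/dest",
...     anchor="/tmp/data",
... )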

cp_dir(source: str | Path, target: str | Path, bucket: Any | None = None, threads: int | None = None) None[source]#

Copy an entire source directory to target. Assumes everything processed will be relative to source and target.

Parameters:
source : str | Path

source directory to copy

target : str | Path

target destination

bucket : Any

optionally pass existing client

threads : int, default=None

explicit number of threads passed to ThreadPoolExecutor

Examples

For downloading flat directories like:

gs://{bucket}/{flat_path}
├── data.csv
├── other_data.csv
└── ...
>>> Gsutil().cp_dir(
...     f"gs://{bucket}/{flat_path}",
...     "/path/to/local_dir",
... ) 
# should result in
/path/to/local_dir
├── data.csv
├── other_data.csv
└── ...

For downloading nested directories like:

gs://{bucket}/{nested_path}
├── {nested_path}-0
│   └── item.json
├── {nested_path}-1
│   └── item.json
└── ...
>>> Gsutil().cp_dir(
...     f"gs://{bucket}/{nested_path}",
...     "/path/to/local_dir",
... ) 
# should result in
/path/to/local_dir
├── {nested_path}-0
│   └── item.json
├── {nested_path}-1
│   └── item.json
└── ...

For uploading flat directories like:

/path/to/local_dir
├── data.csv
├── other_data.csv
└── ...
>>> Gsutil().cp_dir(
...     "/path/to/local_dir",
...     f"gs://{bucket}/{target_path}",
... ) 
# should result in
gs://{bucket}/{target_path}
├── data.csv
├── other_data.csv
└── ...

For uploading nested directories like:

/path/to/local_dir
├── {nested_path}-0
│   └── item.json
├── {nested_path}-1
│   └── item.json
└── ...
>>> Gsutil().cp_dir(
...     "/path/to/local_dir",
...     f"gs://{bucket}/{nested_path}",
... ) 
# should result in
gs://{bucket}/{nested_path}
├── {nested_path}-0
│   └── item.json
├── {nested_path}-1
│   └── item.json
└── ...

For downloading files using partial matches:

gs://{bucket}/{prefix}/{partial_match}
├── {partial_match}-0.csv
├── {partial_match}-1.csv
└── ...
>>> Gsutil().cp_dir(
...     "gs://{bucket}/{prefix}/{partial_match}",
...     "test_dir/subdir",
... ) 
# should result in
test_dir
└── subdir
    ├── {partial_match}-0.csv
    ├── {partial_match}-1.csv
    └── ...

For uploading files using partial matches:

test_dir
└── subdir
    ├── {partial_match}-0.csv
    ├── {partial_match}-1.csv
    └── ...
>>> Gsutil().cp_dir(
...     "test_dir/subdir/{partial_match}",
...     "gs://{bucket}/{prefix}/test_upload",
... ) 
# should result in
gs://{bucket}/{prefix}/test_upload
├── {partial_match}-0.csv
├── {partial_match}-1.csv
└── ...
ls(target: str | Path, bucket: Any | None = None, recursive: bool = True) List[Path][source]#

List the contents of a remote directory, returning full paths to files including gs://{bucket_name}.

Parameters:
target : str | Path

root path for remote files to list

bucket : Any

optionally pass existing client

recursive : bool

recursively list files in sub-directories

Returns:
paths : List[Path]

full remote paths to listed files
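
Examples

For instance, with a hypothetical bucket path (requires credentials or a publicly readable bucket):

>>> remote_paths = Gsutil().ls("gs://my-bucket/some/prefix", recursive=False)
>>> # each entry is a full remote path including the gs://{bucket_name} prefix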

ls_blobs(target: str | Path, bucket: Any | None = None, versions: bool = False) List[Blob][source]#

Remote ls returning blob objects instead of paths.

Parameters:
target : str | Path

root path for remote files to list

bucket : Any

optionally pass existing client

versions : bool, default=False

if True, include blob versions

Returns:
blobs : List[Blob]

blob objects for the listed files

remote_cp(sources: List[str | Path], targets: List[str | Path], bucket: Any | None = None) None[source]#

Transfer remote files from one location to another without downloading them to a local context first. Assumes all source files reside in the same bucket, and all target files reside in the same bucket, but the source bucket can be different from the target bucket.

Parameters:
sources : List[str | Path]

full remote paths to copy from

targets : List[str | Path]

full remote paths to copy to

bucket : Any

optionally pass existing bucket (assumes source and target buckets are the same bucket)
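
Examples

A sketch with hypothetical remote paths; the copy happens remotely, without downloading files to a local context.

>>> Gsutil().remote_cp(
...     sources=["gs://my-bucket/a/file.csv"],
...     targets=["gs://my-bucket/b/file.csv"],
... )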

pinder.core.utils.cloud.gcs_read_dataframe(gcs_uri: str | Path, fs: GCSFileSystem | None = None, token: str | None = 'anon') DataFrame[source][source]#

Read remote files directly into pandas.DataFrame with anonymous client credentials.

If the gcsfs.GCSFileSystem object is not provided, one will be created with token set to anon (no authentication is performed), and you can only access data which is accessible to IAM principal allUsers.

Parameters:
gcs_uri : str | Path

full remote path to read. Must end with a .csv, .csv.gz, or .parquet extension.

fs : gcsfs.GCSFileSystem

optionally pass an existing authenticated GCSFileSystem object to use.

token : str | None

optionally pass a token type to use for authenticating requests. Default is “anon” (only public objects). If an authentication error is raised, the method is retried with token=None to attempt to infer credentials in the following order: gcloud CLI default, gcsfs cached token, google compute metadata service, anonymous.
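
Examples

A sketch with hypothetical URIs; with the default token="anon", the object must be publicly readable.

>>> from pinder.core.utils.cloud import gcs_read_dataframe
>>> df = gcs_read_dataframe("gs://my-public-bucket/tables/data.csv")
>>> # or pass an authenticated filesystem explicitly
>>> import gcsfs
>>> df = gcs_read_dataframe("gs://my-bucket/tables/data.parquet", fs=gcsfs.GCSFileSystem())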

pinder.core.utils.constants module#

pinder.core.utils.constants.update_letters(x: dict[str, list[str]], is_three: bool = True) dict[str, list[str]][source][source]#

pinder.core.utils.dataclass module#

pinder.core.utils.dataclass.atom_array_summary_markdown_repr(array: AtomArray) str[source][source]#
pinder.core.utils.dataclass.stringify_dataclass(obj: Any, indent: int = 4, _indents: int = 0, verbose_atom_array: bool = False) str[source][source]#

Pretty repr (or print) a (possibly deeply-nested) dataclass. Each new block will be indented by indent spaces (default is 4).

https://stackoverflow.com/questions/66807878/pretty-print-dataclasses-prettier-with-line-breaks-and-indentation
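
Examples

A minimal sketch with a toy dataclass (Point is not part of pinder):

>>> from dataclasses import dataclass
>>> from pinder.core.utils.dataclass import stringify_dataclass
>>> @dataclass
... class Point:
...     x: int
...     y: int
>>> pretty = stringify_dataclass(Point(1, 2))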

pinder.core.utils.log module#

exception pinder.core.utils.log.PinderLoggingError[source][source]#

Bases: Exception

pinder.core.utils.log.setup_logger(logger_name: str | None = None, log_level: int = 20, log_file: str | None = None, propagate: bool = False) Logger[source][source]#

Set up a logger, using the module name as the logger name by default, for easy tracing of what is happening in the code.

Parameters:
logger_name : str

Name of the logger

log_level : int

Log level

log_file : str | None

optional log file to write to

propagate : bool

propagate log events to parent loggers, default = False

Returns:
logging.Logger:

logger object

Examples

>>> logger = setup_logger("some_logger_name")
>>> logger.name
'some_logger_name'
>>> logger.level
20
>>> logger = setup_logger(log_level=logging.DEBUG)
>>> logger.name
'log.py'
>>> logger.level
10
pinder.core.utils.log.inject_logger(log_level: int = 20, log_file: str | None = None) Callable[[Callable[[P], T]], Callable[[P], T]][source][source]#

A decorator that injects a logger into the decorated function.

Parameters:
log_level: integer representing the log level (default: DEFAULT_LOGGING_LEVEL)
log_file: optional file path to write logs to

Examples

>>> @inject_logger()
... def my_function(name, log):
...     log.info(f"hello {name}")
...     return name
>>> my_function(name="pinder")
'pinder'
>>> # 2023-11-01 09:15:37,683 | __main__.my_function:3 | INFO : hello pinder
>>> @inject_logger(log_file="my_log.txt") # this will write logs to my_log.txt
... def my_function_writing_to_file(name, log):
...     log.info(f"hello {name}")
...     return name
>>> my_function_writing_to_file(name="pinder")
'pinder'
>>> # 2023-11-01 09:15:37,683 | __main__.my_function:3 | INFO : hello pinder
>>>
>>> @inject_logger()
... def my_bad(name):
...     log.info(f"hello {name}")
...     return name
>>> my_bad(name="pinder")
Traceback (most recent call last):
    ...
core.utils.log.PinderLoggingError: The function 'core.utils.log.my_bad' should contain a variable named 'log'
>>>
>>> @inject_logger(log_level=logging.DEBUG)
... def my_function(name, log):
...     log.debug(f"hello {name}")
...     return name
>>> my_function(name="pinder")
'pinder'
>>> # 2023-11-01 10:23:20,456 | __main__.my_function:3 | DEBUG : hello pinder

pinder.core.utils.paths module#

Useful path utilities

See https://stackoverflow.com/questions/9727673/list-directory-tree-structure-in-python for the source of the tree function.

pinder.core.utils.paths.is_local_path(path: str | Path) bool[source][source]#
pinder.core.utils.paths.remote_key(path: str | Path) str[source][source]#
pinder.core.utils.paths.strip_glob(path: str | Path) Path[source][source]#
pinder.core.utils.paths.expand_local_path(path: Path) list[Path][source][source]#

Support glob path expansion from path glob shorthand.

Parameters:
path : Path

path, potentially including asterisks for globbing

Returns:
paths : list[Path]

paths that match the input path
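
Examples

A sketch with a hypothetical directory:

>>> from pathlib import Path
>>> from pinder.core.utils.paths import expand_local_path
>>> matched = expand_local_path(Path("/tmp/pdbs/*.pdb"))  # all .pdb files in /tmp/pdbs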

pinder.core.utils.paths.rmdir(path: Path) None[source][source]#

pathlib native rmdir

pinder.core.utils.paths.tree(dir_path: Path, prefix: str = '') Generator[str, str, None][source][source]#

Recurse a directory path and build a unix tree analog to be printed for debugging purposes.

Parameters:
dir_path : Path

directory to tree

prefix : str, default=''

exposed for recursion

Returns:
gen : generator

the formatted directory contents
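
Examples

For instance, printing the tree of a local directory (hypothetical path):

>>> from pathlib import Path
>>> from pinder.core.utils.paths import tree
>>> for line in tree(Path("/tmp/some_dir")):
...     print(line)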

pinder.core.utils.paths.blob_tree(gs_path: Path, gs: Any | None = None) Generator[str, str, None][source][source]#

Like tree, but for blobs.

pinder.core.utils.paths.blob_tree_cmd(argv: list[str] | None = None, gs: Any | None = None) None[source][source]#
pinder.core.utils.paths.empty_file(file: Path) bool[source][source]#
pinder.core.utils.paths.parallel_copy_files(src_files: list[Path], dest_files: list[Path], use_cache: bool = True, max_workers: int | None = None, parallel: bool = True) None[source][source]#

Safely copy list of source files to destination filepaths.

Operates in parallel and assumes that all source files exist. If NFS errors cause stale file stats, or any other issues are encountered, the copy operation is retried up to 10 times before silently exiting.

Parameters:
src_files : list[Path]

List of source files to copy. Assumes source is a valid file.

dest_files : list[Path]

List of fully-qualified destination filepaths. Assumes target directory exists.

use_cache : bool

Whether to skip copy if destination exists.

max_workers : int, optional

Limit number of parallel processes spawned to max_workers.
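
Examples

A sketch with hypothetical paths; destination directories are assumed to already exist.

>>> from pathlib import Path
>>> from pinder.core.utils.paths import parallel_copy_files
>>> parallel_copy_files(
...     src_files=[Path("/data/a.pdb"), Path("/data/b.pdb")],
...     dest_files=[Path("/dest/a.pdb"), Path("/dest/b.pdb")],
...     max_workers=4,
... )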

pinder.core.utils.pdb_utils module#

Utility functions for working with pdbs

Adapted from Raptorx3DModelling/Common/PDBUtils.py

pinder.core.utils.pdb_utils.three_to_one(x: str) str[source][source]#
pinder.core.utils.pdb_utils.load_fasta_file(seq_file: str | Path, as_str: bool = True) str | FastaFile[source][source]#

Load a fasta file.

Parameters:
seq_file : str | Path

file to read (fasta) sequence from.

as_str : bool

whether to return string representation (default) or biotite.sequence.io.fasta.FastaFile.

Returns:
str | FastaFile

Sequence as string or biotite.sequence.io.fasta.FastaFile.
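
Examples

A sketch with a hypothetical fasta file:

>>> from pinder.core.utils.pdb_utils import load_fasta_file
>>> seq = load_fasta_file("/path/to/monomer.fasta")
>>> fasta = load_fasta_file("/path/to/monomer.fasta", as_str=False)  # biotite FastaFile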

pinder.core.utils.pdb_utils.extract_pdb_seq_from_pdb_file(pdb_path: Path, chain_id: str | None = None) tuple[list[str], list[list[int]], list[str]][source][source]#
pinder.core.utils.pdb_utils.safe_load_sequence(seq_path: str | None = None, pdb_path: str | None = None, chain_id: str | None = None) str[source][source]#

Loads a sequence, either from a fasta file or a given pdb file.

seq_path takes priority over pdb_path. pdb_path or seq_path must be provided.

Parameters:
  • seq_path (optional) – path to sequence fasta file

  • pdb_path (optional) – path to pdb file

  • chain_id (optional) – chain to load sequence from in provided pdb file
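
Examples

A sketch with hypothetical paths; when both are provided, seq_path takes priority.

>>> from pinder.core.utils.pdb_utils import safe_load_sequence
>>> seq = safe_load_sequence(seq_path="/path/to/seq.fasta")
>>> seq = safe_load_sequence(pdb_path="/path/to/dimer.pdb", chain_id="R")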

pinder.core.utils.pdb_utils.is_homodimer(chain_1_seq: str, chain_2_seq: str, min_seq_id: float = 0.9) bool[source][source]#

Whether the two sequences share sequence identity above the min_seq_id threshold.
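
Examples

A sketch with toy sequences; identical chains should clear the default 0.9 identity threshold.

>>> from pinder.core.utils.pdb_utils import is_homodimer
>>> same = is_homodimer(
...     chain_1_seq="MKTAYIAKQRQISFVKSHFSRQLEERLGLIEVQ",
...     chain_2_seq="MKTAYIAKQRQISFVKSHFSRQLEERLGLIEVQ",
... )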

pinder.core.utils.process module#

Helper functions for toggling serial or parallel processing via map and starmap.

pinder.core.utils.process.process_starmap(func: Callable[[...], Any], args: Iterable[Iterable[Any]], parallel: bool = True, max_workers: int | None = None) list[Any][source][source]#
pinder.core.utils.process.process_map(func: Callable[[...], Any], args: Iterable[Iterable[Any]], parallel: bool = True, max_workers: int | None = None) list[Any][source][source]#
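
Examples

A minimal sketch of toggling execution mode; serial execution is shown here, while the same call with parallel=True dispatches the work to a pool of workers.

>>> from pinder.core.utils.process import process_starmap
>>> def add(x, y):
...     return x + y
>>> process_starmap(add, [(1, 2), (3, 4)], parallel=False)
[3, 7]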

pinder.core.utils.retry module#

pinder.core.utils.retry.exponential_retry(max_retries: int, initial_delay: float = 1.0, multiplier: float = 2.0, exceptions: tuple[type[Exception], ...] = (<class 'Exception'>,)) Callable[[Callable[[P], T]], Callable[[P], T]][source][source]#

Exponential backoff retry decorator.

Retries the wrapped function/method max_retries times if the exceptions listed in exceptions are thrown.

Parameters:
max_retries : int

The max number of times to repeat the wrapped function/method.

initial_delay : float

Initial number of seconds to sleep after first failed attempt.

multiplier : float

Amount to multiply the delay by before the next attempt.

exceptions : tuple[Exception]

Tuple of exceptions that trigger a retry attempt.
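
Examples

A sketch; fetch_remote_file is a hypothetical flaky function that gets retried on ConnectionError with exponentially increasing delays.

>>> from pinder.core.utils.retry import exponential_retry
>>> @exponential_retry(max_retries=3, initial_delay=1.0, exceptions=(ConnectionError,))
... def fetch_remote_file(url: str) -> bytes:
...     ...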

pinder.core.utils.string module#

pinder.core.utils.string.natural_sort(list_to_sort: list[str]) list[str][source][source]#

Sorts the given list in natural order, the way a human would expect.

For example, the list ['A11', 'A9', 'A10'] will be sorted as ['A9', 'A10', 'A11'].

Parameters:
list_to_sort : List[str]

The list to be sorted.

Returns:
List[str]

The sorted list.

Examples

>>> natural_sort(['A11', 'A9', 'A10'])
['A9', 'A10', 'A11']

pinder.core.utils.timer module#

pinder.core.utils.timer.timeit(func: Callable[[P], T]) Callable[[P], T][source][source]#

Simple function timer decorator
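
Examples

A minimal sketch; the decorator measures and reports the runtime of each call to the wrapped function.

>>> from pinder.core.utils.timer import timeit
>>> @timeit
... def slow_sum(n: int) -> int:
...     return sum(range(n))
>>> total = slow_sum(1_000_000)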

pinder.core.utils.unbound module#

Assess difficulty of unbound monomer or dimer with respect to bound holo.

pinder.core.utils.unbound.get_db4_difficulty(irmsd: float, fnonnat: float) str[source][source]#
pinder.core.utils.unbound.fnat_unbound_summary(bound_contacts: set[tuple[str, str, int, int]], unbound_contacts: set[tuple[str, str, int, int]]) dict[str, float | int | str][source][source]#
pinder.core.utils.unbound.get_corresponding_residues(res_list: list[int], chain: str, mapping: dict[str, dict[int, int]]) tuple[list[int], list[int]][source][source]#

Get corresponding residues with default mapping.

pinder.core.utils.unbound.filter_interface_intersect(holo_interface: AtomArray, apo_interface: AtomArray, holo2apo_seq: dict[str, dict[int, int]], apo2holo_seq: dict[str, dict[int, int]], R_chain: str = 'R', L_chain: str = 'L') tuple[AtomArray, AtomArray][source][source]#
pinder.core.utils.unbound.get_unbound_interface_metrics(holo_complex: Structure, apo_RL: Structure, R_chain: str, L_chain: str, holo2apo_seq: dict[str, dict[int, int]], apo2holo_seq: dict[str, dict[int, int]], contact_rad: float = 5.0) dict[str, float | int | str][source][source]#
pinder.core.utils.unbound.get_unbound_difficulty(holo_R: Structure, holo_L: Structure, apo_R: Structure, apo_L: Structure, contact_rad: float = 5.0) dict[str, float | int | str][source][source]#
pinder.core.utils.unbound.get_apo_monomer_difficulty(holo_R: Structure, holo_L: Structure, apo_mono: Structure, apo_body: str, contact_rad: float = 5.0) dict[str, float | int | str][source][source]#

Module contents#