chainfury utilities
- class chainfury.utils.CFEnv[source]
Bases:
object
Single namespace for all environment variables.
CF_FOLDER: database connection string
CF_BLOB_STORAGE: blob storage folder on local machine
CF_BLOB_ENGINE: blob storage engine, can be one of no, local (default) or s3
CF_BLOB_BUCKET: blob storage bucket name (only used for s3 engine)
CF_BLOB_PREFIX: blob storage prefix (only used for s3 engine)
CF_BLOB_AWS_CLOUD_FRONT: blob storage cloud front url, if not provided defaults to primary S3 URL (only used for s3 engine)
CF_URL: the URL of the chainfury server
CF_TOKEN: the token to use to authenticate with the chainfury server
- CF_BLOB_AWS_CLOUD_FRONT()
- CF_BLOB_BUCKET()
- CF_BLOB_ENGINE()
- CF_BLOB_PREFIX()
- CF_BLOB_STORAGE()
- CF_FOLDER()
- CF_LOG_LEVEL()
- CF_TOKEN()
- CF_URL()
- exception chainfury.utils.DoNotRetryException[source]
Bases:
Exception
Raised when code tells not to retry
- class chainfury.utils.SimplerTimes[source]
Bases:
object
A class that provides a simpler interface to datetime and time modules.
- tz = datetime.timezone.utc
- exception chainfury.utils.UnAuthException[source]
Bases:
Exception
Raised when the API returns a 401
- chainfury.utils.batched(iterable, n)[source]
Convert any
iterable
to a generator of batches of sizen
, last one may be smaller. Python 3.12 hasitertools.batched
which does the same thing.Example
>>> for x in batched(range(10), 3): ... print(x) [0, 1, 2] [3, 4, 5] [6, 7, 8] [9]
- Parameters:
iterable (Iterable) – The iterable to convert to batches
n (int) – The batch size
- Yields:
Iterator – The batched iterator
- chainfury.utils.exponential_backoff(foo, *args, max_retries=2, retry_delay=1, **kwargs) Dict[str, Any] [source]
Exponential backoff function
- Parameters:
foo (function) – The function to call
max_retries (int, optional) – maximum number of retries. Defaults to 2.
retry_delay (int, optional) – Initial delay in seconds. Defaults to 1.
- Raises:
e – Max retries reached. Exiting…
Exception – This should never happen
- Returns:
The completion(s) generated by the API.
- Return type:
Dict[str, Any]
- chainfury.utils.from_json(fp: str = '') Dict[str, Any] [source]
Load a JSON string or filepath and return a dictionary.
- Parameters:
fp (str) – The filepath or JSON-ified string
Returns:
- chainfury.utils.get_blob(key: str, engine: str = '', bucket: str = '') bytes [source]
A function that gets the information from a file. This can automatically route to different storage engines.
- Parameters:
key (str) – The key to read the blob
engine (str, optional) – The engine to use, either pass value or set CF_BLOB_ENGINE env var. Defaults to “”.
bucket (str, optional) – The bucket to use, either pass value or set CF_BLOB_BUCKET env var. Defaults to “”.
- Returns:
The value stored in the blob
- Return type:
bytes
- chainfury.utils.get_files_in_folder(folder, ext='*', ig_pat: str = '', abs_path: bool = True, followlinks: bool = False) List[str] [source]
Get files with ext in folder
- chainfury.utils.logger = <Logger fury (INFO)>
This is the logger object that should be used across the entire package as well as by the user what wants to leverage existing logging infrastructure.
- chainfury.utils.store_blob(key: str, value: bytes, engine: str = '', bucket: str = '') str [source]
A function that stores the information in a file. This can automatically route to different storage engines.
- Parameters:
key (str) – The key to store the file under
value (bytes) – The value to store
engine (str, optional) – The engine to use, either pass value or set CF_BLOB_ENGINE env var. Defaults to “”.
bucket (str, optional) – The bucket to use, either pass value or set CF_BLOB_BUCKET env var. Defaults to “”.
- Returns:
The url of the stored file or filepath
- Return type:
str
- chainfury.utils.terminal_top_with_text(msg: str = '') str [source]
Prints full wodth text message on the terminal
- Parameters:
msg (str, optional) – The message to print. Defaults to “”.
- Returns:
The message to print
- Return type:
str
- chainfury.utils.threaded_map(fn, inputs: List[Tuple], wait: bool = True, max_threads=20, post_fn=None, _name: str = '', safe: bool = False) Dict[Future, int] | List[Any] [source]
inputs is a list of tuples, each tuple is the input for single invocation of fn. order is preserved.
- Parameters:
fn (function) – The function to call
inputs (List[Tuple[Any]]) – All the inputs to the function, can be a generator
wait (bool, optional) – If true, wait for all the threads to finish, otherwise return a dict of futures. Defaults to True.
max_threads (int, optional) – The maximum number of threads to use. Defaults to 20.
post_fn (function, optional) – A function to call with the result. Defaults to None.
_name (str, optional) – The name of the thread pool. Defaults to “”.
safe (bool, optional) – If true, all caughts exceptions are in the results. Defaults to False.
- chainfury.utils.to_json(x: dict, fp: str = '', indent=2, tight: bool = False) str | None [source]
Convert a dict to json string and write to file if
fp
is provided.- Parameters:
x (dict) – The dict to convert
fp (str, optional) – The file path to write to. Defaults to “”.
indent (int, optional) – The indentation level. Defaults to 2.
tight (bool, optional) – If true, remove all the whitespaces, ignores
indent
. Defaults to False.
- Returns:
The json string if
fp
is not provided- Return type:
Optional[str]