chainfury.base

This file contains the base classes for dataflow engine using Chain.

class chainfury.base.Chain(nodes: List[Node] = [], edges: List[Edge] = [], *, sample: Dict[str, Any] = {}, main_in: str = '', main_out: str = '')[source]

Bases: object

A chain is a full flow of nodes and edges.

Parameters:
  • nodes (List[Node], optional) – The list of nodes in the chain. Defaults to [].

  • edges (List[Edge], optional) – The list of edges in the chain. Defaults to [].

  • sample (Dict[str, Any], optional) – The sample data to use for the chain. Defaults to {}.

  • main_in (str, optional) – The name of the input var for the chat input. Defaults to “”.

  • main_out (str, optional) – The name of the output var for the chat output. Defaults to “”.

__call__(data: str | Dict[str, Any], thoughts_callback: Callable | None = None, print_thoughts: bool = False) Tuple[Var, Dict[str, Any]][source]

Runs the chain on the given data. In this function it will run a full dataflow engine along with thoughts buffer and a simple callback system at each step.

Example

>>> chain = Chain(...)
>>> out, thoughts = chain("Hello world")
>>> print(out)
The first man chuckled and shook his head, "You always have the weirdest explanations for everything."
>>> print(thoughts)
{
    '38c813a2-850c-448b-8cfb-bd5775cc4b61/answer': {
        'timestamp': '2023-06-27T16:50:04.178833',
        'value': '...'
    }
    '1378538b-a15e-475b-9a9d-a31a261165c0/out': {
        'timestamp': '2023-06-27T16:50:07.818709',
        'value': '...'
    }
}

You can also stream the intermediate responses by setting using stream_call method. You can get the exact same result as above by iterating over the response and getting the last response.

Parameters:
  • data (Union[str, Dict[str, Any]]) – The data to run the chain on.

  • thoughts_callback (Optional[Callable], optional) – The callback function to call at each step. Defaults to None.

  • print_thoughts (bool, optional) – Whether to print the thoughts buffer at each step. Defaults to False.

  • stream (bool, optional) – Whether to stream the output or not. Defaults to False.

Returns:

The output of the chain and the thoughts buffer.

Return type:

Tuple[Var, Dict[str, Any]]

classmethod from_dag(dag: Dag, check_server: bool = True)[source]

Loads the chain from the DAG object.

Parameters:

dag (T.Dag) – The dag object to load from

classmethod from_dict(data: Dict[str, Any], verbose: bool = False) Chain[source]

Creates a chain from a dictionary.

Parameters:

data (Dict[str, Any]) – The dictionary representation of the chain.

Returns:

The chain created from the dictionary.

Return type:

Chain

classmethod from_id(id: str)[source]

Loads the chain from the server, and tries to recreate it locally. NOTE: this requires server connection.

Example

>>> chain = Chain.from_id("l6lnksln")
>>> chain
Parameters:

id (str) – The id of the chain to load

Returns:

The chain object

Return type:

Chain

classmethod from_json(data: str)[source]

Creates a chain from a JSON string.

Parameters:

data (str) – The JSON string representation of the chain.

Returns:

The chain created from the JSON string.

Return type:

Chain

step(node_id: str, pre_data: Dict[str, Any], full_ir: Dict[str, Any], print_thoughts: bool = False, thoughts_callback: Callable | None = None) Tuple[Dict[str, Any], Dict[str, Any]][source]

Performs a single step in the chain, useful for manual debugging.

Parameters:
  • node_id (str) – The id of the node to step.

  • pre_data (Dict[str, Any]) – The data to use for the step.

  • full_ir (Dict[str, Any]) – The full IR to use for the step.

  • print_thoughts (bool, optional) – Whether to print the thoughts. Defaults to False.

  • thoughts_callback (Optional[Callable], optional) – A callback to call with the thoughts. Defaults to None.

Returns:

The currrent output and updated thoughts ir buffer.

Return type:

Tuple[Dict[str, Any], Dict[str, Any]]

stream(data: str | Dict[str, Any], thoughts_callback: Callable | None = None, print_thoughts: bool = False) Generator[Tuple[Any | Dict[str, Any], bool], None, None][source]

This is a streaming version of __call__ method. It will yield the intermediate responses as they come in.

Example

>>> chain = Chain(...)
>>> cf_response_gen = chain.stream("Hello world")
>>> out = None
>>> thoughts = {}
>>> for ir, done in cf_response_gen:
...     if done:
...         out = ir
...     else:
...         thoughts.update(ir)
>>> print(out)
The first man chuckled and shook his head, "You always have the weirdest explanations for everything."
>>> print(thoughts)
{
    '38c813a2-850c-448b-8cfb-bd5775cc4b61/answer': {
        'timestamp': '2023-06-27T16:50:04.178833',
        'value': '...'
    }
    '1378538b-a15e-475b-9a9d-a31a261165c0/out': {
        'timestamp': '2023-06-27T16:50:07.818709',
        'value': '...'
    }
}
Parameters:
  • data (Union[str, Dict[str, Any]]) – The data to run the chain on.

  • thoughts_callback (Optional[Callable], optional) – The callback function to call at each step. Defaults to None.

  • print_thoughts (bool, optional) – Whether to print the thoughts buffer at each step. Defaults to False.

Yields:

Generator[Tuple[Union[Any, Dict[str, Any]], bool], None, None] – The intermediate responses and whether the response is the final response or not.

to_dag() Dag[source]

Converts the current chain to a DAG object

to_dict(main_in: str = '', main_out: str = '', sample: Dict[str, Any] = {}) Dict[str, Any][source]

Serializes the chain to a dictionary.

Parameters:
  • main_in (str, optional) – The name of the input var for the chat input. Defaults to “”.

  • main_out (str, optional) – The name of the output var for the chat output. Defaults to “”.

  • sample (Dict[str, Any], optional) – The sample data to use for the chain. Defaults to {}.

Returns:

The dictionary representation of the chain.

Return type:

Dict[str, Any]

to_json(indent=None) str[source]

Serializes the chain to a JSON string.

Returns:

The JSON string representation of the chain.

Return type:

str

class chainfury.base.Edge(src_node_id: str, src_node_var: str, trg_node_id: str, trg_node_var: str)[source]

Bases: object

Creates an edge between two nodes.

Parameters:
  • src_node_id (str) – The id of the source node.

  • src_node_var (str) – The name of the source node variable.

  • trg_node_id (str) – The id of the target node.

  • trg_node_var (str) – The name of the target node variable.

classmethod from_dict(data: Dict[str, Any], verbose: bool = False) Edge[source]

Creates an edge from a dictionary.

Parameters:

data (Dict[str, Any]) – The dictionary representation of the edge.

Returns:

The edge created from the dictionary.

Return type:

Edge

to_dict() Dict[str, Any][source]

Serializes the edge to a dictionary.

Returns:

The dictionary representation of the edge.

Return type:

Dict[str, Any]

class chainfury.base.Model(collection_name: str, id: str, fn: object, description: str = '', usage: List[str | int] = [], tags=[])[source]

Bases: object

TYPE_NAME = 'model'

constant for the type name

__call__(model_data: Dict[str, Any]) Tuple[Any, Exception | None][source]

Calls the model with the given data.

Parameters:

model_data (Dict[str, Any]) – The data to pass to the model.

Returns:

The result of the model and the exception if any.

Return type:

Tuple[Any, Optional[Exception]]

to_dict(no_vars: bool = False) Dict[str, Any][source]

Converts the model to a dictionary.

Parameters:

no_vars (bool, optional) – Whether to include the vars. Defaults to False.

Returns:

The dictionary representation of the model.

Return type:

Dict[str, Any]

class chainfury.base.Node(id: str, type: str, fn: object, fields: List[Var], outputs: List[Var], description: str = '', tags: List[str] = [], allow_callback: bool = False)[source]

Bases: object

__call__(data: Dict[str, Any], print_thoughts: bool = False) Tuple[Any, Exception | None][source]

Calls the node with the given data.

Parameters:
  • data (Dict[str, Any]) – The data to pass to the node.

  • print_thoughts (bool, optional) – Whether to print the thoughts of the node, useful for debugging. Defaults to False.

Returns:

The result of the node and the exception if any.

Return type:

Tuple[Any, Optional[Exception]]

classmethod from_dict(data: Dict[str, Any], verbose: bool = False) Node[source]

Creates a node from a dictionary.

Parameters:
  • data (Dict[str, Any]) – The dictionary representation of the node.

  • verbose (bool, optional) – Whether to print verbose logs. Defaults to False.

Returns:

The node created from the dictionary.

Return type:

Node

classmethod from_json(data: str) Node[source]

Creates a node from a json string.

Parameters:

data (str) – The json string representation of the node.

Returns:

The node created from the json string.

Return type:

Node

has_field(field: str) bool[source]

helper function to check if the node has a field with the given name.

Parameters:

field (str) – The name of the field to check.

Returns:

True if the node has the field, False otherwise.

Return type:

bool

to_dict() Dict[str, Any][source]

Converts the node to a dictionary.

Returns:

The dictionary representation of the node.

Return type:

Dict[str, Any]

to_json(indent=None) str[source]

Converts the node to a json string.

Parameters:

indent (int, optional) – The indent to use. Defaults to None.

Returns:

The json string representation of the node.

Return type:

str

types = <chainfury.base.NodeType object>
class chainfury.base.NodeType[source]

Bases: object

AI = 'ai-powered'

constant for the AI node type

MEMORY = 'memory'

constant for the memory node type

PROGRAMATIC = 'programatic'

constant for the programatic node type

exception chainfury.base.NotDAGError[source]

Bases: Exception

class chainfury.base.Secret(value='')[source]

Bases: str

This class just means that in Var it will be taken as a password field

class chainfury.base.Var(type: str | List[Var], format: str = '', items: List[Var] = [], additionalProperties: List[Var] | Var = [], password: bool = False, required: bool = False, placeholder: str = '', show: bool = False, name: str = '', *, loc: Tuple | None = ())[source]

Bases: object

classmethod from_dict(d: Dict[str, Any]) Var[source]

Deserialise a Var from a dictionary.

Parameters:

d (Dict[str, Any]) – The dictionary to deserialise from.

Returns:

The deserialised Var.

Return type:

Var

set_value(v: Any)[source]

Set the value of this Var.

Parameters:

v (Any) – The value to set.

to_dict() Dict[str, Any][source]

Serialise this Var to a dictionary that can be JSON serialised and sent to the client.

Returns:

The serialised Var.

Return type:

Dict[str, Any]

chainfury.base.adjacency_list_to_edge_map(adjacency_list) List[Edge][source]
chainfury.base.edge_array_to_adjacency_list(edges: List[Edge])[source]
chainfury.base.extract_jinja_indices(data: str | List | Dict[str, Any], current_index=(), indices=None) List[source]

This takes in a nested object and returns all the locations where jinja template was detected.

Parameters:
  • data (Union[str, List, Dict[str, Any]]) – The nested object.

  • current_index (tuple, optional) – The current index. Defaults to ().

  • indices ([type], optional) – The indices. Defaults to None.

Returns:

The list of indices.

Return type:

List

Example

>>> from chainfury.base import extract_jinja_indices
>>> extract_jinja_indices({
...     "foo": {
...         "bar": "{{ baz }}",
...         "gaa": ["{{ joo }}"]
...    }
... })
[(('foo', 'bar'), Var(...)), (('foo', 'gaa', '0'), Var(...))]
>>> # more examples
[(('3', 'content'), [Var('num1', type=string, items=[], additionalProperties=[]), Var('num2', type=string, items=[], additionalProperties=[])])]
[((), [Var('message', type=string, items=[], additionalProperties=[])])]
[(('meta_prompt', 'data'), [Var('place', type=string, items=[], additionalProperties=[])])]
[('0', [Var('name', type=string, items=[], additionalProperties=[])])]
[(('meta', 'ptype'), [Var('genome', type=string, items=[], additionalProperties=[])])]
[(('level-0', 'level-1', 'level-2'), [Var('thing', type=string, items=[], additionalProperties=[])]), (('level-0', 'nice'), [Var('feeling', type=string, items=[], additionalProperties=[])])]
[]
chainfury.base.func_to_return_vars(func, returns: Dict[str, Tuple[int]]) List[Var][source]

Analyses the return annotation type of the signature of a function and converts it to an array of named Var objects.

Parameters:
  • func (Callable) – The function to extract the signature from.

  • returns (Dict[str, Tuple]) – The dictionary of return types.

Returns:

A dictionary with the name of the return type and the location of the return type.

Return type:

Dict[str, Tuple[int]]

chainfury.base.func_to_vars(func: object) List[Var][source]

Extracts the signature of a function and converts it to an array of Var objects.

Parameters:

func (Callable) – The function to extract the signature from.

Returns:

The array of Var objects.

Return type:

List[Var]

chainfury.base.get_value_by_keys(obj, keys, *, _first_sentinal: bool = False) Any[source]

Takes in an arbitrary nested object and returns the value at the location specified by the keys.

Parameters:
  • obj (Union[List, Dict[str, Any]]) – The nested object.

  • keys (Union[str, List[str], Tuple[str, ...]]) – The keys. See extract_jinja_indices for examples.

  • _first_sentinal (bool, optional) – flag to tell if this is the first input or not, user should not use this. Defaults to False.

Returns:

The value at the location specified by the keys.

Return type:

Any

chainfury.base.jinja_schema_to_vars(v) Var[source]

Converts a Jinja schema to a Var object.

Parameters:

v ([type]) – The Jinja schema.

Returns:

The Var object.

Return type:

Var

chainfury.base.jtype_to_vars(prompt: str) List[Var][source]

Converts a Jinja prompt to an array of Var objects.

Parameters:

prompt (str) – The Jinja prompt.

Returns:

The array of Var objects.

Return type:

List[Var]

chainfury.base.put_value_by_keys(obj, keys, value: Any)[source]

Takes in an arbitrary nested object and sets the value at the location specified by the keys.

Parameters:
  • obj (Union[List, Dict[str, Any]]) – The nested object.

  • keys (Union[str, List[str], Tuple[str, ...]]) – The keys. See extract_jinja_indices for examples.

  • value (Any) – The value to set.

chainfury.base.pyannotation_to_json_schema(x: Any, allow_any: bool, allow_exc: bool, allow_none: bool, *, trace: bool = False, is_return: bool = False) Var[source]

Function to convert the given annotation from python to a Var which can then be JSON serialised and sent to the clients.

Parameters:
  • x (Any) – The annotation to convert.

  • allow_any (bool) – Whether to allow the Any type.

  • allow_exc (bool) – Whether to allow the Exception type.

  • allow_none (bool) – Whether to allow the None type.

  • trace (bool, optional) – Adds verbosity the schema generation also set FURY_LOG_LEVEL=’debug’. Defaults to False.

Returns:

The converted annotation.

Return type:

Var

chainfury.base.topological_sort(edges: List[Edge]) List[str][source]

Topological sort of a DAG, raises NotDAGError if the graph is not a DAG. This is full proof version which will work even if the DAG contains several unconnected chains.

Parameters:

edges (List[Edge]) – The edges of the DAG

Returns:

The topologically sorted list of node ids

Return type:

List[str]