gnes.flow package

Module contents

class gnes.flow.Flow(with_frontend=True, is_trained=True, *args, **kwargs)

Bases: gnes.base.TrainableBase

GNES Flow: an intuitive way to build workflows for GNES.

You can use add() then build() to customize your own workflow. For example:

from gnes.flow import Flow

f = (Flow(check_version=False, route_table=True)
     .add_preprocessor(yaml_path='BasePreprocessor')
     .add_encoder(yaml_path='BaseEncoder')
     .add_router(yaml_path='BaseRouter'))

with f.build(backend='thread') as flow:
    flow.index(txt_file='aa.txt')
    ...

You can also use add(‘Encoder’, …) or add(Service.Encoder, …) to add a service to the flow. The generic add() provides a convenient, uniform way to build the flow.

As shown above, it is recommended to use the flow as a context manager (the with statement), as it automatically manages all opened sockets, processes and threads when exiting the context.
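To illustrate why the context-manager form is safer, here is a minimal sketch using a hypothetical ToyFlow class (not part of GNES): its __exit__ calls close(), so resources are released even when the body raises. It does not show how GNES's Flow is implemented internally.

```python
class ToyFlow:
    """Hypothetical stand-in for Flow; it only illustrates cleanup on exit."""

    def __init__(self):
        self.running = False

    def build(self):
        # Pretend to start services (sockets/processes/threads).
        self.running = True
        return self

    def close(self):
        # Pretend to stop everything that build() started.
        self.running = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Runs on normal exit and on exceptions alike.
        self.close()
        return False  # do not swallow the exception


f = ToyFlow()
try:
    with f.build() as flow:
        assert flow.running
        raise RuntimeError("something failed inside the context")
except RuntimeError:
    pass

print(f.running)  # False: close() ran even though the body raised
```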

Note the different copy behaviors of add() and build(): add() copies the flow by default, whereas build() modifies the flow in place. You can change this behavior with the copy_flow argument: add(…, copy_flow=False) modifies the flow in place, while build(copy_flow=True) returns a copy.
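The copy semantics can be sketched with a hypothetical ToyFlow class that only mimics the documented defaults (add() copies, build() modifies in place); it is an illustration, not the GNES implementation.

```python
import copy


class ToyFlow:
    """Hypothetical stand-in that mimics the documented copy_flow defaults."""

    def __init__(self):
        self.services = []
        self.built = False

    def add(self, name, copy_flow=True):
        # add(): work on a deep copy by default, leaving self untouched.
        op_flow = copy.deepcopy(self) if copy_flow else self
        op_flow.services.append(name)
        return op_flow

    def build(self, copy_flow=False):
        # build(): modify self in place by default.
        op_flow = copy.deepcopy(self) if copy_flow else self
        op_flow.built = True
        return op_flow


base = ToyFlow()
f1 = base.add("encoder")                  # new object; base is unchanged
f2 = base.add("router", copy_flow=False)  # base itself is modified
f3 = f1.build()                           # the very same object as f1
```

Because add() copies by default, several variants can be derived from one base flow without interfering with each other.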

Create a new Flow object.

Parameters:
  • with_frontend (bool) – whether to add a frontend service to the flow
  • is_trained (bool) – whether this flow is already trained. If set to False, index() and query() cannot be called before train()
  • kwargs – keyword-value arguments that will be shared by all services
Frontend = 0
add(service, name=None, recv_from=None, send_to=None, copy_flow=True, **kwargs)

Add a service to the current flow object and return the modified flow object. The attributes of the service can later be changed with set(), or the service can be deleted with remove().

Note that there are shortcut versions of this method. It is recommended to use add_encoder(), add_preprocessor(), add_router() and add_indexer() whenever possible.

Parameters:
  • service (Union[Service, str]) – a ‘Service’ enum or string, possible choices: Encoder, Router, Preprocessor, Indexer, Frontend
  • name (Optional[str]) – the name identifier of the service, which can be used in ‘recv_from’, ‘send_to’, set() and remove()
  • recv_from (Union[str, Tuple[str], List[str], Service, None]) – the name of the service(s) that this service receives data from. One can also use ‘Service.Frontend’ to indicate the connection with the frontend.
  • send_to (Union[str, Tuple[str], List[str], Service, None]) – the name of the service(s) that this service sends data to. One can also use ‘Service.Frontend’ to indicate the connection with the frontend.
  • copy_flow (bool) – if True, copy the current flow, apply the modification to the copy and return it; otherwise, modify the current flow in place
  • kwargs – other keyword-value arguments that the service CLI supports
Return type:

Flow

Returns:

a (new) flow object with modification
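One way to picture recv_from and send_to is as directed edges between named services. The following sketch uses a hypothetical Service enum mirroring the choices listed above and a hypothetical to_edges() helper that normalizes the accepted argument types (a name, a tuple or list of names, a Service member, or None) into (source, destination) pairs. GNES's internal graph representation may differ.

```python
from enum import Enum
from typing import List, Sequence, Tuple, Union


class Service(Enum):
    """Hypothetical mirror of the service choices accepted by add()."""
    Frontend = 0
    Encoder = 1
    Router = 2
    Preprocessor = 3
    Indexer = 4


Endpoint = Union[str, Service]
Spec = Union[Endpoint, Sequence[Endpoint], None]


def _as_names(spec: Spec) -> List[str]:
    # Accept None, a single name/Service member, or a tuple/list of them.
    if spec is None:
        return []
    if isinstance(spec, (str, Service)):
        spec = [spec]
    return [s.name if isinstance(s, Service) else s for s in spec]


def to_edges(name: str, recv_from: Spec = None, send_to: Spec = None) -> List[Tuple[str, str]]:
    """Turn one service's recv_from/send_to into directed (src, dst) edges."""
    edges = [(src, name) for src in _as_names(recv_from)]
    edges += [(name, dst) for dst in _as_names(send_to)]
    return edges


print(to_edges("enc", recv_from=("prep1", "prep2"), send_to=Service.Frontend))
# [('prep1', 'enc'), ('prep2', 'enc'), ('enc', 'Frontend')]
```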

add_encoder(*args, **kwargs)

Add an encoder to the current flow; a shortcut for add(Service.Encoder, …).

Return type: Flow
add_frontend(*args, **kwargs)

Add a frontend to the current flow; a shortcut for add(Service.Frontend, …). Usually you don’t need to call this method explicitly, as a flow object contains a frontend service by default. It is useful when you build a flow without the frontend (with_frontend=False) and want to customize the frontend later.

Return type: Flow
add_indexer(*args, **kwargs)

Add an indexer to the current flow; a shortcut for add(Service.Indexer, …).

Return type: Flow
add_preprocessor(*args, **kwargs)

Add a preprocessor to the current flow; a shortcut for add(Service.Preprocessor, …).

Return type: Flow
add_router(*args, **kwargs)

Add a router to the current flow; a shortcut for add(Service.Router, …).

Return type: Flow
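Each add_*() shortcut differs from add() only in the pre-filled service argument. A minimal sketch of that relationship using functools.partialmethod, with hypothetical ToyFlow and Service classes; the real shortcuts may be written differently, and this toy add() modifies the flow in place for brevity.

```python
from enum import Enum
from functools import partialmethod


class Service(Enum):
    """Hypothetical mirror of the service choices accepted by add()."""
    Frontend = 0
    Encoder = 1
    Router = 2
    Preprocessor = 3
    Indexer = 4


class ToyFlow:
    """Hypothetical stand-in showing add_* shortcuts as thin wrappers of add()."""

    def __init__(self):
        self.calls = []

    def add(self, service, **kwargs):
        # Accept a Service member or its name, e.g. 'Encoder'.
        if isinstance(service, str):
            service = Service[service]
        self.calls.append((service, kwargs))  # in place, for brevity
        return self

    # Each shortcut just pre-binds the service argument of add().
    add_encoder = partialmethod(add, Service.Encoder)
    add_router = partialmethod(add, Service.Router)
    add_indexer = partialmethod(add, Service.Indexer)
    add_preprocessor = partialmethod(add, Service.Preprocessor)
    add_frontend = partialmethod(add, Service.Frontend)


f = ToyFlow().add_encoder(yaml_path="BaseEncoder").add("Router", yaml_path="BaseRouter")
```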
build(backend='process', copy_flow=False, *args, **kwargs)

Build the current flow and make it ready to use.

Parameters:
  • backend (Optional[str]) – one of ‘thread’, ‘process’, ‘swarm’, ‘k8s’, ‘shell’; if None, only the graph is built
  • copy_flow (bool) – if True, return a copy of the current flow instead of modifying it in place
Return type:

Flow

Returns:

the current flow (by default)
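The rule for backend=None (build the graph only, launch nothing) can be sketched as follows. The build_sketch() function and its return value are hypothetical; it only validates the backend name against the documented choices and records which services would be launched.

```python
from typing import Dict, List, Optional, Tuple

SUPPORTED_BACKENDS = ("thread", "process", "swarm", "k8s", "shell")


def build_sketch(services: List[str], backend: Optional[str] = "process") -> Dict[str, list]:
    """Hypothetical sketch of the backend rule documented for build()."""
    if backend is not None and backend not in SUPPORTED_BACKENDS:
        raise ValueError(f"unsupported backend: {backend!r}")
    # The graph is always built; here it is simply a linear chain of services.
    edges: List[Tuple[str, str]] = list(zip(services, services[1:]))
    # With backend=None nothing is launched: only the graph is returned.
    launched = [] if backend is None else [f"{s}@{backend}" for s in services]
    return {"edges": edges, "launched": launched}


print(build_sketch(["prep", "enc", "router"], backend=None))
# {'edges': [('prep', 'enc'), ('enc', 'router')], 'launched': []}
```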

close()

Release the resources held by the flow when it is destroyed.

index(bytes_gen=None, **kwargs)

Do indexing on the current flow.

Example:

with f.build(backend='thread') as flow:
    flow.index(txt_file='aa.txt')
    flow.index(image_zip_file='aa.zip', batch_size=64)
    flow.index(video_zip_file='aa.zip')
    ...

This calls the pre-built reader, which reads the files into an iterator of bytes and feeds it to the flow.

You may also build a reader/generator on your own.

Example:

def my_reader():
    for _ in range(10):
        yield b'abcdfeg'  # each yield generates a document to index

with f.build(backend='thread') as flow:
    flow.index(bytes_gen=my_reader())

It will start a CLIClient and call index().

Parameters:
  • bytes_gen (Optional[Iterator[bytes]]) – an iterator of bytes. If not given, the input must be specified in kwargs (e.g. txt_file)
  • kwargs – accepts all keyword arguments of the GNES client CLI
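Any iterator of bytes works as bytes_gen. For instance, a hypothetical line_reader() could turn each non-blank line of a text source into one document; the file name in the usage comment is illustrative.

```python
from typing import Iterable, Iterator


def line_reader(lines: Iterable[str], encoding: str = "utf-8") -> Iterator[bytes]:
    """Hypothetical reader: yield each non-blank line as one document (bytes)."""
    for line in lines:
        text = line.strip()
        if text:  # skip blank lines
            yield text.encode(encoding)


# Usage with a flow built as in the examples above; a text-mode file object
# is itself an iterable of lines:
# with f.build(backend='thread') as flow, open('aa.txt', encoding='utf-8') as fp:
#     flow.index(bytes_gen=line_reader(fp))
```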
query(bytes_gen=None, **kwargs)

Do querying on the current flow.

It will start a CLIClient and call query().

Example:

with f.build(backend='thread') as flow:
    flow.query(txt_file='aa.txt')
    flow.query(image_zip_file='aa.zip', batch_size=64)
    flow.query(video_zip_file='aa.zip')
    ...

This calls the pre-built reader, which reads the files into an iterator of bytes and feeds it to the flow.

You may also build a reader/generator on your own.

Example:

def my_reader():
    for _ in range(10):
        yield b'abcdfeg'   # each yield generates a query for searching

with f.build(backend='thread') as flow:
    flow.query(bytes_gen=my_reader())

Parameters:
  • bytes_gen (Optional[Iterator[bytes]]) – an iterator of bytes. If not given, the input must be specified in kwargs (e.g. txt_file)
  • kwargs – accepts all keyword arguments of the GNES client CLI
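The batch_size argument in the examples above implies that documents are sent to the flow in groups. A hypothetical batched() helper shows one way an iterator of bytes can be grouped into batches; it illustrates the idea only and is not the GNES client's actual batching code.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def batched(bytes_gen: Iterable[bytes], batch_size: int) -> Iterator[List[bytes]]:
    """Hypothetical sketch: group documents into lists of at most batch_size."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    it = iter(bytes_gen)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:  # the source is exhausted
            return
        yield batch
```

The last batch may be shorter than batch_size, so a consumer should not assume fixed-size batches.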
remove(name=None, copy_flow=True)

Remove a service from the flow.

Parameters:
  • name (Optional[str]) – the name of the existing service
  • copy_flow (bool) – if True, copy the current flow, apply the modification to the copy and return it; otherwise, modify the current flow in place
Return type:

Flow

Returns:

a (new) flow object with modification
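Removing a service also has to drop any connection that pointed to it. A minimal sketch, assuming a hypothetical graph stored as a dict that maps each service name to the names it sends data to, and following the same copy_flow convention; remove_service() is not a GNES function.

```python
import copy
from typing import Dict, List


def remove_service(graph: Dict[str, List[str]], name: str,
                   copy_flow: bool = True) -> Dict[str, List[str]]:
    """Hypothetical sketch: drop a service and every edge pointing to it."""
    if name not in graph:
        raise KeyError(f"{name!r} is not an existing service")
    g = copy.deepcopy(graph) if copy_flow else graph
    del g[name]
    for targets in g.values():
        # Remove dangling edges that pointed at the removed service.
        targets[:] = [t for t in targets if t != name]
    return g
```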

set(name, recv_from=None, send_to=None, copy_flow=True, clear_old_attr=False, as_last_service=False, **kwargs)

Set the attributes of an existing service (added by add()) in the flow. Attributes or kwargs that are not given remain unchanged.

Parameters:
  • name (str) – the name of the existing service
  • recv_from (Union[str, Tuple[str], List[str], Service, None]) – the name of the service(s) that this service receives data from. One can also use ‘Service.Frontend’ to indicate the connection with the frontend.
  • send_to (Union[str, Tuple[str], List[str], Service, None]) – the name of the service(s) that this service sends data to. One can also use ‘Service.Frontend’ to indicate the connection with the frontend.
  • copy_flow (bool) – if True, copy the current flow, apply the modification to the copy and return it; otherwise, modify the current flow in place
  • clear_old_attr (bool) – remove the old attribute values before setting the new ones
  • as_last_service (bool) – whether to make the changed service the last service in the graph
  • kwargs – other keyword-value arguments that the service CLI supports
Return type:

Flow

Returns:

a (new) flow object with modification
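The effect of clear_old_attr can be sketched as replace versus merge. The hypothetical apply_set() below assumes that, without clear_old_attr, new recv_from sources are added to the existing ones and new kwargs are merged into the old ones, while with clear_old_attr the given values replace the old ones. This is one plausible reading of the parameter description, not confirmed GNES behavior.

```python
from typing import Any, Dict, Iterable, Set, Tuple


def apply_set(incomes: Set[str], attrs: Dict[str, Any], recv_from: Iterable[str] = (),
              clear_old_attr: bool = False, **kwargs: Any) -> Tuple[Set[str], Dict[str, Any]]:
    """Hypothetical sketch of old vs. new values in set()."""
    new_sources = set(recv_from)
    if clear_old_attr:
        # Replace: given values overwrite the old ones entirely.
        incomes = new_sources if new_sources else set(incomes)
        attrs = dict(kwargs) if kwargs else dict(attrs)
    else:
        # Merge: anything not given stays unchanged.
        incomes = set(incomes) | new_sources
        attrs = {**attrs, **kwargs}
    return incomes, attrs
```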

set_last_service(name, copy_flow=True)

Set a service as the last service in the flow, useful when modifying the flow.

Parameters:
  • name (str) – the name of the existing service
  • copy_flow (bool) – if True, copy the current flow, apply the modification to the copy and return it; otherwise, modify the current flow in place
Return type:

Flow

Returns:

a (new) flow object with modification

to_jpg(path='flow.jpg', **kwargs)

Render the current flow as a JPG image. This calls to_mermaid() and requires an internet connection.

Parameters:
  • path (str) – the file path of the image
  • kwargs – keyword arguments of to_mermaid()
Return type:

None

to_k8s_yaml()

Return type: str
to_mermaid(left_right=True)

Output the Mermaid graph of the flow for visualization.

Parameters: left_right (bool) – if True, render the flow left to right; otherwise, top down.
Return type: str
Returns: a Mermaid-formatted string
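For orientation, a minimal Mermaid flowchart consists of a direction header (graph LR for left to right, graph TD for top down) followed by one "A --> B" line per edge. The hypothetical sketch below renders a list of edges in that syntax; the actual output of to_mermaid() may carry more detail, such as node labels or styling.

```python
from typing import List, Tuple


def mermaid_sketch(edges: List[Tuple[str, str]], left_right: bool = True) -> str:
    """Hypothetical sketch: render (src, dst) edges as a Mermaid flowchart."""
    # 'LR' lays the graph out left to right, 'TD' top down.
    lines = ["graph LR" if left_right else "graph TD"]
    lines += [f"    {src} --> {dst}" for src, dst in edges]
    return "\n".join(lines)


print(mermaid_sketch([("Frontend", "Encoder"), ("Encoder", "Frontend")], left_right=False))
```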
to_python_code(indent=4)

Generate the Python code of this flow.

Parameters: indent (int) – the number of spaces per indentation level
Return type: str
Returns: the generated Python code
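As a rough idea of what generating code for a flow involves, the hypothetical sketch below emits chained add() calls, in the style of the example at the top of this page, from a list of (service, kwargs) pairs, with indent controlling the indentation of each chained call. It is not guaranteed to match the actual output of to_python_code().

```python
from typing import Dict, List, Tuple


def python_code_sketch(services: List[Tuple[str, Dict[str, object]]], indent: int = 4) -> str:
    """Hypothetical sketch: emit chained add() calls for (service, kwargs) pairs."""
    pad = " " * indent
    lines = ["from gnes.flow import Flow", "", "f = (Flow()"]
    for service, kwargs in services:
        args = ", ".join(f"{k}={v!r}" for k, v in kwargs.items())
        call = f".add({service!r}, {args})" if args else f".add({service!r})"
        lines.append(pad + call)
    lines[-1] += ")"  # close the parenthesized chain
    return "\n".join(lines)


print(python_code_sketch([("Encoder", {"yaml_path": "BaseEncoder"}), ("Router", {})]))
```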
to_shell_script()
Return type: str
to_swarm_yaml(image='gnes/gnes:latest-alpine')

Generate the Docker Swarm YAML compose file.

Parameters: image (str) – the default GNES Docker image
Return type: str
Returns: the generated YAML compose file
to_url(**kwargs)

Render the current flow as a URL pointing to an SVG image. This requires an internet connection.

Parameters: kwargs – keyword arguments of to_mermaid()
Return type: str
Returns: the URL pointing to the SVG image
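A common way to render Mermaid text remotely is to embed the base64-encoded diagram in a URL. The hypothetical url_sketch() below assumes a mermaid.ink-style endpoint; whether GNES uses this service, and whether a given service expects standard or URL-safe base64, are assumptions to check against that service's documentation. In such a scheme the image is rendered by the remote server when the URL is opened.

```python
import base64


def url_sketch(mermaid_str: str, base_url: str = "https://mermaid.ink/svg/") -> str:
    """Hypothetical sketch: embed a Mermaid diagram in a renderer URL.

    The default base_url is an assumption, not taken from GNES.
    """
    # URL-safe base64 avoids '/' and '+' in the path segment.
    encoded = base64.urlsafe_b64encode(mermaid_str.encode("utf-8")).decode("ascii")
    return base_url + encoded
```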
train(bytes_gen=None, **kwargs)

Do training on the current flow.

It will start a CLIClient and call train().

Example:

with f.build(backend='thread') as flow:
    flow.train(txt_file='aa.txt')
    flow.train(image_zip_file='aa.zip', batch_size=64)
    flow.train(video_zip_file='aa.zip')
    ...

This calls the pre-built reader, which reads the files into an iterator of bytes and feeds it to the flow.

You may also build a reader/generator on your own.

Example:

def my_reader():
    for _ in range(10):
        yield b'abcdfeg'   # each yield generates a document for training

with f.build(backend='thread') as flow:
    flow.train(bytes_gen=my_reader())

Parameters:
  • bytes_gen (Optional[Iterator[bytes]]) – an iterator of bytes. If not given, the input must be specified in kwargs (e.g. txt_file)
  • kwargs – accepts all keyword arguments of the GNES client CLI
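With is_trained=False in the constructor, index() and query() are documented as unusable until train() has been called. A hypothetical ToyFlow sketch of that guard follows; the exception type GNES actually raises is not documented here, so RuntimeError is an assumption.

```python
from typing import Iterable


class ToyFlow:
    """Hypothetical stand-in illustrating the is_trained rule."""

    def __init__(self, is_trained: bool = True):
        self.is_trained = is_trained

    def train(self, bytes_gen: Iterable[bytes]) -> None:
        for _ in bytes_gen:
            pass  # a real flow would send each document to its services
        self.is_trained = True

    def _require_trained(self, op: str) -> None:
        if not self.is_trained:
            raise RuntimeError(f"{op}() cannot be called before train()")

    def index(self, bytes_gen: Iterable[bytes]) -> int:
        self._require_trained("index")
        return sum(1 for _ in bytes_gen)  # number of documents indexed

    def query(self, bytes_gen: Iterable[bytes]) -> int:
        self._require_trained("query")
        return sum(1 for _ in bytes_gen)  # number of queries sent
```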