API

This section provides information on using the Python API of beat.core.

algorithm

Validation for algorithms

Forward importing from beat.backend.python.algorithm: beat.backend.python.algorithm.Storage, beat.backend.python.algorithm.Runner

beat.core.algorithm.load_algorithm_prototype(prefix)[source]
class beat.core.algorithm.Algorithm(prefix, data, dataformat_cache=None, library_cache=None)[source]

Bases: Algorithm

Algorithms represent runnable components within the platform.

This class can only parse the meta-parameters of the algorithm (i.e., input and output declaration, grouping, synchronization details, parameters and splittability). The actual algorithm is not directly treated by this class. It can, however, provide you with a loader for actually running the algorithmic code (see runner()).

Parameters
  • prefix (str) – Establishes the prefix of your installation.

  • data (object, Optional) – The piece of data representing the algorithm. It must validate against the schema defined for algorithms. If a string is passed, it is supposed to be a valid path to an algorithm in the designated prefix area. If a tuple is passed (or a list), then we consider that the first element represents the algorithm declaration, while the second, the code for the algorithm (either in its source format or as a binary blob). If None is passed, loads our default prototype for algorithms (source code will be in Python).

  • dataformat_cache (dict, Optional) – A dictionary mapping dataformat names to loaded dataformats. This parameter is optional and, if passed, may greatly speed-up algorithm loading times as dataformats that are already loaded may be re-used.

  • library_cache (dict, Optional) – A dictionary mapping library names to loaded libraries. This parameter is optional and, if passed, may greatly speed-up library loading times as libraries that are already loaded may be re-used.

name

The algorithm name

Type

str

description

The short description string, loaded from the JSON file if one was set.

Type

str

documentation

The full-length docstring for this object.

Type

str

storage

A simple object that provides information about file paths for this algorithm

Type

object

dataformats

A dictionary containing all pre-loaded dataformats used by this algorithm. Data format objects will be of type beat.core.dataformat.DataFormat.

Type

dict

libraries

A mapping object defining other libraries this algorithm needs to load so it can work properly.

Type

dict

uses

A mapping object defining the required library import name (keys) and the full-names (values).

Type

dict

parameters

A dictionary containing all pre-defined parameters that this algorithm accepts.

Type

dict

splittable

A boolean value that indicates if this algorithm is automatically parallelizable by our backend.

Type

bool

input_map

A dictionary where the key is the input name and the value, its type. All input names (potentially from different groups) are included in this dictionary.

Type

dict

output_map

A dictionary where the key is the output name and the value, its type. All output names (potentially from different groups) are included in this dictionary.

Type

dict

results

If this algorithm is actually an analyzer (i.e., there are no formal outputs, but results that must be saved by the platform), then this dictionary contains the names and data types of those elements.

Type

dict

groups

A list containing dictionaries with inputs and outputs belonging to the same synchronization group.

Type

dict

errors

A list containing errors found while loading this algorithm.

Type

list

data

The original data for this algorithm, as loaded by our JSON decoder.

Type

dict

code

The code that is associated with this algorithm, loaded as a text (or binary) file.

Type

str

dataformat_klass

alias of DataFormat
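
The following is a minimal usage sketch, not part of the reference itself: it loads an algorithm from a prefix and inspects it. The prefix path and the algorithm name are placeholders.

    from beat.core.algorithm import Algorithm

    prefix = "/path/to/prefix"                            # placeholder installation prefix
    algorithm = Algorithm(prefix, "user/my_algorithm/1")  # hypothetical algorithm name

    if algorithm.errors:
        # problems found while parsing the declaration
        print("\n".join(algorithm.errors))
    else:
        print(algorithm.name, algorithm.splittable)
        # a loader for the algorithmic code itself is available via algorithm.runner()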

baseformat

Forward imports from beat.backend.python.baseformat

data

Forward importing from beat.backend.python.data: beat.backend.python.data.mixDataIndices(), beat.backend.python.data.getAllFilenames(), beat.backend.python.data.DataSource, beat.backend.python.data.CachedDataSource, beat.backend.python.data.DatabaseOutputDataSource, beat.backend.python.data.RemoteDataSource, beat.backend.python.data.DataSink, beat.backend.python.data.CachedDataSink, beat.backend.python.data.StdoutDataSink, beat.backend.python.data.load_data_index(), beat.backend.python.data.load_data_index_db(), beat.backend.python.data.foundSplitRanges()

data_loaders

Forward importing from beat.backend.python.data_loaders: beat.backend.python.data_loaders.DataLoaderList, beat.backend.python.data_loaders.DataLoader, beat.backend.python.data_loaders.DataView

database

Validation of databases

Forward importing from beat.backend.python.database: beat.backend.python.database.Storage

beat.core.database.get_first_procotol_template(prefix)[source]
class beat.core.database.Database(prefix, data, dataformat_cache=None)[source]

Bases: Database

Databases define the start point of the dataflow in an experiment.

Parameters
  • prefix (str) – Establishes the prefix of your installation.

  • data (dict, str) – The piece of data representing the database. It must validate against the schema defined for databases. If a string is passed, it is supposed to be a valid path to a database in the designated prefix area.

  • dataformat_cache (dict, Optional) – A dictionary mapping dataformat names to loaded dataformats. This parameter is optional and, if passed, may greatly speed-up database loading times as dataformats that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying dataformats change.

name

The full, valid name of this database

Type

str

description

The short description string, loaded from the JSON file if one was set.

Type

str

documentation

The full-length docstring for this object.

Type

str

storage

A simple object that provides information about file paths for this database

Type

object

errors

A list containing errors found while loading this database.

Type

list

data

The original data for this database, as loaded by our JSON decoder.

Type

dict

property is_database_rawdata_access_enabled

Returns whether raw data sharing was enabled

This property is only useful for the Docker executor.
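
As a usage illustration (not part of the reference), the sketch below loads a database declaration and checks it for errors; the prefix and database name are placeholders.

    from beat.core.database import Database

    prefix = "/path/to/prefix"               # placeholder installation prefix
    database = Database(prefix, "my_db/1")   # hypothetical database name

    if database.errors:
        print("\n".join(database.errors))
    else:
        print(database.name)
        # raw data sharing flag; only meaningful for the Docker executor
        print(database.is_database_rawdata_access_enabled)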

dataformat

Validation and parsing for dataformats

Forward importing from beat.backend.python.dataformat: beat.backend.python.dataformat.Storage

class beat.core.dataformat.DataFormat(prefix, data, parent=None, dataformat_cache=None)[source]

Bases: DataFormat

Data formats define the chunks of data that circulate between blocks.

Parameters
  • prefix (str) – Establishes the prefix of your installation.

  • data (object, Optional) – The piece of data representing the data format. It must validate against the schema defined for data formats. If a string is passed, it is supposed to be a valid path to a data format in the designated prefix area. If None is passed, loads our default prototype for data formats.

  • parent (tuple, Optional) – The parent DataFormat for this format. If set to None, this means this dataformat is the first one on the hierarchy tree. If set to a tuple, the contents are (format-instance, field-name), which indicates the originating object that is this object’s parent and the name of the field on that object that points to this one.

  • dataformat_cache (dict, Optional) – A dictionary mapping dataformat names to loaded dataformats. This parameter is optional and, if passed, may greatly speed-up data format loading times as dataformats that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying dataformats change.

name

The full, valid name of this dataformat

Type

str

description

The short description string, loaded from the JSON file if one was set.

Type

str

documentation

The full-length docstring for this object.

Type

str

storage

A simple object that provides information about file paths for this dataformat

Type

object

errors

A list of strings containing errors found while loading this dataformat.

Type

list

data

The original data for this dataformat, as loaded by our JSON decoder.

Type

dict

resolved

A dictionary similar to data, but with references fully resolved.

Type

dict

referenced

A dictionary pointing to all loaded dataformats.

Type

dict

parent

The dataformat of which the current format is part. It is useful for internal error reporting.

Type

beat.core.dataformat.DataFormat
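
A minimal sketch of loading a data format with a shared cache follows; the prefix and format name are placeholders.

    from beat.core.dataformat import DataFormat

    prefix = "/path/to/prefix"    # placeholder installation prefix
    dataformat_cache = {}         # shared across loads to avoid re-parsing formats

    df = DataFormat(prefix, "user/my_format/1", dataformat_cache=dataformat_cache)

    if df.errors:
        print("\n".join(df.errors))
    else:
        print(df.name)
        print(sorted(df.referenced.keys()))   # every dataformat referenced by this one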

dock

Docker helper classes

class beat.core.dock.Host(images_cache=None, raise_on_errors=True, discover=True)[source]

Bases: object

An object of this class can connect to the Docker host and resolve environment and database names into Docker images

images_cache = {}
env2docker(key)[source]

Returns a nice docker image name given a BEAT environment key

db2docker(db_names)[source]

Returns a nice docker image name given a database name

dbenv2docker(key)[source]

Returns a nice docker image name given a BEAT database environment key

teardown()[source]
full_environment_name(name)[source]
property ip

The IP address of the docker host

create_container(image, command)[source]
start(container, virtual_memory_in_megabytes=0, max_cpu_percent=0)[source]

Starts the execution of a container

The process will be run in the background, and its standard output and standard error will be read after it finishes, into a limited size circular buffer.

Parameters
  • container (Container) – The container.

  • virtual_memory_in_megabytes (int, Optional) – The maximum amount of memory the user process can consume on the host. If not specified, a memory limit is not set.

  • max_cpu_percent (float, Optional) – The maximum amount of CPU the user process may consume on the host. The value 100 equals to using 100% of a single core. If not specified, then a CPU limitation is not put in place.

wait(container, timeout=None)[source]

Wait for the container to finish its job

Parameters

timeout (float, Optional) – A timeout in seconds to wait for the user process to finish. If a timeout value is not given, waits forever.

status(container)[source]

Checks the status of a given container

logs(container)[source]

Returns the logs of a container

statistics(container)[source]

Returns the statistics about a container

rm(container)[source]

Removes a given container, killing it first if it is still running

kill(container)[source]

Stop a container

run(image, command)[source]

Runs a command and retrieves its status and output

get_ipaddress(container)[source]

Returns the ip address of the given container
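
The sketch below shows the general shape of using a Host; it assumes a reachable Docker daemon, and the environment key, image name and command format are placeholders or assumptions rather than documented values.

    from beat.core.dock import Host

    host = Host(raise_on_errors=False)   # connects to the Docker daemon and discovers environments
    print(host.ip)                       # IP address of the Docker host

    # resolve a BEAT environment key (placeholder value) to a Docker image name
    image = host.env2docker("Python (1.0.0)")

    # run a one-off command and retrieve its status and output
    # (the command format and the return shape are assumptions, not documented here)
    result = host.run(image, ["echo", "hello"])

    host.teardown()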

class beat.core.dock.Container(image, command)[source]

Bases: object

This class represents a Docker container with its set of parameters

Parameters
  • image (str) – Name of the image to use to build the container

  • command (str) – Command to execute in the container

set_name(name)[source]

Set the name to be used by the container in place of the Docker auto-generated one.

set_workdir(workdir)[source]

Set the work folder to be used by the container

set_entrypoint(entrypoint)[source]

Set the entry point to be used by the container

add_volume(path, mount_path, read_only=True)[source]

Add a volume to be mounted on the container

Parameters
  • path (str) – Source path of the volume on disk

  • mount_path (str) – Path of the volume in the container

  • read_only (bool) – Whether the volume will be read-only

add_tmpfs(path, size)[source]

Add a tmpfs to be mounted on the container

Parameters
  • path (str) – Target path for the tmpfs

  • size (str) – Size of the tmpfs. Unlimited if empty

add_port(container_port, host_port, host_address=None)[source]

Add a port binding

Parameters
  • container_port (int) – Port to bind from the container

  • host_port (int) – Port to bind to on the host

  • host_address (str) – Address of the host

add_environment_variable(name, value)[source]

Add an environment variable

Parameters
  • name (str) – Name of the variable

  • value (str) – Content of the variable

reset_ports()[source]

Empty the port bindings

property name
property workdir
property entrypoint
property volumes

Returns the volumes of this container in a suitable form to build a command to start the container.

property temporary_filesystems
property ports

Returns the ports of this container in a suitable form to build a command to start the container.

property environment_variables

Returns the environment variables to set on this container.

property network
property user
property command_line

Returns the complete docker command to start the container and execute the specified command.

Returns

Command to execute

Return type

str
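
A sketch of configuring a Container and driving it through a Host follows; the image, command, paths, port and variable values are placeholders, and error handling is omitted.

    from beat.core.dock import Container, Host

    container = Container("debian:stable", "sleep 60")   # placeholder image and command
    container.set_name("beat-example")                   # otherwise Docker generates a name
    container.add_volume("/tmp/beat-prefix", "/beat/prefix", read_only=True)
    container.add_port(8080, 8080)                       # container port -> host port
    container.add_environment_variable("MY_VAR", "value")

    print(container.command_line)     # the docker command that would be issued

    host = Host(raise_on_errors=False)
    host.start(container, virtual_memory_in_megabytes=512, max_cpu_percent=100)
    host.wait(container, timeout=60)  # seconds
    print(host.logs(container))
    host.rm(container)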

drawing

Utilities for drawing toolchains and experiments

beat.core.drawing.text_color(c)[source]

Determines whether text should be black or white for a given background color

The technique deployed in this function calculates the perceptive luminance for a given color and then chooses a black or white color depending on that value.

Parameters

c (str) – A color definition in the format #rrggbb, in which each color channel is represented by a 2-digit hexadecimal number ranging from 0x0 to 0xff.

Returns

Either #000000 (black) or #ffffff (white), depending on which text color goes better with the given color.

Return type

str

beat.core.drawing.lighten_color(n)[source]

Lightens the given color

Parameters

c (str) – A color definition in the format #rrggbb, in which each color channel is represented by a 2-digit hexadecimal number ranging from 0x0 to 0xff.

Returns

The hexadecimal representation of the lightened color.

Return type

str

beat.core.drawing.create_port_table(type, names, color)[source]

Creates an HTML table with the defined port names

Parameters
  • type (str) – The type of port - may be set to ‘input’, ‘output’ or ‘result’

  • names (str) – A set of strings that define the contents of each port

  • color (str) – A color definition in the format #rrggbb, in which each color channel is represented by a 2-digit hexadecimal number ranging from 0x0 to 0xff.

Returns

A string containing the HTML representation for the table, compatible with GraphViz.

Return type

str

beat.core.drawing.create_layout_ports_table(color, input_names=[], output_names=[])[source]

Creates an HTML table with the defined input & output port names

Parameters
  • input_names (str) – A set of strings that define the contents of each input port

  • output_names (str) – A set of strings that define the contents of each output port

  • color (str) – A color definition in the format #rrggbb, in which each color channel is represented by a 2-digit hexadecimal number ranging from 0x0 to 0xff.

Returns

A string containing the HTML representation for the table, compatible with GraphViz.

Return type

str

beat.core.drawing.make_label(inputs, name, outputs, color)[source]

Creates an HTML Table representing the label for a given block

Parameters
  • inputs (list) – A list of input names which represent all inputs for this block

  • name (str) – The name of the block

  • outputs (list) – A list of output names which represent all outputs for this block

  • color (str) – A color definition in the format #rrggbb, in which each color channel is represented by a 2-digit hexadecimal number ranging from 0x0 to 0xff.

Returns

A string containing the HTML representation for the table, compatible with GraphViz.

Return type

str

beat.core.drawing.make_layout_label(inputs, name, outputs, color)[source]

Creates an HTML Table representing the label for a given block

Parameters
  • inputs (list) – A list of input names which represent all inputs for this block

  • name (str) – The name of the block

  • outputs (list) – A list of output names which represent all outputs for this block

  • color (str) – A color definition in the format #rrggbb, in which each color channel is represented by a 2-digit hexadecimal number ranging from 0x0 to 0xff.

Returns

A string containing the HTML representation for the table, compatible with GraphViz.

Return type

str
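
A small sketch using the drawing helpers follows; the color and the block/port names are placeholders, and the resulting strings are meant to be embedded in GraphViz output.

    from beat.core import drawing

    background = "#5e9ca0"                      # any #rrggbb color
    print(drawing.text_color(background))       # "#000000" or "#ffffff"
    print(drawing.lighten_color(background))    # lightened #rrggbb value

    # GraphViz-compatible HTML label for a block (names are placeholders)
    label = drawing.make_label(["image", "annotations"], "my_block", ["features"], background)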

environment

Helper functions related to environment management

beat.core.environments.enumerate_packages(host, environment_name)[source]

Enumerate the packages installed in the given environment.
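
A short sketch, assuming a configured dock.Host and an environment name discovered by it (the value below is a placeholder):

    from beat.core.dock import Host
    from beat.core.environments import enumerate_packages

    host = Host(raise_on_errors=False)
    packages = enumerate_packages(host, "Python (1.0.0)")   # placeholder environment name
    print(packages)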

base

Execution utilities

class beat.core.execution.base.BaseExecutor(prefix, data, cache=None, dataformat_cache=None, database_cache=None, algorithm_cache=None, library_cache=None, custom_root_folders=None)[source]

Bases: object

Executors run the code described by an execution block’s information

Parameters
  • prefix (str) – Establishes the prefix of your installation.

  • data (dict, str) – The piece of data representing the block to be executed. It must validate against the schema defined for execution blocks. If a string is passed, it is supposed to be a fully qualified absolute path to a JSON file containing the block execution information.

  • cache (str, Optional) – If your cache is not located under <prefix>/cache, then specify a full path here. It will be used instead.

  • dataformat_cache (dict, Optional) – A dictionary mapping dataformat names to loaded dataformats. This parameter is optional and, if passed, may greatly speed-up database loading times as dataformats that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying dataformats change.

  • database_cache (dict, Optional) – A dictionary mapping database names to loaded databases. This parameter is optional and, if passed, may greatly speed-up database loading times as databases that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying databases change.

  • algorithm_cache (dict, Optional) – A dictionary mapping algorithm names to loaded algorithms. This parameter is optional and, if passed, may greatly speed-up database loading times as algorithms that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying algorithms change.

  • library_cache (dict, Optional) – A dictionary mapping library names to loaded libraries. This parameter is optional and, if passed, may greatly speed-up library loading times as libraries that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying libraries change.

cache

The path to the cache currently being used

Type

str

errors

A list containing errors found while loading this execution block.

Type

list

data

The original data for this executor, as loaded by our JSON decoder.

Type

dict

algorithm

An object representing the algorithm to be run.

Type

beat.core.algorithm.Algorithm

databases

A dictionary in which keys are strings with database names and values are database.Database, representing the databases required for running this block. The dictionary may be empty in case all inputs are taken from the file cache.

Type

dict

views

A dictionary in which the keys are tuples pointing to the (<database-name>, <protocol>, <set>) and the value is a setup view for that particular combination of details. The dictionary may be empty in case all inputs are taken from the file cache.

Type

dict

input_list

A list of inputs that will be served to the algorithm.

Type

beat.backend.python.inputs.InputList

output_list

A list of outputs that the algorithm will produce.

Type

beat.backend.python.outputs.OutputList

data_sources

A list with all data-sources created by our execution loader.

Type

list

data_sinks

A list with all data-sinks created by our execution loader. These are useful for clean-up actions in case of problems.

Type

list

process(virtual_memory_in_megabytes=0, max_cpu_percent=0, timeout_in_minutes=0)[source]

Executes the user algorithm code

Parameters
  • virtual_memory_in_megabytes (int, Optional) – The amount of virtual memory (in Megabytes) available for the job. If set to zero, no limit will be applied.

  • max_cpu_percent (int, Optional) – The maximum amount of CPU usage allowed in a system. This number must be an integer number between 0 and 100*number_of_cores in your system. For instance, if your system has 2 cores, this number can go between 0 and 200. If it is <= 0, then we don’t track CPU usage.

  • timeout_in_minutes (int) – The number of minutes to wait for the user process to execute. After this amount of time, the user process is killed with signal.SIGKILL. If set to zero, no timeout will be applied.

Returns

A dictionary which is JSON formattable containing the summary of this block execution.

Return type

dict

property valid

A boolean that indicates if this executor is valid or not

property analysis

A boolean that indicates if the current block is an analysis block

property outputs_exist

Returns True if the outputs this block is supposed to produce exist.

property io_statistics

Summarize current I/O statistics looking at data sources and sinks, inputs and outputs

Returns

A dictionary summarizing current I/O statistics

Return type

dict

write(path)[source]

Writes contents to precise filesystem location

dump_runner_configuration(directory)[source]

Exports contents useful for a backend runner to run the algorithm

dump_databases_provider_configuration(directory)[source]

Exports contents useful for a backend runner to run the algorithm

docker

Execution utilities

class beat.core.execution.docker.DockerExecutor(host, prefix, data, cache=None, dataformat_cache=None, database_cache=None, algorithm_cache=None, library_cache=None, custom_root_folders=None)[source]

Bases: RemoteExecutor

DockerExecutor runs the code described by an execution block’s information, externally

Parameters
  • host (dock.Host) – A configured docker host that will execute the user process. If the host does not have access to the required environment, an exception will be raised.

  • prefix (str) – Establishes the prefix of your installation.

  • data (dict, str) – The piece of data representing the block to be executed. It must validate against the schema defined for execution blocks. If a string is passed, it is supposed to be a fully qualified absolute path to a JSON file containing the block execution information.

  • cache (str, Optional) – If your cache is not located under <prefix>/cache, then specify a full path here. It will be used instead.

  • dataformat_cache (dict, Optional) – A dictionary mapping dataformat names to loaded dataformats. This parameter is optional and, if passed, may greatly speed-up database loading times as dataformats that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying dataformats change.

  • database_cache (dict, Optional) – A dictionary mapping database names to loaded databases. This parameter is optional and, if passed, may greatly speed-up database loading times as databases that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying databases change.

  • algorithm_cache (dict, Optional) – A dictionary mapping algorithm names to loaded algorithms. This parameter is optional and, if passed, may greatly speed-up database loading times as algorithms that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying algorithms change.

  • library_cache (dict, Optional) – A dictionary mapping library names to loaded libraries. This parameter is optional and, if passed, may greatly speed-up library loading times as libraries that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying libraries change.

cache

The path to the cache currently being used

Type

str

errors

A list containing errors found while loading this execution block.

Type

list

data

The original data for this executor, as loaded by our JSON decoder.

Type

dict

algorithm

An object representing the algorithm to be run.

Type

Algorithm

databases

A dictionary in which keys are strings with database names and values are database.Database, representing the databases required for running this block. The dictionary may be empty in case all inputs are taken from the file cache.

Type

dict

views

A dictionary in which the keys are tuples pointing to the (<database-name>, <protocol>, <set>) and the value is a setup view for that particular combination of details. The dictionary may be empty in case all inputs are taken from the file cache.

Type

dict

input_list

A list of inputs that will be served to the algorithm.

Type

beat.backend.python.inputs.InputList

output_list

A list of outputs that the algorithm will produce.

Type

beat.backend.python.outputs.OutputList

data_sources

A list with all data-sources created by our execution loader.

Type

list

data_sinks

A list with all data-sinks created by our execution loader. These are useful for clean-up actions in case of problems.

Type

list

CONTAINER_PREFIX_PATH = '/beat/prefix'
CONTAINER_CACHE_PATH = '/beat/cache'
process(virtual_memory_in_megabytes=0, max_cpu_percent=0, timeout_in_minutes=0)[source]

Executes the user algorithm code using an external program.

The execution interface follows the backend API as described in our documentation.

We use green subprocesses in this implementation. Each co-process is linked to us via 2 uni-directional pipes which work as datain and dataout end-points. The parent process (i.e. the current one) establishes the connection to the child and then can pass/receive commands, data and logs.

Usage of the data pipes (datain, dataout) is synchronous - you send a command and block for an answer. The co-process is normally controlled by the current process, except for data requests, which are user-code driven. The nature of our problem does not require an asynchronous implementation which, in turn, would require a much more complex set of dependencies (on asyncio or Twisted for example).

Parameters
  • virtual_memory_in_megabytes (int, Optional) – The amount of virtual memory (in Megabytes) available for the job. If set to zero, no limit will be applied.

  • max_cpu_percent (int, Optional) – The maximum amount of CPU usage allowed in a system. This number must be an integer number between 0 and 100*number_of_cores in your system. For instance, if your system has 2 cores, this number can go between 0 and 200. If it is <= 0, then we don’t track CPU usage.

  • timeout_in_minutes (int, Optional) – The number of minutes to wait for the user process to execute. After this amount of time, the user process is killed with signal.SIGKILL. If set to zero, no timeout will be applied.

Returns

A dictionary which is JSON formattable containing the summary of this block execution.

Return type

dict
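
A usage sketch follows; the prefix, the path to the execution block declaration and the resource limits are placeholders, and a reachable Docker host with the required environment is assumed.

    from beat.core.dock import Host
    from beat.core.execution.docker import DockerExecutor

    prefix = "/path/to/prefix"                # placeholder installation prefix
    host = Host(raise_on_errors=False)

    executor = DockerExecutor(host, prefix, "/path/to/block.json")   # placeholder declaration path

    if executor.valid:
        result = executor.process(timeout_in_minutes=10)
        print(result)                         # JSON-formattable execution summary
    else:
        print("\n".join(executor.errors))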

local

Execution utilities

class beat.core.execution.local.LocalExecutor(prefix, data, cache=None, dataformat_cache=None, database_cache=None, algorithm_cache=None, library_cache=None, custom_root_folders=None)[source]

Bases: BaseExecutor

LocalExecutor runs the code described by an execution block’s information

Parameters
  • prefix (str) – Establishes the prefix of your installation.

  • data (dict, str) – The piece of data representing the block to be executed. It must validate against the schema defined for execution blocks. If a string is passed, it is supposed to be a fully qualified absolute path to a JSON file containing the block execution information.

  • cache (str, Optional) – If your cache is not located under <prefix>/cache, then specify a full path here. It will be used instead.

  • dataformat_cache (dict, Optional) – A dictionary mapping dataformat names to loaded dataformats. This parameter is optional and, if passed, may greatly speed-up database loading times as dataformats that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying dataformats change.

  • database_cache (dict, Optional) – A dictionary mapping database names to loaded databases. This parameter is optional and, if passed, may greatly speed-up database loading times as databases that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying databases change.

  • algorithm_cache (dict, Optional) – A dictionary mapping algorithm names to loaded algorithms. This parameter is optional and, if passed, may greatly speed-up database loading times as algorithms that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying algorithms change.

  • library_cache (dict, Optional) – A dictionary mapping library names to loaded libraries. This parameter is optional and, if passed, may greatly speed-up library loading times as libraries that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying libraries change.

  • custom_root_folders (dict, Optional) – A dictionary where the keys are database identifiers (<db_name>/<version>) and the values are paths to the given database’s files. These values will override the value found in the database’s metadata.

cache

The path to the cache currently being used

Type

str

errors

A list containing errors found while loading this execution block.

Type

list

data

The original data for this executor, as loaded by our JSON decoder.

Type

dict

algorithm

An object representing the algorithm to be run.

Type

Algorithm

databases

A dictionary in which keys are strings with database names and values are database.Database, representing the databases required for running this block. The dictionary may be empty in case all inputs are taken from the file cache.

Type

dict

views

A dictionary in which the keys are tuples pointing to the (<database-name>, <protocol>, <set>) and the value is a setup view for that particular combination of details. The dictionary may be empty in case all inputs are taken from the file cache.

Type

dict

input_list

A list of inputs that will be served to the algorithm.

Type

beat.backend.python.inputs.InputList

output_list

A list of outputs that the algorithm will produce.

Type

beat.backend.python.outputs.OutputList

data_sources

A list with all data-sources created by our execution loader.

Type

list

data_sinks

A list with all data-sinks created by our execution loader. These are useful for clean-up actions in case of problems.

Type

list

custom_root_folders

A dictionary where the keys are database identifiers (<db_name>/<version>) and the values are paths to the given database’s files. These values will override the value found in the database’s metadata.

Type

dict

process(virtual_memory_in_megabytes=0, max_cpu_percent=0, timeout_in_minutes=0)[source]

Executes the user algorithm code

Parameters
  • virtual_memory_in_megabytes (int, Optional) – The amount of virtual memory (in Megabytes) available for the job. If set to zero, no limit will be applied.

  • max_cpu_percent (int, Optional) – The maximum amount of CPU usage allowed in a system. This number must be an integer number between 0 and 100*number_of_cores in your system. For instance, if your system has 2 cores, this number can go between 0 and 200. If it is <= 0, then we don’t track CPU usage.

  • timeout_in_minutes (int) – The number of minutes to wait for the user process to execute. After this amount of time, the user process is killed with signal.SIGKILL. If set to zero, no timeout will be applied.

Returns

A dictionary which is JSON formattable containing the summary of this block execution.

Return type

dict
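
A minimal sketch for running a block in the current process; the prefix and the block declaration path are placeholders.

    from beat.core.execution.local import LocalExecutor

    prefix = "/path/to/prefix"                               # placeholder installation prefix
    executor = LocalExecutor(prefix, "/path/to/block.json")  # placeholder declaration path

    if executor.valid:
        result = executor.process()      # runs the algorithm locally, without containers
        print(result)
    else:
        print("\n".join(executor.errors))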

remote

Execution utilities

class beat.core.execution.remote.RemoteExecutor(prefix, data, ip_address, cache=None, dataformat_cache=None, database_cache=None, algorithm_cache=None, library_cache=None, custom_root_folders=None)[source]

Bases: BaseExecutor

Base class for Executors that communicate with a message handler

Parameters
  • prefix (str) – Establishes the prefix of your installation.

  • data (dict, str) – The piece of data representing the block to be executed. It must validate against the schema defined for execution blocks. If a string is passed, it is supposed to be a fully qualified absolute path to a JSON file containing the block execution information.

  • cache (str, Optional) – If your cache is not located under <prefix>/cache, then specify a full path here. It will be used instead.

  • dataformat_cache (dict, Optional) – A dictionary mapping dataformat names to loaded dataformats. This parameter is optional and, if passed, may greatly speed-up database loading times as dataformats that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying dataformats change.

  • database_cache (dict, Optional) – A dictionary mapping database names to loaded databases. This parameter is optional and, if passed, may greatly speed-up database loading times as databases that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying databases change.

  • algorithm_cache (dict, Optional) – A dictionary mapping algorithm names to loaded algorithms. This parameter is optional and, if passed, may greatly speed-up database loading times as algorithms that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying algorithms change.

  • library_cache (dict, Optional) – A dictionary mapping library names to loaded libraries. This parameter is optional and, if passed, may greatly speed-up library loading times as libraries that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying libraries change.

cache

The path to the cache currently being used

Type

str

errors

A list containing errors found while loading this execution block.

Type

list

data

The original data for this executor, as loaded by our JSON decoder.

Type

dict

algorithm

An object representing the algorithm to be run.

Type

beat.core.algorithm.Algorithm

databases

A dictionary in which keys are strings with database names and values are database.Database, representing the databases required for running this block. The dictionary may be empty in case all inputs are taken from the file cache.

Type

dict

views

A dictionary in which the keys are tuples pointing to the (<database-name>, <protocol>, <set>) and the value is a setup view for that particular combination of details. The dictionary may be empty in case all inputs are taken from the file cache.

Type

dict

input_list

A list of inputs that will be served to the algorithm.

Type

beat.backend.python.inputs.InputList

output_list

A list of outputs that the algorithm will produce.

Type

beat.backend.python.outputs.OutputList

data_sources

A list with all data-sources created by our execution loader.

Type

list

data_sinks

A list with all data-sinks created by our execution loader. These are useful for clean-up actions in case of problems.

Type

list

kill()[source]

Stops the user process by force - to be called from signal handlers

subprocess

Execution utilities

class beat.core.execution.subprocess.SubprocessExecutor(prefix, data, cache=None, dataformat_cache=None, database_cache=None, algorithm_cache=None, library_cache=None, custom_root_folders=None, ip_address='127.0.0.1', python_path=None)[source]

Bases: RemoteExecutor

SubprocessExecutor runs the code described by an execution block’s information, using a subprocess

Parameters
  • prefix (str) – Establishes the prefix of your installation.

  • data (dict, str) – The piece of data representing the block to be executed. It must validate against the schema defined for execution blocks. If a string is passed, it is supposed to be a fully qualified absolute path to a JSON file containing the block execution information.

  • cache (str, Optional) – If your cache is not located under <prefix>/cache, then specify a full path here. It will be used instead.

  • dataformat_cache (dict, Optional) – A dictionary mapping dataformat names to loaded dataformats. This parameter is optional and, if passed, may greatly speed-up database loading times as dataformats that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying dataformats change.

  • database_cache (dict, Optional) – A dictionary mapping database names to loaded databases. This parameter is optional and, if passed, may greatly speed-up database loading times as databases that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying databases change.

  • algorithm_cache (dict, Optional) – A dictionary mapping algorithm names to loaded algorithms. This parameter is optional and, if passed, may greatly speed-up database loading times as algorithms that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying algorithms change.

  • library_cache (dict, Optional) – A dictionary mapping library names to loaded libraries. This parameter is optional and, if passed, may greatly speed-up library loading times as libraries that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying libraries change.

  • custom_root_folders (dict) – A dictionary mapping database names to their location on disk

  • ip_address (str) – IP address of the machine to connect to for the database execution and message handlers.

  • python_path (str) – Path to the python executable of the environment to use for experiment execution.

cache

The path to the cache currently being used

Type

str

errors

A list containing errors found while loading this execution block.

Type

list

data

The original data for this executor, as loaded by our JSON decoder.

Type

dict

algorithm

An object representing the algorithm to be run.

Type

beat.core.algorithm.Algorithm

databases

A dictionary in which keys are strings with database names and values are database.Database, representing the databases required for running this block. The dictionary may be empty in case all inputs are taken from the file cache.

Type

dict

views

A dictionary in which the keys are tuples pointing to the (<database-name>, <protocol>, <set>) and the value is a setup view for that particular combination of details. The dictionary may be empty in case all inputs are taken from the file cache.

Type

dict

input_list

A list of inputs that will be served to the algorithm.

Type

beat.backend.python.inputs.InputList

output_list

A list of outputs that the algorithm will produce.

Type

beat.backend.python.outputs.OutputList

data_sources

A list with all data-sources created by our execution loader.

Type

list

data_sinks

A list with all data-sinks created by our execution loader. These are useful for clean-up actions in case of problems.

Type

list

process(virtual_memory_in_megabytes=0, max_cpu_percent=0, timeout_in_minutes=0)[source]

Executes the user algorithm code using an external program.

The execution interface follows the backend API as described in our documentation.

We use green subprocesses in this implementation. Each co-process is linked to us via 2 uni-directional pipes which work as datain and dataout end-points. The parent process (i.e. the current one) establishes the connection to the child and then can pass/receive commands, data and logs.

Usage of the data pipes (datain, dataout) is synchronous - you send a command and block for an answer. The co-process is normally controlled by the current process, except for data requests, which are user-code driven. The nature of our problem does not require an asynchronous implementation which, in turn, would require a much more complex set of dependencies (on asyncio or Twisted for example).

Parameters
  • virtual_memory_in_megabytes (int, Optional) – The amount of virtual memory (in Megabytes) available for the job. If set to zero, no limit will be applied.

  • max_cpu_percent (int, Optional) – The maximum amount of CPU usage allowed in a system. This number must be an integer number between 0 and 100*number_of_cores in your system. For instance, if your system has 2 cores, this number can go between 0 and 200. If it is <= 0, then we don’t track CPU usage.

  • timeout_in_minutes (int) – The number of minutes to wait for the user process to execute. After this amount of time, the user process is killed with signal.SIGKILL. If set to zero, no timeout will be applied.

Returns

A dictionary which is JSON formattable containing the summary of this block execution.

Return type

dict
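
A sketch of running a block in a child process follows; the prefix and declaration path are placeholders, and the current interpreter is reused for the subprocess.

    import sys

    from beat.core.execution.subprocess import SubprocessExecutor

    prefix = "/path/to/prefix"          # placeholder installation prefix
    executor = SubprocessExecutor(
        prefix,
        "/path/to/block.json",          # placeholder declaration path
        python_path=sys.executable,     # interpreter used for the child process
    )

    if executor.valid:
        result = executor.process(timeout_in_minutes=10)
        print(result)
    else:
        print("\n".join(executor.errors))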

experiment

Validation for experiments

class beat.core.experiment.Storage(prefix, name)[source]

Bases: Storage

Resolves paths for experiments

Parameters
  • prefix (str) – Establishes the prefix of your installation.

  • name (str) – The name of the experiment object in the format <user>/<toolchain-user>/<toolchain-name>/<version>/<name> or <user>/<toolchain-name>/<version>/<name>, in case <user> and <toolchain-user> are the same.

asset_type = 'experiment'
asset_folder = 'experiments'
class beat.core.experiment.Experiment(prefix, data, dataformat_cache=None, database_cache=None, algorithm_cache=None, library_cache=None)[source]

Bases: object

Experiments define the complete workflow for a user’s test on the platform.

Parameters
  • prefix (str) – Establishes the prefix of your installation.

  • data (object, Optional) – The piece of data representing the experiment. It must validate against the schema defined for experiments. If a string is passed, it is supposed to be a valid path to an experiment in the designated prefix area. If None is passed, loads our default prototype for experiments. If a tuple is passed (or a list), then we consider that the first element represents the experiment, while the second, the toolchain definition. The toolchain bit can be defined as a dictionary or as a string (pointing to a valid path in the designated prefix area).

  • dataformat_cache (dict, Optional) – A dictionary mapping dataformat names to loaded dataformats. This parameter is optional and, if passed, may greatly speed-up experiment loading times as dataformats that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying dataformats change.

  • database_cache (dict, Optional) – A dictionary mapping database names to loaded databases. This parameter is optional and, if passed, may greatly speed-up experiment loading times as databases that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying databases change.

  • algorithm_cache (dict, Optional) – A dictionary mapping algorithm names to loaded algorithms. This parameter is optional and, if passed, may greatly speed-up experiment loading times as algorithms that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying algorithms change.

  • library_cache (dict, Optional) – A dictionary mapping library names to loaded libraries. This parameter is optional and, if passed, may greatly speed-up library loading times as libraries that are already loaded may be re-used. If you use this parameter, you must guarantee that the cache is refreshed as appropriate in case the underlying libraries change.

storage

A simple object that provides information about file paths for this toolchain

Type

object

toolchain

The toolchain in which this experiment is based.

Type

beat.core.toolchain.Toolchain

databases

A dictionary containing the names and beat.core.database.Database pointers for all referenced databases.

Type

dict

algorithms

A dictionary containing the names and beat.core.algorithm.Algorithm pointers for all referenced algorithms.

Type

dict

datasets

A dictionary containing the names and beat.core.database.Database pointers for all datasets in this experiment.

Type

dict

blocks

A dictionary containing the names and beat.core.algorithm.Algorithm pointers for all blocks in this experiment.

Type

dict

analyzers

A dictionary containing the names and beat.core.algorithm.Algorithm pointers for all analyzers in this experiment.

Type

dict

errors

A list of strings containing errors found while loading this experiment.

Type

list

data

The original data for this experiment, as loaded by our JSON decoder.

Type

dict

property name

Label of this experiment

property label

Label of this experiment

property schema_version

Schema version

property valid

A boolean that indicates if this experiment is valid or not

setup()[source]

Prepares the experiment so it can be executed by a scheduling service.

This method will calculate the block execution order and prepare the configuration for each block in the experiment so its execution can be carried out by an adequate scheduling service.

Returns

An ordered dictionary with the block/analyzer execution order and configuration details. The keys of this ordered dictionary correspond to the block and analyzer names in the toolchain. The values correspond to a list of dependencies for the given block in terms of other block names and the block/analyzer configuration, as a dictionary.

Return type

collections.OrderedDict

dot_diagram()[source]

Returns a dot diagram representation of the experiment

property description

The short description for this object

property documentation

The full-length description for this object

hash()[source]

Returns the hexadecimal hash for its declaration

json_dumps(indent=4)[source]

Dumps the JSON declaration of this object in a string

Parameters

indent (int) – The number of indentation spaces at every indentation level

Returns

The JSON representation for this object

Return type

str

write(storage=None)[source]

Writes contents to prefix location

Parameters

storage (Storage, Optional) – If you pass a new storage, then this object will be written to that storage point rather than its default.

export(prefix)[source]

Recursively exports itself into another prefix

Databases and algorithms associated are also exported recursively

Parameters

prefix (str) – A path to a prefix that must be different from this object’s own prefix.

Returns

None

Raises

RuntimeError – If prefix and self.prefix point to the same directory.
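
The sketch below loads an experiment and computes its execution order; the prefix and the experiment name (in the <user>/<toolchain-user>/<toolchain-name>/<version>/<name> format) are placeholders.

    from beat.core.experiment import Experiment

    prefix = "/path/to/prefix"   # placeholder installation prefix
    experiment = Experiment(prefix, "user/user/my_toolchain/1/my_experiment")  # placeholder name

    if experiment.valid:
        execution_order = experiment.setup()   # collections.OrderedDict of block configurations
        for block_name in execution_order:
            print(block_name)
    else:
        print("\n".join(experiment.errors))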

hash

Various functions for hashing platform contributions and others

Also forward importing from beat.backend.python.hash

beat.core.hash.hashBlockOutput(block_name, algorithm_name, algorithm_hash, parameters, environment, input_hashes, output_name)[source]

Generate a hash for a given block output

Parameters
  • block_name (str) – Name of the block (unused)

  • algorithm_name (str) – Name of the algorithm used by the block

  • algorithm_hash (str) – Hash of the algorithm used by the block

  • parameters (dict) – Configured parameters

  • environment (dict) – Environment parameters

  • input_hashes (dict) – Dictionary containing the inputs’ hashes

  • output_name (str) – Name of the output

beat.core.hash.hashAnalyzer(analyzer_name, algorithm_name, algorithm_hash, parameters, environment, input_hashes)[source]

Generate a hash for a given analyzer

Parameters
  • analyzer_name (str) – Name of the analyzer (unused)

  • algorithm_name (str) – Name of the algorithm used by the analyzer

  • algorithm_hash (str) – Hash of the algorithm used by the analyzer

  • parameters (dict) – Configured parameters

  • environment (dict) – Environment parameters

  • input_hashes (dict) – Dictionary containing the inputs’ hashes

beat.core.hash.hashJSONStr(contents, description)[source]

Hashes the JSON string contents using hashlib.sha256

Excludes description changes
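
As an illustration only, the call below follows the documented parameter layout of hashBlockOutput; every value is a placeholder.

    from beat.core import hash as beat_hash

    block_hash = beat_hash.hashBlockOutput(
        "echo",                                    # block name (unused by the function)
        "user/my_algorithm/1",                     # algorithm name (placeholder)
        "0123abcd",                                # algorithm hash (placeholder)
        {"threshold": 0.5},                        # configured parameters
        {"name": "Python", "version": "1.0.0"},    # environment parameters
        {"in": "deadbeef"},                        # input hashes (placeholder)
        "out",                                     # output name
    )
    print(block_hash)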

inputs

Forward imported from beat.backend.python.inputs:

beat.backend.python.inputs.InputList, beat.backend.python.inputs.Input, beat.backend.python.inputs.InputGroup

library

Validation for libraries

Forward imported from beat.backend.python.library: beat.backend.python.library.Storage

class beat.core.library.Library(prefix, data, library_cache=None)[source]

Bases: Library

Libraries represent independent algorithm components within the platform.

This class can only parse the meta-parameters of the library. The actual library is not directly treated by this class - only by the associated algorithms.

Parameters
  • prefix (str) – Establishes the prefix of your installation.

  • data (object, Optional) – The piece of data representing the library. It must validate against the schema defined for libraries. If a string is passed, it is supposed to be a valid path to a library in the designated prefix area. If a tuple is passed (or a list), then we consider that the first element represents the library declaration, while the second, the code for the library (either in its source format or as a binary blob). If None is passed, loads our default prototype for libraries (source code will be in Python).

  • library_cache (dict, Optional) – A dictionary mapping library names to loaded libraries. This parameter is optional and, if passed, may greatly speed-up library loading times as libraries that are already loaded may be re-used.

name

The library name

Type

str

description

The short description string, loaded from the JSON file if one was set.

Type

str

documentation

The full-length docstring for this object.

Type

str

storage

A simple object that provides information about file paths for this library

Type

object

libraries

A mapping object defining other libraries this library needs to load so it can work properly.

Type

dict

uses

A mapping object defining the required library import name (keys) and the full-names (values).

Type

dict

errors

A list containing errors found while loading this library.

Type

list

data

The original data for this library, as loaded by our JSON decoder.

Type

dict

code

The code that is associated with this library, loaded as a text (or binary) file.

Type

str
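
A minimal sketch of loading a library declaration; the prefix and library name are placeholders.

    from beat.core.library import Library

    prefix = "/path/to/prefix"                       # placeholder installation prefix
    library = Library(prefix, "user/my_library/1")   # hypothetical library name

    if library.errors:
        print("\n".join(library.errors))
    else:
        print(library.name)
        print(library.uses)    # import name -> full library name mapping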

loader

Forward imports from beat.backend.python.loader

outputs

Forward imported from beat.backend.python.outputs: beat.backend.python.outputs.SynchronizationListener, beat.backend.python.outputs.Output, beat.backend.python.outputs.OutputList

plotter

Validation for plotters

class beat.core.plotter.Storage(prefix, name, language=None)[source]

Bases: CodeStorage

Resolves paths for plotters

Parameters
  • prefix (str) – Establishes the prefix of your installation.

  • name (str) – The name of the plotter object in the format <user>/<name>/<version>.

asset_type = 'plotter'
asset_folder = 'plotters'
class beat.core.plotter.Runner(module, obj_name, algorithm, exc=None)[source]

Bases: Runner

A special loader class for plotters, with specialized methods

process(inputs=None)[source]

Runs through data

class beat.core.plotter.Plotter(prefix, data, dataformat_cache=None, library_cache=None)[source]

Bases: object

Plotters represent runnable components within the platform that generate images from data points.

This class can only parse the meta-parameters of the plotter (i.e., parameters and applicable dataformat). The actual plotter is not directly treated by this class - it can, however, provide you with a loader for actually running the plotting code (see Plotter.runner()).

Parameters
  • prefix (str) – Establishes the prefix of your installation.

  • data (object, Optional) – The piece of data representing the plotter. It must validate against the schema defined for plotters. If a string is passed, it is supposed to be a valid path to a plotter in the designated prefix area. If a tuple is passed (or a list), then we consider that the first element represents the plotter declaration, while the second, the code for the plotter (either in its source format or as a binary blob). If None is passed, loads our default prototype for plotters (source code will be in Python).

  • dataformat_cache (dict, Optional) – A dictionary mapping dataformat names to loaded dataformats. This parameter is optional and, if passed, may greatly speed-up algorithm loading times as dataformats that are already loaded may be re-used.

  • library_cache (dict, Optional) – A dictionary mapping library names to loaded libraries. This parameter is optional and, if passed, may greatly speed-up library loading times as libraries that are already loaded may be re-used.

storage

A simple object that provides information about file paths for this plotter

Type

object

dataformat

An object of type dataformat.DataFormat that represents the dataformat to which this plotter is applicable.

Type

object

libraries

A mapping object defining other libraries this plotter needs to load so it can work properly.

Type

dict

errors

A list containing errors found while loading this plotter.

Type

list

data

The original data for this plotter, as loaded by our JSON decoder.

Type

dict

code

The code that is associated with this plotter, loaded as a text (or binary) file.

Type

str

property schema_version

Returns the schema version

property name

The name of this object

property language

Returns the current language set for the executable code

clean_parameter(parameter, value)

Checks a given value against a declared parameter

This method checks if the provided user value can be safe-cast to the parameter type as defined on its specification and that it conforms to any parameter-imposed restrictions.

Parameters
  • parameter (str) – The name of the parameter to check the value against

  • value (object) – An object that will be safe cast into the defined parameter type.

Returns

The converted value, with an appropriate numpy type.

Raises
  • KeyError – If the parameter cannot be found on this algorithm’s declaration.

  • ValueError – If the parameter cannot be safe cast into the algorithm’s type. Alternatively, a ValueError may also be raised if a range or choice was specified and the value does not obey those settings stipulated for the parameter

property valid

A boolean that indicates if this plotter is valid or not

property api_version

Returns the API version

uses_dict()[source]

A mapping object defining the required library import name (keys) and the full-names (values).

runner(klass='Plotter', exc=None)[source]

Returns a runnable plotter object.

Parameters
  • klass (str) – The name of the class to load the runnable algorithm from

  • exc (class) – If passed, must be a valid exception class that will be used to report errors in the read-out of this plotter’s code.

Returns

An instance of the plotter, which will be constructed, but not set up. You must set it up before using the process method.

Return type

beat.backend.python.algorithm.Runner

property description

The short description for this object

json_dumps(indent=4)

Dumps the JSON declaration of this object in a string

Parameters

indent (int) – The number of indentation spaces at every indentation level

Returns

The JSON representation for this object

Return type

str

property documentation

The full-length description for this object

property uses

Mapping object defining the required library import name (keys) and the full-names (values)

property parameters

Dictionary containing all pre-defined parameters that this plotter accepts

hash()[source]

Returns the hexadecimal hash for the current plotter

write(storage=None)[source]

Writes contents to prefix location

Parameters

storage (Storage, Optional) – If you pass a new storage, then this object will be written to that storage point rather than its default.
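
A usage sketch for plotters follows; the prefix and plotter name are placeholders, and the runner set-up step is only indicated in a comment since its parameters depend on the plotter.

    from beat.core.plotter import Plotter

    prefix = "/path/to/prefix"                       # placeholder installation prefix
    plotter = Plotter(prefix, "user/my_plotter/1")   # hypothetical plotter name

    if plotter.valid:
        runner = plotter.runner()    # constructed, but not set up
        # the runner must be set up with the plotter parameters before calling process()
    else:
        print("\n".join(plotter.errors))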

stats

A class that can read, validate and update statistical information

Forward imported from beat.backend.python.stats: beat.backend.python.stats.io_statistics(), beat.backend.python.stats.update()

class beat.core.stats.Statistics(data=None)[source]

Bases: object

Statistics define resource usage for algorithmic code runs

Parameters

data (object, Optional) – The piece of data representing the statistics to be read. It must validate against our pre-defined execution schema. If the input is None or empty, new statistics are started from scratch.

errors

A list of strings containing errors found while loading this statistics information.

Type

list

property schema_version

Returns the schema version

property cpu

Returns only CPU information

property memory

Returns only memory information

property data

Returns only I/O information

property valid

A boolean that indicates if this statistics object is valid or not

as_json(indent=None)[source]

Returns self as JSON

Parameters

indent (int) – Indentation to use for the JSON generation

Returns

JSON representation

Return type

dict

as_dict()[source]

Returns self as a dictionary

write(f)[source]

Writes contents to a file-like object
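
A minimal sketch of the Statistics object, built from scratch (no prefix assets needed); the stats.json file name is just an example:

from beat.core.stats import Statistics

stats = Statistics()              # None/empty input starts new statistics from scratch
assert stats.valid, stats.errors

print(stats.cpu)                  # CPU information only
print(stats.memory)               # memory information only
print(stats.as_json(indent=2))    # full JSON representation

with open('stats.json', 'wt') as f:
    stats.write(f)                # any file-like object will do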

beat.core.stats.cpu_statistics(start, end)[source]

Summarizes current CPU usage

This method should be used when the currently set algorithm is the only one executed through the whole process. It is done for collecting resource statistics on separate processing environments. It follows the recipe in: http://stackoverflow.com/questions/30271942/get-docker-container-cpu-usage-as-percentage

Returns

A dictionary summarizing current CPU usage

Return type

dict

beat.core.stats.memory_statistics(data)[source]

Summarizes current memory usage

This method should be used when the currently set algorithm is the only one executed through the whole process. It is done for collecting resource statistics on separate processing environments.

Returns

A dictionary summarizing current memory usage

Return type

dict

toolchain

Validation for toolchains

class beat.core.toolchain.Storage(prefix, name)[source]

Bases: Storage

Resolves paths for toolchains

Parameters
  • prefix (str) – Establishes the prefix of your installation.

  • name (str) – The name of the toolchain object in the format <user>/<name>/<version>.

asset_type = 'toolchain'

asset_folder = 'toolchains'

class beat.core.toolchain.Toolchain(prefix, data)[source]

Bases: object

Toolchains define the dataflow in an experiment.

Parameters
  • prefix (str) – Establishes the prefix of your installation.

  • data (object, Optional) – The piece of data representing the toolchain. It must validate against the schema defined for toolchains. If a string is passed, it is supposed to be a valid path to a toolchain in the designated prefix area. If None is passed, loads our default prototype for toolchains.

storage

A simple object that provides information about file paths for this toolchain

Type

object

errors

A list containing errors found while loading this toolchain.

Type

list

data

The original data for this toolchain, as loaded by our JSON decoder.

Type

dict

property schema_version

Returns the schema version

property name

The name of this object

property datasets

All declared datasets

property blocks

All declared blocks

property loops

All declared loops

property analyzers

All declared analyzers

algorithm_item(name)[source]

Returns a block, loop or analyzer matching the name given

property connections

All declared connections

dependencies(name)[source]

Returns the block dependencies for a given block/analyzer as a set

The calculation uses all declared connections for that block/analyzer. Dataset connections are ignored.

execution_order()[source]

Returns the execution order in an ordered dictionary with block dependencies.
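
A hedged sketch of inspecting a toolchain, assuming a hypothetical toolchain user/eigenfaces/1 stored under ./prefix:

from beat.core.toolchain import Toolchain

toolchain = Toolchain('./prefix', 'user/eigenfaces/1')  # hypothetical asset
assert toolchain.valid, toolchain.errors

print(toolchain.datasets)    # all declared datasets
print(toolchain.blocks)      # all declared blocks

# Ordered mapping of block/analyzer names to their dependencies
# (the dependency values are assumed to be sets, as with dependencies())
for name, deps in toolchain.execution_order().items():
    print(name, '<-', sorted(deps))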

dot_diagram(title=None, label_callback=None, edge_callback=None, result_callback=None, is_layout=False)[source]

Returns a dot diagram representation of the toolchain

Parameters
  • title (str) – A title for the generated drawing. If None is given, then prints out the toolchain name.

  • label_callback (function) – A python function that is called back each time a label needs to be inserted into a block. The prototype of this function is label_callback(type, name). type may be one of dataset, block or analyzer. This callback is used by the experiment class to complement diagram information before plotting.

  • edge_callback (function) – A python function that is called back each time an edge needs to be inserted into the graph. The prototype of this function is edge_callback(start). start is the name of the starting point for the connection; it should determine the dataformat for the connection.

  • result_callback (function) – A function to draw ports on analyzer blocks. The prototype of this function is result_callback(name, color).

Returns

The graph, ready for show-time.

Return type

graphviz.Digraph
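
For example, the diagram can be written to disk with graphviz (same hypothetical toolchain as above; the output file name is arbitrary):

from beat.core.toolchain import Toolchain

toolchain = Toolchain('./prefix', 'user/eigenfaces/1')      # hypothetical asset
diagram = toolchain.dot_diagram(title='Eigenfaces toolchain')
diagram.render('eigenfaces', format='png', cleanup=True)    # writes eigenfaces.png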

property valid

A boolean that indicates if this toolchain is valid or not

property description

The short description for this object

property documentation

The full-length description for this object

hash()[source]

Returns the hexadecimal hash for its declaration

json_dumps(indent=4)[source]

Dumps the JSON declaration of this object in a string

Parameters

indent (int) – The number of indentation spaces at every indentation level

Returns

The JSON representation for this object

Return type

str

write(storage=None)[source]

Writes contents to prefix location

Parameters

storage (Storage, Optional) – If you pass a new storage, then this object will be written to that storage point rather than its default.

utils

Helper methods

Forward imports from beat.backend.python.utils

beat.core.utils.temporary_directory(prefix='beat_')[source]

Generates a temporary directory

beat.core.utils.uniq(seq)[source]

Order preserving (very fast) uniq function for sequences
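
A small usage sketch; the return value is assumed to be a list keeping the first occurrence of each element:

from beat.core.utils import uniq

print(uniq(['b', 'a', 'b', 'c', 'a']))  # expected: ['b', 'a', 'c']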

beat.core.utils.send_multipart(socket, parts)[source]

Send the parts through the socket after having encoded them if necessary.
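
A hedged sketch over a local ZeroMQ socket pair; the idea that text parts are encoded to bytes before sending follows the description above and is otherwise an assumption:

import zmq

from beat.core.utils import send_multipart

context = zmq.Context()
receiver = context.socket(zmq.PULL)
port = receiver.bind_to_random_port('tcp://127.0.0.1')
sender = context.socket(zmq.PUSH)
sender.connect('tcp://127.0.0.1:%d' % port)

send_multipart(sender, ['job-42', b'\x00\x01payload'])  # mixed text/binary parts
print(receiver.recv_multipart())                        # e.g. [b'job-42', b'\x00\x01payload']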

beat.core.utils.find_free_port()[source]

Returns the value of a free random port

beat.core.utils.find_free_port_in_range(min_port, max_port)[source]

Returns the value of a free port in range

beat.core.utils.id_generator(size=6, chars='ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789')[source]

Simple id generator based on https://stackoverflow.com/a/2257449/5843716
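
These helpers are mostly useful in tests; a short sketch (printed values are examples only):

from beat.core.utils import find_free_port, find_free_port_in_range, id_generator

print(find_free_port())                       # e.g. 53217
print(find_free_port_in_range(50000, 51000))  # a free port within [50000, 51000]
print(id_generator(size=8))                   # e.g. 'aZ3kP0qX'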

beat.core.utils.setup_logging(verbosity, format_name, name=None, stream=None)[source]

Set up logging

beat.core.utils.build_env_name(env_data)[source]

Build the environment name used for string lookups

worker

Worker implementation

class beat.core.worker.WorkerController(address, port, callbacks=None)[source]

Bases: object

Implements the controller that will handle the workers allocated.

Constants:
Status:

READY EXIT DONE JOB_ERROR ERROR CANCELLED

Commands:

EXECUTE CANCEL ACK SCHEDULER_SHUTDOWN

READY = b'rdy'

The worker is ready to be used

EXIT = b'ext'

The worker has exited

RECEIVED = b'rcv'

The worker has received the task

DONE = b'don'

The worker has successfully finished its task

JOB_ERROR = b'erj'

The worker failed to finish its task

ERROR = b'err'

The worker encountered an error

CANCELLED = b'cld'

The worker’s task has been canceled

EXECUTE = b'exe'

Execute the given job

CANCEL = b'cnl'

Cancel the given job

ACK = b'ack'

Acknowledge

SCHEDULER_SHUTDOWN = b'shd'

Shutdown the scheduler

class Callbacks[source]

Bases: object

Set of callbacks used when a worker is ready or went away

destroy()[source]
execute(worker, job_id, configuration)[source]

Executes the given job by the given worker using passed configuration

Parameters
  • worker (str) – Address of the worker

  • job_id (int) – Identifier of the job to execute

  • configuration (dict) – Configuration for the job

cancel(worker, job_id)[source]

Cancels the given job on the given worker

Parameters
  • worker (str) – Address of the worker

  • job_id (int) – Identifier of the job to cancel

ack(worker)[source]

Send acknowledge to worker

Parameters

worker (str) – Address of the worker

process(timeout=0)[source]

Processing loop

Gets processing information through ZeroMQ and acts accordingly.

Parameters

timeout (int) – Maximum time allocated for processing

Returns

A tuple containing the worker address, the job_id and the corresponding data, if any, or None in case of error.

Return type

tuple
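
To close this section, a heavily hedged sketch of a scheduler-side loop around WorkerController. Only the constructor signature, the constants and the methods listed above are taken from this documentation; the worker address, job identifier, configuration payload and the timeout unit are assumptions, and execute()/process() are assumed to be methods of the controller instance (the flat rendering above does not make that nesting explicit):

from beat.core.worker import WorkerController

controller = WorkerController('127.0.0.1', 5555)          # address and port are examples

# Ask a worker to run a job; the configuration payload is scheduler-specific
controller.execute(b'worker-1', 1, {'queue': 'default'})  # hypothetical values

while True:
    answer = controller.process(timeout=1000)  # timeout unit is an assumption
    if answer is None:
        continue                               # error case, as documented above
    worker, job_id, data = answer              # data may be None
    print(worker, job_id, data)
    break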