Assets

The momotor.rpc.asset module contains functions and classes to transfer asset files to and from the Momotor broker.

Remote

async momotor.rpc.asset.remote.send_asset(stub, job_id, query, path, server_info, *, process_executor=None, timeout=None)

Function for use by a client or worker to send an asset to the broker server.

Produces log messages on the momotor.rpc.asset.remote logger.

Parameters
  • stub (Union[ClientStub, WorkerStub]) – The connected stub to the broker.

  • job_id (str) – Id of the job

  • query (AssetQuery) – Query for the asset

  • path (Union[str, Path]) – Local file path of the file to send

  • server_info (ServerInfoResponse) – The server’s info response

  • process_executor – An asyncio executor to execute long-running CPU-bound tasks

  • timeout – Timeout

Return type

None

async momotor.rpc.asset.remote.receive_asset(stub, job_id, query, path, exists=None, process_executor=None, timeout=None)

Function for use by a client or worker to request and receive an asset from the broker.

Produces log messages on the momotor.rpc.asset.remote logger.

Parameters
  • stub (Union[ClientStub, WorkerStub]) – The connected stub to the broker.

  • job_id (str) – Id of the job

  • query (AssetQuery) – Query for the asset

  • path (Union[str, Path]) – Local file path where the file is to be stored

  • exists (Optional[Callable[[AssetData], Awaitable[bool]]]) – A function that checks if the file is already known locally

  • process_executor – An asyncio executor to execute long-running CPU-bound tasks

  • timeout – Timeout

Return type

Tuple[AssetData, bool]

Returns

A tuple with the AssetData identifying the asset, and a boolean indicating whether the asset already exists locally.
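
The exists argument is an awaitable callback that receives the AssetData and returns a boolean. As a minimal sketch, assuming assets are kept in a local cache directory under a key derived from the asset, such a callback could be built as follows. The names make_exists_check and key_of are hypothetical and not part of momotor; how a key is derived from an AssetData is left to the caller, because the fields of AssetData are not described here.

```python
import asyncio
from pathlib import Path
from typing import Any, Awaitable, Callable


def make_exists_check(
    cache_dir: Path, key_of: Callable[[Any], str]
) -> Callable[[Any], Awaitable[bool]]:
    """Build an async callback in the shape of the `exists` argument.

    `key_of` is a hypothetical, caller-supplied function that maps the
    asset data object to a file name inside `cache_dir`.
    """

    async def exists(asset_data: Any) -> bool:
        candidate = cache_dir / key_of(asset_data)
        # Run the blocking file system check in the default executor
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, candidate.is_file)

    return exists
```

The returned coroutine function would then be passed as exists=... so that the transfer can be skipped when the file is already present.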

Server

async momotor.rpc.asset.server.download_asset_stream(data, chunks, stream, *, timeout=None)

Function for use by the broker to handle an asset download stream from a client or worker.

This communicates with a remote’s momotor.rpc.asset.remote.receive_asset()

async momotor.rpc.asset.server.upload_asset_stream(creator, stream, *, timeout=None)

Function for use by the broker to handle an asset upload stream to a client or worker.

This communicates with a remote’s momotor.rpc.asset.remote.send_asset()

Parameters
  • creator (AsyncGenerator[int, Optional[bytes]]) – A generator that processes the chunks and writes them to a file or other storage, and yields the number of bytes remaining.

  • stream (Stream) – The gRPC stream to the client or worker

  • timeout – Timeout
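
The creator parameter has the type AsyncGenerator[int, Optional[bytes]]: it receives chunks through asend() and yields the number of bytes still expected. The following is a rough sketch of a generator with that shape, assuming the total size is known up front and that the generator is primed with asend(None) before the first chunk. The names file_creator and drive are hypothetical, and the exact way upload_asset_stream drives its creator is an assumption, not taken from the library.

```python
import asyncio
from pathlib import Path
from typing import AsyncGenerator, List, Optional


async def file_creator(path: Path, size: int) -> AsyncGenerator[int, Optional[bytes]]:
    """Write received chunks to `path`, yielding the bytes still remaining."""
    remaining = size
    # Plain blocking writes keep the sketch short; a real implementation
    # would probably hand the writes off to an executor.
    with open(path, "wb") as f:
        while remaining > 0:
            # Yield the remaining byte count, receive the next chunk via asend()
            chunk = yield remaining
            if chunk is None:
                break
            f.write(chunk)
            remaining -= len(chunk)
    yield remaining


async def drive(path: Path, chunks: List[bytes]) -> int:
    """Feed `chunks` into the creator the way a stream handler might."""
    gen = file_creator(path, sum(len(c) for c in chunks))
    remaining = await gen.asend(None)  # prime: run up to the first yield
    for chunk in chunks:
        remaining = await gen.asend(chunk)
    await gen.aclose()
    return remaining
```

A result of 0 from drive() means every expected byte was written.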

Exceptions

exception momotor.rpc.asset.exceptions.AssetSizeMismatchError
exception momotor.rpc.asset.exceptions.AssetHashMismatchError
exception momotor.rpc.asset.exceptions.UnexpectedEndOfStream

Utils

momotor.rpc.asset.utils.get_file_multihash(path, server_info=None)

Get the multihash value for the file at path.

If the file is small enough, its entire contents are encoded in the multihash (identity encoding) instead of an actual hash. A boolean in the result indicates whether the result is a real hash or encoded content.

If a server_info object is provided, the resulting hash will be compatible with the server, otherwise the hash generated will use the capabilities of the local system.

Return type

Tuple[bytes, bool]

Returns

tuple of the base58-encoded multihash and a boolean indicating whether identity encoding was used
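
To illustrate the difference between a real hash and identity encoding, here is a minimal stand-alone sketch of the general multihash layout (function code, digest length, digest) followed by base58 encoding. It is not momotor's implementation: the names b58encode and sketch_multihash, the 32-byte identity limit and the choice of SHA-256 are all assumptions made for the example.

```python
import hashlib
from typing import Tuple

B58_ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"


def b58encode(data: bytes) -> bytes:
    """Base58-encode `data` using the Bitcoin alphabet."""
    n = int.from_bytes(data, "big")
    out = ""
    while n > 0:
        n, rem = divmod(n, 58)
        out = B58_ALPHABET[rem] + out
    # Each leading zero byte is represented by a leading '1'
    pad = len(data) - len(data.lstrip(b"\0"))
    return ("1" * pad + out).encode("ascii")


def sketch_multihash(content: bytes, identity_limit: int = 32) -> Tuple[bytes, bool]:
    """Return (base58 multihash, identity flag) for `content`.

    `identity_limit` is an assumed threshold. Lengths below 128 fit in a
    single varint byte, which keeps the length prefix to one byte here.
    """
    if len(content) <= identity_limit:
        # Identity function code 0x00: the "digest" is the content itself
        raw = bytes([0x00, len(content)]) + content
        return b58encode(raw), True
    digest = hashlib.sha256(content).digest()
    # sha2-256 function code 0x12, followed by the digest length (32)
    raw = bytes([0x12, len(digest)]) + digest
    return b58encode(raw), False
```

With identity encoding the receiver can recover the content from the multihash itself, so no separate transfer of the file is needed.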

momotor.rpc.asset.utils.get_file_hash(func_code, path)

Calculate a hash of the given file’s content

Parameters
  • func_code (Union[str, int]) – a multihash code (string or integer)

  • path (Union[str, Path]) – path of the file to hash

Return type

bytes

Returns

unencoded hash digest generated by the hash function
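
Hashing a file of arbitrary size is usually done in chunks so that the whole file never has to be held in memory. The sketch below shows that idea with the standard hashlib module. The name sketch_file_hash is hypothetical, and it takes a hashlib algorithm name rather than a multihash code, since the mapping from func_code to a hash function is not described here.

```python
import hashlib
from pathlib import Path
from typing import Union


def sketch_file_hash(
    algorithm: str, path: Union[str, Path], chunk_size: int = 1024 * 1024
) -> bytes:
    """Return the raw (unencoded) digest of the file's content."""
    hasher = hashlib.new(algorithm)
    with open(path, "rb") as f:
        # Read fixed-size chunks until read() returns an empty bytes object
        for chunk in iter(lambda: f.read(chunk_size), b""):
            hasher.update(chunk)
    return hasher.digest()
```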

async momotor.rpc.asset.utils.file_reader(path, mode, queue, *, chunk_size=8388608)

Helper to read chunks from a file into a queue

Parameters
  • path (Path) – file to read

  • mode (str) – file read mode

  • queue (Queue) – queue to put the chunks into. Should be a one-place queue

  • chunk_size (int) – maximum size of the chunks
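
The default chunk_size of 8388608 bytes is 8 MiB. A one-place queue means the reader can be at most one chunk ahead of the consumer, which bounds memory use. The following sketch shows that producer pattern with asyncio.Queue(maxsize=1). The names sketch_file_reader and read_all are hypothetical, the mode argument is left out, and the use of None as an end-of-file marker is an assumption; the documentation above does not say how file_reader signals the end of the file.

```python
import asyncio
from pathlib import Path
from typing import List


async def sketch_file_reader(
    path: Path, queue: asyncio.Queue, chunk_size: int = 8 * 1024 * 1024
) -> None:
    """Read `path` in chunks and put each chunk into `queue`."""
    loop = asyncio.get_running_loop()
    with open(path, "rb") as f:
        while True:
            # Blocking read runs in the default executor
            chunk = await loop.run_in_executor(None, f.read, chunk_size)
            if not chunk:
                break
            # With maxsize=1 this waits until the consumer took the previous chunk
            await queue.put(chunk)
    await queue.put(None)  # assumed end-of-file marker


async def read_all(path: Path, chunk_size: int) -> List[bytes]:
    """Consume the reader's queue and collect the chunks."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    reader = asyncio.create_task(sketch_file_reader(path, queue, chunk_size))
    chunks = []
    while True:
        chunk = await queue.get()
        if chunk is None:
            break
        chunks.append(chunk)
    await reader
    return chunks
```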

async momotor.rpc.asset.utils.file_writer(path, mode, queue)

Helper to write chunks from a queue into a file

Parameters
  • path (Path) – file to write

  • mode (str) – file write mode

  • queue (Queue) – queue to get the chunks from. Should be a one-place queue
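
The consumer side of the same pattern can be sketched in a similar way. As before, the names sketch_file_writer and write_all are hypothetical, the mode argument is left out, and None as an end-of-stream marker is an assumption rather than documented behaviour of file_writer.

```python
import asyncio
from pathlib import Path
from typing import List


async def sketch_file_writer(path: Path, queue: asyncio.Queue) -> None:
    """Take chunks from `queue` and write them to `path` until None arrives."""
    loop = asyncio.get_running_loop()
    with open(path, "wb") as f:
        while True:
            chunk = await queue.get()
            if chunk is None:  # assumed end-of-stream marker
                break
            # Blocking write runs in the default executor
            await loop.run_in_executor(None, f.write, chunk)


async def write_all(path: Path, chunks: List[bytes]) -> None:
    """Feed `chunks` to the writer through a one-place queue."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)
    writer = asyncio.create_task(sketch_file_writer(path, queue))
    for chunk in chunks:
        # Waits while the writer is still holding the previous chunk
        await queue.put(chunk)
    await queue.put(None)
    await writer
```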