Source code for beat.core.worker

#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################


"""
======
worker
======

Worker implementation
"""
import logging
import socket

import simplejson as json
import zmq

from .utils import send_multipart

logger = logging.getLogger(__name__)


class WorkerController(object):
    """Implements the controller that will handle the workers allocated.

    The controller owns a ZeroMQ ``ROUTER`` socket. Workers announce
    themselves with :py:const:`READY`, are tracked in ``self.workers`` and
    receive commands through :py:meth:`execute`, :py:meth:`cancel` and
    :py:meth:`ack`.

    Constants:

      Status:
        :py:const:`READY`
        :py:const:`EXIT`
        :py:const:`RECEIVED`
        :py:const:`DONE`
        :py:const:`JOB_ERROR`
        :py:const:`ERROR`
        :py:const:`CANCELLED`

      Commands:
        :py:const:`EXECUTE`
        :py:const:`CANCEL`
        :py:const:`ACK`
        :py:const:`SCHEDULER_SHUTDOWN`
    """

    # Status code
    READY = b"rdy"  #: The worker is ready to be used
    EXIT = b"ext"  #: The worker has exited
    RECEIVED = b"rcv"  #: The worker has received the task
    DONE = b"don"  #: The worker has successfully finished its task
    JOB_ERROR = b"erj"  #: The worker failed to finish its task
    ERROR = b"err"  #: The worker encountered an error
    CANCELLED = b"cld"  #: The worker's task has been canceled

    # Commands
    EXECUTE = b"exe"  #: Execute the given job
    CANCEL = b"cnl"  #: Cancel the given job
    ACK = b"ack"  #: Acknowledge
    SCHEDULER_SHUTDOWN = b"shd"  #: The scheduler is shutting down

    # Statuses whose first data frame is always parsed as the job identifier
    _JOB_STATUSES = (DONE, JOB_ERROR, CANCELLED)

    class Callbacks(object):
        """Set of callbacks used when a worker is ready or went away"""

        def __init__(self):
            self.onWorkerReady = None
            self.onWorkerGone = None

    def __init__(self, address, port, callbacks=None):
        """Creates the ZeroMQ context, binds the ROUTER socket and registers
        it with a poller.

        Parameters:

          :param str address: Host or interface to bind to (without scheme)

          :param int port: Port to bind to. When ``None``, a random port at or
            above 50000 is picked and appended to ``self.address``

          :param dict callbacks: Optional mapping of attribute name (e.g.
            ``onWorkerReady``, ``onWorkerGone``) to callable; every entry is
            set on ``self.callbacks``
        """

        self.context = zmq.Context()
        # NOTE(review): SO_REUSEADDR comes from the stdlib ``socket`` module,
        # not from ``zmq``; confirm this has the intended effect when used as
        # a context-wide default socket option.
        self.context.setsockopt(socket.SO_REUSEADDR, 1)

        self.socket = self.context.socket(zmq.ROUTER)

        if port is not None:
            self.address = "tcp://%s:%d" % (address, port)
            self.socket.bind(self.address)
        else:
            self.address = "tcp://%s" % address
            port = self.socket.bind_to_random_port(self.address, min_port=50000)
            self.address += ":%d" % port

        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN)

        self.workers = []

        if callbacks is None:
            callbacks = {}

        self.callbacks = WorkerController.Callbacks()

        for name, handler in callbacks.items():
            setattr(self.callbacks, name, handler)

    def destroy(self):
        """Notifies every known worker of the shutdown and releases the
        ZeroMQ resources.

        Calling this method again once the controller has been destroyed is a
        no-op.
        """

        if self.socket is None:
            # Already destroyed: poller, socket and context are gone
            return

        for worker in self.workers:
            send_multipart(self.socket, [worker, WorkerController.SCHEDULER_SHUTDOWN])

        self.workers = []

        self.poller.unregister(self.socket)
        self.poller = None

        # Do not wait for unsent messages when closing the socket
        self.socket.setsockopt(zmq.LINGER, 0)
        self.socket.close()
        self.socket = None

        self.context.destroy()
        self.context = None

    def execute(self, worker, job_id, configuration):
        """Executes the given job by the given worker using passed configuration

        Parameters:

          :param str worker: Address of the worker

          :param int job_id: Identifier of the job to execute

          :param dict configuration: Configuration for the job, sent
            JSON-encoded
        """

        parts = [
            worker,
            WorkerController.EXECUTE,
            b"%d" % job_id,
            json.dumps(configuration),
        ]
        send_multipart(self.socket, parts)

    def cancel(self, worker, job_id):
        """Cancels the given job on the given worker

        Parameters:

          :param str worker: Address of the worker

          :param int job_id: Identifier of the job to cancel
        """

        parts = [worker, WorkerController.CANCEL, b"%d" % job_id]
        send_multipart(self.socket, parts)

    def ack(self, worker):
        """Send acknowledge to worker

        Parameters:

          :param str worker: Address of the worker
        """

        parts = [worker, WorkerController.ACK]
        send_multipart(self.socket, parts)

    def process(self, timeout=0):
        """Processing loop

        Gets processing information through ZeroMQ and acts accordingly.
        ``READY`` and ``EXIT`` messages are handled internally (worker list
        update and callbacks) and the loop then keeps polling without
        waiting; any other message is returned to the caller.

        Parameters:

          :param int timeout: Maximum time allocated for the first poll,
            passed as is to :py:meth:`zmq.Poller.poll`


        Returns:

          tuple: ``(address, status, job_id, data)``. ``job_id`` is an ``int``
          for :py:const:`DONE`, :py:const:`JOB_ERROR` and
          :py:const:`CANCELLED`, and for :py:const:`ERROR` when at least two
          data frames were received; ``data`` then holds the remaining
          frames. For any other message ``job_id`` is ``None`` and ``data``
          holds all frames. ``None`` is returned when nothing is available
          to read or when an unknown worker announces its exit.
        """

        while True:
            socks = dict(self.poller.poll(timeout))
            if socks.get(self.socket) != zmq.POLLIN:
                return None

            address, status, data = self._receive()

            if status == WorkerController.READY:
                # NOTE(review): the acknowledge and the callback are assumed
                # to apply to newly seen workers only; confirm against the
                # upstream source.
                if address not in self.workers:
                    self.workers.append(address)
                    self.ack(address)

                    if self.callbacks.onWorkerReady is not None:
                        self.callbacks.onWorkerReady(address)

                # Drain whatever else is pending without blocking again
                timeout = 0

            elif status == WorkerController.EXIT:
                try:
                    self.workers.remove(address)
                except ValueError:
                    logger.error("Unknown worker: %s", address)
                    return None

                if self.callbacks.onWorkerGone is not None:
                    self.callbacks.onWorkerGone(address)

                timeout = 0

            elif status in WorkerController._JOB_STATUSES or (
                status == WorkerController.ERROR and len(data) >= 2
            ):
                return (address, status, int(data[0]), data[1:])

            else:
                return (address, status, None, data)

    def _receive(self):
        """Reads one multipart message and splits it into the sender address,
        the status and the list of remaining frames."""

        parts = self.socket.recv_multipart()
        return parts[0], parts[1], parts[2:]