Source code for bob.pipelines.distributed.sge

#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Tiago de Freitas Pereira <tiago.pereira@idiap.ch>

import logging
import sys

import dask

from dask_jobqueue.core import Job, JobQueueCluster
from distributed.deploy import Adaptive
from distributed.scheduler import Scheduler

from bob.extension import rc

from .sge_queues import QUEUE_DEFAULT

logger = logging.getLogger(__name__)


class SGEIdiapJob(Job):
    """Launches an SGE job in the Idiap cluster.

    This class basically encodes the CLI command that bootstraps the worker in
    an SGE job. Check
    `https://distributed.dask.org/en/latest/resources.html#worker-resources`
    for more information.

    .. note::
        This class is temporary. It is basically a copy of ``SGEJob`` from
        dask_jobqueue. The difference is that it also handles the dask job
        resources tag (which is not handled anywhere else). This has to be
        patched in the ``Job`` class. Please follow
        `https://github.com/dask/dask-jobqueue/issues/378` for news about this
        patch.
    """

    submit_command = "qsub"
    cancel_command = "qdel"
    config_name = "SGEIdiapJob"

    def __init__(
        self,
        *args,
        queue=None,
        project=rc.get("sge.project"),
        resource_spec=None,
        job_extra=None,
        config_name="sge",
        **kwargs,
    ):
        if queue is None:
            queue = dask.config.get("jobqueue.%s.queue" % config_name)
        if project is None:
            project = dask.config.get("jobqueue.%s.project" % config_name)
        if resource_spec is None:
            resource_spec = dask.config.get(
                "jobqueue.%s.resource-spec" % config_name
            )
        if job_extra is None:
            job_extra = dask.config.get("jobqueue.%s.job-extra" % config_name)

        # Resources
        resources = kwargs.pop("resources", None)

        super().__init__(
            *args, config_name=config_name, death_timeout=10000, **kwargs
        )

        # Amending the --resources in the `distributed.cli.dask_worker` CLI command
        if resources:
            # Preparing the string to be sent to the `dask-worker` command
            # (comma-separated KEY=VALUE pairs)
            resources_str = ",".join(f"{k}={v}" for k, v in resources.items())
            self._command_template += f" --resources {resources_str}"

        header_lines = []
        if self.job_name is not None:
            header_lines.append("#$ -N %(job-name)s")
        if queue is not None:
            header_lines.append("#$ -q %(queue)s")
        if project is not None:
            header_lines.append("#$ -P %(project)s")
        if resource_spec is not None:
            header_lines.append("#$ -l %(resource_spec)s")
        if self.log_directory is not None:
            header_lines.append("#$ -e %(log_directory)s/")
            header_lines.append("#$ -o %(log_directory)s/")
        header_lines.extend(["#$ -cwd", "#$ -j y"])
        header_lines.extend(["#$ %s" % arg for arg in job_extra])
        header_template = "\n".join(header_lines)

        config = {
            "job-name": self.job_name,
            "queue": queue,
            "project": project,
            "processes": self.worker_processes,
            "resource_spec": resource_spec,
            "log_directory": self.log_directory,
        }
        self.job_header = header_template % config

        logger.debug("Job script: \n %s" % self.job_script())

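# Illustration (a sketch, not part of the original module): assuming a worker
# ``resources`` dictionary such as ``{"GPU": 1}``, the constructor above appends a
# matching ``--resources`` flag to the ``dask-worker`` command template, e.g.:
#
#   >>> job = SGEIdiapJob(cores=1, memory="8GB", resources={"GPU": 1})  # doctest: +SKIP
#   >>> "--resources GPU=1" in job._command_template  # doctest: +SKIP
#   True
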
def get_max_jobs(queue_dict):
    """Given a queue specification dictionary, get the maximum number of possible jobs."""
    return max(
        [
            queue_dict[r]["max_jobs"]
            for r in queue_dict
            if "max_jobs" in queue_dict[r]
        ]
    )

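# Example (a sketch, assuming a queue specification in the same format as
# QUEUE_DEFAULT): the largest "max_jobs" value among the queues that define one
# is returned.
#
#   >>> spec = {"default": {"max_jobs": 48}, "q_gpu": {"max_jobs": 12}, "q_long": {}}
#   >>> get_max_jobs(spec)
#   48
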
def get_resource_requirements(pipeline):
    """Get the resource requirements to execute a graph.

    This is useful when it is necessary to get the dictionary mapping the dask
    delayed keys to specific resource restrictions. Check
    https://distributed.dask.org/en/latest/resources.html#resources-with-collections
    for more information.

    Parameters
    ----------
    pipeline: :any:`sklearn.pipeline.Pipeline`
        A :any:`sklearn.pipeline.Pipeline` wrapper with :any:`bob.pipelines.DaskWrapper`

    Example
    -------
    >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP
    >>> from bob.pipelines.sge import get_resource_requirements  # doctest: +SKIP
    >>> resources = get_resource_requirements(pipeline)  # doctest: +SKIP
    >>> my_delayed_task.compute(scheduler=client, resources=resources)  # doctest: +SKIP
    """
    resources = dict()
    for s in pipeline:
        if hasattr(s, "resource_tags"):
            resources.update(s.resource_tags)
    return resources

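# Example (a sketch, assuming pipeline steps wrapped with
# ``bob.pipelines.DaskWrapper`` expose a ``resource_tags`` dictionary): the union
# of all tags is returned and can be passed to ``compute(resources=...)``.
#
#   >>> class _Step:  # hypothetical stand-in for a wrapped estimator
#   ...     resource_tags = {"GPU": 1}
#   >>> get_resource_requirements([_Step()])
#   {'GPU': 1}
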
class SGEMultipleQueuesCluster(JobQueueCluster):
    """Launch Dask jobs in the SGE cluster allowing the request of multiple queues.

    Parameters
    ----------
    log_directory: str
        Default directory for the SGE logs

    protocol: str
        Scheduler communication protocol

    dashboard_address: str
        Default port for the dask dashboard

    env_extra: str
        Extra environment variables to send to the workers

    sge_job_spec: dict
        Dictionary containing a minimum specification for the qsub command.
        It consists of:

        queue: SGE queue
        memory: Memory requirement in GB (e.g. 4GB)
        io_big: set the io_big flag
        resource_spec: Whatever extra argument to be sent to qsub (qsub -l)
        tag: Mark this worker with a specific tag so the dask scheduler can
            place specific tasks on it
            (https://distributed.dask.org/en/latest/resources.html)
        max_jobs: Maximum number of jobs in the queue

    min_jobs: int
        Lower bound for the number of jobs for `self.adapt`

    Example
    -------

    Below follows a vanilla example that will create a set of jobs on all.q:

    >>> from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster  # doctest: +SKIP
    >>> from dask.distributed import Client  # doctest: +SKIP
    >>> cluster = SGEMultipleQueuesCluster()  # doctest: +SKIP
    >>> cluster.scale_up(10)  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP

    It's possible to demand a resource specification yourself:

    >>> Q_1DAY_IO_BIG_SPEC = {
    ...     "default": {
    ...         "queue": "q_1day",
    ...         "memory": "8GB",
    ...         "io_big": True,
    ...         "resource_spec": "",
    ...         "resources": "",
    ...     }
    ... }
    >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_IO_BIG_SPEC)  # doctest: +SKIP
    >>> cluster.scale_up(10)  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP

    More than one job spec can be set:

    >>> Q_1DAY_GPU_SPEC = {
    ...     "default": {
    ...         "queue": "q_1day",
    ...         "memory": "8GB",
    ...         "io_big": True,
    ...         "resource_spec": "",
    ...         "resources": "",
    ...     },
    ...     "gpu": {
    ...         "queue": "q_gpu",
    ...         "memory": "12GB",
    ...         "io_big": False,
    ...         "resource_spec": "",
    ...         "resources": {"GPU": 1},
    ...     },
    ... }
    >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)  # doctest: +SKIP
    >>> cluster.scale_up(10)  # doctest: +SKIP
    >>> cluster.scale_up(1, sge_job_spec_key="gpu")  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP

    Adaptive job allocation can also be used via the `AdaptiveMultipleQueue` extension:

    >>> cluster = SGEMultipleQueuesCluster(sge_job_spec=Q_1DAY_GPU_SPEC)  # doctest: +SKIP
    >>> cluster.adapt(minimum=2, maximum=10)  # doctest: +SKIP
    >>> client = Client(cluster)  # doctest: +SKIP
    """

    def __init__(
        self,
        log_directory="./logs",
        protocol="tcp://",
        dashboard_address=":8787",
        env_extra=None,
        sge_job_spec=QUEUE_DEFAULT,
        min_jobs=1,
        project=rc.get("sge.project"),
        **kwargs,
    ):

        # Defining the job launcher
        self.job_cls = SGEIdiapJob
        self.sge_job_spec = sge_job_spec

        self.protocol = protocol
        self.log_directory = log_directory
        self.project = project

        silence_logs = "error"
        interface = None
        host = None
        security = None

        if env_extra is None:
            env_extra = []
        elif not isinstance(env_extra, list):
            env_extra = [env_extra]
        self.env_extra = env_extra + ["export PYTHONPATH=" + ":".join(sys.path)]

        scheduler = {
            "cls": SchedulerResourceRestriction,  # Use local scheduler for now
            "options": {
                "protocol": self.protocol,
                "interface": interface,
                "host": host,
                "dashboard_address": dashboard_address,
                "security": security,
            },
        }

        # Spec cluster parameters
        loop = None
        asynchronous = False
        name = None

        # Starting the SpecCluster constructor
        super(JobQueueCluster, self).__init__(
            scheduler=scheduler,
            worker={},
            loop=loop,
            silence_logs=silence_logs,
            asynchronous=asynchronous,
            name=name,
        )

    def _get_worker_spec_options(self, job_spec):
        """Craft a dask worker_spec to be used in the qsub command."""

        new_resource_spec = job_spec.get("resource_spec", "")

        # IO_BIG
        new_resource_spec += (
            "io_big=TRUE,"
            if "io_big" in job_spec and job_spec["io_big"]
            else ""
        )

        # Strip the trailing unit character (e.g. "8GB" -> "8G") for SGE's mem_free
        memory = job_spec.get("memory", "")[:-1]
        new_resource_spec += f"mem_free={memory},"

        queue = job_spec.get("queue", "")
        if queue != "all.q":
            new_resource_spec += f"{queue}=TRUE"

        new_resource_spec = (
            None if new_resource_spec == "" else new_resource_spec
        )

        return {
            "queue": queue,
            "project": self.project,
            "memory": job_spec.get("memory", ""),
            "job_extra": job_spec.get("job_extra", None),
            "cores": 1,
            "processes": 1,
            "log_directory": self.log_directory,
            "local_directory": self.log_directory,
            "resource_spec": new_resource_spec,
            "interface": None,
            "protocol": self.protocol,
            "security": None,
            "resources": job_spec.get("resources", ""),
            "env_extra": self.env_extra,
        }

    def scale(self, n_jobs, sge_job_spec_key="default"):
        """Launch an SGE job in the Idiap SGE cluster.

        Parameters
        ----------
        n_jobs: int
            Quantity of jobs to scale

        sge_job_spec_key: str
            One of the specs in `SGEMultipleQueuesCluster.sge_job_spec`
        """
        if n_jobs == 0:
            # Shutting down all workers
            return super(JobQueueCluster, self).scale(0, memory=None, cores=0)

        job_spec = self.sge_job_spec[sge_job_spec_key]
        worker_spec_options = self._get_worker_spec_options(job_spec)
        n_cores = 1
        worker_spec = {"cls": self.job_cls, "options": worker_spec_options}

        # Defining a new worker_spec with some SGE characteristics
        self.new_spec = worker_spec

        return super(JobQueueCluster, self).scale(
            n_jobs, memory=None, cores=n_cores
        )

    def scale_up(self, n_jobs, sge_job_spec_key=None):
        """Scale cluster up.

        This is supposed to be used by the scheduler while dynamically
        allocating resources.
        """
        return self.scale(n_jobs, sge_job_spec_key)

    async def scale_down(self, workers, sge_job_spec_key=None):
        """Scale cluster down.

        This is supposed to be used by the scheduler while dynamically
        allocating resources.
        """
        await super().scale_down(workers)

    def adapt(self, *args, **kwargs):
        super().adapt(*args, Adaptive=AdaptiveMultipleQueue, **kwargs)

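# Usage sketch (not part of the original module; assumes "q_gpu" is one of the
# keys in ``sge_job_spec``):
#
#   >>> cluster = SGEMultipleQueuesCluster()                # doctest: +SKIP
#   >>> cluster.scale(4)                                    # doctest: +SKIP  # 4 jobs on the "default" spec
#   >>> cluster.scale(1, sge_job_spec_key="q_gpu")          # doctest: +SKIP  # 1 job on the hypothetical "q_gpu" spec
#   >>> cluster.adapt(minimum=1, maximum=10)                # doctest: +SKIP  # uses AdaptiveMultipleQueue below
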
class AdaptiveMultipleQueue(Adaptive):
    """Custom mechanism to adaptively allocate workers based on scheduler load.

    This custom implementation extends `Adaptive.recommendations` by looking at
    `distributed.scheduler.TaskState.resource_restrictions`.

    The heuristic is:

    .. note ::
        If a certain task has the status `no-worker` and it has
        resource_restrictions, the scheduler should request a job matching
        those resource restrictions.
    """

    async def recommendations(self, target: int) -> dict:
        """Make scale up/down recommendations based on current state and target."""

        plan = self.plan

        # Get tasks with no worker associated due to
        # resource restrictions
        resource_restrictions = (
            await self.scheduler.get_no_worker_tasks_resource_restrictions()
        )

        # If the amount of resources requested is bigger
        # than what is available and those jobs have restrictions
        if target > len(plan):
            self.close_counts.clear()
            if len(resource_restrictions) > 0:
                return {
                    "status": "up",
                    "n": target,
                    "sge_job_spec_key": list(resource_restrictions[0].keys())[0],
                }
            else:
                return {"status": "up", "n": target}

        # If the amount of resources requested is lower
        # than what is available, it is time to downscale
        elif target < len(plan):
            to_close = set()

            # Get the workers that can be closed
            if target < len(plan) - len(to_close):
                L = await self.workers_to_close(target=target)
                to_close.update(L)

            firmly_close = set()
            # Counting the amount of scheduler cycles that we should keep
            # this worker before destroying it
            for w in to_close:
                self.close_counts[w] += 1
                if self.close_counts[w] >= self.wait_count:
                    firmly_close.add(w)

            for k in list(self.close_counts):  # clear out unseen keys
                if k in firmly_close or k not in to_close:
                    del self.close_counts[k]

            # Send message to destroy workers
            if firmly_close:
                return {"status": "down", "workers": list(firmly_close)}

        # If the amount of available workers is ok
        # for the current demand, BUT
        # there are tasks that need some special worker:
        # scale everything up
        if target == len(plan) and len(resource_restrictions) > 0:
            return {
                "status": "up",
                "n": target + 1,
                "sge_job_spec_key": list(resource_restrictions[0].keys())[0],
            }
        else:
            return {"status": "same"}

    async def scale_up(self, n, sge_job_spec_key="default"):
        await self.cluster.scale(n, sge_job_spec_key=sge_job_spec_key)

    async def scale_down(self, workers, sge_job_spec_key="default"):
        await super().scale_down(workers)

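# Shapes of the recommendation dictionaries returned by
# AdaptiveMultipleQueue.recommendations above (illustrative values only):
#
#   {"status": "up", "n": 5}                               # plain scale-up
#   {"status": "up", "n": 5, "sge_job_spec_key": "GPU"}    # scale-up keyed on the first resource tag
#                                                          # of a restricted, unplaced task
#   {"status": "down", "workers": ["<worker address>"]}    # retire the listed workers
#   {"status": "same"}                                     # no action
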
class SchedulerResourceRestriction(Scheduler):
    """Idiap extended distributed scheduler.

    This scheduler extends `Scheduler` by adding a handler that fetches, at
    every scheduler cycle, the resource restrictions of tasks that have the
    status `no-worker`.
    """

    def __init__(self, *args, **kwargs):
        super(SchedulerResourceRestriction, self).__init__(
            idle_timeout=rc.get("bob.pipelines.sge.idle_timeout", 3600),
            allowed_failures=rc.get("bob.pipelines.sge.allowed_failures", 100),
            worker_ttl=rc.get("bob.pipelines.sge.worker_ttl", 120),
            synchronize_worker_interval="10s",
            *args,
            **kwargs,
        )
        self.handlers[
            "get_no_worker_tasks_resource_restrictions"
        ] = self.get_no_worker_tasks_resource_restrictions

    def get_no_worker_tasks_resource_restrictions(self, comm=None):
        """Get the resource restrictions of tasks that have the status
        'no-worker'."""

        resource_restrictions = []
        for k in self.tasks:
            if (
                self.tasks[k].state == "no-worker"
                and self.tasks[k].resource_restrictions is not None
            ):
                resource_restrictions.append(
                    self.tasks[k].resource_restrictions
                )

        return resource_restrictions

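# Sketch of how the extra handler is consumed (assumption: called through the
# scheduler RPC by AdaptiveMultipleQueue.recommendations above):
#
#   >>> restrictions = await scheduler.get_no_worker_tasks_resource_restrictions()  # doctest: +SKIP
#   >>> restrictions  # doctest: +SKIP
#   [{'GPU': 1}]  # one entry per unplaced task carrying resource restrictions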