diff --git a/backend/pyproject.toml b/backend/pyproject.toml index 440bdb8..8526e96 100644 --- a/backend/pyproject.toml +++ b/backend/pyproject.toml @@ -16,13 +16,7 @@ maintainers = [ { name = "Adrian Rumpold", email = "a.rumpold@appliedai-institute.de" }, ] license = { text = "Apache-2.0" } -dependencies = [ - "fastapi", - "uvicorn", - "docker", - "kubernetes", - "aai-jobq", -] +dependencies = ["fastapi", "uvicorn", "docker", "kubernetes", "aai-jobq"] dynamic = ["version"] [project.optional-dependencies] diff --git a/backend/src/jobq_server/models.py b/backend/src/jobq_server/models.py index ea3b647..2517518 100644 --- a/backend/src/jobq_server/models.py +++ b/backend/src/jobq_server/models.py @@ -32,6 +32,13 @@ def validate_image_ref(ref: str) -> str: ImageRef = Annotated[str, AfterValidator(validate_image_ref)] + +class ImagePullPolicy(StrEnum): + ALWAYS = "Always" + NEVER = "Never" + IFNOTPRESENT = "IfNotPresent" + + SubmissionContext: TypeAlias = dict[str, Any] @@ -60,6 +67,7 @@ class CreateJobModel(BaseModel): image_ref: ImageRef mode: ExecutionMode options: JobOptions + pull_policy: ImagePullPolicy = ImagePullPolicy.ALWAYS submission_context: SubmissionContext = Field(default_factory=dict) diff --git a/backend/src/jobq_server/routers/jobs.py b/backend/src/jobq_server/routers/jobs.py index 07e3527..7c999eb 100644 --- a/backend/src/jobq_server/routers/jobs.py +++ b/backend/src/jobq_server/routers/jobs.py @@ -6,7 +6,7 @@ from fastapi import APIRouter, Depends, HTTPException, Query, Response from fastapi import status as http_status from fastapi.responses import StreamingResponse -from jobq import Image, Job +from jobq import Job from jobq_server.dependencies import Kubernetes, ManagedWorkload from jobq_server.exceptions import PodNotReadyError @@ -50,8 +50,9 @@ def job_fn(): ... detail=f"unsupported job execution mode: {opts.mode!r}", ) - image = Image(opts.image_ref) - workload_id = runner.run(job, image, opts.submission_context) + workload_id = runner.run( + job, opts.image_ref, opts.submission_context, opts.pull_policy + ) return workload_id diff --git a/backend/src/jobq_server/runner/base.py b/backend/src/jobq_server/runner/base.py index 1b5e6ec..68dd0d9 100644 --- a/backend/src/jobq_server/runner/base.py +++ b/backend/src/jobq_server/runner/base.py @@ -1,9 +1,15 @@ import abc from typing import ClassVar, Self -from jobq import Image, Job +from jobq import Job -from jobq_server.models import ExecutionMode, SubmissionContext, WorkloadIdentifier +from jobq_server.models import ( + ExecutionMode, + ImagePullPolicy, + ImageRef, + SubmissionContext, + WorkloadIdentifier, +) class Runner(abc.ABC): @@ -11,7 +17,11 @@ class Runner(abc.ABC): @abc.abstractmethod def run( - self, job: Job, image: Image, context: SubmissionContext + self, + job: Job, + image: ImageRef, + context: SubmissionContext, + pull_policy: ImagePullPolicy = ImagePullPolicy.ALWAYS, ) -> WorkloadIdentifier | None: ... @classmethod diff --git a/backend/src/jobq_server/runner/docker.py b/backend/src/jobq_server/runner/docker.py index 5c16d55..c886f92 100644 --- a/backend/src/jobq_server/runner/docker.py +++ b/backend/src/jobq_server/runner/docker.py @@ -2,10 +2,16 @@ import textwrap import docker -from jobq import Image, Job +from jobq import Job from jobq.job import DockerResourceOptions - -from jobq_server.models import ExecutionMode, SubmissionContext +from typing_extensions import override + +from jobq_server.models import ( + ExecutionMode, + ImagePullPolicy, + ImageRef, + SubmissionContext, +) from jobq_server.runner.base import Runner, _make_executor_command from jobq_server.utils.helpers import remove_none_values @@ -15,7 +21,18 @@ def __init__(self, **kwargs): super().__init__() self._client = docker.from_env() - def run(self, job: Job, image: Image, context: SubmissionContext) -> None: + @override + def run( + self, + job: Job, + image: ImageRef, + context: SubmissionContext, + pull_policy: ImagePullPolicy = ImagePullPolicy.ALWAYS, + ) -> None: + if pull_policy == ImagePullPolicy.ALWAYS: + logging.debug("Pulling container image %s", image) + self._client.images.pull(image) + command = _make_executor_command(job) resource_kwargs: DockerResourceOptions = { @@ -27,7 +44,7 @@ def run(self, job: Job, image: Image, context: SubmissionContext) -> None: resource_kwargs = res.to_docker() container: docker.api.client.ContainerApiMixin = self._client.containers.run( - image=image.tag, + image=image, command=command, detach=True, **remove_none_values(resource_kwargs), diff --git a/backend/src/jobq_server/runner/kueue.py b/backend/src/jobq_server/runner/kueue.py index a542bda..29882c4 100644 --- a/backend/src/jobq_server/runner/kueue.py +++ b/backend/src/jobq_server/runner/kueue.py @@ -1,11 +1,18 @@ import logging from dataclasses import asdict -from jobq import Image, Job +from jobq import Job from jobq.types import K8sResourceKind from kubernetes import client - -from jobq_server.models import ExecutionMode, SubmissionContext, WorkloadIdentifier +from typing_extensions import override + +from jobq_server.models import ( + ExecutionMode, + ImagePullPolicy, + ImageRef, + SubmissionContext, + WorkloadIdentifier, +) from jobq_server.runner.base import Runner, _make_executor_command from jobq_server.services.k8s import KubernetesService from jobq_server.utils.k8s import ( @@ -24,7 +31,11 @@ def __init__(self, k8s: KubernetesService, **kwargs: str) -> None: self._queue = kwargs.get("local_queue", "user-queue") def _make_job_crd( - self, job: Job, image: Image, context: SubmissionContext + self, + job: Job, + image: ImageRef, + context: SubmissionContext, + pull_policy: ImagePullPolicy, ) -> client.V1Job: if not job.options: raise ValueError("Job options must be specified") @@ -39,8 +50,8 @@ def _make_job_crd( # Job container container = client.V1Container( - image=image.tag, - image_pull_policy="IfNotPresent", + image=image, + image_pull_policy=pull_policy.value, name="workload", command=_make_executor_command(job), resources=( @@ -70,12 +81,17 @@ def _make_job_crd( ), ) + @override def run( - self, job: Job, image: Image, context: SubmissionContext + self, + job: Job, + image: ImageRef, + context: SubmissionContext, + pull_policy: ImagePullPolicy = ImagePullPolicy.ALWAYS, ) -> WorkloadIdentifier: logging.info(f"Submitting job {job.name} to Kueue") - k8s_job = self._make_job_crd(job, image, context) + k8s_job = self._make_job_crd(job, image, context, pull_policy) batch_api = client.BatchV1Api() resource: client.V1Job = batch_api.create_namespaced_job( self._k8s.namespace, k8s_job diff --git a/backend/src/jobq_server/runner/ray.py b/backend/src/jobq_server/runner/ray.py index 540c158..c32d713 100644 --- a/backend/src/jobq_server/runner/ray.py +++ b/backend/src/jobq_server/runner/ray.py @@ -5,11 +5,18 @@ from dataclasses import asdict import yaml -from jobq import Image, Job +from jobq import Job from jobq.types import K8sResourceKind from kubernetes import client - -from jobq_server.models import ExecutionMode, SubmissionContext, WorkloadIdentifier +from typing_extensions import override + +from jobq_server.models import ( + ExecutionMode, + ImagePullPolicy, + ImageRef, + SubmissionContext, + WorkloadIdentifier, +) from jobq_server.runner.base import Runner, _make_executor_command from jobq_server.services.k8s import KubernetesService from jobq_server.utils.k8s import ( @@ -29,7 +36,11 @@ def __init__(self, k8s: KubernetesService, **kwargs): self._k8s = k8s def _create_ray_job( - self, job: Job, image: Image, context: SubmissionContext + self, + job: Job, + image: ImageRef, + context: SubmissionContext, + pull_policy: ImagePullPolicy, ) -> dict: """Create a ``RayJob`` Kubernetes resource for the Kuberay operator.""" @@ -49,9 +60,6 @@ def _create_ray_job( suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=4)) job_id = f"{job.name}-{suffix}" - # FIXME: Image pull policy should be configurable - # It is currently hardcoded to "IfNotPresent" to support running - # the E2E tests in a cluster without a proper image registry. manifest = { "apiVersion": "ray.io/v1", "kind": "RayJob", @@ -78,8 +86,8 @@ def _create_ray_job( "containers": [ { "name": "head", - "image": image.tag, - "imagePullPolicy": "IfNotPresent", + "image": image, + "imagePullPolicy": pull_policy.value, "resources": { "requests": res_opts.to_kubernetes( kind=K8sResourceKind.REQUESTS @@ -100,8 +108,8 @@ def _create_ray_job( "containers": [ { "name": "ray-submit", - "image": image.tag, - "imagePullPolicy": "IfNotPresent", + "image": image, + "imagePullPolicy": pull_policy.value, } ], }, @@ -111,14 +119,19 @@ def _create_ray_job( return manifest + @override def run( - self, job: Job, image: Image, context: SubmissionContext + self, + job: Job, + image: ImageRef, + context: SubmissionContext, + pull_policy: ImagePullPolicy, ) -> WorkloadIdentifier: logging.info( f"Submitting RayJob {job.name} to namespace {self._k8s.namespace!r}" ) - manifest = self._create_ray_job(job, image, context) + manifest = self._create_ray_job(job, image, context, pull_policy) api = client.CustomObjectsApi() obj = api.create_namespaced_custom_object( "ray.io", "v1", self._k8s.namespace, "rayjobs", manifest diff --git a/backend/tests/e2e/conftest.py b/backend/tests/e2e/conftest.py index b63e7e7..3340605 100644 --- a/backend/tests/e2e/conftest.py +++ b/backend/tests/e2e/conftest.py @@ -66,6 +66,11 @@ def cluster() -> Generator[KubernetesCluster, None, None]: """Create a Kubernetes cluster for testing based on environment variable""" cluster_type = os.getenv("E2E_CLUSTER_TYPE", "minikube").lower() context = os.getenv("E2E_K8S_CONTEXT") + + # Don't install dependencies into an external cluster + install_kuberay = context is None + install_kueue = context is None + if cluster_type == "minikube": cluster = MinikubeCluster(name=context) else: @@ -74,8 +79,10 @@ def cluster() -> Generator[KubernetesCluster, None, None]: try: # Install Kuberay first, so that the CRDs (RayJob, RayCluster) # are available for Kueue to be watched. - setup_kuberay(cluster) - setup_kueue(cluster) + if install_kuberay: + setup_kuberay(cluster) + if install_kueue: + setup_kueue(cluster) yield cluster finally: cluster.delete() diff --git a/backend/tests/e2e/test_jobs.py b/backend/tests/e2e/test_jobs.py index 240ffd7..f079336 100644 --- a/backend/tests/e2e/test_jobs.py +++ b/backend/tests/e2e/test_jobs.py @@ -45,6 +45,8 @@ def test_job_lifecycle( ), resources=ResourceOptions(cpu="1", memory="512Mi"), ), + # e2e test image is built locally and sideloaded into the cluster + pull_policy="IfNotPresent", ) # Submit a job for execution diff --git a/backend/uv.lock b/backend/uv.lock index 54f7afa..9e42969 100644 --- a/backend/uv.lock +++ b/backend/uv.lock @@ -7,8 +7,8 @@ resolution-markers = [ [[package]] name = "aai-jobq" -version = "0.1.0rc4.dev74+g28c0261" -source = { git = "https://github.com/aai-institute/jobq?subdirectory=client&branch=main#28c0261faa3d50423c8cae1bcf6e2ac6a7edcd3a" } +version = "0.1.0rc4.dev90+gabdc993" +source = { git = "https://github.com/aai-institute/jobq?subdirectory=client&branch=main#abdc993bdfdafaf535640e0bbb1b4819e3027dfb" } dependencies = [ { name = "pydantic" }, { name = "pyyaml" }, @@ -16,7 +16,7 @@ dependencies = [ [[package]] name = "aai-jobq-server" -version = "0.1.0rc8.dev11+g28c0261.d20241007" +version = "0.1.0rc8.dev34+g82b02d7.d20241025" source = { editable = "." } dependencies = [ { name = "aai-jobq" }, diff --git a/client/src/cli/commands/submit.py b/client/src/cli/commands/submit.py index cd5c5a1..f1cb428 100644 --- a/client/src/cli/commands/submit.py +++ b/client/src/cli/commands/submit.py @@ -11,6 +11,7 @@ from jobq import Image, Job from jobq.submission_context import SubmissionContext from openapi_client import ExecutionMode +from openapi_client.models.image_pull_policy import ImagePullPolicy def submit(args: argparse.Namespace, settings: Settings) -> None: @@ -32,6 +33,7 @@ def _submit_remote_job( client: openapi_client.JobManagementApi, job: Job, mode: ExecutionMode, + pull_policy: ImagePullPolicy, settings: Settings, ) -> None: # Job options sent to server do not need image options @@ -44,6 +46,7 @@ def _submit_remote_job( file=job.file, image_ref=_build_image(job, mode).tag, mode=mode, + pull_policy=openapi_client.ImagePullPolicy(pull_policy.value), options=openapi_client.JobOptions.model_validate(job.options.model_dump()), submission_context=SubmissionContext().to_dict(), ) @@ -57,13 +60,14 @@ def submit_job( settings: Settings, ) -> None: mode = args.mode + pull_policy = args.pull_policy logging.debug(f"Execution mode: {mode}") match mode: case ExecutionMode.LOCAL: # Run the job locally job() case _: - _submit_remote_job(job, mode, settings=settings) + _submit_remote_job(job, mode, pull_policy, settings=settings) def discover_job(args: argparse.Namespace) -> Job: @@ -120,6 +124,12 @@ def add_parser(subparsers: Any, parent: argparse.ArgumentParser) -> None: choices=list(ExecutionMode), type=ExecutionMode, ) + parser.add_argument( + "--pull-policy", + default=ImagePullPolicy.ALWAYS, + choices=list(ImagePullPolicy), + type=ImagePullPolicy, + ) parser.add_argument("entrypoint") # TODO: Factor out into command class diff --git a/client/src/openapi_client/__init__.py b/client/src/openapi_client/__init__.py index 9374f1f..527c6a6 100644 --- a/client/src/openapi_client/__init__.py +++ b/client/src/openapi_client/__init__.py @@ -33,6 +33,7 @@ from openapi_client.models.create_job_model import CreateJobModel from openapi_client.models.execution_mode import ExecutionMode from openapi_client.models.http_validation_error import HTTPValidationError +from openapi_client.models.image_pull_policy import ImagePullPolicy from openapi_client.models.job_options import JobOptions from openapi_client.models.job_status import JobStatus from openapi_client.models.list_workload_model import ListWorkloadModel diff --git a/client/src/openapi_client/api_client.py b/client/src/openapi_client/api_client.py index 0bdd2ee..d622a3e 100644 --- a/client/src/openapi_client/api_client.py +++ b/client/src/openapi_client/api_client.py @@ -397,7 +397,7 @@ def deserialize( data = "" else: data = json.loads(response_text) - elif re.match(r"^text/plain\s*(;|$)", content_type, re.IGNORECASE): + elif re.match(r"^text\/[a-z.+-]+\s*(;|$)", content_type, re.IGNORECASE): data = response_text else: raise ApiException( diff --git a/client/src/openapi_client/models/__init__.py b/client/src/openapi_client/models/__init__.py index 6e174e1..093a8b7 100644 --- a/client/src/openapi_client/models/__init__.py +++ b/client/src/openapi_client/models/__init__.py @@ -16,6 +16,7 @@ from openapi_client.models.create_job_model import CreateJobModel from openapi_client.models.execution_mode import ExecutionMode from openapi_client.models.http_validation_error import HTTPValidationError +from openapi_client.models.image_pull_policy import ImagePullPolicy from openapi_client.models.job_options import JobOptions from openapi_client.models.job_status import JobStatus from openapi_client.models.list_workload_model import ListWorkloadModel diff --git a/client/src/openapi_client/models/create_job_model.py b/client/src/openapi_client/models/create_job_model.py index 9f0ce16..ef7a577 100644 --- a/client/src/openapi_client/models/create_job_model.py +++ b/client/src/openapi_client/models/create_job_model.py @@ -20,6 +20,7 @@ from typing_extensions import Self from openapi_client.models.execution_mode import ExecutionMode +from openapi_client.models.image_pull_policy import ImagePullPolicy from openapi_client.models.job_options import JobOptions @@ -33,6 +34,7 @@ class CreateJobModel(BaseModel): image_ref: StrictStr mode: ExecutionMode options: JobOptions + pull_policy: ImagePullPolicy | None = None submission_context: dict[str, Any] | None = None __properties: ClassVar[list[str]] = [ "name", @@ -40,6 +42,7 @@ class CreateJobModel(BaseModel): "image_ref", "mode", "options", + "pull_policy", "submission_context", ] @@ -102,6 +105,7 @@ def from_dict(cls, obj: dict[str, Any] | None) -> Self | None: "options": JobOptions.from_dict(obj["options"]) if obj.get("options") is not None else None, + "pull_policy": obj.get("pull_policy"), "submission_context": obj.get("submission_context"), }) return _obj diff --git a/client/src/openapi_client/models/image_pull_policy.py b/client/src/openapi_client/models/image_pull_policy.py new file mode 100644 index 0000000..b134d5b --- /dev/null +++ b/client/src/openapi_client/models/image_pull_policy.py @@ -0,0 +1,35 @@ +""" +the jobq cluster workflow management tool backend + +Backend service for the appliedAI infrastructure product + +The version of the OpenAPI document: 0.1.0 +Generated by OpenAPI Generator (https://openapi-generator.tech) + +Do not edit the class manually. +""" # noqa: E501 + +from __future__ import annotations + +import json +from enum import Enum + +from typing_extensions import Self + + +class ImagePullPolicy(str, Enum): + """ + ImagePullPolicy + """ + + """ + allowed enum values + """ + ALWAYS = "Always" + NEVER = "Never" + IFNOTPRESENT = "IfNotPresent" + + @classmethod + def from_json(cls, json_str: str) -> Self: + """Create an instance of ImagePullPolicy from a JSON string""" + return cls(json.loads(json_str)) diff --git a/client/src/openapi_client/rest.py b/client/src/openapi_client/rest.py index 04c0667..47aa2f6 100644 --- a/client/src/openapi_client/rest.py +++ b/client/src/openapi_client/rest.py @@ -205,7 +205,9 @@ def request( headers=headers, preload_content=False, ) - elif headers["Content-Type"] == "text/plain" and isinstance(body, bool): + elif headers["Content-Type"].startswith("text/") and isinstance( + body, bool + ): request_body = "true" if body else "false" r = self.pool_manager.request( method, diff --git a/docs/guide/execution-modes.md b/docs/guide/execution-modes.md index b44beab..23bbd25 100644 --- a/docs/guide/execution-modes.md +++ b/docs/guide/execution-modes.md @@ -35,7 +35,7 @@ Currently, jobq supports two cluster-based execution modes: By default, Kubernetes batch jobs are submitted using the following default parameters: - Job parallelism is set to 1 -- Image pull policy is set to `IfNotPresent` +- Image pull policy is set to `Always` - Backoff limit set to 6 (exponential backoff applies to retried jobs) - Resource requests are applied as `limits == requests` @@ -46,7 +46,7 @@ The default options for Ray jobs submitted by jobq are as follows: - Single head node, no worker nodes - Job clusters are shut down automatically after job finishes - Job image is used for worker nodes and job submission pod -- Image pull policy is set to `IfNotPresent` +- Image pull policy is set to `Always` - Resource requests are applied as `limits == requests` ## Choosing an execution mode