diff --git a/doc/changelog.d/1635.added.md b/doc/changelog.d/1635.added.md new file mode 100644 index 0000000000..de091fa368 --- /dev/null +++ b/doc/changelog.d/1635.added.md @@ -0,0 +1 @@ +Adding CLI for batch submission diff --git a/doc/changelog.d/1640.added.md b/doc/changelog.d/1640.added.md new file mode 100644 index 0000000000..141536f469 --- /dev/null +++ b/doc/changelog.d/1640.added.md @@ -0,0 +1 @@ +Job manager concurrent job bug diff --git a/doc/source/workflows/drc/drc.rst b/doc/source/workflows/drc/drc.rst index adb2323347..5b410ed54d 100644 --- a/doc/source/workflows/drc/drc.rst +++ b/doc/source/workflows/drc/drc.rst @@ -1,7 +1,7 @@ .. _ref_drc: ================================================================== -Design-rule checking (DRC)–self-contained, multi-threaded engine +Design-rule checking (DRC)—self-contained, multi-threaded engine ================================================================== .. currentmodule:: pyedb.workflows.drc.drc @@ -85,7 +85,7 @@ Rule models BackDrillStubLength CopperBalance -DRC engine +DRC Engine ~~~~~~~~~~ .. autosummary:: @@ -129,7 +129,7 @@ Load a rule deck from JSON rules = Rules.from_dict(json.load(f)) Export violations to CSV -~~~~~~~~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~~~~~~~~~~ .. code-block:: python diff --git a/doc/source/workflows/job_manager/submit_job.rst b/doc/source/workflows/job_manager/submit_job.rst index 7685b331e5..23b12f2c15 100644 --- a/doc/source/workflows/job_manager/submit_job.rst +++ b/doc/source/workflows/job_manager/submit_job.rst @@ -44,7 +44,7 @@ It exposes: * REST & Web-Socket endpoints (``http://localhost:8080`` by default) * Thread-safe synchronous façade for scripts / Jupyter * Native async API for advanced integrations -* CLI utilities ``submit_local_job`` and ``submit_job_on_scheduler`` for shell / CI pipelines +* CLI utilities ``submit_local_job``, ``submit_batch_jobs``, and ``submit_job_on_scheduler`` for shell / CI pipelines The **same backend code path** is used regardless of front-end style; the difference is **who owns the event loop** and **how control is returned to the caller**. @@ -176,6 +176,130 @@ Example—CLI (cluster) The command returns immediately after the job is **queued**; use the printed ID with ``wait_until_done`` or monitor via the web UI. +CLI—``submit_batch_jobs`` +^^^^^^^^^^^^^^^^^^^^^^^^^^ +For bulk submissions, use ``submit_batch_jobs`` to automatically discover and submit +multiple projects from a directory tree. + +Synopsis +"""""""" +.. code-block:: bash + + $ python submit_batch_jobs.py --root-dir [options] + +Key features +"""""""""""" +* **Automatic discovery**: Scans for all ``.aedb`` folders and ``.aedt`` files +* **Smart pairing**: When both ``.aedb`` and ``.aedt`` exist, uses the ``.aedt`` file +* **Asynchronous submission**: Submits jobs concurrently for faster processing +* **Recursive scanning**: Optional recursive directory traversal + +Options +""""""" +.. list-table:: + :widths: 30 15 55 + :header-rows: 1 + + * - Argument + - Default + - Description + * - ``--root-dir`` + - *(required)* + - Root directory to scan for projects + * - ``--host`` + - ``localhost`` + - Job manager host address + * - ``--port`` + - ``8080`` + - Job manager port + * - ``--num-cores`` + - ``8`` + - Number of cores to allocate per job + * - ``--max-concurrent`` + - ``5`` + - Maximum concurrent job submissions + * - ``--delay-ms`` + - ``100`` + - Delay in milliseconds between job submissions + * - ``--recursive`` + - ``False`` + - Scan subdirectories recursively + * - ``--verbose`` + - ``False`` + - Enable debug logging + +Example—batch submission (local) +""""""""""""""""""""""""""""""""" +.. code-block:: bash + + # Submit all projects in a directory + $ python submit_batch_jobs.py --root-dir "D:\Temp\test_jobs" + + # Recursive scan with custom core count + $ python submit_batch_jobs.py \ + --root-dir "D:\Projects\simulations" \ + --num-cores 16 \ + --recursive \ + --verbose + +Example output +"""""""""""""" +.. code-block:: text + + 2025-11-07 10:30:15 - __main__ - INFO - Scanning D:\Temp\test_jobs for projects (recursive=False) + 2025-11-07 10:30:15 - __main__ - INFO - Found AEDB folder: D:\Temp\test_jobs\project1.aedb + 2025-11-07 10:30:15 - __main__ - INFO - Found AEDT file: D:\Temp\test_jobs\project2.aedt + 2025-11-07 10:30:15 - __main__ - INFO - Using AEDB folder for project: D:\Temp\test_jobs\project1.aedb + 2025-11-07 10:30:15 - __main__ - INFO - Using standalone AEDT file: D:\Temp\test_jobs\project2.aedt + 2025-11-07 10:30:15 - __main__ - INFO - Found 2 project(s) to submit + 2025-11-07 10:30:15 - __main__ - INFO - Starting batch submission of 2 project(s) to http://localhost:8080 + 2025-11-07 10:30:16 - __main__ - INFO - ✓ Successfully submitted: project1.aedb (status=200) + 2025-11-07 10:30:16 - __main__ - INFO - ✓ Successfully submitted: project2.aedt (status=200) + 2025-11-07 10:30:16 - __main__ - INFO - ============================================================ + 2025-11-07 10:30:16 - __main__ - INFO - Batch submission complete: + 2025-11-07 10:30:16 - __main__ - INFO - Total projects: 2 + 2025-11-07 10:30:16 - __main__ - INFO - ✓ Successful: 2 + 2025-11-07 10:30:16 - __main__ - INFO - ✗ Failed: 0 + 2025-11-07 10:30:16 - __main__ - INFO - ============================================================ + +How it works +"""""""""""" +1. **Scanning phase**: + + * Searches for all ``.aedb`` folders in the root directory + * Searches for all ``.aedt`` files in the root directory + * For each ``.aedb`` folder, checks if a corresponding ``.aedt`` file exists: + + - If yes: Uses the ``.aedt`` file + - If no: Uses the ``.aedb`` folder + + * Standalone ``.aedt`` files (without corresponding ``.aedb``) are also included + +2. **Submission phase**: + + * Creates job configurations for each project + * Submits jobs asynchronously to the job manager REST API + * Limits concurrent submissions using a semaphore (default: 5) + * Reports success/failure for each submission + +3. **Results**: + + * Displays a summary with total, successful, and failed submissions + * Logs detailed information about each submission + +.. note:: + The script does **not** wait for jobs to complete, only for submission confirmation. + Job execution happens asynchronously in the job manager service. + +.. tip:: + * Use ``--max-concurrent`` to limit load on the job manager service when submitting + large batches. + * Use ``--delay-ms`` to control the pause between submissions (default: 100ms). + This ensures HTTP requests are fully sent before the next submission starts. + * Set ``--delay-ms 0`` to disable the delay if your network is very fast and reliable. + * For very large batch submissions, consider increasing the timeout in the code if + network latency is high. + Programmatic—native asyncio """"""""""""""""""""""""""""" .. code-block:: python diff --git a/ignore_words.txt b/ignore_words.txt index c8fa792f47..7f78ee4977 100644 --- a/ignore_words.txt +++ b/ignore_words.txt @@ -32,3 +32,7 @@ aline COM gRPC Toolkits +Cohn +Pydantic +pydantic +Drc diff --git a/src/pyedb/workflows/job_manager/backend/job_manager_handler.py b/src/pyedb/workflows/job_manager/backend/job_manager_handler.py index 9b01f3e504..bbcbd70256 100644 --- a/src/pyedb/workflows/job_manager/backend/job_manager_handler.py +++ b/src/pyedb/workflows/job_manager/backend/job_manager_handler.py @@ -231,8 +231,9 @@ def __init__(self, edb=None, version=None, host="localhost", port=8080): else: self.ansys_path = os.path.join(installed_versions[version], "ansysedt.exe") self.scheduler_type = self._detect_scheduler() - self.manager = JobManager(scheduler_type=self.scheduler_type) - self.manager.resource_limits = ResourceLimits(max_concurrent_jobs=1) + # Create resource limits with default values + resource_limits = ResourceLimits(max_concurrent_jobs=1) + self.manager = JobManager(resource_limits=resource_limits, scheduler_type=self.scheduler_type) self.manager.jobs = {} # In-memory job store -TODO add persistence database # Pass the detected ANSYS path to the manager self.manager.ansys_path = self.ansys_path diff --git a/src/pyedb/workflows/job_manager/backend/job_submission.py b/src/pyedb/workflows/job_manager/backend/job_submission.py index 855339981b..38a15775c7 100644 --- a/src/pyedb/workflows/job_manager/backend/job_submission.py +++ b/src/pyedb/workflows/job_manager/backend/job_submission.py @@ -68,6 +68,7 @@ from datetime import datetime import enum import getpass +import hashlib import logging import os import platform @@ -77,6 +78,7 @@ import subprocess # nosec B404 import tempfile from typing import Any, Dict, List, Optional, Union +import uuid from pydantic import BaseModel, Field @@ -468,7 +470,12 @@ def __init__(self, **data): else: self.ansys_edt_path = os.path.join(list(installed_versions.values())[-1], "ansysedt.exe") # latest if not self.jobid: - self.jobid = f"JOB_ID_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + # Generate unique job ID using timestamp and UUID to avoid collisions + # when submitting multiple jobs rapidly (batch submissions) + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + # Use short UUID (first 8 chars) for readability while ensuring uniqueness + unique_id = str(uuid.uuid4())[:8] + self.jobid = f"JOB_{timestamp}_{unique_id}" if "auto" not in data: # user did not touch it data["auto"] = self.scheduler_type != SchedulerType.NONE self.validate_fields() diff --git a/src/pyedb/workflows/job_manager/backend/service.py b/src/pyedb/workflows/job_manager/backend/service.py index 890acd9004..576bf13afe 100644 --- a/src/pyedb/workflows/job_manager/backend/service.py +++ b/src/pyedb/workflows/job_manager/backend/service.py @@ -71,6 +71,7 @@ import logging import os.path import platform +import threading from typing import Any, Deque, Dict, List, Optional, Set import aiohttp @@ -500,6 +501,13 @@ def __init__(self, resource_limits: ResourceLimits = None, scheduler_type: Sched # Background task for continuous job processing self._processing_task: Optional[asyncio.Task] = None self._shutdown = False + # Event to wake up processing loop when limits change (created when loop is available) + self._limits_changed: Optional[asyncio.Event] = None + # Semaphore to enforce concurrent job limit (created when loop is available) + self._job_semaphore: Optional[asyncio.Semaphore] = None + # Use a thread lock to protect creation of the asyncio lock (safe to create without event loop) + self._lock_init_lock = threading.Lock() + self._processing_task_lock: Optional[asyncio.Lock] = None # Start resource monitoring immediately self._monitor_task = None self._ensure_monitor_running() @@ -1025,181 +1033,287 @@ async def submit_job(self, config: HFSSSimulationConfig, priority: int = 0) -> s """ job_id = config.jobid + # Check if job already exists and is not completed + existing_job = self.jobs.get(job_id) + if existing_job: + if existing_job.status in (JobStatus.QUEUED, JobStatus.RUNNING): + logger.warning(f"⚠️ Job {job_id} already {existing_job.status.value}, skipping duplicate submission") + return job_id + elif existing_job.status in (JobStatus.COMPLETED, JobStatus.FAILED): + logger.info(f"Job {job_id} already finished ({existing_job.status.value}), allowing resubmission") + # Create job info job_info = JobInfo(config=config, status=JobStatus.QUEUED, priority=priority) self.jobs[job_id] = job_info - # Add to job pool + # Add to job pool (only if not already queued) self.job_pool.add_job(job_id, priority) + # Get queue stats for logging + queue_stats = self.job_pool.get_queue_stats() + # Notify web clients await self.sio.emit( "job_queued", - {"job_id": job_id, "priority": priority, "queue_position": self.job_pool.get_queue_stats()["total_queued"]}, + {"job_id": job_id, "priority": priority, "queue_position": queue_stats["total_queued"]}, ) - logger.info(f"Job {job_id} queued with priority {priority}") + logger.info( + f"Job {job_id} queued with priority {priority}. Queue: {queue_stats['total_queued']}, " + f"Running: {queue_stats['running_jobs']}/{queue_stats['max_concurrent']}" + ) + + # Trigger processing if not already running (protected by lock) + logger.debug( + f"🔒 Checking processing task status: task={self._processing_task}, lock={self._processing_task_lock}" + ) - # Trigger processing if not already running - if not self._processing_task or self._processing_task.done(): - self._processing_task = asyncio.create_task(self._process_jobs_continuously()) + # Ensure asyncio lock exists using thread lock for protection + if self._processing_task_lock is None: + with self._lock_init_lock: # Thread-safe lock creation + if self._processing_task_lock is None: # Double-check pattern + logger.info("🔒 Creating new asyncio lock for processing task protection") + self._processing_task_lock = asyncio.Lock() + + logger.debug(f"🔒 Acquiring lock to check/create processing task...") + async with self._processing_task_lock: + logger.debug(f"🔒 Lock acquired! Current task: {self._processing_task}") + if not self._processing_task or self._processing_task.done(): + logger.warning(f"⚠️ CREATING NEW PROCESSING TASK (task_id will be logged)") + self._processing_task = asyncio.create_task(self._process_jobs_continuously()) + logger.warning(f"✅ NEW PROCESSING TASK CREATED: {id(self._processing_task)}") + else: + logger.debug(f"✓ Processing task already exists (id={id(self._processing_task)}), waking it up") + # Wake up the processing loop if it's waiting + if self._limits_changed is not None: + self._limits_changed.set() + logger.debug("✓ Sent wake-up signal to processing loop") return job_id async def _process_jobs_continuously(self): """ - Continuously process jobs until shutdown is requested. - - This is the main job processing loop that: - - Checks if new jobs can be started based on resource limits - - Dequeues the highest priority job - - Starts job execution in a separate task - - Sleeps when no jobs can be started or queue is empty + Main scheduler loop – strict concurrency enforcement. + Jobs are dequeued only when we have fewer than max_concurrent_jobs running. """ - logger.info("✅ Job processing loop started.") + loop_id = id(asyncio.current_task()) + logger.warning(f"🔄 PROCESSING LOOP STARTED - ID: {loop_id}") + + if self._job_semaphore is None: + self._job_semaphore = asyncio.Semaphore(self.resource_limits.max_concurrent_jobs) + + if self._limits_changed is None: + self._limits_changed = asyncio.Event() + + logger.info("✅ Job-processing loop started (concurrency = %s)", self.resource_limits.max_concurrent_jobs) + while not self._shutdown: - can_start = self.job_pool.can_start_job(self.resource_monitor) - if can_start: - next_job_id = self.job_pool.get_next_job() - if next_job_id: - logger.info(f"Dequeued job {next_job_id}. Starting...") - self.job_pool.running_jobs.add(next_job_id) - asyncio.create_task(self._process_single_job(next_job_id)) + # Wait until we have capacity to start a new job + while len(self.job_pool.running_jobs) >= self.resource_limits.max_concurrent_jobs and not self._shutdown: + logger.debug( + "At capacity (%d/%d jobs running), waiting...", + len(self.job_pool.running_jobs), + self.resource_limits.max_concurrent_jobs, + ) + try: + await asyncio.wait_for(self._limits_changed.wait(), 1.0) + self._limits_changed.clear() + except asyncio.TimeoutError: + pass # Check again + + if self._shutdown: + break + + # CRITICAL: Check capacity immediately before dequeuing + # This prevents race conditions in fast-completing jobs + current_running = len(self.job_pool.running_jobs) + if current_running >= self.resource_limits.max_concurrent_jobs: + logger.debug( + "Capacity check failed (%d/%d), waiting for slot...", + current_running, + self.resource_limits.max_concurrent_jobs, + ) + # Don't dequeue - wait for capacity + try: + await asyncio.wait_for(self._limits_changed.wait(), 0.5) + self._limits_changed.clear() + except asyncio.TimeoutError: + pass + continue + + # Get next job from queue (only if we have capacity) + next_job_id = self.job_pool.get_next_job() + if next_job_id is None: + # Queue is empty, wait for new jobs + queue_stats = self.job_pool.get_queue_stats() + running_count = len(self.job_pool.running_jobs) + + # Log at INFO level if we're idle (no jobs running or queued) + if running_count == 0: + logger.info("📭 Queue empty and no jobs running. Waiting for new job submissions...") else: - logger.info("Queue is empty, sleeping.") - await asyncio.sleep(1) - else: - logger.warning("Cannot start new job, waiting...") - await asyncio.sleep(5) + logger.debug("Queue empty but %d job(s) still running. Waiting...", running_count) - await asyncio.sleep(0.2) + try: + await asyncio.wait_for(self._limits_changed.wait(), 1.0) + self._limits_changed.clear() + except asyncio.TimeoutError: + pass # Check queue again + continue - async def _process_single_job(self, job_id: str): - """ - Process a single job from the pool. + # Skip if job is already running (duplicate prevention) + if next_job_id in self.job_pool.running_jobs: + logger.warning(f"⚠️ Job {next_job_id} already running, skipping duplicate") + continue - Parameters - ---------- - job_id : str - Job identifier to process + # Get the job info to check and update status + job_info = self.jobs.get(next_job_id) + if not job_info: + logger.error(f"Job {next_job_id} not found in jobs dict, skipping") + continue - Notes - ----- - This method handles: - - Local execution via subprocess - - Scheduler submission (SLURM/LSF) - - Status updates and notifications - - Error handling and cleanup + # FINAL capacity check before committing (prevents race with job completion) + if len(self.job_pool.running_jobs) >= self.resource_limits.max_concurrent_jobs: + logger.warning(f"⚠️ Capacity exceeded after dequeue (race condition), re-queueing {next_job_id}") + # Put job back in queue + self.job_pool.add_job(next_job_id, job_info.priority) + continue + + # Mark job as RUNNING immediately to prevent re-dequeuing + job_info.status = JobStatus.RUNNING + job_info.start_time = datetime.now() + + # Mark as running BEFORE creating task to prevent race condition + self.job_pool.running_jobs.add(next_job_id) + logger.info( + "De-queued job %s. Starting... (running: %s/%s) [Loop ID: %s]", + next_job_id, + len(self.job_pool.running_jobs), + self.resource_limits.max_concurrent_jobs, + loop_id, + ) + + # Start the job (no semaphore needed now, we use running_jobs count directly) + asyncio.create_task(self._process_single_job(next_job_id, semaphore=None)) + + # Small yield to prevent tight loop + await asyncio.sleep(0) + + async def _process_single_job(self, job_id: str, semaphore: asyncio.Semaphore | None = None): + """ + Execute one job and always clean up running_jobs set when done. + Semaphore parameter is deprecated but kept for compatibility. + + NOTE: Job status should already be RUNNING when this is called, + as it's set in the processing loop when the job is dequeued. """ job_info = self.jobs.get(job_id) - if not job_info or job_info.status != JobStatus.QUEUED: + if not job_info: + logger.error(f"Job {job_id} not found in jobs dict") self.job_pool.running_jobs.discard(job_id) + # Signal that capacity is available + if self._limits_changed is not None: + self._limits_changed.set() return - # Update job status - job_info.status = JobStatus.RUNNING - job_info.start_time = datetime.now() + # Verify job is in correct state (should be RUNNING already) + if job_info.status != JobStatus.RUNNING: + logger.warning(f"Job {job_id} has unexpected status {job_info.status.value}, expected RUNNING") + # Don't return - try to process anyway but log the issue + + # Record local resources (start_time should already be set) + if not job_info.start_time: + job_info.start_time = datetime.now() job_info.local_resources = self.resource_monitor.current_usage.copy() - # Notify web clients await self.sio.emit( "job_started", {"job_id": job_id, "start_time": job_info.start_time.isoformat(), "resources": job_info.local_resources}, ) - - logger.info(f"Job {job_id} started") + logger.info( + "Job %s actually executing (CPU %.1f %%)", job_id, self.resource_monitor.current_usage["cpu_percent"] + ) try: - # Run the simulation if job_info.config.scheduler_type != SchedulerType.NONE: - # Make sure the executable path is present + # ---------- scheduler path ---------- if not job_info.config.ansys_edt_path or not os.path.exists(job_info.config.ansys_edt_path): if self.ansys_path and os.path.exists(self.ansys_path): job_info.config = HFSSSimulationConfig( **{**job_info.config.model_dump(), "ansys_edt_path": self.ansys_path} ) - logger.info(f"Using JobManager's detected ANSYS path: {self.ansys_path}") else: - raise FileNotFoundError( - f"ANSYS executable not found. Config path: {job_info.config.ansys_edt_path}, " - f"Manager path: {self.ansys_path}" - ) + raise FileNotFoundError("ANSYS executable not found") - # Now generate the script – the path is guaranteed to be non-empty result = job_info.config.submit_to_scheduler() job_info.scheduler_job_id = job_info.config._extract_job_id(result.stdout) job_info.status = JobStatus.SCHEDULED - logger.info( - f"Job {job_id} submitted to scheduler with ID: {job_info.scheduler_job_id}, status: SCHEDULED" - ) await self.sio.emit("job_scheduled", {"job_id": job_id, "scheduler_job_id": job_info.scheduler_job_id}) + # For scheduler jobs, remove from running_jobs since they run on external scheduler + # This frees up a slot for the next local job + self.job_pool.running_jobs.discard(job_id) + if self._limits_changed is not None: + self._limits_changed.set() + return - else: - # ---------------- local mode – same guarantee ----------------- - if not job_info.config.ansys_edt_path or not os.path.exists(job_info.config.ansys_edt_path): - if self.ansys_path and os.path.exists(self.ansys_path): - job_info.config = HFSSSimulationConfig( - **{**job_info.config.model_dump(), "ansys_edt_path": self.ansys_path} - ) - logger.info(f"Using JobManager's detected ANSYS path: {self.ansys_path}") - else: - raise FileNotFoundError( - f"ANSYS executable not found. Config path: {job_info.config.ansys_edt_path}, " - f"Manager path: {self.ansys_path}" - ) - - # Generate command as list for secure execution - command_list = job_info.config.generate_command_list() - - # Log the command being executed for debugging - logger.info(f"Executing command for job {job_id}: {' '.join(command_list)}") - logger.info(f"ANSYS executable path: {job_info.config.ansys_edt_path}") - logger.info(f"Project path: {job_info.config.project_path}") - - # Check if project file exists - if not os.path.exists(job_info.config.project_path): - raise FileNotFoundError(f"Project file not found: {job_info.config.project_path}") - - # Run locally - using asyncio subprocess for better control with secure command list - process = await asyncio.create_subprocess_exec( - *command_list, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - - job_info.process = process - - # Wait for completion with timeout (24 hours max) - try: - stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=86400) + # ---------- local path ---------- + command_list = job_info.config.generate_command_list() + process = await asyncio.create_subprocess_exec( + *command_list, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + job_info.process = process - job_info.return_code = process.returncode - job_info.output = stdout.decode() if stdout else "" - job_info.error = stderr.decode() if stderr else "" + stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=86400) + job_info.return_code = process.returncode + job_info.output = stdout.decode() if stdout else "" + job_info.error = stderr.decode() if stderr else "" - if process.returncode == 0: - job_info.status = JobStatus.COMPLETED - logger.info(f"Job {job_id} completed successfully") - else: - job_info.status = JobStatus.FAILED - logger.error(f"Job {job_id} failed with return code {process.returncode}") + if process.returncode == 0: + job_info.status = JobStatus.COMPLETED + logger.info("Job %s completed successfully", job_id) + else: + job_info.status = JobStatus.FAILED + logger.error("Job %s failed with return code %s", job_id, process.returncode) - except asyncio.TimeoutError: - job_info.status = JobStatus.FAILED - job_info.error = "Job timed out after 24 hours" - process.terminate() - logger.error(f"Job {job_id} timed out") + except asyncio.TimeoutError: + job_info.status = JobStatus.FAILED + job_info.error = "Job timed out after 24 h" + if job_info.process: + job_info.process.kill() + logger.error("Job %s timed out", job_id) - except Exception as e: + except Exception as exc: job_info.status = JobStatus.FAILED - job_info.error = str(e) - logger.error(f"Job {job_id} failed with error: {e}") + job_info.error = str(exc) + logger.error("Job %s failed with error: %s", job_id, exc) finally: job_info.end_time = datetime.now() self.job_pool.running_jobs.discard(job_id) - # Notify web clients + # Log completion with queue stats + queue_stats = self.job_pool.get_queue_stats() + running_count = len(self.job_pool.running_jobs) + queued_count = queue_stats["total_queued"] + + if running_count == 0 and queued_count == 0: + logger.info("✓ Job %s finished. ✅ ALL JOBS COMPLETE! (No jobs running or queued)", job_id) + else: + logger.info( + "✓ Job %s finished. Running: %d/%d, Queued: %d", + job_id, + running_count, + self.resource_limits.max_concurrent_jobs, + queued_count, + ) + + # Signal processing loop that capacity is now available + if self._limits_changed is not None: + self._limits_changed.set() + logger.debug("✓ Signaled processing loop that slot is available") + await self.sio.emit( "job_completed", { @@ -1338,6 +1452,11 @@ async def edit_concurrent_limits(self, update_data: Dict[str, Any]) -> Optional[ f"{getattr(self.resource_limits, field)}" ) + # Signal processing loop to re-evaluate capacity + # (We no longer manipulate semaphore since we use running_jobs count directly) + if self._limits_changed is not None: + self._limits_changed.set() + # Notify web clients about the update await self.sio.emit( "limits_updated", diff --git a/src/pyedb/workflows/job_manager/backend/start_service.py b/src/pyedb/workflows/job_manager/backend/start_service.py index 81ed43801d..53c6a45dbb 100644 --- a/src/pyedb/workflows/job_manager/backend/start_service.py +++ b/src/pyedb/workflows/job_manager/backend/start_service.py @@ -26,8 +26,10 @@ Usage ----- -$ python start_service.py --host 0.0.0.0 --port 9090 --min-disk 2 --min-memory 1 +$ python start_service.py --host 0.0.0.0 --port 9090 --max-concurrent 4 --min-disk 2 --min-memory 1 ✅ Job-manager backend listening on http://0.0.0.0:9090 + Max concurrent jobs: 4 + Resource gates: 2.0 GB disk / 1.0 GB memory Press Ctrl-C to shut down gracefully. """ @@ -53,6 +55,12 @@ def parse_cli() -> argparse.Namespace: default=8080, help="TCP port to listen on (default: 8080)", ) + parser.add_argument( + "--max-concurrent", + type=int, + default=4, + help="Maximum number of concurrent jobs for local runs (default: 4)", + ) parser.add_argument( "--min-disk", type=float, @@ -65,6 +73,12 @@ def parse_cli() -> argparse.Namespace: default=2.0, help="Minimum free memory in GB (default: 2.0)", ) + parser.add_argument( + "--max-concurrent-jobs", + type=int, + default=1, + help="Maximum number of concurrent jobs for local runs (default: 1)", + ) return parser.parse_args() @@ -74,13 +88,19 @@ def main(): handler = JobManagerHandler(host=args.host, port=args.port) # Override resource limits from CLI + handler.manager.resource_limits.max_concurrent_jobs = args.max_concurrent handler.manager.resource_limits.min_disk_gb = args.min_disk handler.manager.resource_limits.min_memory_gb = args.min_memory + handler.manager.resource_limits.max_concurrent_jobs = args.max_concurrent_jobs + # Ensure job_pool has the updated reference + handler.manager.job_pool.resource_limits = handler.manager.resource_limits handler.start_service() # non-blocking; spins up daemon thread + aiohttp print(f"✅ Job-manager backend listening on http://{handler.host}:{handler.port}") + print(f" Max concurrent jobs: {args.max_concurrent}") print(f" Resource gates: {args.min_disk} GB disk / {args.min_memory} GB memory") + print(f" Max concurrent jobs: {args.max_concurrent_jobs}") # Graceful shutdown on Ctrl-C stop_event = threading.Event() diff --git a/src/pyedb/workflows/job_manager/backend/submit_batch_jobs.py b/src/pyedb/workflows/job_manager/backend/submit_batch_jobs.py new file mode 100644 index 0000000000..8bb6368649 --- /dev/null +++ b/src/pyedb/workflows/job_manager/backend/submit_batch_jobs.py @@ -0,0 +1,400 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2023 - 2025 ANSYS, Inc. and/or its affiliates. +# SPDX-License-Identifier: MIT +# +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +""" +Submit multiple HFSS jobs to the local job-manager service by scanning a directory. +Job submissions are done via REST API asynchronously. +The service must be running locally (default: localhost:8080) prior to executing this script. +To start the service, run this command in another terminal: + python -m pyedb.workflows.job_manager.backend.job_manager_handler + +Usage examples +-------------- +# Get help +python submit_batch_jobs.py --help + +# Submit all projects in a directory with explicit values +python submit_batch_jobs.py \ + --host 127.0.0.1 \ + --port 8080 \ + --root-dir "D:\\Temp\\test_jobs" \ + --num-cores 8 + +# Use defaults (localhost:8080, 8 cores) +python submit_batch_jobs.py --root-dir "D:\\Temp\\test_jobs" + +# Recursive scan +python submit_batch_jobs.py --root-dir "D:\\Temp\\test_jobs" --recursive +""" + +import argparse +import asyncio +import logging +from pathlib import Path +import sys +from typing import Any, List, cast + +import aiohttp + +from pyedb.workflows.job_manager.backend.job_submission import ( + MachineNode, + create_hfss_config, +) + +logger = logging.getLogger(__name__) + + +def scan_projects(root_dir: Path, recursive: bool = False) -> List[Path]: + """ + Scan a directory for AEDB folders and AEDT files. + + For each AEDB folder found, check if a corresponding AEDT file exists. + If it does, use the AEDT file; otherwise, use the AEDB folder. + + Parameters + ---------- + root_dir : Path + Root directory to scan for projects. + recursive : bool, optional + If True, scan subdirectories recursively. Default is False. + + Returns + ------- + List[Path] + List of project paths (either .aedt files or .aedb folders) to submit. + """ + projects = [] + aedb_folders = set() + aedt_files = set() + + # Determine the glob pattern based on recursive flag + glob_pattern = "**/*" if recursive else "*" + + logger.info("Scanning %s for projects (recursive=%s)", root_dir, recursive) + + # Find all .aedb folders + for path in root_dir.glob(glob_pattern): + if path.is_dir() and path.suffix == ".aedb": + aedb_folders.add(path) + logger.debug("Found AEDB folder: %s", path) + + # Find all .aedt files + if recursive: + for path in root_dir.glob("**/*.aedt"): + if path.is_file(): + aedt_files.add(path) + logger.debug("Found AEDT file: %s", path) + else: + for path in root_dir.glob("*.aedt"): + if path.is_file(): + aedt_files.add(path) + logger.debug("Found AEDT file: %s", path) + + # Process AEDB folders: check if corresponding AEDT exists + for aedb_folder in aedb_folders: + # Check for corresponding .aedt file (same name, same directory) + corresponding_aedt = aedb_folder.with_suffix(".aedt") + + if corresponding_aedt in aedt_files: + # Use the AEDT file and remove it from the set to avoid duplicates + projects.append(corresponding_aedt) + aedt_files.discard(corresponding_aedt) + logger.info("Using AEDT file for project: %s", corresponding_aedt) + else: + # Use the AEDB folder + projects.append(aedb_folder) + logger.info("Using AEDB folder for project: %s", aedb_folder) + + # Add remaining AEDT files that don't have corresponding AEDB folders + for aedt_file in aedt_files: + projects.append(aedt_file) + logger.info("Using standalone AEDT file: %s", aedt_file) + + # Sort for consistent ordering + projects.sort() + + logger.info("Found %d project(s) to submit", len(projects)) + return projects + + +async def submit_single_job( + session: aiohttp.ClientSession, + backend_url: str, + project_path: Path, + num_cores: int, + priority: int = 0, +) -> tuple[Path, bool, Any]: + """ + Submit a single job to the job manager. + + Parameters + ---------- + session : aiohttp.ClientSession + Async HTTP session for making requests. + backend_url : str + Base URL of the job manager service. + project_path : Path + Path to the project file or folder. + num_cores : int + Number of CPU cores to allocate. + priority : int, optional + Job priority (default: 0). + + Returns + ------- + tuple[Path, bool, Any] + Tuple of (project_path, success_flag, response_data). + """ + try: + cfg = create_hfss_config(project_path=str(project_path)) + + # Ensure we have at least one machine node and configure it + if not cfg.machine_nodes: + cfg.machine_nodes.append(MachineNode()) + + # Use a typed reference to avoid static-analysis/indexing warnings + node = cast(MachineNode, cfg.machine_nodes[0]) + node.cores = num_cores + node.max_cores = num_cores + + async with session.post( + f"{backend_url}/jobs/submit", + json={ + "config": cfg.model_dump(mode="json", exclude_defaults=False), + "priority": priority, + }, + ) as resp: + status = resp.status + # Try to parse JSON reply safely; fall back to text + try: + reply: Any = await resp.json() + except Exception: + reply = await resp.text() + + success = 200 <= status < 300 + if success: + logger.info("✓ Successfully submitted: %s (status=%s)", project_path.name, status) + else: + logger.error("✗ Failed to submit: %s (status=%s): %s", project_path.name, status, reply) + + return (project_path, success, reply) + + except asyncio.CancelledError: + # Re-raise cancellation so callers can handle it + raise + except Exception as exc: + logger.exception("✗ Exception submitting %s: %s", project_path.name, exc) + return (project_path, False, str(exc)) + + +async def submit_batch_jobs( + *, + host: str, + port: int, + projects: List[Path], + num_cores: int, + max_concurrent: int = 5, + delay_ms: int = 100, +) -> None: + """ + Submit multiple jobs asynchronously to the job manager. + + Parameters + ---------- + host : str + Job manager host address. + port : int + Job manager port. + projects : List[Path] + List of project paths to submit. + num_cores : int + Number of CPU cores to allocate per job. + max_concurrent : int, optional + Maximum number of concurrent submissions (default: 5). + delay_ms : int, optional + Delay in milliseconds between job submissions (default: 100). + """ + backend_url = f"http://{host}:{port}" + + logger.info("Starting batch submission of %d project(s) to %s", len(projects), backend_url) + logger.info("Max concurrent submissions: %d", max_concurrent) + logger.info("Delay between submissions: %d ms", delay_ms) + + # Use a reasonable timeout for network operations + timeout = aiohttp.ClientTimeout(total=120) + + async with aiohttp.ClientSession(timeout=timeout) as session: + # Create a semaphore to limit concurrent requests + semaphore = asyncio.Semaphore(max_concurrent) + + async def submit_with_semaphore(project: Path) -> tuple[Path, bool, Any]: + async with semaphore: + result = await submit_single_job(session, backend_url, project, num_cores) + # Add a small delay to ensure HTTP request is fully sent before next submission + await asyncio.sleep(delay_ms / 1000.0) + return result + + # Submit all jobs concurrently (but limited by semaphore) + tasks = [submit_with_semaphore(project) for project in projects] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Summarize results + successful = 0 + failed = 0 + + for result in results: + if isinstance(result, Exception): + logger.error("Task raised exception: %s", result) + failed += 1 + else: + project_path, success, _reply = result + if success: + successful += 1 + else: + failed += 1 + + logger.info("=" * 60) + logger.info("Batch submission complete:") + logger.info(" Total projects: %d", len(projects)) + logger.info(" ✓ Successful: %d", successful) + logger.info(" ✗ Failed: %d", failed) + logger.info("=" * 60) + + +def parse_cli() -> argparse.Namespace: + """Parse command-line arguments.""" + parser = argparse.ArgumentParser( + description="Submit multiple HFSS jobs to the local job-manager service by scanning a directory.", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + "--host", + default="localhost", + help="Job-manager host (default: localhost)", + ) + parser.add_argument( + "--port", + type=int, + default=8080, + help="Job-manager port (default: 8080)", + ) + parser.add_argument( + "--root-dir", + required=True, + type=Path, + help="Root directory to scan for .aedb folders and .aedt files", + ) + parser.add_argument( + "--num-cores", + type=int, + default=8, + help="Number of cores to allocate per job (default: 8)", + ) + parser.add_argument( + "--recursive", + action="store_true", + help="Scan subdirectories recursively", + ) + parser.add_argument( + "--max-concurrent", + type=int, + default=5, + help="Maximum number of concurrent job submissions (default: 5)", + ) + parser.add_argument( + "--delay-ms", + type=int, + default=100, + help="Delay in milliseconds between job submissions (default: 100)", + ) + parser.add_argument( + "--verbose", + "-v", + action="store_true", + help="Enable verbose logging (DEBUG level)", + ) + return parser.parse_args() + + +def main() -> None: + """Main entry point for the batch job submission script.""" + args = parse_cli() + + # Configure logging + log_level = logging.DEBUG if args.verbose else logging.INFO + if not logging.getLogger().handlers: + logging.basicConfig( + level=log_level, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + ) + + # Basic sanity checks + if not args.root_dir.exists(): + logger.error("Error: root directory does not exist: %s", args.root_dir) + print(f"Error: root directory does not exist: {args.root_dir}", file=sys.stderr) + sys.exit(1) + + if not args.root_dir.is_dir(): + logger.error("Error: root-dir must be a directory: %s", args.root_dir) + print(f"Error: root-dir must be a directory: {args.root_dir}", file=sys.stderr) + sys.exit(1) + + if args.num_cores <= 0: + logger.error("Error: --num-cores must be positive") + print("Error: --num-cores must be positive", file=sys.stderr) + sys.exit(1) + + if args.max_concurrent <= 0: + logger.error("Error: --max-concurrent must be positive") + print("Error: --max-concurrent must be positive", file=sys.stderr) + sys.exit(1) + + if args.delay_ms < 0: + logger.error("Error: --delay-ms must be non-negative") + print("Error: --delay-ms must be non-negative", file=sys.stderr) + sys.exit(1) + + # Scan for projects + projects = scan_projects(args.root_dir, recursive=args.recursive) + + if not projects: + logger.warning("No projects found in %s", args.root_dir) + print(f"Warning: No .aedb folders or .aedt files found in {args.root_dir}") + sys.exit(0) + + # Submit jobs + asyncio.run( + submit_batch_jobs( + host=args.host, + port=args.port, + projects=projects, + num_cores=args.num_cores, + max_concurrent=args.max_concurrent, + delay_ms=args.delay_ms, + ) + ) + + +if __name__ == "__main__": + main()