From 604d1ef379a839af11662f54fc779188a69dd06c Mon Sep 17 00:00:00 2001 From: svandenb-dev Date: Fri, 7 Nov 2025 16:55:43 +0100 Subject: [PATCH 01/10] adding CLI for batch submission --- doc/source/workflows/drc/drc.rst | 4 +- .../workflows/job_manager/submit_job.rst | 126 +++++- ignore_words.txt | 4 + .../backend/README_BATCH_SUBMISSION.md | 129 ++++++ .../job_manager/backend/submit_batch_jobs.py | 400 ++++++++++++++++++ 5 files changed, 660 insertions(+), 3 deletions(-) create mode 100644 src/pyedb/workflows/job_manager/backend/README_BATCH_SUBMISSION.md create mode 100644 src/pyedb/workflows/job_manager/backend/submit_batch_jobs.py diff --git a/doc/source/workflows/drc/drc.rst b/doc/source/workflows/drc/drc.rst index adb2323347..eb12b405b4 100644 --- a/doc/source/workflows/drc/drc.rst +++ b/doc/source/workflows/drc/drc.rst @@ -1,7 +1,7 @@ .. _ref_drc: ================================================================== -Design-rule checking (DRC)–self-contained, multi-threaded engine +Design-rule checking (DRC)—self-contained, multi-threaded engine ================================================================== .. currentmodule:: pyedb.workflows.drc.drc @@ -128,7 +128,7 @@ Load a rule deck from JSON with open("my_rules.json") as f: rules = Rules.from_dict(json.load(f)) -Export violations to CSV +Export violations to csv ~~~~~~~~~~~~~~~~~~~~~~~~ .. code-block:: python diff --git a/doc/source/workflows/job_manager/submit_job.rst b/doc/source/workflows/job_manager/submit_job.rst index 7685b331e5..23b12f2c15 100644 --- a/doc/source/workflows/job_manager/submit_job.rst +++ b/doc/source/workflows/job_manager/submit_job.rst @@ -44,7 +44,7 @@ It exposes: * REST & Web-Socket endpoints (``http://localhost:8080`` by default) * Thread-safe synchronous façade for scripts / Jupyter * Native async API for advanced integrations -* CLI utilities ``submit_local_job`` and ``submit_job_on_scheduler`` for shell / CI pipelines +* CLI utilities ``submit_local_job``, ``submit_batch_jobs``, and ``submit_job_on_scheduler`` for shell / CI pipelines The **same backend code path** is used regardless of front-end style; the difference is **who owns the event loop** and **how control is returned to the caller**. @@ -176,6 +176,130 @@ Example—CLI (cluster) The command returns immediately after the job is **queued**; use the printed ID with ``wait_until_done`` or monitor via the web UI. +CLI—``submit_batch_jobs`` +^^^^^^^^^^^^^^^^^^^^^^^^^^ +For bulk submissions, use ``submit_batch_jobs`` to automatically discover and submit +multiple projects from a directory tree. + +Synopsis +"""""""" +.. code-block:: bash + + $ python submit_batch_jobs.py --root-dir [options] + +Key features +"""""""""""" +* **Automatic discovery**: Scans for all ``.aedb`` folders and ``.aedt`` files +* **Smart pairing**: When both ``.aedb`` and ``.aedt`` exist, uses the ``.aedt`` file +* **Asynchronous submission**: Submits jobs concurrently for faster processing +* **Recursive scanning**: Optional recursive directory traversal + +Options +""""""" +.. 
list-table:: + :widths: 30 15 55 + :header-rows: 1 + + * - Argument + - Default + - Description + * - ``--root-dir`` + - *(required)* + - Root directory to scan for projects + * - ``--host`` + - ``localhost`` + - Job manager host address + * - ``--port`` + - ``8080`` + - Job manager port + * - ``--num-cores`` + - ``8`` + - Number of cores to allocate per job + * - ``--max-concurrent`` + - ``5`` + - Maximum concurrent job submissions + * - ``--delay-ms`` + - ``100`` + - Delay in milliseconds between job submissions + * - ``--recursive`` + - ``False`` + - Scan subdirectories recursively + * - ``--verbose`` + - ``False`` + - Enable debug logging + +Example—batch submission (local) +""""""""""""""""""""""""""""""""" +.. code-block:: bash + + # Submit all projects in a directory + $ python submit_batch_jobs.py --root-dir "D:\Temp\test_jobs" + + # Recursive scan with custom core count + $ python submit_batch_jobs.py \ + --root-dir "D:\Projects\simulations" \ + --num-cores 16 \ + --recursive \ + --verbose + +Example output +"""""""""""""" +.. code-block:: text + + 2025-11-07 10:30:15 - __main__ - INFO - Scanning D:\Temp\test_jobs for projects (recursive=False) + 2025-11-07 10:30:15 - __main__ - INFO - Found AEDB folder: D:\Temp\test_jobs\project1.aedb + 2025-11-07 10:30:15 - __main__ - INFO - Found AEDT file: D:\Temp\test_jobs\project2.aedt + 2025-11-07 10:30:15 - __main__ - INFO - Using AEDB folder for project: D:\Temp\test_jobs\project1.aedb + 2025-11-07 10:30:15 - __main__ - INFO - Using standalone AEDT file: D:\Temp\test_jobs\project2.aedt + 2025-11-07 10:30:15 - __main__ - INFO - Found 2 project(s) to submit + 2025-11-07 10:30:15 - __main__ - INFO - Starting batch submission of 2 project(s) to http://localhost:8080 + 2025-11-07 10:30:16 - __main__ - INFO - ✓ Successfully submitted: project1.aedb (status=200) + 2025-11-07 10:30:16 - __main__ - INFO - ✓ Successfully submitted: project2.aedt (status=200) + 2025-11-07 10:30:16 - __main__ - INFO - ============================================================ + 2025-11-07 10:30:16 - __main__ - INFO - Batch submission complete: + 2025-11-07 10:30:16 - __main__ - INFO - Total projects: 2 + 2025-11-07 10:30:16 - __main__ - INFO - ✓ Successful: 2 + 2025-11-07 10:30:16 - __main__ - INFO - ✗ Failed: 0 + 2025-11-07 10:30:16 - __main__ - INFO - ============================================================ + +How it works +"""""""""""" +1. **Scanning phase**: + + * Searches for all ``.aedb`` folders in the root directory + * Searches for all ``.aedt`` files in the root directory + * For each ``.aedb`` folder, checks if a corresponding ``.aedt`` file exists: + + - If yes: Uses the ``.aedt`` file + - If no: Uses the ``.aedb`` folder + + * Standalone ``.aedt`` files (without corresponding ``.aedb``) are also included + +2. **Submission phase**: + + * Creates job configurations for each project + * Submits jobs asynchronously to the job manager REST API + * Limits concurrent submissions using a semaphore (default: 5) + * Reports success/failure for each submission + +3. **Results**: + + * Displays a summary with total, successful, and failed submissions + * Logs detailed information about each submission + +.. note:: + The script does **not** wait for jobs to complete, only for submission confirmation. + Job execution happens asynchronously in the job manager service. + +.. tip:: + * Use ``--max-concurrent`` to limit load on the job manager service when submitting + large batches. + * Use ``--delay-ms`` to control the pause between submissions (default: 100ms). 
+ This ensures HTTP requests are fully sent before the next submission starts. + * Set ``--delay-ms 0`` to disable the delay if your network is very fast and reliable. + * For very large batch submissions, consider increasing the timeout in the code if + network latency is high. + Programmatic—native asyncio """"""""""""""""""""""""""""" .. code-block:: python diff --git a/ignore_words.txt b/ignore_words.txt index c8fa792f47..7f78ee4977 100644 --- a/ignore_words.txt +++ b/ignore_words.txt @@ -32,3 +32,7 @@ aline COM gRPC Toolkits +Cohn +Pydantic +pydantic +Drc diff --git a/src/pyedb/workflows/job_manager/backend/README_BATCH_SUBMISSION.md b/src/pyedb/workflows/job_manager/backend/README_BATCH_SUBMISSION.md new file mode 100644 index 0000000000..4c2e6b7cc5 --- /dev/null +++ b/src/pyedb/workflows/job_manager/backend/README_BATCH_SUBMISSION.md @@ -0,0 +1,129 @@ +# Batch Job Submission + +## Overview + +The `submit_batch_jobs.py` script allows you to submit multiple HFSS projects to the job manager by scanning a directory for `.aedb` folders and `.aedt` files. + +## Features + +- **Automatic Project Discovery**: Scans a root directory for all `.aedb` folders and `.aedt` files +- **Smart Pairing**: When both an `.aedb` folder and corresponding `.aedt` file exist, the `.aedt` file is used +- **Asynchronous Submission**: Jobs are submitted concurrently to the job manager for faster processing +- **Recursive Scanning**: Optional recursive directory scanning +- **Configurable Concurrency**: Control how many jobs are submitted simultaneously + +## Prerequisites + +1. The job manager service must be running before executing this script: + ```bash + python -m pyedb.workflows.job_manager.backend.job_manager_handler + ``` + +2. Install required dependencies: + ```bash + pip install aiohttp + ``` + +## Usage + +### Basic Usage + +Submit all projects in a directory: + +```bash +python submit_batch_jobs.py --root-dir "D:\Temp\test_jobs" +``` + +### Advanced Options + +```bash +python submit_batch_jobs.py \ + --host localhost \ + --port 8080 \ + --root-dir "D:\Temp\test_jobs" \ + --num-cores 8 \ + --max-concurrent 5 \ + --delay-ms 100 \ + --recursive \ + --verbose +``` + +### Command-Line Arguments + +| Argument | Description | Default | +|----------|-------------|---------| +| `--host` | Job manager host address | `localhost` | +| `--port` | Job manager port | `8080` | +| `--root-dir` | Root directory to scan (required) | - | +| `--num-cores` | Number of cores per job | `8` | +| `--max-concurrent` | Max concurrent submissions | `5` | +| `--delay-ms` | Delay in milliseconds between submissions | `100` | +| `--recursive` | Scan subdirectories recursively | `False` | +| `--verbose` | Enable debug logging | `False` | + +## How It Works + +1. **Scanning Phase**: + - Searches for all `.aedb` folders in the root directory + - Searches for all `.aedt` files in the root directory + - For each `.aedb` folder, checks if a corresponding `.aedt` file exists: + - If yes: Uses the `.aedt` file + - If no: Uses the `.aedb` folder + - Standalone `.aedt` files (without corresponding `.aedb`) are also included + +2. **Submission Phase**: + - Creates job configurations for each project + - Submits jobs asynchronously to the job manager REST API + - Limits concurrent submissions using a semaphore + - Reports success/failure for each submission + +3. 
**Results**: + - Displays a summary with total, successful, and failed submissions + - Logs detailed information about each submission + +## Example Output + +``` +2025-11-07 10:30:15 - __main__ - INFO - Scanning D:\Temp\test_jobs for projects (recursive=False) +2025-11-07 10:30:15 - __main__ - INFO - Found AEDB folder: D:\Temp\test_jobs\project1.aedb +2025-11-07 10:30:15 - __main__ - INFO - Found AEDT file: D:\Temp\test_jobs\project2.aedt +2025-11-07 10:30:15 - __main__ - INFO - Using AEDB folder for project: D:\Temp\test_jobs\project1.aedb +2025-11-07 10:30:15 - __main__ - INFO - Using standalone AEDT file: D:\Temp\test_jobs\project2.aedt +2025-11-07 10:30:15 - __main__ - INFO - Found 2 project(s) to submit +2025-11-07 10:30:15 - __main__ - INFO - Starting batch submission of 2 project(s) to http://localhost:8080 +2025-11-07 10:30:16 - __main__ - INFO - ✓ Successfully submitted: project1.aedb (status=200) +2025-11-07 10:30:16 - __main__ - INFO - ✓ Successfully submitted: project2.aedt (status=200) +2025-11-07 10:30:16 - __main__ - INFO - ============================================================ +2025-11-07 10:30:16 - __main__ - INFO - Batch submission complete: +2025-11-07 10:30:16 - __main__ - INFO - Total projects: 2 +2025-11-07 10:30:16 - __main__ - INFO - ✓ Successful: 2 +2025-11-07 10:30:16 - __main__ - INFO - ✗ Failed: 0 +2025-11-07 10:30:16 - __main__ - INFO - ============================================================ +``` + +## Comparison with submit_local_job.py + +| Feature | submit_local_job.py | submit_batch_jobs.py | +|---------|---------------------|----------------------| +| Projects | Single project | Multiple projects | +| Scanning | Manual path | Automatic discovery | +| Submission | Single | Concurrent/Async | +| Pairing | N/A | .aedb ↔ .aedt pairing | +| Recursive | N/A | Optional | + +## Error Handling + +- **Service Not Running**: If the job manager service is not accessible, submissions will fail with connection errors +- **Invalid Projects**: Projects that fail to create valid configurations will be logged as errors +- **Network Timeouts**: 120-second timeout for each submission request +- **Partial Failures**: The script continues submitting other jobs even if some fail + +## Notes + +- The script does not wait for jobs to complete, only for submission confirmation +- Job execution happens asynchronously in the job manager service +- Use `--max-concurrent` to limit load on the job manager service +- Use `--delay-ms` to add a pause between submissions, ensuring HTTP requests are fully sent (default: 100ms) +- For large batch submissions, consider increasing the timeout in the code if needed +- Set `--delay-ms 0` to disable the delay if network is very fast and reliable + diff --git a/src/pyedb/workflows/job_manager/backend/submit_batch_jobs.py b/src/pyedb/workflows/job_manager/backend/submit_batch_jobs.py new file mode 100644 index 0000000000..8bb6368649 --- /dev/null +++ b/src/pyedb/workflows/job_manager/backend/submit_batch_jobs.py @@ -0,0 +1,400 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2023 - 2025 ANSYS, Inc. and/or its affiliates. 
+# SPDX-License-Identifier: MIT +# +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +""" +Submit multiple HFSS jobs to the local job-manager service by scanning a directory. +Job submissions are done via REST API asynchronously. +The service must be running locally (default: localhost:8080) prior to executing this script. +To start the service, run this command in another terminal: + python -m pyedb.workflows.job_manager.backend.job_manager_handler + +Usage examples +-------------- +# Get help +python submit_batch_jobs.py --help + +# Submit all projects in a directory with explicit values +python submit_batch_jobs.py \ + --host 127.0.0.1 \ + --port 8080 \ + --root-dir "D:\\Temp\\test_jobs" \ + --num-cores 8 + +# Use defaults (localhost:8080, 8 cores) +python submit_batch_jobs.py --root-dir "D:\\Temp\\test_jobs" + +# Recursive scan +python submit_batch_jobs.py --root-dir "D:\\Temp\\test_jobs" --recursive +""" + +import argparse +import asyncio +import logging +from pathlib import Path +import sys +from typing import Any, List, cast + +import aiohttp + +from pyedb.workflows.job_manager.backend.job_submission import ( + MachineNode, + create_hfss_config, +) + +logger = logging.getLogger(__name__) + + +def scan_projects(root_dir: Path, recursive: bool = False) -> List[Path]: + """ + Scan a directory for AEDB folders and AEDT files. + + For each AEDB folder found, check if a corresponding AEDT file exists. + If it does, use the AEDT file; otherwise, use the AEDB folder. + + Parameters + ---------- + root_dir : Path + Root directory to scan for projects. + recursive : bool, optional + If True, scan subdirectories recursively. Default is False. + + Returns + ------- + List[Path] + List of project paths (either .aedt files or .aedb folders) to submit. 
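+
+    Examples
+    --------
+    Illustrative sketch only; the directory below is a placeholder and the
+    returned list depends entirely on what the scan finds.
+
+    >>> from pathlib import Path
+    >>> projects = scan_projects(Path("D:/Temp/test_jobs"), recursive=True)
+    >>> for project in projects:
+    ...     print(project.name)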
+ """ + projects = [] + aedb_folders = set() + aedt_files = set() + + # Determine the glob pattern based on recursive flag + glob_pattern = "**/*" if recursive else "*" + + logger.info("Scanning %s for projects (recursive=%s)", root_dir, recursive) + + # Find all .aedb folders + for path in root_dir.glob(glob_pattern): + if path.is_dir() and path.suffix == ".aedb": + aedb_folders.add(path) + logger.debug("Found AEDB folder: %s", path) + + # Find all .aedt files + if recursive: + for path in root_dir.glob("**/*.aedt"): + if path.is_file(): + aedt_files.add(path) + logger.debug("Found AEDT file: %s", path) + else: + for path in root_dir.glob("*.aedt"): + if path.is_file(): + aedt_files.add(path) + logger.debug("Found AEDT file: %s", path) + + # Process AEDB folders: check if corresponding AEDT exists + for aedb_folder in aedb_folders: + # Check for corresponding .aedt file (same name, same directory) + corresponding_aedt = aedb_folder.with_suffix(".aedt") + + if corresponding_aedt in aedt_files: + # Use the AEDT file and remove it from the set to avoid duplicates + projects.append(corresponding_aedt) + aedt_files.discard(corresponding_aedt) + logger.info("Using AEDT file for project: %s", corresponding_aedt) + else: + # Use the AEDB folder + projects.append(aedb_folder) + logger.info("Using AEDB folder for project: %s", aedb_folder) + + # Add remaining AEDT files that don't have corresponding AEDB folders + for aedt_file in aedt_files: + projects.append(aedt_file) + logger.info("Using standalone AEDT file: %s", aedt_file) + + # Sort for consistent ordering + projects.sort() + + logger.info("Found %d project(s) to submit", len(projects)) + return projects + + +async def submit_single_job( + session: aiohttp.ClientSession, + backend_url: str, + project_path: Path, + num_cores: int, + priority: int = 0, +) -> tuple[Path, bool, Any]: + """ + Submit a single job to the job manager. + + Parameters + ---------- + session : aiohttp.ClientSession + Async HTTP session for making requests. + backend_url : str + Base URL of the job manager service. + project_path : Path + Path to the project file or folder. + num_cores : int + Number of CPU cores to allocate. + priority : int, optional + Job priority (default: 0). + + Returns + ------- + tuple[Path, bool, Any] + Tuple of (project_path, success_flag, response_data). 
+ """ + try: + cfg = create_hfss_config(project_path=str(project_path)) + + # Ensure we have at least one machine node and configure it + if not cfg.machine_nodes: + cfg.machine_nodes.append(MachineNode()) + + # Use a typed reference to avoid static-analysis/indexing warnings + node = cast(MachineNode, cfg.machine_nodes[0]) + node.cores = num_cores + node.max_cores = num_cores + + async with session.post( + f"{backend_url}/jobs/submit", + json={ + "config": cfg.model_dump(mode="json", exclude_defaults=False), + "priority": priority, + }, + ) as resp: + status = resp.status + # Try to parse JSON reply safely; fall back to text + try: + reply: Any = await resp.json() + except Exception: + reply = await resp.text() + + success = 200 <= status < 300 + if success: + logger.info("✓ Successfully submitted: %s (status=%s)", project_path.name, status) + else: + logger.error("✗ Failed to submit: %s (status=%s): %s", project_path.name, status, reply) + + return (project_path, success, reply) + + except asyncio.CancelledError: + # Re-raise cancellation so callers can handle it + raise + except Exception as exc: + logger.exception("✗ Exception submitting %s: %s", project_path.name, exc) + return (project_path, False, str(exc)) + + +async def submit_batch_jobs( + *, + host: str, + port: int, + projects: List[Path], + num_cores: int, + max_concurrent: int = 5, + delay_ms: int = 100, +) -> None: + """ + Submit multiple jobs asynchronously to the job manager. + + Parameters + ---------- + host : str + Job manager host address. + port : int + Job manager port. + projects : List[Path] + List of project paths to submit. + num_cores : int + Number of CPU cores to allocate per job. + max_concurrent : int, optional + Maximum number of concurrent submissions (default: 5). + delay_ms : int, optional + Delay in milliseconds between job submissions (default: 100). 
+ """ + backend_url = f"http://{host}:{port}" + + logger.info("Starting batch submission of %d project(s) to %s", len(projects), backend_url) + logger.info("Max concurrent submissions: %d", max_concurrent) + logger.info("Delay between submissions: %d ms", delay_ms) + + # Use a reasonable timeout for network operations + timeout = aiohttp.ClientTimeout(total=120) + + async with aiohttp.ClientSession(timeout=timeout) as session: + # Create a semaphore to limit concurrent requests + semaphore = asyncio.Semaphore(max_concurrent) + + async def submit_with_semaphore(project: Path) -> tuple[Path, bool, Any]: + async with semaphore: + result = await submit_single_job(session, backend_url, project, num_cores) + # Add a small delay to ensure HTTP request is fully sent before next submission + await asyncio.sleep(delay_ms / 1000.0) + return result + + # Submit all jobs concurrently (but limited by semaphore) + tasks = [submit_with_semaphore(project) for project in projects] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Summarize results + successful = 0 + failed = 0 + + for result in results: + if isinstance(result, Exception): + logger.error("Task raised exception: %s", result) + failed += 1 + else: + project_path, success, _reply = result + if success: + successful += 1 + else: + failed += 1 + + logger.info("=" * 60) + logger.info("Batch submission complete:") + logger.info(" Total projects: %d", len(projects)) + logger.info(" ✓ Successful: %d", successful) + logger.info(" ✗ Failed: %d", failed) + logger.info("=" * 60) + + +def parse_cli() -> argparse.Namespace: + """Parse command-line arguments.""" + parser = argparse.ArgumentParser( + description="Submit multiple HFSS jobs to the local job-manager service by scanning a directory.", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument( + "--host", + default="localhost", + help="Job-manager host (default: localhost)", + ) + parser.add_argument( + "--port", + type=int, + default=8080, + help="Job-manager port (default: 8080)", + ) + parser.add_argument( + "--root-dir", + required=True, + type=Path, + help="Root directory to scan for .aedb folders and .aedt files", + ) + parser.add_argument( + "--num-cores", + type=int, + default=8, + help="Number of cores to allocate per job (default: 8)", + ) + parser.add_argument( + "--recursive", + action="store_true", + help="Scan subdirectories recursively", + ) + parser.add_argument( + "--max-concurrent", + type=int, + default=5, + help="Maximum number of concurrent job submissions (default: 5)", + ) + parser.add_argument( + "--delay-ms", + type=int, + default=100, + help="Delay in milliseconds between job submissions (default: 100)", + ) + parser.add_argument( + "--verbose", + "-v", + action="store_true", + help="Enable verbose logging (DEBUG level)", + ) + return parser.parse_args() + + +def main() -> None: + """Main entry point for the batch job submission script.""" + args = parse_cli() + + # Configure logging + log_level = logging.DEBUG if args.verbose else logging.INFO + if not logging.getLogger().handlers: + logging.basicConfig( + level=log_level, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + ) + + # Basic sanity checks + if not args.root_dir.exists(): + logger.error("Error: root directory does not exist: %s", args.root_dir) + print(f"Error: root directory does not exist: {args.root_dir}", file=sys.stderr) + sys.exit(1) + + if not args.root_dir.is_dir(): + logger.error("Error: root-dir must be a directory: %s", args.root_dir) + 
print(f"Error: root-dir must be a directory: {args.root_dir}", file=sys.stderr) + sys.exit(1) + + if args.num_cores <= 0: + logger.error("Error: --num-cores must be positive") + print("Error: --num-cores must be positive", file=sys.stderr) + sys.exit(1) + + if args.max_concurrent <= 0: + logger.error("Error: --max-concurrent must be positive") + print("Error: --max-concurrent must be positive", file=sys.stderr) + sys.exit(1) + + if args.delay_ms < 0: + logger.error("Error: --delay-ms must be non-negative") + print("Error: --delay-ms must be non-negative", file=sys.stderr) + sys.exit(1) + + # Scan for projects + projects = scan_projects(args.root_dir, recursive=args.recursive) + + if not projects: + logger.warning("No projects found in %s", args.root_dir) + print(f"Warning: No .aedb folders or .aedt files found in {args.root_dir}") + sys.exit(0) + + # Submit jobs + asyncio.run( + submit_batch_jobs( + host=args.host, + port=args.port, + projects=projects, + num_cores=args.num_cores, + max_concurrent=args.max_concurrent, + delay_ms=args.delay_ms, + ) + ) + + +if __name__ == "__main__": + main() From 0adea3e617bb4dbf17e129e2ed44b9f4908bd52f Mon Sep 17 00:00:00 2001 From: svandenb-dev Date: Fri, 7 Nov 2025 16:57:07 +0100 Subject: [PATCH 02/10] adding CLI for batch submission --- .../backend/README_BATCH_SUBMISSION.md | 129 ------------------ 1 file changed, 129 deletions(-) delete mode 100644 src/pyedb/workflows/job_manager/backend/README_BATCH_SUBMISSION.md diff --git a/src/pyedb/workflows/job_manager/backend/README_BATCH_SUBMISSION.md b/src/pyedb/workflows/job_manager/backend/README_BATCH_SUBMISSION.md deleted file mode 100644 index 4c2e6b7cc5..0000000000 --- a/src/pyedb/workflows/job_manager/backend/README_BATCH_SUBMISSION.md +++ /dev/null @@ -1,129 +0,0 @@ -# Batch Job Submission - -## Overview - -The `submit_batch_jobs.py` script allows you to submit multiple HFSS projects to the job manager by scanning a directory for `.aedb` folders and `.aedt` files. - -## Features - -- **Automatic Project Discovery**: Scans a root directory for all `.aedb` folders and `.aedt` files -- **Smart Pairing**: When both an `.aedb` folder and corresponding `.aedt` file exist, the `.aedt` file is used -- **Asynchronous Submission**: Jobs are submitted concurrently to the job manager for faster processing -- **Recursive Scanning**: Optional recursive directory scanning -- **Configurable Concurrency**: Control how many jobs are submitted simultaneously - -## Prerequisites - -1. The job manager service must be running before executing this script: - ```bash - python -m pyedb.workflows.job_manager.backend.job_manager_handler - ``` - -2. 
Install required dependencies: - ```bash - pip install aiohttp - ``` - -## Usage - -### Basic Usage - -Submit all projects in a directory: - -```bash -python submit_batch_jobs.py --root-dir "D:\Temp\test_jobs" -``` - -### Advanced Options - -```bash -python submit_batch_jobs.py \ - --host localhost \ - --port 8080 \ - --root-dir "D:\Temp\test_jobs" \ - --num-cores 8 \ - --max-concurrent 5 \ - --delay-ms 100 \ - --recursive \ - --verbose -``` - -### Command-Line Arguments - -| Argument | Description | Default | -|----------|-------------|---------| -| `--host` | Job manager host address | `localhost` | -| `--port` | Job manager port | `8080` | -| `--root-dir` | Root directory to scan (required) | - | -| `--num-cores` | Number of cores per job | `8` | -| `--max-concurrent` | Max concurrent submissions | `5` | -| `--delay-ms` | Delay in milliseconds between submissions | `100` | -| `--recursive` | Scan subdirectories recursively | `False` | -| `--verbose` | Enable debug logging | `False` | - -## How It Works - -1. **Scanning Phase**: - - Searches for all `.aedb` folders in the root directory - - Searches for all `.aedt` files in the root directory - - For each `.aedb` folder, checks if a corresponding `.aedt` file exists: - - If yes: Uses the `.aedt` file - - If no: Uses the `.aedb` folder - - Standalone `.aedt` files (without corresponding `.aedb`) are also included - -2. **Submission Phase**: - - Creates job configurations for each project - - Submits jobs asynchronously to the job manager REST API - - Limits concurrent submissions using a semaphore - - Reports success/failure for each submission - -3. **Results**: - - Displays a summary with total, successful, and failed submissions - - Logs detailed information about each submission - -## Example Output - -``` -2025-11-07 10:30:15 - __main__ - INFO - Scanning D:\Temp\test_jobs for projects (recursive=False) -2025-11-07 10:30:15 - __main__ - INFO - Found AEDB folder: D:\Temp\test_jobs\project1.aedb -2025-11-07 10:30:15 - __main__ - INFO - Found AEDT file: D:\Temp\test_jobs\project2.aedt -2025-11-07 10:30:15 - __main__ - INFO - Using AEDB folder for project: D:\Temp\test_jobs\project1.aedb -2025-11-07 10:30:15 - __main__ - INFO - Using standalone AEDT file: D:\Temp\test_jobs\project2.aedt -2025-11-07 10:30:15 - __main__ - INFO - Found 2 project(s) to submit -2025-11-07 10:30:15 - __main__ - INFO - Starting batch submission of 2 project(s) to http://localhost:8080 -2025-11-07 10:30:16 - __main__ - INFO - ✓ Successfully submitted: project1.aedb (status=200) -2025-11-07 10:30:16 - __main__ - INFO - ✓ Successfully submitted: project2.aedt (status=200) -2025-11-07 10:30:16 - __main__ - INFO - ============================================================ -2025-11-07 10:30:16 - __main__ - INFO - Batch submission complete: -2025-11-07 10:30:16 - __main__ - INFO - Total projects: 2 -2025-11-07 10:30:16 - __main__ - INFO - ✓ Successful: 2 -2025-11-07 10:30:16 - __main__ - INFO - ✗ Failed: 0 -2025-11-07 10:30:16 - __main__ - INFO - ============================================================ -``` - -## Comparison with submit_local_job.py - -| Feature | submit_local_job.py | submit_batch_jobs.py | -|---------|---------------------|----------------------| -| Projects | Single project | Multiple projects | -| Scanning | Manual path | Automatic discovery | -| Submission | Single | Concurrent/Async | -| Pairing | N/A | .aedb ↔ .aedt pairing | -| Recursive | N/A | Optional | - -## Error Handling - -- **Service Not Running**: If the job manager service is not 
accessible, submissions will fail with connection errors -- **Invalid Projects**: Projects that fail to create valid configurations will be logged as errors -- **Network Timeouts**: 120-second timeout for each submission request -- **Partial Failures**: The script continues submitting other jobs even if some fail - -## Notes - -- The script does not wait for jobs to complete, only for submission confirmation -- Job execution happens asynchronously in the job manager service -- Use `--max-concurrent` to limit load on the job manager service -- Use `--delay-ms` to add a pause between submissions, ensuring HTTP requests are fully sent (default: 100ms) -- For large batch submissions, consider increasing the timeout in the code if needed -- Set `--delay-ms 0` to disable the delay if network is very fast and reliable - From 1cc2c7ab1c62b4b063add9e97b2f051319825592 Mon Sep 17 00:00:00 2001 From: pyansys-ci-bot <92810346+pyansys-ci-bot@users.noreply.github.com> Date: Fri, 7 Nov 2025 15:58:18 +0000 Subject: [PATCH 03/10] chore: adding changelog file 1635.added.md [dependabot-skip] --- doc/changelog.d/1635.added.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 doc/changelog.d/1635.added.md diff --git a/doc/changelog.d/1635.added.md b/doc/changelog.d/1635.added.md new file mode 100644 index 0000000000..de091fa368 --- /dev/null +++ b/doc/changelog.d/1635.added.md @@ -0,0 +1 @@ +Adding CLI for batch submission From dc2c4b07fc74f0e6f717b259f8f6c64f2e17d3f2 Mon Sep 17 00:00:00 2001 From: svandenb-dev Date: Sun, 9 Nov 2025 07:56:02 +0100 Subject: [PATCH 04/10] siwave valcheck --- src/pyedb/workflows/job_manager/backend/service.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/pyedb/workflows/job_manager/backend/service.py b/src/pyedb/workflows/job_manager/backend/service.py index 890acd9004..ca9f4c6a1e 100644 --- a/src/pyedb/workflows/job_manager/backend/service.py +++ b/src/pyedb/workflows/job_manager/backend/service.py @@ -1065,6 +1065,9 @@ async def _process_jobs_continuously(self): logger.info(f"Dequeued job {next_job_id}. Starting...") self.job_pool.running_jobs.add(next_job_id) asyncio.create_task(self._process_single_job(next_job_id)) + # Continue immediately to check if we can start more jobs + # The running_jobs check will prevent exceeding max_concurrent + continue else: logger.info("Queue is empty, sleeping.") await asyncio.sleep(1) @@ -1072,8 +1075,6 @@ async def _process_jobs_continuously(self): logger.warning("Cannot start new job, waiting...") await asyncio.sleep(5) - await asyncio.sleep(0.2) - async def _process_single_job(self, job_id: str): """ Process a single job from the pool. 
From 3bdc7f3c10fe1eeb55e47addcdd1d4950d426bca Mon Sep 17 00:00:00 2001 From: svandenb-dev Date: Sun, 9 Nov 2025 08:00:01 +0100 Subject: [PATCH 05/10] job manager max concurrent cli --- .../workflows/job_manager/backend/start_service.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/pyedb/workflows/job_manager/backend/start_service.py b/src/pyedb/workflows/job_manager/backend/start_service.py index 81ed43801d..82a95b3984 100644 --- a/src/pyedb/workflows/job_manager/backend/start_service.py +++ b/src/pyedb/workflows/job_manager/backend/start_service.py @@ -26,8 +26,10 @@ Usage ----- -$ python start_service.py --host 0.0.0.0 --port 9090 --min-disk 2 --min-memory 1 +$ python start_service.py --host 0.0.0.0 --port 9090 --max-concurrent 4 --min-disk 2 --min-memory 1 ✅ Job-manager backend listening on http://0.0.0.0:9090 + Max concurrent jobs: 4 + Resource gates: 2.0 GB disk / 1.0 GB memory Press Ctrl-C to shut down gracefully. """ @@ -53,6 +55,12 @@ def parse_cli() -> argparse.Namespace: default=8080, help="TCP port to listen on (default: 8080)", ) + parser.add_argument( + "--max-concurrent", + type=int, + default=4, + help="Maximum number of concurrent jobs for local runs (default: 4)", + ) parser.add_argument( "--min-disk", type=float, @@ -74,12 +82,14 @@ def main(): handler = JobManagerHandler(host=args.host, port=args.port) # Override resource limits from CLI + handler.manager.resource_limits.max_concurrent_jobs = args.max_concurrent handler.manager.resource_limits.min_disk_gb = args.min_disk handler.manager.resource_limits.min_memory_gb = args.min_memory handler.start_service() # non-blocking; spins up daemon thread + aiohttp print(f"✅ Job-manager backend listening on http://{handler.host}:{handler.port}") + print(f" Max concurrent jobs: {args.max_concurrent}") print(f" Resource gates: {args.min_disk} GB disk / {args.min_memory} GB memory") # Graceful shutdown on Ctrl-C From cc47da5122367a5f0936fc85a147b93d1709d1f7 Mon Sep 17 00:00:00 2001 From: pyansys-ci-bot <92810346+pyansys-ci-bot@users.noreply.github.com> Date: Sun, 9 Nov 2025 07:07:17 +0000 Subject: [PATCH 06/10] chore: adding changelog file 1640.added.md [dependabot-skip] --- doc/changelog.d/1640.added.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 doc/changelog.d/1640.added.md diff --git a/doc/changelog.d/1640.added.md b/doc/changelog.d/1640.added.md new file mode 100644 index 0000000000..141536f469 --- /dev/null +++ b/doc/changelog.d/1640.added.md @@ -0,0 +1 @@ +Job manager concurrent job bug From 696b871a5750e39aed37ed94ae6c8dc50ae45235 Mon Sep 17 00:00:00 2001 From: svandenb-dev Date: Mon, 10 Nov 2025 13:25:31 +0100 Subject: [PATCH 07/10] job service modified --- .../backend/job_manager_handler.py | 5 +- .../workflows/job_manager/backend/service.py | 220 +++++++++--------- .../job_manager/backend/start_service.py | 10 + 3 files changed, 125 insertions(+), 110 deletions(-) diff --git a/src/pyedb/workflows/job_manager/backend/job_manager_handler.py b/src/pyedb/workflows/job_manager/backend/job_manager_handler.py index 9b01f3e504..bbcbd70256 100644 --- a/src/pyedb/workflows/job_manager/backend/job_manager_handler.py +++ b/src/pyedb/workflows/job_manager/backend/job_manager_handler.py @@ -231,8 +231,9 @@ def __init__(self, edb=None, version=None, host="localhost", port=8080): else: self.ansys_path = os.path.join(installed_versions[version], "ansysedt.exe") self.scheduler_type = self._detect_scheduler() - self.manager = JobManager(scheduler_type=self.scheduler_type) - 
self.manager.resource_limits = ResourceLimits(max_concurrent_jobs=1) + # Create resource limits with default values + resource_limits = ResourceLimits(max_concurrent_jobs=1) + self.manager = JobManager(resource_limits=resource_limits, scheduler_type=self.scheduler_type) self.manager.jobs = {} # In-memory job store -TODO add persistence database # Pass the detected ANSYS path to the manager self.manager.ansys_path = self.ansys_path diff --git a/src/pyedb/workflows/job_manager/backend/service.py b/src/pyedb/workflows/job_manager/backend/service.py index 890acd9004..bdbd88ec4b 100644 --- a/src/pyedb/workflows/job_manager/backend/service.py +++ b/src/pyedb/workflows/job_manager/backend/service.py @@ -500,6 +500,10 @@ def __init__(self, resource_limits: ResourceLimits = None, scheduler_type: Sched # Background task for continuous job processing self._processing_task: Optional[asyncio.Task] = None self._shutdown = False + # Event to wake up processing loop when limits change (created when loop is available) + self._limits_changed: Optional[asyncio.Event] = None + # Semaphore to enforce concurrent job limit (created when loop is available) + self._job_semaphore: Optional[asyncio.Semaphore] = None # Start resource monitoring immediately self._monitor_task = None self._ensure_monitor_running() @@ -1048,158 +1052,128 @@ async def submit_job(self, config: HFSSSimulationConfig, priority: int = 0) -> s async def _process_jobs_continuously(self): """ - Continuously process jobs until shutdown is requested. - - This is the main job processing loop that: - - Checks if new jobs can be started based on resource limits - - Dequeues the highest priority job - - Starts job execution in a separate task - - Sleeps when no jobs can be started or queue is empty + Main scheduler loop – strict concurrency enforcement. + A job is removed from the queue **only** when we already hold + a semaphore permit that will stay held until the job terminates. """ - logger.info("✅ Job processing loop started.") - while not self._shutdown: - can_start = self.job_pool.can_start_job(self.resource_monitor) - if can_start: - next_job_id = self.job_pool.get_next_job() - if next_job_id: - logger.info(f"Dequeued job {next_job_id}. Starting...") - self.job_pool.running_jobs.add(next_job_id) - asyncio.create_task(self._process_single_job(next_job_id)) - else: - logger.info("Queue is empty, sleeping.") - await asyncio.sleep(1) - else: - logger.warning("Cannot start new job, waiting...") - await asyncio.sleep(5) + if self._job_semaphore is None: + self._job_semaphore = asyncio.Semaphore(self.resource_limits.max_concurrent_jobs) - await asyncio.sleep(0.2) + logger.info("✅ Job-processing loop started (concurrency = %s)", self.resource_limits.max_concurrent_jobs) - async def _process_single_job(self, job_id: str): - """ - Process a single job from the pool. 
+ while not self._shutdown: + await self._job_semaphore.acquire() # ❶ wait for free slot - Parameters - ---------- - job_id : str - Job identifier to process + next_job_id = None + while next_job_id is None and not self._shutdown: + next_job_id = self.job_pool.get_next_job() + if next_job_id is None: # queue empty + self._job_semaphore.release() # give slot back + try: + await asyncio.wait_for(self._limits_changed.wait(), 1.0) + self._limits_changed.clear() + await self._job_semaphore.acquire() + except asyncio.TimeoutError: + pass + + if next_job_id is None: # shutdown requested + self._job_semaphore.release() + break + + self.job_pool.running_jobs.add(next_job_id) + logger.info( + "De-queued job %s. Starting... (running: %s/%s)", + next_job_id, + len(self.job_pool.running_jobs), + self.resource_limits.max_concurrent_jobs, + ) + # transfer the **same** permit to the worker task + asyncio.create_task(self._process_single_job(next_job_id, semaphore=self._job_semaphore)) - Notes - ----- - This method handles: - - Local execution via subprocess - - Scheduler submission (SLURM/LSF) - - Status updates and notifications - - Error handling and cleanup + async def _process_single_job(self, job_id: str, semaphore: asyncio.Semaphore | None = None): + """ + Execute one job and **always** release the semaphore permit when done. """ job_info = self.jobs.get(job_id) if not job_info or job_info.status != JobStatus.QUEUED: + if semaphore: + semaphore.release() self.job_pool.running_jobs.discard(job_id) return - # Update job status job_info.status = JobStatus.RUNNING job_info.start_time = datetime.now() job_info.local_resources = self.resource_monitor.current_usage.copy() - # Notify web clients await self.sio.emit( "job_started", {"job_id": job_id, "start_time": job_info.start_time.isoformat(), "resources": job_info.local_resources}, ) - - logger.info(f"Job {job_id} started") + logger.info( + "Job %s actually executing (CPU %.1f %%)", job_id, self.resource_monitor.current_usage["cpu_percent"] + ) try: - # Run the simulation if job_info.config.scheduler_type != SchedulerType.NONE: - # Make sure the executable path is present + # ---------- scheduler path ---------- if not job_info.config.ansys_edt_path or not os.path.exists(job_info.config.ansys_edt_path): if self.ansys_path and os.path.exists(self.ansys_path): job_info.config = HFSSSimulationConfig( **{**job_info.config.model_dump(), "ansys_edt_path": self.ansys_path} ) - logger.info(f"Using JobManager's detected ANSYS path: {self.ansys_path}") else: - raise FileNotFoundError( - f"ANSYS executable not found. 
Config path: {job_info.config.ansys_edt_path}, " - f"Manager path: {self.ansys_path}" - ) + raise FileNotFoundError("ANSYS executable not found") - # Now generate the script – the path is guaranteed to be non-empty result = job_info.config.submit_to_scheduler() job_info.scheduler_job_id = job_info.config._extract_job_id(result.stdout) job_info.status = JobStatus.SCHEDULED - logger.info( - f"Job {job_id} submitted to scheduler with ID: {job_info.scheduler_job_id}, status: SCHEDULED" - ) await self.sio.emit("job_scheduled", {"job_id": job_id, "scheduler_job_id": job_info.scheduler_job_id}) + # for scheduler jobs we exit here; completion is tracked elsewhere + return - else: - # ---------------- local mode – same guarantee ----------------- - if not job_info.config.ansys_edt_path or not os.path.exists(job_info.config.ansys_edt_path): - if self.ansys_path and os.path.exists(self.ansys_path): - job_info.config = HFSSSimulationConfig( - **{**job_info.config.model_dump(), "ansys_edt_path": self.ansys_path} - ) - logger.info(f"Using JobManager's detected ANSYS path: {self.ansys_path}") - else: - raise FileNotFoundError( - f"ANSYS executable not found. Config path: {job_info.config.ansys_edt_path}, " - f"Manager path: {self.ansys_path}" - ) - - # Generate command as list for secure execution - command_list = job_info.config.generate_command_list() - - # Log the command being executed for debugging - logger.info(f"Executing command for job {job_id}: {' '.join(command_list)}") - logger.info(f"ANSYS executable path: {job_info.config.ansys_edt_path}") - logger.info(f"Project path: {job_info.config.project_path}") - - # Check if project file exists - if not os.path.exists(job_info.config.project_path): - raise FileNotFoundError(f"Project file not found: {job_info.config.project_path}") - - # Run locally - using asyncio subprocess for better control with secure command list - process = await asyncio.create_subprocess_exec( - *command_list, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - - job_info.process = process - - # Wait for completion with timeout (24 hours max) - try: - stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=86400) + # ---------- local path ---------- + command_list = job_info.config.generate_command_list() + process = await asyncio.create_subprocess_exec( + *command_list, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + job_info.process = process - job_info.return_code = process.returncode - job_info.output = stdout.decode() if stdout else "" - job_info.error = stderr.decode() if stderr else "" + stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=86400) + job_info.return_code = process.returncode + job_info.output = stdout.decode() if stdout else "" + job_info.error = stderr.decode() if stderr else "" - if process.returncode == 0: - job_info.status = JobStatus.COMPLETED - logger.info(f"Job {job_id} completed successfully") - else: - job_info.status = JobStatus.FAILED - logger.error(f"Job {job_id} failed with return code {process.returncode}") + if process.returncode == 0: + job_info.status = JobStatus.COMPLETED + logger.info("Job %s completed successfully", job_id) + else: + job_info.status = JobStatus.FAILED + logger.error("Job %s failed with return code %s", job_id, process.returncode) - except asyncio.TimeoutError: - job_info.status = JobStatus.FAILED - job_info.error = "Job timed out after 24 hours" - process.terminate() - logger.error(f"Job {job_id} timed out") + except 
asyncio.TimeoutError: + job_info.status = JobStatus.FAILED + job_info.error = "Job timed out after 24 h" + if job_info.process: + job_info.process.kill() + logger.error("Job %s timed out", job_id) - except Exception as e: + except Exception as exc: job_info.status = JobStatus.FAILED - job_info.error = str(e) - logger.error(f"Job {job_id} failed with error: {e}") + job_info.error = str(exc) + logger.error("Job %s failed with error: %s", job_id, exc) finally: job_info.end_time = datetime.now() self.job_pool.running_jobs.discard(job_id) - # Notify web clients + # ❷ release the permit **only** when the job is truly finished + if semaphore is not None: + semaphore.release() + if self._limits_changed is not None: + self._limits_changed.set() + await self.sio.emit( "job_completed", { @@ -1338,6 +1312,36 @@ async def edit_concurrent_limits(self, update_data: Dict[str, Any]) -> Optional[ f"{getattr(self.resource_limits, field)}" ) + # If max_concurrent_jobs changed, update semaphore + if "max_concurrent_jobs" in old_limits and self._job_semaphore is not None: + old_max = old_limits["max_concurrent_jobs"] + new_max = self.resource_limits.max_concurrent_jobs + diff = new_max - old_max + + logger.info(f"Updating semaphore from {old_max} to {new_max} (diff: {diff})") + + if diff > 0: + # Increase limit - release permits + for _ in range(diff): + self._job_semaphore.release() + logger.info(f"Released {diff} semaphore permits") + elif diff < 0: + # Decrease limit - acquire permits (non-blocking) + # Note: This doesn't stop already running jobs, just prevents new ones + acquired = 0 + for _ in range(abs(diff)): + if not self._job_semaphore.locked(): + try: + await asyncio.wait_for(self._job_semaphore.acquire(), timeout=0.01) + acquired += 1 + except asyncio.TimeoutError: + break + logger.info(f"Acquired {acquired} semaphore permits to reduce limit") + + # Signal processing loop to wake up and check if more jobs can be started + if self._limits_changed is not None: + self._limits_changed.set() + # Notify web clients about the update await self.sio.emit( "limits_updated", diff --git a/src/pyedb/workflows/job_manager/backend/start_service.py b/src/pyedb/workflows/job_manager/backend/start_service.py index 81ed43801d..b54d10d730 100644 --- a/src/pyedb/workflows/job_manager/backend/start_service.py +++ b/src/pyedb/workflows/job_manager/backend/start_service.py @@ -65,6 +65,12 @@ def parse_cli() -> argparse.Namespace: default=2.0, help="Minimum free memory in GB (default: 2.0)", ) + parser.add_argument( + "--max-concurrent-jobs", + type=int, + default=1, + help="Maximum number of concurrent jobs for local runs (default: 1)", + ) return parser.parse_args() @@ -76,11 +82,15 @@ def main(): # Override resource limits from CLI handler.manager.resource_limits.min_disk_gb = args.min_disk handler.manager.resource_limits.min_memory_gb = args.min_memory + handler.manager.resource_limits.max_concurrent_jobs = args.max_concurrent_jobs + # Ensure job_pool has the updated reference + handler.manager.job_pool.resource_limits = handler.manager.resource_limits handler.start_service() # non-blocking; spins up daemon thread + aiohttp print(f"✅ Job-manager backend listening on http://{handler.host}:{handler.port}") print(f" Resource gates: {args.min_disk} GB disk / {args.min_memory} GB memory") + print(f" Max concurrent jobs: {args.max_concurrent_jobs}") # Graceful shutdown on Ctrl-C stop_event = threading.Event() From 9e91634037b956f4d45dd31693229e6f942cabbe Mon Sep 17 00:00:00 2001 From: svandenb-dev Date: Mon, 10 Nov 2025 
15:28:06 +0100 Subject: [PATCH 08/10] local job with massive job fixed --- .../workflows/job_manager/backend/service.py | 157 +++++++++++------- 1 file changed, 98 insertions(+), 59 deletions(-) diff --git a/src/pyedb/workflows/job_manager/backend/service.py b/src/pyedb/workflows/job_manager/backend/service.py index bdbd88ec4b..85dffc64e5 100644 --- a/src/pyedb/workflows/job_manager/backend/service.py +++ b/src/pyedb/workflows/job_manager/backend/service.py @@ -71,6 +71,7 @@ import logging import os.path import platform +import threading from typing import Any, Deque, Dict, List, Optional, Set import aiohttp @@ -504,6 +505,9 @@ def __init__(self, resource_limits: ResourceLimits = None, scheduler_type: Sched self._limits_changed: Optional[asyncio.Event] = None # Semaphore to enforce concurrent job limit (created when loop is available) self._job_semaphore: Optional[asyncio.Semaphore] = None + # Use a thread lock to protect creation of the asyncio lock (safe to create without event loop) + self._lock_init_lock = threading.Lock() + self._processing_task_lock: Optional[asyncio.Lock] = None # Start resource monitoring immediately self._monitor_task = None self._ensure_monitor_running() @@ -1029,11 +1033,20 @@ async def submit_job(self, config: HFSSSimulationConfig, priority: int = 0) -> s """ job_id = config.jobid + # Check if job already exists and is not completed + existing_job = self.jobs.get(job_id) + if existing_job: + if existing_job.status in (JobStatus.QUEUED, JobStatus.RUNNING): + logger.warning(f"⚠️ Job {job_id} already {existing_job.status.value}, skipping duplicate submission") + return job_id + elif existing_job.status in (JobStatus.COMPLETED, JobStatus.FAILED): + logger.info(f"Job {job_id} already finished ({existing_job.status.value}), allowing resubmission") + # Create job info job_info = JobInfo(config=config, status=JobStatus.QUEUED, priority=priority) self.jobs[job_id] = job_info - # Add to job pool + # Add to job pool (only if not already queued) self.job_pool.add_job(job_id, priority) # Notify web clients @@ -1044,61 +1057,110 @@ async def submit_job(self, config: HFSSSimulationConfig, priority: int = 0) -> s logger.info(f"Job {job_id} queued with priority {priority}") - # Trigger processing if not already running - if not self._processing_task or self._processing_task.done(): - self._processing_task = asyncio.create_task(self._process_jobs_continuously()) + # Trigger processing if not already running (protected by lock) + logger.debug( + f"🔒 Checking processing task status: task={self._processing_task}, lock={self._processing_task_lock}" + ) + + # Ensure asyncio lock exists using thread lock for protection + if self._processing_task_lock is None: + with self._lock_init_lock: # Thread-safe lock creation + if self._processing_task_lock is None: # Double-check pattern + logger.info("🔒 Creating new asyncio lock for processing task protection") + self._processing_task_lock = asyncio.Lock() + + logger.debug(f"🔒 Acquiring lock to check/create processing task...") + async with self._processing_task_lock: + logger.debug(f"🔒 Lock acquired! 
Current task: {self._processing_task}") + if not self._processing_task or self._processing_task.done(): + logger.warning(f"⚠️ CREATING NEW PROCESSING TASK (task_id will be logged)") + self._processing_task = asyncio.create_task(self._process_jobs_continuously()) + logger.warning(f"✅ NEW PROCESSING TASK CREATED: {id(self._processing_task)}") + else: + logger.debug(f"✓ Processing task already exists (id={id(self._processing_task)}), waking it up") + # Wake up the processing loop if it's waiting + if self._limits_changed is not None: + self._limits_changed.set() + logger.debug("✓ Sent wake-up signal to processing loop") return job_id async def _process_jobs_continuously(self): """ Main scheduler loop – strict concurrency enforcement. - A job is removed from the queue **only** when we already hold - a semaphore permit that will stay held until the job terminates. + Jobs are dequeued only when we have fewer than max_concurrent_jobs running. """ + loop_id = id(asyncio.current_task()) + logger.warning(f"🔄 PROCESSING LOOP STARTED - ID: {loop_id}") + if self._job_semaphore is None: self._job_semaphore = asyncio.Semaphore(self.resource_limits.max_concurrent_jobs) + if self._limits_changed is None: + self._limits_changed = asyncio.Event() + logger.info("✅ Job-processing loop started (concurrency = %s)", self.resource_limits.max_concurrent_jobs) while not self._shutdown: - await self._job_semaphore.acquire() # ❶ wait for free slot - - next_job_id = None - while next_job_id is None and not self._shutdown: - next_job_id = self.job_pool.get_next_job() - if next_job_id is None: # queue empty - self._job_semaphore.release() # give slot back - try: - await asyncio.wait_for(self._limits_changed.wait(), 1.0) - self._limits_changed.clear() - await self._job_semaphore.acquire() - except asyncio.TimeoutError: - pass - - if next_job_id is None: # shutdown requested - self._job_semaphore.release() + # Wait until we have capacity to start a new job + while len(self.job_pool.running_jobs) >= self.resource_limits.max_concurrent_jobs and not self._shutdown: + logger.debug( + "At capacity (%d/%d jobs running), waiting...", + len(self.job_pool.running_jobs), + self.resource_limits.max_concurrent_jobs, + ) + try: + await asyncio.wait_for(self._limits_changed.wait(), 1.0) + self._limits_changed.clear() + except asyncio.TimeoutError: + pass # Check again + + if self._shutdown: break + # Get next job from queue + next_job_id = self.job_pool.get_next_job() + if next_job_id is None: + # Queue is empty, wait for new jobs + try: + await asyncio.wait_for(self._limits_changed.wait(), 1.0) + self._limits_changed.clear() + except asyncio.TimeoutError: + pass # Check queue again + continue + + # Skip if job is already running (duplicate prevention) + if next_job_id in self.job_pool.running_jobs: + logger.warning(f"⚠️ Job {next_job_id} already running, skipping duplicate") + continue + + # Mark as running BEFORE creating task to prevent race condition self.job_pool.running_jobs.add(next_job_id) logger.info( - "De-queued job %s. Starting... (running: %s/%s)", + "De-queued job %s. Starting... 
(running: %s/%s) [Loop ID: %s]", next_job_id, len(self.job_pool.running_jobs), self.resource_limits.max_concurrent_jobs, + loop_id, ) - # transfer the **same** permit to the worker task - asyncio.create_task(self._process_single_job(next_job_id, semaphore=self._job_semaphore)) + + # Start the job (no semaphore needed now, we use running_jobs count directly) + asyncio.create_task(self._process_single_job(next_job_id, semaphore=None)) + + # Small yield to prevent tight loop + await asyncio.sleep(0) async def _process_single_job(self, job_id: str, semaphore: asyncio.Semaphore | None = None): """ - Execute one job and **always** release the semaphore permit when done. + Execute one job and always clean up running_jobs set when done. + Semaphore parameter is deprecated but kept for compatibility. """ job_info = self.jobs.get(job_id) if not job_info or job_info.status != JobStatus.QUEUED: - if semaphore: - semaphore.release() self.job_pool.running_jobs.discard(job_id) + # Signal that capacity is available + if self._limits_changed is not None: + self._limits_changed.set() return job_info.status = JobStatus.RUNNING @@ -1128,7 +1190,11 @@ async def _process_single_job(self, job_id: str, semaphore: asyncio.Semaphore | job_info.scheduler_job_id = job_info.config._extract_job_id(result.stdout) job_info.status = JobStatus.SCHEDULED await self.sio.emit("job_scheduled", {"job_id": job_id, "scheduler_job_id": job_info.scheduler_job_id}) - # for scheduler jobs we exit here; completion is tracked elsewhere + # For scheduler jobs, remove from running_jobs since they run on external scheduler + # This frees up a slot for the next local job + self.job_pool.running_jobs.discard(job_id) + if self._limits_changed is not None: + self._limits_changed.set() return # ---------- local path ---------- @@ -1168,9 +1234,7 @@ async def _process_single_job(self, job_id: str, semaphore: asyncio.Semaphore | job_info.end_time = datetime.now() self.job_pool.running_jobs.discard(job_id) - # ❷ release the permit **only** when the job is truly finished - if semaphore is not None: - semaphore.release() + # Signal processing loop that capacity is now available if self._limits_changed is not None: self._limits_changed.set() @@ -1312,33 +1376,8 @@ async def edit_concurrent_limits(self, update_data: Dict[str, Any]) -> Optional[ f"{getattr(self.resource_limits, field)}" ) - # If max_concurrent_jobs changed, update semaphore - if "max_concurrent_jobs" in old_limits and self._job_semaphore is not None: - old_max = old_limits["max_concurrent_jobs"] - new_max = self.resource_limits.max_concurrent_jobs - diff = new_max - old_max - - logger.info(f"Updating semaphore from {old_max} to {new_max} (diff: {diff})") - - if diff > 0: - # Increase limit - release permits - for _ in range(diff): - self._job_semaphore.release() - logger.info(f"Released {diff} semaphore permits") - elif diff < 0: - # Decrease limit - acquire permits (non-blocking) - # Note: This doesn't stop already running jobs, just prevents new ones - acquired = 0 - for _ in range(abs(diff)): - if not self._job_semaphore.locked(): - try: - await asyncio.wait_for(self._job_semaphore.acquire(), timeout=0.01) - acquired += 1 - except asyncio.TimeoutError: - break - logger.info(f"Acquired {acquired} semaphore permits to reduce limit") - - # Signal processing loop to wake up and check if more jobs can be started + # Signal processing loop to re-evaluate capacity + # (We no longer manipulate semaphore since we use running_jobs count directly) if self._limits_changed is not None: 
self._limits_changed.set() From 53f33e48a49640d8eea44db203fc7c8f84b39d29 Mon Sep 17 00:00:00 2001 From: svandenb-dev Date: Mon, 10 Nov 2025 17:45:17 +0100 Subject: [PATCH 09/10] local job with massive job fixed #3 --- .../job_manager/backend/job_submission.py | 9 +- .../workflows/job_manager/backend/service.py | 88 +++++++++++++++++-- 2 files changed, 90 insertions(+), 7 deletions(-) diff --git a/src/pyedb/workflows/job_manager/backend/job_submission.py b/src/pyedb/workflows/job_manager/backend/job_submission.py index 855339981b..38a15775c7 100644 --- a/src/pyedb/workflows/job_manager/backend/job_submission.py +++ b/src/pyedb/workflows/job_manager/backend/job_submission.py @@ -68,6 +68,7 @@ from datetime import datetime import enum import getpass +import hashlib import logging import os import platform @@ -77,6 +78,7 @@ import subprocess # nosec B404 import tempfile from typing import Any, Dict, List, Optional, Union +import uuid from pydantic import BaseModel, Field @@ -468,7 +470,12 @@ def __init__(self, **data): else: self.ansys_edt_path = os.path.join(list(installed_versions.values())[-1], "ansysedt.exe") # latest if not self.jobid: - self.jobid = f"JOB_ID_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + # Generate unique job ID using timestamp and UUID to avoid collisions + # when submitting multiple jobs rapidly (batch submissions) + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + # Use short UUID (first 8 chars) for readability while ensuring uniqueness + unique_id = str(uuid.uuid4())[:8] + self.jobid = f"JOB_{timestamp}_{unique_id}" if "auto" not in data: # user did not touch it data["auto"] = self.scheduler_type != SchedulerType.NONE self.validate_fields() diff --git a/src/pyedb/workflows/job_manager/backend/service.py b/src/pyedb/workflows/job_manager/backend/service.py index 85dffc64e5..576bf13afe 100644 --- a/src/pyedb/workflows/job_manager/backend/service.py +++ b/src/pyedb/workflows/job_manager/backend/service.py @@ -1049,13 +1049,19 @@ async def submit_job(self, config: HFSSSimulationConfig, priority: int = 0) -> s # Add to job pool (only if not already queued) self.job_pool.add_job(job_id, priority) + # Get queue stats for logging + queue_stats = self.job_pool.get_queue_stats() + # Notify web clients await self.sio.emit( "job_queued", - {"job_id": job_id, "priority": priority, "queue_position": self.job_pool.get_queue_stats()["total_queued"]}, + {"job_id": job_id, "priority": priority, "queue_position": queue_stats["total_queued"]}, ) - logger.info(f"Job {job_id} queued with priority {priority}") + logger.info( + f"Job {job_id} queued with priority {priority}. 
Queue: {queue_stats['total_queued']}, " + f"Running: {queue_stats['running_jobs']}/{queue_stats['max_concurrent']}" + ) # Trigger processing if not already running (protected by lock) logger.debug( @@ -1118,10 +1124,36 @@ async def _process_jobs_continuously(self): if self._shutdown: break - # Get next job from queue + # CRITICAL: Check capacity immediately before dequeuing + # This prevents race conditions in fast-completing jobs + current_running = len(self.job_pool.running_jobs) + if current_running >= self.resource_limits.max_concurrent_jobs: + logger.debug( + "Capacity check failed (%d/%d), waiting for slot...", + current_running, + self.resource_limits.max_concurrent_jobs, + ) + # Don't dequeue - wait for capacity + try: + await asyncio.wait_for(self._limits_changed.wait(), 0.5) + self._limits_changed.clear() + except asyncio.TimeoutError: + pass + continue + + # Get next job from queue (only if we have capacity) next_job_id = self.job_pool.get_next_job() if next_job_id is None: # Queue is empty, wait for new jobs + queue_stats = self.job_pool.get_queue_stats() + running_count = len(self.job_pool.running_jobs) + + # Log at INFO level if we're idle (no jobs running or queued) + if running_count == 0: + logger.info("📭 Queue empty and no jobs running. Waiting for new job submissions...") + else: + logger.debug("Queue empty but %d job(s) still running. Waiting...", running_count) + try: await asyncio.wait_for(self._limits_changed.wait(), 1.0) self._limits_changed.clear() @@ -1134,6 +1166,23 @@ async def _process_jobs_continuously(self): logger.warning(f"⚠️ Job {next_job_id} already running, skipping duplicate") continue + # Get the job info to check and update status + job_info = self.jobs.get(next_job_id) + if not job_info: + logger.error(f"Job {next_job_id} not found in jobs dict, skipping") + continue + + # FINAL capacity check before committing (prevents race with job completion) + if len(self.job_pool.running_jobs) >= self.resource_limits.max_concurrent_jobs: + logger.warning(f"⚠️ Capacity exceeded after dequeue (race condition), re-queueing {next_job_id}") + # Put job back in queue + self.job_pool.add_job(next_job_id, job_info.priority) + continue + + # Mark job as RUNNING immediately to prevent re-dequeuing + job_info.status = JobStatus.RUNNING + job_info.start_time = datetime.now() + # Mark as running BEFORE creating task to prevent race condition self.job_pool.running_jobs.add(next_job_id) logger.info( @@ -1154,17 +1203,27 @@ async def _process_single_job(self, job_id: str, semaphore: asyncio.Semaphore | """ Execute one job and always clean up running_jobs set when done. Semaphore parameter is deprecated but kept for compatibility. + + NOTE: Job status should already be RUNNING when this is called, + as it's set in the processing loop when the job is dequeued. 
""" job_info = self.jobs.get(job_id) - if not job_info or job_info.status != JobStatus.QUEUED: + if not job_info: + logger.error(f"Job {job_id} not found in jobs dict") self.job_pool.running_jobs.discard(job_id) # Signal that capacity is available if self._limits_changed is not None: self._limits_changed.set() return - job_info.status = JobStatus.RUNNING - job_info.start_time = datetime.now() + # Verify job is in correct state (should be RUNNING already) + if job_info.status != JobStatus.RUNNING: + logger.warning(f"Job {job_id} has unexpected status {job_info.status.value}, expected RUNNING") + # Don't return - try to process anyway but log the issue + + # Record local resources (start_time should already be set) + if not job_info.start_time: + job_info.start_time = datetime.now() job_info.local_resources = self.resource_monitor.current_usage.copy() await self.sio.emit( @@ -1234,9 +1293,26 @@ async def _process_single_job(self, job_id: str, semaphore: asyncio.Semaphore | job_info.end_time = datetime.now() self.job_pool.running_jobs.discard(job_id) + # Log completion with queue stats + queue_stats = self.job_pool.get_queue_stats() + running_count = len(self.job_pool.running_jobs) + queued_count = queue_stats["total_queued"] + + if running_count == 0 and queued_count == 0: + logger.info("✓ Job %s finished. ✅ ALL JOBS COMPLETE! (No jobs running or queued)", job_id) + else: + logger.info( + "✓ Job %s finished. Running: %d/%d, Queued: %d", + job_id, + running_count, + self.resource_limits.max_concurrent_jobs, + queued_count, + ) + # Signal processing loop that capacity is now available if self._limits_changed is not None: self._limits_changed.set() + logger.debug("✓ Signaled processing loop that slot is available") await self.sio.emit( "job_completed", From 4ac06cb6da432cbe6800a366547f8aa7c4e08b64 Mon Sep 17 00:00:00 2001 From: svandenb-dev Date: Tue, 11 Nov 2025 10:28:49 +0100 Subject: [PATCH 10/10] Val fix --- doc/source/workflows/drc/drc.rst | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/doc/source/workflows/drc/drc.rst b/doc/source/workflows/drc/drc.rst index eb12b405b4..5b410ed54d 100644 --- a/doc/source/workflows/drc/drc.rst +++ b/doc/source/workflows/drc/drc.rst @@ -85,7 +85,7 @@ Rule models BackDrillStubLength CopperBalance -DRC engine +DRC Engine ~~~~~~~~~~ .. autosummary:: @@ -128,8 +128,8 @@ Load a rule deck from JSON with open("my_rules.json") as f: rules = Rules.from_dict(json.load(f)) -Export violations to csv -~~~~~~~~~~~~~~~~~~~~~~~~ +Export violations to CSV +~~~~~~~~~~~~~~~~~~~~~~~~~ .. code-block:: python