4 changes: 2 additions & 2 deletions .gitignore
@@ -173,5 +173,5 @@ cython_debug/
# PyPI configuration file
.pypirc

# Test Reports
reports-*/*.json
# Test Reports (directories)
reports-*/
6 changes: 4 additions & 2 deletions docs/config.md
@@ -70,8 +70,10 @@ load:
type: constant|poisson # Load pattern type
interval: 1.0 # Seconds between request batches
stages: # Load progression stages
- rate: 1 # Requests per second
duration: 30 # Seconds to maintain this rate
- rate: 1 # Requests per second (CONSTANT or POISSON LOADS)
duration: 30 # Seconds to maintain this rate (CONSTANT or POISSON LOADS)
concurrency_level: 3 # Number of concurrent requests/worker threads to maintain (CONCURRENT LOADS)
num_requests: 40 # Total requests to process at this concurrency_level (CONCURRENT LOADS)
num_workers: 4 # Concurrent worker threads (default: CPU_cores)
worker_max_concurrency: 10 # Max concurrent requests per worker
worker_max_tcp_connections: 2500 # Max TCP connections per worker
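For illustration, a minimal `load` block using the concurrent-load fields above might look like this (a sketch only; the field names come from the reference above, while the values are arbitrary):

```yaml
load:
  type: concurrent          # fixed-concurrency load pattern
  stages:
    - concurrency_level: 3  # concurrent requests/worker threads to maintain
      num_requests: 40      # total requests to process at this concurrency
```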
47 changes: 40 additions & 7 deletions docs/loadgen.md
@@ -62,7 +62,13 @@ graph TD

## Recommended Configuration

Choose the right machine to run inference-perf on. The maximum concurrency you can get from the benchmarking tool and the ability to hit the desired QPS relies on the machine on which you are running on. Especially the number of CPUs / cores and the clock speed help with the concurrency. Maximum concurrency you can reach is bounded by `num_workers * worker_max_concurrency`. You can only have as many in-flight requests. Our recommendation is to not change `num_workers` since it is automatically set by inference-perf based on number of CPUs available and change `worker_max_concurrency` when needed. It is set to `100` by default. But more powerful CPUs can handle up to 1000.
Choose the right machine to run inference-perf on. The maximum concurrency you can get from the benchmarking tool and its ability to hit the desired QPS depend on the machine you are running on; in particular, the number of CPU cores and their clock speed determine how much concurrency you can achieve.

**For rate-based load types (`constant`, `poisson`):**
The maximum concurrency you can reach is bounded by `num_workers * worker_max_concurrency`; you can have at most that many requests in flight. Our recommendation is to leave `num_workers` unchanged, since inference-perf sets it automatically based on the number of CPUs available, and to adjust `worker_max_concurrency` when needed. It defaults to `100`, but more powerful CPUs can handle up to `1000`.

**For concurrent load type (`concurrent`):**
The tool automatically manages worker allocation based on your specified `concurrency_level`. The `worker_max_concurrency` setting is ignored for concurrent load types, as workers are dynamically allocated to achieve the exact concurrency specified.

You have the following options to generate load with inference-perf.

@@ -73,7 +79,7 @@ You have the following options to generate load with inference-perf.

```
load:
type: constant
type: constant # or 'poisson' - sweep not available for 'concurrent'
sweep:
type: linear
```
@@ -89,20 +95,47 @@ This should allow the tool to generate the requested QPS.

```
load:
type: constant
type: constant # rate-based load generation
stages:
- rate: 100
duration: 60
- rate: 100 # requests per second
duration: 60 # duration in seconds
num_workers: 32
worker_max_concurrency: 250
```

### Run with specific concurrency instead of QPS
### Generate load with fixed concurrency levels

Use the `concurrent` load type when you want to specify exact concurrency levels rather than request rates. This is ideal for testing how your system performs under specific concurrent user loads.

```yaml
load:
type: concurrent
stages:
- num_requests: 1000
concurrency_level: 32
- num_requests: 2000
concurrency_level: 64
```

**Key differences from rate-based load types:**
- Uses `num_requests` and `concurrency_level` instead of `rate` and `duration`
- Maintains exactly the specified concurrency throughout the test
- Cannot be used with sweep configuration
- Workers are dynamically allocated based on concurrency requirements

**Configuration validation:**
- `concurrent` load type requires `num_requests` and `concurrency_level` for each stage
- `rate` and `duration` are not allowed and will cause validation errors
- `sweep` configuration is incompatible with concurrent load type
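
For example, a stage that mixes rate-based and concurrency-based fields, as sketched below, would fail validation (illustrative values; the exact error message depends on the implementation):

```yaml
load:
  type: concurrent
  stages:
    - rate: 100             # not allowed with the concurrent load type
      duration: 60          # not allowed with the concurrent load type
      concurrency_level: 32
      num_requests: 1000
```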

### Run with specific concurrency instead of QPS (Legacy approach)

**Note: This approach is deprecated. Use the `concurrent` load type instead for better concurrency control.**

You might want to specify only the concurrency (number of users) on the benchmarking side. In this case, set `num_workers` and `worker_max_concurrency` so that `num_workers * worker_max_concurrency` equals the desired concurrency. Then set the QPS high enough to keep all the workers fully utilized.

For example, if you need to run with concurrency of 32 and you have 4 CPUs on your machine, set the following:
```
```yaml
load:
type: constant
stages:
121 changes: 111 additions & 10 deletions inference_perf/analysis/analyze.py
@@ -96,11 +96,19 @@ def analyze_reports(report_dir: str) -> None:
logger.error(f"No stage lifecycle metrics files found in {report_dir}")
return

# Latency data
# Latency data (concurrency)
concurrency_vs_ttft: List[Tuple[float, float]] = []
concurrency_vs_ntpot: List[Tuple[float, float]] = []
concurrency_vs_itl: List[Tuple[float, float]] = []
# Throughput data (concurrency)
concurrency_vs_itps: List[Tuple[float, float]] = []
concurrency_vs_otps: List[Tuple[float, float]] = []
concurrency_vs_ttps: List[Tuple[float, float]] = []
# Latency data (QPS)
qps_vs_ttft: List[Tuple[float, float]] = []
qps_vs_ntpot: List[Tuple[float, float]] = []
qps_vs_itl: List[Tuple[float, float]] = []
# Throughput data
# Throughput data (QPS)
qps_vs_itps: List[Tuple[float, float]] = []
qps_vs_otps: List[Tuple[float, float]] = []
qps_vs_ttps: List[Tuple[float, float]] = []
@@ -114,6 +122,9 @@ def analyze_reports(report_dir: str) -> None:
with open(stage_file, "r") as f:
report_data = json.load(f)

# Get concurrency
concurrency = report_data.get("load_summary", {}).get("concurrency", None)

# Get QPS from report file
qps = report_data.get("load_summary", {}).get("achieved_rate")
if qps is None:
@@ -131,31 +142,49 @@ def analyze_reports(report_dir: str) -> None:
if latency_data:
ttft = _extract_latency_metric(latency_data, "time_to_first_token", convert_to_ms=True)
if ttft is not None:
qps_vs_ttft.append((qps, ttft))
if concurrency:
concurrency_vs_ttft.append((concurrency, ttft))
else:
qps_vs_ttft.append((qps, ttft))

ntpot = _extract_latency_metric(latency_data, "normalized_time_per_output_token", convert_to_ms=True)
if ntpot is not None:
qps_vs_ntpot.append((qps, ntpot))
if concurrency:
concurrency_vs_ntpot.append((concurrency, ntpot))
else:
qps_vs_ntpot.append((qps, ntpot))

itl = _extract_latency_metric(latency_data, "inter_token_latency", convert_to_ms=True)
if itl is not None:
qps_vs_itl.append((qps, itl))
if concurrency:
concurrency_vs_itl.append((concurrency, itl))
else:
qps_vs_itl.append((qps, itl))

# Extract throughput metrics if they exist
otps = None
throughput_data = success_data.get("throughput", {})
if throughput_data:
itps = _extract_throughput_metric(throughput_data, "input_tokens_per_sec")
if itps is not None:
qps_vs_itps.append((qps, itps))
if concurrency:
concurrency_vs_itps.append((concurrency, itps))
else:
qps_vs_itps.append((qps, itps))

otps = _extract_throughput_metric(throughput_data, "output_tokens_per_sec")
if otps is not None:
qps_vs_otps.append((qps, otps))
if concurrency:
concurrency_vs_otps.append((concurrency, otps))
else:
qps_vs_otps.append((qps, otps))

ttps = _extract_throughput_metric(throughput_data, "total_tokens_per_sec")
if ttps is not None:
qps_vs_ttps.append((qps, ttps))
if concurrency:
concurrency_vs_ttps.append((concurrency, ttps))
else:
qps_vs_ttps.append((qps, ttps))

# Populate latency vs throughput data
if otps is not None:
@@ -173,7 +202,79 @@ def analyze_reports(report_dir: str) -> None:
logger.error(f"An unexpected error occurred while processing {stage_file.name}: {e}")
continue

# --- Generate Latency Plot ---
# --- Generate Concurrency Latency Plot ---
concurrency_latency_charts_to_generate = []
if concurrency_vs_ttft:
concurrency_latency_charts_to_generate.append(
{
"title": "Time to First Token vs. Concurrency",
"xlabel": "Concurrency",
"ylabel": "Mean TTFT (ms)",
"data": sorted(concurrency_vs_ttft, key=operator.itemgetter(0)),
}
)
if concurrency_vs_ntpot:
concurrency_latency_charts_to_generate.append(
{
"title": "Norm. Time per Output Token vs. Concurrency",
"xlabel": "Concurrency",
"ylabel": "Mean Norm. Time (ms/token)",
"data": sorted(concurrency_vs_ntpot, key=operator.itemgetter(0)),
}
)
if concurrency_vs_itl:
concurrency_latency_charts_to_generate.append(
{
"title": "Inter-Token Latency vs. Concurrency",
"xlabel": "Concurrency",
"ylabel": "Mean ITL (ms)",
"data": sorted(concurrency_vs_itl, key=operator.itemgetter(0)),
}
)

_generate_plot(
concurrency_latency_charts_to_generate,
"Latency vs Concurrency",
report_path / "latency_vs_concurrency.png",
)

# --- Generate Concurrency Throughput Plot ---
concurrency_throughput_charts_to_generate = []
if concurrency_vs_itps:
concurrency_throughput_charts_to_generate.append(
{
"title": "Input Tokens/sec vs. Concurrency",
"xlabel": "Concurrency",
"ylabel": "Tokens/sec",
"data": sorted(concurrency_vs_itps, key=operator.itemgetter(0)),
}
)
if concurrency_vs_otps:
concurrency_throughput_charts_to_generate.append(
{
"title": "Output Tokens/sec vs. Concurrency",
"xlabel": "Concurrency",
"ylabel": "Tokens/sec",
"data": sorted(concurrency_vs_otps, key=operator.itemgetter(0)),
}
)
if concurrency_vs_ttps:
concurrency_throughput_charts_to_generate.append(
{
"title": "Total Tokens/sec vs. Concurrency",
"xlabel": "Concurrency",
"ylabel": "Tokens/sec",
"data": sorted(concurrency_vs_ttps, key=operator.itemgetter(0)),
}
)

_generate_plot(
concurrency_throughput_charts_to_generate,
"Throughput vs Concurrency",
report_path / "throughput_vs_concurrency.png",
)

# --- Generate QPS Latency Plot ---
latency_charts_to_generate = []
if qps_vs_ttft:
latency_charts_to_generate.append(
@@ -206,7 +307,7 @@ def analyze_reports(report_dir: str) -> None:
report_path / "latency_vs_qps.png",
)

# --- Generate Throughput Plot ---
# --- Generate QPS Throughput Plot ---
throughput_charts_to_generate = []
if qps_vs_itps:
throughput_charts_to_generate.append(
3 changes: 2 additions & 1 deletion inference_perf/client/metricsclient/base.py
@@ -13,7 +13,7 @@
# limitations under the License.
from abc import ABC, abstractmethod
from enum import Enum, auto
from typing import TypedDict
from typing import Optional, TypedDict
from pydantic import BaseModel


@@ -35,6 +35,7 @@ class StageRuntimeInfo(BaseModel):
end_time: float
start_time: float
status: StageStatus
concurrency_level: Optional[int] = None


class PerfRuntimeParameters: