diff --git a/docs/features/disaggregated.md b/docs/features/disaggregated.md index 58ecaeb245d..e240d33a283 100644 --- a/docs/features/disaggregated.md +++ b/docs/features/disaggregated.md @@ -29,48 +29,6 @@ In multi-instance scenarios, each incoming request needs to be assigned to diffe ## Usage Instructions -### Single-machine Disaggregated Deployment - -#### Online Inference Service -Use the following commands for service deployment: - -**Prefill Instance** - -```bash -export FD_LOG_DIR="log_prefill" -export CUDA_VISIBLE_DEVICES=0,1,2,3 -python -m fastdeploy.entrypoints.openai.api_server \ - --model ERNIE-4.5-300B-A47B-BF16 \ - --port 8180 --metrics-port 8181 \ - --engine-worker-queue-port 8182 \ - --cache-queue-port 8183 \ - --tensor-parallel-size 4 \ - --quantization wint4 \ - --splitwise-role "prefill" -``` - -**Decode Instance** - -```bash -export FD_LOG_DIR="log_decode" -export CUDA_VISIBLE_DEVICES=4,5,6,7 -# Note: innode-prefill-ports should specify the engine-worker-queue-port of the Prefill service -python -m fastdeploy.entrypoints.openai.api_server \ - --model ERNIE-4.5-300B-A47B-BF16 \ - --port 8184 --metrics-port 8185 \ - --engine-worker-queue-port 8186 \ - --cache-queue-port 8187 \ - --tensor-parallel-size 4 \ - --quantization wint4 \ - --innode-prefill-ports 8182 \ - --splitwise-role "decode" -``` - -Note: When requesting single-machine PD disaggregated service, **users should request the Decode service's port**. - -#### Offline Inference Service -Refer to the example code `offline_disaggregated_demo.py` in the `fastdeploy/demo` directory for offline inference service deployment. - ### Multi-machine Disaggregated Deployment #### Prerequisite: Redis @@ -118,12 +76,14 @@ For multi-machine deployment, confirm that the NIC supports RDMA and that all no ```bash export FD_LOG_DIR="log_prefill" export CUDA_VISIBLE_DEVICES=0,1,2,3 +export ENABLE_V1_KVCACHE_SCHEDULER=0 echo "set RDMA NICS" export $(bash scripts/get_rdma_nics.sh gpu) echo "KVCACHE_RDMA_NICS ${KVCACHE_RDMA_NICS}" python -m fastdeploy.entrypoints.openai.api_server \ --model ERNIE-4.5-300B-A47B-BF16 \ - --port 8180 --metrics-port 8181 \ + --port 8180 \ + --metrics-port 8181 \ --engine-worker-queue-port 8182 \ --cache-queue-port 8183 \ --tensor-parallel-size 4 \ @@ -143,12 +103,14 @@ python -m fastdeploy.entrypoints.openai.api_server \ ```bash export FD_LOG_DIR="log_decode" export CUDA_VISIBLE_DEVICES=4,5,6,7 +export ENABLE_V1_KVCACHE_SCHEDULER=0 echo "set RDMA NICS" export $(bash scripts/get_rdma_nics.sh gpu) echo "KVCACHE_RDMA_NICS ${KVCACHE_RDMA_NICS}" python -m fastdeploy.entrypoints.openai.api_server \ --model ERNIE-4.5-300B-A47B-BF16 \ - --port 8184 --metrics-port 8185 \ + --port 8184 \ + --metrics-port 8185 \ --engine-worker-queue-port 8186 \ --cache-queue-port 8187 \ --tensor-parallel-size 4 \ diff --git a/docs/zh/features/disaggregated.md b/docs/zh/features/disaggregated.md index 909925ea697..093fdd24c6a 100644 --- a/docs/zh/features/disaggregated.md +++ b/docs/zh/features/disaggregated.md @@ -29,49 +29,6 @@ ## 使用说明 -### 单机分离式部署 - -#### 在线推理服务 -使用如下命令进行服务部署 - -**prefill 实例** - -```bash -export FD_LOG_DIR="log_prefill" -export CUDA_VISIBLE_DEVICES=0,1,2,3 -python -m fastdeploy.entrypoints.openai.api_server \ - --model ERNIE-4.5-300B-A47B-BF16 \ - --port 8180 --metrics-port 8181 \ - --engine-worker-queue-port 8182 \ - --cache-queue-port 8183 \ - --tensor-parallel-size 4 \ - --quantization wint4 \ - --splitwise-role "prefill" -``` - -**decode 实例** - -```bash -export FD_LOG_DIR="log_decode" -export 
CUDA_VISIBLE_DEVICES=4,5,6,7 -# 注意innode-prefill-ports指定为Prefill服务的engine-worker-queue-port -python -m fastdeploy.entrypoints.openai.api_server \ - --model ERNIE-4.5-300B-A47B-BF16 \ - --port 8184 --metrics-port 8185 \ - --engine-worker-queue-port 8186 \ - --cache-queue-port 8187 \ - --tensor-parallel-size 4 \ - --quantization wint4 \ - --innode-prefill-ports 8182 \ - --splitwise-role "decode" -``` - -注意在请求单机PD分离服务时,**用户需请求Decode服务的端口**。 - -#### 离线推理服务 - -参考`fastdeploy/demo` 目录下 `offline_disaggregated_demo.py` 示例代码,进行离线推理服务部署 - ### 多机分离式部署 #### 前置依赖 Redis @@ -120,6 +77,7 @@ sudo systemctl start redis export FD_LOG_DIR="log_prefill" export CUDA_VISIBLE_DEVICES=0,1,2,3 +export ENABLE_V1_KVCACHE_SCHEDULER=0 echo "set RDMA NICS" export $(bash scripts/get_rdma_nics.sh gpu) echo "KVCACHE_RDMA_NICS ${KVCACHE_RDMA_NICS}" @@ -146,6 +104,7 @@ python -m fastdeploy.entrypoints.openai.api_server \ ```bash export FD_LOG_DIR="log_decode" export CUDA_VISIBLE_DEVICES=4,5,6,7 +export ENABLE_V1_KVCACHE_SCHEDULER=0 echo "set RDMA NICS" export $(bash scripts/get_rdma_nics.sh gpu) echo "KVCACHE_RDMA_NICS ${KVCACHE_RDMA_NICS}" diff --git a/examples/splitwise/start_mixed.sh b/examples/splitwise/start_mixed.sh index bf3e78ab058..c36027ac26a 100644 --- a/examples/splitwise/start_mixed.sh +++ b/examples/splitwise/start_mixed.sh @@ -1,6 +1,8 @@ #!/bin/bash set -e +# Test mixed server + router + wait_for_health() { local server_port=$1 while true; do @@ -16,7 +18,6 @@ wait_for_health() { # prepare environment MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" -# MODEL_NAME="baidu/ERNIE-4.5-21B-A3B-Paddle" export FD_DEBUG=1 export ENABLE_V1_KVCACHE_SCHEDULER=0 @@ -25,13 +26,16 @@ export KVCACHE_GDRCOPY_FLUSH_ENABLE=1 unset http_proxy && unset https_proxy rm -rf log_* +S1_PORT=52400 +S2_PORT=52500 +ROUTER_PORT=52600 + # start router export FD_LOG_DIR="log_router" mkdir -p ${FD_LOG_DIR} -router_port=9000 nohup python -m fastdeploy.router.launch \ - --port ${router_port} \ + --port ${ROUTER_PORT} \ 2>&1 >${FD_LOG_DIR}/nohup & sleep 1 @@ -42,16 +46,16 @@ mkdir -p ${FD_LOG_DIR} nohup python -m fastdeploy.entrypoints.openai.api_server \ --model ${MODEL_NAME} \ - --port 8100 \ - --metrics-port 8101 \ - --engine-worker-queue-port 8102 \ - --cache-queue-port 8103 \ + --port ${S1_PORT} \ + --metrics-port $((S1_PORT + 1)) \ + --engine-worker-queue-port $((S1_PORT + 2)) \ + --cache-queue-port $((S1_PORT + 3)) \ --max-model-len 32768 \ - --router "0.0.0.0:${router_port}" \ + --router "0.0.0.0:${ROUTER_PORT}" \ 2>&1 >${FD_LOG_DIR}/nohup & sleep 1 -wait_for_health 8100 +wait_for_health ${S1_PORT} # start modelserver 1 export CUDA_VISIBLE_DEVICES=1 @@ -60,12 +64,24 @@ mkdir -p ${FD_LOG_DIR} nohup python -m fastdeploy.entrypoints.openai.api_server \ --model ${MODEL_NAME} \ - --port 8200 \ - --metrics-port 8201 \ - --engine-worker-queue-port 8202 \ - --cache-queue-port 8203 \ + --port ${S2_PORT} \ + --metrics-port $((S2_PORT + 1)) \ + --engine-worker-queue-port $((S2_PORT + 2)) \ + --cache-queue-port $((S2_PORT + 3)) \ --max-model-len 32768 \ - --router "0.0.0.0:${router_port}" \ + --router "0.0.0.0:${ROUTER_PORT}" \ 2>&1 >${FD_LOG_DIR}/nohup & -wait_for_health 8200 +wait_for_health ${S2_PORT} + +# send request +sleep 10 # make sure server is registered to router +curl -X POST "http://0.0.0.0:${ROUTER_PORT}/v1/chat/completions" \ +-H "Content-Type: application/json" \ +-d '{ + "messages": [ + {"role": "user", "content": "hello"} + ], + "max_tokens": 20, + "stream": true +}' diff --git a/examples/splitwise/start_v0_tp1.sh 
b/examples/splitwise/start_v0_tp1.sh index 30dbb5a906d..42f585a5a71 100644 --- a/examples/splitwise/start_v0_tp1.sh +++ b/examples/splitwise/start_v0_tp1.sh @@ -2,9 +2,9 @@ set -e # Test splitwise deployment -# v0 requires prefill and decode in one node and it uses local scheduler -# v1 supports prefill and decode in multi node and it uses splitwise scheduler -# v2 supports prefill and decode in multi node and it uses router and local scheduler +# There are two methods for splitwise deployment: +# v0: using splitwise_scheduler or dp_scheduler +# v1: using local_scheduler + router wait_for_health() { local server_port=$1 @@ -19,48 +19,97 @@ wait_for_health() { done } +# prepare environment MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" -# MODEL_NAME="baidu/ERNIE-4.5-21B-A3B-Paddle" -aistudio download --model ${MODEL_NAME} + +export FD_DEBUG=1 +export ENABLE_V1_KVCACHE_SCHEDULER=1 +export KVCACHE_GDRCOPY_FLUSH_ENABLE=1 + +SCRIPT_PATH=$(readlink -f "$0") +SCRIPT_DIR=$(dirname "$SCRIPT_PATH") +export $(bash ${SCRIPT_DIR}/../../scripts/get_rdma_nics.sh gpu) +echo "KVCACHE_RDMA_NICS:${KVCACHE_RDMA_NICS}" +if [ -z "${KVCACHE_RDMA_NICS}" ]; then + echo "KVCACHE_RDMA_NICS is empty, please check the output of get_rdma_nics.sh" + exit 1 +fi unset http_proxy && unset https_proxy rm -rf log_* +P_PORT=52400 +D_PORT=52500 +REDIS_PORT=56388 + +# start redis +if ! redis-cli -p ${REDIS_PORT} ping &>/dev/null; then + echo "Redis is not running. Starting redis-server..." + redis-server --daemonize yes --port ${REDIS_PORT} + sleep 1 +else + echo "Redis is already running." +fi +sleep 1 + # start prefill +export CUDA_VISIBLE_DEVICES=0 export FD_LOG_DIR="log_prefill" mkdir -p ${FD_LOG_DIR} -export CUDA_VISIBLE_DEVICES=0 -export FD_DEBUG=1 -export ENABLE_V1_KVCACHE_SCHEDULER=0 - nohup python -m fastdeploy.entrypoints.openai.api_server \ --model ${MODEL_NAME} \ - --port 8100 \ - --metrics-port 8101 \ - --engine-worker-queue-port 8102 \ - --cache-queue-port 8103 \ + --port ${P_PORT} \ + --metrics-port $((P_PORT + 1)) \ + --engine-worker-queue-port $((P_PORT + 2)) \ + --cache-queue-port $((P_PORT + 3)) \ --max-model-len 32768 \ + --num-gpu-blocks-override 1000 \ --splitwise-role "prefill" \ + --cache-transfer-protocol "rdma" \ + --rdma-comm-ports $((P_PORT + 4)) \ + --pd-comm-port $((P_PORT + 5)) \ + --scheduler-name "splitwise" \ + --scheduler-host "127.0.0.1" \ + --scheduler-port ${REDIS_PORT} \ + --scheduler-ttl 9000 \ 2>&1 >${FD_LOG_DIR}/nohup & -wait_for_health 8100 + +wait_for_health ${P_PORT} # start decode +export CUDA_VISIBLE_DEVICES=1 export FD_LOG_DIR="log_decode" mkdir -p ${FD_LOG_DIR} -export CUDA_VISIBLE_DEVICES=1 -export FD_DEBUG=1 -export ENABLE_V1_KVCACHE_SCHEDULER=0 - nohup python -m fastdeploy.entrypoints.openai.api_server \ --model ${MODEL_NAME} \ - --port 9000 \ - --metrics-port 9001 \ - --engine-worker-queue-port 9002 \ - --cache-queue-port 9003 \ + --port ${D_PORT} \ + --metrics-port $((D_PORT + 1)) \ + --engine-worker-queue-port $((D_PORT + 2)) \ + --cache-queue-port $((D_PORT + 3)) \ --max-model-len 32768 \ --splitwise-role "decode" \ - --innode-prefill-ports 8102 \ + --cache-transfer-protocol "rdma" \ + --rdma-comm-ports $((D_PORT + 4)) \ + --pd-comm-port $((D_PORT + 5)) \ + --scheduler-name "splitwise" \ + --scheduler-host "127.0.0.1" \ + --scheduler-port ${REDIS_PORT} \ + --scheduler-ttl 9000 \ 2>&1 >${FD_LOG_DIR}/nohup & -wait_for_health 9000 + +wait_for_health ${D_PORT} + + +# send request +sleep 10 # make sure server is registered to router +curl -X POST 
"http://0.0.0.0:${D_PORT}/v1/chat/completions" \ +-H "Content-Type: application/json" \ +-d '{ + "messages": [ + {"role": "user", "content": "hello"} + ], + "max_tokens": 20, + "stream": true +}' diff --git a/examples/splitwise/start_v2_tp2.sh b/examples/splitwise/start_v0_tp2.sh similarity index 59% rename from examples/splitwise/start_v2_tp2.sh rename to examples/splitwise/start_v0_tp2.sh index 5563b2f4c98..cb2015ec4ac 100644 --- a/examples/splitwise/start_v2_tp2.sh +++ b/examples/splitwise/start_v0_tp2.sh @@ -2,9 +2,9 @@ set -e # Test splitwise deployment -# v0 requires prefill and decode in one node and it uses local scheduler -# v1 supports prefill and decode in multi node and it uses splitwise scheduler -# v2 supports prefill and decode in multi node and it uses router and local scheduler +# There are two methods for splitwise deployment: +# v0: using splitwise_scheduler or dp_scheduler +# v1: using local_scheduler + router wait_for_health() { local server_port=$1 @@ -21,7 +21,6 @@ wait_for_health() { # prepare environment MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" -# MODEL_NAME="baidu/ERNIE-4.5-21B-A3B-Paddle" export FD_DEBUG=1 export ENABLE_V1_KVCACHE_SCHEDULER=0 @@ -39,16 +38,14 @@ fi unset http_proxy && unset https_proxy rm -rf log_* -# start router -export FD_LOG_DIR="log_router" -mkdir -p ${FD_LOG_DIR} - -echo "start router" -router_port=9000 -nohup python -m fastdeploy.router.launch \ - --port ${router_port} \ - --splitwise \ - 2>&1 >${FD_LOG_DIR}/nohup & +# start redis +if ! redis-cli ping &>/dev/null; then + echo "Redis is not running. Starting redis-server..." + redis-server --daemonize yes + sleep 1 +else + echo "Redis is already running." +fi sleep 1 # start prefill @@ -56,41 +53,59 @@ export CUDA_VISIBLE_DEVICES=0,1 export FD_LOG_DIR="log_prefill" mkdir -p ${FD_LOG_DIR} -echo "start prefill" nohup python -m fastdeploy.entrypoints.openai.api_server \ --model ${MODEL_NAME} \ --port 8100 \ --metrics-port 8101 \ --engine-worker-queue-port 8102 \ --cache-queue-port 8103 \ - --tensor-parallel-size 2 \ --max-model-len 32768 \ + --tensor-parallel-size 2 \ --splitwise-role "prefill" \ + --cache-transfer-protocol "rdma,ipc" \ --pd-comm-port 8104 \ --rdma-comm-ports 8105,8106 \ - --router "0.0.0.0:${router_port}" \ + --scheduler-name "splitwise" \ + --scheduler-host "127.0.0.1" \ + --scheduler-port 6379 \ + --scheduler-ttl 9000 \ 2>&1 >${FD_LOG_DIR}/nohup & - -wait_for_health 8100 +# wait_for_health 8100 # start decode export CUDA_VISIBLE_DEVICES=2,3 export FD_LOG_DIR="log_decode" mkdir -p ${FD_LOG_DIR} -echo "start decode" nohup python -m fastdeploy.entrypoints.openai.api_server \ --model ${MODEL_NAME} \ - --port 8200 \ - --metrics-port 8201 \ - --engine-worker-queue-port 8202 \ - --cache-queue-port 8203 \ + --port 9000 \ + --metrics-port 9001 \ + --engine-worker-queue-port 9002 \ + --cache-queue-port 9003 \ --max-model-len 32768 \ --tensor-parallel-size 2 \ --splitwise-role "decode" \ - --pd-comm-port 8204 \ - --rdma-comm-ports 8205,8206 \ - --router "0.0.0.0:${router_port}" \ + --cache-transfer-protocol "rdma,ipc" \ + --pd-comm-port 9004 \ + --rdma-comm-ports 9005,9006 \ + --scheduler-name "splitwise" \ + --scheduler-host "127.0.0.1" \ + --scheduler-port 6379 \ + --scheduler-ttl 9000 \ 2>&1 >${FD_LOG_DIR}/nohup & +wait_for_health 9000 + -wait_for_health 8200 +# send request +sleep 10 # make sure server is registered to router +port=9000 +curl -X POST "http://0.0.0.0:${port}/v1/chat/completions" \ +-H "Content-Type: application/json" \ +-d '{ + "messages": [ + {"role": 
"user", "content": "hello"} + ], + "max_tokens": 20, + "stream": true +}' diff --git a/examples/splitwise/start_v1_tp1.sh b/examples/splitwise/start_v1_tp1.sh index 12377404c1d..31eca8ab77f 100644 --- a/examples/splitwise/start_v1_tp1.sh +++ b/examples/splitwise/start_v1_tp1.sh @@ -2,9 +2,9 @@ set -e # Test splitwise deployment -# v0 requires prefill and decode in one node and it uses local scheduler -# v1 supports prefill and decode in multi node and it uses splitwise scheduler -# v2 supports prefill and decode in multi node and it uses router and local scheduler +# There are two methods for splitwise deployment: +# v0: using splitwise_scheduler or dp_scheduler +# v1: using local_scheduler + router wait_for_health() { local server_port=$1 @@ -21,10 +21,9 @@ wait_for_health() { # prepare environment MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" -# MODEL_NAME="baidu/ERNIE-4.5-21B-A3B-Paddle" export FD_DEBUG=1 -export ENABLE_V1_KVCACHE_SCHEDULER=0 +export ENABLE_V1_KVCACHE_SCHEDULER=1 export KVCACHE_GDRCOPY_FLUSH_ENABLE=1 SCRIPT_PATH=$(readlink -f "$0") @@ -39,14 +38,18 @@ fi unset http_proxy && unset https_proxy rm -rf log_* -# start redis -if ! redis-cli ping &>/dev/null; then - echo "Redis is not running. Starting redis-server..." - redis-server --daemonize yes - sleep 1 -else - echo "Redis is already running." -fi +P_PORT=52400 +D_PORT=52500 +ROUTER_PORT=52600 + +# start router +export FD_LOG_DIR="log_router" +mkdir -p ${FD_LOG_DIR} + +nohup python -m fastdeploy.router.launch \ + --port ${ROUTER_PORT} \ + --splitwise \ + 2>&1 >${FD_LOG_DIR}/nohup & sleep 1 # start prefill @@ -56,21 +59,20 @@ mkdir -p ${FD_LOG_DIR} nohup python -m fastdeploy.entrypoints.openai.api_server \ --model ${MODEL_NAME} \ - --port 8100 \ - --metrics-port 8101 \ - --engine-worker-queue-port 8102 \ - --cache-queue-port 8103 \ + --port "${P_PORT}" \ + --metrics-port "$((P_PORT + 1))" \ + --engine-worker-queue-port "$((P_PORT + 2))" \ + --cache-queue-port "$((P_PORT + 3))" \ --max-model-len 32768 \ --splitwise-role "prefill" \ - --cache-transfer-protocol "rdma,ipc" \ - --rdma-comm-ports 8104 \ - --pd-comm-port 8105 \ - --scheduler-name "splitwise" \ - --scheduler-host "127.0.0.1" \ - --scheduler-port 6379 \ - --scheduler-ttl 9000 \ + --cache-transfer-protocol "rdma" \ + --rdma-comm-ports "$((P_PORT + 4))" \ + --pd-comm-port "$((P_PORT + 5))" \ + --num-gpu-blocks-override 2000 \ + --router "0.0.0.0:${ROUTER_PORT}" \ 2>&1 >${FD_LOG_DIR}/nohup & -wait_for_health 8100 + +wait_for_health ${P_PORT} # start decode export CUDA_VISIBLE_DEVICES=1 @@ -79,18 +81,28 @@ mkdir -p ${FD_LOG_DIR} nohup python -m fastdeploy.entrypoints.openai.api_server \ --model ${MODEL_NAME} \ - --port 9000 \ - --metrics-port 9001 \ - --engine-worker-queue-port 9002 \ - --cache-queue-port 9003 \ + --port "${D_PORT}" \ + --metrics-port "$((D_PORT + 2))" \ + --engine-worker-queue-port "$((D_PORT + 3))" \ + --cache-queue-port "$((D_PORT + 1))" \ --max-model-len 32768 \ --splitwise-role "decode" \ - --cache-transfer-protocol "rdma,ipc" \ - --rdma-comm-ports 9004 \ - --pd-comm-port 9005 \ - --scheduler-name "splitwise" \ - --scheduler-host "127.0.0.1" \ - --scheduler-port 6379 \ - --scheduler-ttl 9000 \ + --cache-transfer-protocol "rdma" \ + --rdma-comm-ports "$((D_PORT + 4))" \ + --pd-comm-port "$((D_PORT + 5))" \ + --router "0.0.0.0:${ROUTER_PORT}" \ 2>&1 >${FD_LOG_DIR}/nohup & -wait_for_health 9000 + +wait_for_health ${D_PORT} + +# send request +sleep 10 # make sure server is registered to router +curl -X POST 
"http://0.0.0.0:${ROUTER_PORT}/v1/chat/completions" \ +-H "Content-Type: application/json" \ +-d '{ + "messages": [ + {"role": "user", "content": "hello"} + ], + "max_tokens": 20, + "stream": true +}' diff --git a/examples/splitwise/start_v1_tp2.sh b/examples/splitwise/start_v1_tp2.sh index cf0b728064a..c58a8a9cead 100644 --- a/examples/splitwise/start_v1_tp2.sh +++ b/examples/splitwise/start_v1_tp2.sh @@ -2,9 +2,9 @@ set -e # Test splitwise deployment -# v0 requires prefill and decode in one node and it uses local scheduler -# v1 supports prefill and decode in multi node and it uses splitwise scheduler -# v2 supports prefill and decode in multi node and it uses router and local scheduler +# There are two methods for splitwise deployment: +# v0: using splitwise_scheduler or dp_scheduler +# v1: using local_scheduler + router wait_for_health() { local server_port=$1 @@ -21,7 +21,6 @@ wait_for_health() { # prepare environment MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" -# MODEL_NAME="baidu/ERNIE-4.5-21B-A3B-Paddle" export FD_DEBUG=1 export ENABLE_V1_KVCACHE_SCHEDULER=0 @@ -39,14 +38,16 @@ fi unset http_proxy && unset https_proxy rm -rf log_* -# start redis -if ! redis-cli ping &>/dev/null; then - echo "Redis is not running. Starting redis-server..." - redis-server --daemonize yes - sleep 1 -else - echo "Redis is already running." -fi +# start router +export FD_LOG_DIR="log_router" +mkdir -p ${FD_LOG_DIR} + +echo "start router" +router_port=9000 +nohup python -m fastdeploy.router.launch \ + --port ${router_port} \ + --splitwise \ + 2>&1 >${FD_LOG_DIR}/nohup & sleep 1 # start prefill @@ -54,45 +55,56 @@ export CUDA_VISIBLE_DEVICES=0,1 export FD_LOG_DIR="log_prefill" mkdir -p ${FD_LOG_DIR} +echo "start prefill" nohup python -m fastdeploy.entrypoints.openai.api_server \ --model ${MODEL_NAME} \ --port 8100 \ --metrics-port 8101 \ --engine-worker-queue-port 8102 \ --cache-queue-port 8103 \ - --max-model-len 32768 \ --tensor-parallel-size 2 \ + --max-model-len 32768 \ --splitwise-role "prefill" \ - --cache-transfer-protocol "rdma,ipc" \ --pd-comm-port 8104 \ --rdma-comm-ports 8105,8106 \ - --scheduler-name "splitwise" \ - --scheduler-host "127.0.0.1" \ - --scheduler-port 6379 \ - --scheduler-ttl 9000 \ + --router "0.0.0.0:${router_port}" \ 2>&1 >${FD_LOG_DIR}/nohup & -wait_for_health 8100 + +# wait_for_health 8100 # start decode export CUDA_VISIBLE_DEVICES=2,3 export FD_LOG_DIR="log_decode" mkdir -p ${FD_LOG_DIR} +echo "start decode" nohup python -m fastdeploy.entrypoints.openai.api_server \ --model ${MODEL_NAME} \ - --port 9000 \ - --metrics-port 9001 \ - --engine-worker-queue-port 9002 \ - --cache-queue-port 9003 \ + --port 8200 \ + --metrics-port 8201 \ + --engine-worker-queue-port 8202 \ + --cache-queue-port 8203 \ --max-model-len 32768 \ --tensor-parallel-size 2 \ --splitwise-role "decode" \ - --cache-transfer-protocol "rdma,ipc" \ - --pd-comm-port 9004 \ - --rdma-comm-ports 9005,9006 \ - --scheduler-name "splitwise" \ - --scheduler-host "127.0.0.1" \ - --scheduler-port 6379 \ - --scheduler-ttl 9000 \ + --pd-comm-port 8204 \ + --rdma-comm-ports 8205,8206 \ + --router "0.0.0.0:${router_port}" \ 2>&1 >${FD_LOG_DIR}/nohup & -wait_for_health 9000 + +wait_for_health 8200 + + + +# send request +sleep 10 # make sure server is registered to router +port=9000 +curl -X POST "http://0.0.0.0:${port}/v1/chat/completions" \ +-H "Content-Type: application/json" \ +-d '{ + "messages": [ + {"role": "user", "content": "hello"} + ], + "max_tokens": 20, + "stream": true +}' diff --git 
a/examples/splitwise/start_v2_tp1.sh b/examples/splitwise/start_v2_tp1.sh deleted file mode 100644 index 78a0358f957..00000000000 --- a/examples/splitwise/start_v2_tp1.sh +++ /dev/null @@ -1,93 +0,0 @@ -#!/bin/bash -set -e - -# Test splitwise deployment -# v0 requires prefill and decode in one node and it uses local scheduler -# v1 supports prefill and decode in multi node and it uses splitwise scheduler -# v2 supports prefill and decode in multi node and it uses router and local scheduler - -wait_for_health() { - local server_port=$1 - while true; do - status_code=$(curl -s -o /dev/null -w "%{http_code}" "http://0.0.0.0:${server_port}/health" || echo "000") - if [ "$status_code" -eq 200 ]; then - break - else - echo "Service not ready. Retrying in 2s..." - sleep 2 - fi - done -} - -# prepare environment -MODEL_NAME="PaddlePaddle/ERNIE-4.5-0.3B-Paddle" -# MODEL_NAME="baidu/ERNIE-4.5-21B-A3B-Paddle" - -export FD_DEBUG=1 -export ENABLE_V1_KVCACHE_SCHEDULER=0 -export KVCACHE_GDRCOPY_FLUSH_ENABLE=1 - -SCRIPT_PATH=$(readlink -f "$0") -SCRIPT_DIR=$(dirname "$SCRIPT_PATH") -export $(bash ${SCRIPT_DIR}/../../scripts/get_rdma_nics.sh gpu) -echo "KVCACHE_RDMA_NICS:${KVCACHE_RDMA_NICS}" -if [ -z "${KVCACHE_RDMA_NICS}" ]; then - echo "KVCACHE_RDMA_NICS is empty, please check the output of get_rdma_nics.sh" - exit 1 -fi - -unset http_proxy && unset https_proxy -rm -rf log_* - -# start router -export FD_LOG_DIR="log_router" -mkdir -p ${FD_LOG_DIR} - -router_port=9000 -nohup python -m fastdeploy.router.launch \ - --port ${router_port} \ - --splitwise \ - 2>&1 >${FD_LOG_DIR}/nohup & -sleep 1 - -# start prefill -export CUDA_VISIBLE_DEVICES=0 -export FD_LOG_DIR="log_prefill" -mkdir -p ${FD_LOG_DIR} - -nohup python -m fastdeploy.entrypoints.openai.api_server \ - --model ${MODEL_NAME} \ - --port 8100 \ - --metrics-port 8101 \ - --engine-worker-queue-port 8102 \ - --cache-queue-port 8103 \ - --max-model-len 32768 \ - --splitwise-role "prefill" \ - --cache-transfer-protocol "ipc,rdma" \ - --rdma-comm-ports 8104 \ - --pd-comm-port 8105 \ - --router "0.0.0.0:${router_port}" \ - 2>&1 >${FD_LOG_DIR}/nohup & - -wait_for_health 8100 - -# start decode -export CUDA_VISIBLE_DEVICES=1 -export FD_LOG_DIR="log_decode" -mkdir -p ${FD_LOG_DIR} - -nohup python -m fastdeploy.entrypoints.openai.api_server \ - --model ${MODEL_NAME} \ - --port 8200 \ - --metrics-port 8201 \ - --engine-worker-queue-port 8202 \ - --cache-queue-port 8203 \ - --max-model-len 32768 \ - --splitwise-role "decode" \ - --cache-transfer-protocol "ipc,rdma" \ - --rdma-comm-ports 8204 \ - --pd-comm-port 8205 \ - --router "0.0.0.0:${router_port}" \ - 2>&1 >${FD_LOG_DIR}/nohup & - -wait_for_health 8200 diff --git a/fastdeploy/cache_manager/cache_messager.py b/fastdeploy/cache_manager/cache_messager.py index 7717afa3a6b..dc3d64099a8 100644 --- a/fastdeploy/cache_manager/cache_messager.py +++ b/fastdeploy/cache_manager/cache_messager.py @@ -622,8 +622,13 @@ def prefill_layerwise_send_cache_thread(self): target_id = int(task["rdma_ports"][self.rank]) if "error" in task["status"]: continue + + # TODO: use is connected to check if the connection is still alive + logger.debug(f"rdma, start connect decode, {target_ip}:{target_id}") status = self.messager[current_transfer_protocol].connect(target_ip, target_id) - if not status: + if status: + logger.info(f"connect to {target_ip}:{target_id} success") + else: logger.error(f"connect to {target_ip}:{target_id} failed") task["status"] = "connection error" continue @@ -752,7 +757,7 @@ def _handle_connect_task(self): 
self.engine_worker_queue.connect_task_response_barrier.wait() self.engine_worker_queue.put_connect_rdma_task_response(response) except Exception as e: - logger.error(f"handle_connect_task has exception: {e}") + logger.error(f"handle_connect_task has exception: {e}, {traceback.format_exc()}") def main(): diff --git a/fastdeploy/cache_manager/transfer_factory/ipc_cache_transfer.py b/fastdeploy/cache_manager/transfer_factory/ipc_cache_transfer.py index 61a4fa10b06..e87c77f277e 100644 --- a/fastdeploy/cache_manager/transfer_factory/ipc_cache_transfer.py +++ b/fastdeploy/cache_manager/transfer_factory/ipc_cache_transfer.py @@ -51,7 +51,6 @@ def __init__(self, rank_id_, remote_gpu_id_, layer_num, local_gpu_id_): self.remote_key_tensor_ptr_list.append(get_data_ptr_ipc(tmp, key_unique_name)) self.remote_value_tensor_ptr_list.append(get_data_ptr_ipc(tmp, value_unique_name)) self.write_stream = paddle.device.Stream(f"gpu:{self.local_gpu_id}") - self.finish_event = paddle.device.Event() class IPCCommManager: diff --git a/fastdeploy/config.py b/fastdeploy/config.py index f7b3fe0b9e6..5ec3df934ac 100644 --- a/fastdeploy/config.py +++ b/fastdeploy/config.py @@ -20,7 +20,7 @@ import os from dataclasses import field from enum import Enum -from typing import Any, Dict, List, Literal, Optional, Union +from typing import Any, Dict, Literal, Optional, Union import paddle import paddle.distributed as dist @@ -1453,7 +1453,6 @@ def __init__( use_warmup: bool = False, limit_mm_per_prompt: Optional[Dict[str, Any]] = None, mm_processor_kwargs: Optional[Dict[str, Any]] = None, - innode_prefill_ports: Optional[List[int]] = None, max_num_partial_prefills: int = 1, max_long_partial_prefills: int = 1, long_prefill_token_threshold: int = 0, @@ -1517,13 +1516,10 @@ def __init__( self.limit_mm_per_prompt = limit_mm_per_prompt self.mm_processor_kwargs = mm_processor_kwargs self.use_warmup = use_warmup - self.innode_prefill_ports = innode_prefill_ports self.max_num_partial_prefills = max_num_partial_prefills self.max_long_partial_prefills = max_long_partial_prefills self.long_prefill_token_threshold = long_prefill_token_threshold - self._str_to_list("innode_prefill_ports", int) - if envs.FD_FOR_TORCH_MODEL_FORMAT: self.model_config.model_format = "torch" @@ -1773,23 +1769,15 @@ def init_cache_info(self): """ initialize cache info """ - # TODO: group the splitiwse params, remove code of v0 - # v0 requires prefill and decode in one node and it uses local scheduler - # v1 supports prefill and decode in multi node and it uses splitwise or dp scheduler - # v2 supports prefill and decode in multi node and it uses router and local scheduler + # TODO: group the splitiwse params + # There are two methods for splitwise deployment: + # 1. v0 splitwise_scheduler or dp_scheduler + # 2. 
v1 local_scheduler + router self.splitwise_version = None - if self.scheduler_config.name == "local" and (self.router_config is None or self.router_config.router is None): + if self.scheduler_config.name in ("splitwise", "dp"): self.splitwise_version = "v0" - elif self.scheduler_config.name in ("splitwise", "dp"): - self.splitwise_version = "v1" elif self.scheduler_config.name == "local" and self.router_config and self.router_config.router: - self.splitwise_version = "v2" - else: - raise ValueError( - f"Unsupported scheduler mode, scheduler_name: {self.scheduler_config.name}, " - f"router_config: {self.router_config}" - ) - logger.info(f"splitwise_version: {self.splitwise_version}") + self.splitwise_version = "v1" if isinstance(self.parallel_config.engine_worker_queue_port, (int, str)): engine_worker_queue_port = self.parallel_config.engine_worker_queue_port diff --git a/fastdeploy/demo/offline_disaggregated_demo.py b/fastdeploy/demo/offline_disaggregated_demo.py deleted file mode 100644 index 26e34794168..00000000000 --- a/fastdeploy/demo/offline_disaggregated_demo.py +++ /dev/null @@ -1,64 +0,0 @@ -""" -# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License" -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" - -import multiprocessing -import os -import time - -from fastdeploy.entrypoints.llm import LLM - -model_name_or_path = "baidu/ERNIE-4.5-0.3B-Paddle" - - -def start_decode(model_name_or_path): - os.environ["CUDA_VISIBLE_DEVICES"] = "1" - os.environ["FD_LOG_DIR"] = "log_decode" - llm_decode = LLM( - model=model_name_or_path, - tensor_parallel_size=1, - splitwise_role="decode", - engine_worker_queue_port=6678, - innode_prefill_ports=[6677], - cache_queue_port=55668, - ) - return llm_decode - - -def start_prefill(model_name_or_path): - os.environ["CUDA_VISIBLE_DEVICES"] = "0" - os.environ["FD_LOG_DIR"] = "log_prefill" - LLM( - model=model_name_or_path, - tensor_parallel_size=1, - splitwise_role="prefill", - engine_worker_queue_port=6677, - cache_queue_port=55667, - ) - - -def main(): - prefill = multiprocessing.Process(target=start_prefill, args=(model_name_or_path,)).start() - time.sleep(10) - llm_decode = start_decode(model_name_or_path) - - output = llm_decode.generate(prompts=["who are you?", "what can you do?"], use_tqdm=True) - print(output) - - prefill.join() - - -if __name__ == "__main__": - main() diff --git a/fastdeploy/engine/args_utils.py b/fastdeploy/engine/args_utils.py index 82c4af9d77b..b5b10ca6ca3 100644 --- a/fastdeploy/engine/args_utils.py +++ b/fastdeploy/engine/args_utils.py @@ -296,11 +296,6 @@ class EngineArgs: Port for splitwise communication. """ - innode_prefill_ports: Optional[List[int]] = None - """ - Ports for innode dispatch request. - """ - rdma_comm_ports: Optional[List[int]] = None """ Ports for rdma communication. 
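The example scripts in this patch verify each deployment by sending a streaming chat-completions request with curl once the instances report healthy. For reference, a minimal Python equivalent is sketched below; it is not part of the patch, and it assumes the `requests` library, a router from `start_v1_tp1.sh` reachable at `127.0.0.1:52600`, and OpenAI-style `data:` streaming lines. For the v0 scheduler scripts, point it at the decode instance's `--port` instead.

```python
# Minimal client sketch (not part of this patch): replays the curl request used in the
# example scripts against an OpenAI-compatible endpoint. Host/port are assumptions.
import json

import requests

url = "http://127.0.0.1:52600/v1/chat/completions"
payload = {
    "messages": [{"role": "user", "content": "hello"}],
    "max_tokens": 20,
    "stream": True,
}

# Stream the response line by line, mirroring `curl ... -d '{"stream": true, ...}'`.
with requests.post(url, json=payload, stream=True, timeout=60) as resp:
    resp.raise_for_status()
    for line in resp.iter_lines(decode_unicode=True):
        if not line or not line.startswith("data: "):
            continue
        chunk = line[len("data: "):]
        if chunk == "[DONE]":
            break
        print(json.loads(chunk))
```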
@@ -500,8 +495,33 @@ def __post_init__(self): if self.max_logprobs == -1 and not envs.ENABLE_V1_KVCACHE_SCHEDULER: raise NotImplementedError("Only ENABLE_V1_KVCACHE_SCHEDULER=1 support max_logprobs=-1") - if self.splitwise_role != "mixed" and self.cache_transfer_protocol != "rdma": - envs.ENABLE_V1_KVCACHE_SCHEDULER = 0 + if self.splitwise_role != "mixed": + if self.scheduler_name == "local" and self.router is None: + raise ValueError( + f"When using {self.splitwise_role} role and the {self.scheduler_name} " + f"scheduler, please provide --router argument." + ) + + if "rdma" in self.cache_transfer_protocol: + if self.rdma_comm_ports is None: + raise ValueError( + "Please set --rdma_comm_ports argument when using " "rdma cache transfer protocol." + ) + if len(self.rdma_comm_ports) != self.tensor_parallel_size: + raise ValueError("The number of rdma comm ports must be equal to tensor parallel size.") + + if envs.ENABLE_V1_KVCACHE_SCHEDULER == 1: + if "ipc" in self.cache_transfer_protocol: + # FIXME: support ipc cache transfer protocol + raise NotImplementedError( + "only support rdma cache transfer protocol " "when using ENABLE_V1_KVCACHE_SCHEDULER." + ) + # FIXME: fix this bug + if self.splitwise_role == "prefill" and self.num_gpu_blocks_override is None: + raise NotImplementedError( + "please set num_gpu_blocks_override for prefill " "instance using ENABLE_V1_KVCACHE_SCHEDULER." + ) + if not current_platform.is_cuda() and not current_platform.is_xpu(): envs.ENABLE_V1_KVCACHE_SCHEDULER = 0 if self.guided_decoding_backend != "off": @@ -931,13 +951,6 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: 'mixed'. (prefill, decode, mixed)", ) - splitwise_group.add_argument( - "--innode-prefill-ports", - type=lambda s: s.split(",") if s else None, - default=EngineArgs.innode_prefill_ports, - help="port for innode prefill, only used in single machine splitwise deployment", - ) - splitwise_group.add_argument( "--cache-transfer-protocol", type=str, @@ -1233,7 +1246,6 @@ def create_engine_config(self, port_availability_check=True) -> FDConfig: limit_mm_per_prompt=self.limit_mm_per_prompt, mm_processor_kwargs=self.mm_processor_kwargs, tool_parser=self.tool_call_parser, - innode_prefill_ports=self.innode_prefill_ports, max_num_partial_prefills=self.max_num_partial_prefills, max_long_partial_prefills=self.max_long_partial_prefills, long_prefill_token_threshold=self.long_prefill_token_threshold, diff --git a/fastdeploy/engine/async_llm.py b/fastdeploy/engine/async_llm.py index 240e1620d06..a2306c66534 100644 --- a/fastdeploy/engine/async_llm.py +++ b/fastdeploy/engine/async_llm.py @@ -899,8 +899,6 @@ def check_health(self, time_interval_threashold=30): def launch_components(self): if self.cfg.scheduler_config.splitwise_role != "mixed": - # 单机逻辑 - self.engine_service.engine_worker_queue.available_prefill_instances.put(1) self.engine_service.split_mode_get_tasks() if self.cfg.scheduler_config.name == "splitwise": self.splitwise_receive_thread = threading.Thread( diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index b88a4d0f054..583005a5637 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -160,8 +160,8 @@ def start(self): self.insert_task_to_worker_thread.start() self.token_processor.tasks_queue = self.engine_worker_queue self.token_processor.run() - if self.cfg.scheduler_config.splitwise_role != "mixed": - self._process_splitwise_task() + if self.cfg.scheduler_config.splitwise_role == "decode": + 
self._decode_process_splitwise_requests() self._register_to_router() @@ -329,54 +329,13 @@ def start_worker_queue_service(self, start_queue): local_data_parallel_id=self.cfg.parallel_config.local_data_parallel_id, ) - def insert_tasks(self, tasks: Union[List[Request], List[RequestOutput]], current_id=-1, allocated=False): + def insert_tasks(self, tasks: Union[List[Request], List[RequestOutput]], current_id=-1): """ Insert tasks to engine. """ for task in tasks: start_span_request("DEQUEUE", task, trace.SpanKind.CONSUMER) - # TODO 返回至 scheduler - if allocated: - current_tasks = [] - for task in tasks: - cur_task_idx = self.resource_manager.req_dict[task.request_id] - del self.resource_manager.req_dict[task.request_id] - cur_task = self.resource_manager.tasks_list[cur_task_idx] - if envs.FD_ENABLE_INTERNAL_ADAPTER: - if not task.outputs.token_ids: # first token is eos in Prefill, just recycle resource and continue - self.resource_manager.stop_flags[cur_task_idx] = True - self.resource_manager.tasks_list[cur_task_idx] = None - self.resource_manager._recycle_block_tables(cur_task) - if task.request_id in self.token_processor.tokens_counter: - del self.token_processor.tokens_counter[task.request_id] - self.llm_logger.warning(f"{task.request_id} need not decode after first token") - continue - cur_task.prompt_token_ids[0] = task.outputs.token_ids[0] - cur_task.num_cached_tokens = task.num_cached_tokens - if ( - self.cfg.speculative_config.method in ["mtp"] - and self.cfg.scheduler_config.splitwise_role == "decode" - ): - cur_task.draft_token_ids = copy.deepcopy(task.outputs.draft_token_ids) - if task.error_code != 200: - self.resource_manager.stop_flags[cur_task_idx] = True - self.resource_manager.tasks_list[cur_task_idx] = None - self.resource_manager._recycle_block_tables(cur_task) - if task.request_id in self.token_processor.tokens_counter: - del self.token_processor.tokens_counter[task.request_id] - self.scheduler.put_results([task]) - self.llm_logger.warning( - f"{task.request_id} prefill failed with msg:{task.error_msg}, recycle resource." - ) - continue - self.token_processor.tokens_counter[task.request_id] = 1 - current_tasks.append(cur_task) - if current_tasks: - self.engine_worker_queue.put_tasks((current_tasks, self.resource_manager.real_bsz)) - self.llm_logger.debug(f"put task to engine worker queue, task:{current_tasks}") - return True - self.resource_manager.check_and_free_block_tables() if not isinstance(tasks, list): @@ -445,8 +404,53 @@ def insert_tasks(self, tasks: Union[List[Request], List[RequestOutput]], current else: self.update_mm_requests_chunk_size(tasks) self.engine_worker_queue.put_tasks((tasks, self.resource_manager.real_bsz)) - if is_prefill and self.cfg.scheduler_config.name != "splitwise": - self.engine_worker_queue.available_prefill_instances.put(1) + return True + + def _insert_prefilled_requests(self, request_outputs: List[RequestOutput]): + """ + insert prefilled requests into engine worker queue. 
+        Args:
+            request_outputs: a list of RequestOutput sent by prefill instance
+        """
+        to_infer_reqs = []
+        for req_out in request_outputs:
+            slot_idx = self.resource_manager.req_dict[req_out.request_id]
+            del self.resource_manager.req_dict[req_out.request_id]
+            cur_req = self.resource_manager.tasks_list[slot_idx]
+
+            if envs.FD_ENABLE_INTERNAL_ADAPTER:
+                if not req_out.outputs.token_ids:  # first token is eos in Prefill, just recycle resource and continue
+                    self.resource_manager.stop_flags[slot_idx] = True
+                    self.resource_manager.tasks_list[slot_idx] = None
+                    self.resource_manager._recycle_block_tables(cur_req)
+                    if req_out.request_id in self.token_processor.tokens_counter:
+                        del self.token_processor.tokens_counter[req_out.request_id]
+                    self.llm_logger.warning(f"{req_out.request_id} need not decode after first token")
+                    continue
+
+            cur_req.prompt_token_ids[0] = req_out.outputs.token_ids[0]
+            cur_req.num_cached_tokens = req_out.num_cached_tokens
+            if self.cfg.speculative_config.method in ["mtp"] and self.cfg.scheduler_config.splitwise_role == "decode":
+                cur_req.draft_token_ids = copy.deepcopy(req_out.outputs.draft_token_ids)
+
+            if req_out.error_code != 200:
+                self.resource_manager.stop_flags[slot_idx] = True
+                self.resource_manager.tasks_list[slot_idx] = None
+                self.resource_manager._recycle_block_tables(cur_req)
+                if req_out.request_id in self.token_processor.tokens_counter:
+                    del self.token_processor.tokens_counter[req_out.request_id]
+                self.scheduler.put_results([req_out])
+                self.llm_logger.warning(
+                    f"{req_out.request_id} prefill failed with msg:{req_out.error_msg}, recycle resource."
+                )
+                continue
+
+            self.token_processor.tokens_counter[req_out.request_id] = 1
+            to_infer_reqs.append(cur_req)
+
+        if to_infer_reqs:
+            self.engine_worker_queue.put_tasks((to_infer_reqs, self.resource_manager.real_bsz))
+            self.llm_logger.debug(f"put requests to engine worker queue, task:{to_infer_reqs}")
         return True
 
     def task_is_finished(self, index):
@@ -636,8 +640,9 @@ def _schedule_request_to_worker(self):
             if len(tasks) == 0:
                 time.sleep(0.001)
                 continue
-            if self.cfg.splitwise_version == "v2" and self.cfg.scheduler_config.splitwise_role == "decode":
-                # the task in decode instance will processed in _process_splitwise_task thread
+            if self.cfg.scheduler_config.splitwise_role == "decode":
+                # Decode will insert the requests sent by prefill into the engine,
+                # so tasks sent directly by clients are ignored here
                 continue
 
             llm_logger.debug(f"get tasks from scheduler: {tasks}")
@@ -684,7 +689,14 @@ def _fetch_request():
                     max_num_batched_tokens=max_num_batched_tokens,
                     batch=num_prefill_batch,
                 )
-                self.llm_logger.debug(f"get tasks from scheduler: {tasks}")
+
+                if self.cfg.scheduler_config.splitwise_role == "decode":
+                    # Decode will insert the requests sent by prefill into the engine,
+                    # so tasks sent directly by clients are ignored here
+                    is_fetching = False
+                    return
+
+                self.llm_logger.debug(f"get tasks from {type(self.scheduler)}: {tasks}")
                 if self.cfg.scheduler_config.splitwise_role != "mixed":
                     need_delete_tasks = []
                     if envs.FD_OFFLINE_PERF_TEST_FOR_PD:
@@ -705,6 +717,7 @@
                     for task in tasks:
                         # assure can allocate block ids in P
                         while not self.resource_manager.preallocate_resource_in_p(task):
+                            self.llm_logger.info("wait for preallocate_resource_in_p")
                             time.sleep(0.005)
                         self.llm_logger.info(f"ask D resource for req_id: {task.request_id}")
                         self.split_connector.send_splitwise_tasks([task], task.idx)
@@ -864,7 +877,7 @@ def _insert_zmq_task_to_scheduler(self):
                         request.llm_engine_recv_req_timestamp = time.time()
start_span("ENQUEUE_ZMQ", data, trace.SpanKind.PRODUCER) main_process_metrics.requests_number.inc() - self.llm_logger.debug(f"Receive request: {request}") + self.llm_logger.debug(f"Receive request from api server: {request}") except Exception as e: self.llm_logger.error(f"Receive request error: {e}, {traceback.format_exc()!s}") err_msg = str(e) @@ -997,156 +1010,126 @@ def _zmq_send_generated_tokens(self): except Exception as e: llm_logger.error(f"Unexcepted error happend: {e}, {traceback.format_exc()!s}") - def _process_splitwise_task(self): + def _decode_process_splitwise_requests(self): """ - Processing tasks from engine worker queue in splitwise deployment. - For v0 version, prefill instance gets tasks from engine worker queue. - For v1 and v2 version, decode instance gets raw tasks from engine worker queue to preallocate resources, - and decode instance gets prefilled tasks from engine worker queue to generate tokens. - TODO: unifiy the communication between decode and prefill instances. + Decode processes requests from engine worker queue, which are sent by prefill. + TODO: merge this function to the schedule function in resource manager """ + allocate_resource_requests: list[Request] = [] + prefilled_request_ouputs: list[RequestOutput] = [] - def receiver_loop(): - waiting_resource_requests = [] - waiting_ready_tasks = [] + def _fetch_requests(): + if self.engine_worker_queue.disaggregate_queue_empty(): + return - # Waiting for the api_server and scheduler in decode to - # receive the request sent by the client - def _decode_process_prefilled_task_v0_scheduler(input_tasks): - ready_tasks = [] - waiting_tasks = [] - for task in input_tasks: - if not hasattr(self.scheduler, "has_request") or self.scheduler.has_request(task.request_id): - ready_tasks.append(task) + items = self.engine_worker_queue.get_disaggregated_tasks() + for item in items: + tasks = item[1] + if isinstance(tasks[0], Request): + self.llm_logger.debug(f"receive tasks to preallocate resource, {tasks}") + allocate_resource_requests.extend(tasks) + elif isinstance(tasks[0], RequestOutput): + self.llm_logger.debug(f"receive prefilled tasks, {tasks}") + if not isinstance(tasks, list): + tasks = [tasks] + for task in tasks: + task.finished = False + prefilled_request_ouputs.extend(tasks) + + def _process_allocate_resource_requests(): + processed_indices = [] + for idx, task in enumerate(allocate_resource_requests): + is_success = False + + if envs.ENABLE_V1_KVCACHE_SCHEDULER: + if self.resource_manager.preallocate_resource_in_d(task): + self.llm_logger.info(f"Resource available, processing task {task.request_id}") + self.split_connector.send_cache_infos([task], -1) + processed_indices.append(idx) + is_success = True + else: + if self.resource_manager.is_resource_sufficient(task.prompt_token_ids_len): + self.llm_logger.info(f"Resource available, processing task {task.request_id}") + self.insert_tasks([task]) + processed_indices.append(idx) + is_success = True + + if not is_success: + if not self.enable_decode_cache_task: + task.error_msg = "Not enough resources" + self.split_connector.send_cache_infos([task], -1) + processed_indices.append(idx) else: - waiting_tasks.append(task) - self.insert_tasks(ready_tasks, allocated=True) - if self.cfg.splitwise_version in ("v0", "v2"): - self.scheduler.put_results(ready_tasks) - return waiting_tasks + self.llm_logger.debug(f"Still waiting for resources {task.request_id}") + break - while self.running: - try: - processed_indices = [] - for idx, task in 
enumerate(waiting_resource_requests): - if envs.ENABLE_V1_KVCACHE_SCHEDULER: - if self.resource_manager.preallocate_resource_in_d(task): - self.llm_logger.info(f"Resource available, processing task {task.request_id}") - self.split_connector.send_cache_infos([task], -1) - processed_indices.append(idx) - else: - self.llm_logger.debug(f"Still waiting for resources {task.request_id}") - break - else: - if self.resource_manager.is_resource_sufficient(task.prompt_token_ids_len): - self.insert_tasks([task]) - self.llm_logger.info(f"Resource available, processing task {task.request_id}") - processed_indices.append(idx) - else: - self.llm_logger.debug(f"Still waiting for resources {task.request_id}") - break + for idx in sorted(processed_indices, reverse=True): + allocate_resource_requests.pop(idx) - for idx in sorted(processed_indices, reverse=True): - waiting_resource_requests.pop(idx) + def _process_prefilled_requests(): + nonlocal prefilled_request_ouputs + ready_request_outputs = [] + waiting_request_outputs = [] + # Waiting for the api_server and scheduler in decode to + # receive the request sent by the client + for task in prefilled_request_ouputs: + if not hasattr(self.scheduler, "has_request") or self.scheduler.has_request(task.request_id): + ready_request_outputs.append(task) + else: + waiting_request_outputs.append(task) - waiting_ready_tasks = _decode_process_prefilled_task_v0_scheduler(waiting_ready_tasks) + prefilled_request_ouputs = waiting_request_outputs + if self.cfg.splitwise_version == "v1": + # decode return first token to client + self.scheduler.put_results(ready_request_outputs) - if self.engine_worker_queue.disaggregate_queue_empty(): - time.sleep(0.001) - else: - items = self.engine_worker_queue.get_disaggregated_tasks() - for item in items: - role = item[0] - tasks = item[1] - - # prefill instance gets tasks from engine worker queue - if role == "prefill": - for task in tasks: - task.max_tokens = task.min_tokens = 2 - self.insert_tasks(tasks) - # decode instance gets tasks from engine worker queue - elif role == "decode": - if isinstance(tasks[0], RequestOutput): - self.llm_logger.debug(f"receive prefilled tasks, {tasks}") - if not isinstance(tasks, list): - tasks = [tasks] - for task in tasks: - task.finished = False - if envs.ENABLE_V1_KVCACHE_SCHEDULER: - for task in tasks: - if envs.FD_ENABLE_INTERNAL_ADAPTER: - if ( - not task.outputs.token_ids - ): # first token is eos in Prefill, just recycle resource and continue - cur_task = self.resource_manager.requests[task.request_id] - self.resource_manager.stop_flags[cur_task.idx] = True - self.resource_manager.tasks_list[cur_task.idx] = None - self.resource_manager._free_blocks(cur_task) - if cur_task.request_id in self.token_processor.tokens_counter: - del self.token_processor.tokens_counter[task.request_id] - self.llm_logger.warning( - f"{task.request_id} need not decode after first token" - ) - del self.resource_manager.requests[task.request_id] - del self.resource_manager.req_dict[task.request_id] - continue - if task.error_code != 200: - cur_task = self.resource_manager.requests[task.request_id] - self.resource_manager.stop_flags[cur_task.idx] = True - self.resource_manager.tasks_list[cur_task.idx] = None - self.resource_manager._free_blocks(cur_task) - if cur_task.request_id in self.token_processor.tokens_counter: - del self.token_processor.tokens_counter[task.request_id] - self.scheduler.put_results([task]) - self.llm_logger.warning( - f"{task.request_id} prefill failed with msg:{task.error_msg}, recycle resource." 
- ) - continue - self.token_processor.tokens_counter[task.request_id] = 1 - self.resource_manager.insert_task_for_decoding(task) - - else: - waiting_ready_tasks.extend(_decode_process_prefilled_task_v0_scheduler(tasks)) - elif isinstance(tasks[0], Request): - self.llm_logger.debug(f"receive tasks to preallocate resource, {tasks}") - if len(waiting_resource_requests): - self.llm_logger.info(f"Waiting for resource for task {tasks[0].request_id}") - waiting_resource_requests.extend(tasks) - else: - new_waiting = [] - for task in tasks: - can_allocate_resource = False - if envs.ENABLE_V1_KVCACHE_SCHEDULER: - if self.resource_manager.preallocate_resource_in_d(task): - self.split_connector.send_cache_infos([task], -1) - can_allocate_resource = True - else: - if self.resource_manager.is_resource_sufficient( - task.prompt_token_ids_len - ): - self.insert_tasks([task]) - can_allocate_resource = True - if can_allocate_resource is False: - if not self.enable_decode_cache_task: - task.error_msg = "Not enough resources" - new_waiting.append(task) - - if new_waiting: - if not self.enable_decode_cache_task: - self.split_connector.send_cache_infos(new_waiting, -1) - else: - waiting_resource_requests.extend(new_waiting) - self.llm_logger.info( - f"Added {len(new_waiting)} tasks to waiting queue" - ) - else: - raise ValueError(f"Unsupported task type: {type(tasks[0])}") + if not envs.ENABLE_V1_KVCACHE_SCHEDULER: + self._insert_prefilled_requests(ready_request_outputs) + else: + for task in ready_request_outputs: + if envs.FD_ENABLE_INTERNAL_ADAPTER: + if ( + not task.outputs.token_ids + ): # first token is eos in Prefill, just recycle resource and continue + cur_req = self.resource_manager.requests[task.request_id] + self.resource_manager.stop_flags[cur_req.idx] = True + self.resource_manager.tasks_list[cur_req.idx] = None + self.resource_manager._free_blocks(cur_req) + if cur_req.request_id in self.token_processor.tokens_counter: + del self.token_processor.tokens_counter[task.request_id] + self.llm_logger.warning(f"{task.request_id} need not decode after first token") + del self.resource_manager.requests[task.request_id] + del self.resource_manager.req_dict[task.request_id] + continue + if task.error_code != 200: + cur_req = self.resource_manager.requests[task.request_id] + self.resource_manager.stop_flags[cur_req.idx] = True + self.resource_manager.tasks_list[cur_req.idx] = None + self.resource_manager._free_blocks(cur_req) + if cur_req.request_id in self.token_processor.tokens_counter: + del self.token_processor.tokens_counter[task.request_id] + self.scheduler.put_results([task]) + self.llm_logger.warning( + f"{task.request_id} prefill failed with msg:{task.error_msg}, recycle resource." 
+ ) + continue + self.token_processor.tokens_counter[task.request_id] = 1 + self.resource_manager.insert_task_for_decoding(task) + def decode_loop(): + while self.running: + try: + _fetch_requests() + _process_allocate_resource_requests() + _process_prefilled_requests() + time.sleep(0.001) except Exception as e: - self.llm_logger.error(f"Error in main loop: {e}") - time.sleep(0.1) + self.llm_logger.error( + f"Error in main loop of decode_process_splitwise_requests: " f"{e}, {traceback.format_exc()}" + ) + time.sleep(0.01) - threading.Thread(target=receiver_loop, daemon=True).start() + threading.Thread(target=decode_loop, daemon=True).start() def start_cache_service(self, device_ids, ipc_signal_suffix): return self.resource_manager.cache_manager.launch_cache_manager( diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index c7d40c557e5..dcf3f8a596a 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -690,8 +690,6 @@ def check_health(self, time_interval_threashold=30): def launch_components(self): if self.cfg.scheduler_config.splitwise_role != "mixed": - # 单机逻辑 - self.engine.engine_worker_queue.available_prefill_instances.put(1) self.splitwise_receive_thread = threading.Thread( target=self.engine.split_connector.start_receiver, args=() ) diff --git a/fastdeploy/engine/request.py b/fastdeploy/engine/request.py index 776611560ab..70d82d2e32c 100644 --- a/fastdeploy/engine/request.py +++ b/fastdeploy/engine/request.py @@ -75,6 +75,8 @@ def __init__( pooling_params: Optional[PoolingParams] = None, preprocess_start_time: Optional[float] = None, preprocess_end_time: Optional[float] = None, + inference_start_time: float = 0, + llm_engine_recv_req_timestamp: float = 0, multimodal_inputs: Optional[dict] = None, multimodal_data: Optional[dict] = None, disable_chat_template: bool = False, @@ -118,6 +120,10 @@ def __init__( self.arrival_time = arrival_time self.preprocess_start_time = preprocess_start_time self.preprocess_end_time = preprocess_end_time + self.inference_start_time = inference_start_time + self.llm_engine_recv_req_timestamp = ( + llm_engine_recv_req_timestamp if llm_engine_recv_req_timestamp else time.time() + ) self.disable_chat_template = disable_chat_template self.disaggregate_info = disaggregate_info @@ -166,7 +172,6 @@ def __init__( self.extend_block_tables = [] # dp self.dp_rank = dp_rank - self.llm_engine_recv_req_timestamp = time.time() @classmethod def from_dict(cls, d: dict): @@ -217,6 +222,8 @@ def from_dict(cls, d: dict): video_end=d.get("video_end", 0), audio_end=d.get("audio_end", 0), dp_rank=d.get("dp_rank", None), + inference_start_time=d.get("inference_start_time"), + llm_engine_recv_req_timestamp=d.get("llm_engine_recv_req_timestamp"), ) @property diff --git a/fastdeploy/inter_communicator/engine_worker_queue.py b/fastdeploy/inter_communicator/engine_worker_queue.py index ffa9155bab7..f91638bb58b 100644 --- a/fastdeploy/inter_communicator/engine_worker_queue.py +++ b/fastdeploy/inter_communicator/engine_worker_queue.py @@ -287,12 +287,6 @@ class QueueManager(BaseManager): callable=lambda idx: self.disaggregate_requests[idx], ) - self.available_prefill_instances = Queue() - QueueManager.register( - "get_available_prefill_instances", - callable=lambda: self.available_prefill_instances, - ) - QueueManager.register( "get_finish_request_barrier", callable=lambda idx: self.finish_request_barrier[idx], @@ -351,7 +345,6 @@ class QueueManager(BaseManager): QueueManager.register("get_client_read_info_flag") 
QueueManager.register("get_lock_info") QueueManager.register("get_disaggregate_requests") - QueueManager.register("get_available_prefill_instances") QueueManager.register("get_finish_request_barrier") QueueManager.register("get_finish_add_cache_task_barrier") QueueManager.register("get_connect_task_barrier") @@ -390,7 +383,6 @@ class QueueManager(BaseManager): # p/d 分离获取 self.disaggregate_requests = self.manager.get_disaggregate_requests(self.local_data_parallel_id) - self.available_prefill_instances = self.manager.get_available_prefill_instances() self.finish_request_barrier = self.manager.get_finish_request_barrier(self.local_data_parallel_id) self.finish_add_cache_task_barrier = self.manager.get_finish_add_cache_task_barrier( self.local_data_parallel_id @@ -652,15 +644,6 @@ def get_connect_rdma_task_response(self): self.connect_task_response_lock.release() return task_response - def get_prefill_instances(self): - """ - check if the prefill queue is empty - """ - if self.available_prefill_instances.qsize() == 0: - return 0 - else: - return self.available_prefill_instances.get() - def put_cache_info(self, cache_info) -> None: """ Args: diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index b2e89e276e1..1d933461f61 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -498,6 +498,9 @@ def _recycle_resources(self, task_id, index, task, result=None, is_prefill=False self.split_connector.send_first_token(task.disaggregate_info, [result]) break else: + # TODO: Refine checking sending cache and do not keep waiting + if time.time() - start_time > 30: + llm_logger.warning(f"wait for sending cache, {task_id}") time.sleep(0.002) else: if envs.ENABLE_V1_KVCACHE_SCHEDULER: @@ -753,10 +756,8 @@ def _process_batch_output(self): self._recycle_resources(task_id, i, task, result, is_prefill) break - if not (is_prefill and self.cfg.splitwise_version == "v0"): - # NOTE: prefill instance in v0 version does not return result to scheduler - llm_logger.debug(f"get response from infer: {result}") - batch_result.append(result) + llm_logger.debug(f"get response from infer: {result}") + batch_result.append(result) self.postprocess(batch_result, mtype) diff --git a/fastdeploy/splitwise/splitwise_connector.py b/fastdeploy/splitwise/splitwise_connector.py index e5ad4ad8adc..8daab42ddf1 100644 --- a/fastdeploy/splitwise/splitwise_connector.py +++ b/fastdeploy/splitwise/splitwise_connector.py @@ -175,72 +175,6 @@ def _close_connection(self, addr): self.push_sockets[addr].close() del self.push_sockets[addr] - def has_splitwise_tasks(self): - """ - PD mode: check prefill empty - """ - if self.cfg.innode_prefill_ports is None: - return True - else: - for port in self.cfg.innode_prefill_ports: - if port not in self.connect_innode_instances: - self.create_connection(port) - if self.connect_innode_instances[port].available_prefill_instances.qsize() > 0: - return False - return True - - def dispatch_innode_splitwise_tasks(self, tasks, current_id): - """ - Dispatch splitwise tasks . - - Parameters: - tasks (list): List of tasks. 
- """ - tasks_status = "mixed" - is_changable = envs.FD_PD_CHANGEABLE == "1" - while True: - for port in self.cfg.innode_prefill_ports: - current_port = -1 - if port not in self.connect_innode_instances: - self.create_connection(port) - if self.connect_innode_instances[port].get_prefill_instances() == 1: - for task in tasks: - task.disaggregate_info = { - "role": "prefill", - "transfer_protocol": "ipc", - "cache_info": { - "ipc": { - "ip": "0.0.0.0", - "port": self.cfg.parallel_config.engine_worker_queue_port[self.idx], - "current_id": current_id, - }, - }, - } - self.connect_innode_instances[port].put_disaggregated_tasks(("prefill", tasks)) - current_port = port - - if current_port != -1: - tasks_status = "decode" - break - if current_port != -1 or is_changable: - break - else: - time.sleep(0.005) - - if tasks_status == "decode": - for task in tasks: - task.disaggregate_info = { - "role": tasks_status, - "transfer_protocol": "ipc", - "cache_info": { - "ipc": { - "ip": "0.0.0.0", - "port": current_port, - "current_id": current_id, - }, - }, - } - def send_splitwise_tasks(self, tasks: List[Request], current_id): """ Send splitwise tasks to all connected addresses. @@ -249,10 +183,6 @@ def send_splitwise_tasks(self, tasks: List[Request], current_id): tasks (list): List of tasks. current_id (int): Current ID. """ - - if self.cfg.innode_prefill_ports is not None: - self.dispatch_innode_splitwise_tasks(tasks, current_id) - return addr = None decode_diagg = None for task in tasks: diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index df6604f8d99..14a45c437b7 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -927,8 +927,6 @@ def initialize_fd_config(args, ranks: int = 1, local_rank: int = 0) -> FDConfig: logger.info(f"- Dynamic load weight: {load_config.dynamic_load_weight}") logger.info(f"- Load strategy: {load_config.load_strategy}") - if args.splitwise_role != "mixed" and args.cache_transfer_protocol != "rdma": - envs.ENABLE_V1_KVCACHE_SCHEDULER = 0 if not current_platform.is_cuda() and not current_platform.is_xpu(): logger.info("Set ENABLE_V1_KVCACHE_SCHEDULER to 0 due to not supported.") envs.ENABLE_V1_KVCACHE_SCHEDULER = 0 diff --git a/tests/e2e/test_ernie_03b_pd_multi_node.py b/tests/e2e/test_ernie_03b_pd_router_v0.py similarity index 74% rename from tests/e2e/test_ernie_03b_pd_multi_node.py rename to tests/e2e/test_ernie_03b_pd_router_v0.py index b1cc1fd2ac4..c8da6adbb67 100644 --- a/tests/e2e/test_ernie_03b_pd_multi_node.py +++ b/tests/e2e/test_ernie_03b_pd_router_v0.py @@ -12,23 +12,29 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+# Test splitwise deployment which uses local_scheduler + router, +# and ENABLE_V1_KVCACHE_SCHEDULER is 0 + import json import os import shutil import signal -import socket import subprocess import sys import time import pytest import requests +from utils.serving_utils import ( + FD_API_PORT, + FD_CACHE_QUEUE_PORT, + FD_ENGINE_QUEUE_PORT, + FD_METRICS_PORT, + clean_ports, + get_registered_number, +) # Read ports from environment variables; use default values if not set -FD_API_PORT = int(os.getenv("FD_API_PORT", 8188)) -FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133)) -FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233)) -FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333)) FD_CONNECTOR_PORT = int(os.getenv("FD_CONNECTOR_PORT", 8433)) FD_ROUTER_PORT = int(os.getenv("FD_ROUTER_PORT", 8533)) @@ -48,133 +54,6 @@ ] -def is_port_open(host: str, port: int, timeout=1.0): - """ - Check if a TCP port is open on the given host. - Returns True if connection succeeds, False otherwise. - """ - try: - with socket.create_connection((host, port), timeout): - return True - except Exception: - return False - - -def check_service_health(base_url: str, timeout: int = 3) -> bool: - """ - Check the health status of a service. - - Args: - base_url (str): The base URL of the service, e.g. "http://127.0.0.1:8080" - timeout (int): Request timeout in seconds. - - Returns: - bool: True if the service is healthy, False otherwise. - """ - if not base_url.startswith("http"): - base_url = f"http://{base_url}" - url = f"{base_url.rstrip('/')}/health" - try: - resp = requests.get(url, timeout=timeout) - if resp.status_code == 200: - return True - else: - return False - except Exception: - return False - - -def get_registered_number(router_url) -> list: - """ - Get the number of registered models in the router. - - Args: - router_url (str): The base URL of the router, e.g. "http://localhost:8080". - - Returns: - int: The number of registered models. - """ - if not router_url.startswith("http"): - router_url = f"http://{router_url}" - - try: - response = requests.get(f"{router_url}/registered_number", timeout=60) - registered_numbers = response.json() - return registered_numbers - except Exception: - return {"mixed": 0, "prefill": 0, "decode": 0} - - -def kill_process_on_port(port: int): - """ - Kill processes that are listening on the given port. - Uses multiple methods to ensure thorough cleanup. 
- """ - current_pid = os.getpid() - parent_pid = os.getppid() - - # Method 1: Use lsof to find processes - try: - output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip() - for pid in output.splitlines(): - pid = int(pid) - if pid in (current_pid, parent_pid): - print(f"Skip killing current process (pid={pid}) on port {port}") - continue - try: - # First try SIGTERM for graceful shutdown - os.kill(pid, signal.SIGTERM) - time.sleep(1) - # Then SIGKILL if still running - os.kill(pid, signal.SIGKILL) - print(f"Killed process on port {port}, pid={pid}") - except ProcessLookupError: - pass # Process already terminated - except subprocess.CalledProcessError: - pass - - # Method 2: Use netstat and fuser as backup - try: - # Find processes using netstat and awk - cmd = f"netstat -tulpn 2>/dev/null | grep :{port} | awk '{{print $7}}' | cut -d'/' -f1" - output = subprocess.check_output(cmd, shell=True).decode().strip() - for pid in output.splitlines(): - if pid and pid.isdigit(): - pid = int(pid) - if pid in (current_pid, parent_pid): - continue - try: - os.kill(pid, signal.SIGKILL) - print(f"Killed process (netstat) on port {port}, pid={pid}") - except ProcessLookupError: - pass - except (subprocess.CalledProcessError, FileNotFoundError): - pass - - # Method 3: Use fuser if available - try: - subprocess.run(f"fuser -k {port}/tcp", shell=True, timeout=5) - except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError): - pass - - -def clean_ports(): - """ - Kill all processes occupying the ports listed in PORTS_TO_CLEAN. - """ - print(f"Cleaning ports: {PORTS_TO_CLEAN}") - for port in PORTS_TO_CLEAN: - kill_process_on_port(port) - - # Double check and retry if ports are still in use - time.sleep(2) - for port in PORTS_TO_CLEAN: - if is_port_open("127.0.0.1", port, timeout=0.1): - print(f"Port {port} still in use, retrying cleanup...") - kill_process_on_port(port) - time.sleep(1) - - @pytest.fixture(scope="session", autouse=True) def setup_and_run_server(): """ @@ -185,7 +64,7 @@ def setup_and_run_server(): - Tears down server after all tests finish """ print("Pre-test port cleanup...") - clean_ports() + clean_ports(PORTS_TO_CLEAN) print("log dir clean ") if os.path.exists("log_router") and os.path.isdir("log_router"): @@ -232,7 +111,6 @@ def setup_and_run_server(): env_prefill["CUDA_VISIBLE_DEVICES"] = "0" env_prefill["ENABLE_V1_KVCACHE_SCHEDULER"] = "0" env_prefill["FD_LOG_DIR"] = "log_prefill" - env_prefill["INFERENCE_MSG_QUEUE_ID"] = str(FD_API_PORT) prefill_log_path = "server.log" prefill_cmd = [ sys.executable, @@ -282,7 +160,6 @@ def setup_and_run_server(): env_decode = os.environ.copy() env_decode["CUDA_VISIBLE_DEVICES"] = "1" env_decode["ENABLE_V1_KVCACHE_SCHEDULER"] = "0" - env_decode["INFERENCE_MSG_QUEUE_ID"] = str(FD_API_PORT + 1) env_decode["FD_LOG_DIR"] = "log_decode" decode_log_path = "decode_server.log" decode_cmd = [ @@ -351,7 +228,7 @@ def setup_and_run_server(): os.killpg(process_router.pid, signal.SIGTERM) os.killpg(process_prefill.pid, signal.SIGTERM) os.killpg(process_decode.pid, signal.SIGTERM) - clean_ports() + clean_ports(PORTS_TO_CLEAN) print(f"Prefill server (pid={process_prefill.pid}) terminated") print(f"Decode server (pid={process_decode.pid}) terminated") except Exception as e: diff --git a/tests/e2e/test_ernie_03b_pd.py b/tests/e2e/test_ernie_03b_pd_splitwise_scheduler.py similarity index 71% rename from tests/e2e/test_ernie_03b_pd.py rename to tests/e2e/test_ernie_03b_pd_splitwise_scheduler.py index 
7d31a574a9e..b8af9c011cc 100644 --- a/tests/e2e/test_ernie_03b_pd.py +++ b/tests/e2e/test_ernie_03b_pd_splitwise_scheduler.py @@ -12,23 +12,31 @@ # See the License for the specific language governing permissions and # limitations under the License. +# Test splitwise deployment which uses splitwise_scheduler, +# and ENABLE_V1_KVCACHE_SCHEDULER is 0 + import json import os import shutil import signal -import socket import subprocess import sys import time import pytest import requests +from utils.serving_utils import ( + FD_API_PORT, + FD_CACHE_QUEUE_PORT, + FD_ENGINE_QUEUE_PORT, + FD_METRICS_PORT, + clean_ports, + is_port_open, +) # Read ports from environment variables; use default values if not set -FD_API_PORT = int(os.getenv("FD_API_PORT", 8188)) -FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133)) -FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233)) -FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333)) +FD_CONNECTOR_PORT = int(os.getenv("FD_CONNECTOR_PORT", 8433)) +FD_REDIS_PORT = int(os.getenv("FD_REDIS_PORT", 8533)) # List of ports to clean before and after tests PORTS_TO_CLEAN = [ @@ -36,95 +44,16 @@ FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT, + FD_CONNECTOR_PORT, FD_API_PORT + 1, FD_ENGINE_QUEUE_PORT + 1, FD_METRICS_PORT + 1, FD_CACHE_QUEUE_PORT + 1, + FD_CONNECTOR_PORT + 1, + FD_REDIS_PORT, ] -def is_port_open(host: str, port: int, timeout=1.0): - """ - Check if a TCP port is open on the given host. - Returns True if connection succeeds, False otherwise. - """ - try: - with socket.create_connection((host, port), timeout): - return True - except Exception: - return False - - -def kill_process_on_port(port: int): - """ - Kill processes that are listening on the given port. - Uses multiple methods to ensure thorough cleanup. - """ - current_pid = os.getpid() - parent_pid = os.getppid() - - # Method 1: Use lsof to find processes - try: - output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip() - for pid in output.splitlines(): - pid = int(pid) - if pid in (current_pid, parent_pid): - print(f"Skip killing current process (pid={pid}) on port {port}") - continue - try: - # First try SIGTERM for graceful shutdown - os.kill(pid, signal.SIGTERM) - time.sleep(1) - # Then SIGKILL if still running - os.kill(pid, signal.SIGKILL) - print(f"Killed process on port {port}, pid={pid}") - except ProcessLookupError: - pass # Process already terminated - except subprocess.CalledProcessError: - pass - - # Method 2: Use netstat and fuser as backup - try: - # Find processes using netstat and awk - cmd = f"netstat -tulpn 2>/dev/null | grep :{port} | awk '{{print $7}}' | cut -d'/' -f1" - output = subprocess.check_output(cmd, shell=True).decode().strip() - for pid in output.splitlines(): - if pid and pid.isdigit(): - pid = int(pid) - if pid in (current_pid, parent_pid): - continue - try: - os.kill(pid, signal.SIGKILL) - print(f"Killed process (netstat) on port {port}, pid={pid}") - except ProcessLookupError: - pass - except (subprocess.CalledProcessError, FileNotFoundError): - pass - - # Method 3: Use fuser if available - try: - subprocess.run(f"fuser -k {port}/tcp", shell=True, timeout=5) - except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError): - pass - - -def clean_ports(): - """ - Kill all processes occupying the ports listed in PORTS_TO_CLEAN. 
- """ - print(f"Cleaning ports: {PORTS_TO_CLEAN}") - for port in PORTS_TO_CLEAN: - kill_process_on_port(port) - - # Double check and retry if ports are still in use - time.sleep(2) - for port in PORTS_TO_CLEAN: - if is_port_open("127.0.0.1", port, timeout=0.1): - print(f"Port {port} still in use, retrying cleanup...") - kill_process_on_port(port) - time.sleep(1) - - @pytest.fixture(scope="session", autouse=True) def setup_and_run_server(): """ @@ -135,9 +64,11 @@ def setup_and_run_server(): - Tears down server after all tests finish """ print("Pre-test port cleanup...") - clean_ports() + clean_ports(PORTS_TO_CLEAN) print("log dir clean ") + if os.path.exists("log_redis") and os.path.isdir("log_redis"): + shutil.rmtree("log_redis") if os.path.exists("log_prefill") and os.path.isdir("log_prefill"): shutil.rmtree("log_prefill") if os.path.exists("log_decode") and os.path.isdir("log_decode"): @@ -150,13 +81,34 @@ def setup_and_run_server(): model_path = "baidu/ERNIE-4.5-0.3B-Paddle" print(f"model_path: {model_path}") + # redis-server + print("start redis...") + env_copy = os.environ.copy() + log_path = "router.log" + + cmd = [ + "redis-server", + "--port", + str(FD_REDIS_PORT), + "--daemonize", + "yes", + ] + + with open(log_path, "w") as logfile: + process_redis = subprocess.Popen( + cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, # Enables killing full group via os.killpg + env=env_copy, + ) + # prefill实例 print("start prefill...") env_prefill = os.environ.copy() env_prefill["CUDA_VISIBLE_DEVICES"] = "0" env_prefill["ENABLE_V1_KVCACHE_SCHEDULER"] = "0" env_prefill["FD_LOG_DIR"] = "log_prefill" - env_prefill["INFERENCE_MSG_QUEUE_ID"] = str(FD_API_PORT) prefill_log_path = "server.log" prefill_cmd = [ sys.executable, @@ -182,6 +134,16 @@ def setup_and_run_server(): "wint8", "--splitwise-role", "prefill", + "--cache-transfer-protocol", + "ipc", + "--pd-comm-port", + str(FD_CONNECTOR_PORT), + "--scheduler-name", + "splitwise", + "--scheduler-host", + "127.0.0.1", + "--scheduler-port", + str(FD_REDIS_PORT), ] # Start subprocess in new process group @@ -193,14 +155,13 @@ def setup_and_run_server(): start_new_session=True, # Enables killing full group via os.killpg env=env_prefill, ) - time.sleep(3) + time.sleep(1) # decode实例 print("start decode...") env_decode = os.environ.copy() env_decode["CUDA_VISIBLE_DEVICES"] = "1" - env_prefill["ENABLE_V1_KVCACHE_SCHEDULER"] = "0" - env_decode["INFERENCE_MSG_QUEUE_ID"] = str(FD_API_PORT + 1) + env_decode["ENABLE_V1_KVCACHE_SCHEDULER"] = "0" env_decode["FD_LOG_DIR"] = "log_decode" decode_log_path = "decode_server.log" decode_cmd = [ @@ -227,8 +188,16 @@ def setup_and_run_server(): "wint8", "--splitwise-role", "decode", - "--innode-prefill-ports", - str(FD_ENGINE_QUEUE_PORT), + "--cache-transfer-protocol", + "ipc", + "--pd-comm-port", + str(FD_CONNECTOR_PORT + 1), + "--scheduler-name", + "splitwise", + "--scheduler-host", + "127.0.0.1", + "--scheduler-port", + str(FD_REDIS_PORT), ] # Start subprocess in new process group @@ -242,19 +211,18 @@ def setup_and_run_server(): ) # Wait up to 300 seconds for API server to be ready - for _ in range(300): - if is_port_open("127.0.0.1", FD_API_PORT): - if is_port_open("127.0.0.1", FD_API_PORT + 1): - print(f"Prefill server is up on port {FD_API_PORT}") - print(f"Decode server is up on port {FD_API_PORT + 1}") - break - time.sleep(1) + for _ in range(60): + if is_port_open("127.0.0.1", FD_API_PORT) and is_port_open("127.0.0.1", FD_API_PORT + 1): + print(f"Prefill server is up on port 
{FD_API_PORT}") + print(f"Decode server is up on port {FD_API_PORT + 1}") + break + time.sleep(5) else: print("[TIMEOUT] API server failed to start in 5 minutes. Cleaning up...") try: os.killpg(process_prefill.pid, signal.SIGTERM) os.killpg(process_decode.pid, signal.SIGTERM) - clean_ports() + clean_ports(PORTS_TO_CLEAN) except Exception as e: print(f"Failed to kill process group: {e}") raise RuntimeError(f"API server did not start on port {FD_API_PORT}") @@ -263,9 +231,10 @@ def setup_and_run_server(): print("\n===== Post-test server cleanup... =====") try: + os.killpg(process_redis.pid, signal.SIGTERM) os.killpg(process_prefill.pid, signal.SIGTERM) os.killpg(process_decode.pid, signal.SIGTERM) - clean_ports() + clean_ports(PORTS_TO_CLEAN) print(f"Prefill server (pid={process_prefill.pid}) terminated") print(f"Decode server (pid={process_decode.pid}) terminated") except Exception as e: @@ -277,7 +246,7 @@ def api_url(request): """ Returns the API endpoint URL for chat completions. """ - return f"http://0.0.0.0:{FD_API_PORT}/v1/chat/completions", f"http://0.0.0.0:{FD_API_PORT + 1}/v1/chat/completions" + return f"http://0.0.0.0:{FD_API_PORT}/v1/chat/completions", f"http://0.0.0.0:{FD_API_PORT+1}/v1/chat/completions" @pytest.fixture(scope="session") @@ -364,15 +333,12 @@ def test_chat_usage_stream(api_url): "stream_options": {"include_usage": True, "continuous_usage_stats": True}, "metadata": {"min_tokens": 10}, } - _, d_url = api_url # Only the decode server receives the request - - response = send_request(url=d_url, payload=payload) + p_url, d_url = api_url + response = send_request(url=p_url, payload=payload) chunks = get_stream_chunks(response) result = "".join([x["choices"][0]["delta"]["content"] for x in chunks[:-1]]) print("Decode Response:", result) assert result != "", "结果为空" - # for idx, chunk in enumerate(chunks): - # print(f"\nchunk[{idx}]:\n{json.dumps(chunk, indent=2, ensure_ascii=False)}") usage = chunks[-1]["usage"] total_tokens = usage["completion_tokens"] + usage["prompt_tokens"] assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens" @@ -395,9 +361,9 @@ def test_chat_usage_non_stream(api_url): "stream": False, "metadata": {"min_tokens": 10}, } - _, d_url = api_url - response = send_request(url=d_url, payload=payload).json() + p_url, d_url = api_url + response = send_request(url=p_url, payload=payload).json() usage = response["usage"] result = response["choices"][0]["message"]["content"] assert result != "", "结果为空" @@ -420,13 +386,13 @@ def test_non_chat_usage_stream(api_url): "stream_options": {"include_usage": True, "continuous_usage_stats": True}, "metadata": {"min_tokens": 10}, } - _, d_url = api_url - d_url = d_url.replace("chat/completions", "completions") + p_url, d_url = api_url + p_url = p_url.replace("chat/completions", "completions") - response = send_request(url=d_url, payload=payload) + response = send_request(url=p_url, payload=payload) chunks = get_stream_chunks(response) result = "".join([x["choices"][0]["text"] for x in chunks[:-1]]) - print("Decode Response:", result) + # print("Decode Response:", result) assert result != "", "结果为空" usage = chunks[-1]["usage"] total_tokens = usage["completion_tokens"] + usage["prompt_tokens"] @@ -447,13 +413,13 @@ def test_non_chat_usage_non_stream(api_url): "stream": False, "metadata": {"min_tokens": 10}, } - _, d_url = api_url - d_url = d_url.replace("chat/completions", "completions") + p_url, d_url = api_url + p_url = p_url.replace("chat/completions", "completions") - response = 
send_request(url=d_url, payload=payload).json() + response = send_request(url=p_url, payload=payload).json() usage = response["usage"] result = response["choices"][0]["text"] - print("Decode Response:", result) + # print("Decode Response:", result) assert result != "", "结果为空" total_tokens = usage["completion_tokens"] + usage["prompt_tokens"] assert payload["max_tokens"] >= usage["completion_tokens"], "completion_tokens大于max_tokens" diff --git a/tests/e2e/utils/serving_utils.py b/tests/e2e/utils/serving_utils.py index e6dcaf8b31c..ad2538e4962 100644 --- a/tests/e2e/utils/serving_utils.py +++ b/tests/e2e/utils/serving_utils.py @@ -4,6 +4,8 @@ import subprocess import time +import requests + # Read ports from environment variables; use default values if not set FD_API_PORT = int(os.getenv("FD_API_PORT", 8188)) FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133)) @@ -79,18 +81,66 @@ def kill_process_on_port(port: int): pass -def clean_ports(): +def clean_ports(ports=None): """ - Kill all processes occupying the ports listed in PORTS_TO_CLEAN. + Kill all processes occupying the ports """ - print(f"Cleaning ports: {PORTS_TO_CLEAN}") - for port in PORTS_TO_CLEAN: + if ports is None: + ports = PORTS_TO_CLEAN + + print(f"Cleaning ports: {ports}") + for port in ports: kill_process_on_port(port) # Double check and retry if ports are still in use time.sleep(2) - for port in PORTS_TO_CLEAN: + for port in ports: if is_port_open("127.0.0.1", port, timeout=0.1): print(f"Port {port} still in use, retrying cleanup...") kill_process_on_port(port) time.sleep(1) + + +def check_service_health(base_url: str, timeout: int = 3) -> bool: + """ + Check the health status of a service. + + Args: + base_url (str): The base URL of the service, e.g. "http://127.0.0.1:8080" + timeout (int): Request timeout in seconds. + + Returns: + bool: True if the service is healthy, False otherwise. + """ + if not base_url.startswith("http"): + base_url = f"http://{base_url}" + url = f"{base_url.rstrip('/')}/health" + try: + resp = requests.get(url, timeout=timeout) + if resp.status_code == 200: + return True + else: + return False + except Exception: + return False + + +def get_registered_number(router_url) -> dict: + """ + Get the registered model counts by type from the router. + + Args: + router_url (str): The base URL of the router, e.g. "http://localhost:8080". + + Returns: + dict: A dictionary containing registered model counts with keys "mixed", "prefill", and "decode". + """ + if not router_url.startswith("http"): + router_url = f"http://{router_url}" + + try: + response = requests.get(f"{router_url}/registered_number", timeout=60) + registered_numbers = response.json() + return registered_numbers + except Exception: + return {"mixed": 0, "prefill": 0, "decode": 0}
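
As a usage illustration only (not part of the patch): a minimal sketch of how an e2e test can consume the shared helpers this change consolidates into tests/e2e/utils/serving_utils.py. The import path assumes the test is executed from tests/e2e, as the renamed tests above do; the router port value is illustrative, since serving_utils only defines the API/engine/metrics/cache-queue ports.

```python
# Sketch: exercising the shared serving_utils helpers from an e2e test.
# Assumes execution from tests/e2e so that `utils.serving_utils` is importable,
# matching the imports used in test_ernie_03b_pd_router_v0.py above.
from utils.serving_utils import (
    FD_API_PORT,            # env-driven default (FD_API_PORT, fallback 8188)
    clean_ports,
    check_service_health,
    get_registered_number,
)

FD_ROUTER_PORT = 8533  # illustrative value; real tests read FD_ROUTER_PORT from the environment

# Free any ports left over from a previous run before launching new servers.
clean_ports([FD_API_PORT, FD_API_PORT + 1, FD_ROUTER_PORT])

# ... launch prefill/decode/router processes here, as the fixtures above do ...

# Poll the API server's /health endpoint until it reports healthy.
if check_service_health(f"127.0.0.1:{FD_API_PORT}"):
    print(f"API server on port {FD_API_PORT} is healthy")

# Ask the router how many instances of each role have registered.
counts = get_registered_number(f"127.0.0.1:{FD_ROUTER_PORT}")
print(counts)  # e.g. {"mixed": 0, "prefill": 1, "decode": 1}
```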