Production-grade API deployment framework for Swarms AI workflows. Easily deploy, scale, and manage your swarm-based applications with enterprise features.
- Fast API-based deployment framework
- Support for synchronous and asynchronous swarm execution
- Built-in load balancing and scaling
- Real-time monitoring and logging
- Enterprise-grade error handling
- Priority-based task execution
- Simple deployment and configuration
- Extensible plugin architecture
 
pip install -U swarms-deploy

import os
from dotenv import load_dotenv
from swarms import Agent, SequentialWorkflow
from swarm_models import OpenAIChat
from swarm_deploy import SwarmDeploy
load_dotenv()
# Get the Groq API key from the environment variable (Groq serves an OpenAI-compatible API)
api_key = os.getenv("GROQ_API_KEY")
# Model
model = OpenAIChat(
    openai_api_base="https://api.groq.com/openai/v1",
    openai_api_key=api_key,
    model_name="llama-3.1-70b-versatile",
    temperature=0.1,
)
# Initialize specialized agents
data_extractor_agent = Agent(
    agent_name="Data-Extractor",
    system_prompt=None,
    llm=model,
    max_loops=1,
    autosave=True,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="data_extractor_agent.json",
    user_name="pe_firm",
    retry_attempts=1,
    context_length=200000,
    output_type="string",
)
summarizer_agent = Agent(
    agent_name="Document-Summarizer",
    system_prompt=None,
    llm=model,
    max_loops=1,
    autosave=True,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="summarizer_agent.json",
    user_name="pe_firm",
    retry_attempts=1,
    context_length=200000,
    output_type="string",
)
financial_analyst_agent = Agent(
    agent_name="Financial-Analyst",
    system_prompt=None,
    llm=model,
    max_loops=1,
    autosave=True,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="financial_analyst_agent.json",
    user_name="pe_firm",
    retry_attempts=1,
    context_length=200000,
    output_type="string",
)
market_analyst_agent = Agent(
    agent_name="Market-Analyst",
    system_prompt=None,
    llm=model,
    max_loops=1,
    autosave=True,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="market_analyst_agent.json",
    user_name="pe_firm",
    retry_attempts=1,
    context_length=200000,
    output_type="string",
)
operational_analyst_agent = Agent(
    agent_name="Operational-Analyst",
    system_prompt=None,
    llm=model,
    max_loops=1,
    autosave=True,
    verbose=True,
    dynamic_temperature_enabled=True,
    saved_state_path="operational_analyst_agent.json",
    user_name="pe_firm",
    retry_attempts=1,
    context_length=200000,
    output_type="string",
)
# Initialize the sequential workflow that chains the agents
router = SequentialWorkflow(
    name="pe-document-analysis-swarm",
    description="Analyze documents for private equity due diligence and investment decision-making",
    max_loops=1,
    agents=[
        data_extractor_agent,
        summarizer_agent,
        financial_analyst_agent,
        market_analyst_agent,
        operational_analyst_agent,
    ],
    output_type="all",
)
# Deploy the workflow as an API
swarm = SwarmDeploy(
    router,
    max_workers=4,
    # cache_backend="redis"
)
swarm.start(
    host="0.0.0.0",
    port=8000,
    workers=4,
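)

# Advanced configuration: Redis cache backend and SSL
swarm = SwarmDeploy(
    workflow,  # your workflow instance, e.g. the `router` defined above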
    max_workers=4,
    cache_backend="redis",
    ssl_config={
        "keyfile": "path/to/key.pem",
        "certfile": "path/to/cert.pem"
    }
)

from typing import Optional
from pydantic import BaseModel

class SwarmInput(BaseModel):
    task: str          # Task description
    img: Optional[str] # Optional image input
    priority: int      # Task priority (0-10)

- POST /v1/swarms/completions/{callable_name}
  - Execute a task with the specified swarm
  - Returns: SwarmOutput or SwarmBatchOutput
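
For calling this endpoint from Python, a minimal sketch using only the standard library might look like the following. The helper names `build_completion_request` and `run_task` are hypothetical, the base URL assumes a server started locally on port 8000, and the response is assumed to be JSON; none of this is part of the SwarmDeploy package itself.

```python
import json
import urllib.request

# Assumption: the server was started locally with host="0.0.0.0", port=8000.
BASE_URL = "http://localhost:8000"


def build_completion_request(callable_name, task, priority=0, img=None):
    """Build (but do not send) a POST request for the completions endpoint.

    Hypothetical helper: the payload mirrors the SwarmInput fields
    (task, img, priority) and enforces the documented 0-10 priority range.
    """
    if not 0 <= priority <= 10:
        raise ValueError("priority must be between 0 and 10")
    payload = {"task": task, "priority": priority}
    if img is not None:
        payload["img"] = img
    url = f"{BASE_URL}/v1/swarms/completions/{callable_name}"
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def run_task(callable_name, task, priority=0, img=None, timeout=60):
    """Send the request and decode the body, assuming the server replies with JSON."""
    request = build_completion_request(callable_name, task, priority, img)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

With a server running, `run_task("document-analysis", "Analyze financial report", priority=5)` would send the request; keeping request construction separate from sending makes the payload easy to inspect or test without a network connection.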
 
 
curl -X POST "http://localhost:8000/v1/swarms/completions/document-analysis" \
     -H "Content-Type: application/json" \
     -d '{"task": "Analyze financial report", "priority": 5}'

SwarmDeploy provides built-in monitoring capabilities:
- Real-time task execution stats
- Error tracking and reporting
- Performance metrics
- Task history and audit logs
 
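Errors raised during task execution are caught and reported as a SwarmOutput with status "error":

import time
import uuid

start_time = time.time()
try:
    result = await swarm.run(task)  # `await` requires an async context
except Exception as e:
    # Report the failure as a structured output
    error_output = SwarmOutput(
        id=str(uuid.uuid4()),
        status="error",
        execution_time=time.time() - start_time,
        result=None,
        error=str(e)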
    )

- Always set appropriate task priorities
- Implement proper error handling
- Use clustering for high availability
- Monitor system performance
- Perform regular maintenance and updates
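
To experiment with the error-handling practice outside SwarmDeploy, the pattern from the snippet above can be sketched as a self-contained wrapper. Here `TaskOutput` is a hypothetical stand-in for `SwarmOutput` with the same fields, `run_safely` and `flaky` are illustrative names, and the `"success"` status value is an assumption (the source only shows `"error"`).

```python
import asyncio
import time
import uuid
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class TaskOutput:
    """Hypothetical stand-in for SwarmOutput, using the fields shown above."""
    id: str
    status: str
    execution_time: float
    result: Any = None
    error: Optional[str] = None


async def run_safely(run, task):
    """Await run(task) and return a TaskOutput whether it succeeds or raises."""
    start_time = time.time()
    try:
        result = await run(task)
        status, error = "success", None  # assumed status value
    except Exception as e:
        result, status, error = None, "error", str(e)
    return TaskOutput(
        id=str(uuid.uuid4()),
        status=status,
        execution_time=time.time() - start_time,
        result=result,
        error=error,
    )


async def flaky(task):
    """Illustrative coroutine that fails on an empty task."""
    if not task:
        raise ValueError("empty task")
    return f"done: {task}"
```

Calling `asyncio.run(run_safely(flaky, ""))` yields an output with `status="error"` and the exception message in `error`, so callers can branch on the status field instead of wrapping every call in their own `try` block.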
 
Contributions are welcome! Please read our Contributing Guidelines for details on our code of conduct and the process for submitting pull requests.
- Email: kye@swarms.world
- Discord: Join our community
- Documentation: https://docs.swarms.world
 
MIT License - see the LICENSE file for details.
Powered by swarms.ai
For enterprise support and custom solutions, contact kye@swarms.world