Multi-threading with open-source LLMs was a game changer for us. We've been able to run independent tasks simultaneously, instead of waiting for agents sequentially, slashing turnaround times on document processing. Our team can now process 5x more cases without proportional headcount increases.
— Justin Trugman, Cofounder & Head of Technology, BetterFutureLabs
Why Parallel Execution Matters
Consider a real-world scenario where you're building a medical diagnosis support system. Your system needs multiple specialized multi-agent teams simultaneously analyzing different aspects of a patient case:
- Symptom pattern analysis team: Correlating symptoms with medical history (8-12 minutes)
- Medical literature review team: Synthesizing recent research and protocols (10-15 minutes)
- Drug interaction analysis team: Checking medication safety (6-9 minutes)
- Imaging analysis team: Interpreting radiology results (7-10 minutes)
- Treatment planning team: Developing therapy recommendations (9-12 minutes)
Each team must reach internal consensus through multi-agent discussions before contributing to the final diagnostic assessment.
Sequential execution: 40-58 minutes total (each team waits for the previous to complete)
Parallel execution: 10-15 minutes total (limited by the longest-running team)
This 70-75% reduction in processing time transforms diagnosis from a lengthy batch process into a near real-time clinical decision support tool.
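The arithmetic behind these figures is easy to verify: sequential time is the sum of the team durations, while parallel time is bounded by the slowest team. A quick sketch, using the (min, max) minute ranges from the list above:

```python
# Sanity check: sequential vs. parallel wall-clock time for the five teams.
# Durations are the (min, max) minute ranges listed above.
durations = {
    "symptom_pattern_analysis": (8, 12),
    "literature_review": (10, 15),
    "drug_interaction_check": (6, 9),
    "imaging_analysis": (7, 10),
    "treatment_planning": (9, 12),
}

seq_min = sum(lo for lo, _ in durations.values())   # sequential best case: 40 min
seq_max = sum(hi for _, hi in durations.values())   # sequential worst case: 58 min
par_min = max(lo for lo, _ in durations.values())   # parallel best case: 10 min
par_max = max(hi for _, hi in durations.values())   # parallel worst case: 15 min

reduction = 1 - par_max / seq_max  # ≈ 0.74, i.e. roughly a 75% reduction
```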
Core Implementation Pattern
We've found success using Python's concurrent.futures.ThreadPoolExecutor as the foundation for parallel agent execution.
Why ThreadPoolExecutor works well for AG2:
- Perfect for AG2 agent operations: AG2 agents spend time waiting for LLM API calls, tool operations, web requests, etc.
- Shared memory space: Agents can access the same configuration objects without complex inter-process communication
- Resource efficiency: Lower overhead than full process creation
- Exception handling: Clean error propagation and debugging
```python
import concurrent.futures

# Define independent diagnostic analysis tasks
diagnostic_tasks = [
    "symptom_pattern_analysis",
    "literature_review",
    "drug_interaction_check",
    "imaging_analysis",
    "treatment_planning",
]

# Execute diagnostic teams in parallel
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [
        executor.submit(run_diagnostic_team, team_id, analysis_type, patient_data)
        for team_id, analysis_type in enumerate(diagnostic_tasks)
    ]
    concurrent.futures.wait(futures)  # Wait for all teams to complete
```
Agent Factory Pattern
Each parallel execution should create completely fresh agent instances. This prevents state contamination and ensures true independence between teams.
```python
def run_diagnostic_team(team_id, analysis_type, patient_data):
    """Factory function that creates a complete, independent agent team."""
    # Create a fresh user proxy for this diagnostic team
    user_proxy = UserProxyAgent(
        name=f"coordinator_{analysis_type}_{team_id}",
        is_termination_msg=lambda msg: "TERMINATE" in msg["content"],
        human_input_mode="NEVER",
        max_consecutive_auto_reply=1,
        code_execution_config=False,
    )

    # Create a specialized medical analyst with a unique configuration
    medical_specialist = GPTAssistantAgent(
        name=f"specialist_{analysis_type}_{team_id}",
        instructions=medical_instructions[analysis_type],
        overwrite_instructions=True,  # Ensure clean state
        overwrite_tools=True,
        llm_config={
            "config_list": config_list,
            "tools": diagnostic_tools[analysis_type],
            "assistant_id": specialist_assistant_ids[analysis_type],
        },
    )

    # Create the complete diagnostic team and execute
    # (clinical_reviewer and supervisor are built the same way as the agents above)
    team = [user_proxy, medical_specialist, clinical_reviewer, supervisor]
    groupchat = ag2.GroupChat(agents=team, messages=[], max_round=15)
    chat_manager = ag2.GroupChatManager(
        groupchat=groupchat, llm_config={"config_list": config_list}
    )
    user_proxy.initiate_chat(
        chat_manager, message=f"Perform {analysis_type} for patient: {patient_data}"
    )
```
Key principles:
- Fresh instances: Every parallel execution gets brand new agent objects
- Unique naming: Prevents agent name conflicts across parallel executions
- Isolated configuration: Each team gets its own tools, instructions, and assistant IDs
- Clean state: `overwrite_instructions=True` ensures no state leakage
Common Pitfalls to Avoid
Race Conditions
The most dangerous trap is sharing mutable state between agents.
```python
# BAD: Shared mutable state
shared_patient_analysis = {}

def bad_diagnostic_process(analysis_type):
    shared_patient_analysis[analysis_type] = perform_analysis(...)  # Race condition!

# GOOD: Independent storage
def good_diagnostic_process(analysis_type, patient_id):
    result = perform_analysis(...)
    save_to_file(f"diagnosis_{patient_id}_{analysis_type}.json", result)
```
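An alternative to file-based storage is to have each worker return its result and let the coordinating thread collect them through the futures, so no worker ever writes to shared state. A minimal sketch, where `perform_analysis` is a hypothetical stand-in for the real diagnostic work:

```python
import concurrent.futures

def perform_analysis(analysis_type):
    # Hypothetical placeholder for the real diagnostic work.
    return f"result_for_{analysis_type}"

tasks = ["symptom_pattern_analysis", "literature_review", "drug_interaction_check"]

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(perform_analysis, t): t for t in tasks}
    # Only the coordinating thread writes to this dict, so no lock is needed.
    results = {
        futures[f]: f.result()
        for f in concurrent.futures.as_completed(futures)
    }
```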
Resource Exhaustion
Creating unlimited threads will overwhelm your system.
```python
# BAD: Unlimited workers
with ThreadPoolExecutor() as executor:
    ...  # Could create too many threads

# GOOD: Controlled resource usage
max_workers = min(len(tasks), 5)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    ...
```
Ignoring Failures
In parallel execution, individual tasks will fail. Handle these gracefully.
```python
# GOOD: Graceful error handling
for future in concurrent.futures.as_completed(futures):
    try:
        result = future.result(timeout=300)
        handle_success(result)
    except Exception as e:
        handle_error(e)
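Beyond logging failures, one pattern worth considering is resubmitting failed tasks exactly once before giving up. A sketch under assumed names (`run_task` here is a hypothetical task that fails transiently on its first attempt):

```python
import concurrent.futures

def run_task(name, attempt):
    # Hypothetical task: "flaky" fails on its first attempt only.
    if name == "flaky" and attempt == 0:
        raise RuntimeError("transient failure")
    return f"{name}: ok"

tasks = ["stable", "flaky"]
results, failed = {}, []

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(run_task, t, 0): t for t in tasks}
    for future in concurrent.futures.as_completed(futures):
        task = futures[future]
        try:
            results[task] = future.result(timeout=300)
        except Exception:
            failed.append(task)

    # Resubmit failures exactly once; record a permanent failure otherwise.
    retries = {executor.submit(run_task, t, 1): t for t in failed}
    for future in concurrent.futures.as_completed(retries):
        task = retries[future]
        try:
            results[task] = future.result(timeout=300)
        except Exception as e:
            results[task] = f"{task}: failed permanently ({e})"
```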
Production Implementation Tips
Error Handling
```python
def execute_parallel_tasks(doc_id, tasks, document_data):
    results = {}
    errors = {}

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_task = {
            executor.submit(process_document, doc_id, task, document_data): task
            for task in tasks
        }
        for future in concurrent.futures.as_completed(future_to_task):
            task = future_to_task[future]
            try:
                results[task] = future.result(timeout=300)
                logging.info(f"Task {task} completed successfully")
            except Exception as e:
                errors[task] = str(e)
                logging.error(f"Task {task} failed: {e}")

    return results, errors
```
Resource Management
The default ThreadPoolExecutor worker count is min(32, (os.process_cpu_count() or 1) + 4) as of Python 3.13; earlier versions use os.cpu_count() in the same formula.

```python
import os

def get_optimal_workers(task_count):
    # Mirror the ThreadPoolExecutor default; fall back to os.cpu_count()
    # on Python < 3.13, where os.process_cpu_count() does not exist.
    cpu_count = getattr(os, "process_cpu_count", os.cpu_count)()
    default_workers = min(32, (cpu_count or 1) + 4)
    return min(task_count, default_workers)
```
Best Practices Summary
- Design for Complete Independence: Use separate data sources and storage for each parallel task
- Intelligent Resource Management: Calculate optimal workers, implement appropriate timeouts
- Comprehensive Error Handling: Graceful degradation, detailed logging
- Performance Monitoring: Track total execution time, individual task duration, success/failure rates
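The monitoring point above can be sketched with a small wrapper that times each task and records success or failure, so per-task duration and overall success rate fall out of the reports. The task names and callables here are hypothetical placeholders:

```python
import concurrent.futures
import time

def run_monitored(task_fn, task_name):
    """Wrap a task so each parallel execution reports its own duration and status."""
    start = time.monotonic()
    try:
        result = task_fn()
        return {"task": task_name, "ok": True,
                "seconds": time.monotonic() - start, "result": result}
    except Exception as e:
        return {"task": task_name, "ok": False,
                "seconds": time.monotonic() - start, "error": str(e)}

# Hypothetical workload: two quick stand-in tasks.
tasks = {"analysis_a": lambda: "a-done", "analysis_b": lambda: "b-done"}

overall_start = time.monotonic()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(run_monitored, fn, name) for name, fn in tasks.items()]
    reports = [f.result() for f in futures]
total_seconds = time.monotonic() - overall_start

success_rate = sum(r["ok"] for r in reports) / len(reports)
```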
Expected outcomes when properly implemented:
- Significant reduction in processing time for multi-task workflows
- Improved user experience through concurrent execution
- Better resource utilization with intelligent task allocation
- Higher system reliability through graceful error handling
Parallel execution fundamentally changes how users interact with multi-agent systems. Start with these patterns, measure your results, and iterate based on your specific use case requirements.
