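Task Scheduling Agent: Decomposing and Executing Complex Tasks

An in-depth look at the core techniques behind a task scheduling agent, including task understanding, dependency analysis, scheduling strategies, and parallel execution optimization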

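Overview and Motivation

Task scheduling is a pervasive challenge in modern complex systems. Data pipelines, batch jobs, and microservice orchestration all depend on an efficient, reliable scheduler. Traditional approaches to task scheduling tend to run into the following problems:

  1. Complexity is hard to manage: as the number of tasks grows, dependencies become difficult to maintain
  2. Low resource utilization: without an intelligent scheduling strategy, resources are wasted
  3. Weak error recovery: failed tasks are hard to recover and retry quickly
  4. Limited scalability: it is hard to adapt to changing task demands and resource environments

A task scheduling agent adds AI capabilities to the scheduler: it interprets task requirements, analyzes dependencies, optimizes execution order, and handles exceptions, with the aim of making the scheduling system both smarter and more efficient.

A well-designed task scheduling agent can provide the following:

  • Intelligent task decomposition: automatically breaks complex tasks into executable sub-tasks
  • Dynamic dependency analysis: automatically identifies and manages dependencies between tasks
  • Informed scheduling decisions: chooses what to run next based on real-time state and constraints
  • Adaptive execution: adjusts its strategy as execution progresses
  • Self-healing: automatically handles exceptions and recovers failed tasks

This article covers the core techniques of a task scheduling agent: task understanding and decomposition, dependency analysis, scheduling strategy design, parallel execution optimization, and state tracking.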

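Core Concepts and Architecture Design

Task Understanding and Decomposition

Task understanding is the agent's foundational capability. The agent needs to handle:

  1. Task type identification: determine whether a task is compute-intensive, I/O-intensive, or mixed
  2. Resource requirement analysis: understand the CPU, memory, storage, and other resources a task needs
  3. Priority assessment: evaluate task priority from business requirements and constraints
  4. Time constraint handling: understand deadlines, execution windows, and other time constraints

Task decomposition breaks a complex task into sub-tasks that can run in parallel. Effective decomposition takes the following into account:

  • Independence: sub-tasks should be as independent as possible, minimizing dependencies
  • Balanced granularity: sub-tasks should be moderately sized, neither too coarse nor too fine
  • Resource fit: sub-task size should match the available resources
  • Verifiability: the completion state of each sub-task can be checked unambiguously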
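
As a minimal sketch of the granularity point, the helper below splits a workload into fixed-size index ranges. The function name `split_into_chunks` and the sizes used are illustrative assumptions, not part of the agent described here. Note that the last range is allowed to be shorter, so no items are dropped.

```python
from typing import List, Tuple


def split_into_chunks(total_items: int, chunk_size: int) -> List[Tuple[int, int]]:
    """Return half-open (start, end) index ranges that cover total_items."""
    if total_items < 0 or chunk_size <= 0:
        raise ValueError("total_items must be >= 0 and chunk_size must be > 0")
    return [
        (start, min(start + chunk_size, total_items))
        for start in range(0, total_items, chunk_size)
    ]


# 1050 items in chunks of 100: ten full chunks plus one partial chunk.
ranges = split_into_chunks(1050, 100)
print(len(ranges), ranges[-1])
```

Each range can then back one sub-task, and its completion is easy to verify because the range boundaries are explicit.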

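Dependency Analysis

Task dependencies are central to every scheduling decision. Common dependency types include:

  • Sequential dependency: task A must complete before task B starts
  • Conditional dependency: whether task B runs depends on the output of task A
  • Resource dependency: several tasks share the same resource and must be coordinated
  • Data dependency: data flows from one task to another

Dependency analysis has to address these key problems:

  • Dependency graph construction: automatically identify dependencies and build the task graph
  • Cycle detection: find and handle circular dependencies between tasks
  • Dependency optimization: trim dependencies to reduce unnecessary waiting
  • Dynamic dependency handling: deal with dependencies that only appear at runtime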
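
Graph construction, cycle detection, and ordering can be sketched with the standard library's `graphlib` module (Python 3.9+). This is only an illustration under assumed task names; the implementation later in this article builds its own graph instead.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Optional, Set


def execution_order(deps: Dict[str, Set[str]]) -> Optional[List[str]]:
    """Return a valid execution order, or None if the graph contains a cycle.

    deps maps each task name to the set of tasks it depends on.
    """
    try:
        return list(TopologicalSorter(deps).static_order())
    except CycleError:
        return None


# Hypothetical task names, for illustration only.
acyclic = {
    "load": set(),
    "clean": {"load"},
    "train": {"clean"},
    "report": {"train", "clean"},
}
cyclic = {"a": {"b"}, "b": {"a"}}

order = execution_order(acyclic)
print(order)
print(execution_order(cyclic))
```

Returning `None` for a cyclic graph keeps the sketch short; a real scheduler would more likely report which tasks form the cycle.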

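Scheduling Strategy Design

The scheduling strategy determines execution order and resource allocation. Common strategies include:

  1. First come, first served (FCFS): run tasks in arrival order
  2. Shortest job first (SJF): run the task with the shortest estimated duration first
  3. Priority scheduling: order tasks by priority
  4. Batch scheduling: group similar tasks and run them together for efficiency
  5. Adaptive scheduling: adjust the strategy dynamically based on real-time state

Choosing a strategy means weighing:

  • Task characteristics (compute-intensive, I/O-intensive, and so on)
  • Resource constraints (CPU, memory, storage, and so on)
  • Performance goals (throughput, latency, fairness, and so on)
  • Environmental conditions (network state, load, and so on)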
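
To make the ordering rules concrete, here is a small sketch of a priority queue that combines priority scheduling with a deadline tie-break, using `heapq`. The task names, the `push` and `drain` helpers, and the tie-break order are assumptions chosen for illustration.

```python
import heapq
import itertools
from datetime import datetime
from typing import List, Optional

_counter = itertools.count()  # tie-breaker that preserves arrival order


def push(queue: list, name: str, priority: int,
         deadline: Optional[datetime] = None) -> None:
    """Queue a task; lower priority numbers and earlier deadlines run first."""
    key = (priority, deadline or datetime.max, next(_counter))
    heapq.heappush(queue, (key, name))


def drain(queue: list) -> List[str]:
    """Pop every task in scheduling order."""
    return [heapq.heappop(queue)[1] for _ in range(len(queue))]


queue: list = []
push(queue, "nightly-report", priority=2)
push(queue, "billing-run", priority=1, deadline=datetime(2030, 1, 2))
push(queue, "fraud-check", priority=1, deadline=datetime(2030, 1, 1))
push(queue, "cleanup", priority=2)
print(drain(queue))
```

Tasks with equal priority and no deadline fall back to arrival order, which is the FCFS behavior described above.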

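Parallel Execution Optimization

Parallel execution is the main lever for improving scheduling throughput. The forms of parallelism include:

  • Task-level parallelism: run multiple independent tasks at the same time
  • Data-level parallelism: shard a large dataset and process the shards in parallel
  • Pipeline parallelism: split a task into stages that form a pipeline
  • Hybrid parallelism: combine several of these strategies

Parallel execution brings its own challenges:

  • Resource contention and conflicts
  • Data consistency and synchronization
  • Load balancing
  • Fault tolerance and recovery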
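
One common way to limit resource contention in task-level parallelism is to cap how many tasks run at once. The sketch below does this with `asyncio.Semaphore` and reports the peak concurrency it observed; the `run_bounded` helper and the sleep-based workload are stand-ins assumed for illustration.

```python
import asyncio
from typing import List


async def run_bounded(durations: List[float], max_concurrent: int) -> int:
    """Run one simulated task per duration; return the peak concurrency seen."""
    semaphore = asyncio.Semaphore(max_concurrent)
    running = 0
    peak = 0

    async def worker(duration: float) -> None:
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(duration)  # stand-in for real I/O work
            running -= 1

    await asyncio.gather(*(worker(d) for d in durations))
    return peak


# Ten tasks are submitted, but at most three hold the semaphore at a time.
peak = asyncio.run(run_bounded([0.01] * 10, max_concurrent=3))
print(peak)
```

The counters need no lock because all workers run on one event loop and the updates contain no `await` between read and write.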

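Architecture Design

The overall architecture of the task scheduling agent:

[Figure: overall architecture of the task scheduling agent]

Task Scheduling Flow

The end-to-end task scheduling flow:

[Figure: task scheduling flow]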

Key Implementation Details

1. Task Model Definition

from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, field
from enum import Enum
import asyncio
from datetime import datetime, timedelta
import json
import hashlib
import uuid

class TaskType(Enum):
    """任务类型"""
    COMPUTE_INTENSIVE = "compute_intensive"
    IO_INTENSIVE = "io_intensive"
    MIXED = "mixed"
    BATCH = "batch"
    REALTIME = "realtime"

class TaskStatus(Enum):
    """任务状态"""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
    RETRYING = "retrying"
    BLOCKED = "blocked"

class TaskPriority(Enum):
    """任务优先级"""
    CRITICAL = 0
    HIGH = 1
    MEDIUM = 2
    LOW = 3

@dataclass
class ResourceRequirement:
    """资源需求"""
    cpu_cores: float = 1.0
    memory_mb: int = 1024
    disk_mb: int = 512
    network_bandwidth: Optional[float] = None  # Mbps
    gpu_count: int = 0

@dataclass
class TaskDependency:
    """任务依赖"""
    task_id: str
    dependency_type: str = "sequential"  # sequential, conditional, resource
    condition: Optional[Callable[[Any], bool]] = None

@dataclass
class Task:
    """任务定义"""
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    name: str = ""
    description: str = ""
    task_type: TaskType = TaskType.MIXED
    priority: TaskPriority = TaskPriority.MEDIUM

    # 执行相关
    executor: Optional[Callable] = None
    executor_params: Dict[str, Any] = field(default_factory=dict)

    # 资源需求
    resource_requirements: ResourceRequirement = field(default_factory=ResourceRequirement)

    # 依赖关系
    dependencies: List[TaskDependency] = field(default_factory=list)

    # 时间约束
    deadline: Optional[datetime] = None
    estimated_duration: Optional[timedelta] = None
    execution_window: Optional[tuple] = None  # (start_time, end_time)

    # 执行状态
    status: TaskStatus = TaskStatus.PENDING
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    retry_count: int = 0
    max_retries: int = 3

    # 元数据
    metadata: Dict[str, Any] = field(default_factory=dict)
    parent_task_id: Optional[str] = None
    sub_task_ids: List[str] = field(default_factory=list)

    # 执行结果
    result: Optional[Any] = None
    error: Optional[str] = None

    def __hash__(self):
        """使Task可哈希,用于集合操作"""
        return hash(self.id)

    def __eq__(self, other):
        """任务相等性判断"""
        if not isinstance(other, Task):
            return False
        return self.id == other.id

    def is_ready(self, task_map: Dict[str, 'Task']) -> bool:
        """
        检查任务是否准备好执行

        Args:
            task_map: 所有任务的映射字典

        Returns:
            是否准备好执行
        """
        if self.status != TaskStatus.PENDING:
            return False

        # 检查所有依赖是否完成
        for dep in self.dependencies:
            dep_task = task_map.get(dep.task_id)
            if not dep_task:
                return False

            if dep_task.status != TaskStatus.COMPLETED:
                return False

            # 检查条件依赖
            if dep.dependency_type == "conditional" and dep.condition:
                if not dep.condition(dep_task.result):
                    return False

        return True

    def to_dict(self) -> Dict[str, Any]:
        """转换为字典格式"""
        return {
            'id': self.id,
            'name': self.name,
            'description': self.description,
            'task_type': self.task_type.value,
            'priority': self.priority.value,
            'resource_requirements': {
                'cpu_cores': self.resource_requirements.cpu_cores,
                'memory_mb': self.resource_requirements.memory_mb,
                'disk_mb': self.resource_requirements.disk_mb,
                'gpu_count': self.resource_requirements.gpu_count
            },
            'dependencies': [
                {
                    'task_id': dep.task_id,
                    'dependency_type': dep.dependency_type
                }
                for dep in self.dependencies
            ],
            'deadline': self.deadline.isoformat() if self.deadline else None,
            'estimated_duration': self.estimated_duration.total_seconds() if self.estimated_duration else None,
            'status': self.status.value,
            'retry_count': self.retry_count,
            'metadata': self.metadata
        }

2. Task Decomposer

from typing import List, Dict, Any, Optional
import re

class TaskDecomposer:
    """任务分解器"""

    def __init__(self):
        """初始化任务分解器"""
        # 任务分解规则
        self.decomposition_rules = {
            'data_processing': self._decompose_data_processing,
            'batch_computation': self._decompose_batch_computation,
            'pipeline': self._decompose_pipeline,
            'parallel_execution': self._decompose_parallel_execution
        }

    async def decompose(
        self,
        task: Task,
        strategy: str = "auto"
    ) -> List[Task]:
        """
        分解任务为子任务

        Args:
            task: 待分解的任务
            strategy: 分解策略

        Returns:
            分解后的子任务列表
        """
        if strategy == "auto":
            strategy = self._determine_decomposition_strategy(task)

        if strategy not in self.decomposition_rules:
            # Unknown strategy: return the task unchanged, without
            # recording it as its own parent and sub-task
            return [task]

        sub_tasks = await self.decomposition_rules[strategy](task)

        # Link the parent task and its sub-tasks
        task.sub_task_ids = [st.id for st in sub_tasks]
        for st in sub_tasks:
            st.parent_task_id = task.id

        return sub_tasks

    def _determine_decomposition_strategy(self, task: Task) -> str:
        """自动确定分解策略"""
        # 基于任务特征确定策略
        if task.task_type == TaskType.BATCH:
            return "batch_computation"
        elif "pipeline" in task.description.lower():
            return "pipeline"
        elif task.resource_requirements.cpu_cores > 1:
            return "parallel_execution"
        elif "data" in task.description.lower():
            return "data_processing"
        else:
            return "parallel_execution"

    async def _decompose_data_processing(
        self,
        task: Task
    ) -> List[Task]:
        """分解数据处理任务"""
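        # Derive the number of sub-tasks from the data size and chunk size
        data_size = task.metadata.get('data_size', 1000)
        chunk_size = task.metadata.get('chunk_size', 100)

        # Ceiling division, so a trailing partial chunk is not dropped
        num_chunks = max(1, -(-data_size // chunk_size))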

        sub_tasks = []
        for i in range(num_chunks):
            sub_task = Task(
                name=f"{task.name}_chunk_{i}",
                description=f"Process data chunk {i}",
                task_type=task.task_type,
                priority=task.priority,
                executor=task.executor,
                executor_params={
                    **task.executor_params,
                    'chunk_index': i,
                    'total_chunks': num_chunks
                },
                resource_requirements=ResourceRequirement(
                    cpu_cores=task.resource_requirements.cpu_cores / num_chunks,
                    memory_mb=task.resource_requirements.memory_mb // num_chunks,
                    disk_mb=task.resource_requirements.disk_mb // num_chunks
                ),
                metadata={
                    **task.metadata,
                    'chunk_index': i,
                    'total_chunks': num_chunks
                }
            )
            sub_tasks.append(sub_task)

        return sub_tasks

    async def _decompose_batch_computation(
        self,
        task: Task
    ) -> List[Task]:
        """分解批量计算任务"""
        batch_size = task.metadata.get('batch_size', 10)
        total_items = task.metadata.get('total_items', 100)

        # Ceiling division, so a trailing partial batch is not dropped
        num_batches = max(1, -(-total_items // batch_size))

        sub_tasks = []
        for i in range(num_batches):
            sub_task = Task(
                name=f"{task.name}_batch_{i}",
                description=f"Process batch {i}",
                task_type=task.task_type,
                priority=task.priority,
                executor=task.executor,
                executor_params={
                    **task.executor_params,
                    'batch_index': i,
                    'total_batches': num_batches
                },
                metadata={
                    **task.metadata,
                    'batch_index': i,
                    'total_batches': num_batches
                }
            )
            sub_tasks.append(sub_task)

        return sub_tasks

    async def _decompose_pipeline(self, task: Task) -> List[Task]:
        """分解流水线任务"""
        stages = task.metadata.get('pipeline_stages', [])
        sub_tasks = []

        for i, stage in enumerate(stages):
            sub_task = Task(
                name=f"{task.name}_stage_{i}",
                description=f"Execute pipeline stage {i}: {stage}",
                task_type=task.task_type,
                priority=task.priority,
                executor=task.executor,
                executor_params={
                    **task.executor_params,
                    'stage': stage,
                    'stage_index': i
                },
                metadata={
                    **task.metadata,
                    'stage': stage,
                    'stage_index': i
                }
            )

            # Add the pipeline dependency
            if i > 0:
                # Depend on the previous stage, using its actual generated id
                prev_task_id = sub_tasks[i - 1].id
                sub_task.dependencies.append(
                    TaskDependency(task_id=prev_task_id)
                )

            sub_tasks.append(sub_task)

        return sub_tasks

    async def _decompose_parallel_execution(
        self,
        task: Task
    ) -> List[Task]:
        """分解为可并行执行的任务"""
        # 简单地将任务分解为多个相同的并行任务
        parallelism = int(task.resource_requirements.cpu_cores)
        parallelism = max(1, parallelism)

        sub_tasks = []
        for i in range(parallelism):
            sub_task = Task(
                name=f"{task.name}_worker_{i}",
                description=f"Parallel worker {i}",
                task_type=task.task_type,
                priority=task.priority,
                executor=task.executor,
                executor_params={
                    **task.executor_params,
                    'worker_index': i,
                    'total_workers': parallelism
                },
                resource_requirements=ResourceRequirement(
                    cpu_cores=task.resource_requirements.cpu_cores / parallelism,
                    memory_mb=task.resource_requirements.memory_mb // parallelism
                ),
                metadata={
                    **task.metadata,
                    'worker_index': i,
                    'total_workers': parallelism
                }
            )
            sub_tasks.append(sub_task)

        return sub_tasks

3. Dependency Analyzer

from typing import Dict, List, Set, Optional
from collections import defaultdict, deque

class DependencyAnalyzer:
    """依赖关系分析器"""

    def __init__(self):
        """初始化依赖分析器"""
        self.dependency_graph = defaultdict(list)
        self.reverse_graph = defaultdict(list)
        self.task_map: Dict[str, Task] = {}

    def build_dependency_graph(self, tasks: List[Task]) -> None:
        """
        构建依赖图

        Args:
            tasks: 任务列表
        """
        # 清空现有图
        self.dependency_graph.clear()
        self.reverse_graph.clear()
        self.task_map.clear()

        # 构建任务映射
        for task in tasks:
            self.task_map[task.id] = task

        # 构建依赖图
        for task in tasks:
            for dep in task.dependencies:
                self.dependency_graph[dep.task_id].append(task.id)
                self.reverse_graph[task.id].append(dep.task_id)

    def detect_cycle(self) -> Optional[List[str]]:
        """
        检测循环依赖

        Returns:
            如果发现循环,返回循环中的任务ID列表,否则返回None
        """
        visited = set()
        rec_stack = set()

        def dfs(node: str, path: List[str]) -> Optional[List[str]]:
            if node in rec_stack:
                # 找到循环
                cycle_start = path.index(node)
                return path[cycle_start:] + [node]

            if node in visited:
                return None

            visited.add(node)
            rec_stack.add(node)

            for neighbor in self.dependency_graph[node]:
                result = dfs(neighbor, path + [node])
                if result:
                    return result

            rec_stack.remove(node)
            return None

        for node in self.task_map:
            if node not in visited:
                cycle = dfs(node, [])
                if cycle:
                    return cycle

        return None

    def get_execution_order(self) -> List[str]:
        """
        获取任务执行顺序(拓扑排序)

        Returns:
            按执行顺序排列的任务ID列表
        """
        # 计算入度
        in_degree = {task_id: 0 for task_id in self.task_map}
        for task_id in self.dependency_graph:
            for dependent_id in self.dependency_graph[task_id]:
                in_degree[dependent_id] += 1

        # 找到入度为0的节点
        queue = deque([task_id for task_id, degree in in_degree.items() if degree == 0])
        execution_order = []

        while queue:
            current = queue.popleft()
            execution_order.append(current)

            # 减少依赖该节点的节点的入度
            for dependent_id in self.dependency_graph[current]:
                in_degree[dependent_id] -= 1
                if in_degree[dependent_id] == 0:
                    queue.append(dependent_id)

        # 检查是否所有节点都被访问
        if len(execution_order) != len(self.task_map):
            raise ValueError("存在循环依赖,无法生成执行顺序")

        return execution_order

    def get_ready_tasks(self, task_status: Dict[str, TaskStatus]) -> List[Task]:
        """
        获取当前可执行的任务

        Args:
            task_status: 任务状态映射

        Returns:
            可执行的任务列表
        """
        ready_tasks = []

        for task_id, task in self.task_map.items():
            if task_status.get(task_id) == TaskStatus.PENDING:
                # Check that every dependency has completed
                all_deps_completed = True
                for dep in task.dependencies:
                    if task_status.get(dep.task_id) != TaskStatus.COMPLETED:
                        all_deps_completed = False
                        break

                if all_deps_completed:
                    ready_tasks.append(task)

        return ready_tasks

    def get_critical_path(self) -> List[str]:
        """
        获取关键路径

        Returns:
            关键路径上的任务ID列表
        """
        # 简单实现:返回最长路径
        # 实际应用中应该考虑任务执行时间

        def longest_path(node: str, visited: Set[str]) -> List[str]:
            if node in visited:
                return []

            visited.add(node)

            if not self.dependency_graph[node]:
                return [node]

            max_path = []
            for neighbor in self.dependency_graph[node]:
                path = longest_path(neighbor, visited.copy())
                if len(path) > len(max_path):
                    max_path = path

            return [node] + max_path

        max_path = []
        for node in self.task_map:
            path = longest_path(node, set())
            if len(path) > len(max_path):
                max_path = path

        return max_path

    def get_parallel_groups(self) -> List[List[str]]:
        """
        获取可并行执行的任务组

        Returns:
            并行任务组列表,每个组是一批可并行的任务
        """
        # 使用拓扑排序获取执行顺序
        execution_order = self.get_execution_order()

        # 分组:如果任务没有相互依赖,可以并行执行
        parallel_groups = []
        current_group = []
        processed = set()

        for task_id in execution_order:
            # 检查是否与当前组中的任务有依赖关系
            can_parallel = True
            for group_task_id in current_group:
                if (task_id in self.dependency_graph.get(group_task_id, []) or
                    group_task_id in self.dependency_graph.get(task_id, [])):
                    can_parallel = False
                    break

            if can_parallel:
                current_group.append(task_id)
            else:
                if current_group:
                    parallel_groups.append(current_group)
                current_group = [task_id]

        if current_group:
            parallel_groups.append(current_group)

        return parallel_groups

    def visualize_graph(self) -> str:
        """Render the dependency graph as plain text"""
        lines = ["Task dependency graph:"]
        lines.append("=" * 50)

        # Walk every task (not only those that have dependents) and list what
        # each one depends on, which is what reverse_graph stores
        for task_id, task in self.task_map.items():
            lines.append(f"\nTask: {task.name} ({task_id})")

            dependencies = self.reverse_graph.get(task_id, [])
            if dependencies:
                lines.append("  Depends on:")
                for dep_id in dependencies:
                    dep_name = self.task_map.get(dep_id, Task(id=dep_id)).name
                    lines.append(f"    - {dep_name} ({dep_id})")
            else:
                lines.append("  No dependencies")

        return "\n".join(lines)
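
The `get_critical_path` method above measures path length in number of tasks and notes that real use should account for execution time. A duration-weighted version could look like the following sketch. It is standalone rather than a method of `DependencyAnalyzer`, it uses `graphlib` (Python 3.9+), and it assumes a non-empty acyclic graph in which every task has a known duration; the task names are hypothetical.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set, Tuple


def critical_path(
    durations: Dict[str, float],
    deps: Dict[str, Set[str]],
) -> Tuple[float, List[str]]:
    """Return (total duration, task names) of the longest weighted path.

    deps maps each task to the tasks it depends on; the graph must be acyclic.
    """
    finish: Dict[str, float] = {}  # earliest finish time of each task
    prev: Dict[str, str] = {}      # predecessor on the longest path into a task
    graph = {task: deps.get(task, set()) for task in durations}

    for task in TopologicalSorter(graph).static_order():
        start = 0.0
        for dep in graph[task]:
            if finish[dep] > start:
                start = finish[dep]
                prev[task] = dep
        finish[task] = start + durations[task]

    # Walk back from the task that finishes last.
    end = max(finish, key=finish.get)
    path = [end]
    while path[-1] in prev:
        path.append(prev[path[-1]])
    return finish[end], path[::-1]


durations = {"extract": 2, "transform": 5, "validate": 1, "load": 3}
deps = {
    "transform": {"extract"},
    "validate": {"extract"},
    "load": {"transform", "validate"},
}
total, path = critical_path(durations, deps)
print(total, path)
```

Because each task is visited once in topological order, this runs in time linear in the number of tasks and dependencies, unlike the recursive search that copies the visited set at every step.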

4. Scheduling Engine

from typing import List, Dict, Any, Optional, Callable
import heapq
import asyncio
from datetime import datetime, timedelta

class SchedulingStrategy(Enum):
    """调度策略"""
    FCFS = "fcfs"  # 先来先服务
    SJF = "sjf"  # 最短作业优先
    PRIORITY = "priority"  # 优先级调度
    DEADLINE = "deadline"  # 截止时间优先
    ADAPTIVE = "adaptive"  # 自适应调度

class SchedulingEngine:
    """调度引擎"""

    def __init__(self):
        """初始化调度引擎"""
        self.dependency_analyzer = DependencyAnalyzer()
        self.task_queue = []
        self.resource_manager = None
        self.strategy = SchedulingStrategy.PRIORITY

    def set_resource_manager(self, resource_manager: 'ResourceManager'):
        """设置资源管理器"""
        self.resource_manager = resource_manager

    def set_scheduling_strategy(self, strategy: SchedulingStrategy):
        """设置调度策略"""
        self.strategy = strategy

    async def schedule_tasks(
        self,
        tasks: List[Task],
        task_status: Optional[Dict[str, TaskStatus]] = None
    ) -> List[Task]:
        """
        调度任务执行

        Args:
            tasks: 待调度的任务列表
            task_status: 当前任务状态(可选)

        Returns:
            调度后的任务列表(按执行顺序)
        """
        if task_status is None:
            task_status = {task.id: task.status for task in tasks}

        # 构建依赖图
        self.dependency_analyzer.build_dependency_graph(tasks)

        # 检测循环依赖
        cycle = self.dependency_analyzer.detect_cycle()
        if cycle:
            raise ValueError(f"检测到循环依赖: {' -> '.join(cycle)}")

        # 获取可执行任务
        ready_tasks = self.dependency_analyzer.get_ready_tasks(task_status)

        # 根据策略排序
        sorted_tasks = self._sort_tasks_by_strategy(ready_tasks)

        return sorted_tasks

    def _sort_tasks_by_strategy(
        self,
        tasks: List[Task]
    ) -> List[Task]:
        """根据策略对任务排序"""
        if self.strategy == SchedulingStrategy.FCFS:
            return self._fcfs_sort(tasks)
        elif self.strategy == SchedulingStrategy.SJF:
            return self._sjf_sort(tasks)
        elif self.strategy == SchedulingStrategy.PRIORITY:
            return self._priority_sort(tasks)
        elif self.strategy == SchedulingStrategy.DEADLINE:
            return self._deadline_sort(tasks)
        elif self.strategy == SchedulingStrategy.ADAPTIVE:
            return self._adaptive_sort(tasks)
        else:
            return tasks

    def _fcfs_sort(self, tasks: List[Task]) -> List[Task]:
        """先来先服务排序"""
        return sorted(tasks, key=lambda t: t.metadata.get('arrival_time', datetime.now()))

    def _sjf_sort(self, tasks: List[Task]) -> List[Task]:
        """最短作业优先排序"""
        # 按预估执行时间排序
        return sorted(
            tasks,
            key=lambda t: t.estimated_duration.total_seconds() if t.estimated_duration else float('inf')
        )

    def _priority_sort(self, tasks: List[Task]) -> List[Task]:
        """优先级排序"""
        return sorted(tasks, key=lambda t: t.priority.value)

    def _deadline_sort(self, tasks: List[Task]) -> List[Task]:
        """截止时间排序"""
        # 按截止时间排序,优先执行紧迫的任务
        return sorted(
            tasks,
            key=lambda t: t.deadline if t.deadline else datetime.max
        )

    def _adaptive_sort(self, tasks: List[Task]) -> List[Task]:
        """自适应排序"""
        # 综合考虑多个因素
        def adaptive_score(task: Task) -> float:
            score = 0.0

            # 优先级因子(权重0.4)
            score += (4 - task.priority.value) * 0.4

            # 截止时间因子(权重0.3)
            if task.deadline:
                time_left = (task.deadline - datetime.now()).total_seconds()
                if time_left > 0:
                    score += (1.0 / time_left) * 100000 * 0.3

            # 执行时间因子(权重0.2)
            if task.estimated_duration:
                score -= task.estimated_duration.total_seconds() * 0.2

            # 资源需求因子(权重0.1)
            if self.resource_manager:
                available_resources = self.resource_manager.get_available_resources()
                resource_score = min(
                    available_resources.cpu_cores / task.resource_requirements.cpu_cores,
                    available_resources.memory_mb / task.resource_requirements.memory_mb
                )
                score += resource_score * 0.1

            return score

        return sorted(tasks, key=adaptive_score, reverse=True)

    def get_scheduling_statistics(
        self,
        tasks: List[Task]
    ) -> Dict[str, Any]:
        """
        获取调度统计信息

        Args:
            tasks: 任务列表

        Returns:
            调度统计信息
        """
        total_tasks = len(tasks)

        # 按状态统计
        status_counts = {}
        for status in TaskStatus:
            status_counts[status.value] = sum(1 for t in tasks if t.status == status)

        # 按优先级统计
        priority_counts = {}
        for priority in TaskPriority:
            priority_counts[priority.value] = sum(1 for t in tasks if t.priority == priority)

        # 计算平均执行时间
        completed_tasks = [t for t in tasks if t.status == TaskStatus.COMPLETED and t.start_time and t.end_time]
        avg_execution_time = None
        if completed_tasks:
            avg_execution_time = sum(
                (t.end_time - t.start_time).total_seconds()
                for t in completed_tasks
            ) / len(completed_tasks)

        return {
            'total_tasks': total_tasks,
            'status_distribution': status_counts,
            'priority_distribution': priority_counts,
            'average_execution_time': avg_execution_time,
            'completed_count': len(completed_tasks),
            'completion_rate': len(completed_tasks) / total_tasks if total_tasks > 0 else 0.0
        }

5. Parallel Executor

from typing import List, Dict, Any, Optional
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time
import psutil

class ResourceManager:
    """资源管理器"""

    def __init__(self):
        """初始化资源管理器"""
        self.available_resources = self._get_system_resources()
        self.reserved_resources = {}

    def _get_system_resources(self) -> ResourceRequirement:
        """Detect system resources"""
        # psutil.cpu_count() can return None when the count is undetermined
        cpu_cores = psutil.cpu_count() or 1
        memory_mb = psutil.virtual_memory().total // (1024 * 1024)
        disk_mb = psutil.disk_usage('/').free // (1024 * 1024)

        return ResourceRequirement(
            cpu_cores=float(cpu_cores),
            memory_mb=memory_mb,
            disk_mb=disk_mb
        )

    def get_available_resources(self) -> ResourceRequirement:
        """获取可用资源"""
        available = ResourceRequirement(
            cpu_cores=self.available_resources.cpu_cores,
            memory_mb=self.available_resources.memory_mb,
            disk_mb=self.available_resources.disk_mb
        )

        # 减去已预留的资源
        for task_id, reserved in self.reserved_resources.items():
            available.cpu_cores -= reserved.cpu_cores
            available.memory_mb -= reserved.memory_mb
            available.disk_mb -= reserved.disk_mb

        return available

    def reserve_resources(
        self,
        task_id: str,
        requirements: ResourceRequirement
    ) -> bool:
        """预留资源"""
        available = self.get_available_resources()

        if (requirements.cpu_cores <= available.cpu_cores and
            requirements.memory_mb <= available.memory_mb and
            requirements.disk_mb <= available.disk_mb):

            self.reserved_resources[task_id] = requirements
            return True

        return False

    def release_resources(self, task_id: str) -> None:
        """释放资源"""
        if task_id in self.reserved_resources:
            del self.reserved_resources[task_id]

class ParallelExecutor:
    """并行执行器"""

    def __init__(self, max_workers: Optional[int] = None):
        """
        Initialize the parallel executor

        Args:
            max_workers: Maximum number of worker threads
        """
        # psutil.cpu_count() can return None, so fall back to 1
        self.max_workers = max_workers or psutil.cpu_count() or 1
        self.resource_manager = ResourceManager()
        self.running_tasks: Dict[str, asyncio.Task] = {}

        # Executor pools (a process pool needs at least one worker)
        self.thread_executor = ThreadPoolExecutor(max_workers=self.max_workers)
        self.process_executor = ProcessPoolExecutor(max_workers=max(1, self.max_workers // 2))

    async def execute_task(self, task: Task) -> Any:
        """
        Execute a single task, retrying with exponential backoff on failure

        Args:
            task: The task to execute

        Returns:
            The task's result
        """
        while True:
            # Reserve resources for this attempt
            if not self.resource_manager.reserve_resources(task.id, task.resource_requirements):
                raise RuntimeError(f"Insufficient resources to run task {task.id}")

            try:
                task.status = TaskStatus.RUNNING
                task.start_time = datetime.now()

                # Await coroutine executors directly, whatever the task type;
                # blocking callables go to the thread pool
                if asyncio.iscoroutinefunction(task.executor):
                    result = await self._execute_async(task)
                else:
                    result = await self._execute_sync(task)

                task.status = TaskStatus.COMPLETED
                task.end_time = datetime.now()
                task.result = result

                return result

            except Exception as e:
                task.status = TaskStatus.FAILED
                task.error = str(e)
                task.end_time = datetime.now()

                # Give up once the retry budget is exhausted
                if task.retry_count >= task.max_retries:
                    raise

                task.retry_count += 1
                task.status = TaskStatus.RETRYING

            finally:
                # Release resources before backing off, so a retry does not
                # compete with this task's own earlier reservation
                self.resource_manager.release_resources(task.id)

            # Wait before retrying (exponential backoff)
            await asyncio.sleep(2 ** task.retry_count)

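    async def _execute_sync(self, task: Task) -> Any:
        """Run a blocking executor in the thread pool"""
        loop = asyncio.get_running_loop()
        # run_in_executor forwards positional arguments only, so wrap the
        # keyword-argument call in a zero-argument callable
        return await loop.run_in_executor(
            self.thread_executor,
            lambda: task.executor(**task.executor_params)
        )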

    async def _execute_async(self, task: Task) -> Any:
        """异步执行任务"""
        if asyncio.iscoroutinefunction(task.executor):
            return await task.executor(**task.executor_params)
        else:
            return await self._execute_sync(task)

    async def execute_parallel_tasks(
        self,
        tasks: List[Task],
        max_concurrent: int = None
    ) -> Dict[str, Any]:
        """
        并行执行多个任务

        Args:
            tasks: 任务列表
            max_concurrent: 最大并发数

        Returns:
            执行结果字典
        """
        max_concurrent = max_concurrent or self.max_workers
        semaphore = asyncio.Semaphore(max_concurrent)

        async def execute_with_semaphore(task: Task):
            async with semaphore:
                return await self.execute_task(task)

        # 创建执行任务
        execution_tasks = [
            execute_with_semaphore(task)
            for task in tasks
        ]

        # 等待所有任务完成
        results = await asyncio.gather(*execution_tasks, return_exceptions=True)

        # 组装结果
        task_results = {}
        for task, result in zip(tasks, results):
            if isinstance(result, Exception):
                task_results[task.id] = {
                    'status': 'failed',
                    'error': str(result)
                }
            else:
                task_results[task.id] = {
                    'status': 'completed',
                    'result': result
                }

        return task_results

    async def execute_pipeline(
        self,
        pipeline_tasks: List[Task]
    ) -> Dict[str, Any]:
        """
        执行流水线任务

        Args:
            pipeline_tasks: 流水线任务列表(已按依赖关系排序)

        Returns:
            执行结果
        """
        results = {}

        for task in pipeline_tasks:
            # 执行任务
            try:
                result = await self.execute_task(task)
                results[task.id] = {
                    'status': 'completed',
                    'result': result
                }
            except Exception as e:
                results[task.id] = {
                    'status': 'failed',
                    'error': str(e)
                }
                break  # 流水线中某个任务失败则停止

        return results

    def get_execution_status(self) -> Dict[str, Any]:
        """获取执行状态"""
        return {
            'running_tasks': len(self.running_tasks),
            'available_resources': self.resource_manager.get_available_resources(),
            'reserved_resources': self.resource_manager.reserved_resources
        }

    async def shutdown(self) -> None:
        """关闭执行器"""
        # 取消所有运行中的任务
        for task_id, task in self.running_tasks.items():
            task.cancel()

        # 等待任务取消
        await asyncio.gather(*self.running_tasks.values(), return_exceptions=True)

        # 关闭执行器
        self.thread_executor.shutdown(wait=True)
        self.process_executor.shutdown(wait=True)
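
One detail worth isolating when sending blocking executors to a thread pool: `loop.run_in_executor` accepts extra positional arguments but not keyword arguments, so keyword parameters have to be bound first. The sketch below shows one way to do that with `functools.partial`; `blocking_job` and `run_blocking` are hypothetical names used only for this example.

```python
import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor


def blocking_job(name: str, repeat: int = 1) -> str:
    """Stand-in for a blocking, synchronous executor function."""
    return name * repeat


async def run_blocking(pool: ThreadPoolExecutor, func, **params):
    """Run a blocking callable in the pool, forwarding keyword arguments."""
    loop = asyncio.get_running_loop()
    # Bind the keyword arguments up front; run_in_executor cannot pass them.
    return await loop.run_in_executor(pool, functools.partial(func, **params))


async def demo() -> str:
    with ThreadPoolExecutor(max_workers=2) as pool:
        return await run_blocking(pool, blocking_job, name="ab", repeat=2)


result = asyncio.run(demo())
print(result)
```

A `lambda` that closes over the parameters works equally well; `functools.partial` is shown here because it keeps the bound function and arguments inspectable.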

6. Complete Scheduling Agent Example

import asyncio
import time
from datetime import timedelta
from typing import List, Dict, Any

# Example task executor functions
async def example_executor_1(**kwargs):
    """示例执行函数1"""
    print(f"执行任务: {kwargs.get('name', 'unknown')}")
    await asyncio.sleep(1)  # 模拟执行时间
    return {"result": f"任务完成: {kwargs.get('name', 'unknown')}"}

async def example_executor_2(**kwargs):
    """示例执行函数2"""
    print(f"执行任务: {kwargs.get('name', 'unknown')}")
    await asyncio.sleep(2)
    return {"result": f"任务完成: {kwargs.get('name', 'unknown')}"}

def example_executor_3(**kwargs):
    """示例执行函数3(同步)"""
    print(f"执行任务: {kwargs.get('name', 'unknown')}")
    time.sleep(1.5)
    return {"result": f"任务完成: {kwargs.get('name', 'unknown')}"}

async def main():
    """Entry point that demonstrates how to use the task scheduling agent."""
    print("=== Task Scheduling Agent Demo ===\n")

    # 1. Create the example tasks
    tasks = [
        Task(
            name="Data preprocessing",
            description="Preprocess the raw data",
            task_type=TaskType.IO_INTENSIVE,
            priority=TaskPriority.HIGH,
            executor=example_executor_1,
            executor_params={'name': 'Data preprocessing'},
            resource_requirements=ResourceRequirement(cpu_cores=1, memory_mb=512),
            estimated_duration=timedelta(seconds=1),
            metadata={'data_size': 1000}
        ),
        Task(
            name="Feature extraction",
            description="Extract features from the data",
            task_type=TaskType.COMPUTE_INTENSIVE,
            priority=TaskPriority.HIGH,
            executor=example_executor_2,
            executor_params={'name': 'Feature extraction'},
            resource_requirements=ResourceRequirement(cpu_cores=2, memory_mb=1024),
            estimated_duration=timedelta(seconds=2)
        ),
        Task(
            name="Model training",
            description="Train the machine learning model",
            task_type=TaskType.COMPUTE_INTENSIVE,
            priority=TaskPriority.CRITICAL,
            executor=example_executor_3,
            executor_params={'name': 'Model training'},
            resource_requirements=ResourceRequirement(cpu_cores=4, memory_mb=2048),
            estimated_duration=timedelta(seconds=3)
        ),
        Task(
            name="Result evaluation",
            description="Evaluate model performance",
            task_type=TaskType.COMPUTE_INTENSIVE,
            priority=TaskPriority.MEDIUM,
            executor=example_executor_1,
            executor_params={'name': 'Result evaluation'},
            resource_requirements=ResourceRequirement(cpu_cores=1, memory_mb=512),
            estimated_duration=timedelta(seconds=1)
        )
    ]

    # Wire up the dependency chain once the task IDs exist
    # Feature extraction depends on data preprocessing
    tasks[1].dependencies.append(TaskDependency(task_id=tasks[0].id))
    # Model training depends on feature extraction
    tasks[2].dependencies.append(TaskDependency(task_id=tasks[1].id))
    # Result evaluation depends on model training
    tasks[3].dependencies.append(TaskDependency(task_id=tasks[2].id))

    # 2. Create the scheduling engine
    scheduler = SchedulingEngine()
    scheduler.set_scheduling_strategy(SchedulingStrategy.ADAPTIVE)

    # 3. Create the parallel executor and share its resource manager
    executor = ParallelExecutor(max_workers=4)
    scheduler.set_resource_manager(executor.resource_manager)

    print("=== Task Info ===")
    for i, task in enumerate(tasks, 1):
        print(f"{i}. {task.name}")
        print(f"   Type: {task.task_type.value}, priority: {task.priority.value}")
        print(f"   Estimated duration: {task.estimated_duration}")
        if task.dependencies:
            # Map dependency IDs back to 1-based task numbers
            dep_ids = {d.task_id for d in task.dependencies}
            deps = [f"Task {j}" for j, t in enumerate(tasks, 1) if t.id in dep_ids]
            print(f"   Depends on: {', '.join(deps)}")
        print()

    print("=== Scheduling ===")
    start_time = time.time()

    # Schedule the tasks
    scheduled_tasks = await scheduler.schedule_tasks(tasks)

    print(f"Scheduled order: {[t.name for t in scheduled_tasks]}")
    print()

    # Execute the tasks
    print("=== Executing ===")
    results = await executor.execute_parallel_tasks(scheduled_tasks)

    end_time = time.time()
    print("\n=== Execution Finished ===")
    print(f"Total time: {end_time - start_time:.2f}s")

    # Print the results
    print("\n=== Results ===")
    for task_id, result in results.items():
        task = next(t for t in tasks if t.id == task_id)
        status = result['status']
        print(f"{task.name}: {status}")
        if status == 'completed':
            print(f"  Result: {result['result']}")
        else:
            # Not every non-completed status is guaranteed to carry an error
            print(f"  Error: {result.get('error', 'unknown')}")

    # Collect scheduling statistics
    stats = scheduler.get_scheduling_statistics(tasks)
    print("\n=== Scheduling Statistics ===")
    print(f"Total tasks: {stats['total_tasks']}")
    print(f"Completed tasks: {stats['completed_count']}")
    print(f"Completion rate: {stats['completion_rate']:.1%}")
    print(f"Average execution time: {stats['average_execution_time']:.2f}s")

    # Shut down the executor
    await executor.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

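Best Practices and Common Pitfalls

Best Practices

  1. Decompose tasks sensibly

    • Choose a decomposition granularity that fits the task's characteristics
    • Avoid over-decomposition, which inflates scheduling overhead
    • Keep subtasks independent of one another
  2. Schedule intelligently

    • Adjust the scheduling strategy dynamically based on system state
    • Weigh priority, deadlines, and resource requirements together
    • Support flexible switching between multiple scheduling strategies
  3. Manage resources effectively

    • Monitor resource usage in real time
    • Reserve and release resources promptly and accurately
    • Support resource overcommitment and elastic scaling
  4. Handle errors thoroughly

    • Implement a task retry mechanism
    • Support recovery after a task fails
    • Provide detailed error logs and monitoring
  5. Monitor and optimize performance

    • Monitor task execution time and resource usage
    • Analyze how effective the scheduling decisions are
    • Continuously refine the scheduling algorithm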
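
The retry mechanism listed under error handling could be sketched roughly as follows. This is only an illustration under stated assumptions: `run_with_retry` and its parameters (`max_attempts`, `base_delay`, `max_delay`) are hypothetical names that do not appear in the executor above, and exponential backoff with jitter is one common policy among several.

```python
import asyncio
import random
from typing import Any, Awaitable, Callable


async def run_with_retry(
    func: Callable[[], Awaitable[Any]],
    max_attempts: int = 3,
    base_delay: float = 0.1,
    max_delay: float = 5.0,
) -> Any:
    """Await func(), retrying failures with exponential backoff and jitter.

    Hypothetical helper for illustration; not part of the article's executor.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(1, max_attempts + 1):
        try:
            return await func()
        except Exception:
            if attempt == max_attempts:
                # Out of attempts: surface the last error to the caller.
                raise
            # The delay doubles on each attempt and is capped at max_delay.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            # Jitter spreads out retries of tasks that failed together.
            await asyncio.sleep(delay * random.uniform(0.5, 1.0))
```

A task's executor would be wrapped in a zero-argument callable, for example `await run_with_retry(lambda: example_executor_1(name="demo"))`. Retrying only makes sense for tasks that are safe to run more than once.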

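Common Pitfalls

  1. Ignoring resource limits

    • Problem: scheduling without regard to resource limits causes tasks to fail
    • Solution: implement complete resource management with reservation
  2. Mishandling dependencies

    • Problem: circular or incorrect dependencies lead to deadlock
    • Solution: detect and validate dependencies before scheduling
  3. Over-parallelization

    • Problem: too much parallelism causes resource contention and degrades performance
    • Solution: tune the degree of parallelism to the resources actually available
  4. Missing fault tolerance

    • Problem: tasks cannot be recovered after they fail
    • Solution: implement retries, checkpoints, and recovery
  5. A single scheduling strategy

    • Problem: a fixed scheduling strategy cannot adapt to change
    • Solution: implement an adaptive scheduling strategy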
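
For the dependency pitfall, one common way to validate a task graph before scheduling is Kahn's topological sort, which also reveals circular dependencies. The sketch below is illustrative only: `topological_order` is a hypothetical function, and it works on a plain dict of task IDs rather than on the `Task` and `TaskDependency` objects used earlier.

```python
from collections import deque
from typing import Dict, List


def topological_order(dependencies: Dict[str, List[str]]) -> List[str]:
    """Return task IDs in dependency order, or raise ValueError on a cycle.

    `dependencies` maps each task ID to the IDs it depends on.
    """
    # Collect every task, including ones that only appear as a dependency.
    tasks = set(dependencies)
    for deps in dependencies.values():
        tasks.update(deps)

    remaining = {t: 0 for t in tasks}    # number of unmet dependencies
    dependents = {t: [] for t in tasks}  # tasks waiting on each task
    for task, deps in dependencies.items():
        for dep in set(deps):
            remaining[task] += 1
            dependents[dep].append(task)

    # Start from tasks with no dependencies (sorted for a stable order).
    ready = deque(sorted(t for t in tasks if remaining[t] == 0))
    order = []
    while ready:
        current = ready.popleft()
        order.append(current)
        for nxt in dependents[current]:
            remaining[nxt] -= 1
            if remaining[nxt] == 0:
                ready.append(nxt)

    # Tasks never released are on a cycle or downstream of one.
    if len(order) != len(tasks):
        stuck = sorted(t for t in tasks if remaining[t] > 0)
        raise ValueError(f"Circular dependency blocks: {stuck}")
    return order
```

Running this check when tasks are submitted means a bad graph is rejected up front instead of leaving tasks waiting forever at run time.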

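Performance Considerations

Optimization Strategies

  1. Pre-scheduling

    • Analyze task dependencies and resource requirements ahead of time
    • Pre-allocate resources and execution slots
    • Reduce the time spent on scheduling decisions
  2. Smart caching

    • Cache the results of repeated tasks
    • Cache dependency analysis and scheduling decisions
    • Avoid redundant computation
  3. Batching

    • Process similar small tasks in batches
    • Reduce scheduling and resource allocation overhead
    • Raise overall throughput
  4. Dynamic adjustment

    • Adjust the degree of parallelism to the system load
    • Optimize the order in which tasks are scheduled
    • Adapt resource allocation as conditions change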
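
Controlling the degree of parallelism is the piece of these strategies that is easiest to show in code. The following is a minimal sketch using `asyncio.Semaphore`; `run_bounded` is a hypothetical helper, and the cap here is fixed for one call. A dynamic version would choose `max_concurrency` from the current load before each batch, which is not shown.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def run_bounded(
    jobs: List[Callable[[], Awaitable[Any]]],
    max_concurrency: int,
) -> List[Any]:
    """Run all jobs with at most max_concurrency of them active at once.

    Results are returned in the same order as `jobs`. A job that raises
    contributes its exception object instead of a result.
    """
    if max_concurrency < 1:
        raise ValueError("max_concurrency must be at least 1")

    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(job: Callable[[], Awaitable[Any]]) -> Any:
        # Each job waits here until one of the slots is free.
        async with semaphore:
            return await job()

    return await asyncio.gather(
        *(guarded(job) for job in jobs),
        return_exceptions=True,
    )
```

Because all jobs are created up front and only their execution is gated, this suits batches of moderate size; for very large batches a worker-pool pattern over a queue keeps memory use lower.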

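Monitoring Metrics

Key performance indicators:

  • Task throughput: number of tasks completed per unit of time
  • Task latency: time from task submission to completion
  • Resource utilization: utilization of CPU, memory, disk, and other resources
  • Scheduling efficiency: time spent on scheduling decisions and efficiency of resource allocation
  • Task success rate: fraction of tasks that complete successfully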
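
Three of these indicators (throughput, latency, success rate) can be derived from per-task timestamps. The sketch below assumes a hypothetical `TaskRecord` with submission and completion times in seconds; it is not the statistics method of the scheduler shown earlier, and it counts only successful tasks toward throughput.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class TaskRecord:
    """Hypothetical per-task record; times are in seconds."""
    submitted_at: float
    finished_at: float
    succeeded: bool


def summarize(records: List[TaskRecord]) -> Dict[str, float]:
    """Compute throughput, mean latency, and success rate for a set of tasks."""
    if not records:
        return {"throughput_per_s": 0.0, "mean_latency_s": 0.0, "success_rate": 0.0}

    # Observation window: first submission to last completion.
    window = max(r.finished_at for r in records) - min(r.submitted_at for r in records)
    latencies = [r.finished_at - r.submitted_at for r in records]
    succeeded = sum(1 for r in records if r.succeeded)

    return {
        "throughput_per_s": succeeded / window if window > 0 else 0.0,
        "mean_latency_s": sum(latencies) / len(latencies),
        "success_rate": succeeded / len(records),
    }
```

Mean latency hides outliers, so a production setup would usually track percentiles as well.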

References

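Related Tools

  • Airflow: workflow management and scheduling platform
  • Prefect: modern workflow orchestration tool
  • Dagster: data pipeline orchestration platform
  • Celery: distributed task queue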

Research Papers

  • "A Survey of Task Scheduling in Cloud Computing" - IEEE Access 2021
  • "Machine Learning for Job Scheduling: A Survey" - ACM Computing Surveys 2020
  • "Parallel Task Scheduling on Multi-Core Processors" - Journal of Parallel and Distributed Computing 2022

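With the explanations and code examples in this article, you should be able to understand and implement a capable task scheduling agent. Task scheduling is a key component of complex systems, and careful design and implementation can noticeably improve both performance and reliability. We hope these techniques and practices help you build smarter, more efficient task scheduling systems.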