
Java实现代码javapublic class BackgroundTasksSystem { // --- 配置 --- private static final Path WORKDIR Paths.get(System.getProperty(user.dir)); private static final Gson gson new GsonBuilder().setPrettyPrinting().create(); // --- 后台任务管理器 --- static class BackgroundManager { // 任务存储 private final MapString, TaskInfo tasks new ConcurrentHashMap(); // 通知队列 private final QueueTaskNotification notificationQueue new ConcurrentLinkedQueue(); // 任务 ID 生成器 private final AtomicInteger taskIdCounter new AtomicInteger(1); // 锁 private final Object lock new Object(); static class TaskInfo { String taskId; String status; // running, completed, timeout, error String result; String command; long startTime; Thread thread; // 关联的执行线程 } static class TaskNotification { String taskId; String status; String command; String result; } /** * 启动后台任务 * 立即返回任务 ID不等待命令完成 */ public String run(String command) { String taskId task_ taskIdCounter.getAndIncrement(); TaskInfo task new TaskInfo(taskId, command); tasks.put(taskId, task); // 创建并启动后台线程 Thread thread new Thread(() - executeTask(task), BackgroundTask- taskId); thread.setDaemon(true); task.thread thread; thread.start(); // 立即返回不阻塞 return String.format(Background task %s started: %s, taskId, command.substring(0, Math.min(command.length(), 80))); } /** * 线程目标执行子进程捕获输出推送结果到队列 */ private void executeTask(TaskInfo task) { String output; String status; try { ProcessBuilder pb new ProcessBuilder(bash, -c, task.command); pb.directory(WORKDIR.toFile()); pb.redirectErrorStream(true); Process process pb.start(); boolean finished process.waitFor(300, TimeUnit.SECONDS); // 5分钟超时 if (!finished) { process.destroy(); output Error: Timeout (300s); status timeout; } else { output new String(process.getInputStream().readAllBytes()).trim(); status completed; } } catch (Exception e) { output Error: e.getMessage(); status error; } // 更新任务状态 task.status status; task.result output.isEmpty() ? (no output) : output.substring(0, Math.min(output.length(), 50000)); // 添加通知到队列 synchronized (lock) { notificationQueue.offer(new TaskNotification( task.taskId, status, task.command.substring(0, Math.min(task.command.length(), 80)), task.result.substring(0, Math.min(task.result.length(), 500)) )); } } /** * 检查任务状态 * 如果指定 taskId检查单个任务否则列出所有任务 */ public String check(String taskId) { if (taskId ! null !taskId.isEmpty()) { TaskInfo task tasks.get(taskId); if (task null) { return Error: Unknown task taskId; } return String.format([%s] %s\n%s, task.status, task.command.substring(0, Math.min(task.command.length(), 60)), task.result ! null ? task.result : (running)); } else { StringBuilder sb new StringBuilder(); for (Map.EntryString, TaskInfo entry : tasks.entrySet()) { TaskInfo task entry.getValue(); sb.append(String.format(%s: [%s] %s\n, task.taskId, task.status, task.command.substring(0, Math.min(task.command.length(), 60)))); } return sb.length() 0 ? sb.toString().trim() : No background tasks.; } } /** * 清空通知队列并返回所有待处理的通知 */ public ListTaskNotification drainNotifications() { synchronized (lock) { ListTaskNotification notifications new ArrayList(); while (!notificationQueue.isEmpty()) { notifications.add(notificationQueue.poll()); } return notifications; } } /** * 获取所有任务 */ public MapString, TaskInfo getAllTasks() { return new HashMap(tasks); } } // 初始化后台管理器 private static final BackgroundManager BG_MANAGER new BackgroundManager(); // --- 工具枚举 --- public enum ToolType { BASH(bash, Run a shell command (blocking).), READ_FILE(read_file, Read file contents.), WRITE_FILE(write_file, Write content to file.), EDIT_FILE(edit_file, Replace exact text in file.), BACKGROUND_RUN(background_run, Run command in background thread. Returns task_id immediately.), // 新增 CHECK_BACKGROUND(check_background, Check background task status. Omit task_id to list all.); // 新增 public final String name; public final String description; ToolType(String name, String description) { this.name name; this.description description; } } // --- 工具处理器映射 --- private static final MapString, ToolExecutor TOOL_HANDLERS new HashMap(); static { // ... 省略基础工具注册 // 后台任务工具 TOOL_HANDLERS.put(ToolType.BACKGROUND_RUN.name, args - { String command (String) args.get(command); return BG_MANAGER.run(command); }); TOOL_HANDLERS.put(ToolType.CHECK_BACKGROUND.name, args - { String taskId (String) args.get(task_id); return BG_MANAGER.check(taskId); }); } // ... 省略相同的工具实现 // --- Agent 主循环集成后台任务通知--- public static void agentLoop(ListMapString, Object messages) { while (true) { try { // 在 LLM 调用前检查后台通知 ListBackgroundManager.TaskNotification notifications BG_MANAGER.drainNotifications(); if (!notifications.isEmpty() !messages.isEmpty()) { StringBuilder notifText new StringBuilder(); notifText.append(background-results\n); for (BackgroundManager.TaskNotification notif : notifications) { notifText.append(String.format([bg:%s] %s: %s\n, notif.taskId, notif.status, notif.result)); } notifText.append(/background-results); messages.add(Map.of( role, user, content, notifText.toString() )); messages.add(Map.of( role, assistant, content, Noted background results. )); // 异步结果注入将后台任务结果插入到对话中 // 结构化格式用XML标签包裹便于LLM解析 } // 显示当前活动任务 MapString, BackgroundManager.TaskInfo activeTasks BG_MANAGER.getAllTasks(); int runningTasks (int) activeTasks.values().stream() .filter(t - running.equals(t.status)) .count(); if (runningTasks 0) { System.out.printf([Active background tasks: %d]\n, runningTasks); } // ... 省略相同的 LLM 调用和工具执行逻辑 } catch (Exception e) { System.err.println(Error in agent loop: e.getMessage()); e.printStackTrace(); return; } } } }这段代码引入了后台任务系统解决了 Agent 在执行长时间任务时的阻塞问题关键洞察Agent 可以在命令执行时继续工作而不是被阻塞。异步任务处理架构核心思想从同步阻塞的任务执行升级为异步非阻塞的并发处理让Agent能够同时处理多个耗时任务实现并行计算能力大幅提升效率和响应性。java// 后台任务管理器 - 异步执行引擎 static class BackgroundManager { // 任务存储 private final MapString, TaskInfo tasks new ConcurrentHashMap(); // 通知队列 private final QueueTaskNotification notificationQueue new ConcurrentLinkedQueue(); // 任务 ID 生成器 private final AtomicInteger taskIdCounter new AtomicInteger(1); // 并发安全使用线程安全集合 // 异步通信通过队列传递任务结果 // 唯一标识自动生成任务ID }解耦执行任务提交和执行分离立即返回控制权并发管理多个后台任务可以同时运行结果异步收集通过队列机制收集完成的任务结果线程安全使用并发集合确保多线程安全任务信息结构设计java// 任务信息实体 static class TaskInfo { String taskId; // 唯一标识 String status; // 状态running, completed, timeout, error String result; // 执行结果 String command; // 执行的命令 long startTime; // 开始时间 Thread thread; // 关联的执行线程 // 完整状态跟踪从启动到完成的全生命周期 // 线程关联可以控制或监控执行线程 // 时间戳支持超时和性能分析 } // 任务通知实体 static class TaskNotification { String taskId; String status; String command; String result;