Source code for oscopilot.agents.friday_agent

from oscopilot.agents.base_agent import BaseAgent
from oscopilot.utils import check_os_version
import json
import logging
import sys
from oscopilot.prompts.friday_pt import prompt
from oscopilot.utils import TaskStatusCode, InnerMonologue, ExecutionState, JudgementResult, RepairingResult


[docs] class FridayAgent(BaseAgent): """ A FridayAgent orchestrates the execution of tasks by integrating planning, retrieving, and executing strategies. This agent is designed to process tasks, manage errors, and refine strategies as necessary to ensure successful task completion. It supports dynamic task planning, information retrieval, execution strategy application, and employs a mechanism for self-refinement in case of execution failures. """ def __init__(self, planner, retriever, executor, Tool_Manager, config): """ Initializes the FridayAgent with specified planning, retrieving, and executing strategies, alongside configuration settings. Args: planner (callable): A strategy for planning the execution of tasks. retriever (callable): A strategy for retrieving necessary information or tools related to the tasks. executor (callable): A strategy for executing planned tasks. Tool_Manager (callable): A tool manager for handling tool-related operations. config (object): Configuration settings for the agent. Raises: ValueError: If the OS version check fails. """ super().__init__() self.config = config tool_manager = Tool_Manager(config.generated_tool_repo_path) self.planner = planner(prompt['planning_prompt']) self.retriever = retriever(prompt['retrieve_prompt'], tool_manager) self.executor = executor(prompt['execute_prompt'], tool_manager, config.max_repair_iterations) self.score = self.config.score self.task_status = TaskStatusCode.START self.inner_monologue = InnerMonologue() try: check_os_version(self.system_version) except ValueError as e: print(e)
[docs] def run(self, task): """ Executes the given task by planning, executing, and refining as needed until the task is completed or fails. Args: query (object): The high-level task to be executed. No explicit return value, but the method controls the flow of task execution and may exit the process in case of irreparable failures. """ self.planner.reset_plan() self.reset_inner_monologue() sub_tasks_list = self.planning(task) print("The task list obtained after planning is: {}".format(sub_tasks_list)) while self.planner.sub_task_list: try: sub_task = self.planner.sub_task_list.pop(0) execution_state = self.executing(sub_task, task) isTaskCompleted, isReplan = self.self_refining(sub_task, execution_state) if isReplan: continue if isTaskCompleted: print("The execution of the current sub task has been successfully completed.") else: print("{} not completed in repair round {}".format(sub_task, self.config.max_repair_iterations)) break except Exception as e: print("Current task execution failed. Error: {}".format(str(e))) break
[docs] def self_refining(self, tool_name, execution_state: ExecutionState): """ Analyzes and potentially refines the execution of a tool based on its current execution state. This can involve replanning or repairing the execution strategy based on the analysis of execution errors and outcomes. Args: tool_name (str): The name of the tool being executed. execution_state (ExecutionState): The current state of the tool's execution, encapsulating all relevant execution information including errors, results, and codes. Returns: tuple: - isTaskCompleted (bool): Indicates whether the task associated with the tool has been successfully completed. - isReplan (bool): Indicates whether a replan is required due to execution state analysis. The method decides on the next steps by analyzing the type of error (if any) and the execution results, aiming to either complete the task successfully or identify the need for further action, such as replanning. """ isTaskCompleted = False isReplan = False score = 0 state, node_type, description, code, result, relevant_code = execution_state.get_all_state() if node_type in ['Python', 'Shell', 'AppleScript']: judgement = self.judging(tool_name, state, code, description) score = judgement.score # need_repair, critique, score, reasoning, error_type if judgement.status == 'Replan': # raise NotImplementedError print("The current task requires replanning...") new_sub_task_list = self.replanning(tool_name, judgement.critique) print("The new task list obtained after planning is: {}".format(new_sub_task_list)) isReplan = True elif judgement.status == 'Amend': repairing_result = self.repairing(tool_name, code, description, state, judgement.critique, judgement.status) if repairing_result.status == 'Complete': isTaskCompleted = True elif repairing_result.status == 'Replan': print("The current task requires replanning...") new_sub_task_list = self.replanning(tool_name, repairing_result.critique) print("The new task list obtained after planning is: {}".format(new_sub_task_list)) isReplan = True else: isTaskCompleted = False score = repairing_result.score result = repairing_result.result else: isTaskCompleted = True if node_type == 'Python' and isTaskCompleted and score >= self.score: self.executor.store_tool(tool_name, code) print("{} has been stored in the tool repository.".format(tool_name)) else: isTaskCompleted = True if isTaskCompleted: self.inner_monologue.result = result self.planner.update_tool(tool_name, result, relevant_code, True, node_type) return isTaskCompleted, isReplan
[docs] def planning(self, task): """ Decomposes a given high-level task into a list of sub-tasks by retrieving relevant tool names and descriptions, facilitating structured execution planning. Args: task (object): The high-level task to be planned and executed. Returns: list: A list of sub-tasks generated by decomposing the high-level task, intended for sequential execution to achieve the task's goal. This method leverages the retriever component to fetch information relevant to the task, which is then used by the planner component to decompose the task into manageable sub-tasks. """ retrieve_tool_name = self.retriever.retrieve_tool_name(task) retrieve_tool_description_pair = self.retriever.retrieve_tool_description_pair(retrieve_tool_name) # decompose task # Set up the generation format error handling mechanism try: self.planner.decompose_task(task, retrieve_tool_description_pair) except Exception as e: print("api call failed:", str(e)) return return self.planner.sub_task_list
[docs] def executing(self, tool_name, original_task): """ Executes a given sub-task as part of the task execution process, handling different types of tasks including code execution, API calls, and question-answering. Args: tool_name (str): The name of the tool associated with the sub-task. original_task (object): The original high-level task that has been decomposed into sub-tasks. Returns: ExecutionState: The state of execution for the sub-task, including the result, any errors encountered, and additional execution-related information. The method dynamically adapts the execution strategy based on the type of sub-task, utilizing the executor component for code execution, API interaction, or question-answering as appropriate. """ tool_node = self.planner.tool_node[tool_name] description = tool_node.description logging.info("The current subtask is: {subtask}".format(subtask=description)) code = '' state = None # The return value of the current task result = '' relevant_code = {} node_type = tool_node.node_type pre_tasks_info = self.planner.get_pre_tasks_info(tool_name) if node_type == 'Python': # retrieve existing tool retrieve_name = self.retriever.retrieve_tool_name(description, 3) relevant_code = self.retriever.retrieve_tool_code_pair(retrieve_name) # task execute step if node_type == 'QA': if self.planner.tool_num == 1: result = self.executor.question_and_answer_tool(pre_tasks_info, original_task, original_task) else: result = self.executor.question_and_answer_tool(pre_tasks_info, original_task, description) print(result) logging.info(result) else: invoke = '' # Set up the generation format error handling mechanism try: if node_type == 'API': api_path = self.executor.extract_API_Path(description) code = self.executor.api_tool(description, api_path, pre_tasks_info) else: code, invoke = self.executor.generate_tool(tool_name, description, node_type, pre_tasks_info, relevant_code) except Exception as e: print("api call failed:", str(e)) return # Execute python tool class code state = self.executor.execute_tool(code, invoke, node_type) result = state.result logging.info(state) output = { "result": state.result, "error": state.error } logging.info(f"The subtask result is: {json.dumps(output)}") return ExecutionState(state, node_type, description, code, result, relevant_code)
[docs] def judging(self, tool_name, state, code, description): """ Evaluates the execution of a tool based on its execution state and the provided code and description, determining whether the tool's execution was successful or requires amendment. Args: tool_name (str): The name of the tool being judged. state (ExecutionState): The current execution state of the tool, including results and error information. code (str): The source code associated with the tool's execution. description (str): A description of the tool's intended functionality. Returns: JudgementResult: An object encapsulating the judgement on the tool's execution, including whether it needs repair, a critique of the execution, and an optional error type and reasoning for the judgement. This method assesses the correctness of the executed code and its alignment with the expected outcomes, guiding potential repair or amendment actions. """ # Check whether the code runs correctly, if not, amend the code tool_node = self.planner.tool_node[tool_name] next_action = tool_node.next_action critique = '' score = 0 # Set up the generation format error handling mechanism try: critique, status, score = self.executor.judge_tool(code, description, state, next_action) except Exception as e: print("api call failed:", str(e)) return return JudgementResult(status, critique, score)
[docs] def replanning(self, tool_name, reasoning): """ Initiates the replanning process for a task based on new insights or failures encountered during execution, aiming to adjust the plan to better achieve the task goals. Args: tool_name (str): The name of the tool related to the task that requires replanning. reasoning (str): The rationale behind the need for replanning, often based on execution failures or updated task requirements. Returns: list: An updated list of sub-tasks after the replanning process, intended for sequential execution to complete the task. This method identifies alternative or additional tools and their descriptions based on the provided reasoning, updating the task plan accordingly. """ relevant_tool_name = self.retriever.retrieve_tool_name(reasoning) relevant_tool_description_pair = self.retriever.retrieve_tool_description_pair(relevant_tool_name) # Set up the generation format error handling mechanism try: self.planner.replan_task(reasoning, tool_name, relevant_tool_description_pair) except Exception as e: print("api call failed:", str(e)) return return self.planner.sub_task_list
[docs] def repairing(self, tool_name, code, description, state, critique, status): """ Attempts to repair the execution of a tool by amending its code based on the critique received and the current execution state, iterating until the code executes successfully or reaches the maximum iteration limit. Args: tool_name (str): The name of the tool being repaired. code (str): The current code of the tool that requires repairs. description (str): A description of the tool's intended functionality. state (ExecutionState): The current execution state of the tool, including results and error information. critique (str): Feedback on the tool's last execution attempt, identifying issues to be addressed. status (str): Three status types: 'Amend', 'Complete', and 'Replan'. Returns: RepairingResult: An object encapsulating the result of the repair attempt, including whether the task has been completed successfully, the amended code, critique, execution score, and the execution result. The method iterates, amending the tool's code based on feedback until the code executes correctly or the maximum number of iterations is reached. It leverages the executor component for amending the code and re-evaluating its execution. """ tool_node = self.planner.tool_node[tool_name] next_action = tool_node.next_action pre_tasks_info = self.planner.get_pre_tasks_info(tool_name) trial_times = 0 score = 0 while (trial_times < self.executor.max_iter and status == 'Amend'): trial_times += 1 print("current amend times: {}".format(trial_times)) # Set up the generation format error handling mechanism try: new_code, invoke = self.executor.repair_tool(code, description, tool_node.node_type, state, critique, pre_tasks_info) except Exception as e: print("api call failed:", str(e)) return critique = '' code = new_code # Run the current code and check for errors state = self.executor.execute_tool(code, invoke, tool_node.node_type) result = state.result logging.info(state) if state.error == None: # Set up the generation format error handling mechanism try: critique, status, score = self.executor.judge_tool(code, description, state, next_action) except Exception as e: print("api call failed:", str(e)) return # The task execution is completed and the loop exits if status == 'Complete': break elif status == 'Amend': pass elif status == 'Replan': break else: raise NotImplementedError else: # The code still needs to be corrected status = 'Amend' return RepairingResult(status, code, critique, score, result)
[docs] def reset_inner_monologue(self): self.inner_monologue = InnerMonologue()