# Concept - `Assistant`는 대화형 인공지능 어시스턴트를 구축하고 관리하는 데 사용되는 프로그래밍 인터페이스이다. - `Assistant`는 `openAI` 플랫폼 안에서 제공되는 `API`로 [[LCEL(LangChain Expression Language)]]으로 민들어진 `GPT`보다 더 범용적으로 사용할 수 있다. - `Assistant`는 `Tool`이라는 도구가 있으며, `Assistant`는 `LLM`에게 해당 `Tool`을 사용하여 `User`에게 답변을 주도록 돕는 기능을 수행한다. - `Assistant`는 기본적으로 무료이나, `Code Interpreter`나 `Retrieval`을 사용하려면 사용 `Model`에 따라 돈을 지불해야 한다. - 아직 **Beta** Version이며 [assistant overview](https://platform.openai.com/docs/assistants/overview)에 자세한 내용이 나와있다. # Components - `Assistant`는 크게 4개의 `Components`로 구분되어 있다. - `Assistant`, `Thread`, `Message`, `Run` 모두 고유의 번호(**id**)가 존재한다. - `Id`을 통해 긱 요소가 어떠한 다른 요소와 연결되어 있는지를 설정할 수 있다. ### Assistant - `Assistant`의 옵션을 설정할 수 있다. ([OpenAI API](https://platform.openai.com/playground/assistants)에서도 만들 수 있고, `python`, `node.js`로도 만들 수 있다.) - `Assistant`의 Attributes는 다음과 같다. - Name : `Assistant`의 이름이다. - Instructions : 어떠한 `Assistant`인지 설명하는 내용이다. - Model : `Assistant`에 사용할 Model(`GPT3, GPT4`) - Tool(retriever, code interpreter, function) : `LLM`에게 전달할 기능들을 선언/설정한다. - Response format : [[Output Parser]] 같은 기능으로 Reponse가 어떠한 형식으로 나올지를 결정한다. - Temperature : `Model`의 창의성을 설정한다. - Top p : 단어들의 확률을 누적하여 **특정 임계값 P**에 도달할 때까지의 단어들만을 고려합니다. ### Thread - `User`와 `Assistant`가 주고받는 `Messages`의 모음이다.(사용자 각자의 Thread을 가지게 된다.) - `Thraed`가 `Messages`을 다 가지고 있어 `Memory` 기능을 수행한다.(오래된 메시지는 지우거나 압축시켜서 용량이 너무 커지지 않게 관리한다.) - [Threads Info](https://platform.openai.com/threads) - `Thread` 내용을 볼 수 있다. (Settings - Data Controls - Threads을 볼 수 있게 설정 해야 보인다.) ### Message - `User` / `Assistant`의 대화 내용들이다. **(input/output)** - `Message`의 Attributes는 다음과 같다. - `Thread Id` : 어떠한 `Thread`의 포함 시킬 것 인지를 설정한다. - `role`: 누구의 메시지인지를 설정한다. (`"user"/"assistant"`) - `content`: `Message`의 내용이다. ### Run - `Thread`안의` Message`을 `model`을 통해 실행(run)하는 것을 의미한다. - `Model`이 해당 `Message`를 보고 즉시 대답을 해줄지 아니면 적절한 `Tool`을 사용할지 결정하여 대답을 해준다. - `Status`을 통해 `Run`의 진행 상황을 알 수 있다. - `in_progress` : `Run`이 진행되고 있는 상황이다. - `requires action` : `Tool`을 사용할 필요가 있어 대기하고 있는 상황이다.(이때 `Tool`이 `functions`이라면 `Assistant`에게 `Tool`을 사용한 결과를 보내주어야 한다.) - `completed` : `Run`이 완료된 상황이다. - `Run`의 Attributes는 다음과 같다. - `Thread Id` : 어떠한 `Thread`을 사용할 것인지를 설정한다. - `Assistant Id` : 어떠한 `Assistant`을 사용할 것인지를 설정한다. # Process ![[Assistant Process.webp]] - `User`의 질문(`User's Message`)을 `Message`로 만들면 해당 `Message`가 `Thread`에 보관된다. - `Run`을 통해 `Model`을 작동시키면 `Thread`에 있는 `Message`가 `Model`에게 보내져 `User`의 질문의 답변(`Assistant Message`)을 만들어진다. - `Assistant Message` 또한 `Thread`에 저장되며 답변이 만들어지는 과정에서 `Tool`이 필요하다면 `Run`은 `requires action`가 되며 해당 **`Tool`이 `functions`이라면 어떠한 `function`이 필요하고 해당 `funtion`의 `Parameter`가 무엇인지를 `Model`을 통해 만들어 이를 반환한다.**(`Tool`이 `functions`이 아니라면 자동으로 해당 `Tool`이 실행된다.) - `run`의 `submit_tool_outputs`을 통해 `Assistant`에게 `function Output`값을 전달하면 해당 값이 `Model`에게 넘어가 `Assistant Message`을 만들게 된다.(단 `Model`이 `functions`을 여러 번 호출하는 경우도 있기 때문에 `Run`의 `Stauts`을 통해 판단하여야 한다.) - `Run`의 `Stauts`가 `completed`가 되면 `Assistant Message`가 `Thread`에 저장된다. # Functions Tool Assistant - `LLM`이 `Assistant Message`을 구하는 과정에서 특정 `function`이 필요한 경우 해당 `function`이 원하는 `Parameters`을 만들어 `function`에 전달하며, `function`을 실행시켜 얻은 결과를 바탕으로 `User`의 답변을 만들도록 하는 기능이다. ([[Agent]]랑 비슷한 기능이 비슷하다.) - `Assistant`을 만드는 과정에서 `function`의 기능과 필요 `Parameters`의 정보를 전달하는 `functions`을 선언하여야 한다. - `functions`는 `List` 형태이며 그 안에는 `"description"(해당 함수/변수에 대한 설명), "type"(해당 값의 type), "required"(해당 함수을 실행하기에 Model에게 요구되는 값)`등이 포함되어 있는 `json schema `형태의 `function` 값들이 들어있다. (자세한 내용 참고 : [OpenAI - Define Functions](https://platform.openai.com/docs/assistants/tools/function-calling)) - `Assistant`는 `Functions Tool`을 자동으로 실행하지 않는다. 대신, `function`을 실행시키기 위한 정보를 주기 때문에 이를 이용해 `function`을 실행하여 그 `output`을 `run`의 `submit_tool_outputs`에 전달하여야 한다. - `submit_tool_outputs`은 `Run Id`와 `Thread Id`, `Tool Outputs`이 필요하다. - `function`의 `output`은 항상 `String` 값이어야 하기 때문에 만약, `yfinance` 같은 라이브러리를 사용하여 얻은 값이 `json`이라면 `json.dumps` 같은 함수를 사용하여 `json`값을 `String`으로 바꾸어 반환해야 한다. - 코드의 전체적인 내용은 [[Investor GPT]]와 비슷하기 때문에 `Assistant`을 제외한 Code의 내용은 해당 `Part`를 참고해라. ```python from langchain.utilities.duckduckgo_search import DuckDuckGoSearchAPIWrapper import yfinance import json import openai as client def get_run(run_id, thread_id):     return client.beta.threads.runs.retrieve(         run_id=run_id,         thread_id=thread_id     ) def get_messages(thread_id):     messages = client.beta.threads.messages.list(thread_id=thread_id)     messages = list(messages)     messages.reverse()     for message in messages:         print(f"{message.role}:{message.content[0].text.value}") def get_tool_output(run_id, thread_id):     run = get_run(run_id, thread_id)     outputs = []     for action in run.required_action.submit_tool_outputs.tool_calls:         action_id = action.id         function = action.function         # print(f"Calling function {function.name} with arg {function.arguments}")         outputs.append(             {                 "output": functions_map[function.name](json.loads(function.arguments)),                 "tool_call_id": action_id,             }         )     return outputs def submit_tool_output(run_id, thread_id):     outputs = get_tool_output(run_id, thread_id)     return client.beta.threads.runs.submit_tool_outputs(         run_id=run_id,         thread_id=thread_id,         tool_outputs=outputs     ) def send_message(thread_id, content):     return client.beta.threads.messages.create(         thread_id=thread_id,         role="user",         content=content,     ) def get_ticker(inputs):     ddg = DuckDuckGoSearchAPIWrapper()     company_name = inputs["company_name"]     return ddg.run(f"Ticker symbol of {company_name}") def get_income_statement(inputs):     ticker = inputs["ticker"]     stock = yfinance.Ticker(ticker)     return json.dumps(stock.income_stmt.to_json()) def get_balance_sheet(inputs):     ticker = inputs["ticker"]     stock = yfinance.Ticker(ticker)     return json.dumps(stock.balance_sheet.to_json()) def get_daily_stock_performance(inputs):     ticker = inputs["ticker"]     stock = yfinance.Ticker(ticker)     return json.dumps(stock.history(period="3mo").to_json()) functions_map = {     "get_ticker": get_ticker,     "get_income_statement": get_income_statement,     "get_balance_sheet": get_balance_sheet,     "get_daily_stock_performance": get_daily_stock_performance, } functions = [     {         "type": "function",         "function": {             "name": "get_ticker",             "description": "Given the name of a company returns its ticker symbol",             "parameters": {                 "type": "object",                 "properties": {                     "company_name": {                         "type": "string",                         "description": "The name of the company",                     }                 },                 "required": ["company_name"],             },         },     },     {         "type": "function",         "function": {             "name": "get_income_statement",             "description": "Given a ticker symbol (i.e AAPL) returns the company's income statement.",             "parameters": {                 "type": "object",                 "properties": {                     "ticker": {                         "type": "string",                         "description": "Ticker symbol of the company",                     },                 },                 "required": ["ticker"],             },         },     },     {         "type": "function",         "function": {             "name": "get_balance_sheet",             "description": "Given a ticker symbol (i.e AAPL) returns the company's balance sheet.",             "parameters": {                 "type": "object",                 "properties": {                     "ticker": {                         "type": "string",                         "description": "Ticker symbol of the company",                     },                 },                 "required": ["ticker"],             },         },     },     {         "type": "function",         "function": {             "name": "get_daily_stock_performance",             "description": "Given a ticker symbol (i.e AAPL) returns the performance of the stock for the last 100 days.",             "parameters": {                 "type": "object",                 "properties": {                     "ticker": {                         "type": "string",                         "description": "Ticker symbol of the company",                     },                 },                 "required": ["ticker"],             },         },     }, ] assistant = client.beta.assistants.create(     name="Investor Assistant",     instructions="You help user do research on publicly traded companies and you help them decide if they should buy the stock or not.",     model="gpt-3.5-turbo-0125",     tools = functions ) assistant_id = assistant.id thread = client.beta.threads.create() thread_id = thread.id message = client.beta.threads.messages.create(     thread_id=thread_id,     role= "user",     content="I want to know if Health Catalyst stock is a good buy." ) run = client.beta.threads.runs.create(     thread_id=thread_id,     assistant_id=assistant_id, ) run_id = run.id print(get_run(run_id, thread_id).status) print(submit_tool_output(run_id, thread_id)) print(get_messages(thread_id)) ``` # File Search Assistant - `File`을 받아 [[RAG(Retrieval-Augmented Generation)]]을 거쳐 나온 정보를 토대로 `User's Message`의 `Assistant's Message`을 구할 수 있도록 도와주는 `Tool`이다. - `Assistant`을 만들 때 `tools`에 `file_search`을 지정해야 한다. - `files.create`을 통해 `File`을 만들 수 있으며 고유의 `File Id`가 생성된다. - `Message`의 `attachments`의 `File Id`와 `tool("file_search")` 넣으면 `Assistant`가 `File`을 받아 [[RAG(Retrieval-Augmented Generation)]]을 거쳐 저장한다. ```python message = client.beta.threads.messages.create(     thread_id=thread_id,     role="user",     content="I want you to help me with this file",     attachments=[         {             "file_id": file.id,             "tools": [                 {                     "type": "file_search",                 }             ],         }     ], ) ``` - `File`만 `Assistant`에 넣는다면 `Message`에 `User's Message`을 입력하면 자동으로 `File`의 내용을 참고하여 `Assistant's Message`을 반환한다. - `Assistant's Message`에는 `annotations` 값이 포함되어 있는데 `annotations`에는 해당 `Assistant's Message`이 `File`의 어느 부분을 참고하였는지 저장되어 있다. ```python import json import openai as client def get_run(run_id, thread_id):     return client.beta.threads.runs.retrieve(         run_id=run_id,         thread_id=thread_id     ) def get_messages(thread_id):     messages = client.beta.threads.messages.list(thread_id=thread_id)     messages = list(messages)     messages.reverse()     for message in messages:         print(message)         # print(f"{message.role}:{message.content[0].text.value}")         for annotation in message.content[0].text.annotations:             print(f"Source : {annotation.file_citation}") def send_message(thread_id, content):     return client.beta.threads.messages.create(         thread_id=thread_id,         role="user",         content=content,     ) assistant = client.beta.assistants.create(     name="Book Assistant",     instructions="You help user do research with their question on the files they upload.",     model="gpt-3.5-turbo-0125",     tools=[{"type": "file_search"}], ) assistant_id = assistant.id file = client.files.create(     file = open("./files/chapter_one.txt", "rb"),     purpose = "assistants" ) thread = client.beta.threads.create() thread_id = thread.id message = client.beta.threads.messages.create(     thread_id=thread_id,     role="user",     content="I want you to help me with this file",     attachments=[         {             "file_id": file.id,             "tools": [                 {                     "type": "file_search",                 }             ],         }     ], ) run = client.beta.threads.runs.create(     thread_id=thread_id,     assistant_id=assistant_id, ) run_id = run.id send_message(thread_id, "Where dose he work?") print(get_run(run_id, thread_id).status) print(get_messages(thread_id)) ``` # Assistant Streaming - Assistant 2.0 이후로부터는 `Streaming`이 제공된다. - `Run`을 실행하는 과정에서 `event_handler`을 지정하면 `Stream` 기능을 사용할 수 있다. - `Streaming`을 사용하면 `Assistant`가 `function Tool` 사용을 원할 때, 이를 바로바로 넘겨줄 수 있다. - `event_handler`는 Class로 `AssistantStreamEvent`을 **Parent class**로 두어야 하며 `from openai.types.beta import AssistantStreamEvent`로 불러올 수 있다. - `AssistantStreamEvent`에는 `on_run_step_created`, `on_text_delt`, `on_tool_call_done` 등 다양한 `function`이 제공된다. (`create` : 만들어질 때, `delt` : 실행중일 때, `done`: 끝났을 때) - **`Run`을 실행할 때마다 `event_handler`을 지정해주어야** 하기 때문에 `submit_tool_outputs_stream`에서 `function output`을 넘겨주는 과정에서도 `event_handler`을 지정해주어야 한다. ```python from typing_extensions import override from openai import AssistantEventHandler from openai.types.beta.threads import Message, MessageDelta from openai.types.beta.threads.runs import ToolCall, RunStep from openai.types.beta import AssistantStreamEvent class EventHandler(AssistantEventHandler):     def __init__(self, thread_id, assistant_id):         super().__init__()         self.output = None         self.tool_id = None         self.thread_id = thread_id         self.assistant_id = assistant_id         self.run_id = None         self.run_step = None         self.function_name = ""         self.arguments = ""             @override     def on_text_created(self, text) -> None:         print(f"\nassistant on_text_created > ", end="", flush=True)     @override     def on_text_delta(self, delta, snapshot):         # text가 만들어질때마다 작성하도록         # print(f"\nassistant on_text_delta > {delta.value}", end="", flush=True)         print(f"{delta.value}")     @override     def on_end(self):         # 각 stream이 끝날 때때         print(f"\n end assistant > ", self.current_run_step_snapshot, end="", flush=True)     @override     def on_exception(self, exception: Exception) -> None:         """Fired whenever an exception happens during streaming"""         print(f"\nassistant > {exception}\n", end="", flush=True)             @override     def on_message_created(self, message: Message) -> None:         print(f"\nassistant on_message_created > {message}\n", end="", flush=True)     @override     def on_message_done(self, message: Message) -> None:         print(f"\nassistant on_message_done > {message}\n", end="", flush=True)     @override     def on_message_delta(self, delta: MessageDelta, snapshot: Message) -> None:         # print(f"\nassistant on_message_delta > {delta}\n", end="", flush=True)         pass     def on_tool_call_created(self, tool_call):         # 4         print(f"\nassistant on_tool_call_created > {tool_call}")         self.function_name = tool_call.function.name         self.tool_id = tool_call.id         print(f"\on_tool_call_created > run_step.status > {self.run_step.status}")         print(f"\nassistant > {tool_call.type} {self.function_name}\n", flush=True)                 keep_retrieving_run = client.beta.threads.runs.retrieve(             thread_id=self.thread_id, run_id=self.run_id         )         while keep_retrieving_run.status in ["queued", "in_progress"]:             keep_retrieving_run = client.beta.threads.runs.retrieve(                 thread_id=self.thread_id, run_id=self.run_id             )             print(f"\nSTATUS: {keep_retrieving_run.status}")     @override     def on_tool_call_done(self, tool_call: ToolCall) -> None:         # tool_call이 끝났을 때         keep_retrieving_run = client.beta.threads.runs.retrieve(             thread_id=self.thread_id, run_id=self.run_id         )         # 현재 retrieving_run 정보         print(f"\nDONE STATUS: {keep_retrieving_run.status}")         if keep_retrieving_run.status == "completed":             all_messages = client.beta.threads.messages.list(thread_id=thread_id.id)             print(all_messages.data[0].content[0].text.value, "", "")             return         elif keep_retrieving_run.status == "requires_action":             # keep_retrieving_run.status가 actoin을 요구하면             print("here you would call your function")             if self.function_name in functoins_map:                 # 해당 요구하는 action이 내가 정한 함수 안에 있을 때                 print(self.arguments)                 with client.beta.threads.runs.submit_tool_outputs_stream(                     thread_id=self.thread_id,                     run_id=self.run_id,                     tool_outputs=[                         {                             "tool_call_id": self.tool_id,                             "output": functoins_map[self.function_name](                                 json.loads(self.arguments)                             ),                         }                     ],                     event_handler=EventHandler(self.thread_id, self.assistant_id),                 ) as stream:                     stream.until_done()                 # 해당 action에 대한 stream을 만듦             else:                 print("unknown function")                 return     @override     def on_run_step_created(self, run_step: RunStep) -> None:         # 2         print(f"on_run_step_created")         self.run_id = run_step.run_id         self.run_step = run_step         print("The type ofrun_step run step is ", type(run_step), flush=True)         print(f"\n run step created assistant > {run_step}\n", flush=True)     @override     def on_run_step_done(self, run_step: RunStep) -> None:         print(f"\n run step done assistant > {run_step}\n", flush=True)     def on_tool_call_delta(self, delta, snapshot):         if delta.type == "function":             print("Get function argument")             # the arguments stream thorugh here and then you get the requires action event             print(delta.function.arguments, end="", flush=True)             self.arguments += delta.function.arguments         elif delta.type == "code_interpreter":             print(f"on_tool_call_delta > code_interpreter")             if delta.code_interpreter.input:                 print(delta.code_interpreter.input, end="", flush=True)             if delta.code_interpreter.outputs:                 print(f"\n\noutput >", flush=True)                 for output in delta.code_interpreter.outputs:                     if output.type == "logs":                         print(f"\n{output.logs}", flush=True)         else:             print("ELSE")             print(delta, end="", flush=True)     @override     def on_event(self, event: AssistantStreamEvent) -> None:         # print("In on_event of event is ", event.event, flush=True)         if event.event == "thread.run.requires_action":             print("\nthread.run.requires_action > submit tool call")             print(f"ARGS: {self.arguments}") with client.beta.threads.runs.stream(     thread_id=thread_id,     assistant_id=assistant_id,     instructions="You help user do research on publicly traded companies and you help them decide if they should buy the stock or not.",     event_handler=EventHandler(thread_id, assistant_id), ) as stream:     stream.until_done() ```