class HttpxBackend(OpenAIBackendMixin):
    """Chat-completion backend that talks to an HTTP endpoint via httpx.

    Messages are serialized with ``SERIALIZER`` and POSTed to ``base_url``
    (the Reka chat API by default); responses are decoded back to text by the
    same serializer. Synchronous and asynchronous, plain and streaming
    requests are supported.
    """

    SERIALIZER = HttpxSerializer
    DEFAULT_SRC = "https://api.reka.ai/v1/chat"
    DEFAULT_MODEL = "reka-core-20240501"

    def __init__(self, api_key=None, endpoint: str | None = None, serializer: Serializer | None = None, **kwargs) -> None:
        """Initializes the backend.

        Defaults to the API key from the ``MBODI_API_KEY`` environment
        variable when none is supplied.

        Args:
            api_key (Optional[str]): The API key for the service.
            endpoint (str): The base URL for the API.
            serializer (Optional[Serializer]): The serializer to use for serializing messages.
            **kwargs: Extra options kept on the instance; ``model`` overrides
                ``DEFAULT_MODEL`` for this instance.
        """
        self.base_url = endpoint or self.DEFAULT_SRC
        self.api_key = api_key or os.getenv("MBODI_API_KEY")
        # Only attach the auth header when a key is actually available.
        # (The original also pre-assigned self.headers = None, a dead store.)
        if not self.api_key:
            self.headers = {"Content-Type": "application/json"}
        else:
            self.headers = {"X-Api-Key": self.api_key, "Content-Type": "application/json"}
        self.serialized = serializer or self.SERIALIZER
        self.kwargs = kwargs
        self.DEFAULT_MODEL = kwargs.get("model", self.DEFAULT_MODEL)

    @staticmethod
    def _as_messages(message_or_messages, context: List[Message] | None) -> List[Message]:
        """Normalize the (single message + optional context) convention to a list."""
        if isinstance(message_or_messages, list):
            return message_or_messages
        # A single message is appended to the (possibly empty) context.
        return (context or []) + [message_or_messages]

    def _payload(self, messages: List[Message], model: str, stream: bool, extra: dict) -> dict:
        """Build the JSON request body shared by all request methods.

        NOTE(review): ``extra`` (the caller's **kwargs, including ``timeout``)
        is forwarded inside the JSON body, matching the original behavior —
        confirm the API tolerates the extra keys.
        """
        return {
            "messages": [self.serialized(msg).serialize() for msg in messages],
            "model": model,
            "stream": stream,
            **extra,
        }

    @overload
    def predict(self, messages: List[Message], model: str | None = None, **kwargs) -> str: ...

    @overload
    def predict(self, message: Message, context: List[Message] | None = None, model: str | None = None, **kwargs) -> str: ...

    def predict(self, message_or_messages, context: List[Message] | None = None, model: str | None = None, **kwargs) -> str:
        """Synchronously request a completion and return the extracted text.

        Accepts either a list of messages, or a single message plus an
        optional context list it is appended to.

        Raises:
            httpx.HTTPStatusError: If the server responds with an error status.
        """
        messages = self._as_messages(message_or_messages, context)
        model = model or self.DEFAULT_MODEL
        data = self._payload(messages, model, False, kwargs)
        with httpx.Client(trust_env=True) as client:
            response = client.post(
                self.base_url,
                headers=self.headers,
                json=data,
                timeout=kwargs.get("timeout", 60),
                follow_redirects=True,
            )
        if response.status_code == 200:
            return self.serialized.extract_response(response.json())
        response.raise_for_status()
        # Reached only for non-200 success codes (e.g. 204).
        return response.text

    @overload
    def stream(self, messages: List[Message], model: str | None = None, **kwargs) -> Generator[str, None, None]: ...

    @overload
    def stream(self, message: Message, context: List[Message] | None = None, model: str | None = None, **kwargs) -> Generator[str, None, None]: ...

    def stream(self, message_or_messages, context: List[Message] | None = None, model: str | None = None, **kwargs) -> Generator[str, None, None]:
        """Synchronously stream completion text chunks as they arrive."""
        messages = self._as_messages(message_or_messages, context)
        model = model or self.DEFAULT_MODEL
        yield from self._stream_completion(messages, model, **kwargs)

    @overload
    async def astream(self, messages: List[Message], model: str | None = None, **kwargs) -> AsyncGenerator[str, None]: ...

    @overload
    async def astream(self, message: Message, context: List[Message] | None = None, model: str | None = None, **kwargs) -> AsyncGenerator[str, None]: ...

    async def astream(self, message_or_messages, context: List[Message] | None = None, model: str | None = None, **kwargs) -> AsyncGenerator[str, None]:
        """Asynchronously stream completion text chunks as they arrive."""
        messages = self._as_messages(message_or_messages, context)
        model = model or self.DEFAULT_MODEL
        async for chunk in self._astream_completion(messages, model, **kwargs):
            yield chunk

    def _stream_completion(self, messages: List[Message], model: str | None = None, **kwargs) -> Generator[str, None, None]:
        """POST with ``stream=True`` and yield serializer-extracted chunks."""
        model = model or self.DEFAULT_MODEL
        data = self._payload(messages, model, True, kwargs)
        with (
            httpx.Client(follow_redirects=True) as client,
            client.stream(
                "POST",
                self.base_url,
                headers=self.headers,
                json=data,
                timeout=kwargs.get("timeout", 60),
            ) as stream,
        ):
            for chunk in stream.iter_text():
                yield self.serialized.extract_stream(chunk)

    async def _acreate_completion(self, messages: List[Message], model: str | None = None, **kwargs) -> str:
        """Async non-streaming completion; returns extracted text, or the raw
        body on a non-200 response (best-effort, unlike ``predict``)."""
        model = model or self.DEFAULT_MODEL
        data = self._payload(messages, model, False, kwargs)
        # Fix: the client was built with timeout=-1 (invalid/dead — the
        # per-request timeout below always overrides it).
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.base_url,
                headers=self.headers,
                json=data,
                timeout=kwargs.get("timeout", 60),
            )
        if response.status_code == 200:
            return self.serialized.extract_response(response.json())
        return response.text

    async def _astream_completion(self, messages: List[Message], model: str | None = None, **kwargs) -> AsyncGenerator[str, None]:
        """Async counterpart of ``_stream_completion``."""
        model = model or self.DEFAULT_MODEL
        data = self._payload(messages, model, True, kwargs)
        async with httpx.AsyncClient(follow_redirects=True) as client:
            # Fix: honor the caller's timeout (was hard-coded to 60 here,
            # inconsistent with every sibling method).
            async with client.stream(
                "POST",
                self.base_url,
                headers=self.headers,
                json=data,
                timeout=kwargs.get("timeout", 60),
            ) as response:
                async for chunk in response.aiter_text():
                    yield self.serialized.extract_stream(chunk)
Source code in mbodied/agents/backends/httpx_backend.py
(source lines 61–80)
def __init__(self, api_key=None, endpoint: str | None = None, serializer: Serializer | None = None, **kwargs) -> None:
    """Initializes the backend.

    Defaults to the API key from the ``MBODI_API_KEY`` environment variable
    when none is supplied.

    Args:
        api_key (Optional[str]): The API key for the Complete service.
        endpoint (str): The base URL for the Complete API.
        serializer (Optional[Serializer]): The serializer to use for serializing messages.
        **kwargs: Extra options kept on the instance; ``model`` overrides
            ``DEFAULT_MODEL`` for this instance.
    """
    self.base_url = endpoint or self.DEFAULT_SRC
    self.api_key = api_key or os.getenv("MBODI_API_KEY")
    # Only attach the auth header when a key is actually available.
    # (Removed the dead store ``self.headers = None`` — it was immediately
    # overwritten on both branches below.)
    if not self.api_key:
        self.headers = {"Content-Type": "application/json"}
    else:
        self.headers = {"X-Api-Key": self.api_key, "Content-Type": "application/json"}
    self.serialized = serializer or self.SERIALIZER
    self.kwargs = kwargs
    self.DEFAULT_MODEL = kwargs.get("model", self.DEFAULT_MODEL)