Source code for qianfan.resources.llm.embedding

# Copyright (c) 2023 Baidu, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from functools import partial
from typing import Any, AsyncIterator, Dict, Iterator, List, Optional, Union

import qianfan.errors as errors
from qianfan.consts import DefaultLLMModel, DefaultValue
from qianfan.resources.llm.base import (
    UNSPECIFIED_MODEL,
    BaseResource,
    BatchRequestFuture,
)
from qianfan.resources.typing import JsonBody, QfLLMInfo, QfResponse


[docs]class Embedding(BaseResource): """ QianFan Embedding is an agent for calling QianFan embedding API. """ @classmethod def _supported_models(cls) -> Dict[str, QfLLMInfo]: """ preset model list of Embedding support model: - Embedding-V1 - bge-large-en - bge-large-zh Args: None Returns: a dict which key is preset model and value is the endpoint """ return { "Embedding-V1": QfLLMInfo( endpoint="/embeddings/embedding-v1", required_keys={"input"}, optional_keys={"user_id"}, ), "bge-large-en": QfLLMInfo( endpoint="/embeddings/bge_large_en", required_keys={"input"}, optional_keys={"user_id"}, ), "bge-large-zh": QfLLMInfo( endpoint="/embeddings/bge_large_zh", required_keys={"input"}, optional_keys={"user_id"}, ), "tao-8k": QfLLMInfo( endpoint="/embeddings/tao_8k", required_keys={"input"}, optional_keys={"user_id"}, ), UNSPECIFIED_MODEL: QfLLMInfo( endpoint="", required_keys={"input"}, optional_keys=set() ), } @classmethod def _default_model(cls) -> str: """ default model of Embedding `Embedding-V1` Args: None Returns: "Embedding-V1" """ return DefaultLLMModel.Embedding def _generate_body( self, model: Optional[str], endpoint: str, stream: bool, **kwargs: Any ) -> JsonBody: """ need to check whether stream is set in Embedding """ if stream is True: raise errors.InvalidArgumentError("Stream is not supported for embedding") if "texts" not in kwargs: raise errors.ArgumentNotFoundError("input not found in kwargs") kwargs["input"] = kwargs["texts"] del kwargs["texts"] return super()._generate_body(model, endpoint, stream, **kwargs) def _convert_endpoint(self, model: Optional[str], endpoint: str) -> str: """ convert endpoint to Embedding API endpoint """ return f"/embeddings/{endpoint}"
[docs] def do( self, texts: List[str], model: Optional[str] = None, endpoint: Optional[str] = None, stream: bool = False, retry_count: int = DefaultValue.RetryCount, request_timeout: float = DefaultValue.RetryTimeout, request_id: Optional[str] = None, backoff_factor: float = DefaultValue.RetryBackoffFactor, **kwargs: Any, ) -> Union[QfResponse, Iterator[QfResponse]]: """ Generate embeddings for a list of input texts using a specified model. Parameters: texts (List[str]): A list of input texts for which embeddings need to be generated. model (Optional[str]): The name or identifier of the language model to use. If not specified, the default model is used(ERNIE-Bot-turbo). endpoint (Optional[str]): The endpoint for making API requests. If not provided, the default endpoint is used. stream (bool): If set to True, the responses are streamed back as an iterator. If False, a single response is returned. retry_count (int): The number of times to retry the request in case of failure. request_timeout (float): The maximum time (in seconds) to wait for a response from the model. backoff_factor (float): A factor to increase the waiting time between retry attempts. kwargs (Any): Additional keyword arguments that can be passed to customize the request. Additional parameters like `temperature` will vary depending on the model, please refer to the API documentation. The additional parameters can be passed as follows: ``` Embedding().do(texts = ..., temperature = 0.2, top_p = 0.5) ``` """ kwargs["texts"] = texts if request_id is not None: kwargs["request_id"] = request_id return self._do( model, endpoint, stream, retry_count, request_timeout, backoff_factor, **kwargs, )
[docs] async def ado( self, texts: List[str], model: Optional[str] = None, endpoint: Optional[str] = None, stream: bool = False, retry_count: int = DefaultValue.RetryCount, request_timeout: float = DefaultValue.RetryTimeout, request_id: Optional[str] = None, backoff_factor: float = DefaultValue.RetryBackoffFactor, **kwargs: Any, ) -> Union[QfResponse, AsyncIterator[QfResponse]]: """ Async generate embeddings for a list of input texts using a specified model. Parameters: texts (List[str]): A list of input texts for which embeddings need to be generated. model (Optional[str]): The name or identifier of the language model to use. If not specified, the default model is used(ERNIE-Bot-turbo). endpoint (Optional[str]): The endpoint for making API requests. If not provided, the default endpoint is used. stream (bool): If set to True, the responses are streamed back as an iterator. If False, a single response is returned. retry_count (int): The number of times to retry the request in case of failure. request_timeout (float): The maximum time (in seconds) to wait for a response from the model. backoff_factor (float): A factor to increase the waiting time between retry attempts. kwargs (Any): Additional keyword arguments that can be passed to customize the request. Additional parameters like `temperature` will vary depending on the model, please refer to the API documentation. The additional parameters can be passed as follows: ``` Embedding().do(texts = ..., temperature = 0.2, top_p = 0.5) ``` """ kwargs["texts"] = texts if request_id is not None: kwargs["request_id"] = request_id return await self._ado( model, endpoint, stream, retry_count, request_timeout, backoff_factor, **kwargs, )
[docs] def batch_do( self, texts_list: List[List[str]], worker_num: Optional[int] = None, **kwargs: Any, ) -> BatchRequestFuture: """ Batch generate embeddings for a list of input texts using a specified model. Parameters: texts_list (List[List[str]]): List of the input text list to generate the embeddings. worker_num (Optional[int]): The number of prompts to process at the same time, default to None, which means this number will be decided dynamically. kwargs (Any): Please refer to `Completion.do` for other parameters such as `model`, `endpoint`, `retry_count`, etc. ``` response_list = Completion().batch_do(["...", "..."], worker_num = 10) for response in response_list: # return QfResponse if succeed, or exception will be raised print(response.result()) # or while response_list.finished_count() != response_list.task_count(): time.sleep(1) print(response_list.results()) ``` """ task_list = [partial(self.do, texts=texts, **kwargs) for texts in texts_list] return self._batch_request(task_list, worker_num)
[docs] async def abatch_do( self, texts_list: List[List[str]], worker_num: Optional[int] = None, **kwargs: Any, ) -> List[Union[QfResponse, AsyncIterator[QfResponse]]]: """ Async batch generate embeddings for a list of input texts using a specified model. Parameters: texts_list (List[List[str]]): List of the input text list to generate the embeddings. worker_num (Optional[int]): The number of prompts to process at the same time, default to None, which means this number will be decided dynamically. kwargs (Any): Please refer to `Embedding.ado` for other parameters such as `model`, `endpoint`, `retry_count`, etc. ``` response_list = await Embedding().abatch_do([...], worker_num = 10) for response in response_list: # response is `QfResponse` if succeed, or response will be exception print(response) ``` """ tasks = [self.ado(texts=texts, **kwargs) for texts in texts_list] return await self._abatch_request(tasks, worker_num)