# Source code for qianfan.evaluation.evaluation_manager

# Copyright (c) 2023 Baidu, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
manager which manage whole procedure of evaluation
"""
import math
import multiprocessing
import os.path
import tempfile
import time
from concurrent.futures import ALL_COMPLETED, Future, ThreadPoolExecutor, wait
from copy import copy
from typing import Any, Dict, List, Optional, Sequence, Set, Union

import pyarrow

from qianfan import get_config
from qianfan.dataset import Dataset
from qianfan.dataset.consts import (
    LLMOutputColumnName,
    LLMTagColumnName,
    OldReferenceColumnName,
)
from qianfan.dataset.data_source import FileDataSource, QianfanDataSource
from qianfan.dataset.data_source.utils import (
    _download_file_from_url_streamly,
)
from qianfan.dataset.schema import EvaluationSchema
from qianfan.errors import QianfanError
from qianfan.evaluation.consts import QianfanRefereeEvaluatorPromptTemplate
from qianfan.evaluation.evaluation_result import EvaluationResult
from qianfan.evaluation.evaluator import (
    LocalEvaluator,
    QianfanEvaluator,
    QianfanManualEvaluator,
    QianfanRefereeEvaluator,
    QianfanRuleEvaluator,
)
from qianfan.model import Model, Service
from qianfan.resources import Model as ResourceModel
from qianfan.resources.console.consts import (
    EvaluationResultExportTaskStatus,
    EvaluationTaskStatus,
)
from qianfan.utils import log_debug, log_error, log_info, log_warn
from qianfan.utils.pydantic import BaseModel, Field, root_validator
from qianfan.utils.utils import generate_letter_num_random_id


class EvaluationManager(BaseModel):
    """Logic control center of evaluation.

    Exactly one of ``local_evaluators`` / ``qianfan_evaluators`` must be set:
    local evaluators are run in this process (in a thread pool) over batch
    inference output, while qianfan evaluators are turned into an evaluation
    task submitted to the Qianfan platform.
    """

    # evaluators run locally against every row of the dataset
    local_evaluators: Optional[List[LocalEvaluator]] = Field(default=None)
    # evaluators translated into a Qianfan platform evaluation task
    qianfan_evaluators: Optional[List[QianfanEvaluator]] = Field(default=None)
    # id of the most recently created Qianfan evaluation task, if any
    task_id: Optional[str] = Field(default=None)

    @root_validator
    @classmethod
    def _check_evaluators(cls, input_dict: Any) -> Any:
        """Validate the constructor arguments.

        Raises:
            ValueError: if the input isn't a dict, if neither or both kinds of
                evaluator are set, or if the same qianfan evaluator type is
                passed more than once.
        """
        if not isinstance(input_dict, dict):
            err_msg = (
                "the arguments of model validator of EvaluationManager isn't dict,"
                f" rather {type(input_dict)}"
            )
            log_error(err_msg)
            raise ValueError(err_msg)

        local_evaluators = input_dict.get("local_evaluators", None)
        qianfan_evaluators = input_dict.get("qianfan_evaluators", None)

        if not local_evaluators and not qianfan_evaluators:
            # neither kind of evaluator was provided
            err_msg = "none of local evaluator and qianfan evaluator has been set"
            log_error(err_msg)
            raise ValueError(err_msg)

        if local_evaluators and qianfan_evaluators:
            # local and qianfan evaluation can't be mixed in one manager
            err_msg = "both local evaluator and qianfan evaluator has been set"
            log_error(err_msg)
            raise ValueError(err_msg)

        if qianfan_evaluators:
            # each qianfan evaluator type writes its own keys into the task
            # configuration (see _get_qianfan_evaluator_configuration_dict),
            # so duplicates of one type would overwrite each other
            dedup_map: Set[str] = set()
            for evaluator in qianfan_evaluators:
                type_name = f"{type(evaluator)}"
                if type_name in dedup_map:
                    err_msg = f"multiple {type_name} has been set in qianfan_evaluators"
                    log_error(err_msg)
                    raise ValueError(err_msg)
                dedup_map.add(type_name)

        return input_dict

    def _eval_worker(
        self,
        start: int,
        end: int,
        input: List[Union[str, List[Dict[str, Any]]]],
        reference: List[str],
        output: List[str],
    ) -> List[Dict[str, Any]]:
        """Evaluate rows ``[start, end)`` with every local evaluator.

        Returns:
            One dict per row (in row order) that merges the dicts returned by
            each local evaluator's ``evaluate`` call; empty when
            ``start >= end``.
        """
        result_list: List[Dict[str, Any]] = []
        if start >= end:
            return result_list

        assert self.local_evaluators
        for i in range(start, end):
            result: Dict[str, Any] = {}
            for evaluator in self.local_evaluators:
                result.update(evaluator.evaluate(input[i], reference[i], output[i]))
            result_list.append(result)

        return result_list

    def _get_eval_task_future(self, dataset: Dataset, **kwargs: Any) -> List[Future]:
        """Submit local evaluation of ``dataset`` to a thread pool.

        The dataset is split into ``multiprocessing.cpu_count()`` contiguous
        sectors, one ``_eval_worker`` call per sector. The returned futures are
        ordered by sector, so concatenating their results preserves row order.
        """
        input_column_name = dataset.eval_input_column
        reference_column_name = dataset.reference_column
        output_column_name = dataset.eval_llm_output_column
        ds_dict = dataset.col_list(
            [input_column_name, reference_column_name, output_column_name]
        )

        worker_count = multiprocessing.cpu_count()
        data_length = len(dataset)
        sector_length = math.ceil(data_length / worker_count)

        pool = ThreadPoolExecutor()
        future_list: List[Future] = []
        try:
            for i in range(worker_count):
                future_list.append(
                    pool.submit(
                        self._eval_worker,
                        i * sector_length,
                        min((i + 1) * sector_length, data_length),
                        ds_dict[input_column_name],
                        ds_dict[reference_column_name],
                        ds_dict[output_column_name],
                    )
                )
        finally:
            # already-submitted futures keep running; shutting down without
            # waiting just releases the executor once they finish instead of
            # leaking a live pool on every call
            pool.shutdown(wait=False)
        return future_list

    def _run_evaluator_locally(
        self, dataset: Dataset, **kwargs: Any
    ) -> List[Dict[str, Any]]:
        """Run all local evaluators over ``dataset`` and return per-row metrics.

        Raises:
            Exception: whatever a worker raised, re-raised after logging.
        """
        future_list = self._get_eval_task_future(dataset, **kwargs)
        wait(future_list, return_when=ALL_COMPLETED)

        result_list: List[Dict[str, Any]] = []
        for future in future_list:
            try:
                result_list.extend(future.result())
            except Exception as e:
                err_msg = (
                    f"an fatal error occurred when evaluate llm output in batch: {e}"
                )
                log_error(err_msg)
                raise e

        return result_list

    def _get_llm_tags(self, llms: Sequence[Union[Model, Service]]) -> List[str]:
        """Build one identifying tag string per llm, in the order of ``llms``.

        Services are tagged ``{id}_{endpoint}[_{model name}]``, models
        ``{id}_{version_id}_{name}``; anything else gets an empty tag.
        """
        llm_tags: List[str] = []
        for llm in llms:
            if isinstance(llm, Service):
                if llm.model:
                    llm_key_str = f"{llm.id}_{llm.endpoint}_{llm.model.name}"
                else:
                    llm_key_str = f"{llm.id}_{llm.endpoint}"
            elif isinstance(llm, Model):
                llm_key_str = f"{llm.id}_{llm.version_id}_{llm.name}"
            else:
                llm_key_str = ""
            llm_tags.append(llm_key_str)

        return llm_tags

    def _get_batch_inference_task_future(
        self, llms: Sequence[Union[Model, Service]], dataset: Dataset, **kwargs: Any
    ) -> Dict[int, Future]:
        """Submit ``dataset.test_using_llm`` for each llm to a thread pool.

        Returns:
            Futures keyed by the llm's index in ``llms``. Entries that are
            neither ``Model`` nor ``Service`` are skipped (no key).
        """
        future_dict: Dict[int, Future] = {}
        pool = ThreadPoolExecutor()
        try:
            for index, llm in enumerate(llms):
                if isinstance(llm, Model):
                    future_dict[index] = pool.submit(
                        dataset.test_using_llm,
                        model_version_id=llm.version_id,
                        **kwargs,
                    )
                elif isinstance(llm, Service):
                    model_name = None if not llm.model else llm.model.name
                    future_dict[index] = pool.submit(
                        dataset.test_using_llm,
                        service_model=model_name,
                        service_endpoint=llm.endpoint,
                        **kwargs,
                    )
        finally:
            # let submitted inference tasks finish, but don't leak the executor
            pool.shutdown(wait=False)

        return future_dict

    def _get_qianfan_evaluator_configuration_dict(self) -> Dict[str, Any]:
        """Translate ``qianfan_evaluators`` into the platform's ``eval_config``.

        Raises:
            ValueError: if a QianfanRuleEvaluator enables no rule.
        """
        # map evaluator settings onto the platform request arguments
        input_argument_dict: Dict[str, Any] = {}

        assert self.qianfan_evaluators
        for evaluator in self.qianfan_evaluators:
            # manual evaluation
            if isinstance(evaluator, QianfanManualEvaluator):
                input_argument_dict["evalMode"] = (
                    input_argument_dict.get("evalMode", "") + "manual,"
                )

                # keep at most 4 dimensions, matching the platform
                dimensions = evaluator.evaluation_dimensions[:]
                if len(evaluator.evaluation_dimensions) > 4:
                    log_warn(
                        "the number of evaluation dimension is more than 4, the"
                        " dimensions will be truncated"
                    )
                    dimensions = dimensions[:4]

                # build the per-dimension dicts
                input_dimension_list: List[Dict[str, Any]] = []
                for dimension in dimensions:
                    input_dimension_dict: Dict[str, Any] = {
                        "dimension": dimension.dimension
                    }
                    if dimension.description:
                        input_dimension_dict["description"] = dimension.description
                    input_dimension_list.append(input_dimension_dict)

                input_argument_dict["evaluationDimension"] = input_dimension_list

            # rule-based evaluation
            if isinstance(evaluator, QianfanRuleEvaluator):
                input_argument_dict["evalMode"] = (
                    input_argument_dict.get("evalMode", "") + "rule,"
                )

                rule_list: List[str] = []
                if evaluator.using_similarity:
                    rule_list.append("similarity")
                if evaluator.using_accuracy:
                    rule_list.append("accuracy")

                if not rule_list:
                    err_msg = "no rule has been set despite using QianfanRuleEvaluator"
                    log_error(err_msg)
                    raise ValueError(err_msg)

                input_argument_dict["scoreModes"] = rule_list

                # attach the stop-words list, if any
                if evaluator.stop_words:
                    input_argument_dict["stopWordsPath"] = evaluator.stop_words

            # referee (judge-model) scoring
            if isinstance(evaluator, QianfanRefereeEvaluator):
                input_argument_dict["evalMode"] = (
                    input_argument_dict.get("evalMode", "") + "model,"
                )
                input_argument_dict["appId"] = evaluator.app_id
                input_argument_dict["prompt"] = {
                    "templateContent": QianfanRefereeEvaluatorPromptTemplate,
                    "metric": evaluator.prompt_metrics,
                    "steps": evaluator.prompt_steps,
                    "maxScore": evaluator.prompt_max_score,
                }

        # drop the trailing (and any leading) separator from the mode list
        input_argument_dict["evalMode"] = input_argument_dict.get("evalMode", "").strip(
            ","
        )
        return input_argument_dict

    def _create_qianfan_evaluation_task_and_wait_to_success(
        self,
        llms: Sequence[Union[Model, Service]],
        dataset_id: str,
        eval_config: Dict[str, Any],
        **kwargs: Any,
    ) -> EvaluationTaskStatus:
        """Create a Qianfan evaluation task and poll it until it leaves
        Pending/Doing.

        Stores the created task id in ``self.task_id``. ``llms`` is expected to
        contain only ``Model`` instances (callers check this before calling).

        Returns:
            The final state: ``DoingWithManualBegin`` or ``Done``.

        Raises:
            QianfanError: if the task ends in any other state.
        """
        model_objs: List[Model] = llms  # type: ignore
        for model in model_objs:
            model.auto_complete_info()

        resp_body = ResourceModel.create_evaluation_task(
            name=f"sdk_eval_{generate_letter_num_random_id(11)}",
            version_info=[
                {"modelId": model.id, "modelVersionId": model.version_id}
                for model in model_objs
            ],
            dataset_id=dataset_id,
            eval_config=eval_config,
            **kwargs,
        ).body

        eval_id = resp_body["result"]["evalIdStr"]
        task_url = (
            "https://console.bce.baidu.com/qianfan/modelcenter/model/eval/detail/task/"
            f"{eval_id}"
        )
        self.task_id = eval_id
        log_info(f"please check webpage {task_url} to get further information")

        # poll the task's progress
        while True:
            eval_info = ResourceModel.get_evaluation_info(eval_id)
            eval_state = EvaluationTaskStatus(eval_info["result"]["state"])

            log_debug(f"current evaluation task info: {eval_info}")
            log_info(f"current eval_state: {eval_state}")

            if eval_state not in [
                EvaluationTaskStatus.Pending,
                EvaluationTaskStatus.Doing,
            ]:
                break
            time.sleep(get_config().EVALUATION_ONLINE_POLLING_INTERVAL)

        if eval_state not in [
            EvaluationTaskStatus.DoingWithManualBegin,
            EvaluationTaskStatus.Done,
        ]:
            err_msg = f"can't finish evaluation task and failed with state {eval_state}"
            log_error(err_msg)
            raise QianfanError(err_msg)

        return eval_state

    def eval_only(
        self,
        dataset: Dataset,
        **kwargs: Any,
    ) -> EvaluationResult:
        """
        running evaluation only on specific dataset

        Args:
            dataset (Dataset):
                dataset which comes from batch inference or be batch-inference like
            **kwargs (Any):
                other keyword arguments.

        Returns:
            EvaluationResult: Evaluation result of models on the dataset.

        Raises:
            ValueError: if eval_input_column / eval_llm_output_column is unset,
                or the dataset fails EvaluationSchema validation.
        """
        if not dataset.eval_input_column or not dataset.eval_llm_output_column:
            err_msg = (
                "either eval_input_column or eval_llm_output_column didn't been set"
            )
            log_error(err_msg)
            raise ValueError(err_msg)

        dataset = copy(dataset)
        # add an all-None reference column when the dataset has none
        if not dataset.reference_column:
            dataset.reference_column = OldReferenceColumnName
            dataset.col_append(
                {OldReferenceColumnName: [None for _ in range(len(dataset))]}
            )

        if not EvaluationSchema().validate(dataset):
            raise ValueError("validate failed before evaluation")

        tmp_ds = Dataset.create_from_pyobj(
            self._run_evaluator_locally(dataset, **kwargs)
        )
        return EvaluationResult(result_dataset=dataset.col_append(tmp_ds.col_list()))

    def eval(
        self, llms: Sequence[Union[Model, Service]], dataset: Dataset, **kwargs: Any
    ) -> Optional[EvaluationResult]:
        """
        Evaluate the performance of models on the dataset.

        Args:
            llms (List[Union[Model, Service]]):
                List of models or service to be evaluated.
            dataset (Dataset):
                The dataset on which models will be evaluated.
            **kwargs (Any):
                Other keyword arguments.

        Returns:
            Optional[EvaluationResult]: Evaluation result of models on the
            dataset; None when a Qianfan task entered manual evaluation.

        Raises:
            ValueError: on invalid llm/dataset combinations, or when no llm
                finished local inference and evaluation successfully.
            QianfanError: when the Qianfan evaluation or export task fails.
        """
        if len(set(type(llm) for llm in llms)) > 1:
            err_msg = "should use only either Model or Service, not both togather"
            log_error(err_msg)
            raise ValueError(err_msg)

        if self.local_evaluators:
            # one tag per llm, used to label its rows in the merged result
            llm_tags = self._get_llm_tags(llms)

            # If the dataset has no reference (expected answer) column, copy
            # it and append an empty reference column to the copy. This only
            # happens when a Service runs batch inference on a completion
            # dataset and the user didn't specify reference_column.
            if not dataset.reference_column:
                dataset = copy(dataset)
                dataset.reference_column = OldReferenceColumnName
                dataset.col_append(
                    {OldReferenceColumnName: [None for _ in range(len(dataset))]}
                )

            # first run batch inference for every llm
            log_info("start to inference in batch during evaluation")
            future_dict = self._get_batch_inference_task_future(llms, dataset, **kwargs)
            wait(list(future_dict.values()), return_when=ALL_COMPLETED)

            # then collect each inference result and evaluate it
            # per llm: metric dict of every row
            llm_evaluation_result_dict: Dict[int, List[Dict[str, Any]]] = {}
            # per llm: actual response of every row
            llm_response_list: Dict[int, List[str]] = {}
            # shared input rows
            llm_input_list: List[Any] = []
            # shared expected output rows
            expected_output_list: List[str] = []
            input_column_name: str = ""

            for index, future in future_dict.items():
                try:
                    result = future.result()
                    assert isinstance(result, Dataset)
                    responses = result[LLMOutputColumnName][LLMOutputColumnName]

                    # the actual evaluation
                    log_info(f"start to evaluate llm {index}")
                    evaluation_results = self._run_evaluator_locally(result, **kwargs)

                    # fill the shared columns from the first successful result
                    if not llm_input_list:
                        assert result.eval_input_column
                        llm_input_list = result[result.eval_input_column][
                            result.eval_input_column
                        ]
                        input_column_name = result.eval_input_column
                    if not expected_output_list:
                        expected_output_list = result[result.reference_column][
                            result.reference_column
                        ]
                except Exception as e:
                    # best-effort: a failing llm is skipped, the others are kept
                    err_msg = (
                        "an error occurred when doing batch inference in"
                        f" evaluation: {e}"
                    )
                    log_warn(err_msg)
                    continue

                # record an llm only once BOTH inference and evaluation
                # succeeded; otherwise the merge below would hit a KeyError
                llm_response_list[index] = responses
                llm_evaluation_result_dict[index] = evaluation_results

            # merge into the dataset format used by web manual evaluation
            log_info("start to merge evaluation result dataset")
            table_list: List[pyarrow.Table] = []
            for index, response_list in llm_response_list.items():
                index_tag_column = [llm_tags[index] for _ in range(len(response_list))]
                ds = dataset.create_from_pyobj(
                    {
                        LLMTagColumnName: index_tag_column,
                        input_column_name: llm_input_list,
                        OldReferenceColumnName: expected_output_list,
                        LLMOutputColumnName: response_list,
                    }
                )
                metrics_ds = dataset.create_from_pyobj(
                    llm_evaluation_result_dict[index]
                )
                ds.col_append(metrics_ds.col_list())
                table_list.append(ds.inner_table)

            if not table_list:
                # pyarrow.concat_tables([]) would fail with an opaque error
                err_msg = (
                    "no llm finished batch inference and evaluation successfully"
                )
                log_error(err_msg)
                raise ValueError(err_msg)

            return EvaluationResult(
                result_dataset=Dataset.create_from_pyarrow_table(
                    pyarrow.concat_tables(table_list)
                )
            )

        if self.qianfan_evaluators:
            # only Model instances are supported
            if any(not isinstance(inst, Model) for inst in llms):
                err_msg = "only Model instance can use QianfanEvaluator"
                log_error(err_msg)
                raise ValueError(err_msg)

            # the dataset must live on the Qianfan platform
            if not dataset.is_dataset_located_in_qianfan():
                err_msg = "dataset must be in qianfan, not local storage"
                log_error(err_msg)
                raise ValueError(err_msg)

            # generic text datasets are not supported
            if dataset.is_dataset_generic_text():
                err_msg = "dataset can't be generic text dataset"
                log_error(err_msg)
                raise ValueError(err_msg)

            input_argument_dict = self._get_qianfan_evaluator_configuration_dict()
            qianfan_data_source = dataset.inner_data_source_cache
            assert isinstance(qianfan_data_source, QianfanDataSource)

            # submit the evaluation task
            eval_state = self._create_qianfan_evaluation_task_and_wait_to_success(
                llms, qianfan_data_source.id, input_argument_dict
            )

            # manual evaluation stage: no metrics are available yet
            if eval_state == EvaluationTaskStatus.DoingWithManualBegin:
                log_warn("can't fetch any metrics due to manual mode has been set")
                return None

            assert self.task_id
            result_list = ResourceModel.get_evaluation_result(self.task_id)["result"]
            metric_list: Dict[str, Dict[str, Any]] = {
                f'{item["modelName"]}_{item["modelVersion"]}': item["effectMetric"]
                for item in result_list
            }

            export_task_id = ResourceModel.create_evaluation_result_export_task(
                self.task_id, **kwargs
            )["result"]["exportIDStr"]

            polling_interval = get_config().EXPORT_STATUS_POLLING_INTERVAL
            while True:
                export_info = ResourceModel.get_evaluation_result_export_task_status(
                    export_task_id
                )["result"]
                task_status = EvaluationResultExportTaskStatus(export_info["state"])

                if task_status in [
                    EvaluationResultExportTaskStatus.Uploading,
                    EvaluationResultExportTaskStatus.Pending,
                ]:
                    log_info(
                        f"wait evaluation result export task {export_task_id} to be"
                        " completed"
                    )
                    time.sleep(polling_interval)
                    continue

                if task_status == EvaluationResultExportTaskStatus.Fail:
                    err_msg = "evaluation result export failed"
                    log_error(err_msg)
                    raise QianfanError(err_msg)

                if task_status == EvaluationResultExportTaskStatus.Done:
                    log_info("export result succeeded")
                    break

                err_msg = "enter logic unreachable"
                log_error(err_msg)
                raise Exception(err_msg)

            download_url = export_info["downloadUrl"]
            log_info(f"start to download evaluation result file from {download_url}")

            # Download into a private temporary directory instead of "tmp.csv"
            # in the working directory, which could overwrite (and then
            # delete) a user's file or collide with a concurrent evaluation.
            # The "tmp.csv" basename is kept so format detection is unchanged.
            with tempfile.TemporaryDirectory() as tmp_dir:
                local_cache_file_path = os.path.join(tmp_dir, "tmp.csv")
                _download_file_from_url_streamly(download_url, local_cache_file_path)
                # return the exported rows together with the metrics
                return EvaluationResult(
                    result_dataset=Dataset.load(
                        FileDataSource(path=local_cache_file_path)
                    ),
                    metrics=metric_list,
                )

        return None