Source code for qianfan.dataset.dataset

# Copyright (c) 2023 Baidu, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
dataset core concept, a wrap of data processing, data transmission and data validation
"""
import codecs
import csv
import functools
import io
import json
import os
from copy import deepcopy
from time import sleep
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
from zipfile import ZipFile

import pyarrow.json
from pyarrow import Table as PyarrowTable
from pyarrow import csv as pyarrow_csv
from typing_extensions import Self

from qianfan import Completion, QfRole, get_config
from qianfan.common import Prompt
from qianfan.dataset import FormatType
from qianfan.dataset.consts import (
    FirstTokenLatencyColumnName,
    LLMOutputColumnName,
    NewInputChatColumnName,
    NewInputPromptColumnName,
    OldReferenceColumnName,
    QianfanDataGroupColumnName,
    QianfanDatasetPackColumnName,
    RequestLatencyColumnName,
)
from qianfan.dataset.data_source import (
    BosDataSource,
    DataSource,
    FileDataSource,
    QianfanDataSource,
)
from qianfan.dataset.dataset_utils import (
    _async_batch_do_on_service,
    _batch_do_on_service,
    _check_and_generate_service,
    _check_online_data_process_result,
    _create_a_dataset_etl_task,
    _extract_string,
    _get_qianfan_schema,
    _list_cloud_data,
    _start_an_evaluation_task_for_model_batch_inference,
)
from qianfan.dataset.qianfan_data_operators import QianfanOperator
from qianfan.dataset.schema import (
    QianfanSchema,
    Schema,
)
from qianfan.dataset.table import Table
from qianfan.dataset.table_utils import _construct_table_from_nest_sequence
from qianfan.errors import ValidationError
from qianfan.resources import Data, Model
from qianfan.resources.console.consts import (
    DataTemplateType,
)
from qianfan.utils import log_debug, log_error, log_info
from qianfan.utils.utils import generate_letter_num_random_id


# 装饰器,用来阻塞部分对云上数据集(非本地)的操作请求
def _online_except_decorator(func: Callable) -> Callable:
    """
    Wrap a dataset method so that it refuses to run on a cloud-located dataset.

    Args:
        func (Callable): the method to guard. Its first positional argument
            must be an object exposing `_is_dataset_located_in_qianfan()`.

    Returns:
        Callable: a wrapper with the same metadata as `func` (via
            functools.wraps). The wrapper raises `Exception` when the dataset
            reports that it is located in qianfan, otherwise it forwards every
            argument to `func` and returns its result.
    """

    @functools.wraps(func)
    def inner(dataset: Any, *args: Any, **kwargs: Any) -> Any:
        if dataset._is_dataset_located_in_qianfan():  # noqa
            # Keep the generic Exception type for backward compatibility, but
            # tell the caller which operation was rejected and why.
            raise Exception(
                f"can't call '{func.__name__}' on a dataset located in qianfan,"
                " this operation only supports local datasets"
            )
        return func(dataset, *args, **kwargs)

    return inner


class Dataset(Table):
    """Dataset"""

    def __init__(
        self,
        inner_table: PyarrowTable,
        inner_data_source_cache: Optional[DataSource] = None,
        inner_schema_cache: Optional[Schema] = None,
        input_columns: Optional[List[str]] = None,
        reference_column: Optional[str] = None,
        eval_input_column: Optional[str] = None,
        eval_llm_output_column: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """
        Init a Dataset Object

        Args:
            inner_table (PyarrowTable):
                a pyarrow.Table object wrapped by Table
            inner_data_source_cache (Optional[DataSource]):
                the data source the dataset was loaded from
            inner_schema_cache (Optional[Schema]):
                the schema used when the dataset was loaded
            input_columns (Optional[List[str]]):
                names of the columns used as batch-inference inputs
            reference_column (Optional[str]):
                name of the expected-result column
            eval_input_column (Optional[str]):
                name of the evaluation input column
            eval_llm_output_column (Optional[str]):
                name of the llm output column used when evaluating
            **kwargs (Any):
                accepted for call compatibility, not used here
        """
        super().__init__(inner_table)

        # Data source the dataset came from; assigned by the loading helpers.
        self.inner_data_source_cache: Optional[DataSource] = inner_data_source_cache
        # Schema cached at load time.
        self.inner_schema_cache: Optional[Schema] = inner_schema_cache

        # Column names used as inputs for batch inference.
        self.input_columns = input_columns
        # Column holding the expected result for batch inference / evaluation.
        self.reference_column = reference_column
        # Column holding the input when only evaluation is run.
        self.eval_input_column = eval_input_column
        # Column holding the llm answer when only evaluation is run.
        self.eval_llm_output_column = eval_llm_output_column

    @classmethod
    def _from_source(
        cls,
        source: DataSource,
        schema: Optional[Schema],
        is_a_text_file_an_entry: bool = False,
        **kwargs: Any,
    ) -> "Dataset":
        """
        Fetch content from a data source and build a dataset from it.

        When `is_a_text_file_an_entry` is True and the source format is text,
        each fetched text becomes a single entry instead of being split on
        newlines into one entry per line.

        Args:
            source (DataSource): where the content is fetched from
            schema (Optional[Schema]): schema stored on the new dataset
            is_a_text_file_an_entry (bool): text granularity switch, see above
            **kwargs (Any): forwarded to `source.fetch` and to the constructor

        Returns:
            Dataset: the constructed dataset

        Raises:
            ValueError: empty jsonl content or an unknown format type
            TypeError: a jsonl element is neither a list nor a dict
        """
        if isinstance(source, QianfanDataSource) and not source.download_when_init:
            # A cloud dataset that is not downloaded is represented by a
            # one-cell placeholder table.
            log_info("a cloud dataset has been created")
            return cls(
                inner_table=pyarrow.Table.from_pylist([{"place_holder": 1}]),
                inner_data_source_cache=source,
                inner_schema_cache=schema,
            )

        # Fetch the raw text together with the format it must be parsed as.
        content = source.fetch(**kwargs)
        format_type = source.format_type()

        log_debug(
            f"content (type: {format_type}) fetched from data source: \n{content}"
        )

        if isinstance(content, str):
            content = [content]

        if format_type == FormatType.Json:
            json_records: List[Dict[str, Any]] = []
            for raw_text in content:
                parsed = json.loads(raw_text, strict=False)
                # A single top-level object is treated as a one-element list.
                json_records.extend(parsed if isinstance(parsed, list) else [parsed])
            pyarrow_table = pyarrow.Table.from_pylist(json_records)

        elif format_type == FormatType.Jsonl:
            jsonl_records: List[Any] = [
                json.loads(line, strict=False)
                for raw_text in content
                for line in raw_text.split("\n")
                if line
            ]
            if not jsonl_records:
                raise ValueError("no data in jsonline file")

            first_record = jsonl_records[0]
            if isinstance(first_record, list):
                pyarrow_table = _construct_table_from_nest_sequence(jsonl_records)
            elif isinstance(first_record, dict):
                # A flat list of dicts is stored as-is.
                pyarrow_table = pyarrow.Table.from_pylist(jsonl_records)
            else:
                error = TypeError(f"unknown table element type: {type(first_record)}")
                log_error(str(error))
                raise error

        elif format_type == FormatType.Csv:
            # csv has no nested form
            bom = codecs.BOM_UTF8.decode(encoding="utf-8")
            csv_rows: List[Dict[str, Any]] = []
            for raw_text in content:
                csv_rows.extend(csv.DictReader(io.StringIO(raw_text.strip(bom))))
            pyarrow_table = pyarrow.Table.from_pylist(csv_rows)

        elif format_type == FormatType.Text:
            # Plain text is stored under the pack column.
            if is_a_text_file_an_entry:
                text_entries: List[str] = list(content)
            else:
                text_entries = [
                    line for raw_text in content for line in raw_text.split("\n")
                ]
            pyarrow_table = pyarrow.Table.from_pydict(
                {QianfanDatasetPackColumnName: text_entries}
            )

        else:
            error = ValueError(f"unknown format type: {format_type}")
            log_error(str(error))
            raise error

        return cls(
            # combine_chunks() is a performance optimisation
            inner_table=pyarrow_table.combine_chunks(),
            inner_data_source_cache=source,
            inner_schema_cache=schema,
            **kwargs,
        )

    def _to_source(self, source: DataSource, **kwargs: Any) -> bool:
        """
        Serialize the dataset and save it to a data source.

        Args:
            source (DataSource): destination of the serialized data
            **kwargs (Any): forwarded to `source.save`

        Returns:
            bool: whatever `source.save` returns

        Raises:
            ValueError: text export with more than one column, or an unknown
                format type
        """
        format_type = source.format_type()
        log_info(f"export as format: {format_type}")

        if format_type == FormatType.Json:
            # json is dumped directly, nested or not
            rows = self.inner_table.to_pylist()
            return source.save(json.dumps(rows, ensure_ascii=False), **kwargs)

        if format_type == FormatType.Jsonl:
            if self.is_dataset_packed():
                log_info("enter packed deserialization logic")
                packed_entries = self.col_list(QianfanDatasetPackColumnName)[
                    QianfanDatasetPackColumnName
                ]
                json_lines: List[str] = [
                    json.dumps(entry, ensure_ascii=False) for entry in packed_entries
                ]
            elif self.is_dataset_grouped():
                log_info("enter grouped deserialization logic")
                self._squash_group_number()
                groups: List[List[Dict[str, Any]]] = []
                for row in self.inner_table.to_pylist():
                    group_index = row.pop(QianfanDataGroupColumnName)
                    # Grow the bucket list until this group index exists.
                    while group_index >= len(groups):
                        groups.append([])
                    groups[group_index].append(row)
                json_lines = [json.dumps(group, ensure_ascii=False) for group in groups]
            elif isinstance(source, QianfanDataSource):
                # Non-nested rows sent to qianfan are each wrapped in a list.
                log_info("enter qianfan deserialization logic")
                json_lines = [
                    f"[{json.dumps(row, ensure_ascii=False)}]"
                    for row in self.inner_table.to_pylist()
                ]
            else:
                log_info("enter else logic")
                json_lines = [
                    json.dumps(row, ensure_ascii=False)
                    for row in self.inner_table.to_pylist()
                ]

            if isinstance(source, FileDataSource) and source.save_as_folder:
                return source.save(json_lines, **kwargs)
            return source.save("\n".join(json_lines), **kwargs)

        if format_type == FormatType.Csv:
            csv_buffer = io.BytesIO()
            csv_buffer.write(codecs.BOM_UTF8)
            pyarrow_csv.write_csv(self.inner_table, csv_buffer)
            return source.save(csv_buffer.getvalue().decode("utf-8"), **kwargs)

        if format_type == FormatType.Text:
            # Pure text export allows at most one column.
            if self.column_number() > 1:
                error = ValueError(
                    "cannot export dataset to pure text if the number of column is"
                    " greater than 1"
                )
                log_error(str(error))
                raise error

            text_entries = list(self.inner_table.to_pydict().values())[0]

            if isinstance(source, (QianfanDataSource, BosDataSource)):
                zip_path = f"tmp_zip_file_{generate_letter_num_random_id()}.zip"
                try:
                    with ZipFile(zip_path, mode="w") as archive:
                        for position, text in enumerate(text_entries):
                            archive.writestr(
                                f"generic_text_file_{position}.txt", data=text
                            )
                    return source.save(zip_file_path=zip_path, **kwargs)
                finally:
                    # The temporary archive is always removed.
                    if os.path.exists(zip_path):
                        os.remove(zip_path)

            if isinstance(source, FileDataSource) and source.save_as_folder:
                return source.save(text_entries, **kwargs)
            return source.save("\n".join(text_entries), **kwargs)

        error = ValueError(f"unknown format type: {format_type}")
        log_error(str(error))
        raise error

    @classmethod
    def _from_args_to_source(
        cls,
        data_file: Optional[str] = None,
        qianfan_dataset_id: Optional[str] = None,
        qianfan_dataset_create_args: Optional[Dict[str, Any]] = None,
        bos_load_args: Optional[Dict[str, Any]] = None,
        bos_source_args: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Optional[DataSource]:
        """
        Build a data source from the first truthy argument, checked in the
        order: data_file, qianfan_dataset_id, qianfan_dataset_create_args,
        bos_load_args, bos_source_args.

        Returns:
            Optional[DataSource]: the constructed data source, or None when
                no argument was provided
        """
        if data_file:
            log_info(
                f"construct a file data source from path: {data_file}, with args:"
                f" {kwargs}"
            )
            return FileDataSource(path=data_file, **kwargs)

        if qianfan_dataset_id:
            log_info(
                "construct a qianfan data source from existed id:"
                f" {qianfan_dataset_id}, with args: {kwargs}"
            )
            return QianfanDataSource.get_existed_dataset(
                dataset_id=qianfan_dataset_id, **kwargs
            )

        if qianfan_dataset_create_args:
            log_info(
                "construct a new qianfan data source from args:"
                f" {qianfan_dataset_create_args}, with args: {kwargs}"
            )
            return QianfanDataSource.create_bare_dataset(
                **qianfan_dataset_create_args, **kwargs
            )

        if bos_load_args:
            log_info(
                "construct a new qianfan data source from bos loading:"
                f" {bos_load_args}, with args: {kwargs}"
            )
            # Note: kwargs are not forwarded on this path.
            return QianfanDataSource.create_from_bos_file(**bos_load_args)

        if bos_source_args:
            return BosDataSource(**bos_source_args, **kwargs)

        log_info("no datasource was constructed")
        return None

    def _set_qianfan_default_io_column(self) -> None:
        """
        Set default input / reference columns from the qianfan template type.

        Does nothing unless the cached data source is a QianfanDataSource.
        """
        qianfan_source = self.inner_data_source_cache
        if not isinstance(qianfan_source, QianfanDataSource):
            return

        conversation_templates = (
            DataTemplateType.NonSortedConversation,
            DataTemplateType.SortedConversation,
        )
        template_type = qianfan_source.template_type

        if template_type in (*conversation_templates, DataTemplateType.QuerySet):
            self.input_columns = ["prompt"]

        if template_type in conversation_templates:
            self.reference_column = "response"
@classmethod
def load(
    cls,
    source: Optional[DataSource] = None,
    data_file: Optional[str] = None,
    qianfan_dataset_id: Optional[str] = None,
    bos_load_args: Optional[Dict[str, Any]] = None,
    huggingface_dataset: Optional[Any] = None,
    bos_source_args: Optional[Dict[str, Any]] = None,
    schema: Optional[Schema] = None,
    organize_data_as_group: bool = False,
    **kwargs: Any,
) -> "Dataset":
    """
    Read data from the source or create a source from the parameters
    and create a Table instance.
    If a schema is specified, perform validation after importing.

    Args:
        source (Optional[DataSource]):
            where the dataset is loaded from, default to None, in which
            case a data source is created from the parameters below
        data_file (Optional[str]):
            dataset local file path, default to None
        qianfan_dataset_id (Optional[str]):
            qianfan dataset ID, default to None
        bos_load_args (Optional[Dict[str, Any]]):
            arguments for creating a dataset and importing
            its initial content from bos, default to None
        huggingface_dataset (Optional[Any]):
            Huggingface dataset object, only DatasetDict and Dataset of
            Huggingface datasets are supported. Only consulted when
            `source` is not provided.
        bos_source_args (Optional[Dict[str, Any]]):
            arguments for creating a file on specific bos,
            default to None
        schema (Optional[Schema]):
            schema used to validate loaded data, default to None
        organize_data_as_group (bool):
            when False (default), a loaded dataset that reports itself
            as grouped is packed before being returned
        **kwargs (Any): optional arguments

    Returns:
        Dataset: a dataset instance

    Raises:
        ValueError: `huggingface_dataset` has no `data` attribute, or no
            data source could be determined from the arguments
        TypeError: the huggingface `data` attribute has an unsupported type
        ValidationError: schema validation failed
    """
    if not source:
        if huggingface_dataset is not None:
            log_info("construct dataset from huggingface dataset")
            if not hasattr(huggingface_dataset, "data"):
                err_msg = (
                    "huggingface_dataset should be either DatasetDict or Dataset of"
                    " Huggingface datasets. "
                )
                log_error(err_msg)
                # Raise with the message itself; the original passed the
                # `log_error` function object here by mistake.
                raise ValueError(err_msg)

            data = huggingface_dataset.data
            if isinstance(data, dict):
                log_info("construct from huggingface DatasetDict")
                pyarrow_table = pyarrow.concat_tables(
                    [ds.table for ds in data.values()]
                )
                return cls.create_from_pyarrow_table(pyarrow_table.combine_chunks())
            elif hasattr(data, "table"):
                log_info("construct from huggingface Dataset")
                return cls.create_from_pyarrow_table(data.table.combine_chunks())

            err_msg = (
                f"get unsupported data type {type(data)} from huggingface dataset"
            )
            log_error(err_msg)
            raise TypeError(err_msg)

        log_info("no data source was provided, construct")
        source = cls._from_args_to_source(
            data_file=data_file,
            qianfan_dataset_id=qianfan_dataset_id,
            bos_load_args=bos_load_args,
            bos_source_args=bos_source_args,
            **kwargs,
        )

    # Build the dataset from the data source.
    if not source:
        err_msg = "no data source or other arguments provided for loading"
        log_error(err_msg)
        raise ValueError(err_msg)

    table = cls._from_source(source, schema, **kwargs)

    # Validate against the schema when one was given.
    if schema and not schema.validate(table):
        error = ValidationError("validate failed when initialize dataset")
        log_error(str(error))
        raise error

    table._set_qianfan_default_io_column()

    if table.is_dataset_grouped() and not organize_data_as_group:
        table.pack()

    return table
def save(
    self,
    destination: Optional[DataSource] = None,
    data_file: Optional[str] = None,
    qianfan_dataset_id: Optional[str] = None,
    qianfan_dataset_create_args: Optional[Dict[str, Any]] = None,
    bos_source_args: Optional[Dict[str, Any]] = None,
    schema: Optional[Schema] = None,
    replace_source: bool = False,
    **kwargs: Any,
) -> bool:
    """
    Write the dataset to a data source, validating it first when a schema
    is available.

    Args:
        destination (Optional[DataSource]):
            data source to export to. When not given, one is built from
            the parameters below; if that also yields nothing, the data
            source cached on the dataset is used.
        data_file (Optional[str]):
            dataset local file path, default to None
        qianfan_dataset_id (Optional[str]):
            qianfan dataset ID, default to None
        qianfan_dataset_create_args (Optional[Dict[str, Any]]):
            arguments for creating a bare dataset on qianfan,
            default to None
        bos_source_args (Optional[Dict[str, Any]]):
            arguments for creating a file on specific bos,
            default to None
        schema (Optional[Schema]):
            schema used to validate before exporting; falls back to the
            schema cached on the dataset, default to None
        replace_source (bool):
            whether to make the destination the dataset's cached data
            source after a successful save, default to False
        **kwargs (Any): optional arguments

    Returns:
        bool: whether saving succeeded

    Raises:
        ValueError: no data source could be determined
        ValidationError: schema validation failed
    """
    if not destination:
        log_info("no destination data source was provided, construct")
        destination = self._from_args_to_source(
            data_file=data_file,
            qianfan_dataset_id=qianfan_dataset_id,
            qianfan_dataset_create_args=qianfan_dataset_create_args,
            bos_source_args=bos_source_args,
            is_download_to_local=False,
            **kwargs,
        )

    # Resolve the target data source.
    target = destination or self.inner_data_source_cache
    if not target:
        err_msg = "no data source or other arguments provided for saving"
        log_error(err_msg)
        raise ValueError(err_msg)

    saving_to_qianfan = isinstance(target, QianfanDataSource)

    # Explicit schema first, then the cached one ...
    effective_schema = schema or self.inner_schema_cache
    # ... but a qianfan destination always dictates its own schema.
    if saving_to_qianfan:
        effective_schema = _get_qianfan_schema(target)

    if effective_schema and not effective_schema.validate(self):
        error = ValidationError("validate failed when save dataset")
        log_error(str(error))
        raise error

    if saving_to_qianfan:
        assert isinstance(effective_schema, QianfanSchema)
        kwargs["is_annotated"] = effective_schema.is_annotated

    # Write the data out.
    succeeded = self._to_source(target, **kwargs)  # noqa
    if succeeded and replace_source:
        self.inner_data_source_cache = target
    return succeeded
@classmethod
def create_from_pyobj(
    cls,
    data: Union[List[Dict[str, Any]], Dict[str, List]],
    schema: Optional[Schema] = None,
    **kwargs: Any,
) -> "Dataset":
    """
    Create a dataset from a python list or dict.

    Args:
        data (Union[List[Dict[str, Any]], Dict[str, List]]):
            python object used to create the dataset. A list is read with
            `pyarrow.Table.from_pylist`, anything else with
            `pyarrow.Table.from_pydict`.
        schema (Optional[Schema]):
            schema cached on the new dataset, default to None
        **kwargs (Any): forwarded to the Dataset constructor

    Returns:
        Dataset: a dataset instance
    """
    table_factory = (
        pyarrow.Table.from_pylist
        if isinstance(data, list)
        else pyarrow.Table.from_pydict
    )
    return cls(
        inner_table=table_factory(data).combine_chunks(),
        inner_schema_cache=schema,
        **kwargs,
    )
@classmethod
def create_from_pyarrow_table(
    cls,
    table: pyarrow.Table,
    schema: Optional[Schema] = None,
    **kwargs: Any,
) -> "Dataset":
    """
    Create a dataset from a pyarrow table.

    Args:
        table (pyarrow.Table):
            pyarrow table object used to create the dataset; its chunks are
            combined before it is wrapped
        schema (Optional[Schema]):
            schema cached on the new dataset, default to None
        **kwargs (Any): forwarded to the Dataset constructor

    Returns:
        Dataset: a dataset instance
    """
    combined_table = table.combine_chunks()
    return cls(inner_table=combined_table, inner_schema_cache=schema, **kwargs)
def _is_dataset_located_in_qianfan(self) -> bool:
    """
    Tell whether the dataset lives on qianfan: its cached data source is a
    QianfanDataSource whose `download_when_init` is falsy.
    """
    cached_source = self.inner_data_source_cache
    if isinstance(cached_source, QianfanDataSource):
        return not cached_source.download_when_init
    return False


def _is_dataset_generic_text(self) -> bool:
    """
    Tell whether the cached data source is a QianfanDataSource whose
    template type is GenericText.
    """
    cached_source = self.inner_data_source_cache
    if isinstance(cached_source, QianfanDataSource):
        return cached_source.template_type == DataTemplateType.GenericText
    return False
def is_dataset_located_in_qianfan(self) -> bool:
    """
    Public entry point for the cloud-location check.

    Returns:
        bool: whether current dataset is cloud-based dataset
    """
    located_in_cloud = self._is_dataset_located_in_qianfan()
    return located_in_cloud
def is_dataset_generic_text(self) -> bool:
    """
    Public entry point for the generic-text check.

    Returns:
        bool: whether current dataset is generic text dataset
    """
    generic_text = self._is_dataset_generic_text()
    return generic_text
def start_online_data_process_task(self, operators: List[QianfanOperator]) -> str:
    """
    Create an online ETL task on qianfan and return without waiting for it.

    Args:
        operators (List[QianfanOperator]): operators applied to ETL task

    Returns:
        str: etl task id

    Raises:
        ValueError: the dataset is not located in qianfan, or it is not a
            GenericText dataset
    """
    # The processed dataset has to be on the cloud already; uploading a
    # local dataset first and syncing it back afterwards is not supported.
    if not self.is_dataset_located_in_qianfan():
        err_msg = "can't process a non-qianfan dataset on qianfan"
        log_error(err_msg)
        raise ValueError(err_msg)

    # Only generic text datasets can be processed.
    if not self._is_dataset_generic_text():
        err_msg = "can't process qianfan dataset which isn't GenericText type"
        log_error(err_msg)
        raise ValueError(err_msg)

    operator_dict: Dict[str, List[Dict[str, Any]]] = {
        "clean": [],
        "filter": [],
        "deduplication": [],
        "desensitization": [],
    }

    for operator in operators:
        operator_args = operator.dict()
        # Name and type are carried separately, not as operator arguments.
        for meta_key in ("operator_name", "operator_type"):
            operator_args.pop(meta_key)

        operator_dict[operator.operator_type].append(
            {"name": operator.operator_name, "args": operator_args}
        )

    log_debug(f"operator args dict: {operator_dict}")
    log_info("start to creating an etl task")

    task_result = _create_a_dataset_etl_task(
        self.inner_data_source_cache, operator_dict
    )
    return task_result[0]
def online_data_process(self, operators: List[QianfanOperator]) -> Dict[str, Any]:
    """
    Create an online ETL task on qianfan and block until it finishes.

    Args:
        operators (List[QianfanOperator]): operators applied to ETL task

    Returns:
        Dict[str, Any]: ETL task info with these fields:
            is_succeeded (bool): whether the ETL task succeeded
            etl_task_id: id returned by `start_online_data_process_task`
            new_dataset_id: result reported by the status check, only
                present when the task succeeded
    """
    etl_id = self.start_online_data_process_task(operators)

    log_debug(f"get etl id {etl_id}")
    log_info("creating etl task successfully")

    ret_dict: Dict[str, Any] = {"is_succeeded": False, "etl_task_id": etl_id}

    while True:
        sleep(get_config().ETL_STATUS_POLLING_INTERVAL)
        result = _check_online_data_process_result(etl_id)

        # None means there is no final result yet, so keep polling.
        if result is None:
            continue

        # A truthy result marks success; a falsy one leaves the failure
        # defaults untouched.
        if result:
            ret_dict["new_dataset_id"] = result
            ret_dict["is_succeeded"] = True
        return ret_dict
@_online_except_decorator
def add_default_group_column(self) -> Self:
    """
    Add the group column to the dataset, filled with sequentially increasing
    values starting at 0. Nothing changes if the column already exists.

    Returns:
        Self: Dataset itself
    """
    if QianfanDataGroupColumnName not in self.col_names():
        group_ids = list(range(self.row_number()))
        return self.col_append({QianfanDataGroupColumnName: group_ids})

    # The column is already present.
    return self
@_online_except_decorator
def delete_group_column(self) -> Self:
    """
    Remove the group column from the dataset if it is present.

    Returns:
        Self: Dataset itself
    """
    if QianfanDataGroupColumnName in self.col_names():
        return self.col_delete(QianfanDataGroupColumnName)

    return self
# -------------------- Processable 相关 ---------------- # 直接调用 Table 对象的接口方法 # 这些接口不支持用在云端数据集上
@_online_except_decorator
def map(self, op: Callable[[Any], Any]) -> Self:
    """
    Map on dataset; delegates to the parent class implementation.

    Args:
        op (Callable[[Any], Any]): handler used to map

    Returns:
        Self: Dataset itself
    """
    mapped = super().map(op)
    return mapped
@_online_except_decorator
def filter(self, op: Callable[[Any], bool]) -> Self:
    """
    Filter on dataset; delegates to the parent class implementation.

    Args:
        op (Callable[[Any], bool]): handler used to filter

    Returns:
        Self: Dataset itself
    """
    filtered = super().filter(op)
    return filtered
@_online_except_decorator
def delete(self, index: Union[int, str]) -> Self:
    """
    Delete an element from dataset; delegates to the parent class
    implementation.

    Args:
        index (Union[int, str]): element index to delete

    Returns:
        Self: Dataset itself
    """
    remaining = super().delete(index)
    return remaining
# 但是在云上数据集追加数据未来可以支持,本质是向数据集中导入新数据。 # 目前不做修改,等待接口 ready
@_online_except_decorator
def append(
    self, elem: Any, add_new_group: bool = False, is_grouped: bool = True
) -> Self:
    """
    Append element(s) to dataset; delegates to the parent class
    implementation.

    Args:
        elem (Union[List[List[Dict]], List[Dict], Tuple[Dict], Dict]):
            elements added to dataset
        add_new_group (bool):
            whether elem has a new group id.
            Only used when dataset is grouped.
        is_grouped (bool):
            whether the elements in elem belong to the same group.
            Only used when dataset is grouped, elem is a Sequence
            and add_new_group is True. Default to True.

    Returns:
        Self: Dataset itself
    """
    appended = super().append(elem, add_new_group, is_grouped)
    return appended
@_online_except_decorator
def insert(
    self,
    elem: Any,
    index: Any,
    group_id: int = -1,
    add_new_group: bool = False,
    is_grouped: bool = True,
) -> Self:
    """
    Insert element(s) to dataset; delegates to the parent class
    implementation.

    Args:
        elem (Union[List[List[Dict]], List[Dict], Tuple[Dict], Dict]):
            elements added to dataset
        index (int): where to insert element(s)
        group_id (int):
            which group id to apply to new element(s). Default to -1,
            meaning the group id is inferred from the table.
        add_new_group (bool):
            whether elem has a new group id. Only used when dataset
            is grouped and group_id is -1
        is_grouped (bool):
            whether the elements in elem belong to the same group.
            Only used when dataset is grouped, elem is a Sequence
            and add_new_group is True. Default to True.

    Returns:
        Self: Dataset itself
    """
    # NOTE(review): `group_id` is accepted but not forwarded here. If the
    # parent `insert` takes `group_id` as its third positional parameter,
    # `add_new_group` and `is_grouped` land in the wrong slots -- confirm
    # against Table.insert before changing this call.
    inserted = super().insert(elem, index, add_new_group, is_grouped)
    return inserted
def list(
    self,
    by: Optional[Union[slice, int, str, Sequence[int], Sequence[str]]] = None,
    **kwargs: Any,
) -> Any:
    """
    Get element(s) from dataset.

    A dataset located in qianfan is listed through the cloud helper;
    otherwise the parent class implementation is used and `kwargs`
    are ignored.

    Args:
        by (Optional[Union[slice, int, str, Sequence[int], Sequence[str]]]):
            index or indices for elements, default to None
        **kwargs (Any): forwarded to the cloud listing helper only

    Returns:
        Any: dataset row list
    """
    if self.is_dataset_located_in_qianfan():
        return _list_cloud_data(self.inner_data_source_cache, by, **kwargs)

    log_info(f"list local dataset data by {by}")
    return super().list(by)
def row_number(self) -> int:
    """
    Get the number of rows in the dataset.

    For a qianfan data source that is not downloaded at init, the count is
    read from the dataset info returned by the platform; otherwise the
    parent class implementation is used.

    Returns:
        int: row count
    """
    cached_source = self.inner_data_source_cache
    if (
        not isinstance(cached_source, QianfanDataSource)
        or cached_source.download_when_init
    ):
        return super().row_number()

    dataset_info = Data.get_dataset_info(cached_source.id)
    return dataset_info["result"]["versionInfo"]["entityCount"]
def __getitem__(self, key: Any) -> Any:
    """
    Index the dataset.

    An int, a slice, or a non-empty list/tuple whose first item is an int
    selects rows through `list`; every other key selects columns through
    `col_list`.
    """
    if isinstance(key, (int, slice)):
        return self.list(key)

    if isinstance(key, (list, tuple)) and key and isinstance(key[0], int):
        return self.list(key)

    return self.col_list(key)


def __delitem__(self, key: Any) -> None:
    """
    Delete a row (int key) or a column (str key).

    Raises:
        TypeError: the key is neither an int nor a str
    """
    if isinstance(key, int):
        self.delete(key)
        return

    if isinstance(key, str):
        self.col_delete(key)
        return

    err_msg = f"unsupported key type for deleting: {type(key)}"
    log_error(err_msg)
    raise TypeError(err_msg)


# Column operations
@_online_except_decorator
def col_map(self, op: Callable[[Any], Any]) -> Self:
    """
    Map on dataset's columns; delegates to the parent class implementation.

    Args:
        op (Callable[[Any], Any]): handler used to map

    Returns:
        Self: Dataset itself
    """
    mapped = super().col_map(op)
    return mapped
@_online_except_decorator
def col_filter(self, op: Callable[[Any], bool]) -> Self:
    """
    Filter on dataset's columns; delegates to the parent class
    implementation.

    Args:
        op (Callable[[Any], bool]): handler used to filter

    Returns:
        Self: Dataset itself
    """
    filtered = super().col_filter(op)
    return filtered
@_online_except_decorator
def col_delete(self, index: Union[int, str]) -> Self:
    """
    Delete a column from dataset; delegates to the parent class
    implementation.

    Args:
        index (Union[int, str]): the column to delete

    Returns:
        Self: Dataset itself
    """
    remaining = super().col_delete(index)
    return remaining
@_online_except_decorator
def col_append(self, elem: Any) -> Self:
    """
    Append column(s) to dataset; delegates to the parent class
    implementation.

    Args:
        elem (Dict[str, List]): a dict containing element added to dataset,
            with column names as keys and column data as values

    Returns:
        Self: Dataset itself
    """
    extended = super().col_append(elem)
    return extended
@_online_except_decorator
def col_insert(self, elem: Any, index: Any) -> Self:
    """
    Insert a column into dataset; delegates to the parent class
    implementation.

    Args:
        elem (Dict[str, List]): dict containing element added to dataset,
            must have column name "name" and column data list "data"
        index (int): where to insert new column

    Returns:
        Self: Dataset itself
    """
    extended = super().col_insert(elem, index)
    return extended
# 等待接口 ready 才能对云端数据集做展示
@_online_except_decorator
def col_list(
    self,
    by: Optional[
        Union[slice, int, str, List[int], Tuple[int], List[str], Tuple[str]]
    ] = None,
) -> Any:
    """
    Get column(s) from dataset; delegates to the parent class
    implementation.

    Args:
        by (Optional[Union[slice, int, str, Sequence[int], Sequence[str]]]):
            index or indices for columns, default to None

    Returns:
        Any: dataset column list
    """
    columns = super().col_list(by)
    return columns
@_online_except_decorator
def col_names(self) -> List[str]:
    """
    Get column name list; delegates to the parent class implementation.

    Returns:
        List[str]: column name list
    """
    names = super().col_names()
    return names
@_online_except_decorator
def col_renames(self, new_names: List[str]) -> Self:
    """
    Rename all dataset columns; delegates to the parent class
    implementation.

    Args:
        new_names (List[str]): all new names for columns

    Returns:
        Self: whatever the parent class `col_renames` returns
    """
    renamed = super().col_renames(new_names)
    return renamed
@property
@_online_except_decorator
def get_reference_data(self) -> List[Any]:
    """
    Get reference data in dataset.

    Returns:
        List[Any]: data of the column named by `reference_column`
    """
    column_name = self.reference_column
    return self.col_list(column_name)[column_name]


@property
@_online_except_decorator
def get_input_data(self) -> Dict[str, List[Any]]:
    """
    Get input columns data in dataset.

    Returns:
        Dict[str, List[Any]]: a dict which indicates
            the "column name-column data" pairs
    """
    selected_columns = self.input_columns
    return self[selected_columns]
def test_using_llm(
    self,
    model_version_id: Optional[str] = None,
    service_model: Optional[str] = None,
    service_endpoint: Optional[str] = None,
    is_chat_service: bool = True,
    does_show_latency: bool = True,
    **kwargs: Any,
) -> "Dataset":
    """
    Run the current dataset through an llm and collect its output.

    Pass either the model argument or the service arguments. When
    `model_version_id` is given it takes precedence over the service
    arguments.

    Args:
        model_version_id (Optional[str]):
            version id of your own model, default to None
        service_model (Optional[str]):
            name of model you want to use as service, default to None
        service_endpoint (Optional[str]):
            endpoint of service, default to None
        is_chat_service (bool):
            the service type of service, default to True.
            Service will be Completion if False
        does_show_latency (bool):
            whether the result dataset contains latency info columns
            when a service is the evaluated object. Default to True
        **kwargs (Any):
            optional argument dict

    Returns:
        Dataset: A dataset contains inputs, reference outputs and llm outputs

    Raises:
        ValueError: neither a model version nor a service was specified
    """
    if model_version_id:
        return self._batch_inference_on_model(model_version_id, **kwargs)

    if not (service_model or service_endpoint):
        err_msg = "no sufficient argument has been passed"
        log_error(err_msg)
        raise ValueError(err_msg)

    return self._batch_inference_on_service(
        service_model,
        service_endpoint,
        is_chat_service,
        does_show_latency,
        **kwargs,
    )
async def atest_using_llm(
    self,
    model_version_id: Optional[str] = None,
    service_model: Optional[str] = None,
    service_endpoint: Optional[str] = None,
    is_chat_service: bool = True,
    does_show_latency: bool = True,
    **kwargs: Any,
) -> "Dataset":
    """
    Run the current dataset through an llm and collect its output,
    awaiting the service path asynchronously.

    Pass either the model argument or the service arguments. When
    `model_version_id` is given it takes precedence and the model batch
    inference is called synchronously.

    Args:
        model_version_id (Optional[str]):
            version id of your own model, default to None
        service_model (Optional[str]):
            name of model you want to use as service, default to None
        service_endpoint (Optional[str]):
            endpoint of service, default to None
        is_chat_service (bool):
            the service type of service, default to True.
            Service will be Completion if False
        does_show_latency (bool):
            whether the result dataset contains latency info columns
            when a service is the evaluated object. Default to True
        **kwargs (Any):
            optional argument dict

    Returns:
        Dataset: A dataset contains inputs, reference outputs and llm outputs

    Raises:
        ValueError: neither a model version nor a service was specified
    """
    if model_version_id:
        return self._batch_inference_on_model(model_version_id, **kwargs)

    if not (service_model or service_endpoint):
        err_msg = "no sufficient argument has been passed"
        log_error(err_msg)
        raise ValueError(err_msg)

    return await self._async_batch_inference_on_service(
        service_model,
        service_endpoint,
        is_chat_service,
        does_show_latency,
        **kwargs,
    )
def _batch_inference_on_model( self, model_version_id: str, **kwargs: Any ) -> "Dataset": """ create batch run using specific dataset on qianfan by evaluation ability of platform Parameters: model_version_id (str): version id of your own model, default to None **kwargs (Any): Arbitrary keyword arguments Returns: Dataset: batch result contained in dataset """ if not self.is_dataset_located_in_qianfan(): err_msg = "can't start a batch run task on non-qianfan dataset" log_error(err_msg) raise ValueError(err_msg) if self.is_dataset_generic_text(): err_msg = "can't start a batch run task on generic text dataset" log_error(err_msg) raise ValueError(err_msg) model_id = Model.detail(model_version_id)["result"]["modelIdStr"] result_dataset_id = _start_an_evaluation_task_for_model_batch_inference( self.inner_data_source_cache, model_id, model_version_id ) result_dataset = Dataset.load(qianfan_dataset_id=result_dataset_id, **kwargs) if result_dataset.is_dataset_located_in_qianfan(): return result_dataset result_dataset.unpack() new_list: List[Dict[str, Any]] = [] for entry in result_dataset.list(): new_list.append( { "prompt": entry["prompt"], NewInputPromptColumnName: entry["prompt"], LLMOutputColumnName: entry["model_response"][0]["content"], OldReferenceColumnName: _extract_string(entry["response"]), } ) return Dataset.create_from_pyobj( new_list, input_columns=["prompt"], reference_column=OldReferenceColumnName, eval_input_column=NewInputPromptColumnName, eval_llm_output_column=LLMOutputColumnName, ) def _get_completion_return_dataset( self, input_str_list: List[str], output_list: List[str], request_latency_list: List[float], first_token_latency_list: List[float], does_show_latency: bool, ) -> "Dataset": table_dict = { **self.get_input_data, NewInputPromptColumnName: input_str_list, LLMOutputColumnName: output_list, } reference_column: Optional[str] = None if self.reference_column: table_dict[OldReferenceColumnName] = self.get_reference_data reference_column = 
OldReferenceColumnName if does_show_latency: if len(first_token_latency_list) != 0: table_dict[FirstTokenLatencyColumnName] = first_token_latency_list table_dict[RequestLatencyColumnName] = request_latency_list return Dataset.create_from_pyobj( table_dict, input_columns=self.input_columns, reference_column=reference_column, eval_input_column=NewInputPromptColumnName, eval_llm_output_column=LLMOutputColumnName, ) def _get_chat_return_dataset( self, input_list: List[List[Dict[str, Any]]], output_list: List[str], reference_list: List[Any], request_latency_list: List[float], first_token_latency_list: List[float], does_show_latency: bool, ) -> "Dataset": if not self.is_dataset_grouped() and not self.is_dataset_packed(): input_str_list = [conv[0]["content"] for conv in input_list] return self._get_completion_return_dataset( input_str_list, output_list, request_latency_list, first_token_latency_list, does_show_latency, ) table_dict: Dict[str, Any] = { NewInputChatColumnName: input_list, LLMOutputColumnName: output_list, OldReferenceColumnName: reference_list, } if does_show_latency: if len(first_token_latency_list) != 0: table_dict[FirstTokenLatencyColumnName] = first_token_latency_list table_dict[RequestLatencyColumnName] = request_latency_list return Dataset.create_from_pyobj( table_dict, input_columns=[NewInputChatColumnName], reference_column=OldReferenceColumnName, eval_input_column=NewInputChatColumnName, eval_llm_output_column=LLMOutputColumnName, ) def _batch_inference_on_service( self, service_model: Optional[str] = None, service_endpoint: Optional[str] = None, is_chat_service: bool = True, does_show_latency: bool = True, system_prompt: str = "", **kwargs: Any, ) -> "Dataset": """ create batch run using specific dataset on qianfan Args: service_model (Optional[str]): name of model you want to use as service, default to None service_endpoint (Optional[str]): endpoint of service, default to None is_chat_service (bool): the service type of service, default to True. 
Service will be Completion if False does_show_latency (bool): whether result dataset contain latency info column when using Service as evaluated object. Depending on different request mode (stream and non-stream), it will contains request_complete_latency or (first_token_latency, request_complete_latency) combo. Default to True system_prompt (str): Optional system text for input using, default to "" **kwargs (Any): Arbitrary keyword arguments Keyword arguments: prompt_template (Optional[Prompt]): Optional Prompt used as input of llm, default to None. Only used when your Service is a Completion service Returns: Dataset: batch result contained in dataset """ if self.is_dataset_located_in_qianfan(): err_msg = "can't start a batch run task on qianfan dataset" log_error(err_msg) raise ValueError(err_msg) service = _check_and_generate_service( self.input_columns, service_model, service_endpoint, is_chat_service, **kwargs, ) if isinstance(service, Completion): input_str_list = self._get_input_str_list(**kwargs) output_list, request_latency_list, first_token_latency_list = ( _batch_do_on_service( service, input_str_list, system=system_prompt, **kwargs ) ) return self._get_completion_return_dataset( input_str_list, output_list, request_latency_list, first_token_latency_list, does_show_latency, ) else: input_chat_list, reference_list = self._get_input_chat_list(**kwargs) output_list, request_latency_list, first_token_latency_list = ( _batch_do_on_service( service, input_chat_list, system=system_prompt, **kwargs ) ) return self._get_chat_return_dataset( input_chat_list, output_list, reference_list, request_latency_list, first_token_latency_list, does_show_latency, ) async def _async_batch_inference_on_service( self, service_model: Optional[str] = None, service_endpoint: Optional[str] = None, is_chat_service: bool = True, does_show_latency: bool = True, system_prompt: str = "", **kwargs: Any, ) -> "Dataset": """ asynchronously create batch run using specific dataset on qianfan 
Args: service_model (Optional[str]): name of model you want to use as service, default to None service_endpoint (Optional[str]): endpoint of service, default to None is_chat_service (bool): the service type of service, default to True. Service will be Completion if False does_show_latency (bool): whether result dataset contain latency info column when using Service as evaluated object. Depending on different request mode (stream and non-stream), it will contains request_complete_latency or (first_token_latency, request_complete_latency) combo. Default to True system_prompt (str): Optional system text for input using, default to "" **kwargs (Any): Arbitrary keyword arguments Keyword arguments: prompt_template (Optional[Prompt]): Optional Prompt used as input of llm, default to None. Only used when your Service is a Completion service Returns: Dataset: batch result contained in dataset """ if self.is_dataset_located_in_qianfan(): err_msg = "can't start a batch run task on qianfan dataset" log_error(err_msg) raise ValueError(err_msg) service = _check_and_generate_service( self.input_columns, service_model, service_endpoint, is_chat_service, **kwargs, ) if isinstance(service, Completion): input_str_list = self._get_input_str_list(**kwargs) output_list, request_latency_list, first_token_latency_list = ( await _async_batch_do_on_service( service, input_str_list, system=system_prompt, **kwargs ) ) return self._get_completion_return_dataset( input_str_list, output_list, request_latency_list, first_token_latency_list, does_show_latency, ) else: input_chat_list, reference_list = self._get_input_chat_list(**kwargs) output_list, request_latency_list, first_token_latency_list = ( await _async_batch_do_on_service( service, input_chat_list, system=system_prompt, **kwargs ) ) return self._get_chat_return_dataset( input_chat_list, output_list, reference_list, request_latency_list, first_token_latency_list, does_show_latency, ) def _get_input_str_list(self, **kwargs: Any) -> 
List[str]: prompt_template: Optional[Prompt] = kwargs.get("prompt_template", None) input_dict = self.get_input_data assert self.input_columns if self.is_dataset_grouped() or self.is_dataset_packed(): err_msg = ( "can't have a grouped or packed dataset as batch run task dataset" " when service is Completion" ) log_error(err_msg) raise TypeError(err_msg) input_str_list: List[str] = [] for i in range(self.row_number()): if prompt_template: input_str_list.append( prompt_template.render( **{ column_name: input_dict[column_name][i] for column_name in self.input_columns }, **kwargs, )[0] ) else: input_str_list.append( "\n".join( [ input_dict[column_name][i] for column_name in self.input_columns ] ) ) return input_str_list def _get_input_chat_list( self, **kwargs: Any ) -> Tuple[List[List[Dict[str, Any]]], List[Any]]: if not self.is_dataset_packed() and not self.is_dataset_grouped(): input_str_list = self._get_input_str_list(**kwargs) return [ [{"role": QfRole.User.value, "content": prompt}] for prompt in input_str_list ], [] assert self.input_columns if len(self.input_columns) > 1: err_msg = ( "input column list should only have 1 column name when your Service" " is ChatCompletion" ) log_error(err_msg) raise TypeError(err_msg) reference_column = self.reference_column if not reference_column: err_msg = "no reference column has been set" log_error(err_msg) raise ValueError(err_msg) input_column = self.input_columns[0] dataset = deepcopy(self) if dataset.is_dataset_grouped(): dataset.pack() input_chat_list: List[List[Dict[str, Any]]] = [] reference_list: List[Any] = [] for chat in dataset.list(): input_messages: List[Dict[str, Any]] = [] for i in range(len(chat)): input_messages.append( {"role": QfRole.User.value, "content": chat[i][input_column]} ) reference = _extract_string(chat[i][reference_column]) if i != len(chat) - 1: input_messages.append( { "role": QfRole.Assistant.value, "content": reference, } ) else: reference_list.append(reference) 
input_chat_list.append(input_messages) return input_chat_list, reference_list