qianfan.trainer package
- class qianfan.trainer.BaseAction(id: Optional[str] = None, name: Optional[str] = None, event_handler: Optional[EventHandler] = None, **kwargs: Dict[str, Any])[source]
Bases: ExecuteSerializable[Input, Output], ABC
BaseAction is a reusable, atomic operation component that can be freely orchestrated for use in Pipelines.
- action_error_event(e: Exception) None[source]
dispatch action error event
- Parameters:
e (Exception): action runtime error
- action_event(state: ActionState, msg: str = '', data: Any = None) None[source]
dispatch action event
- Parameters:
state (ActionState): action state
msg (str, optional): action custom description. Defaults to "".
data (Any, optional): action custom data. Defaults to None.
- abstract exec(input: Optional[Input] = None, **kwargs: Dict) Output[source]
exec is an abstract method that executes the action.
- Parameters:
input (Optional[Input], optional): input. Defaults to None.
- Returns:
Output: output
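The shape described above (an abstract `exec` plus event hooks) can be illustrated with a minimal, self-contained sketch. It does not import qianfan: `SketchAction` and `AddOneAction` are hypothetical stand-ins that only mirror the documented signatures, and the state strings "Running", "Done" and "Error" are illustrative assumptions rather than confirmed ActionState values.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Generic, List, Optional, Tuple, TypeVar

Input = TypeVar("Input")
Output = TypeVar("Output")


class SketchAction(ABC, Generic[Input, Output]):
    """Hypothetical stand-in for BaseAction; not the qianfan implementation."""

    def __init__(self, id: Optional[str] = None, name: Optional[str] = None) -> None:
        self.id = id or "action-0"
        self.name = name or type(self).__name__
        # Events are recorded locally instead of being sent to an EventHandler.
        self.events: List[Tuple[str, str, Any]] = []

    def action_event(self, state: str, msg: str = "", data: Any = None) -> None:
        self.events.append((state, msg, data))

    def action_error_event(self, e: Exception) -> None:
        self.events.append(("Error", str(e), None))

    @abstractmethod
    def exec(self, input: Optional[Input] = None, **kwargs: Any) -> Output:
        """Subclasses implement the actual operation here."""


class AddOneAction(SketchAction[Dict[str, int], Dict[str, int]]):
    """Toy action: reads 'value' from the input dict and adds one."""

    def exec(self, input: Optional[Dict[str, int]] = None, **kwargs: Any) -> Dict[str, int]:
        self.action_event("Running", "adding one")
        try:
            result = {"value": (input or {}).get("value", 0) + 1}
        except Exception as e:
            self.action_error_event(e)
            raise
        self.action_event("Done", "finished", result)
        return result


action = AddOneAction(name="add_one")
output = action.exec({"value": 41})
```

Because every action takes a dict and returns a dict, actions written this way can be chained without knowing about each other.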
- class qianfan.trainer.DeployAction(deploy_config: Optional[DeployConfig] = None, **kwargs: Any)[source]
Bases: BaseAction[Dict[str, Any], Dict[str, Any]]
Action for model service deployment. A DeployConfig must be supplied when the instance is initialized. Sample:
- input:
` {'task_id': 47923, 'job_id': 33512, 'model_id': "xx", 'model_version_id': "xxx"} `
- output:
` {'task_id': 47923, 'job_id': 33512, 'model_id': "xx", 'model_version_id': "xxx", 'service_id': 164, 'service_endpoint': 'xbiimimv_xxx'} `
- deploy_config: Optional[DeployConfig]
deploy config, including replicas and so on
- exec(**kwargs: Any) Any
method wrapper
- model_id: Optional[int]
model id
- model_id_str: Optional[str]
model str id
- model_version_id: Optional[int]
model version id
- model_version_id_str: Optional[str]
model version str id
- result: Optional[Dict[str, Any]] = None
result of action
- resume(**kwargs: Any) Any
method wrapper
- class qianfan.trainer.Event(action_class: Type, action_id: str, state: ActionState, description: Optional[str] = None, data: Any = None)[source]
Bases: object
Event is the event container for the various nodes in the execution process of an Action. For each different Action, it can be abstracted into five common ActionStates. For multi-Action tasks at the Pipeline level, numerous Events will be generated during the process. Through EventHandler, custom callback events can be registered and listened to, enabling the insertion of various types of callbacks or intermediate task functions in the Pipeline nodes.
- action_class: Type
- action_id: str
- action_state: ActionState
- data: Any = None
- description: Optional[str] = None
- class qianfan.trainer.EventHandler[source]
Bases: object
EventHandler serves as a mechanism for registering and listening to custom callback events in the execution process of Actions. It facilitates the management of events occurring at different nodes during the execution of Actions within a Pipeline.
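The register-and-listen mechanism described above can be sketched without qianfan. In the sketch below, `SketchEvent` copies the documented Event fields, while `SketchEventHandler` and its `register` and `dispatch` methods are hypothetical names chosen for illustration; the state strings are likewise assumptions.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional, Type


@dataclass
class SketchEvent:
    """Hypothetical stand-in mirroring the documented Event fields."""

    action_class: Type
    action_id: str
    action_state: str
    description: Optional[str] = None
    data: Any = None


class SketchEventHandler:
    """Hypothetical handler: stores callbacks and forwards each event to them."""

    def __init__(self) -> None:
        self._callbacks: List[Callable[[SketchEvent], None]] = []

    def register(self, callback: Callable[[SketchEvent], None]) -> None:
        self._callbacks.append(callback)

    def dispatch(self, event: SketchEvent) -> None:
        for callback in self._callbacks:
            callback(event)


seen: List[str] = []
handler = SketchEventHandler()
handler.register(lambda ev: seen.append(f"{ev.action_id}:{ev.action_state}"))

# Simulate the events one action might emit while it runs.
for state in ("Preceding", "Running", "Done"):
    handler.dispatch(
        SketchEvent(action_class=object, action_id="train-1", action_state=state)
    )
```

A callback registered this way sees every state change, which is where logging or intermediate task functions would hook in.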
- class qianfan.trainer.LLMFinetune(train_type: str, dataset: Optional[Any] = None, train_config: Optional[Union[TrainConfig, str]] = None, deploy_config: Optional[DeployConfig] = None, event_handler: Optional[EventHandler] = None, base_model: Optional[str] = None, eval_dataset: Optional[Any] = None, evaluators: Optional[List[Evaluator]] = None, dataset_bos_path: Optional[str] = None, **kwargs: Any)[source]
Bases: Trainer
Class that implements the SFT training pipeline with several actions. Use run() to synchronously run the training pipeline until the model training is finished.
- property output: Any
- resume(**kwargs: Dict) LLMFinetune[source]
LLMFinetune resume method.
- Returns:
LLMFinetune: LLMFinetune instance
- run(**kwargs: Any) Trainer[source]
Run a pipeline to carry out the fine-tuning process.
- Parameters:
- **kwargs:
Any additional keyword arguments. `{"input": {}}` could be specified if needed.
- Raises:
InvalidArgumentError: no pipeline is bound to run.
- Returns:
- Trainer:
self, for chain invocation.
- property status: str
LLMFinetune status getter.
- Returns:
str: status for LLMFinetune, mapping from state of actions in pipeline.
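The Sample inputs and outputs documented for the individual actions in this package suggest how data moves through a pipeline: each action receives a dict and returns a dict for the next one. The sketch below replays those sample dictionaries with plain functions. The function names and the load, train, publish, deploy ordering are assumptions made for illustration; this page does not state which actions LLMFinetune chains, and nothing here calls qianfan.

```python
from typing import Any, Callable, Dict, List

Payload = Dict[str, Any]


def load_dataset(_: Payload) -> Payload:
    # Mirrors the LoadDataSetAction sample output.
    return {"datasets": [{"id": 1, "name": "test_dataset"}]}


def train(payload: Payload) -> Payload:
    # Mirrors the TrainAction sample: dataset metadata in, task and job ids out.
    if "datasets" not in payload:
        raise ValueError("train step expects a 'datasets' entry")
    return {"task_id": 47923, "job_id": 33512}


def publish(payload: Payload) -> Payload:
    # Mirrors the ModelPublishAction sample: adds model identifiers.
    return {**payload, "model_id": "xx", "model_version_id": "xxx"}


def deploy(payload: Payload) -> Payload:
    # Mirrors the DeployAction sample: adds service identifiers.
    return {**payload, "service_id": 164, "service_endpoint": "xbiimimv_xxx"}


def run_pipeline(steps: List[Callable[[Payload], Payload]], initial: Payload) -> Payload:
    """Feed each step's output dict into the next step."""
    payload = initial
    for step in steps:
        payload = step(payload)
    return payload


result = run_pipeline([load_dataset, train, publish, deploy], {})
```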
- class qianfan.trainer.LoadDataSetAction(dataset: Optional[Dataset] = None, **kwargs: Any)[source]
Bases: BaseAction[Dict[str, Any], Dict[str, Any]]
Action for loading a dataset. It invokes the dataset's save method to guarantee that the dataset is loaded on the Qianfan platform. Sample:
`load_action = LoadDataSetAction(dataset=Dataset(id=1))`
`load_action.exec()`
- input:
none
- output:
` {"datasets" : [{"id": 1, "name": "test_dataset"}]} `
- class Dataset(inner_table: Table, inner_data_source_cache: Optional[DataSource] = None, inner_schema_cache: Optional[Schema] = None, input_columns: Optional[List[str]] = None, reference_column: Optional[str] = None, **kwargs: Any)
Bases: Table
- add_default_group_column() Self
add a "_group" column to Dataset; the values in the "_group" column are sequentially incremented
- Returns:
Self: Dataset itself
- append(elem: Any, add_new_group: bool = False, is_grouped: bool = True) Self
append element(s) to dataset
- Args:
- elem (Union[List[List[Dict]], List[Dict], Tuple[Dict], Dict]):
Elements added to dataset
- add_new_group (bool):
Whether elem has a new group id. Only used when dataset is grouped.
- is_grouped (bool):
Whether the elements in elem are in the same group. Only used when the dataset is grouped, elem is a Sequence, and add_new_group is set to True. Default to True, in which case all elements will be in the same group. If False, each element will have a sequentially incremented group id, starting from the last available group id.
- Returns:
Self: Dataset itself
- async atest_using_llm(model_version_id: Optional[str] = None, service_model: Optional[str] = None, service_endpoint: Optional[str] = None, is_chat_service: bool = True, does_show_latency: bool = True, **kwargs: Any) Dataset
Use the given arguments to initialize an LLM instance and asynchronously get its output on the current dataset. Set only the model arguments or the service arguments for instantiation.
- Args:
- model_version_id (Optional[str]):
version id of your own model, default to None
- service_model (Optional[str]):
name of model you want to use as service, default to None
- service_endpoint (Optional[str]):
endpoint of service, default to None
- is_chat_service (bool):
the service type of service, default to True. Service will be Completion if False
- does_show_latency (bool):
whether the result dataset contains latency info columns when using a Service as the evaluated object. Depending on the request mode (stream or non-stream), it will contain request_complete_latency or the (first_token_latency, request_complete_latency) combo. Default to True
- **kwargs (Any):
optional argument dict
- Returns:
Dataset: A dataset containing inputs, reference outputs and llm outputs
- col_append(elem: Any) Self
append a column to dataset
- Args:
- elem (Dict[str, List]):
a dict containing the element added to dataset, which must have the column name "name" and the column data list "data"
- Returns:
Self: Dataset itself
- col_delete(index: Union[int, str]) Self
delete a column from dataset
- Args:
index (str): column name to delete
- Returns:
Self: Dataset itself
- col_filter(op: Callable[[Any], bool]) Self
filter on dataset’s column
- Args:
op (Callable[[Any], bool]): handler used to filter
- Returns:
Self: Dataset itself
- col_insert(elem: Any, index: Any) Self
insert a column into dataset
- Args:
- elem (Dict[str, List]):
a dict containing the element added to dataset, which must have the column name "name" and the column data list "data"
- index (int):
where to insert new column
- Returns:
Self: Dataset itself
- col_list(by: Optional[Union[slice, int, str, List[int], Tuple[int], List[str], Tuple[str]]] = None) Any
get column(s) from dataset
- Args:
- by (Optional[Union[int, str, Sequence[int], Sequence[str]]]):
index or indices for columns, default to None, in which case return a python list of dataset column
- Returns:
Any: dataset column list
- col_map(op: Callable[[Any], Any]) Self
map on dataset’s column
- Args:
op (Callable[[Any], Any]): handler used to map
- Returns:
Self: Dataset itself
- col_names() List[str]
get column name list
- Returns:
List[str]: column name list
- col_renames(new_names: List[str]) Self
rename all dataset columns
- Args:
new_names (List[str]): All new names for columns
- Returns:
Self: A brand-new Dataset with new name
- classmethod create_from_pyarrow_table(table: Table, schema: Optional[Schema] = None, **kwargs: Any) Dataset
create a dataset from pyarrow table
- Args:
- table (pyarrow):
pyarrow table object used to create dataset.
- schema (Optional[Schema]):
schema used to validate before exporting data, default to None
- **kwargs (Any):
optional arguments
- Returns:
Dataset: a dataset instance
- classmethod create_from_pyobj(data: Union[List[Dict[str, Any]], Dict[str, List]], schema: Optional[Schema] = None, **kwargs: Any) Dataset
create a dataset from python dict or list
- Args:
- data (Union[List[Dict[str, Any]], Dict[str, List]]):
python object used to create dataset.
- schema (Optional[Schema]):
schema used to validate before exporting data, default to None
- **kwargs (Any):
optional arguments
- Returns:
Dataset: a dataset instance
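The `data` parameter above accepts two shapes: a list of row dicts (`List[Dict[str, Any]]`) or a dict of column lists (`Dict[str, List]`). The following plain-Python sketch shows how the two shapes relate. The helper names are hypothetical and not part of qianfan, and the sketch assumes every row carries the same keys.

```python
from typing import Any, Dict, List


def rows_to_columns(rows: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Turn a list of row dicts into a dict of column lists."""
    columns: Dict[str, List[Any]] = {}
    for row in rows:
        for key, value in row.items():
            columns.setdefault(key, []).append(value)
    return columns


def columns_to_rows(columns: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Turn a dict of equally long column lists back into row dicts."""
    names = list(columns)
    length = len(columns[names[0]]) if names else 0
    return [{name: columns[name][i] for name in names} for i in range(length)]


rows = [
    {"prompt": "hello", "response": "hi"},
    {"prompt": "bye", "response": "see you"},
]
columns = rows_to_columns(rows)
```

Either shape describes the same table, so the choice mostly depends on how the data is already held in memory.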
- delete(index: Union[int, str]) Self
delete an element from dataset
- Args:
index (Union[int, str]): element index to delete
- Returns:
Self: Dataset itself
- delete_group_column() Self
remove “_group” column from Dataset
- Returns:
Self: Dataset itself
- filter(op: Callable[[Any], bool]) Self
filter on dataset
- Args:
op (Callable[[Any], bool]): handler used to filter
- Returns:
Self: Dataset itself
- property get_input_data: Dict[str, List[Any]]
get input columns data in dataset
- Returns:
Dict[str, List[Any]]: a dict which indicates the "column name-column data" pairs
- property get_reference_data: List[Any]
get reference data in dataset
- Returns:
List[Any]: list of output data column
- insert(elem: Any, index: Any, group_id: int = -1, add_new_group: bool = False, is_grouped: bool = True) Self
insert element(s) to dataset
- Args:
- elem (Union[List[List[Dict]], List[Dict], Tuple[Dict], Dict]):
Elements added to dataset
- index (int):
where to insert element(s)
- group_id (int):
which group id you want to apply to new element(s). Default to -1, which means the group id is automatically inferred from the table.
- add_new_group (bool):
Whether elem has a new group id. Only used when dataset is grouped and group_id is -1
- is_grouped (bool):
Whether the elements in elem are in the same group. Only used when the dataset is grouped, elem is a Sequence, and add_new_group is set to True. Default to True, in which case all elements will be in the same group. If False, each element will have a sequentially incremented group id, starting from the last available group id.
- Returns:
Self: Dataset itself
- is_dataset_generic_text() bool
tell whether current dataset is generic text dataset
- Returns:
bool: whether current dataset is generic text dataset
- is_dataset_located_in_qianfan() bool
tell whether current dataset is cloud-based dataset
- Returns:
bool: whether current dataset is cloud-based dataset
- list(by: Optional[Union[slice, int, str, Sequence[int], Sequence[str]]] = None, **kwargs: Any) Any
get element(s) from dataset
- Args:
- by (Optional[Union[slice, int, Sequence[int]]]):
index or indices for elements, default to None, in which case return a python list of dataset row
- Returns:
Any: dataset row list
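The `by` selector above takes several forms (None, a single index, a slice, or a sequence of indices). A minimal sketch of that dispatch on a plain Python list follows. `select_rows` is a hypothetical helper, and the return shape chosen for each case is an assumption; the real method may return rows in a different form.

```python
from typing import Any, Dict, List, Optional, Sequence, Union

Row = Dict[str, Any]
Selector = Optional[Union[slice, int, Sequence[int]]]


def select_rows(rows: List[Row], by: Selector = None) -> Any:
    """Pick rows by None (all), int (one), slice (range) or index sequence."""
    if by is None:
        return list(rows)
    if isinstance(by, (int, slice)):
        return rows[by]
    return [rows[i] for i in by]


table = [{"id": 0}, {"id": 1}, {"id": 2}]
first_two = select_rows(table, slice(0, 2))
picked = select_rows(table, [2, 0])
```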
- classmethod load(source: Optional[DataSource] = None, data_file: Optional[str] = None, qianfan_dataset_id: Optional[str] = None, bos_load_args: Optional[Dict[str, Any]] = None, huggingface_dataset: Optional[Any] = None, bos_source_args: Optional[Dict[str, Any]] = None, schema: Optional[Schema] = None, organize_data_as_group: bool = False, **kwargs: Any) Dataset
Read data from the source or create a source from the parameters and create a Table instance. If a schema is specified, perform validation after importing.
- Args:
- source (Optional[DataSource]):
where the dataset is loaded from, default to None, in which case a datasource will be created inside dataset using parameters below
- data_file (Optional[str]):
dataset local file path, default to None
- qianfan_dataset_id (Optional[str]):
qianfan dataset ID, default to None
- bos_load_args (Optional[Dict[str, Any]]):
create a dataset and import initial dataset content from args
- huggingface_dataset (Optional[Any]):
Huggingface dataset object, only support DatasetDict and Dataset of Huggingface datasets.
- bos_source_args (Optional[Dict[str, Any]]):
create arguments for creating a file on specific bos, default to None
- schema (Optional[Schema]):
schema used to validate loaded data, default to None
- organize_data_as_group (bool):
only available when data source’s format is FormatType.Jsonl. Indicates whether organize data within dataset in group format, default to False, and when it’s True, the default format will be a group-based 2D structure.
- **kwargs (Any):
optional arguments
- Returns:
Dataset: a dataset instance
- map(op: Callable[[Any], Any]) Self
map on dataset
- Args:
op (Callable[[Any], Any]): handler used to map
- Returns:
Self: Dataset itself
- online_data_process(operators: List[QianfanOperator]) Dict[str, Any]
create an online ETL task on qianfan
- Args:
operators (List[QianfanOperator]): operators applied to ETL task
- Returns:
Dict[str, Any]: ETL task info, containing 3 fields:
is_succeeded (bool): whether the ETL task succeeded
etl_task_id (Optional[int]): etl task id, only exists when the etl task is created successfully
new_dataset_id (Optional[int]): dataset id which stores data after etl, only exists when the etl task succeeded
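Since two of the three returned fields only exist in some outcomes, callers need to check them before use. The sketch below shows one way to interpret such a dict. `describe_etl_result` is a hypothetical helper and the sample dicts in the usage lines are made up for illustration; only the three field names come from the description above.

```python
from typing import Any, Dict, Optional


def describe_etl_result(info: Dict[str, Any]) -> str:
    """Summarize an ETL task info dict, tolerating missing optional fields."""
    task_id: Optional[int] = info.get("etl_task_id")
    new_dataset_id: Optional[int] = info.get("new_dataset_id")
    if task_id is None:
        # No task id means the task was never created.
        return "ETL task could not be created"
    if not info.get("is_succeeded", False):
        return f"ETL task {task_id} did not succeed"
    return f"ETL task {task_id} succeeded; data stored in dataset {new_dataset_id}"


ok_message = describe_etl_result(
    {"is_succeeded": True, "etl_task_id": 7, "new_dataset_id": 12}
)
failed_message = describe_etl_result({"is_succeeded": False, "etl_task_id": 7})
```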
- row_number() int
get pyarrow table row count.
- Returns:
int: row count.
- save(destination: Optional[DataSource] = None, data_file: Optional[str] = None, qianfan_dataset_id: Optional[str] = None, qianfan_dataset_create_args: Optional[Dict[str, Any]] = None, bos_source_args: Optional[Dict[str, Any]] = None, schema: Optional[Schema] = None, replace_source: bool = False, **kwargs: Any) bool
Write data to the data source. If a schema has been passed, validate data before exporting.
- Args:
- destination (Optional[DataSource]):
data source where the dataset is exported, default to None, in which case a datasource will be created inside dataset using parameters below
- data_file (Optional[str]):
dataset local file path, default to None
- qianfan_dataset_id (Optional[str]):
qianfan dataset ID, default to None
- qianfan_dataset_create_args (Optional[Dict[str, Any]]):
create arguments for creating a bare dataset on qianfan, default to None
- bos_source_args (Optional[Dict[str, Any]]):
create arguments for creating a file on specific bos, default to None
- schema (Optional[Schema]):
schema used to validate before exporting data, default to None
- replace_source (bool):
whether to replace the original source, default to False
- **kwargs (Any):
optional arguments
- Returns:
bool: whether saving succeeded
- start_online_data_process_task(operators: List[QianfanOperator]) str
create an online ETL task on qianfan
- Args:
operators (List[QianfanOperator]): operators applied to ETL task
- Returns:
str: etl task id
- test_using_llm(model_version_id: Optional[str] = None, service_model: Optional[str] = None, service_endpoint: Optional[str] = None, is_chat_service: bool = True, does_show_latency: bool = True, **kwargs: Any) Dataset
Use the given arguments to initialize an LLM instance and get its output on the current dataset. Set only the model arguments or the service arguments for instantiation.
- Args:
- model_version_id (Optional[str]):
version id of your own model, default to None
- service_model (Optional[str]):
name of model you want to use as service, default to None
- service_endpoint (Optional[str]):
endpoint of service, default to None
- is_chat_service (bool):
the service type of service, default to True. Service will be Completion if False
- does_show_latency (bool):
whether the result dataset contains latency info columns when using a Service as the evaluated object. Depending on the request mode (stream or non-stream), it will contain request_complete_latency or the (first_token_latency, request_complete_latency) combo. Default to True
- **kwargs (Any):
optional argument dict
- Returns:
Dataset: A dataset containing inputs, reference outputs and llm outputs
- exec(**kwargs: Any) Any
method wrapper
- resume(**kwargs: Any) Any
method wrapper
- class qianfan.trainer.ModelPublishAction(id: Optional[str] = None, name: Optional[str] = None, event_handler: Optional[EventHandler] = None, **kwargs: Dict[str, Any])[source]
Bases: BaseAction[Dict[str, Any], Dict[str, Any]]
Class for the model publish action, commonly used after TrainAction.
Sample:
- input:
` {'task_id': 47923, 'job_id': 33512} `
- output:
` {'task_id': 47923, 'job_id': 33512, 'model_id': "xxx", 'model_version_id': "aaa"} `
- exec(**kwargs: Any) Any
method wrapper
- job_id: Optional[int] = None
job id
- result: Optional[Dict[str, Any]] = None
result of model publish action
- resume(**kwargs: Any) Any
method wrapper
- task_id: Optional[int] = None
task id
- class qianfan.trainer.TrainAction(train_type: Optional[str] = None, train_config: Optional[TrainConfig] = None, base_model: Optional[str] = None, task_id: Optional[int] = None, job_id: Optional[int] = None, train_mode: Optional[TrainMode] = None, task_name: Optional[str] = None, task_description: Optional[str] = None, job_description: Optional[str] = None, **kwargs: Any)[source]
Bases: BaseAction[Dict[str, Any], Dict[str, Any]]
Class for the train action. It invokes the training API synchronously, taking a dataset metadata dict as input and producing model metadata as output. Call exec to run it.
Note: this action is not involved with model publishing; please use ModelPublishAction to publish the model.
Sample:
- input:
` {'datasets':[{'type': 1, 'id': 111}]} `
- output:
` {'task_id': 47923, 'job_id': 33512} `
- base_model: Optional[str] = None
base model type, like 'ERNIE-Bot-turbo'
- exec(**kwargs: Any) Any
method wrapper
- get_default_train_config(model_type: str) TrainConfig[source]
- is_incr: bool = False
if it’s incremental train or not
- job_description: Optional[str] = None
train job description
- job_id: Optional[int] = None
train job id
- job_str_id: Optional[str] = None
job task str id
- result: Optional[Dict[str, Any]] = None
train result
- resume(**kwargs: Any) Any
method wrapper
- stop(**kwargs: Dict) None[source]
stop method for train action
- Parameters:
- **kwargs (Dict):
input args for action stop
- task_description: Optional[str] = None
train task description
- task_id: Optional[int] = None
train task id
- task_name: str = ''
train task name
- task_str_id: Optional[str] = None
train task str id
- train_config: Optional[TrainConfig] = None
train config
- train_type: Optional[str] = ''
train_type
- class qianfan.trainer.Trainer[source]
Bases: ABC
Base Trainer class, which focuses on running the whole training process in a one-step call. It defines the three basic methods for operating training: run() runs the specific training process, such as fine-tuning; resume() resumes from a stopped or failed state; stop() stops the training process.
- property actions: Dict[str, BaseAction]
Get the available actions for the trainer.
- Returns:
List[str]: The list of action names.
- abstract property output: Any
- ppls: List[Pipeline] = []
Pipelines for training, there may be multiple pipelines in the training process.
- register_event_handler(event_handler: EventHandler, ppl_id: Optional[str] = None) None[source]
Register the event handler to the specified pipelines (ppls).
- Args:
event_handler (EventHandler): The event handler instance.
- result: List[Any] = []
pipeline running results, which may be an error or an object
- abstract resume(**kwargs: Dict) Trainer[source]
Counterpart to the stop method. The user can resume the training process by calling the resume() method.
- Returns:
Trainer: Trainer instance
- abstract run(**kwargs: Dict) Trainer[source]
Trainer abstract method. Subclasses override this method to implement the specific training process.
- Returns:
Trainer: Trainer instance
- property status: str
Trainer status. Different statuses are implemented for different processes, such as fine-tuning, RLHF, PreTrain and so on.
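The run, resume and stop contract of the Trainer class, together with the "return self for chain invocation" convention, can be illustrated with a toy subclass. The sketch below is self-contained and does not use qianfan. `SketchTrainer`, `CountingTrainer`, the `stop_at` keyword, the status strings and the return value of `stop` are all hypothetical choices for the example.

```python
from abc import ABC, abstractmethod
from typing import Any, List


class SketchTrainer(ABC):
    """Hypothetical stand-in mirroring the documented Trainer surface."""

    def __init__(self) -> None:
        self.result: List[Any] = []
        self._status = "Created"

    @abstractmethod
    def run(self, **kwargs: Any) -> "SketchTrainer": ...

    @abstractmethod
    def resume(self, **kwargs: Any) -> "SketchTrainer": ...

    @abstractmethod
    def stop(self, **kwargs: Any) -> "SketchTrainer": ...

    @property
    def status(self) -> str:
        return self._status


class CountingTrainer(SketchTrainer):
    """Toy trainer: 'trains' by counting steps and can be stopped and resumed."""

    def __init__(self, total_steps: int) -> None:
        super().__init__()
        self.total_steps = total_steps
        self.step = 0

    def run(self, **kwargs: Any) -> "CountingTrainer":
        # The optional 'stop_at' kwarg simulates an interruption (hypothetical).
        stop_at = kwargs.get("stop_at", self.total_steps)
        self._status = "Running"
        while self.step < min(stop_at, self.total_steps):
            self.step += 1
        if self.step < self.total_steps:
            return self.stop()
        self._status = "Done"
        self.result.append({"steps": self.step})
        return self

    def stop(self, **kwargs: Any) -> "CountingTrainer":
        self._status = "Stopped"
        return self

    def resume(self, **kwargs: Any) -> "CountingTrainer":
        # Resuming continues from the step reached before the stop.
        return self.run(**kwargs)


trainer = CountingTrainer(total_steps=5).run(stop_at=2)
status_after_stop = trainer.status
final_status = trainer.resume().status
```

Returning the trainer from each method is what allows calls such as `trainer.resume().status` to be chained.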
Submodules
qianfan.trainer.actions module
- class qianfan.trainer.actions.DeployAction(deploy_config: Optional[DeployConfig] = None, **kwargs: Any)[source]
Bases: BaseAction[Dict[str, Any], Dict[str, Any]]
Action for model service deployment. A DeployConfig must be supplied when the instance is initialized. Sample:
- input:
` {'task_id': 47923, 'job_id': 33512, 'model_id': "xx", 'model_version_id': "xxx"} `
- output:
` {'task_id': 47923, 'job_id': 33512, 'model_id': "xx", 'model_version_id': "xxx", 'service_id': 164, 'service_endpoint': 'xbiimimv_xxx'} `
- deploy_config: Optional[DeployConfig]
deploy config, including replicas and so on
- exec(**kwargs: Any) Any
method wrapper
- model_id: Optional[int]
model id
- model_id_str: Optional[str]
model str id
- model_version_id: Optional[int]
model version id
- model_version_id_str: Optional[str]
model version str id
- result: Optional[Dict[str, Any]] = None
result of action
- resume(**kwargs: Any) Any
method wrapper
- class qianfan.trainer.actions.EvaluateAction(eval_dataset: Dataset, evaluators: List[Evaluator], **kwargs: Any)[source]
Bases: BaseAction[Dict[str, Any], Dict[str, Any]]
Action for evaluating models or services. Sample:
- input:
` {'model_id': 47923, 'model_version_id': 33512} `
- output:
` {'eval_res': EvaluationResult ...} `
- eval_manager: Optional[EvaluationManager] = None
evaluation manager for evaluating models or services.
- exec(**kwargs: Any) Any
method wrapper
- result: Optional[Dict[str, Any]] = None
result of action
- resume(**kwargs: Any) Any
method wrapper
- class qianfan.trainer.actions.LoadDataSetAction(dataset: Optional[Dataset] = None, **kwargs: Any)[source]
Bases: BaseAction[Dict[str, Any], Dict[str, Any]]
Action for loading a dataset. It invokes the dataset's save method to guarantee that the dataset is loaded on the Qianfan platform. Sample:
`load_action = LoadDataSetAction(dataset=Dataset(id=1))`
`load_action.exec()`
- input:
none
- output:
` {"datasets" : [{"id": 1, "name": "test_dataset"}]} `
- class Dataset(inner_table: Table, inner_data_source_cache: Optional[DataSource] = None, inner_schema_cache: Optional[Schema] = None, input_columns: Optional[List[str]] = None, reference_column: Optional[str] = None, **kwargs: Any)
Bases: Table
- add_default_group_column() Self
add a "_group" column to Dataset; the values in the "_group" column are sequentially incremented
- Returns:
Self: Dataset itself
- append(elem: Any, add_new_group: bool = False, is_grouped: bool = True) Self
append element(s) to dataset
- Args:
- elem (Union[List[List[Dict]], List[Dict], Tuple[Dict], Dict]):
Elements added to dataset
- add_new_group (bool):
Whether elem has a new group id. Only used when dataset is grouped.
- is_grouped (bool):
Whether the elements in elem are in the same group. Only used when the dataset is grouped, elem is a Sequence, and add_new_group is set to True. Default to True, in which case all elements will be in the same group. If False, each element will have a sequentially incremented group id, starting from the last available group id.
- Returns:
Self: Dataset itself
- async atest_using_llm(model_version_id: Optional[str] = None, service_model: Optional[str] = None, service_endpoint: Optional[str] = None, is_chat_service: bool = True, does_show_latency: bool = True, **kwargs: Any) Dataset
Use the given arguments to initialize an LLM instance and asynchronously get its output on the current dataset. Set only the model arguments or the service arguments for instantiation.
- Args:
- model_version_id (Optional[str]):
version id of your own model, default to None
- service_model (Optional[str]):
name of model you want to use as service, default to None
- service_endpoint (Optional[str]):
endpoint of service, default to None
- is_chat_service (bool):
the service type of service, default to True. Service will be Completion if False
- does_show_latency (bool):
whether the result dataset contains latency info columns when using a Service as the evaluated object. Depending on the request mode (stream or non-stream), it will contain request_complete_latency or the (first_token_latency, request_complete_latency) combo. Default to True
- **kwargs (Any):
optional argument dict
- Returns:
Dataset: A dataset containing inputs, reference outputs and llm outputs
- col_append(elem: Any) Self
append a column to dataset
- Args:
- elem (Dict[str, List]):
a dict containing the element added to dataset, which must have the column name "name" and the column data list "data"
- Returns:
Self: Dataset itself
- col_delete(index: Union[int, str]) Self
delete a column from dataset
- Args:
index (str): column name to delete
- Returns:
Self: Dataset itself
- col_filter(op: Callable[[Any], bool]) Self
filter on dataset’s column
- Args:
op (Callable[[Any], bool]): handler used to filter
- Returns:
Self: Dataset itself
- col_insert(elem: Any, index: Any) Self
insert a column into dataset
- Args:
- elem (Dict[str, List]):
a dict containing the element added to dataset, which must have the column name "name" and the column data list "data"
- index (int):
where to insert new column
- Returns:
Self: Dataset itself
- col_list(by: Optional[Union[slice, int, str, List[int], Tuple[int], List[str], Tuple[str]]] = None) Any
get column(s) from dataset
- Args:
- by (Optional[Union[int, str, Sequence[int], Sequence[str]]]):
index or indices for columns, default to None, in which case return a python list of dataset column
- Returns:
Any: dataset column list
- col_map(op: Callable[[Any], Any]) Self
map on dataset’s column
- Args:
op (Callable[[Any], Any]): handler used to map
- Returns:
Self: Dataset itself
- col_names() List[str]
get column name list
- Returns:
List[str]: column name list
- col_renames(new_names: List[str]) Self
rename all dataset columns
- Args:
new_names (List[str]): All new names for columns
- Returns:
Self: A brand-new Dataset with new name
- classmethod create_from_pyarrow_table(table: Table, schema: Optional[Schema] = None, **kwargs: Any) Dataset
create a dataset from pyarrow table
- Args:
- table (pyarrow):
pyarrow table object used to create dataset.
- schema (Optional[Schema]):
schema used to validate before exporting data, default to None
- **kwargs (Any):
optional arguments
- Returns:
Dataset: a dataset instance
- classmethod create_from_pyobj(data: Union[List[Dict[str, Any]], Dict[str, List]], schema: Optional[Schema] = None, **kwargs: Any) Dataset
create a dataset from python dict or list
- Args:
- data (Union[List[Dict[str, Any]], Dict[str, List]]):
python object used to create dataset.
- schema (Optional[Schema]):
schema used to validate before exporting data, default to None
- **kwargs (Any):
optional arguments
- Returns:
Dataset: a dataset instance
- delete(index: Union[int, str]) Self
delete an element from dataset
- Args:
index (Union[int, str]): element index to delete
- Returns:
Self: Dataset itself
- delete_group_column() Self
remove “_group” column from Dataset
- Returns:
Self: Dataset itself
- filter(op: Callable[[Any], bool]) Self
filter on dataset
- Args:
op (Callable[[Any], bool]): handler used to filter
- Returns:
Self: Dataset itself
- property get_input_data: Dict[str, List[Any]]
get input columns data in dataset
- Returns:
Dict[str, List[Any]]: a dict which indicates the "column name-column data" pairs
- property get_reference_data: List[Any]
get reference data in dataset
- Returns:
List[Any]: list of output data column
- insert(elem: Any, index: Any, group_id: int = -1, add_new_group: bool = False, is_grouped: bool = True) Self
insert element(s) to dataset
- Args:
- elem (Union[List[List[Dict]], List[Dict], Tuple[Dict], Dict]):
Elements added to dataset
- index (int):
where to insert element(s)
- group_id (int):
which group id you want to apply to new element(s). Default to -1, which means the group id is automatically inferred from the table.
- add_new_group (bool):
Whether elem has a new group id. Only used when dataset is grouped and group_id is -1
- is_grouped (bool):
Whether the elements in elem are in the same group. Only used when the dataset is grouped, elem is a Sequence, and add_new_group is set to True. Default to True, in which case all elements will be in the same group. If False, each element will have a sequentially incremented group id, starting from the last available group id.
- Returns:
Self: Dataset itself
- is_dataset_generic_text() bool
tell whether current dataset is generic text dataset
- Returns:
bool: whether current dataset is generic text dataset
- is_dataset_located_in_qianfan() bool
tell whether current dataset is cloud-based dataset
- Returns:
bool: whether current dataset is cloud-based dataset
- list(by: Optional[Union[slice, int, str, Sequence[int], Sequence[str]]] = None, **kwargs: Any) Any
get element(s) from dataset
- Args:
- by (Optional[Union[slice, int, Sequence[int]]]):
index or indices for elements, default to None, in which case return a python list of dataset row
- Returns:
Any: dataset row list
- classmethod load(source: Optional[DataSource] = None, data_file: Optional[str] = None, qianfan_dataset_id: Optional[str] = None, bos_load_args: Optional[Dict[str, Any]] = None, huggingface_dataset: Optional[Any] = None, bos_source_args: Optional[Dict[str, Any]] = None, schema: Optional[Schema] = None, organize_data_as_group: bool = False, **kwargs: Any) Dataset
Read data from the source or create a source from the parameters and create a Table instance. If a schema is specified, perform validation after importing.
- Args:
- source (Optional[DataSource]):
where the dataset is loaded from, default to None, in which case a datasource will be created inside dataset using parameters below
- data_file (Optional[str]):
dataset local file path, default to None
- qianfan_dataset_id (Optional[str]):
qianfan dataset ID, default to None
- bos_load_args (Optional[Dict[str, Any]]):
create a dataset and import initial dataset content from args
- huggingface_dataset (Optional[Any]):
Huggingface dataset object, only support DatasetDict and Dataset of Huggingface datasets.
- bos_source_args (Optional[Dict[str, Any]]):
create arguments for creating a file on specific bos, default to None
- schema (Optional[Schema]):
schema used to validate loaded data, default to None
- organize_data_as_group (bool):
only available when data source’s format is FormatType.Jsonl. Indicates whether organize data within dataset in group format, default to False, and when it’s True, the default format will be a group-based 2D structure.
- **kwargs (Any):
optional arguments
- Returns:
Dataset: a dataset instance
- map(op: Callable[[Any], Any]) Self
map on dataset
- Args:
op (Callable[[Any], Any]): handler used to map
- Returns:
Self: Dataset itself
- online_data_process(operators: List[QianfanOperator]) Dict[str, Any]
create an online ETL task on qianfan
- Args:
operators (List[QianfanOperator]): operators applied to ETL task
- Returns:
Dict[str, Any]: ETL task info, containing 3 fields:
is_succeeded (bool): whether the ETL task succeeded
etl_task_id (Optional[int]): etl task id, only exists when the etl task is created successfully
new_dataset_id (Optional[int]): dataset id which stores data after etl, only exists when the etl task succeeded
- row_number() int
get pyarrow table row count.
- Returns:
int: row count.
- save(destination: Optional[DataSource] = None, data_file: Optional[str] = None, qianfan_dataset_id: Optional[str] = None, qianfan_dataset_create_args: Optional[Dict[str, Any]] = None, bos_source_args: Optional[Dict[str, Any]] = None, schema: Optional[Schema] = None, replace_source: bool = False, **kwargs: Any) bool
Write data to the data source. If a schema has been passed, validate data before exporting.
- Args:
- destination (Optional[DataSource]):
data source where the dataset is exported, default to None, in which case a datasource will be created inside dataset using parameters below
- data_file (Optional[str]):
dataset local file path, default to None
- qianfan_dataset_id (Optional[str]):
qianfan dataset ID, default to None
- qianfan_dataset_create_args: (Optional[Dict[str: Any]]):
create arguments for creating a bare dataset on qianfan, default to None
- bos_source_args: (Optional[Dict[str, Any]]):
create arguments for creating a file on a specific BOS, default to None
- schema: (Optional[Schema]):
schema used to validate before exporting data, default to None
- replace_source: (bool):
whether to replace the original source, default to False
kwargs (Any): optional arguments
- Returns:
bool: whether saving succeeded
- start_online_data_process_task(operators: List[QianfanOperator]) str
create an online ETL task on qianfan
- Args:
operators (List[QianfanOperator]): operators applied to ETL task
- Returns:
str: etl task id
- test_using_llm(model_version_id: Optional[str] = None, service_model: Optional[str] = None, service_endpoint: Optional[str] = None, is_chat_service: bool = True, does_show_latency: bool = True, **kwargs: Any) Dataset
Use the arguments to initialize an LLM instance and get its output on the current dataset. Set only the model arguments or only the service arguments to instantiate it.
- Args:
- model_version_id (Optional[str]):
version id of your own model, default to None
- service_model (Optional[str]):
name of model you want to use as service, default to None
- service_endpoint (Optional[str]):
endpoint of service, default to None
- is_chat_service (bool):
the service type of service, default to True. Service will be Completion if False
- does_show_latency (bool):
whether the result dataset contains a latency info column when using Service as the evaluated object. Depending on the request mode (stream or non-stream), it will contain request_complete_latency or the (first_token_latency, request_complete_latency) combo. Defaults to True
- **kwargs (Any):
optional argument dict
- Returns:
Dataset: A dataset containing inputs, reference outputs and LLM outputs
- exec(**kwargs: Any) Any
method wrapper
- resume(**kwargs: Any) Any
method wrapper
- class qianfan.trainer.actions.ModelPublishAction(id: Optional[str] = None, name: Optional[str] = None, event_handler: Optional[EventHandler] = None, **kwargs: Dict[str, Any])[source]
Bases:
BaseAction[Dict[str,Any],Dict[str,Any]]
Class for the model publish action, commonly used after TrainAction.
Sample:
Input:
` {'task_id': 47923, 'job_id': 33512} `
Output:
` {'task_id': 47923, 'job_id': 33512, 'model_id': "xxx", 'model_version_id': "aaa"} `
- exec(**kwargs: Any) Any
method wrapper
- job_id: Optional[int] = None
job id
- result: Optional[Dict[str, Any]] = None
result of model publish action
- resume(**kwargs: Any) Any
method wrapper
- task_id: Optional[int] = None
task id
- class qianfan.trainer.actions.TrainAction(train_type: Optional[str] = None, train_config: Optional[TrainConfig] = None, base_model: Optional[str] = None, task_id: Optional[int] = None, job_id: Optional[int] = None, train_mode: Optional[TrainMode] = None, task_name: Optional[str] = None, task_description: Optional[str] = None, job_description: Optional[str] = None, **kwargs: Any)[source]
Bases:
BaseAction[Dict[str,Any],Dict[str,Any]]
Class for the train action. It invokes the training API synchronously, taking a dataset metadata dict as input and producing a model metadata dict as output. Call exec to run it.
Note: this action does not publish the model; use ModelPublishAction for publishing.
Sample:
Input:
` {'datasets':[{'type': 1, 'id': 111}]} `
Output:
` {'task_id': 47923, 'job_id': 33512} `
- base_model: Optional[str] = None
base train type like ‘ERNIE-Bot-turbo’
- exec(**kwargs: Any) Any
method wrapper
- get_default_train_config(model_type: str) TrainConfig[source]
- is_incr: bool = False
whether it is incremental training or not
- job_description: Optional[str] = None
train job description
- job_id: Optional[int] = None
train job id
- job_str_id: Optional[str] = None
job task str id
- result: Optional[Dict[str, Any]] = None
train result
- resume(**kwargs: Any) Any
method wrapper
- stop(**kwargs: Dict) None[source]
stop method for train action
- Parameters:
- **kwargs (Dict):
input args for action stop
- task_description: Optional[str] = None
train task description
- task_id: Optional[int] = None
train task id
- task_name: str = ''
train task name
- task_str_id: Optional[str] = None
train task str id
- train_config: Optional[TrainConfig] = None
train config
- train_type: Optional[str] = ''
train_type
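The sample inputs and outputs above suggest that each action receives a metadata dict and returns it enriched with new keys. A minimal sketch of that hand-off follows, using plain functions as hypothetical stand-ins for TrainAction and ModelPublishAction. No Qianfan API is called, and the ids are the sample values from this page.

```python
from typing import Any, Dict


def fake_train(inp: Dict[str, Any]) -> Dict[str, Any]:
    """Stand-in for TrainAction.exec: dataset metadata in, task/job ids out."""
    assert "datasets" in inp, "train input must describe at least one dataset"
    return {"task_id": 47923, "job_id": 33512}


def fake_publish(inp: Dict[str, Any]) -> Dict[str, Any]:
    """Stand-in for ModelPublishAction.exec: keeps the ids and adds model ids."""
    return {**inp, "model_id": "xxx", "model_version_id": "aaa"}


# The output of one step is passed unchanged as the input of the next.
train_out = fake_train({"datasets": [{"type": 1, "id": 111}]})
publish_out = fake_publish(train_out)
print(publish_out)
```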
qianfan.trainer.base module
- class qianfan.trainer.base.BaseAction(id: Optional[str] = None, name: Optional[str] = None, event_handler: Optional[EventHandler] = None, **kwargs: Dict[str, Any])[source]
Bases:
ExecuteSerializable[Input,Output],ABC
BaseAction is a reusable, atomic operation component that can be freely orchestrated for use in Pipelines.
- action_error_event(e: Exception) None[source]
dispatch action error event
- Parameters:
e (Exception): action runtime error
- action_event(state: ActionState, msg: str = '', data: Any = None) None[source]
dispatch action event
- Parameters:
state (ActionState): action state
msg (str, optional): action custom description. Defaults to “”.
data (Any, optional): action custom data. Defaults to None.
- abstract exec(input: Optional[Input] = None, **kwargs: Dict) Output[source]
exec is an abstract method for executing the action.
- Parameters:
input (Optional[Input], optional): input. Defaults to None.
- Returns:
Output: output
- class qianfan.trainer.base.Pipeline(actions: Sequence[BaseAction], post_actions: Sequence[BaseAction] = [], event_handler: Optional[EventHandler] = None, **kwargs: Any)[source]
Bases:
BaseAction[Dict[str,Any],Dict[str,Any]]
Pipeline is a sequentially executed chain composed of multiple actions, and users can customize the action chain according to their needs. At any given moment, the Pipeline retains the id of the currently executing action, allowing users to retrieve information about the action currently in progress. By registering an EventHandler, users can listen to events generated while the Pipeline is running.
- exec_from(input: Optional[Dict[str, Any]] = None, start: Optional[Union[int, str]] = 0, **kwargs: Dict) Dict[str, Any][source]
- register_event_handler(event_handler: EventHandler, action_id: Optional[str] = None) None[source]
Register the event handler to a specific action.
- Args:
event_handler (EventHandler): The event handler instance.
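The sequential chaining and the `start` argument of exec_from can be pictured with the sketch below. `TinyPipeline` is a hypothetical stand-in, not the library's Pipeline: it tracks the current step by index rather than by action id, and it accepts only an integer `start`.

```python
from typing import Any, Callable, Dict, List, Optional

Step = Callable[[Dict[str, Any]], Dict[str, Any]]


class TinyPipeline:
    """Hypothetical stand-in: runs steps in order, feeding each output forward."""

    def __init__(self, steps: List[Step]) -> None:
        self.steps = steps
        self.current: Optional[int] = None  # index of the step being executed

    def exec_from(self, data: Dict[str, Any], start: int = 0) -> Dict[str, Any]:
        for index in range(start, len(self.steps)):
            self.current = index
            data = self.steps[index](data)
        return data


ppl = TinyPipeline([
    lambda d: {**d, "trained": True},
    lambda d: {**d, "published": True},
])
print(ppl.exec_from({"job_id": 33512}))           # runs both steps
print(ppl.exec_from({"job_id": 33512}, start=1))  # skips the first step
```

Starting from a later step is what makes resuming after a failure possible without repeating the earlier steps.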
- class qianfan.trainer.base.Trainer[source]
Bases:
ABC
Base Trainer class, which focuses on a one-step call to run the whole training process. It defines the 3 basic methods to operate training:
- run(): run the specific training process, such as fine-tuning
- resume(): resume from a stopped or failed state
- stop(): stop the training process
- property actions: Dict[str, BaseAction]
Get the available actions for the trainer.
- Returns:
Dict[str, BaseAction]: The available actions.
- abstract property output: Any
- ppls: List[Pipeline] = []
Pipelines for training, there may be multiple pipelines in the training process.
- register_event_handler(event_handler: EventHandler, ppl_id: Optional[str] = None) None[source]
Register the event handler to the specified pipelines.
- Args:
event_handler (EventHandler): The event handler instance.
- result: List[Any] = []
pipeline running results, which may be an error or an object
- abstract resume(**kwargs: Dict) Trainer[source]
Counterpart to the stop method. Users can resume the training process by calling resume().
- Returns:
Trainer: Trainer instance
- abstract run(**kwargs: Dict) Trainer[source]
Trainer abstract method. Subclasses override this method to implement the specific training process.
- Returns:
Trainer: Trainer instance
- property status: str
Trainer status. Implements different statuses for different processes such as fine-tuning, RLHF, PreTrain and so on.
qianfan.trainer.configs module
- class qianfan.trainer.configs.ModelInfo(*, short_name: str, base_model_type: str, support_peft_types: List[PeftType] = [], common_params_limit: TrainLimit, specific_peft_types_params_limit: Optional[Dict[Union[str, PeftType], TrainLimit]] = None)[source]
Bases:
BaseModel
- base_model_type: str
base model name
- common_params_limit: TrainLimit
common params limit, excluding the suggested params that differ between peft types
- short_name: str
short_name must be shorter than 15 characters
- specific_peft_types_params_limit: Optional[Dict[Union[str, PeftType], TrainLimit]]
special params suggestion of specific peft types
- class qianfan.trainer.configs.TrainConfig(*, epoch: Optional[int] = None, batch_size: Optional[int] = None, learning_rate: Optional[float] = None, max_seq_len: Optional[int] = None, peft_type: Optional[Union[str, PeftType]] = None, trainset_rate: int = 20, logging_steps: Optional[int] = None, warmup_ratio: Optional[float] = None, weight_decay: Optional[float] = None, lora_rank: Optional[int] = None, lora_all_linear: Optional[str] = None, scheduler_name: Optional[str] = None, lora_alpha: Optional[int] = None, lora_dropout: Optional[float] = None, extras: Dict[str, Any] = {})[source]
Bases:
BaseModel
- batch_size: Optional[int]
batch size: differs between models
- epoch: Optional[int]
epoch number: differs between models
- extras: Dict[str, Any]
- learning_rate: Optional[float]
learning rate: differs between models
- classmethod load(path: str) TrainConfig[source]
- logging_steps: Optional[int]
log saving interval steps
- lora_all_linear: Optional[str]
LoRA all linear layer
- lora_alpha: Optional[int]
LoRA scaling params
- lora_dropout: Optional[float]
LoRA dropout
- lora_rank: Optional[int]
LoRA rank
- max_seq_len: Optional[int]
max sequence length: differs between models
- peft_type: Optional[Union[str, PeftType]]
parameter efficient FineTuning method, like LoRA, P-tuning, ALL
- scheduler_name: Optional[str]
for learning rate schedule
- trainset_rate: int
rate used to split the dataset
- validate_config(train_limit: TrainLimit) bool[source]
- validate_valid_fields(limit: TrainLimit) str[source]
Return the invalid field name if a value is not in limit.supported_hyper_params; return “” if all fields are valid.
- warmup_ratio: Optional[float]
warmup ratio
- weight_decay: Optional[float]
weight decay (regularization) param
- class qianfan.trainer.configs.TrainLimit(*, batch_size_limit: Optional[Tuple[int, int]] = None, max_seq_len_options: Optional[List[int]] = None, epoch_limit: Optional[Tuple[int, int]] = None, learning_rate_limit: Optional[Tuple[float, float]] = None, log_steps_limit: Optional[Tuple[int, int]] = None, warmup_ratio_limit: Optional[Tuple[float, float]] = None, weight_decay_limit: Optional[Tuple[float, float]] = None, lora_rank_options: Optional[List[int]] = None, lora_alpha_options: Optional[List[int]] = None, lora_dropout_limit: Optional[Tuple[float, float]] = None, scheduler_name_options: Optional[List[str]] = None, supported_hyper_params: List[str] = [])[source]
Bases:
BaseModel
- batch_size_limit: Optional[Tuple[int, int]]
batch size limit
- epoch_limit: Optional[Tuple[int, int]]
epoch limit
- learning_rate_limit: Optional[Tuple[float, float]]
learning rate limit
- log_steps_limit: Optional[Tuple[int, int]]
log steps limit
- lora_alpha_options: Optional[List[int]]
LoRA alpha options
- lora_dropout_limit: Optional[Tuple[float, float]]
LoRA dropout limit
- lora_rank_options: Optional[List[int]]
LoRA rank options
- max_seq_len_options: Optional[List[int]]
max seq len options
- scheduler_name_options: Optional[List[str]]
scheduler name options
- supported_hyper_params: List[str]
supported hyper params
- warmup_ratio_limit: Optional[Tuple[float, float]]
warmup_ratio limit
- weight_decay_limit: Optional[Tuple[float, float]]
weight_decay limit
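The field types above suggest two kinds of constraint: `*_limit` fields hold a (min, max) tuple and `*_options` fields hold a list of allowed values. The sketch below shows one way such checks could work. `Limit` and `check` are hypothetical stand-ins rather than the library's TrainLimit or validate_config, the inclusive bounds are an assumption, and the numeric values are made up.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Limit:
    """Hypothetical subset of TrainLimit: one range and one option list."""
    epoch_limit: Optional[Tuple[int, int]] = None
    max_seq_len_options: Optional[List[int]] = None


def check(epoch: Optional[int], max_seq_len: Optional[int], limit: Limit) -> List[str]:
    """Return the names of fields that violate the limit (empty list if none)."""
    bad = []
    if epoch is not None and limit.epoch_limit is not None:
        low, high = limit.epoch_limit
        if not low <= epoch <= high:  # inclusive bounds are an assumption
            bad.append("epoch")
    if max_seq_len is not None and limit.max_seq_len_options is not None:
        if max_seq_len not in limit.max_seq_len_options:
            bad.append("max_seq_len")
    return bad


limit = Limit(epoch_limit=(1, 50), max_seq_len_options=[4096, 8192])
print(check(epoch=3, max_seq_len=4096, limit=limit))
print(check(epoch=99, max_seq_len=1024, limit=limit))
```

Unset values (None) on either side are skipped, which matches the all-Optional field types shown above.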
qianfan.trainer.consts module
- class qianfan.trainer.consts.ActionState(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
str,Enum
This class lists the key points during an action execution. By default, ActionState should be obtained through the events dispatched by the event_handler.
- Done = 'Done'
Done stands for the state after exec has finished
- Error = 'Error'
Error stands for the state when errors occur.
- Preceding = 'Preceding'
Preceding stands for the point before exec
- Running = 'Running'
Running stands for the point during exec
- Stopped = 'Stopped'
Stopped stands for the state when stop() is called.
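Because ActionState derives from both str and Enum, its members behave like plain strings in comparisons. The sketch below re-declares the documented members locally, for illustration only, so that it runs without the SDK installed.

```python
from enum import Enum


class ActionState(str, Enum):
    """Local re-declaration of the documented members, for illustration only."""
    Preceding = "Preceding"
    Running = "Running"
    Done = "Done"
    Error = "Error"
    Stopped = "Stopped"


# Because of the str mixin, members compare equal to their string values.
print(ActionState.Done == "Done")
# Lookup by value returns the corresponding member.
print(ActionState("Error") is ActionState.Error)
```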
- class qianfan.trainer.consts.FinetuneStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
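str,Enum
- DatasetLoadFailed = 'DatasetLoadFailed'
Dataset loading failed
- DatasetLoadStopped = 'DatasetLoadStopped'
Dataset loading stopped
- DatasetLoaded = 'DatasetLoaded'
Dataset loading finished
- DatasetLoading = 'DatasetLoading'
Dataset loading in progress
- EvaluationCreated = 'EvaluationCreated'
Evaluation task created, initializing
- EvaluationFailed = 'EvaluationFailed'
Model service evaluation failed
- EvaluationFinished = 'EvaluationFinished'
Model service evaluation finished
- EvaluationRunning = 'EvaluationRunning'
Model service evaluation in progress
- EvaluationStopped = 'EvaluationStopped'
Model service evaluation stopped
- ModelPublishFailed = 'ModelPublishFailed'
Model publishing failed
- ModelPublished = 'ModelPublished'
Model published successfully
- ModelPublishing = 'ModelPublishing'
Model publishing in progress; corresponds to the Creating status returned when querying the model
- TrainCreated = 'TrainCreated'
Task created, initializing
- TrainFailed = 'TrainFailed'
Training task failed; corresponds to the Failed status of the training task runtime API
- TrainFinished = 'TrainFinished'
Training finished; corresponds to the Done status of the training task runtime API
- TrainStopped = 'TrainStopped'
Training task stopped; corresponds to the Stop status of the training task runtime API
- Training = 'Training'
Training in progress; corresponds to the Running status of the training task runtime API
- Unknown = 'Unknown'
Unknown status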
- class qianfan.trainer.consts.PeftType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
str,Enum
- ALL = 'ALL'
Full-parameter update
- LoRA = 'LoRA'
LoRA
- PTuning = 'P-tuning'
P-tuning
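Note that the member name PTuning differs from its value 'P-tuning'. This matters wherever a plain string is accepted in place of a PeftType, as in TrainConfig.peft_type. The sketch below uses a local copy of the enum, and `to_peft_type` is a hypothetical helper that does not exist in the SDK.

```python
from enum import Enum
from typing import Union


class PeftType(str, Enum):
    """Local copy of the documented members, for illustration only."""
    ALL = "ALL"
    LoRA = "LoRA"
    PTuning = "P-tuning"


def to_peft_type(value: Union[str, PeftType]) -> PeftType:
    """Hypothetical helper: normalize a str or PeftType into a PeftType."""
    return PeftType(value)  # raises ValueError for unknown values


print(to_peft_type("P-tuning"))     # looked up by value, not by member name
print(to_peft_type(PeftType.LoRA))  # members pass through unchanged
```

Passing the member name "PTuning" as a string raises ValueError, since enum lookup by call uses the value.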
- class qianfan.trainer.consts.ServiceStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
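str,Enum
- Created = 'Created'
Task created, initializing
- DeployFailed = 'DeployFailed'
Model service deployment failed
- DeployStopped = 'DeployStopped'
Service deployment task stopped
- Deployed = 'Deployed'
Model service deployed successfully
- Deploying = 'Deploying'
Model service deployment in progress
- Unknown = 'Unknown'
Unknown status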
- class qianfan.trainer.consts.ServiceType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
str,Enum
- Chat = 'Chat'
Corresponding to the ChatCompletion
- Completion = 'Completion'
Corresponding to the Completion
- Embedding = 'Embedding'
Corresponding to the Embedding
qianfan.trainer.event module
- class qianfan.trainer.event.Event(action_class: Type, action_id: str, state: ActionState, description: Optional[str] = None, data: Any = None)[source]
Bases:
object
Event is the event container for the various nodes in the execution process of Action, and for each different Action, it can be abstracted into five common ActionStates. For multi-Action tasks at the Pipeline level, numerous Events will be generated during the process. Through EventHandler, custom callback events can be registered and listened to, enabling the insertion of various types of callbacks or intermediate task functions in the Pipeline nodes.
- action_class: Type
- action_id: str
- action_state: ActionState
- data: Any = None
- description: Optional[str] = None
- class qianfan.trainer.event.EventHandler[source]
Bases:
object
EventHandler serves as a mechanism for registering and listening to custom callback events in the execution process of Actions. It facilitates the management of events occurring at different nodes during the execution of Actions within a Pipeline.
- qianfan.trainer.event.dispatch_event(event_handler: Optional[EventHandler] = None, event: Optional[Event] = None) None[source]
method to dispatch event from the event handler.
- Args:
- event_handler (Optional[EventHandler], optional):
event handler. Defaults to None.
- event (Optional[Event], optional):
runtime generated event instance. Defaults to None.
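The sketch below illustrates the callback pattern described above with local stand-ins. The handler's `dispatch` method name and the behavior of silently ignoring a missing handler or event are assumptions (inferred from both parameters being Optional), not confirmed details of the library.

```python
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass
class Event:
    """Stand-in with the fields documented for qianfan.trainer.event.Event."""
    action_class: type
    action_id: str
    action_state: str
    description: Optional[str] = None
    data: Any = None


class RecordingHandler:
    """Hypothetical handler; the `dispatch` method name is an assumption."""

    def __init__(self) -> None:
        self.seen: List[str] = []

    def dispatch(self, event: Event) -> None:
        self.seen.append(f"{event.action_id}:{event.action_state}")


def dispatch_event(handler: Optional[RecordingHandler] = None,
                   event: Optional[Event] = None) -> None:
    """Forward the event only when both handler and event are present."""
    if handler is not None and event is not None:
        handler.dispatch(event)


handler = RecordingHandler()
dispatch_event(handler, Event(object, "train-1", "Running"))
dispatch_event(handler, Event(object, "train-1", "Done"))
dispatch_event(None, Event(object, "train-1", "Error"))  # no handler: ignored
print(handler.seen)
```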
qianfan.trainer.finetune module
- class qianfan.trainer.finetune.LLMFinetune(train_type: str, dataset: Optional[Any] = None, train_config: Optional[Union[TrainConfig, str]] = None, deploy_config: Optional[DeployConfig] = None, event_handler: Optional[EventHandler] = None, base_model: Optional[str] = None, eval_dataset: Optional[Any] = None, evaluators: Optional[List[Evaluator]] = None, dataset_bos_path: Optional[str] = None, **kwargs: Any)[source]
Bases:
Trainer
Class that implements the SFT training pipeline with several actions. Use run() to synchronously run the training pipeline until the model training is finished.
- property output: Any
- resume(**kwargs: Dict) LLMFinetune[source]
LLMFinetune resume method.
- Returns:
LLMFinetune: the LLMFinetune instance
- run(**kwargs: Any) Trainer[source]
Run a pipeline to carry out the fine-tune process.
- Parameters:
- **kwargs:
Any additional keyword arguments. {“input”: {}} could be specified if needed
- Raises:
InvalidArgumentError: no pipeline bound to run.
- Returns:
- Trainer:
self, for chain invocation.
- property status: str
LLMFinetune status getter.
- Returns:
str: status for LLMFinetune, mapping from state of actions in pipeline.