qianfan.trainer package

class qianfan.trainer.BaseAction(id: Optional[str] = None, name: Optional[str] = None, event_handler: Optional[EventHandler] = None, **kwargs: Dict[str, Any])[source]

Bases: ExecuteSerializable[Input, Output], ABC

BaseAction is a reusable, atomic operation component that can be freely orchestrated for use in Pipelines.

action_error_event(e: Exception) None[source]

dispatch action error event

Parameters:

e (Exception): action runtime error

action_event(state: ActionState, msg: str = '', data: Any = None) None[source]

dispatch action event

Parameters:

state (ActionState): action state

msg (str, optional): action custom description. Defaults to "".

data (Any, optional): action custom data. Defaults to None.

classmethod action_type() str[source]

dumps() Optional[bytes][source]

dump the action input as bytes

Returns:

serialized bytes action data

abstract exec(input: Optional[Input] = None, **kwargs: Dict) Output[source]

exec is an abstract method that executes the action.

Parameters:

input (Optional[Input], optional): input. Defaults to None.

Returns:

Output: output

loads(data: bytes) Any[source]

load an action instance from serialized bytes

Parameters:

data (bytes): serialized action data to load

Returns:

Any: action instance

abstract resume(**kwargs: Dict) Output[source]

Resume the action from its last input. Subclasses should implement this method with their own resuming logic. BaseAction does not store the last input itself, since what must be stored differs between actions and their individual action states.

stop(**kwargs: Dict) None[source]

Stop the action. Subclasses should implement this method with their own stop logic.
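The exec / resume / stop contract described above can be sketched without the SDK. This is a minimal illustration only: `SketchAction` and `CountRowsAction` are hypothetical stand-ins written for this example, not classes from qianfan, and the "rows" / "row_count" keys are made up. It shows a subclass keeping its own last input so that resume can replay it.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Generic, Optional, TypeVar

Input = TypeVar("Input")
Output = TypeVar("Output")


class SketchAction(ABC, Generic[Input, Output]):
    """Hypothetical stand-in that mirrors the documented BaseAction methods."""

    def __init__(self, id: Optional[str] = None, name: Optional[str] = None) -> None:
        self.id = id
        self.name = name

    @abstractmethod
    def exec(self, input: Optional[Input] = None, **kwargs: Any) -> Output:
        """Execute the action."""

    @abstractmethod
    def resume(self, **kwargs: Any) -> Output:
        """Resume the action; the subclass decides what state to replay."""

    def stop(self, **kwargs: Any) -> None:
        # Nothing to stop in this sketch.
        return None


class CountRowsAction(SketchAction[Dict[str, Any], Dict[str, Any]]):
    """Hypothetical action: adds a row count to the incoming dict."""

    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self._last_input: Optional[Dict[str, Any]] = None

    def exec(self, input: Optional[Dict[str, Any]] = None, **kwargs: Any) -> Dict[str, Any]:
        # Remember the input so resume() can replay it later.
        self._last_input = dict(input or {})
        rows = self._last_input.get("rows", [])
        return {**self._last_input, "row_count": len(rows)}

    def resume(self, **kwargs: Any) -> Dict[str, Any]:
        # The subclass, not the base class, owns the stored input.
        return self.exec(self._last_input, **kwargs)


action = CountRowsAction(name="count_rows")
first = action.exec({"rows": [1, 2, 3]})
again = action.resume()
```

Here `again` equals `first`, because the subclass replays the input it stored during exec.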

class qianfan.trainer.DeployAction(deploy_config: Optional[DeployConfig] = None, **kwargs: Any)[source]

Bases: BaseAction[Dict[str, Any], Dict[str, Any]]

Action for model service deployment. A DeployConfig must be supplied when the instance is initialized. Sample:

```
deploy_config = DeployConfig(replicas=1, pool_type=1)
deploy_action = DeployAction(deploy_config=deploy_config)

output = deploy_action.exec(input)
```

input:

` {'task_id': 47923, 'job_id': 33512, 'model_id': "xx", 'model_version_id': "xxx"} `

output:

` {'task_id': 47923, 'job_id': 33512, 'model_id': "xx", 'model_version_id': "xxx", 'service_id': 164, 'service_endpoint': 'xbiimimv_xxx'} `

deploy_config: Optional[DeployConfig]

deploy config, including replicas and so on

exec(**kwargs: Any) Any

method wrapper

model_id: Optional[int]

model id

model_id_str: Optional[str]

model str id

model_version_id: Optional[int]

model version id

model_version_id_str: Optional[str]

model version str id

result: Optional[Dict[str, Any]] = None

result of action

resume(**kwargs: Any) Any

method wrapper

class qianfan.trainer.Event(action_class: Type, action_id: str, state: ActionState, description: Optional[str] = None, data: Any = None)[source]

Bases: object

Event is the event container for the various nodes in the execution process of Action, and for each different Action, it can be abstracted into five common ActionStates. For multi-Action tasks at the Pipeline level, numerous Events will be generated during the process. Through EventHandler, custom callback events can be registered and listened to, enabling the insertion of various types of callbacks or intermediate task functions in the Pipeline nodes.

action_class: Type
action_id: str
action_state: ActionState
data: Any = None
description: Optional[str] = None
class qianfan.trainer.EventHandler[source]

Bases: object

EventHandler serves as a mechanism for registering and listening to custom callback events in the execution process of Actions. It facilitates the management of events occurring at different nodes during the execution of Actions within a Pipeline.

dispatch(event: Event) None[source]

Dispatch an event to the user's custom handler.

Parameters:

event (Event): event to dispatch to the user's custom handler
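To make the dispatch flow concrete, here is a small self-contained sketch. `SketchState`, `SketchEvent` and `RecordingHandler` are hypothetical stand-ins for ActionState, Event and a user-defined EventHandler; the two state names are invented for the example and are not the SDK's actual members. The field names on the event follow the attributes listed for Event above.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, List, Optional, Type


class SketchState(str, Enum):
    """Hypothetical stand-in for ActionState; real member names may differ."""

    RUNNING = "running"
    DONE = "done"


@dataclass
class SketchEvent:
    """Stand-in carrying the same fields as the documented Event."""

    action_class: Type
    action_id: str
    action_state: SketchState
    description: Optional[str] = None
    data: Any = None


class RecordingHandler:
    """Hypothetical handler that records every event it receives."""

    def __init__(self) -> None:
        self.seen: List[SketchEvent] = []

    def dispatch(self, event: SketchEvent) -> None:
        # A real handler could log, notify, or trigger follow-up work here.
        self.seen.append(event)


handler = RecordingHandler()
handler.dispatch(SketchEvent(object, "act-1", SketchState.RUNNING, "started"))
handler.dispatch(SketchEvent(object, "act-1", SketchState.DONE, data={"ok": True}))
states = [event.action_state.value for event in handler.seen]
```

With the real SDK, a handler like this would presumably subclass EventHandler and be passed through the `event_handler` argument shown in the constructors above.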

class qianfan.trainer.LLMFinetune(train_type: Optional[str] = None, dataset: Optional[Any] = None, train_config: Optional[Union[TrainConfig, str]] = None, deploy_config: Optional[DeployConfig] = None, event_handler: Optional[EventHandler] = None, eval_dataset: Optional[Any] = None, evaluators: Optional[List[Evaluator]] = None, dataset_bos_path: Optional[str] = None, previous_trainer: Optional[Trainer] = None, previous_task_id: Optional[str] = None, name: Optional[str] = None, **kwargs: Any)[source]

Bases: Trainer

Class implements the SFT training pipeline with several actions. Use run() to synchronously run the training pipeline until the model training is finished.

property output: Any

resume(**kwargs: Dict) LLMFinetune[source]

LLMFinetune resume method.

Returns:

LLMFinetune: the resumed LLMFinetune instance

run(**kwargs: Any) Trainer[source]

Run a pipeline to execute the fine-tune process.

Parameters:

**kwargs: Any additional keyword arguments. {"input": {}} could be specified if needed.

Raises:

InvalidArgumentError: no pipeline is bound to run.

Returns:

Trainer: self, for chain invocation.

property status: str

LLMFinetune status getter.

Returns:

str: status for LLMFinetune, mapping from state of actions in pipeline.

stop(**kwargs: Dict) Trainer[source]

Stop method of LLMFinetune. LLMFinetune stops all actions in the pipeline. Since LLMFinetune holds only one pipeline, this is equivalent to stopping the first pipeline in ppls.

Returns:

Trainer: self, for chain invocation.

classmethod train_type_list() Dict[str, ModelInfo][source]
class qianfan.trainer.LoadDataSetAction(dataset: Optional[Union[Dataset, str]] = None, dataset_template: Optional[DataTemplateType] = None, **kwargs: Any)[source]

Bases: BaseAction[Dict[str, Any], Dict[str, Any]]

Action for loading a dataset. It invokes the dataset's save method to guarantee the dataset is loaded in the Qianfan platform. Sample:

` load_action = LoadDataSetAction(dataset=Dataset(id=1)) `

` load_action.exec() `

input:

none

output:

` {"datasets" : [{"id": 1, "name": "test_dataset"}]} `

class Dataset(inner_table: Table, inner_data_source_cache: Optional[DataSource] = None, inner_schema_cache: Optional[Schema] = None, input_columns: Optional[List[str]] = None, reference_column: Optional[str] = None, eval_input_column: Optional[str] = None, eval_llm_output_column: Optional[str] = None, **kwargs: Any)

Bases: Table

add_default_group_column() Self

add a "_group" column to the Dataset; the values in the "_group" column are sequentially incremental

Returns:

Self: Dataset itself

append(elem: Any, add_new_group: bool = False, is_grouped: bool = True) Self

append element(s) to dataset

Args:
elem (Union[List[List[Dict]], List[Dict], Tuple[Dict], Dict]):

Elements added to dataset

add_new_group (bool):

Whether elem has a new group id. Only used when dataset is grouped.

is_grouped (bool):

Whether the elements in elem are in the same group. Only used when the dataset is grouped, elem is a Sequence and add_new_group is set to True. Defaults to True, in which case all elements will be in the same group. If it's False, each element will have a sequentially incremental group id starting from the last available group id.

Returns:

Self: Dataset itself

async atest_using_llm(model_version_id: Optional[str] = None, service_model: Optional[str] = None, service_endpoint: Optional[str] = None, is_chat_service: bool = True, does_show_latency: bool = True, **kwargs: Any) Dataset

Use the given arguments to initialize an LLM instance and asynchronously get its output on the current dataset. Set only the model arguments or the service arguments to instantiate it.

Args:
model_version_id (Optional[str]):

version id of your own model, default to None

service_model (Optional[str]):

name of model you want to use as service, default to None

service_endpoint (Optional[str]):

endpoint of service, default to None

is_chat_service (bool):

the service type of service, default to True. Service will be Completion if False

does_show_latency (bool):

whether the result dataset contains a latency info column when a Service is the evaluated object. Depending on the request mode (stream or non-stream), it will contain request_complete_latency or the (first_token_latency, request_complete_latency) combo. Default to True

**kwargs (Any):

optional argument dict

Returns:

Dataset: A dataset containing inputs, reference outputs and llm outputs

col_append(elem: Any) Self

append a column to dataset

Args:

elem (Dict[str, List]): a dict containing the elements added to the dataset, with the column name as key and the column data as value

Returns:

Self: Dataset itself

col_delete(index: Union[int, str]) Self

delete a column from dataset

Args:

index (str): column name to delete

Returns:

Self: Dataset itself

col_filter(op: Callable[[Any], bool]) Self

filter on dataset’s column

Args:

op (Callable[[Any], bool]): handler used to filter

Returns:

Self: Dataset itself

col_insert(elem: Any, index: Any) Self

insert a column into dataset

Args:

elem (Dict[str, List]): dict containing the element added to the dataset; it must have a column name "name" and a column data list "data"

index (int): where to insert new column

Returns:

Self: Dataset itself
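The shape of the `elem` argument for col_insert can be illustrated on a plain column mapping. This is only a sketch of the idea, not the SDK's implementation: `insert_column` is a hypothetical helper, the column names are made up, and the length check is an assumption about what a sensible implementation would enforce.

```python
from typing import Any, Dict, List


def insert_column(
    columns: Dict[str, List[Any]], elem: Dict[str, Any], index: int
) -> Dict[str, List[Any]]:
    """Return a new column mapping with elem placed at position index."""
    name, data = elem["name"], elem["data"]
    if name in columns:
        raise ValueError(f"column {name!r} already exists")
    # Assumption: a new column must match the existing row count.
    if columns and len(data) != len(next(iter(columns.values()))):
        raise ValueError("column length does not match row count")
    names = list(columns)
    names.insert(index, name)
    # Rebuild the mapping so that key order reflects column order.
    return {n: (data if n == name else columns[n]) for n in names}


table = {"prompt": ["hi", "bye"], "response": ["hello", "goodbye"]}
wider = insert_column(table, {"name": "lang", "data": ["en", "en"]}, 1)
```

The `elem` dict carries exactly the two keys the description asks for: "name" for the column name and "data" for the column values.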

col_list(by: Optional[Union[slice, int, str, List[int], Tuple[int], List[str], Tuple[str]]] = None) Any

get column(s) from dataset

Args:
by (Optional[Union[int, str, Sequence[int], Sequence[str]]]):

index or indices for columns, default to None, in which case return a python list of dataset column

Returns:

Any: dataset column list

col_map(op: Callable[[Any], Any]) Self

map on dataset’s column

Args:

op (Callable[[Any], Any]): handler used to map

Returns:

Self: Dataset itself

col_names() List[str]

get column name list

Returns:

List[str]: column name list

col_renames(new_names: List[str]) Self

rename all dataset columns

Args:

new_names (List[str]): All new names for columns

Returns:

Self: A brand-new Dataset with new name

classmethod create_from_pyarrow_table(table: Table, schema: Optional[Schema] = None, **kwargs: Any) Dataset

create a dataset from pyarrow table

Args:
table (pyarrow.Table):

pyarrow table object used to create dataset.

schema (Optional[Schema]):

schema used to validate before exporting data, default to None

**kwargs (Any):

optional arguments

Returns:

Dataset: a dataset instance

classmethod create_from_pyobj(data: Union[List[Dict[str, Any]], Dict[str, List]], schema: Optional[Schema] = None, **kwargs: Any) Dataset

create a dataset from python dict or list

Args:
data (Union[List[Dict[str, Any]], Dict[str, List]]):

python object used to create dataset.

schema (Optional[Schema]):

schema used to validate before exporting data, default to None

**kwargs (Any):

optional arguments

Returns:

Dataset: a dataset instance
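The type annotation of `data` admits two shapes: a list of row dicts or a dict of column lists. The sketch below shows how the two relate, using only plain Python. `rows_to_columns` is a hypothetical helper and the column names are invented; it assumes every row has the same keys.

```python
from typing import Any, Dict, List


def rows_to_columns(rows: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Turn row-oriented data into the equivalent column-oriented form."""
    columns: Dict[str, List[Any]] = {}
    for row in rows:
        for key, value in row.items():
            columns.setdefault(key, []).append(value)
    return columns


# Row-oriented shape: List[Dict[str, Any]]
rows = [
    {"prompt": "hi", "response": "hello"},
    {"prompt": "bye", "response": "goodbye"},
]

# Column-oriented shape: Dict[str, List]
columns = rows_to_columns(rows)
```

Either shape describes the same two-row, two-column table, so which one to pass is a matter of what the calling code already has at hand.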

delete(index: Union[int, str]) Self

delete an element from dataset

Args:

index (Union[int, str]): element index to delete

Returns:

Self: Dataset itself

delete_group_column() Self

remove “_group” column from Dataset

Returns:

Self: Dataset itself

filter(op: Callable[[Any], bool]) Self

filter on dataset

Args:

op (Callable[[Any], bool]): handler used to filter

Returns:

Self: Dataset itself

property get_input_data: Dict[str, List[Any]]

get input columns data in dataset

Returns:
Dict[str, List[Any]]: a dict

which indicates the “column name-column data” pairs

property get_reference_data: List[Any]

get reference data in dataset

Returns:

List[Any]: list of output data column

insert(elem: Any, index: Any, group_id: int = -1, add_new_group: bool = False, is_grouped: bool = True) Self

insert element(s) to dataset

Args:
elem (Union[List[List[Dict]], List[Dict], Tuple[Dict], Dict]):

Elements added to dataset

index (int): where to insert element(s)

group_id (int):

which group id you want to apply to new element(s). Default to -1, which means let group id be automatically inferred from table.

add_new_group (bool):

Whether elem has a new group id. Only used when dataset is grouped and group_id is -1

is_grouped (bool):

Whether the elements in elem are in the same group. Only used when the dataset is grouped, elem is a Sequence and add_new_group is set to True. Defaults to True, in which case all elements will be in the same group. If it's False, each element will have a sequentially incremental group id starting from the last available group id.

Returns:

Self: Dataset itself

is_dataset_generic_text() bool

tell whether current dataset is generic text dataset

Returns:

bool: whether current dataset is generic text dataset

is_dataset_located_in_qianfan() bool

tell whether current dataset is cloud-based dataset

Returns:

bool: whether current dataset is cloud-based dataset

list(by: Optional[Union[slice, int, str, Sequence[int], Sequence[str]]] = None, **kwargs: Any) Any

get element(s) from dataset

Args:
by (Optional[Union[slice, int, Sequence[int]]]):

index or indices for elements, default to None, in which case return a python list of dataset row

Returns:

Any: dataset row list
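The `by` argument accepts several index forms. A minimal sketch of the same selection logic on a plain list of rows follows; `select_rows` is a hypothetical helper, not the SDK method, and it covers only the None, int, slice and sequence-of-int cases (string indices are left out).

```python
from typing import Any, List, Optional, Sequence, Union

Index = Union[slice, int, Sequence[int]]


def select_rows(rows: List[Any], by: Optional[Index] = None) -> Any:
    """Select rows by nothing (all rows), one position, a slice, or positions."""
    if by is None:
        return list(rows)
    if isinstance(by, (int, slice)):
        return rows[by]
    return [rows[i] for i in by]


rows = [{"prompt": "a"}, {"prompt": "b"}, {"prompt": "c"}]
everything = select_rows(rows)
single = select_rows(rows, 1)
window = select_rows(rows, slice(0, 2))
picked = select_rows(rows, [2, 0])
```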

classmethod load(source: Optional[DataSource] = None, data_file: Optional[str] = None, qianfan_dataset_id: Optional[str] = None, bos_load_args: Optional[Dict[str, Any]] = None, huggingface_dataset: Optional[Any] = None, bos_source_args: Optional[Dict[str, Any]] = None, schema: Optional[Schema] = None, organize_data_as_group: bool = False, **kwargs: Any) Dataset

Read data from the source or create a source from the parameters and create a Table instance. If a schema is specified, perform validation after importing.

Args:
source (Optional[DataSource]): where dataset load from,

default to None, in which case a datasource will be created inside the dataset using the parameters below

data_file (Optional[str]):

dataset local file path, default to None

qianfan_dataset_id (Optional[str]):

qianfan dataset ID, default to None

bos_load_args: (Optional[Dict[str, Any]]):

create a dataset and import initial dataset content from args

huggingface_dataset (Optional[Any]):

Huggingface dataset object; only DatasetDict and Dataset from Huggingface datasets are supported.

bos_source_args: (Optional[Dict[str, Any]]):

create arguments for creating a file on specific bos, default to None

schema (Optional[Schema]):

schema used to validate loaded data, default to None

organize_data_as_group (bool):

only available when data source’s format is FormatType.Jsonl. Indicates whether organize data within dataset in group format, default to False, and when it’s True, the default format will be a group-based 2D structure.

**kwargs (Any): optional arguments

Returns:

Dataset: a dataset instance

map(op: Callable[[Any], Any]) Self

map on dataset

Args:

op (Callable[[Any], Any]): handler used to map

Returns:

Self: Dataset itself

online_data_process(operators: List[QianfanOperator]) Dict[str, Any]

create an online ETL task on qianfan

Args:

operators (List[QianfanOperator]): operators applied to ETL task

Returns:
Dict[str, Any]: ETL task info, containing 3 fields:

is_succeeded (bool): whether the ETL task succeeded

etl_task_id (Optional[int]): ETL task id, only exists when the ETL task is created successfully

new_dataset_id (Optional[int]): id of the dataset which stores the data after ETL, only exists when the ETL task has succeeded
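Because two of the three fields are only present under certain conditions, callers need to read the result defensively. The sketch below shows one way to do that; `new_dataset_id_or_none` is a hypothetical helper and the ids in the sample dicts are invented values.

```python
from typing import Any, Dict, Optional


def new_dataset_id_or_none(result: Dict[str, Any]) -> Optional[int]:
    """Return the id of the post-ETL dataset, or None if the task did not succeed."""
    if not result.get("is_succeeded"):
        return None
    # new_dataset_id only exists when the ETL task succeeded.
    return result.get("new_dataset_id")


# Hypothetical results shaped like the documented return value.
succeeded = {"is_succeeded": True, "etl_task_id": 11, "new_dataset_id": 22}
created_but_failed = {"is_succeeded": False, "etl_task_id": 12}
never_created = {"is_succeeded": False}

ids = [
    new_dataset_id_or_none(succeeded),
    new_dataset_id_or_none(created_but_failed),
    new_dataset_id_or_none(never_created),
]
```

Using `dict.get` rather than indexing avoids a KeyError for the fields that are documented as optional.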

row_number() int

get pyarrow table row count.

Returns:

int: row count.

save(destination: Optional[DataSource] = None, data_file: Optional[str] = None, qianfan_dataset_id: Optional[str] = None, qianfan_dataset_create_args: Optional[Dict[str, Any]] = None, bos_source_args: Optional[Dict[str, Any]] = None, schema: Optional[Schema] = None, replace_source: bool = False, **kwargs: Any) bool

Write data to the destination. If a schema has been passed, validate the data before exporting.

Args:
destination (Optional[DataSource]):

data source where the dataset is exported, default to None, in which case a datasource will be created inside the dataset using the parameters below

data_file (Optional[str]):

dataset local file path, default to None

qianfan_dataset_id (Optional[str]):

qianfan dataset ID, default to None

qianfan_dataset_create_args: (Optional[Dict[str, Any]]):

create arguments for creating a bare dataset on qianfan, default to None

bos_source_args: (Optional[Dict[str, Any]]):

create arguments for creating a file on specific bos, default to None

schema: (Optional[Schema]):

schema used to validate before exporting data, default to None

replace_source: (bool):

whether to replace the original source, default to False

**kwargs (Any): optional arguments

Returns:

bool: whether saving succeeded

start_online_data_process_task(operators: List[QianfanOperator]) str

create an online ETL task on qianfan

Args:

operators (List[QianfanOperator]): operators applied to ETL task

Returns:

str: etl task id

test_using_llm(model_version_id: Optional[str] = None, service_model: Optional[str] = None, service_endpoint: Optional[str] = None, is_chat_service: bool = True, does_show_latency: bool = True, **kwargs: Any) Dataset

Use the given arguments to initialize an LLM instance and get its output on the current dataset. Set only the model arguments or the service arguments to instantiate it.

Args:
model_version_id (Optional[str]):

version id of your own model, default to None

service_model (Optional[str]):

name of model you want to use as service, default to None

service_endpoint (Optional[str]):

endpoint of service, default to None

is_chat_service (bool):

the service type of service, default to True. Service will be Completion if False

does_show_latency (bool):

whether the result dataset contains a latency info column when a Service is the evaluated object. Depending on the request mode (stream or non-stream), it will contain request_complete_latency or the (first_token_latency, request_complete_latency) combo. Default to True

**kwargs (Any):

optional argument dict

Returns:

Dataset: A dataset containing inputs, reference outputs and llm outputs

bos_path: Optional[str] = None
dataset: Optional[Dataset] = None
exec(**kwargs: Any) Any

method wrapper

resume(**kwargs: Any) Any

method wrapper

class qianfan.trainer.ModelPublishAction(id: Optional[str] = None, name: Optional[str] = None, event_handler: Optional[EventHandler] = None, **kwargs: Dict[str, Any])[source]

Bases: BaseAction[Dict[str, Any], Dict[str, Any]]

Class for the model publish action, commonly used after TrainAction.

Sample:

Input: ` {'task_id': 47923, 'job_id': 33512} `

Output: ` {'task_id': 47923, 'job_id': 33512, 'model_id': "xxx", 'model_version_id': "aaa"} `
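The samples in this reference suggest a pattern: each action's output is its input dict plus a few new keys, so the output of one action can feed the next. The sketch below imitates that pattern with plain functions. `publish`, `deploy` and `run_steps` are hypothetical stand-ins for illustration and do not call the platform; the values are the placeholder values from the samples.

```python
from typing import Any, Callable, Dict, List

Step = Callable[[Dict[str, Any]], Dict[str, Any]]


def publish(data: Dict[str, Any]) -> Dict[str, Any]:
    # Stand-in for a publish step: adds model identifiers.
    return {**data, "model_id": "xxx", "model_version_id": "aaa"}


def deploy(data: Dict[str, Any]) -> Dict[str, Any]:
    # Stand-in for a deploy step: adds service identifiers.
    return {**data, "service_id": 164, "service_endpoint": "xbiimimv_xxx"}


def run_steps(steps: List[Step], initial: Dict[str, Any]) -> Dict[str, Any]:
    """Feed each step's output dict into the next step."""
    data = dict(initial)
    for step in steps:
        data = step(data)
    return data


final = run_steps([publish, deploy], {"task_id": 47923, "job_id": 33512})
```

Keeping earlier keys in the dict means a later step can still read identifiers such as `task_id` that were produced several steps before it.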

exec(**kwargs: Any) Any

method wrapper

job_id: Optional[str] = None

job id

model: Optional[Model] = None

model object

result: Optional[Dict[str, Any]] = None

result of model publish action

resume(**kwargs: Any) Any

method wrapper

task_id: Optional[str] = None

task id

class qianfan.trainer.PostPreTrain(train_type: str, dataset: Optional[Union[Dataset, str]] = None, train_config: Optional[Union[TrainConfig, str]] = None, event_handler: Optional[EventHandler] = None, **kwargs: Any)[source]

Bases: Trainer

Class implements the PostPreTrain training pipeline with several actions. Use run() to synchronously run the training pipeline until the model training pipeline is finished.

property output: Any
resume(**kwargs: Dict) PostPreTrain[source]

PostPreTrain resume method.

Returns:

PostPreTrain: the resumed PostPreTrain instance

run(**kwargs: Any) Trainer[source]

Run a pipeline to execute the training process.

Parameters:

**kwargs: Any additional keyword arguments. {"input": {}} could be specified if needed.

Raises:

InvalidArgumentError: no pipeline is bound to run.

Returns:

Trainer: self, for chain invocation.

property status: str

PostPreTrain status getter.

Returns:

str: status for PostPreTrain, mapping from state of actions in pipeline.

stop(**kwargs: Dict) Trainer[source]

Stop method of PostPreTrain. PostPreTrain stops all actions in the pipeline. Since PostPreTrain holds only one pipeline, this is equivalent to stopping the first pipeline in ppls.

Returns:

Trainer: self, for chain invocation.

classmethod train_type_list() Dict[str, ModelInfo][source]
class qianfan.trainer.TrainAction(train_mode: TrainMode, train_type: Optional[str] = None, train_config: Optional[TrainConfig] = None, task_id: Optional[str] = None, job_id: Optional[str] = None, peft_type: PeftType = PeftType.ALL, job_name: Optional[str] = None, task_description: Optional[str] = None, job_description: Optional[str] = None, **kwargs: Any)[source]

Bases: BaseAction[Dict[str, Any], Dict[str, Any]]

Class for the train action. It synchronously invokes the training API, taking a dataset metadata dict as input and producing model metadata as output. Concretely, exec is called to run it.

Note: this action is not involved with model publishing; please use ModelPublishAction for publishing the model.

Sample:

Input:

```
{'datasets': {
    "sourceType": (
        console_consts.TrainDatasetSourceType.PrivateBos.value
    ),
    "versions": [
        {
            "versionBosUri": bos_uploader.generate_bos_file_parent_path(
                bos_data_src.bucket, bos_data_src.bos_file_path
            )
        }
    ]
}}
```

Output: ` {'task_id': "task-ddd", 'job_id': "job-xxxx"} `

exec(**kwargs: Any) Any

method wrapper

get_default_train_config(model_type: str, train_mode: TrainMode, peft_type: PeftType) TrainConfig[source]
is_incr: bool = False

if it’s incremental train or not

job_description: Optional[str] = None

train job description

job_id: Optional[str] = None

train job id

job_name: str = ''

train job name

result: Optional[Dict[str, Any]] = None

train result

resume(**kwargs: Any) Any

method wrapper

stop(**kwargs: Dict) None[source]

stop method for train action

Parameters:
**kwargs (Dict):

input args for action stop

task_description: Optional[str] = None

train task description

task_id: Optional[str] = None

train task id

train_config: Optional[TrainConfig] = None

train config

train_mode: TrainMode

train mode

train_type: Optional[str] = ''

train_type

validateTrainConfig(strict: bool = True) None[source]

validate train_config against the ModelInfo limits

Raises:

InvalidArgumentError: train_config does not satisfy the limits

class qianfan.trainer.Trainer[source]

Bases: ABC

Base Trainer class, which focuses on a one-step call to run the whole training process. It defines the 3 basic methods to operate training:

- run(): run the specific training process, like fine-tuning
- resume(): resume from the stopped or failed state
- stop(): stop the training process

property actions: Dict[str, BaseAction]

Get the available actions for the trainer.

Returns:

Dict[str, BaseAction]: the available actions.

get_evaluate_result() Any[source]

Receive the evaluate result from the pipeline. [coming soon].

get_log() Any[source]

Receive the training log during the pipeline execution. [coming soon].

abstract property output: Any
ppls: List[Pipeline] = []

Pipelines for training, there may be multiple pipelines in the training process.

register_event_handler(event_handler: EventHandler, ppl_id: Optional[str] = None) None[source]

Register the event handler to the specified ppls.

Args:

event_handler (EventHandler): The event handler instance.

result: List[Any] = []

pipeline running results, which may be an error or an object

abstract resume(**kwargs: Dict) Trainer[source]

Counterpart of the stop method. Users can resume the training process by calling the resume() method.

Returns:

Trainer: Trainer instance

abstract run(**kwargs: Dict) Trainer[source]

Trainer abstract method. Subclasses override this method to implement their specific training process.

Returns:

Trainer: Trainer instance

property status: str

Trainer status. Implements different statuses for different processes like fine-tuning, RLHF, PreTrain and so on.

abstract stop(**kwargs: Dict) Trainer[source]

Trainer abstract method. Subclasses implement it to support more controllable usage in concrete situations.

Returns:

Trainer: Trainer instance
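Since run, stop and resume each return a Trainer, calls can be chained. The following sketch demonstrates that contract with stand-ins only: `SketchTrainer` and `ToyTrainer` are hypothetical classes written for this example, and the status strings are invented rather than taken from the SDK.

```python
from abc import ABC, abstractmethod
from typing import Any, List


class SketchTrainer(ABC):
    """Hypothetical stand-in mirroring the three documented Trainer methods."""

    @abstractmethod
    def run(self, **kwargs: Any) -> "SketchTrainer":
        """Run the training process."""

    @abstractmethod
    def stop(self, **kwargs: Any) -> "SketchTrainer":
        """Stop the training process."""

    @abstractmethod
    def resume(self, **kwargs: Any) -> "SketchTrainer":
        """Resume from a stopped or failed state."""


class ToyTrainer(SketchTrainer):
    """Toy trainer that only tracks status transitions."""

    def __init__(self) -> None:
        self.status = "created"
        self.history: List[str] = []

    def _set(self, status: str) -> "ToyTrainer":
        self.status = status
        self.history.append(status)
        return self  # returning self is what enables chain invocation

    def run(self, **kwargs: Any) -> "ToyTrainer":
        return self._set("running")

    def stop(self, **kwargs: Any) -> "ToyTrainer":
        return self._set("stopped")

    def resume(self, **kwargs: Any) -> "ToyTrainer":
        return self._set("running")


trainer = ToyTrainer().run().stop().resume()
```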

Submodules

qianfan.trainer.actions module

class qianfan.trainer.actions.DeployAction(deploy_config: Optional[DeployConfig] = None, **kwargs: Any)[source]

Bases: BaseAction[Dict[str, Any], Dict[str, Any]]

Action for model service deployment. A DeployConfig must be supplied when the instance is initialized. Sample:

```
deploy_config = DeployConfig(replicas=1, pool_type=1)
deploy_action = DeployAction(deploy_config=deploy_config)

output = deploy_action.exec(input)
```

input:

` {'task_id': 47923, 'job_id': 33512, 'model_id': "xx", 'model_version_id': "xxx"} `

output:

` {'task_id': 47923, 'job_id': 33512, 'model_id': "xx", 'model_version_id': "xxx", 'service_id': 164, 'service_endpoint': 'xbiimimv_xxx'} `

deploy_config: Optional[DeployConfig]

deploy config, including replicas and so on

exec(**kwargs: Any) Any

method wrapper

model_id: Optional[int]

model id

model_id_str: Optional[str]

model str id

model_version_id: Optional[int]

model version id

model_version_id_str: Optional[str]

model version str id

result: Optional[Dict[str, Any]] = None

result of action

resume(**kwargs: Any) Any

method wrapper

class qianfan.trainer.actions.EvaluateAction(eval_dataset: Dataset, evaluators: List[Evaluator], **kwargs: Any)[source]

Bases: BaseAction[Dict[str, Any], Dict[str, Any]]

Action for evaluating models or services. Sample:

input:

` {'model_id': 47923, 'model_version_id': 33512} `

output:

` {'eval_res': EvaluationResult ...} `

eval_dataset: Optional[Dataset] = None
eval_manager: Optional[EvaluationManager] = None

evaluation manager for evaluate models or services.

exec(**kwargs: Any) Any

method wrapper

result: Optional[Dict[str, Any]] = None

result of action

resume(**kwargs: Any) Any

method wrapper

class qianfan.trainer.actions.LoadDataSetAction(dataset: Optional[Union[Dataset, str]] = None, dataset_template: Optional[DataTemplateType] = None, **kwargs: Any)[source]

Bases: BaseAction[Dict[str, Any], Dict[str, Any]]

Action for loading a dataset. It invokes the dataset's save method to guarantee the dataset is loaded in the Qianfan platform. Sample:

` load_action = LoadDataSetAction(dataset=Dataset(id=1)) `

` load_action.exec() `

input:

none

output:

` {"datasets" : [{"id": 1, "name": "test_dataset"}]} `

class Dataset(inner_table: Table, inner_data_source_cache: Optional[DataSource] = None, inner_schema_cache: Optional[Schema] = None, input_columns: Optional[List[str]] = None, reference_column: Optional[str] = None, eval_input_column: Optional[str] = None, eval_llm_output_column: Optional[str] = None, **kwargs: Any)

Bases: Table

add_default_group_column() Self

add a "_group" column to the Dataset; the values in the "_group" column are sequentially incremental

Returns:

Self: Dataset itself

append(elem: Any, add_new_group: bool = False, is_grouped: bool = True) Self

append element(s) to dataset

Args:
elem (Union[List[List[Dict]], List[Dict], Tuple[Dict], Dict]):

Elements added to dataset

add_new_group (bool):

Whether elem has a new group id. Only used when dataset is grouped.

is_grouped (bool):

Whether the elements in elem are in the same group. Only used when the dataset is grouped, elem is a Sequence and add_new_group is set to True. Defaults to True, in which case all elements will be in the same group. If it's False, each element will have a sequentially incremental group id starting from the last available group id.

Returns:

Self: Dataset itself

async atest_using_llm(model_version_id: Optional[str] = None, service_model: Optional[str] = None, service_endpoint: Optional[str] = None, is_chat_service: bool = True, does_show_latency: bool = True, **kwargs: Any) Dataset

Use the given arguments to initialize an LLM instance and asynchronously get its output on the current dataset. Set only the model arguments or the service arguments to instantiate it.

Args:
model_version_id (Optional[str]):

version id of your own model, default to None

service_model (Optional[str]):

name of model you want to use as service, default to None

service_endpoint (Optional[str]):

endpoint of service, default to None

is_chat_service (bool):

the service type of service, default to True. Service will be Completion if False

does_show_latency (bool):

whether the result dataset contains a latency info column when a Service is the evaluated object. Depending on the request mode (stream or non-stream), it will contain request_complete_latency or the (first_token_latency, request_complete_latency) combo. Default to True

**kwargs (Any):

optional argument dict

Returns:

Dataset: A dataset containing inputs, reference outputs and llm outputs

col_append(elem: Any) Self

append a column to dataset

Args:

elem (Dict[str, List]): a dict containing the elements added to the dataset, with the column name as key and the column data as value

Returns:

Self: Dataset itself

col_delete(index: Union[int, str]) Self

delete a column from dataset

Args:

index (str): column name to delete

Returns:

Self: Dataset itself

col_filter(op: Callable[[Any], bool]) Self

filter on dataset’s column

Args:

op (Callable[[Any], bool]): handler used to filter

Returns:

Self: Dataset itself

col_insert(elem: Any, index: Any) Self

insert a column into dataset

Args:

elem (Dict[str, List]): dict containing the element added to the dataset; it must have a column name "name" and a column data list "data"

index (int): where to insert new column

Returns:

Self: Dataset itself

col_list(by: Optional[Union[slice, int, str, List[int], Tuple[int], List[str], Tuple[str]]] = None) Any

get column(s) from dataset

Args:
by (Optional[Union[int, str, Sequence[int], Sequence[str]]]):

index or indices for columns, default to None, in which case return a python list of dataset column

Returns:

Any: dataset column list

col_map(op: Callable[[Any], Any]) Self

map on dataset’s column

Args:

op (Callable[[Any], Any]): handler used to map

Returns:

Self: Dataset itself

col_names() List[str]

get column name list

Returns:

List[str]: column name list

col_renames(new_names: List[str]) Self

rename all dataset columns

Args:

new_names (List[str]): All new names for columns

Returns:

Self: A brand-new Dataset with new name

classmethod create_from_pyarrow_table(table: Table, schema: Optional[Schema] = None, **kwargs: Any) Dataset

create a dataset from pyarrow table

Args:
table (pyarrow.Table):

pyarrow table object used to create dataset.

schema (Optional[Schema]):

schema used to validate before exporting data, default to None

**kwargs (Any):

optional arguments

Returns:

Dataset: a dataset instance

classmethod create_from_pyobj(data: Union[List[Dict[str, Any]], Dict[str, List]], schema: Optional[Schema] = None, **kwargs: Any) Dataset

create a dataset from python dict or list

Args:
data (Union[List[Dict[str, Any]], Dict[str, List]]):

python object used to create dataset.

schema (Optional[Schema]):

schema used to validate before exporting data, default to None

**kwargs (Any):

optional arguments

Returns:

Dataset: a dataset instance

delete(index: Union[int, str]) Self

delete an element from dataset

Args:

index (Union[int, str]): element index to delete

Returns:

Self: Dataset itself

delete_group_column() Self

remove “_group” column from Dataset

Returns:

Self: Dataset itself

filter(op: Callable[[Any], bool]) Self

filter on dataset

Args:

op (Callable[[Any], bool]): handler used to filter

Returns:

Self: Dataset itself

property get_input_data: Dict[str, List[Any]]

get input columns data in dataset

Returns:
Dict[str, List[Any]]: a dict

which indicates the “column name-column data” pairs

property get_reference_data: List[Any]

get reference data in dataset

Returns:

List[Any]: list of output data column

insert(elem: Any, index: Any, group_id: int = -1, add_new_group: bool = False, is_grouped: bool = True) Self

insert element(s) to dataset

Args:
elem (Union[List[List[Dict]], List[Dict], Tuple[Dict], Dict]):

Elements added to dataset

index (int): where to insert element(s)

group_id (int):

which group id you want to apply to new element(s). Default to -1, which means let group id be automatically inferred from table.

add_new_group (bool):

Whether elem has a new group id. Only used when dataset is grouped and group_id is -1

is_grouped (bool):

Whether the elements in elem are in the same group. Only used when the dataset is grouped, elem is a Sequence and add_new_group is set to True. Defaults to True, in which case all elements will be in the same group. If it's False, each element will have a sequentially incremental group id starting from the last available group id.

Returns:

Self: Dataset itself

is_dataset_generic_text() bool

tell whether current dataset is generic text dataset

Returns:

bool: whether current dataset is generic text dataset

is_dataset_located_in_qianfan() bool

tell whether current dataset is cloud-based dataset

Returns:

bool: whether current dataset is cloud-based dataset

list(by: Optional[Union[slice, int, str, Sequence[int], Sequence[str]]] = None, **kwargs: Any) Any

get element(s) from dataset

Args:
by (Optional[Union[slice, int, str, Sequence[int], Sequence[str]]]):

index or indices of elements, default to None, in which case a python list of dataset rows is returned

Returns:

Any: dataset row list
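The selector forms accepted by `by` can be illustrated on a plain Python list. This is only a sketch of the indexing idea under the assumption that rows are held in a list; `select_rows` is a hypothetical helper, and the `str` forms that appear in the signature are not covered here.

```python
from typing import Any, Dict, List, Optional, Sequence, Union

Row = Dict[str, Any]


def select_rows(
    rows: List[Row], by: Optional[Union[slice, int, Sequence[int]]] = None
) -> Any:
    """Pick rows by a single index, a slice, or a sequence of indices."""
    if by is None:
        return list(rows)  # no selector: every row
    if isinstance(by, (int, slice)):
        return rows[by]  # one row, or a contiguous range of rows
    return [rows[i] for i in by]  # arbitrary positions


# Hypothetical sample rows.
rows = [{"id": 0}, {"id": 1}, {"id": 2}, {"id": 3}]
first_two = select_rows(rows, slice(0, 2))
```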

classmethod load(source: Optional[DataSource] = None, data_file: Optional[str] = None, qianfan_dataset_id: Optional[str] = None, bos_load_args: Optional[Dict[str, Any]] = None, huggingface_dataset: Optional[Any] = None, bos_source_args: Optional[Dict[str, Any]] = None, schema: Optional[Schema] = None, organize_data_as_group: bool = False, **kwargs: Any) Dataset

Read data from the source or create a source from the parameters and create a Table instance. If a schema is specified, perform validation after importing.

Args:
source (Optional[DataSource]):

where the dataset is loaded from, default to None, in which case a datasource will be created inside the dataset using the parameters below

data_file (Optional[str]):

dataset local file path, default to None

qianfan_dataset_id (Optional[str]):

qianfan dataset ID, default to None

bos_load_args: (Optional[Dict[str, Any]]):

create a dataset and import initial dataset content from args

huggingface_dataset (Optional[Any]):

Huggingface dataset object; only DatasetDict and Dataset from Huggingface datasets are supported.

bos_source_args: (Optional[Dict[str, Any]]):

arguments for creating a file on a specific BOS, default to None

schema (Optional[Schema]):

schema used to validate loaded data, default to None

organize_data_as_group (bool):

only available when the data source’s format is FormatType.Jsonl. Indicates whether to organize data within the dataset in group format, default to False. When it’s True, the default format will be a group-based 2D structure.

**kwargs (Any): optional arguments

Returns:

Dataset: a dataset instance

map(op: Callable[[Any], Any]) Self

map on dataset

Args:

op (Callable[[Any], Any]): handler used to map

Returns:

Self: Dataset itself
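Because `map` and `filter` are both documented as returning the Dataset itself, calls can be chained. The sketch below shows that pattern on a hypothetical in-memory stand-in; `MiniDataset` is not the qianfan `Dataset`, and it assumes each row is a dict passed to `op`.

```python
from typing import Any, Callable, Dict, List


class MiniDataset:
    """Hypothetical in-memory stand-in, not the qianfan Dataset."""

    def __init__(self, rows: List[Dict[str, Any]]) -> None:
        self.rows = rows

    def map(self, op: Callable[[Any], Any]) -> "MiniDataset":
        self.rows = [op(row) for row in self.rows]
        return self  # returning self is what makes chaining possible

    def filter(self, op: Callable[[Any], bool]) -> "MiniDataset":
        self.rows = [row for row in self.rows if op(row)]
        return self


ds = MiniDataset([{"prompt": " hi "}, {"prompt": ""}, {"prompt": "bye"}])
# Strip whitespace from every prompt, then drop rows whose prompt is empty.
ds.map(lambda row: {"prompt": row["prompt"].strip()}).filter(
    lambda row: bool(row["prompt"])
)
```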

online_data_process(operators: List[QianfanOperator]) Dict[str, Any]

create an online ETL task on qianfan

Args:

operators (List[QianfanOperator]): operators applied to ETL task

Returns:
Dict[str, Any]: ETL task info, containing 3 fields:

is_succeeded (bool): whether the ETL task succeeded

etl_task_id (Optional[int]): ETL task id, only exists when the ETL task is created successfully

new_dataset_id (Optional[int]): id of the dataset which stores the data after ETL, only exists when the ETL task succeeded
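Since two of the three returned fields are documented as conditional, callers may want to read them defensively. The following sketch assumes the result is a plain dict shaped as described above; `new_dataset_id_or_none` and the sample values are hypothetical.

```python
from typing import Any, Dict, Optional


def new_dataset_id_or_none(etl_result: Dict[str, Any]) -> Optional[int]:
    """Return the id of the post-ETL dataset, or None if the task did not succeed."""
    if not etl_result.get("is_succeeded", False):
        return None
    # Documented as present only when the ETL task succeeded.
    return etl_result.get("new_dataset_id")


# Hypothetical return values, shaped like the three documented fields.
succeeded = {"is_succeeded": True, "etl_task_id": 12, "new_dataset_id": 34}
failed = {"is_succeeded": False}
```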

row_number() int

get pyarrow table row count.

Returns:

int: row count.

save(destination: Optional[DataSource] = None, data_file: Optional[str] = None, qianfan_dataset_id: Optional[str] = None, qianfan_dataset_create_args: Optional[Dict[str, Any]] = None, bos_source_args: Optional[Dict[str, Any]] = None, schema: Optional[Schema] = None, replace_source: bool = False, **kwargs: Any) bool

Write data to the data source. If a schema has been passed, validate the data before exporting.

Args:
destination (Optional[DataSource]):

data source where the dataset is exported, default to None, in which case a datasource will be created inside the dataset using the parameters below

data_file (Optional[str]):

dataset local file path, default to None

qianfan_dataset_id (Optional[str]):

qianfan dataset ID, default to None

qianfan_dataset_create_args: (Optional[Dict[str, Any]]):

create arguments for creating a bare dataset on qianfan, default to None

bos_source_args: (Optional[Dict[str, Any]]):

arguments for creating a file on a specific BOS, default to None

schema: (Optional[Schema]):

schema used to validate before exporting data, default to None

replace_source: (bool):

whether to replace the original source, default to False

kwargs (Any): optional arguments

Returns:

bool: whether saving succeeded

start_online_data_process_task(operators: List[QianfanOperator]) str

create an online ETL task on qianfan

Args:

operators (List[QianfanOperator]): operators applied to ETL task

Returns:

str: etl task id

test_using_llm(model_version_id: Optional[str] = None, service_model: Optional[str] = None, service_endpoint: Optional[str] = None, is_chat_service: bool = True, does_show_latency: bool = True, **kwargs: Any) Dataset

Use the arguments to initialize an LLM instance and get its output on the current dataset. Set only the model arguments or only the service arguments to instantiate it.

Args:
model_version_id (Optional[str]):

version id of your own model, default to None

service_model (Optional[str]):

name of model you want to use as service, default to None

service_endpoint (Optional[str]):

endpoint of service, default to None

is_chat_service (bool):

whether the service is a chat service, default to True. The service will be Completion if False

does_show_latency (bool):

whether the result dataset contains a latency info column when using a Service as the evaluated object. Depending on the request mode (stream or non-stream), it will contain request_complete_latency or the (first_token_latency, request_complete_latency) combo. Default to True

**kwargs (Any):

optional argument dict

Returns:

Dataset: A dataset containing inputs, reference outputs and llm outputs
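The rule that either the model argument or the service arguments should be set can be expressed as a small pre-check. This is a sketch only: `pick_llm_target` is a hypothetical helper, and raising on conflicting or missing arguments is an assumption, since this reference does not say how the library itself reacts.

```python
from typing import Optional


def pick_llm_target(
    model_version_id: Optional[str] = None,
    service_model: Optional[str] = None,
    service_endpoint: Optional[str] = None,
) -> str:
    """Return "model" or "service" depending on which argument group is set."""
    has_model = model_version_id is not None
    has_service = service_model is not None or service_endpoint is not None
    if has_model == has_service:
        # Assumed behaviour: reject both-set and none-set alike.
        raise ValueError("set either model_version_id or the service arguments")
    return "model" if has_model else "service"


# Hypothetical placeholder identifiers.
target_for_model = pick_llm_target(model_version_id="my-model-version")
target_for_service = pick_llm_target(service_endpoint="my-endpoint")
```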

bos_path: Optional[str] = None
dataset: Optional[Dataset] = None
exec(**kwargs: Any) Any

method wrapper

resume(**kwargs: Any) Any

method wrapper

class qianfan.trainer.actions.ModelPublishAction(id: Optional[str] = None, name: Optional[str] = None, event_handler: Optional[EventHandler] = None, **kwargs: Dict[str, Any])[source]

Bases: BaseAction[Dict[str, Any], Dict[str, Any]]

Class for the model publish action, commonly used after TrainAction.

Sample:

Input: ` {'task_id': 47923, 'job_id': 33512} `

Output: ` {'task_id': 47923, 'job_id': 33512, 'model_id': "xxx", 'model_version_id': "aaa"} `

exec(**kwargs: Any) Any

method wrapper

job_id: Optional[str] = None

job id

model: Optional[Model] = None

model object

result: Optional[Dict[str, Any]] = None

result of model publish action

resume(**kwargs: Any) Any

method wrapper

task_id: Optional[str] = None

task id

class qianfan.trainer.actions.TrainAction(train_mode: TrainMode, train_type: Optional[str] = None, train_config: Optional[TrainConfig] = None, task_id: Optional[str] = None, job_id: Optional[str] = None, peft_type: PeftType = PeftType.ALL, job_name: Optional[str] = None, task_description: Optional[str] = None, job_description: Optional[str] = None, **kwargs: Any)[source]

Bases: BaseAction[Dict[str, Any], Dict[str, Any]]

Class for the train action: a synchronous invocation of the training API that takes a dataset metadata dict as input and produces a model metadata dict as output. Concretely, exec is called to run it.

Note: this action is not involved with model publishing; please use ModelPublishAction for publishing the model.

Sample:

Input:

```
{'datasets': {
    "sourceType": (
        console_consts.TrainDatasetSourceType.PrivateBos.value
    ),
    "versions": [
        {
            "versionBosUri": bos_uploader.generate_bos_file_parent_path(
                bos_data_src.bucket, bos_data_src.bos_file_path
            )
        }
    ]
}}
```

Output: ` {'task_id': "task-ddd", 'job_id': "job-xxxx"} `

exec(**kwargs: Any) Any

method wrapper

get_default_train_config(model_type: str, train_mode: TrainMode, peft_type: PeftType) TrainConfig[source]
is_incr: bool = False

if it’s incremental train or not

job_description: Optional[str] = None

train job description

job_id: Optional[str] = None

train job id

job_name: str = ''

train task name

result: Optional[Dict[str, Any]] = None

train result

resume(**kwargs: Any) Any

method wrapper

stop(**kwargs: Dict) None[source]

stop method for train action

Parameters:
**kwargs (Dict):

input args for action stop

task_description: Optional[str] = None

train task description

task_id: Optional[str] = None

train task id

train_config: Optional[TrainConfig] = None

train config

train_mode: TrainMode

train mode

train_type: Optional[str] = ''

train_type

validateTrainConfig(strict: bool = True) None[source]

validate train_config with ModelInfo Limits

Raises:

InvalidArgumentError: if train_config does not satisfy the ModelInfo limits

qianfan.trainer.base module

class qianfan.trainer.base.BaseAction(id: Optional[str] = None, name: Optional[str] = None, event_handler: Optional[EventHandler] = None, **kwargs: Dict[str, Any])[source]

Bases: ExecuteSerializable[Input, Output], ABC

BaseAction is a reusable, atomic operation component that can be freely orchestrated for use in Pipelines.

action_error_event(e: Exception) None[source]

dispatch action error event

Parameters:

e (Exception): action runtime error

action_event(state: ActionState, msg: str = '', data: Any = None) None[source]

dispatch action event

Parameters:

state (ActionState): action state

msg (str, optional): action custom description. Defaults to “”.

data (Any, optional): action custom data. Defaults to None.

classmethod action_type() str[source]
dumps() Optional[bytes][source]

dumps action input bytes

Returns:

serialized bytes action data

abstract exec(input: Optional[Input] = None, **kwargs: Dict) Output[source]

exec is an abstract method for executing the action.

Parameters:

input (Optional[Input], optional): input. Defaults to None.

Returns:

Output: output

loads(data: bytes) Any[source]
Parameters:

data (bytes): serialized action bytes to load

Returns:

Any: action instance

abstract resume(**kwargs: Dict) Output[source]

Resume the action from its last input. Subclasses should implement this method with their own resuming logic. BaseAction does not support storing the last input, because it does not differ between actions in each of their action states.

stop(**kwargs: Dict) None[source]

Action stop method, sub-class should implement this method with their own stop logic.

class qianfan.trainer.base.Pipeline(actions: Sequence[BaseAction], post_actions: Sequence[BaseAction] = [], event_handler: Optional[EventHandler] = None, **kwargs: Any)[source]

Bases: BaseAction[Dict[str, Any], Dict[str, Any]]

Pipeline is a sequentially executed chain composed of multiple actions, and users can customize the action chain according to their needs. At any given moment, the Pipeline retains the id of the currently executing action, allowing users to retrieve information about the action currently in progress. By registering an EventHandler, users can listen to events generated while the Pipeline is running.
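The sequential chaining described above can be sketched without the library. `MiniPipeline`, `train` and `publish` below are hypothetical stand-ins (not qianfan classes), and the assumption that each action's output dict becomes the next action's input is made for illustration; the placeholder ids mirror the samples shown for TrainAction and ModelPublishAction.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

# Hypothetical stand-in: each "action" is an (id, function) pair.
Action = Tuple[str, Callable[[Dict[str, Any]], Dict[str, Any]]]


class MiniPipeline:
    def __init__(self, actions: List[Action]) -> None:
        self.actions = actions
        self.current_action: Optional[str] = None  # id of the action in progress

    def exec(self, input: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        output = dict(input or {})
        for action_id, run in self.actions:
            self.current_action = action_id
            output = run(output)  # each output feeds the next action
        return output


def train(data: Dict[str, Any]) -> Dict[str, Any]:
    return {**data, "task_id": "task-ddd", "job_id": "job-xxxx"}


def publish(data: Dict[str, Any]) -> Dict[str, Any]:
    return {**data, "model_id": "xxx", "model_version_id": "aaa"}


ppl = MiniPipeline([("train", train), ("publish", publish)])
result = ppl.exec({"datasets": {}})
```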

exec(**kwargs: Any) Any[source]

method wrapper

exec_from(input: Optional[Dict[str, Any]] = None, start: Optional[Union[int, str]] = 0, **kwargs: Dict) Dict[str, Any][source]
register_event_handler(event_handler: EventHandler, action_id: Optional[str] = None) None[source]

Register the event handler to the specified action.

Args:

event_handler (EventHandler): The event handler instance.

resume(**kwargs: Any) Any[source]

method wrapper

stop(**kwargs: Dict) None[source]

stop pipeline running, only stop the actions not running.

class qianfan.trainer.base.Trainer[source]

Bases: ABC

Base Trainer class, which focuses on running the whole training process with a single call. It defines the 3 basic methods to operate training:

- run(): run the specific training process, like fine-tuning
- resume(): resume from the stopped or failed state
- stop(): stop the training process

property actions: Dict[str, BaseAction]

Get the available actions for trainer.

Returns:

List[str]: The list of action names.

get_evaluate_result() Any[source]

Receive the evaluate result from the pipeline. [coming soon].

get_log() Any[source]

Receive the training log during the pipeline execution. [coming soon].

abstract property output: Any
ppls: List[Pipeline] = []

Pipelines for training, there may be multiple pipelines in the training process.

register_event_handler(event_handler: EventHandler, ppl_id: Optional[str] = None) None[source]

Register the event handler to the specified ppls.

Args:

event_handler (EventHandler): The event handler instance.

result: List[Any] = []

pipeline running results, which may be an error or an object

abstract resume(**kwargs: Dict) Trainer[source]

The counterpart of the stop method. Users can resume the training process by calling the resume() method.

Returns:

Trainer: Trainer instance

abstract run(**kwargs: Dict) Trainer[source]

Trainer abstract method. Subclasses override this method to implement the specific training process.

Returns:

Trainer: Trainer instance

property status: str

Trainer status. Implements different statuses for different processes like fine-tuning, RLHF, PreTrain and so on.

abstract stop(**kwargs: Dict) Trainer[source]

Trainer abstract method. Subclasses implement it to support more controllable usage in concrete situations.

Returns:

Trainer: Trainer instance
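Since run(), stop() and resume() are each documented as returning the Trainer instance, calls can be chained. A minimal sketch of that contract follows; `MiniTrainer` and `ToyTrainer` are hypothetical stand-ins, and the status strings are placeholders rather than library values.

```python
from abc import ABC, abstractmethod


class MiniTrainer(ABC):
    """Hypothetical stand-in with the three documented methods."""

    def __init__(self) -> None:
        self.status = "Created"  # placeholder status strings, not library values

    @abstractmethod
    def run(self) -> "MiniTrainer": ...

    @abstractmethod
    def stop(self) -> "MiniTrainer": ...

    @abstractmethod
    def resume(self) -> "MiniTrainer": ...


class ToyTrainer(MiniTrainer):
    def run(self) -> "ToyTrainer":
        self.status = "Running"
        return self

    def stop(self) -> "ToyTrainer":
        self.status = "Stopped"
        return self

    def resume(self) -> "ToyTrainer":
        self.status = "Running"
        return self


# Each method returns the trainer, so calls can be chained.
trainer = ToyTrainer().run().stop()
status_after_stop = trainer.status
trainer.resume()
```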

qianfan.trainer.base.with_event(func: Callable[[...], Any]) Callable[[...], Any][source]

decorator for action state tracking with event.

qianfan.trainer.configs module

class qianfan.trainer.configs.LimitType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: int, Enum

MultipleChoice = 2
Range = 3
SingleChoice = 1
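One way to read the three limit types is as three kinds of parameter check. The sketch below redefines the enum with the same member names and values so it runs standalone. The semantics of each type (inclusive bounds for Range, membership for SingleChoice, subset for MultipleChoice) are assumptions, and `within_limit` is a hypothetical helper, not the library's validation logic.

```python
from enum import Enum
from typing import Any, Sequence


class LimitType(int, Enum):
    # Same member names and values as listed above.
    SingleChoice = 1
    MultipleChoice = 2
    Range = 3


def within_limit(value: Any, limit_type: LimitType, allowed: Sequence[Any]) -> bool:
    """Check a value against one limit; the semantics here are assumptions."""
    if limit_type is LimitType.Range:
        low, high = allowed  # assumed inclusive bounds
        return low <= value <= high
    if limit_type is LimitType.SingleChoice:
        return value in allowed  # assumed: one value out of the options
    # MultipleChoice: assumed to mean every chosen item must be an allowed option.
    return all(item in allowed for item in value)


# Hypothetical limits for illustration.
epoch_ok = within_limit(3, LimitType.Range, (1, 10))
modules_ok = within_limit(["q", "v"], LimitType.MultipleChoice, ["q", "k", "v"])
```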
class qianfan.trainer.configs.ModelInfo(*, short_name: str, base_model_type: str, support_peft_types: List[PeftType] = [], common_params_limit: TrainLimit, specific_peft_types_params_limit: Optional[Dict[Union[str, PeftType], TrainLimit]] = None)[source]

Bases: BaseModel

base_model_type: str

base model name

common_params_limit: TrainLimit

common params limit, excluding suggested params that differ between peft types

short_name: str

short_name must be shorter than 15 characters

specific_peft_types_params_limit: Optional[Dict[Union[str, PeftType], TrainLimit]]

special params suggestion of specific peft types

support_peft_types: List[PeftType]

support peft types and suggestions for training params

class qianfan.trainer.configs.TrainConfig(*, epoch: Optional[int] = None, batch_size: Optional[int] = None, learning_rate: Optional[float] = None, max_seq_len: Optional[int] = None, logging_steps: Optional[int] = None, warmup_ratio: Optional[float] = None, weight_decay: Optional[float] = None, lora_rank: Optional[int] = None, lora_all_linear: Optional[str] = None, scheduler_name: Optional[str] = None, lora_alpha: Optional[int] = None, lora_dropout: Optional[float] = None, lora_target_modules: Optional[List[str]] = None, peft_type: Optional[Union[str, PeftType]] = None, trainset_rate: int = 20, extras: Dict[str, Any] = {})[source]

Bases: BaseModel

batch_size: Optional[int]

batch size: varies by model

epoch: Optional[int]

epoch number: varies by model

extras: Dict[str, Any]

extra fields for train_config

learning_rate: Optional[float]

learning rate: varies by model

classmethod load(path: str) TrainConfig[source]
logging_steps: Optional[int]

log saving interval steps

lora_all_linear: Optional[str]

LoRA all linear layer

lora_alpha: Optional[int]

LoRA scaling params

lora_dropout: Optional[float]

LoRA dropout

lora_rank: Optional[int]

LoRA rank

lora_target_modules: Optional[List[str]]

list of LoRA parameter layers

max_seq_len: Optional[int]

max_seq_len: varies by model

peft_type: Optional[Union[str, PeftType]]

parameter efficient FineTuning method, like LoRA, P-tuning, ALL

scheduler_name: Optional[str]

for learning rate schedule

trainset_rate: int

rate for dataset to split

validate_config(train_limit: TrainLimit) bool[source]
warmup_ratio: Optional[float]

warmup ratio

weight_decay: Optional[float]

regularization param

class qianfan.trainer.configs.TrainLimit(**kwargs: Any)[source]

Bases: dict

qianfan.trainer.configs.get_model_info(train_mode: TrainMode, model: str) Optional[ModelInfo][source]

qianfan.trainer.consts module

class qianfan.trainer.consts.ActionState(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

This class lists the key points during an action’s execution. By default, ActionState should be obtained through the events dispatched by event_handler.

Done = 'Done'

Done stands for the state after exec has finished

Error = 'Error'

Error stands for the state when errors occur.

Preceding = 'Preceding'

Preceding stands for the point before exec

Running = 'Running'

Running stands for the point during exec

Stopped = 'Stopped'

Stopped stands for the state when stop() is called.
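Because ActionState derives from both str and Enum, its members compare equal to their string values and can be looked up from a string. The sketch below redefines the enum with the documented members so it runs standalone; `is_terminal` is a hypothetical helper, and treating Done, Error and Stopped as terminal is an assumption.

```python
from enum import Enum


class ActionState(str, Enum):
    # Same members and values as documented above, redefined to run standalone.
    Preceding = "Preceding"
    Running = "Running"
    Done = "Done"
    Error = "Error"
    Stopped = "Stopped"


def is_terminal(state: ActionState) -> bool:
    """Hypothetical helper: True once an action will make no further progress."""
    return state in (ActionState.Done, ActionState.Error, ActionState.Stopped)


# The str mixin makes members equal to their plain string values.
equals_plain_string = ActionState.Done == "Done"
# Looking a member up by value returns the singleton member.
looked_up = ActionState("Running")
```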

class qianfan.trainer.consts.PeftType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

ALL = 'FullFineTuning'

full-parameter update (full fine-tuning)

LoRA = 'LoRA'

LoRA

PTuning = 'PromptTuning'

p-tuning

class qianfan.trainer.consts.ServiceStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

Created = 'Created'

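task created, initializing

DeployFailed = 'DeployFailed'

model service deployment failed

DeployStopped = 'DeployStopped'

service deployment task stopped

Deployed = 'Deployed'

model service deployed successfully

Deploying = 'Deploying'

model service deployment in progress

Unknown = 'Unknown'

unknown status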

class qianfan.trainer.consts.ServiceType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

Chat = 'Chat'

Corresponding to the ChatCompletion

Completion = 'Completion'

Corresponding to the Completion

Embedding = 'Embedding'

Corresponding to the Embedding

Text2Image = 'Text2Image'

Corresponding to the Text2Image

class qianfan.trainer.consts.TrainStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

DatasetLoadFailed = 'DatasetLoadFailed'

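dataset loading failed

DatasetLoadStopped = 'DatasetLoadStopped'

dataset loading stopped

DatasetLoaded = 'DatasetLoaded'

dataset loading completed

DatasetLoading = 'DatasetLoading'

dataset loading in progress

EvaluationCreated = 'EvaluationCreated'

evaluation task created, initializing

EvaluationFailed = 'EvaluationFailed'

model service evaluation failed

EvaluationFinished = 'EvaluationFinished'

model service evaluation finished

EvaluationRunning = 'EvaluationRunning'

model service evaluation in progress

EvaluationStopped = 'EvaluationStopped'

model service evaluation stopped

ModelPublishFailed = 'ModelPublishFailed'

model publishing failed

ModelPublished = 'ModelPublished'

model published successfully

ModelPublishing = 'ModelPublishing'

model publishing in progress; corresponds to the Creating state returned when fetching the model at runtime

TrainCreated = 'TrainCreated'

task created, initializing

TrainFailed = 'TrainFailed'

training task failed; corresponds to the Failed status of the training task runtime API

TrainFinished = 'TrainFinished'

training finished; corresponds to the Done status of the training task runtime API

TrainStopped = 'TrainStopped'

training task stopped; corresponds to the Stop status of the training task runtime API

Training = 'Training'

training in progress; corresponds to the Running status of the training task runtime API

Unknown = 'Unknown'

unknown status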

qianfan.trainer.event module

class qianfan.trainer.event.Event(action_class: Type, action_id: str, state: ActionState, description: Optional[str] = None, data: Any = None)[source]

Bases: object

Event is the event container for the various nodes in the execution process of Action, and for each different Action, it can be abstracted into five common ActionStates. For multi-Action tasks at the Pipeline level, numerous Events will be generated during the process. Through EventHandler, custom callback events can be registered and listened to, enabling the insertion of various types of callbacks or intermediate task functions in the Pipeline nodes.

action_class: Type
action_id: str
action_state: ActionState
data: Any = None
description: Optional[str] = None
class qianfan.trainer.event.EventHandler[source]

Bases: object

EventHandler serves as a mechanism for registering and listening to custom callback events in the execution process of Actions. It facilitates the management of events occurring at different nodes during the execution of Actions within a Pipeline.

dispatch(event: Event) None[source]

dispatch an event to the user custom handler

Parameters:
event (Event):

event to dispatch to user custom handler
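A custom handler typically overrides dispatch to react to each event. The sketch below shows the idea with standalone stand-ins: `MiniEvent` carries a subset of the documented Event fields and `RecordingHandler` simply stores what it receives. Both are hypothetical and do not subclass the real qianfan classes.

```python
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass
class MiniEvent:
    """Hypothetical stand-in carrying a subset of the documented Event fields."""

    action_id: str
    action_state: str
    description: Optional[str] = None
    data: Any = None


class RecordingHandler:
    """Hypothetical handler that keeps every event it is given."""

    def __init__(self) -> None:
        self.events: List[MiniEvent] = []

    def dispatch(self, event: MiniEvent) -> None:
        self.events.append(event)


handler = RecordingHandler()
# Simulate the states one action might pass through.
for state in ("Preceding", "Running", "Done"):
    handler.dispatch(MiniEvent(action_id="train", action_state=state))
```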

qianfan.trainer.event.dispatch_event(event_handler: Optional[EventHandler] = None, event: Optional[Event] = None) None[source]

method to dispatch event from the event handler.

Args:
event_handler (Optional[EventHandler], optional):

event handler. Defaults to None.

event (Optional[Event], optional):

runtime generated event instance. Defaults to None.

qianfan.trainer.finetune module

class qianfan.trainer.finetune.LLMFinetune(train_type: Optional[str] = None, dataset: Optional[Any] = None, train_config: Optional[Union[TrainConfig, str]] = None, deploy_config: Optional[DeployConfig] = None, event_handler: Optional[EventHandler] = None, eval_dataset: Optional[Any] = None, evaluators: Optional[List[Evaluator]] = None, dataset_bos_path: Optional[str] = None, previous_trainer: Optional[Trainer] = None, previous_task_id: Optional[str] = None, name: Optional[str] = None, **kwargs: Any)[source]

Bases: Trainer

Class implements the SFT training pipeline with several actions. Use run() to synchronously run the training pipeline until the model training is finished.

property output: Any
resume(**kwargs: Dict) LLMFinetune[source]

LLMFinetune resume method.

Returns:

LLMFinetune: the LLMFinetune instance

run(**kwargs: Any) Trainer[source]

run a pipeline to run the fine-tune process.

Parameters:
**kwargs:

Any additional keyword arguments. {“input”: {}} could be specified if needed

Raises:

InvalidArgumentError: no pipeline bound to run.

Returns:
Trainer:

self, for chain invocation.

property status: str

LLMFinetune status getter.

Returns:

str: status for LLMFinetune, mapping from state of actions in pipeline.

stop(**kwargs: Dict) Trainer[source]

stop method of LLMFinetune. LLMFinetune will stop all actions in the pipeline. In fact, LLMFinetune only takes one pipeline, so this is equivalent to stopping the first of ppls.

Returns:
Trainer:

self, for chain invocation.

classmethod train_type_list() Dict[str, ModelInfo][source]

qianfan.trainer.post_pretrain module

class qianfan.trainer.post_pretrain.PostPreTrain(train_type: str, dataset: Optional[Union[Dataset, str]] = None, train_config: Optional[Union[TrainConfig, str]] = None, event_handler: Optional[EventHandler] = None, **kwargs: Any)[source]

Bases: Trainer

Class implements the PostPreTrain training pipeline with several actions. Use run() to synchronously run the training pipeline until the model training pipeline is finished.

property output: Any
resume(**kwargs: Dict) PostPreTrain[source]

PostPreTrain resume method.

Returns:

PostPreTrain: the PostPreTrain instance

run(**kwargs: Any) Trainer[source]

run a pipeline to run the post-pretrain process.

Parameters:
**kwargs:

Any additional keyword arguments. {“input”: {}} could be specified if needed

Raises:

InvalidArgumentError: no pipeline bound to run.

Returns:
Trainer:

self, for chain invocation.

property status: str

PostPreTrain status getter.

Returns:

str: status for PostPreTrain, mapping from state of actions in pipeline.

stop(**kwargs: Dict) Trainer[source]

stop method of PostPreTrain. PostPreTrain will stop all actions in the pipeline. In fact, PostPreTrain only takes one pipeline, so this is equivalent to stopping the first of ppls.

Returns:
Trainer:

self, for chain invocation.

classmethod train_type_list() Dict[str, ModelInfo][source]