qianfan.trainer package

class qianfan.trainer.DeployAction(deploy_config: Optional[DeployConfig] = None, **kwargs: Any)[source]

Bases: BaseAction[Dict[str, Any], Dict[str, Any]]

Action for model service deployment. A TrainConfig must be supplied when instance initialized. Sample:

``` deploy_config = DeployConfig(replicas=1, pool_type=1) deploy_action = DeployAction(deploy_config=deploy_config)

output = deploy_action.exec(input) ```

input:

{‘task_id’: 47923, ‘job_id’: 33512, ‘model_id’: 1, ‘model_version_id’: 39}

output:

` {'task_id': 47923, 'job_id': 33512, 'model_id': 1, 'model_version_id': 39, 'service_id': 164, 'service_endpoint': 'xbiimimv_xxx'} `

deploy_config: Optional[DeployConfig]

deploy config include replicas and so on

exec(**kwargs: Any) Any

method wrapper

model_id: Optional[int]

model id

model_version_id: Optional[int]

model version id

result: Optional[Dict[str, Any]] = None

result of action

resume(**kwargs: Any) Any

method wrapper

class qianfan.trainer.Event(action_id: Optional[str], state: ActionState, description: Optional[str] = None, data: Any = None)[source]

Bases: object

Event is the event container for the various nodes in the execution process of Action, and for each different Action, it can be abstracted into five common ActionStates. For multi-Action tasks at the Pipeline level, numerous Events will be generated during the process. Through EventHandler, custom callback events can be registered and listened to, enabling the insertion of various types of callbacks or intermediate task functions in the Pipeline nodes.

action_id: Optional[str] = None
action_state: ActionState
data: Any = None
description: Optional[str] = None
class qianfan.trainer.EventHandler[source]

Bases: object

EventHandler serves as a mechanism for registering and listening to custom callback events in the execution process of Actions. It facilitates the management of events occurring at different nodes during the execution of Actions within a Pipeline.

dispatch(event: Event) None[source]

_summary_

Parameters:
event (Event):

event to dispatch to user custom handler

class qianfan.trainer.LLMFinetune(train_type: str, dataset: Any, train_config: Optional[TrainConfig] = None, deploy_config: Optional[DeployConfig] = None, event_handler: Optional[EventHandler] = None, base_model: Optional[str] = None, **kwargs: Any)[source]

Bases: Trainer

Class implements the SFT training pipeline with several actions. Use run() to synchronously run the training pipeline until the model training is finished.

property output: Any
resume(**kwargs: Dict) LLMFinetune[source]

LLMFinetune resume method.

Returns:

LLMFinetune: _description_

run(**kwargs: Any) Trainer[source]

_summary_ run a pipeline to run the fine-tune process.

Parameters:
**kwargs:

Any additional keyword arguments. {“input”: {}} could be specified if needed

Raises:

InvalidArgumentError: no pipeline bind to run.

Returns:
Trainer:

self, for chain invocation.

property status: str

LLMFinetune status getter.

Returns:

str: status for LLMFinetune, mapping from state of actions in pipeline.

stop(**kwargs: Dict) Trainer[source]

stop method of LLMFinetune. LLMFinetune will stop all actions in pipeline. In fact, LLMFinetune only take one pipeline, so it will be equal to stop first of ppls.

Returns:
Trainer:

self, for chain invocation.

class qianfan.trainer.LoadDataSetAction(dataset: Optional[Dataset] = None, **kwargs: Any)[source]

Bases: BaseAction[Dict[str, Any], Dict[str, Any]]

Action for dataset’s loading, invokes the dataset’s save method to guarantee the dataset is loaded in Qianfan platform. Sample:

` load_action = LoadDataSetAction(dataset=Dataset(id=1)) load_action.exec() `

input:

none

output:

` {"datasets" : [{"id": 1, "name": "test_dataset"}]} `

dataset: Optional[Dataset] = None
exec(**kwargs: Any) Any

method wrapper

resume(**kwargs: Any) Any

method wrapper

class qianfan.trainer.Model(id: Optional[int] = None, version_id: Optional[int] = None, task_id: Optional[int] = None, job_id: Optional[int] = None, name: Optional[str] = None)[source]

Bases: ExecuteSerializable[Dict, Union[QfResponse, Iterator[QfResponse]]]

batch_run_on_qianfan(dataset: Dataset, **kwargs: Any) Dataset[source]

create batch run using specific dataset on qianfan by evaluation ability of platform

Parameters:
dataset (Dataset):

A dataset instance which indicates a dataset on qianfan platform

**kwargs (Any):

Arbitrary keyword arguments

Returns:

Dataset: batch result contained in dataset

deploy(deploy_config: DeployConfig, **kwargs: Any) Service[source]

model deploy

Parameters:
deploy_config (DeployConfig):

model service deploy config

Returns:

Service: model service instance

dumps() Optional[bytes][source]

Serialize the model to bytes.

Returns:
Optional[bytes]:

bytes of this model

exec(input: Optional[Dict] = None, **kwargs: Dict) Union[QfResponse, Iterator[QfResponse]][source]

model execution, for different model service type, please input a dict with different keys. Concretely, take

`input={“messages”: [{“role”: “user”,

“content”: “hello world”}]}`

as input, when the model is a chat io Model.

Parameters:
input (Optional[Dict], optional):

input data . Defaults to None.

Raises:

InternalError: model with no service deployed is unable to call exec

Returns:
Union[QfResponse, Iterator[QfResponse]]:

output data

id: Optional[int]

remote model id

job_id: Optional[int]

train job id

loads(data: bytes) Any[source]

load model instance from bytes

Parameters:
data (bytes):

bytes of this model

Returns:

Any: model instance

name: Optional[str] = None

model name

publish(name: str = '', **kwargs: Any) Model[source]

model publish, before deploying a model, it should be published.

Parameters:
name str:

model name. Defaults to “m_{task_id}{job_id}”.

service: Optional[Service] = None

model service

task_id: Optional[int]

train tkas id

version_id: Optional[int]

remote model version id

class qianfan.trainer.ModelPublishAction(id: Optional[str] = None, name: Optional[str] = None, event_handler: Optional[EventHandler] = None, **kwargs: Dict[str, Any])[source]

Bases: BaseAction[Dict[str, Any], Dict[str, Any]]

Class for Model publish action, Commonly used after TrainAction.

Sample:

Input: ` {'task_id': 47923, 'job_id': 33512} `

Output: ` {'task_id': 47923, 'job_id': 33512, 'model_id': 1, 'model_version_id': 39} `

exec(**kwargs: Any) Any

method wrapper

job_id: Optional[int] = None

job id

model: Optional[Model] = None

model object

result: Optional[Dict[str, Any]] = None

result of model publish action

resume(**kwargs: Any) Any

method wrapper

task_id: Optional[int] = None

task id

class qianfan.trainer.Service(id: Optional[int] = None, endpoint: Optional[str] = None, model: Optional[Union[Model, str]] = None, deploy_config: Optional[DeployConfig] = None, service_type: Optional[ServiceType] = None)[source]

Bases: ExecuteSerializable[Dict, Union[QfResponse, Iterator[QfResponse]]]

deploy(**kwargs: Any) Service[source]
deploy_config: Optional[DeployConfig]

service deploy config

dumps() Optional[bytes][source]

serialize the model instance to bytes

Returns:
Optional[bytes]:

bytes of the model instance

endpoint: Optional[str]

service endpoint to call

exec(input: Optional[Dict] = None, **kwargs: Dict) Union[QfResponse, Iterator[QfResponse]][source]
Parameters:
input (Optional[Union[str, List[str], List[dict]]], optional):

input of execution of service. Defaults to None.

**kwargs: additional args Dict

Raises:

InternalError: unsupported service type

Returns:
Union[str, List[str], List[dict]]:

output

get_res() Union[ChatCompletion, Completion, Embedding, Text2Image][source]

convert to the specific model resources. e.g. ChatCompletion, Completion, Embeddings, Text2Image

Returns:
Union[ChatCompletion, Completion, Embedding, Text2Image]:

resource object

id: Optional[int]

remote service id

loads(data: bytes) Any[source]

load service instance from bytes

Parameters:
data (bytes):

bytes of model instance

Returns:

Any: model instance

model: Optional[Model]

service model instance

service_type: Optional[ServiceType]

service type, for user use service as a execution must specify

property status: str

get the service status

Raises:

InternalError: id not found

Returns:

console_const.ServiceStatus

class qianfan.trainer.TrainAction(train_type: Optional[str] = None, train_config: Optional[TrainConfig] = None, base_model: Optional[str] = None, task_id: Optional[int] = None, job_id: Optional[int] = None, train_mode: Optional[TrainMode] = None, task_name: Optional[str] = None, task_description: Optional[str] = None, job_description: Optional[str] = None, **kwargs: Any)[source]

Bases: BaseAction[Dict[str, Any], Dict[str, Any]]

Class for Train Action, Synchronous invocation of the training API, taking a dataset metadata dict as input and producing a model metadata as output. Concretely, exec is called for running.

Note: this action is not involved with model publishing, please use use ModelPublishAction for publishing model.

Sample:

Input: ` {'datasets':[{'type': 1, 'id': 111}]} `

Output: ` {'task_id': 47923, 'job_id': 33512} Sample code: `

base_model: Optional[str] = None

base train type like ‘ERNIE-Bot-turbo’

exec(**kwargs: Any) Any

method wrapper

get_default_train_config(model_type: str) TrainConfig[source]
is_incr: bool = False

if it’s incremental train or not

job_description: Optional[str] = None

train job description

job_id: Optional[int] = None

train job id

result: Optional[Dict[str, Any]] = None

“train result

resume(**kwargs: Any) Any

method wrapper

stop(**kwargs: Dict) None[source]

stop method for train action

Parameters:
**kwargs (Dict):

input args for action stop

task_description: Optional[str] = None

train task description

task_id: Optional[int] = None

train task id

task_name: str = ''

train task name

train_config: Optional[TrainConfig] = None

train config

train_mode: TrainMode = 'SFT'

train mode

train_type: Optional[str] = ''

train_type

Submodules

qianfan.trainer.actions module

class qianfan.trainer.actions.DeployAction(deploy_config: Optional[DeployConfig] = None, **kwargs: Any)[source]

Bases: BaseAction[Dict[str, Any], Dict[str, Any]]

Action for model service deployment. A TrainConfig must be supplied when instance initialized. Sample:

``` deploy_config = DeployConfig(replicas=1, pool_type=1) deploy_action = DeployAction(deploy_config=deploy_config)

output = deploy_action.exec(input) ```

input:

{‘task_id’: 47923, ‘job_id’: 33512, ‘model_id’: 1, ‘model_version_id’: 39}

output:

` {'task_id': 47923, 'job_id': 33512, 'model_id': 1, 'model_version_id': 39, 'service_id': 164, 'service_endpoint': 'xbiimimv_xxx'} `

deploy_config: Optional[DeployConfig]

deploy config include replicas and so on

exec(**kwargs: Any) Any

method wrapper

model_id: Optional[int]

model id

model_version_id: Optional[int]

model version id

result: Optional[Dict[str, Any]] = None

result of action

resume(**kwargs: Any) Any

method wrapper

class qianfan.trainer.actions.LoadDataSetAction(dataset: Optional[Dataset] = None, **kwargs: Any)[source]

Bases: BaseAction[Dict[str, Any], Dict[str, Any]]

Action for dataset’s loading, invokes the dataset’s save method to guarantee the dataset is loaded in Qianfan platform. Sample:

` load_action = LoadDataSetAction(dataset=Dataset(id=1)) load_action.exec() `

input:

none

output:

` {"datasets" : [{"id": 1, "name": "test_dataset"}]} `

dataset: Optional[Dataset] = None
exec(**kwargs: Any) Any

method wrapper

resume(**kwargs: Any) Any

method wrapper

class qianfan.trainer.actions.ModelPublishAction(id: Optional[str] = None, name: Optional[str] = None, event_handler: Optional[EventHandler] = None, **kwargs: Dict[str, Any])[source]

Bases: BaseAction[Dict[str, Any], Dict[str, Any]]

Class for Model publish action, Commonly used after TrainAction.

Sample:

Input: ` {'task_id': 47923, 'job_id': 33512} `

Output: ` {'task_id': 47923, 'job_id': 33512, 'model_id': 1, 'model_version_id': 39} `

exec(**kwargs: Any) Any

method wrapper

job_id: Optional[int] = None

job id

model: Optional[Model] = None

model object

result: Optional[Dict[str, Any]] = None

result of model publish action

resume(**kwargs: Any) Any

method wrapper

task_id: Optional[int] = None

task id

class qianfan.trainer.actions.TrainAction(train_type: Optional[str] = None, train_config: Optional[TrainConfig] = None, base_model: Optional[str] = None, task_id: Optional[int] = None, job_id: Optional[int] = None, train_mode: Optional[TrainMode] = None, task_name: Optional[str] = None, task_description: Optional[str] = None, job_description: Optional[str] = None, **kwargs: Any)[source]

Bases: BaseAction[Dict[str, Any], Dict[str, Any]]

Class for Train Action, Synchronous invocation of the training API, taking a dataset metadata dict as input and producing a model metadata as output. Concretely, exec is called for running.

Note: this action is not involved with model publishing, please use use ModelPublishAction for publishing model.

Sample:

Input: ` {'datasets':[{'type': 1, 'id': 111}]} `

Output: ` {'task_id': 47923, 'job_id': 33512} Sample code: `

base_model: Optional[str] = None

base train type like ‘ERNIE-Bot-turbo’

exec(**kwargs: Any) Any

method wrapper

get_default_train_config(model_type: str) TrainConfig[source]
is_incr: bool = False

if it’s incremental train or not

job_description: Optional[str] = None

train job description

job_id: Optional[int] = None

train job id

result: Optional[Dict[str, Any]] = None

“train result

resume(**kwargs: Any) Any

method wrapper

stop(**kwargs: Dict) None[source]

stop method for train action

Parameters:
**kwargs (Dict):

input args for action stop

task_description: Optional[str] = None

train task description

task_id: Optional[int] = None

train task id

task_name: str = ''

train task name

train_config: Optional[TrainConfig] = None

train config

train_mode: TrainMode = 'SFT'

train mode

train_type: Optional[str] = ''

train_type

qianfan.trainer.base module

class qianfan.trainer.base.BaseAction(id: Optional[str] = None, name: Optional[str] = None, event_handler: Optional[EventHandler] = None, **kwargs: Dict[str, Any])[source]

Bases: ExecuteSerializable[Input, Output], ABC

BaseAction is a reusable, atomic operation components that can be freely orchestrated for use in Pipelines.

action_error_event(e: Exception) None[source]

dispatch action error event

Parameters:

e (Exception): action runtime error

action_event(state: ActionState, msg: str = '', data: Any = None) None[source]

dispatch action event

Parameters:

state (ActionState): action state msg (str, optional): action custom dfscription. Defaults to “”. data (Any, optional): action custom data. Defaults to None.

classmethod action_type() str[source]
dumps() Optional[bytes][source]

dumps action input bytes

Returns:

serialized bytes action data

abstract exec(input: Optional[Input] = None, **kwargs: Dict) Output[source]

exec is a abstract method for execute action.

Parameters:

input (Optional[Input], optional): input. Defaults to None.

Returns:

Output: output

loads(data: bytes) Any[source]
Parameters:

data (bytes): load

Returns:

Any: action instance

abstract resume(**kwargs: Dict) Output[source]

Action resume from last input, sub-class should implement this method with their own resuming logic. BaseAction don not support last input storage, because it’s not different from actions in their each action state.

stop(**kwargs: Dict) None[source]

Action stop method, sub-class should implement this method with their own stop logic.

class qianfan.trainer.base.Executable[source]

Bases: Generic[Input, Output], ABC

generic abstraction class of executable

abstract exec(input: Optional[Input] = None, **kwargs: Dict) Output[source]
class qianfan.trainer.base.ExecuteSerializable[source]

Bases: Executable[Input, Output], Serializable

set of executable and serializable. subclass implement it to support exec and dumps/loads.

class qianfan.trainer.base.Pipeline(actions: Sequence[BaseAction], post_actions: Sequence[BaseAction] = [], event_handler: Optional[EventHandler] = None, **kwargs: Any)[source]

Bases: BaseAction[Dict[str, Any], Dict[str, Any]]

Pipeline is a sequentially executed chain composed of multiple actions, and users can customize the action chain according to their needs. At any given moment, the Pipeline retains the id of the currently executing action, allowing users to retrieve information about the action currently in progress. By registering an EventHandler, user can listen to events generated during the Pipeline running process.

exec(**kwargs: Any) Any[source]

method wrapper

exec_from(input: Optional[Dict[str, Any]] = None, start: Optional[Union[int, str]] = 0, **kwargs: Dict) Dict[str, Any][source]
register_event_handler(event_handler: EventHandler, action_id: Optional[str] = None) None[source]

Register the event handler to specific the action. Args:

event_handler (EventHandler): The event handler instance.

resume(**kwargs: Any) Any[source]

method wrapper

stop(**kwargs: Dict) None[source]

stop pipeline running, only stop the actions not running.

class qianfan.trainer.base.Serializable[source]

Bases: ABC

generic abstraction class of serializable. especially for the model, service, and trainer.

abstract dumps() Optional[bytes][source]
Returns:

serialized bytes data

abstract loads(data: bytes) Any[source]
Parameters:

data (bytes): load

Returns:

Any: _description_

class qianfan.trainer.base.Trainer[source]

Bases: ABC

Base Trainer class, which focus on one step call to run the whole training process. which define the basic 3 methods to operate training. - run() run the specific training process like fine-tuning - resume() resume from the stopped, failed - stop() stop the training process

property actions: Dict[str, BaseAction]

Get the available actions for trainer. Returns:

List[str]: The list of action names.

get_evaluate_result() Any[source]

Receive the evaluate result from the pipeline. [coming soon].

get_log() Any[source]

Receive the training log during the pipeline execution. [coming soon].

abstract property output: Any
ppls: List[Pipeline] = []

Pipelines for training, there may be multiple pipelines in the training process.

register_event_handler(event_handler: EventHandler, ppl_id: Optional[str] = None) None[source]

Register the event handler to specific the ppls. Args:

event_handler (EventHandler): The event handler instance.

result: List[Any] = []

pipeline running results, which may be an error or an object

abstract resume(**kwargs: Dict) Trainer[source]

Counter to stop method. User can resume the training process by calling resume() method. Returns:

Trainer: Trainer instance

abstract run(**kwargs: Dict) Trainer[source]

Trainer abstract method. For the diverse instance subclasses, Override this method to implement the specific training process. Returns:

Trainer: Trainer instance

property status: str

Trainer status。Implements different status for different process like fine-tuning, RLHF, PreTrain and so on.

abstract stop(**kwargs: Dict) Trainer[source]

Trainer abstract method. Subclasses implement it to support an more controllable usage in the concrete situations. Returns:

Trainer: Trainer instance

qianfan.trainer.base.with_event(func: Callable[[...], Any]) Callable[[...], Any][source]

decorator for action state tracking with event.

qianfan.trainer.configs module

class qianfan.trainer.configs.DeployConfig(*, name: str = '', endpoint_prefix: str = '', description: str = '', replicas: int = 1, pool_type: DeployPoolType, service_type: ServiceType, extras: Any = None)[source]

Bases: BaseModel

description: str

description of service

endpoint_prefix: str

Endpoint custom prefix, will be used to call resource api

extras: Any
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'description': FieldInfo(annotation=str, required=False, default=''), 'endpoint_prefix': FieldInfo(annotation=str, required=False, default=''), 'extras': FieldInfo(annotation=Any, required=False), 'name': FieldInfo(annotation=str, required=False, default=''), 'pool_type': FieldInfo(annotation=DeployPoolType, required=True), 'replicas': FieldInfo(annotation=int, required=False, default=1), 'service_type': FieldInfo(annotation=ServiceType, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

name: str

Service name

pool_type: DeployPoolType

resource pool type, public resource will be shared with others.

replicas: int
replicas for model services, related to the capacity in QPS of model service.

default set to 1

service_type: ServiceType

service type, after deploy, Service could behave like the specific type.

class qianfan.trainer.configs.TrainConfig(*, epoch: Optional[int] = None, batch_size: Optional[int] = None, learning_rate: Optional[float] = None, max_seq_len: Optional[int] = None, peft_type: Optional[str] = None, trainset_rate: int = 20, extras: Any = None)[source]

Bases: BaseModel

batch_size: Optional[int]

batch size: differ from models

epoch: Optional[int]

epoch number: differ from models

extras: Any
learning_rate: Optional[float]

learning rate: differ from models

max_seq_len: Optional[int]

max_seq_len: differ from models

model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'batch_size': FieldInfo(annotation=Union[int, NoneType], required=False), 'epoch': FieldInfo(annotation=Union[int, NoneType], required=False), 'extras': FieldInfo(annotation=Any, required=False), 'learning_rate': FieldInfo(annotation=Union[float, NoneType], required=False), 'max_seq_len': FieldInfo(annotation=Union[int, NoneType], required=False), 'peft_type': FieldInfo(annotation=Union[str, NoneType], required=False), 'trainset_rate': FieldInfo(annotation=int, required=False, default=20)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

peft_type: Optional[str]

parameter efficient FineTuning method, like LoRA, P-tuning, ALL

trainset_rate: int

rate for dataset to spilt

qianfan.trainer.consts module

class qianfan.trainer.consts.ActionState(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

This class list the key point during an action execution At default, ActionState should be get through event_handler’s dispatched event.

Done = 'Done'

Done stands for the state of doing exec

Error = 'Error'

Error stands for the state when errors occur.

Preceding = 'Preceding'

Preceding stands for the point before exec

Running = 'Running'

Running stands for the point during exec

Stopped = 'Stopped'

Stopped stands for the state when stop() is called.

class qianfan.trainer.consts.FinetuneStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

DatasetLoadFailed = 'DatasetLoadFailed'

数据集加载失败

DatasetLoadStopped = 'DatasetLoadStopped'

数据集停止加载

DatasetLoaded = 'DatasetLoaded'

数据集加载完成

DatasetLoading = 'DatasetLoading'

数据集加载中

ModelPublishFailed = 'ModelPublishFailed'

模型发布失败

ModelPublished = 'ModelPublished'

模型发布成功

ModelPublishing = 'ModelPublishing'

模型发布中,对应获取模型运行时的Creating

TrainCreated = 'TrainCreated'

任务创建,初始化

TrainFailed = 'TrainFailed'

训练任务失败,对应训练任务运行时API的状态的Failed

TrainFinished = 'TrainFinished'

训练完成 对应训练任务运行时API的状态的Done

TrainStopped = 'TrainStopped'

训练任务失败,对应训练任务运行时API的状态的Stop

Training = 'Training'

训练中 对应训练任务运行时API状态的Running

Unknown = 'Unknown'

未知状态

class qianfan.trainer.consts.ServiceStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

Created = 'Created'

任务创建,初始化

DeployFailed = 'DeployFailed'

模型服务发布失败

DeployStopped = 'DeployStopped'

服务发布任务停止

Deployed = 'Deployed'

模型服务发布成功

Deploying = 'Deploying'

模型服务发布中

Unknown = 'Unknown'

未知状态

class qianfan.trainer.consts.ServiceType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: str, Enum

Chat = 'Chat'

Corresponding to the ChatCompletion

Completion = 'Completion'

Corresponding to the Completion

Embedding = 'Embedding'

Corresponding to the Embedding

Text2Image = 'Text2Image'

Corresponding to the `Text2Image

qianfan.trainer.event module

class qianfan.trainer.event.Event(action_id: Optional[str], state: ActionState, description: Optional[str] = None, data: Any = None)[source]

Bases: object

Event is the event container for the various nodes in the execution process of Action, and for each different Action, it can be abstracted into five common ActionStates. For multi-Action tasks at the Pipeline level, numerous Events will be generated during the process. Through EventHandler, custom callback events can be registered and listened to, enabling the insertion of various types of callbacks or intermediate task functions in the Pipeline nodes.

action_id: Optional[str] = None
action_state: ActionState
data: Any = None
description: Optional[str] = None
class qianfan.trainer.event.EventHandler[source]

Bases: object

EventHandler serves as a mechanism for registering and listening to custom callback events in the execution process of Actions. It facilitates the management of events occurring at different nodes during the execution of Actions within a Pipeline.

dispatch(event: Event) None[source]

_summary_

Parameters:
event (Event):

event to dispatch to user custom handler

qianfan.trainer.event.dispatch_event(event_handler: Optional[EventHandler] = None, event: Optional[Event] = None) None[source]

method to dispatch event from the event handler.

Args:
event_handler (Optional[EventHandler], optional):

event handler. Defaults to None.

event (Optional[Event], optional):

runtime generated event instance. Defaults to None.

qianfan.trainer.finetune module

class qianfan.trainer.finetune.LLMFinetune(train_type: str, dataset: Any, train_config: Optional[TrainConfig] = None, deploy_config: Optional[DeployConfig] = None, event_handler: Optional[EventHandler] = None, base_model: Optional[str] = None, **kwargs: Any)[source]

Bases: Trainer

Class implements the SFT training pipeline with several actions. Use run() to synchronously run the training pipeline until the model training is finished.

property output: Any
resume(**kwargs: Dict) LLMFinetune[source]

LLMFinetune resume method.

Returns:

LLMFinetune: _description_

run(**kwargs: Any) Trainer[source]

_summary_ run a pipeline to run the fine-tune process.

Parameters:
**kwargs:

Any additional keyword arguments. {“input”: {}} could be specified if needed

Raises:

InvalidArgumentError: no pipeline bind to run.

Returns:
Trainer:

self, for chain invocation.

property status: str

LLMFinetune status getter.

Returns:

str: status for LLMFinetune, mapping from state of actions in pipeline.

stop(**kwargs: Dict) Trainer[source]

stop method of LLMFinetune. LLMFinetune will stop all actions in pipeline. In fact, LLMFinetune only take one pipeline, so it will be equal to stop first of ppls.

Returns:
Trainer:

self, for chain invocation.

qianfan.trainer.model module

class qianfan.trainer.model.Model(id: Optional[int] = None, version_id: Optional[int] = None, task_id: Optional[int] = None, job_id: Optional[int] = None, name: Optional[str] = None)[source]

Bases: ExecuteSerializable[Dict, Union[QfResponse, Iterator[QfResponse]]]

batch_run_on_qianfan(dataset: Dataset, **kwargs: Any) Dataset[source]

create batch run using specific dataset on qianfan by evaluation ability of platform

Parameters:
dataset (Dataset):

A dataset instance which indicates a dataset on qianfan platform

**kwargs (Any):

Arbitrary keyword arguments

Returns:

Dataset: batch result contained in dataset

deploy(deploy_config: DeployConfig, **kwargs: Any) Service[source]

model deploy

Parameters:
deploy_config (DeployConfig):

model service deploy config

Returns:

Service: model service instance

dumps() Optional[bytes][source]

Serialize the model to bytes.

Returns:
Optional[bytes]:

bytes of this model

exec(input: Optional[Dict] = None, **kwargs: Dict) Union[QfResponse, Iterator[QfResponse]][source]

model execution, for different model service type, please input a dict with different keys. Concretely, take

`input={“messages”: [{“role”: “user”,

“content”: “hello world”}]}`

as input, when the model is a chat io Model.

Parameters:
input (Optional[Dict], optional):

input data . Defaults to None.

Raises:

InternalError: model with no service deployed is unable to call exec

Returns:
Union[QfResponse, Iterator[QfResponse]]:

output data

id: Optional[int]

remote model id

job_id: Optional[int]

train job id

loads(data: bytes) Any[source]

load model instance from bytes

Parameters:
data (bytes):

bytes of this model

Returns:

Any: model instance

name: Optional[str] = None

model name

publish(name: str = '', **kwargs: Any) Model[source]

model publish, before deploying a model, it should be published.

Parameters:
name str:

model name. Defaults to “m_{task_id}{job_id}”.

service: Optional[Service] = None

model service

task_id: Optional[int]

train tkas id

version_id: Optional[int]

remote model version id

class qianfan.trainer.model.Service(id: Optional[int] = None, endpoint: Optional[str] = None, model: Optional[Union[Model, str]] = None, deploy_config: Optional[DeployConfig] = None, service_type: Optional[ServiceType] = None)[source]

Bases: ExecuteSerializable[Dict, Union[QfResponse, Iterator[QfResponse]]]

deploy(**kwargs: Any) Service[source]
deploy_config: Optional[DeployConfig]

service deploy config

dumps() Optional[bytes][source]

serialize the model instance to bytes

Returns:
Optional[bytes]:

bytes of the model instance

endpoint: Optional[str]

service endpoint to call

exec(input: Optional[Dict] = None, **kwargs: Dict) Union[QfResponse, Iterator[QfResponse]][source]
Parameters:
input (Optional[Union[str, List[str], List[dict]]], optional):

input of execution of service. Defaults to None.

**kwargs: additional args Dict

Raises:

InternalError: unsupported service type

Returns:
Union[str, List[str], List[dict]]:

output

get_res() Union[ChatCompletion, Completion, Embedding, Text2Image][source]

convert to the specific model resources. e.g. ChatCompletion, Completion, Embeddings, Text2Image

Returns:
Union[ChatCompletion, Completion, Embedding, Text2Image]:

resource object

id: Optional[int]

remote service id

loads(data: bytes) Any[source]

load service instance from bytes

Parameters:
data (bytes):

bytes of model instance

Returns:

Any: model instance

model: Optional[Model]

service model instance

service_type: Optional[ServiceType]

service type, for user use service as a execution must specify

property status: str

get the service status

Raises:

InternalError: id not found

Returns:

console_const.ServiceStatus

qianfan.trainer.model.model_deploy(model: Model, deploy_config: DeployConfig, **kwargs: Any) Service[source]

model deployment implement, a polling loop will be called after deploy task created.

Parameters:
model (Model):

model to deploy

deploy_config (DeployConfig):

service deploy config, mainly including replicas and pool type.

Returns:

Service: deployed service with endpoint to call