import json
from string import Template

import kfp
from kfp import dsl
from kfp.components import create_component_from_func
from kfp.dsl import ContainerOp, VolumeOp
# Helper that converts a Katib Experiment result into a CLI argument string.
def convert_mnist_experiment_result(experiment_result) -> str:
    """Convert a Katib Experiment result into a command-line argument string.

    Args:
        experiment_result: JSON string encoding a list of
            ``{"name": ..., "value": ...}`` hyperparameter dicts, as produced
            by a Katib Experiment.

    Returns:
        Space-joined ``--name=value`` flags, one per hyperparameter,
        e.g. ``"--learning_rate=0.05"``. Empty string for an empty list.
    """
    # BUG FIX: the import must live inside the function. KFP lightweight
    # components (create_component_from_func) serialize only the function
    # source, so a module-level `import json` is not available when the
    # component runs in its own container and would raise NameError.
    import json

    hyperparams = json.loads(experiment_result)
    return " ".join(f"--{hp['name']}={hp['value']}" for hp in hyperparams)
# Turn the helper function into a reusable Kubeflow Pipelines component.
# BUG FIX: removed packages_to_install=["json"] — `json` is part of the
# Python standard library, so pip-installing it inside the component either
# fails or pulls an unrelated PyPI package; no extra packages are needed.
convert_mnist_experiment_result_op = create_component_from_func(
    convert_mnist_experiment_result,
    base_image="python:3.7",
)
@dsl.pipeline(
    name="End to end pipeline",
    description="An end to end example including hyperparameter tuning, train and inference."
)
def mnist_pipeline(
    name="mnist-workflow.uid",
    namespace="kubeflow",
    step=2000
):
    """End-to-end MNIST pipeline: Katib hyperparameter tuning, distributed
    TFJob training, and a KFServing InferenceService deployment.

    Args:
        name: Resource name used for both the Katib Experiment and the TFJob.
        namespace: Kubernetes namespace the resources are created in.
        step: Number of training steps passed to the TensorFlow job.
    """
    # Step 1: create a Katib Experiment for hyperparameter optimization.
    objective_config = {
        "type": "minimize",
        "goal": 0.001,
        "objectiveMetricName": "loss"
    }
    algorithm_config = {
        "algorithmName": "random"
    }
    parameters = [
        {"name": "learning_rate", "parameterType": "double", "feasibleSpace": {"min": "0.01", "max": "0.1"}},
        # Add further hyperparameter definitions here as needed.
    ]
    # Katib Experiment spec (incomplete on purpose — fill in trialTemplate,
    # maxTrialCount, etc. for a real deployment).
    katib_experiment_config = {
        "apiVersion": "kubeflow.org/v1beta1",
        "kind": "Experiment",
        "metadata": {
            "name": name,
            "namespace": namespace
        },
        "spec": {
            "objective": objective_config,
            "algorithm": algorithm_config,
            "parameters": parameters,
            # Other required spec fields go here.
        }
    }
    # Create the Katib Experiment.
    # NOTE(review): this success_condition syntax looks unusual — Argo
    # resource conditions are normally written like
    # 'status.conditions.#(type=="Succeeded")#|#(status=="True")#'.
    # Confirm against the Argo/KFP version in use.
    katib_experiment = dsl.ResourceOp(
        name="katib-experiment",
        k8s_resource=katib_experiment_config,
        success_condition="status.conditions.type.Succeeded"
    )
    # Step 2: train the model using the hyperparameters found by Katib.
    train_args = convert_mnist_experiment_result_op(katib_experiment.output)
    tf_job_config = {
        "apiVersion": "kubeflow.org/v1beta1",
        "kind": "TFJob",
        "metadata": {
            "name": name,
            "namespace": namespace
        },
        "spec": {
            "tfReplicaSpecs": {
                "Chief": {
                    "replicas": 1,
                    "restartPolicy": "OnFailure",
                    "template": {
                        "spec": {
                            "containers": [
                                {
                                    "command": [
                                        "sh",
                                        "-c"
                                    ],
                                    "args": [
                                        # BUG FIX: interpolate train_args.output
                                        # (a PipelineParam that serializes to an
                                        # Argo placeholder), not the task object
                                        # itself, whose f-string interpolation
                                        # would embed a Python repr.
                                        f"python /opt/model.py {train_args.output} --tf-train-steps={step}"
                                    ],
                                    "image": "liuhougangxa/tf-estimator-mnist",
                                    "name": "tensorflow"
                                }
                            ],
                            "volumes": [
                                {
                                    "name": "model-pvc",
                                    "persistentVolumeClaim": {
                                        "claimName": "modelpvc"
                                    }
                                }
                            ]
                        }
                    }
                },
                "Worker": {
                    "replicas": 3,
                    "restartPolicy": "OnFailure",
                    # Worker template is analogous to Chief's; fill in for a
                    # real deployment.
                }
            }
        }
    }
    train = dsl.ResourceOp(
        name="train",
        k8s_resource=tf_job_config,
        success_condition="status.replicaStatuses.Worker.succeeded=3,status.replicaStatuses.Chief.succeeded=1"
    ).after(katib_experiment)
    # Step 3: after training completes, publish an InferenceService via
    # KFServing.
    # BUG FIX: the placeholder was "$workflow.uid", but substitute() below
    # supplies the key "workflow_uid" — string.Template would parse the
    # identifier as "workflow" and raise KeyError. "${workflow_uid}" matches
    # the supplied key and delimits the identifier correctly.
    inference_service_template = Template("""
    {
    "apiVersion": "serving.kubeflow.org/v1alpha2",
    "kind": "InferenceService",
    "metadata": {
    "name": "mnist-${workflow_uid}"
    },
    "spec": {
    "default": {
    "predictor": {
    "tensorflow": {
    "storageUri": "pvc://modelpvc/"
    }
    }
    }
    }
    }
    """)
    inference_service_json = inference_service_template.substitute(workflow_uid=name)
    inference_service = json.loads(inference_service_json)
    inference = dsl.ResourceOp(
        name="inference",
        k8s_resource=inference_service,
        success_condition="status.url"
    ).after(train)
# Submit the pipeline to the KFP API server for execution.
# BUG FIX: the original referenced `kfp.Client()` without importing `kfp`
# (only `kfp.dsl` names were imported), raising NameError; `import kfp` is
# now in the module's import block.
client = kfp.Client()
run = client.create_run_from_pipeline_func(mnist_pipeline, arguments={})
# (scrape artifact removed: "网友评论" — a "reader comments" heading from the
# source web page, not part of the script; left as a comment so the file is
# valid Python)