美文网首页
kubeflow端到端的用户案例-整理

kubeflow端到端的用户案例-整理

作者: sknfie | 来源:发表于2025-04-11 09:29 被阅读0次
import json
from string import Template

import kfp
from kfp import dsl
from kfp.components import create_component_from_func
from kfp.dsl import ContainerOp, VolumeOp

# Convert a Katib Experiment result into a command-line argument string.
def convert_mnist_experiment_result(experiment_result) -> str:
    """Turn a JSON list of hyperparameter assignments into CLI flags.

    Args:
        experiment_result: JSON text encoding a list of objects, each of
            which has a ``name`` key and a ``value`` key.

    Returns:
        One ``--<name>=<value>`` flag per entry, in input order, joined by
        single spaces. An empty list yields an empty string.
    """
    # Imported locally on purpose: this function is handed to
    # create_component_from_func, which requires the function to be
    # standalone, so it cannot rely on module-level imports.
    import json

    assignments = json.loads(experiment_result)
    return " ".join(f"--{hp['name']}={hp['value']}" for hp in assignments)

# Wrap the converter function as a Kubeflow Pipelines component.
# No packages_to_install: the only module the function needs is ``json``,
# which is part of the Python standard library and is not something pip
# should be asked to install.
convert_mnist_experiment_result_op = create_component_from_func(
    convert_mnist_experiment_result,
    base_image="python:3.7",
)

@dsl.pipeline(
    name="End to end pipeline",
    description="An end to end example including hyperparameter tuning, train and inference."
)
def mnist_pipeline(
    name="mnist-workflow.uid",
    namespace="kubeflow",
    step=2000
):
    """Define a three-step pipeline: tune, train, then serve.

    1. Create a Katib ``Experiment`` resource for hyperparameter tuning.
    2. Convert the experiment output into command-line flags and create a
       ``TFJob`` resource that trains with them.
    3. Create an ``InferenceService`` resource once training has finished.

    Args:
        name: Metadata name of the Experiment and the TFJob; the
            InferenceService is named ``mnist-<name>``.
            NOTE(review): the default is a fixed literal, so runs that rely
            on it reuse the same resource names -- confirm whether a per-run
            placeholder was intended.
        namespace: Namespace written into the Experiment and TFJob metadata.
        step: Value passed to the training script as ``--tf-train-steps``.
    """
    # Step 1: create a Katib Experiment for hyperparameter optimisation.
    objective_config = {
        "type": "minimize",
        "goal": 0.001,
        "objectiveMetricName": "loss"
    }

    algorithm_config = {
        "algorithmName": "random"
    }

    parameters = [
        {"name": "learning_rate", "parameterType": "double", "feasibleSpace": {"min": "0.01", "max": "0.1"}},
        # Add further hyperparameter definitions here.
    ]

    # Katib Experiment manifest (to be completed for the actual setup).
    katib_experiment_config = {
        "apiVersion": "kubeflow.org/v1beta1",
        "kind": "Experiment",
        "metadata": {
            "name": name,
            "namespace": namespace
        },
        "spec": {
            "objective": objective_config,
            "algorithm": algorithm_config,
            "parameters": parameters,
            # TODO: other required spec fields go here.
        }
    }

    # Create the Katib Experiment.
    # NOTE(review): confirm that this success_condition expression can be
    # satisfied by the Experiment's status as reported by the cluster.
    katib_experiment = dsl.ResourceOp(
        name="katib-experiment",
        k8s_resource=katib_experiment_config,
        success_condition="status.conditions.type.Succeeded"
    )

    # Step 2: train the model with the Experiment's result.
    # NOTE(review): no attribute_outputs are declared on the ResourceOp above;
    # confirm that `.output` really carries the JSON list of name/value pairs
    # that the converter component parses.
    train_args = convert_mnist_experiment_result_op(katib_experiment.output)

    tf_job_config = {
        # NOTE(review): verify this apiVersion against the TFJob CRD installed
        # on the target cluster.
        "apiVersion": "kubeflow.org/v1beta1",
        "kind": "TFJob",
        "metadata": {
            "name": name,
            "namespace": namespace
        },
        "spec": {
            "tfReplicaSpecs": {
                "Chief": {
                    "replicas": 1,
                    "restartPolicy": "OnFailure",
                    "template": {
                        "spec": {
                            "containers": [
                                {
                                    "command": [
                                        "sh",
                                        "-c"
                                    ],
                                    "args": [
                                        # Interpolate the converter task's
                                        # *output*; formatting the task object
                                        # itself would embed its repr instead
                                        # of the generated flags.
                                        f"python /opt/model.py {train_args.output} --tf-train-steps={step}"
                                    ],
                                    "image": "liuhougangxa/tf-estimator-mnist",
                                    "name": "tensorflow"
                                }
                            ],
                            # NOTE(review): this volume is declared but the
                            # container above has no volumeMounts entry for it.
                            "volumes": [
                                {
                                    "name": "model-pvc",
                                    "persistentVolumeClaim": {
                                        "claimName": "modelpvc"
                                    }
                                }
                            ]
                        }
                    }
                },
                "Worker": {
                    "replicas": 3,
                    "restartPolicy": "OnFailure",
                    # TODO: Worker pod template omitted; it mirrors the Chief's.
                }
            }
        }
    }

    train = dsl.ResourceOp(
        name="train",
        k8s_resource=tf_job_config,
        success_condition="status.replicaStatuses.Worker.succeeded=3,status.replicaStatuses.Chief.succeeded=1"
    ).after(katib_experiment)

    # Step 3: after training, publish an InferenceService through KFServing.
    # The manifest is built as a dict like the other two. The previous
    # string.Template version declared the placeholder `$workflow.uid` but
    # substituted `workflow_uid`, so substitute() raised KeyError('workflow').
    inference_service = {
        "apiVersion": "serving.kubeflow.org/v1alpha2",
        "kind": "InferenceService",
        "metadata": {
            "name": f"mnist-{name}"
        },
        "spec": {
            "default": {
                "predictor": {
                    "tensorflow": {
                        "storageUri": "pvc://modelpvc/"
                    }
                }
            }
        }
    }

    dsl.ResourceOp(
        name="inference",
        k8s_resource=inference_service,
        success_condition="status.url"
    ).after(train)

# Submit the pipeline as a run, using the pipeline's default arguments.
# `kfp` must be imported as a module for `kfp.Client` to resolve; the
# file-level imports previously only pulled in `kfp.dsl` and
# `kfp.components`, so this line raised NameError.
client = kfp.Client()
run = client.create_run_from_pipeline_func(mnist_pipeline, arguments={})

相关文章

网友评论

      本文标题:kubeflow端到端的用户案例-整理

      本文链接:https://www.haomeiwen.com/subject/pgtcbjtx.html