将长时间运行的批处理任务(如 MapReduce)暴露为标准的同步风格 REST API,本身就是一个架构上的矛盾。用户期望快速响应,而后台任务可能需要数小时才能完成。在 MLOps 场景中,通过 Kubeflow Pipelines 运行这类任务时,这个矛盾尤为突出。一个简单的代理转发请求,然后让客户端长时间等待或进行复杂的轮询,是脆弱且低效的。我们需要一个更稳健的中间层——一个智能的、有状态的异步任务网关。
这个网关的核心职责是解耦 API 调用和后台任务的生命周期。它必须提供一个简洁的 API 接口,能够接收任务请求,立即返回一个任务标识符,然后可靠地在后台与 Kubernetes 和 Kubeflow 进行交互,监控任务状态,并在任务完成后提供结果查询。选择 Rust 来构建这个关键基础设施组件并非偶然,而是基于其内存安全、卓越的并发性能以及与 Kubernetes 生态系统(通过 kube-rs
库)无缝集成的能力,这些都是生产环境所必需的。
架构决策:在轮询、回调与状态监视之间权衡
在设计这样一个网关时,我们面临几个关键的架构选择,每个选择都有其明确的优缺点。
方案 A:客户端轮询代理 (Client-Side Polling Proxy)
这是最直接的实现方式。网关接收到 POST /jobs
请求后,使用 Kubeflow SDK 或直接操作 CRD 创建一个 PipelineRun
资源,并将生成的 PipelineRun
名称作为任务 ID 返回给客户端。客户端随后需要定期调用 GET /jobs/{id}/status
来查询任务状态。
- 优点:
- 实现简单,网关本身是无状态的,易于水平扩展。
- API 契约清晰,对客户端来说逻辑直观。
- 缺点:
- 控制平面压力: 大量客户端的频繁轮询会对 Kubernetes API Server 造成巨大压力,尤其是在任务量大的情况下。
- 资源浪费: 客户端和网关都在无效的轮询上浪费了大量 CPU 和网络资源。
- 延迟: 状态更新不是实时的,其及时性取决于客户端的轮询频率。
在真实项目中,这种方案很快就会成为性能瓶颈和运维噩梦。对 K8s API Server 的过度请求甚至可能导致整个集群不稳定。
方案 B:事件驱动回调网关 (Event-Driven Callback Gateway)
此方案试图解决轮询的低效问题。网关在创建 PipelineRun
后,会利用 Kubernetes 的 watch
机制来监视该资源的状态变化。当任务完成(成功或失败)时,网关会调用客户端在初始请求中预先注册的回调 URL (Webhook)。
- 优点:
- 高效: 状态更新是近乎实时的,避免了无效轮询。
- 低耦合: 网关与 K8s API Server 之间是高效的事件驱动通信。
- 缺点:
- 客户端复杂性: 要求客户端必须暴露一个公网可访问的 HTTP 端点来接收回调,这在很多网络环境中是不可行或不安全的。
- 可靠性问题: 如果回调失败(客户端宕机、网络问题),如何进行重试?这会给网关增加复杂的状态管理和重试逻辑。
- 安全性: Webhook 机制需要额外的安全措施,如签名验证,以防止恶意调用。
这种模式适用于服务端到服务端的内部集成,但作为通用的公共 API 网关,它对客户端的要求过高。
最终选择:基于 Rust 和 watch
机制的状态监视网关
我们选择一个混合方案,结合前两者的优点,同时规避其核心缺点。该网关在内部是有状态的,它通过高效的 watch
机制监控 Kubeflow 任务,并将状态缓存在内存中。对外,它依然提供一个简单的、可供客户端轮询的 API。
sequenceDiagram participant Client participant Rust Gateway participant Kubernetes API Server participant Kubeflow PipelineRun Client->>+Rust Gateway: POST /jobs (with MapReduce spec) Rust Gateway->>+Kubernetes API Server: CREATE PipelineRun CRD Kubernetes API Server-->>-Rust Gateway: PipelineRun created (e.g., 'job-xyz') Rust Gateway-->>-Client: 202 Accepted (jobId: 'job-xyz') Note right of Rust Gateway: Gateway starts watching
PipelineRun 'job-xyz' events. Kubernetes API Server->>Rust Gateway: Watch Event: 'job-xyz' is Running Note right of Rust Gateway: Update in-memory state for 'job-xyz' to RUNNING. loop Status Check Client->>+Rust Gateway: GET /jobs/job-xyz Rust Gateway-->>-Client: 200 OK (status: 'RUNNING') end Kubernetes API Server->>Kubeflow PipelineRun: Task completes Kubernetes API Server->>Rust Gateway: Watch Event: 'job-xyz' is Succeeded Note right of Rust Gateway: Update in-memory state to SUCCEEDED
and cache result location. Client->>+Rust Gateway: GET /jobs/job-xyz Rust Gateway-->>-Client: 200 OK (status: 'SUCCEEDED', resultsUrl: '...')
这个架构的核心优势在于:
- 对内高效: 网关与 K8s API Server 之间使用长连接的
watch
,避免了轮询开销。 - 对外简单: 客户端仍然使用简单的 RESTful 轮询模型,无需暴露回调端点。
- 性能与安全: Rust 的
tokio
运行时可以高效地管理成千上万个并发的watch
连接,而其类型系统和所有权模型保证了内存安全,这对于一个常驻内存、管理状态的系统至关重要。
核心实现
我们将使用 axum
作为 Web 框架,kube-rs
与 Kubernetes 交互,tokio
提供异步运行时,并使用 DashMap
作为线程安全的内存状态存储。
1. 项目结构与依赖
首先,配置 Cargo.toml
:
[package]
name = "kubeflow-mapreduce-gateway"
version = "0.1.0"
edition = "2021"
[dependencies]
axum = "0.7"
tokio = { version = "1", features = ["full"] }
kube = { version = "0.88", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.22", features = ["v1_29"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.9"
dashmap = "5.5"
uuid = { version = "1.7", features = ["v4"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
thiserror = "1.0"
anyhow = "1.0"
futures-util = "0.3"
2. 定义 API 和状态
我们需要定义任务的状态和它在内存中的表示。
src/state.rs
:
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use dashmap::DashMap;
// 公开给用户的任务状态
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub enum JobStatus {
Pending,
Running,
Succeeded,
Failed,
}
// 内部存储的任务详情
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Job {
pub id: String,
pub status: JobStatus,
pub submitted_at: u64,
// 任务完成后,存储结果的URL或摘要
pub result_info: Option<String>,
// 存储失败原因
pub error_message: Option<String>,
}
// 整个应用的状态,使用 Arc<DashMap> 以实现线程安全共享
pub type AppState = Arc<DashMap<String, Job>>;
3. Kubernetes PipelineRun
监控器
这是网关的核心逻辑。一个后台任务会持续监控 PipelineRun
资源的变化,并更新共享的 AppState
。
src/watcher.rs
:
use crate::state::{AppState, Job, JobStatus};
use futures_util::stream::StreamExt;
use kube::{
api::{Api, ListParams, Patch, PatchParams},
runtime::{watcher, WatchStreamExt},
Client,
};
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
use serde_json::Value;
use tracing::{error, info, warn};
// 这是一个简化的 PipelineRun CRD 结构,只包含我们关心的字段
#[derive(kube::CustomResource, Clone, Debug, serde::Deserialize, serde::Serialize)]
#[kube(
group = "kubeflow.org",
version = "v1beta1",
kind = "PipelineRun",
plural = "pipelineruns"
)]
#[serde(rename_all = "camelCase")]
pub struct PipelineRunSpec {
// 实际的 PipelineRun 结构比这复杂得多,这里为了演示而简化
}
pub async fn run_watcher(state: AppState, client: Client, namespace: String) -> anyhow::Result<()> {
let pipelineruns: Api<PipelineRun> = Api::namespaced(client, &namespace);
let lp = ListParams::default();
info!("Starting PipelineRun watcher in namespace '{}'", namespace);
let mut stream = watcher(pipelineruns, lp).applied_objects().boxed();
while let Some(pr) = stream.next().await {
match pr {
Ok(pr) => {
let pr_name = match pr.metadata.name.clone() {
Some(name) => name,
None => {
warn!("Watcher received a PipelineRun with no name. Skipping.");
continue;
}
};
// 只处理由我们网关管理 Job
if !state.contains_key(&pr_name) {
continue;
}
// 从 PipelineRun 的 status 字段解析任务状态
// 注意:这部分逻辑高度依赖于 Kubeflow PipelineRun CRD 的具体结构
let current_status = pr
.status
.as_ref()
.and_then(|s| s.get("phase"))
.and_then(|v| v.as_str());
info!("Watched event for PipelineRun '{}': status is {:?}", &pr_name, current_status);
let new_status = match current_status {
Some("Running") => JobStatus::Running,
Some("Succeeded") => JobStatus::Succeeded,
Some("Failed") | Some("Error") => JobStatus::Failed,
_ => JobStatus::Pending,
};
// 更新内存中的状态
if let Some(mut job) = state.get_mut(&pr_name) {
if job.status != new_status {
info!("Updating job '{}' status from {:?} to {:?}", &pr_name, job.status, new_status);
job.status = new_status;
// 如果任务失败,记录错误信息
if job.status == JobStatus::Failed {
job.error_message = pr
.status
.as_ref()
.and_then(|s| s.get("message"))
.and_then(|v| v.as_str())
.map(String::from);
}
}
}
}
Err(e) => {
error!("Watcher stream error: {}", e);
// 在生产环境中,这里应该有重连和退避策略
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}
}
Ok(())
}
4. API 端点实现
使用 axum
创建 API 路由,并与 AppState
交互。
src/main.rs
:
mod state;
mod watcher;
use crate::state::{AppState, Job, JobStatus};
use crate::watcher::{run_watcher, PipelineRun};
use axum::{
extract::{Path, State},
http::StatusCode,
response::Json,
routing::{get, post},
Router,
};
use kube::{
api::{Api, PostParams},
Client,
};
use serde_json::{json, Value};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use dashmap::DashMap;
use tracing::info;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
// 1. 初始化 Kubernetes 客户端
let client = Client::try_default().await?;
let namespace = std::env::var("NAMESPACE").unwrap_or_else(|_| "default".into());
// 2. 初始化应用状态
let app_state: AppState = Arc::new(DashMap::new());
// 3. 在后台启动 watcher
let watcher_state = app_state.clone();
let watcher_client = client.clone();
let watcher_namespace = namespace.clone();
tokio::spawn(async move {
if let Err(e) = run_watcher(watcher_state, watcher_client, watcher_namespace).await {
tracing::error!("Watcher task failed: {}", e);
}
});
// 4. 定义 API 路由
let app = Router::new()
.route("/jobs", post(submit_job))
.route("/jobs/:id", get(get_job_status))
.with_state((app_state, client, namespace));
// 5. 启动 HTTP 服务
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
info!("Gateway listening on {}", listener.local_addr()?);
axum::serve(listener, app).await?;
Ok(())
}
// POST /jobs - 提交新任务
async fn submit_job(
State((state, client, namespace)): State<(AppState, Client, String)>,
Json(payload): Json<Value>, // 接收任意 JSON 作为任务定义
) -> Result<Json<Value>, StatusCode> {
let job_id = format!("mapreduce-job-{}", uuid::Uuid::new_v4().to_string());
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
// 创建初始 Job 状态
let job = Job {
id: job_id.clone(),
status: JobStatus::Pending,
submitted_at: now,
result_info: None,
error_message: None,
};
state.insert(job_id.clone(), job);
// 构建 PipelineRun CRD
// 这是一个非常简化的示例。在实际应用中,这部分会更复杂,
// 可能需要模板引擎 (如 Tera) 来根据 payload 生成 PipelineRun 的 YAML。
let pipelinerun_manifest: PipelineRun = serde_json::from_value(json!({
"apiVersion": "kubeflow.org/v1beta1",
"kind": "PipelineRun",
"metadata": {
"name": job_id,
"namespace": namespace,
},
"spec": {
// 这里应该根据 payload 来填充实际的 pipeline spec
"pipelineSpec": {
// ... MapReduce pipeline definition here ...
}
}
})).map_err(|e| {
tracing::error!("Failed to deserialize PipelineRun manifest: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let api: Api<PipelineRun> = Api::namespaced(client, &namespace);
let pp = PostParams::default();
// 向 Kubernetes 提交资源
match api.create(&pp, &pipelinerun_manifest).await {
Ok(_) => {
info!("Successfully submitted PipelineRun '{}'", &job_id);
Ok(Json(json!({ "jobId": job_id })))
}
Err(e) => {
tracing::error!("Failed to create PipelineRun for job '{}': {}", &job_id, e);
// 清理已创建的内存状态
state.remove(&job_id);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
// GET /jobs/:id - 查询任务状态
async fn get_job_status(
Path(id): Path<String>,
State((state, _, _)): State<(AppState, Client, String)>,
) -> Result<Json<Job>, StatusCode> {
match state.get(&id) {
Some(job) => Ok(Json(job.clone())),
None => Err(StatusCode::NOT_FOUND),
}
}
生产环境考量与局限性
这个实现展示了核心架构,但在部署到生产环境之前,还有几个关键问题需要解决。
状态持久化: 当前的状态完全存储在内存的
DashMap
中。如果网关 Pod 重启,所有正在运行的任务状态都会丢失。一个生产级的实现需要将状态持久化到外部存储,如 Redis 或 etcd。当网关重启时,它可以从持久化存储中恢复状态,并重新建立对所有未完成任务的watch
。高可用性: 单实例网关是单点故障。可以部署多个实例,但需要一个领导者选举机制(例如,使用 Kubernetes 的 Lease 对象)来确保只有一个实例运行
watcher
循环并更新持久化状态,而其他实例可以处理只读的GET
请求或作为热备。身份认证与授权: 当前的 API 是完全开放的。需要集成认证机制(如 JWT、OAuth2)来保护端点,并可能需要与 Kubernetes RBAC 集成,以确保发起请求的用户有权在目标命名空间中创建
PipelineRun
。结果存储与获取: 文章没有详细说明任务完成后如何处理结果。一个完整的方案需要
PipelineRun
将其输出(例如,指向 S3 存储桶中结果文件的路径)写入其status
或某个ConfigMap
中。watcher
在检测到任务成功后,需要解析这些信息并更新Job
结构中的result_info
字段。
这个基于 Rust 的网关架构,在性能、资源效率和与云原生生态的集成方面,提供了一个坚实的基础。它解决了同步 API 与异步后端任务之间的核心矛盾,为 MLOps 平台或其他批处理系统提供了一个可靠、可扩展的入口。它的局限性主要在于状态管理和高可用性,这些是通过引入外部依赖(如 Redis)和更复杂的部署模式(如领导者选举)来解决的,这标志着从一个功能原型向一个健壮的生产级服务的演进路径。