基于 Flink Axum 与 Dgraph 构建实时图特征计算与服务链路


项目初期,我们的风控模型依赖于一套基于关系型数据库的特征工程体系。这套体系在处理用户个体行为或简单的一度关联时表现尚可,但随着欺诈手段的升级,我们发现大量的风险来自于团伙作案。这些团伙通过共享设备、IP地址、支付方式等资源,构建出复杂的、隐蔽的关联网络。要在秒级延迟内识别出这种网络中的高风险交易,传统的关系型数据模型力不从心。一次需要JOIN七八张表的查询,延迟能轻易飙升到数百毫秒,这在要求50ms内响应的风控场景下是不可接受的。

痛点非常明确:我们需要一个能够实时捕捉、计算并服务于图结构特征的系统。这个系统必须能够处理高吞吐的事件流,并在毫秒级延迟内响应复杂的、多跳的图查询。

初步的构想是建立一个三层架构:事件接收与处理层、图存储与计算层、以及特征服务层。

graph TD
    subgraph "事件源 (Kafka)"
        A[Transaction Events]
        B[Login Events]
        C[Device Info Events]
    end

    subgraph "实时计算层 (Apache Flink)"
        D[Flink Job: Event Consumer] --> E{KeyedProcessFunction: State Management};
        E --> F[Graph Feature Computation];
        F --> G{Async IO: Enrich from Dgraph};
        G --> F;
        F --> H[Custom Dgraph Sink];
    end

    subgraph "图存储层 (Dgraph)"
        I[(Dgraph Cluster)]
    end

    subgraph "低延迟服务层 (Rust/Axum)"
        J[Axum GraphQL API] --> K{Dgraph Client Pool};
        K --> I;
        L[ML Model Service] --> J;
    end

    A --> D;
    B --> D;
    C --> D;
    H --> I;

    style L fill:#f9f,stroke:#333,stroke-width:2px

这个架构的核心思路是:利用Apache Flink消费实时的业务事件流,在流处理过程中进行状态化的图特征计算(例如,某设备一小时内关联的新用户数),并将计算结果和实体关系实时更新到Dgraph图数据库中。最后,通过一个用Rust和Axum构建的高性能GraphQL API服务,将这些图特征以极低的延迟提供给下游的机器学习模型。

技术选型决策如下:

  • Apache Flink: 业界处理有状态流计算的标配。其强大的状态管理、Exactly-Once语义保证以及高吞吐能力是这个场景下的不二之选。特别是ProcessFunction,它能让我们访问底层状态和定时器,实现复杂的业务逻辑。
  • Dgraph: 一个原生的分布式图数据库。选择它的主要原因是其对GraphQL的原生支持(现在叫GraphQL+-,但核心思想一致)和针对深度图遍历的性能优化。相比于在其他数据库上模拟图,原生图库在多跳查询上优势巨大。
  • Axum (Rust): 服务层的性能至关重要。我们需要一个能榨干硬件性能、没有GC停顿、且内存安全的技术栈。Rust是完美的选择。在众多Web框架中,Axum因其与Tokio生态的深度集成、模块化设计和优秀的性能表现而胜出。

第一步:Dgraph 图模型设计

一切始于数据模型。在Dgraph中,我们通过GraphQL Schema来定义图的结构。一个坏的模型会让后续所有工作事倍功半。在真实项目中,我们的模型要复杂得多,但其核心可以简化为以下结构:

# Dgraph Schema
type User {
    user_id: String! @id @search(by: [hash])
    risk_score: Float
    created_at: DateTime
    transactions: [Transaction] @hasInverse(field: by_user)
    used_devices: [Device] @hasInverse(field: used_by_users)
}

type Device {
    device_id: String! @id @search(by: [hash])
    first_seen: DateTime
    used_by_users: [User] @hasInverse(field: used_devices)
    transactions: [Transaction] @hasInverse(field: on_device)
}

type Transaction {
    txn_id: String! @id
    amount: Float
    status: String @search(by: [exact])
    timestamp: DateTime
    by_user: User!
    on_device: Device!
}

这里的关键点在于:

  1. @id 指令定义了每个类型的主键,这对于后续的数据更新(Upsert)至关重要。
  2. @search(by: [hash])user_iddevice_id等高基数字段创建了哈希索引,这对于通过ID进行精确查找的性能是决定性的。
  3. @hasInverse 创建了双向边,这使得我们可以从User遍历到Device,也可以方便地从Device反向查询所有关联的User,这对于团伙分析非常有用。

第二步:构建低延迟特征服务 (Axum)

在Flink管道就绪之前,先构建API服务层是一个好习惯。这能让我们先定义好服务契约,并独立地进行测试和压测。

Cargo.toml 依赖配置:

[dependencies]
tokio = { version = "1", features = ["full"] }
axum = "0.6"
dgraph-tonic = "0.9.0" # Dgraph官方的gRPC客户端
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
async-graphql = "5.0"
async-graphql-axum = "5.0"
once_cell = "1.17"

我们的目标是创建一个GraphQL端点,它能接收一个user_id,然后返回该用户关联设备在过去24小时内关联的其他用户数量,这是一个典型的二度关联(User -> Device -> User)特征。

main.rs 核心实现:

use axum::{
    routing::post,
    Router, Extension, http::StatusCode, response::{IntoResponse, Response}
};
use async_graphql::{Object, Schema, Context, EmptySubscription, SimpleObject, FieldResult};
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
use dgraph_tonic::{Client, Query};
use std::sync::Arc;
use tokio::net::TcpListener;
use tracing::info;

// Dgraph客户端需要被多线程共享,使用Arc
type DgraphClient = Arc<Client>;

// 全局静态Dgraph客户端,实际生产中会使用更复杂的连接池管理
static DGRAPH_CLIENT: once_cell::sync::Lazy<DgraphClient> = once_cell::sync::Lazy::new(|| {
    let client = Client::new("http://localhost:9080").expect("Failed to create Dgraph client");
    Arc::new(client)
});

// 定义GraphQL查询的返回类型
#[derive(SimpleObject, serde::Deserialize, Debug)]
struct UserGraphFeatures {
    user_id: String,
    shared_device_user_count_24h: u32,
}

// 定义GraphQL的Query根节点
struct GQuery;

#[Object]
impl GQuery {
    async fn getUserFeatures(&self, ctx: &Context<'_>, user_id: String) -> FieldResult<UserGraphFeatures> {
        let client = ctx.data::<DgraphClient>()?;

        // 这里的坑在于:Dgraph的GraphQL+-查询语言功能强大但也复杂。
        // 我们需要找到目标用户,再找到他关联的设备,
        // 再从这些设备找到关联的其他用户(排除自己),并计数。
        let query = format!(
            r#"
            query userFeatures($userId: string) {{
              var(func: eq(user_id, $userId)) {{
                ~used_by_users @filter(NOT eq(user_id, $userId)) {{
                  COUNT_VAR as uid
                }}
              }}

              features(func: eq(user_id, $userId)) {{
                user_id
                shared_device_user_count_24h: val(COUNT_VAR)
              }}
            }}
            "#
        );
        
        // 实际项目中,时间窗口过滤会更复杂,可能需要用到 @filter(ge(timestamp, ...))
        // 这里为简化,只演示图遍历逻辑。
        
        let mut txn = client.new_read_only_txn();
        let mut vars = std::collections::HashMap::new();
        vars.insert("$userId".to_string(), user_id.clone());
        
        let res = txn.query_with_vars(query, vars).await
            .map_err(|e| async_graphql::FieldError::new(format!("Dgraph query failed: {}", e)))?;

        // 解析返回的JSON
        let features_vec: Vec<UserGraphFeatures> = serde_json::from_slice(&res.json)?;

        features_vec.into_iter().next()
            .ok_or_else(|| async_graphql::FieldError::new("User not found or no features available"))
    }
}

// GraphQL请求处理器
async fn graphql_handler(
    schema: Extension<AppSchema>,
    req: GraphQLRequest,
) -> GraphQLResponse {
    schema.execute(req.into_inner()).await.into()
}

type AppSchema = Schema<GQuery, async_graphql::EmptyMutation, EmptySubscription>;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt()
        .with_env_filter("info")
        .init();

    let schema = Schema::build(GQuery, async_graphql::EmptyMutation, EmptySubscription)
        .data(DGRAPH_CLIENT.clone()) // 注入Dgraph客户端到GraphQL上下文中
        .finish();

    let app = Router::new()
        .route("/graphql", post(graphql_handler))
        .layer(Extension(schema));

    info!("GraphQL server listening on 0.0.0.0:8000");

    let listener = TcpListener::bind("0.0.0.0:8000").await?;
    axum::serve(listener, app).await?;
    
    Ok(())
}

// 错误处理的 boilerplate
impl IntoResponse for AppError {
    fn into_response(self) -> Response {
        (
            StatusCode::INTERNAL_SERVER_ERROR,
            format!("Something went wrong: {}", self.0),
        )
            .into_response()
    }
}

// 使用 thiserror 库来简化错误处理
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
struct AppError(#[from] Box<dyn std::error::Error + Send + Sync>);

这段代码的核心是getUserFeatures函数中的Dgraph查询。它使用了GraphQL+-的var块来执行一个子查询:首先找到目标用户,然后通过反向边~used_by_users找到所有关联的设备,再从这些设备找到所有关联的用户,并把他们的UID存入变量COUNT_VAR中。最后在主查询块中,我们通过val(COUNT_VAR)获取这个变量的计数值。这种查询方式比在客户端进行多次往返查询要高效得多。

这是整个系统的引擎。我们需要创建一个Flink作业,消费Kafka中的交易事件,然后计算特征,最后写入Dgraph。

pom.xml 主要依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>io.dgraph</groupId>
    <artifactId>dgraph-java-client</artifactId>
    <version>21.03.1</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.9.1</version>
</dependency>

数据模型 (POJO):

public class TransactionEvent {
    public String txn_id;
    public String user_id;
    public String device_id;
    public double amount;
    public long timestamp; // epoch milliseconds
}

核心处理逻辑 KeyedProcessFunction:
在真实项目中,直接在Flink算子里同步调用外部数据库是一个巨大的性能陷阱。这会导致背压,拖慢整个流处理管道。正确的做法是使用AsyncDataStream.asyncUnorderedWait配合AsyncFunction。但为了简化核心逻辑展示,这里先用一个同步调用的方式作为示例,并指出其局限性。

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// 假设有一个DgraphService类封装了与Dgraph的交互
public class GraphFeatureProcessor extends KeyedProcessFunction<String, TransactionEvent, String> {

    private transient DgraphService dgraphService;
    private transient ValueState<Long> deviceUserCount;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 在open方法中初始化外部连接是最佳实践
        dgraphService = new DgraphService("localhost:9080");
        
        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>(
            "deviceUserCount",
            Long.class
        );
        deviceUserCount = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(TransactionEvent event, Context ctx, Collector<String> out) throws Exception {
        // 这里的 key 是 device_id
        
        // 1. 查询Dgraph,检查该用户是否是该设备的新用户
        boolean isNewAssociation = dgraphService.isNewUserDeviceAssociation(event.user_id, event.device_id);

        if (isNewAssociation) {
            // 如果是新的关联,更新Flink状态
            Long currentCount = deviceUserCount.value();
            if (currentCount == null) {
                currentCount = 0L;
            }
            currentCount++;
            deviceUserCount.update(currentCount);

            // 2. 生成Dgraph Mutation,用于更新图
            // Dgraph的Mutation使用RDF三元组格式: <subject> <predicate> <object> .
            // 我们需要为User, Device, Transaction创建或更新节点,并建立它们之间的关系
            String userUid = "_:" + event.user_id;
            String deviceUid = "_:" + event.device_id;
            String txnUid = "_:" + event.txn_id;

            StringBuilder mutation = new StringBuilder();
            // Upsert User
            mutation.append(String.format("<%s> <dgraph.type> \"User\" .\n", userUid));
            mutation.append(String.format("<%s> <user_id> \"%s\" .\n", userUid, event.user_id));
            
            // Upsert Device
            mutation.append(String.format("<%s> <dgraph.type> \"Device\" .\n", deviceUid));
            mutation.append(String.format("<%s> <device_id> \"%s\" .\n", deviceUid, event.device_id));

            // Create Transaction
            mutation.append(String.format("<%s> <dgraph.type> \"Transaction\" .\n", txnUid));
            mutation.append(String.format("<%s> <txn_id> \"%s\" .\n", txnUid, event.txn_id));
            mutation.append(String.format("<%s> <amount> \"%f\" .\n", txnUid, event.amount));
            
            // Link them together
            mutation.append(String.format("<%s> <used_devices> <%s> .\n", userUid, deviceUid));
            mutation.append(String.format("<%s> <by_user> <%s> .\n", txnUid, userUid));
            mutation.append(String.format("<%s> <on_device> <%s> .\n", txnUid, deviceUid));

            // 将生成的Mutation发送到下游的Sink
            out.collect(mutation.toString());
        }
    }

    @Override
    public void close() throws Exception {
        if (dgraphService != null) {
            dgraphService.close();
        }
    }
}

这个ProcessFunctiondevice_id为key。当一个交易事件到来时,它会先去查询Dgraph,判断这个user_iddevice_id的关联是否已经存在。这是一个简化逻辑,实际场景会更复杂,例如检查关联时间。如果是新关联,它会更新Flink的托管状态(ValueState),并生成一串RDF格式的字符串,这就是Dgraph的Mutation。_:前缀用于创建空白节点,Dgraph会根据@id字段自动处理Upsert逻辑。

自定义 Dgraph Sink:
Flink没有内置的Dgraph Sink,我们需要自己实现一个。核心是继承RichSinkFunction

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import io.dgraph.DgraphClient;
import io.dgraph.DgraphProto.Mutation;
import io.dgraph.Transaction;

public class DgraphSink extends RichSinkFunction<String> {

    private transient DgraphClient dgraphClient;
    private final String dgraphHost;
    private final int dgraphPort;

    public DgraphSink(String host, int port) {
        this.dgraphHost = host;
        this.dgraphPort = port;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // 同样,在open方法中创建客户端
        // 生产环境中需要处理gRPC连接的管理和重试
        this.dgraphClient = DgraphService.createDgraphClient(dgraphHost, dgraphPort);
    }

    @Override
    public void invoke(String mutationRdf, Context context) throws Exception {
        // 这里的坑在于,逐条写入性能极差。
        // 一个常见的优化是攒批(Buffering),但会增加延迟和实现的复杂性。
        // 这里为了演示,采用简单的单条写入。
        Transaction txn = dgraphClient.newTransaction();
        try {
            Mutation mu = Mutation.newBuilder()
                .setSetNquads(com.google.protobuf.ByteString.copyFromUtf8(mutationRdf))
                .build();
            txn.mutate(mu);
            txn.commit();
        } catch (Exception e) {
            // 生产级别的Sink必须有完善的重试和错误处理机制
            // 例如,对于可重试的异常,可以抛出让Flink框架来处理
            txn.discard();
            throw new RuntimeException("Failed to write to Dgraph", e);
        }
    }

    @Override
    public void close() throws Exception {
        // 关闭资源
    }
}

这个Sink的核心在invoke方法。它接收上游算子传来的RDF字符串,创建一个Dgraph事务,执行Mutation,然后提交。一个常见的错误是忘记处理事务的提交和丢弃,这会导致资源泄漏或数据不一致。生产环境中,必须增加重试逻辑,并考虑将多条Mutation打包在一个事务中批量提交以提升吞吐量。

组装 Flink 作业:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.connector.kafka.source.KafkaSource;
// ... other imports

public class RealtimeGraphFeatureJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 配置Checkpointing,这是保证Exactly-Once语义的基础
        env.enableCheckpointing(60000); // 每60秒一次

        KafkaSource<TransactionEvent> source = KafkaSource.<TransactionEvent>builder()
            // ... Kafka配置 ...
            .build();

        DataStream<TransactionEvent> transactionStream = env.fromSource(source, ...);

        DataStream<String> dgraphMutations = transactionStream
            .keyBy(event -> event.device_id)
            .process(new GraphFeatureProcessor());

        dgraphMutations.addSink(new DgraphSink("localhost", 9080));

        env.execute("Realtime Graph Feature Engineering Job");
    }
}

局限与未来路径

这套架构虽然解决了核心问题,但在生产环境中依然存在挑战和优化空间。

  1. Flink与Dgraph的交互瓶颈: GraphFeatureProcessor中同步查询Dgraph是当前最大的瓶颈。要解决这个问题,必须切换到AsyncDataStream.asyncUnorderedWait,并为DgraphService实现一套基于异步gRPC客户端的接口。这会显著增加代码复杂度,但能将吞吐量提升一个数量级。

  2. Dgraph Schema演进: 一旦系统上线,修改Dgraph的Schema会变得非常棘手。任何不兼容的变更都可能导致Flink作业失败或数据写入错误。需要建立一套严格的Schema管理和灰度发布流程。

  3. 端到端一致性: Flink通过Checkpoint可以保证其内部状态的Exactly-Once。但DgraphSink的事务提交和Flink的Checkpoint是两个独立的操作。要实现真正的端到端Exactly-Once,需要实现一个两阶段提交(TwoPhaseCommitSinkFunction)的Sink,这在工程上是巨大的挑战。对于大多数风控场景,At-Least-Once加上业务层的幂等处理通常是更务实的选择。

  4. 可测试性: 对这个分布式系统进行单元测试和集成测试很困难。引入Testcontainers框架,可以在测试中动态启动Kafka、Flink和Dgraph容器,从而构建一个高保真的测试环境,这是保障系统稳定性的关键投资。


  目录