在数据量呈指数级增长的今天,企业期望更快地从数据中获取洞察,助力业务发展,这使得实时大数据处理技术变得越发重要。Apache Flink 是实时流计算的代表性框架,其可实现毫秒级低延迟的实时流数据计算,且拥有丰富的使用场景和活跃的用户社区。基于这一框架,阿里云构建了实时计算 Flink 版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica),来为业界越来越多的实时 AI(Artificial Intelligence, 人工智能)场景提供多途径支持,包括 Alink 机器学习算法库和 AI Flow 大数据 AI 平台等。同时,依托与英特尔在大数据及 AI 领域的紧密合作,阿里云在实时计算 Flink 中集成了 Analytics Zoo Cluster Serving 来构建 AI 推理解决方案。本文将介绍这一解决方案的关键技术,并详解如何在阿里云实时计算 Flink 产品中使用该技术搭建实时 AI 推理方案。
阿里云实时计算 Flink 版
阿里云实时计算 Flink 版是阿里云基于 Apache Flink 构建的企业级、高性能实时大数据处理系统,其运行于 Yarn 或者 Kubernetes 提供的调度系统上。Kubernetes 运行在阿里云的基础架构上,包括阿里云神龙服务器、阿里云 ECS 实例,以及阿里云存储解决方案等。阿里云实时计算 Flink 版总体架构如图一所示:
图一 阿里云实时计算 Flink 版总体架构
阿里云实时计算 Flink 版性能优越,可 100% 兼容开源 Flink 接口,提供百万级作业吞吐,计算时延低至毫秒级,并支持数十种作业指标监控;同时,其还具有作业智能调优功能,提供一站式开发界面和智能诊断系统。
Analytics Zoo Cluster Serving
Analytics Zoo 是英特尔开源的统一的大数据分析和人工智能平台,其可将 TensorFlow、Keras、PyTorch 等 AI 训练和推理框架无缝扩展到分布式 Apache Spark、Flink、Ray 等大数据平台上运行。用户可以利用 Analytics Zoo 提供的分布式 AI 推理解决方案 Cluster Serving,快速构建运行于 Apache Flink 之上的实时 AI 推理服务,仅需极少量代码和指定极少量信息,即可构建一个可扩展的推理流水线。Analytics Zoo Cluster Serving 推理解决方案的架构如图二所示:
图二 Analytics Zoo Cluster Serving 架构
欲了解更多有关于 Analytics Zoo 的详细信息,请访问:
https://analytics-zoo.readthedocs.io/en/latest/
使用阿里云实时计算 Flink 版和 Analytics Zoo Cluster Serving 构建 AI 推理解决方案
在阿里云实时计算 Flink 版 2.4.2 及以上版本的内置函数中,集成了Analytics Zoo Cluster Serving 的 AI 推理功能。
用户在 Flink 应用程序调用 `CLUSTER_SERVING` 函数,只需要提供模型文件和数据文件,即可使用 TensorFlow、PyTorch 和 OpenVINO™ 工具套件的推理模型,完成机器学习端到端的应用。图三为在实时计算 Flink 版 Ververica 上进行 Cluster Serving 作业开发时的使用界面。
图三 实时计算 Flink 版的 Cluster Serving 作业开发界面
开发者可以参考以下语法要求来定义 Cluster Serving 的数据:
`CLUSTER_SERVING` 函数语法:
LOAD MODULE `cluster-serving`;
CLUSTER_SERVING(uri, `data`)
其中,`CLUSTER_SERVING` 函数参数定义如下:
使用 Analytics Zoo Cluster Serving 搭建推理解决方案示例
下面将通过具体案例来介绍使用 Analytics Zoo Cluster Serving 在阿里云实时计算 Flink 版搭建分布式推理的方法。本样例将采用一个简单深度学习模型 AutoEncoder,模型输入为一个向量(一维数组),通过自动编码模型输出一个向量。
以输入格式为 input_shape=(?,128) 的 TensorFlow SavedMode 模型为例,在阿里云实时计算 Flink 版使用 Cluster Serving 实现在线推理的步骤如下:
1. 登录阿里云 OSS 控制台,上传测试数据 input.csv 文件至 oss://***/input.csv 目录。input.csv 示例如下:
uri(String) | data(String) |
my-data | 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 |
2. 在阿里云实时计算 Flink 版控制台,创建 Flink 源表和结果表,加载 `cluster-serving` Function Module,并且使用 `CLUSTER_SERVING` 推理函数预测输入数据插入结果表:
CREATE TEMPORARY TABLE ClusterServingInput( uri STRING, `data` STRING ) WITH ( ‘connector’ = ‘filesystem’, ‘path’ = ‘oss://***/input.csv’, ‘format’ = ‘csv’ ); CREATE TEMPORARY TABLE ClusterServingOutput ( `data` STRING ) WITH ( ‘connector’ = ‘print’ ); LOAD MODULE `cluster-serving`; INSERT INTO ClusterServingOutput SELECT cluster_serving(uri, `data`) FROM ClusterServingInput; |
图四 Cluster Serving 的代码示例
3. 在作业高级配置的更多 Flink 配置和 Preview Session 集群 **其他配置** 均增加以下配置:
pipeline.global-job-parameters: ‘”modelPath:””[模型目录地址]”””’
其中,模型目录地址为模型文件的 OSS 存放目录,例如
oss://***/tf_auto/。
图五 Cluster Serving 的配置示例
4. 在作业运维界面,单击目标作业操作列下的运行。运行推理作业结果数据如下:
data (String) |
0.03932024, 0.017788976, -2.2538425E-4, 0.021853039, 0.030905273, 0.052324444, 0.053021688, 0.08776946, 0.0625467, 0.12149326, 0.048770625, 0.04533424, 0.09271586, 0.023119276, 0.0037812046, 0.052707616, 0.029312208, 0.023519568, 0.07501668, 0.037939064, 0.06363185, 0.04994791, 0.08504477, 0.06940596, 0.052632406, 0.028651983, 0.02205597, 0.05861686, 0.040175162, 0.04682718, 0.05283319, 0.01967535, 0.111778766, 0.03182444, 0.04509241, 0.09942829, 0.009491239, 0.038903005, 0.11488753, 0.019503327, 0.0346709, 0.060727723, 0.03718308, 0.1295206, 0.04292037, 0.13272661, 0.009957914, 0.050370887, 0.03016096, 0.067269124, 0.09981682, 0.08606887, 0.085193545, 0.041775055, 0.02956512, 0.027142545, 0.09836763, 0.029330501, 0.040107135, 0.0753366, 0.068517864, 0.020111244, 0.086271614, 0.04449262, 0.107297346, 0.07932708, 0.036529146, 0.014892701, 0.08245985, 0.04272034, 0.076421194, 0.035283472, 0.028554386, 0.019374546, 0.048012834, 0.011391987, 0.028220229, 0.054405782, 0.037807927, 0.085761145, 0.07974813, 0.04150249, -0.017132144, 0.08219319, 0.019887058, 0.034193676, 0.06221051, 0.07516215, 0.0598385, 0.09159884, 0.038117558, 0.05329266, 0.021038251, 0.072046585, 0.05561088, 0.017538168, 0.025691401, 0.08580602, 0.02225845, 0.117060736, 0.060743354, 0.11840888, 0.065381594, 0.08059649, 0.045638822, 0.06399781, 0.011646094, 0.043574452, 0.123603456, 0.045708194, 0.05672055, 0.08402837, 0.118181005, 0.09657015, 0.08259893, 0.0010145458, 0.031954847, 0.07388758, 0.035380267, 0.070080444, 0.06764534, 0.058731187, 0.022044828, 0.094359346, 0.05559277, 0.032255664, 0.07360537, 0.1199253 |
注:本样例模型不含标签,用户也可以使用通用的有标签的机器学习模型(如 ResNet50),配合有标准标签的数据集(如 ImageNet),对推理精度进行检验。
阿里云实时计算 Flink 版和 Analytics Zoo Cluster Serving 的应用场景
通过在阿里云实时计算 Flink 版中集成 Analytics Zoo Cluster Serving,用户可以在诸多场景中方便地进行分布式 AI 推理服务。例如,通过实时行动轨迹的搜索来协助病毒传播防控,或者通过实时过滤恶意点击来提高推荐系统的有效性。
2020 年,突如其来的疫情对公共卫生事件应对能力提出了更高的要求,通过对实时病例的行动轨迹分析,可以有效防控疫情的传播。使用 Apache Flink、Analytics Zoo Cluster Serving 以及向量搜索引擎 Proxima 等技术,可以基于确诊病例历史行动数据集,按照时点搜索,还原其行动轨迹。
阿里云实时计算 Flink 版集成的 Analytics Zoo Cluster Serving 还可用于提高推荐系统的有效性。为了满足不同用户的个性化需求,电商平台会根据用户的兴趣爱好推荐合适的商品,从而满足客户千人千面的商品需求。商家为了获取更多的平台曝光量,可能会别有用心地利用平台的推荐机制,增加自家商品的曝光机会,典型手法就是 “抱大腿” 攻击,其本质是通过大量协同点击目标商品和爆款商品,建立目标商品与爆款商品之间的关联关系。而使用 Analytics Zoo Cluster Serving 和 Apache Flink 可实现对恶意流量的实时识别,并保护识别程序的数据安全。
图六所示为一个可以实现上述场景的实时 AI 训练和推理方案的技术架构:
图六 实时 AI 训练和推理架构概览
总结
实时计算 Flink 版已经广泛应用于大数据分析实时化的场景,其与深度学习相结合,可支持构建更多 AI 应用场景,例如实时风控、实时异常检测和实时推荐等。阿里云和英特尔紧密合作,在阿里云实时计算 Flink 版中集成了 Analytics Zoo Cluster Serving,使阿里云平台可提供产品化的实时 AI 推理能力,为高效进行实时大数据开发和方案部署提供了一体化平台。