如果有看我写的 Flink 系例的前期文章,大部份是写 Flink 各算子、窗口、方法、Table&SQL、连接器等的直接用法与示例,那是为了尽可能的将学习知识点的基础应用简化成直接的示例成果,将概念通过单一示例展现,将复杂度降低增加学习的简易性,以免一开始看到一堆的概念算子就心生退意。程序员嘛,大部份时候,习惯于短平快的内容,希望 10 分钟能看完的东西,别婆婆妈妈扯个把小时,但往往这样,只能撑握皮毛,确无法深入理解,这算是行业人的通病吧,但各有所爱也无法统一,就这样吧多说无益。
本章以模拟一个电商平台的日常订单数据统计系统为设计基础,将电商平台的实时订单通过 Flink 实时流计算能力,按聚合维度实时计算,输出订单流计算结果,再通过监控大屏展示,实时快速撑握电商平台订单数据趋势、分类占比、销量排行等,从而全局了解电商平台业务运行状态,为电商平台高层决策、运营、分析、成长等提供最基础、最实时的数据依据。
其实上述计算模式通过其它第三方框架或平台等进行离线计算也一样是可以完成,如:sprak、hodoop 等,但 Flink 的优势就是实时数据流计算,不需要等待数据批量入仓后再进行统一清洗、加工、计算、存储等,Flink 的实时流计算引擎可以将数据分段(按时间或数量)快速计算,就像水龙头一样打开(数据)水流入到桶内,对装满不同大小桶的(数据)水进行计算,如:体积、重量等(窗口内聚合计算),则水龙头源源不断的流出水,从而保证了当前看到数据即为最新计算结果,效率和体验都是最佳选择。
以电商项目运作模式为起点,将电商平台中对各维度计算的应用场景,再结合我们学习的 Flink 流式计算技术,融合到真实业务中,通过技术加速业务成长,通过业务检验新技术的可行性,从而推动新技术的落地与大面积的应用,!
根据电商项目的作业场景,我们选取以订单系统系统进行实时维度统计,将数据流按以下几个场景,采用 Flink 流式实时计算能力进行开发实现:
1. 商品累计销售总额(按分钟刷新)
2. 商品累计销售总量(按分钟刷新)
3. 每分钟商品销售流量
4. 每分钟商品分类销售量排名
5. 品牌营销能力:按累计销量、累计销售额计算
6. 消费前十排名用户(按分钟刷新)
7. 性别购买能力(按分钟刷新,统计各性别购买总额与占比)
8. 每 10 分钟商品销量(统计各性别分时段购买总量)
1. 将前期学习的算子、方法、窗口、连接器、Table&SQLAPI、水印等知识点串连汇集
2. 根据电商项目特点,结合业务需求,将 Flink 能力特性与业务流程融合到项目实践中
3. 检验前期学习的知识,加强动手能力,通过编码成果来验证设计目标、运作流程等满足业务需求目标;
4. 通过学习新的技术知识或设计新业务流程架构,实现架构知识与设计能力提升。
注:其实最终目标,就是。
项目采用技术点如下:
flink 开发库
kafka 集群
vue2 + element.ui + echarts (大屏显示)
spring boot webflux (大屏后端服务)
redis 存储实时计算数据(生产因该考虑数据同步落库到 mysql 进行持久化)
项目流程:
1. 一个正常流程的电商订单系统,会将先所有用户的订单发送到订单表或订单队列中,我们此处的 “微服务” 则是用示例代码模拟订单系统,将订单推送到 kafka 集群中,用来做业务削峰与订单缓存。
2. 通过集成 Flink 库,开发多维度实时流计算客户端,上传到 Flink 集群中,提交启动运行 Task JOB 服务。
3.Flink 客户端从 kafka 中获取订单数据,实时计算各窗口限定颗粒度的数据流对象,将算子结果输出到 redis 中。
4. 大屏监控服务实时或定时轮询获取 Redis 的最新实时计算结果,并对维度数据进行格式化,输出到前端,前端根据各维度要求生成对应图表效果;
前端采用 vue2.x + element.ui
主要用到图表展示组件 echarts
进入前端项目根目录,下载安装依赖模块
安装完毕后,再命令运行前端项目
或打包后上传到 nginx 服务中做静态资源
工程目录结构
后端项目与整个 flink 项目示例整合在一个工程项目包内,工程名称叫 flink-examples,其中划分不同的业务模块,以下为工程结构:
flink-examples
------ connectors(中件间连接器示例模块)
------ examples (模拟电商订单数据并推送到 kafka 中,以及 flink 核心流处理客户端)
------ stream(数据流与算子、方法、窗口等示例代码)
------ tableapi(table&sql 与中件间的使用示例代码)
------ web(获取 flink 算子计算后的存储结果,提供给前端展示)
jdk 1.8
springboot 2.3.4.RELEASE
redis 3.x
flink 1.11.1
因工程示例本地已开发完毕,直接进行打包;
模块打包成 jar 包,其中的 examples 模块打包后是独立可运行的客户端,也是上传到 flink 集群平台执行 TaskJOB 的客户端 jar 包;
启动 Flink 的 Standalone 模式集群服务;
主机:192.168.110.35(master)、192.168.110.35(slaves)
flink 安装目录: /opt/flink-1.11.1
由于已提前搭建好 Standalone 模式集群,则直接进入 master 下直接启动集群。如果未搭建 flink 的 Standalone 模式集群, 参见另一文章:
只需要在 master 主机下启动 flink-cluster 集群,在 master 主服务启动过程中,会执行远程命令启动所有 slaves 从服务;
启动 Kafka 中件间服务,整个工程项目中,电商平台订单数据存放在 kafka 消息队列中,集成 Flink 的 job 客户端将从 Kafka 消息队列中拉取订单数据;
主机:192.168.110.35(单节点)
kafka 安装目录:/opt/kafka_2.11-2.2.2
由于已提前搭建好 Kafka 中间件,则直接进入安装目录启动消息服务。如果未搭建 Kafka 消息中件间服务, 参见另一文章:
完成此步后,接下来可以访问 flink 平台,运行客户端;
在打包后提交到 flink 平台运行过程中有一个 jar 执行错误,如下:
:是在 mavne 打包过程中某些依赖 jar 包执行出错,以及 jar 包重复引用等,在打成 jar 过程中,meta-INF 中多了一些 *.SF,*.DSA,*.RSA 文件导致的(签名摘要文件);
:
1. 手动解压 jar 包,将 meta-INF 中的 *.SF,*.DSA,*.RSA 文件删除后,重新打成 jar 包上传到 Flink 平台;
2. 或者在 pom.xml 中配置 maven 打包插件做过滤
地址栏访问 flink 管理平台:
访问端口可在 /opt/flink-1.11.1/conf/flink-conf.yaml 配置中直接修改,本平台配置为 8083;
默认进入的是客户端主页,在主页中显示 Available Task Slots = 32(翻译过来叫可用的任务槽),是 Flink 根据 /opt/flink-1.11.1/conf/flink-conf.yaml 配置文件中的 taskmanager.numberOfTaskSlots: 16,识别当前集群的可用总任务数。两台主机的配置相同,CPU 均为 8 核 16 进程,按照一个 Task Slots 分配一个 CPU 进程。因此两台主机累计可用 CPU 核心进程为 32 进程,Total Task Slots 则 Master 和 Slaves 各配置为 16,合计 ,后续在提交运行 Jar 客户端时,需要配置的并行度,即指的就是当前 Available Task Slots 范围内的可用数。但 Available Task Slots 与 Task JOB 的并行度,并不太容易理清关系,按照网上有一个 Flink Task Slots 计算公式:
但我个人的理解,即当前 TaskJOB 中所有算子并行度合计的最大可用数,即为 的剩余数;
这个我没有认真去求证,但有一篇博文件可以作为参考来理解 slot ,链接地址
所以实际生产使用,需要评估 job 客户端 Slots 使用量,以免无法最大化发挥与利用平台有效资源;
从左侧 Submit New Job 菜单进入,点击 Add New 按钮,在弹出窗口中选择我们上一步打包的 Job 客户端,即 examples 的打包后的 jar 文件,该 jar 文件包含了 flink 开发库、kafka 客户端、redis 客户端等依赖包;点击打开后,开始上传 jar 包,上传速度与网络以及包大小有关;
打工本地工程,在 examples 模块源码 的 src》test》java》com.flink.test.CreateKafkaMsg 类中,直接右键》run ,该类模拟创建订单数据并向 kafka 发送订单消息的示例,假设不间断产生电商平台订单数据;
数据结构如下:
选择 examples-1.0-SNAPSHOT.jar 客户端,在展开的输入框中,按以下内容输入;
Entry Class:com.flink.examples.StartFlinkKafkaServer
Parallelism:16(并行度)
Program Arguments: 参数(无)
Savepoint Path: 打印输出文件路径(无)
点击 Submit,提交 Task Job 作业,Flink 平台分配资源进行执行;
通过平台显示,当前运行算子数量有为 7,其中子节点的 6 个为算子,起始根节点为 source 数据流加载算子方法,采用 flink-kafka 中间件连接器从 kafka 消息队列中获取 mq 订单数据源,分配给不同的业务算子,进行聚合计算;在每个算子的方法中,会将窗口统计数据存储到 redis 中,提供给大屏监控服务的后台使用;
在运行前端项目后,访问 即后看到大屏监控从后端服务中(后端服务从 redis 中加载指标数据)获取数据生成各维度指标图标。界面在展示过程中,会定时轮询后台服务接口,获取更新数据,刷新前端图表;
采用了一个 Demo 专门模拟生成电商订单数据,不停向 Kafka 推送结构化订单数据,目的是为仿照电商平台架构中的订单生成与数据削峰缓存处理过程;在由 Flink 平台获取 kafka 中的数据流,放到算子中进行聚合计算,输出结果到 redis 中,大屏不停的轮询后台应用获取数据进行展示;流程简单而言:订单》Kafka》Flink》Redis《后台《前端
CreateKafkaMsg.java
从 Kafka 中获取数据,将源源不断的数据流分别按不同的统计业务场景,分别在不同的算子与窗口下进行聚合计算。
StartFlinkKafkaServer.java
其它略..... 以工程源码为主;
当 JOB 作业提交到到 Flink 平台后,为了确认 Jar 客户端的运行情况,除了在 Job 作业详情总览界面上查看算子运行状态外,还可以在 Jar 客户端正常运行过程中,从 Flink Dashboard 平台 JobManager 中查看作业的执行日志,用于分析与排查 TaskJob 作业的执行情况,也可以将开发过程中,需要的程序日志信息等在此功能窗口中打印用于数据跟踪;
整个学习与开发过程,几乎没有的理论性的长篇总结,主要以场景为切入点,通过示例实践了解整个流程:
1.flink 客户端可以在本地开发环境上运行,同理也可以部署在独立服务器上单节点运行,但采用 flink 通常需要考虑大规则的数据应用场景,服务架构以集群为主,分为 on Yarn 和 Standalone 两种集群模式。
2.flink 提供中间件连接器,可以将一个中间件的数据做为输入通道,如:es,mysql,redis,mq 等,做为源源不断的数据来源,写入到 Flink 的数据(批)流中,通过将数据流按窗口、水印等放到一个或分多个计算算子中,进行聚合计算,在将计算结果进行合并或归类,再通过中间件连接器将结果输出到中间件中(es,mysql,redis 等);
3. 一旦提交启用 Flink 客户端后,Flink 会一直处于运行中(无输出源,则批流处理完毕打印日志后 JOB 停止),不断的按时长或数量等窗口分段滑动或滚动模式计算周期内的数据。
4.flink 可以通过 dataStreamApi 开发客户端算子和数输流输入输出功能,也同样可以用 Table SQL 开发相同功能;
此示例只是为了演示一个电商平台的数据流实时处理过程,但生产环境下实时计算方案大同小异,相差的只是业务场景的不一样;
Gitee:
内容未做仔细审稿,如有错误,敬请指出;