相关推荐
大数据Flink大屏实时计算深度剖析
2024-11-20 17:50

大数据Fl<em></em>ink大屏实时计算深度剖析


大数据Fl<em></em>ink大屏实时计算深度剖析

想学习架构师构建流程请跳转:Java架构师系统架构设计

什么是智能推荐? 定义: 根据用户行为习惯所提供的数据, 系统提供策略模型,自动推荐符合用户行为的信息。 例举: 比如根据用户对商品的点击数据(时间周期,点击频次, 推荐类似的商品; 根据用户的评价与满意度, 推荐合适的品牌; 根据用户的使用习惯与点击行为,推荐类似的资讯。 应用案例

大数据Fl<em></em>ink大屏实时计算深度剖析

什么是实时数仓 数据仓库(Data Warehouse),可简写为DW或DWH,是一个庞大的数据存储集合,通过对各种业务数 据进行筛选与整合,生成企业的分析性报告和各类报表,为企业的决策提供支持。实时仓库是基于 Storm/Spark(Streaming)/Flink等实时处理框架,构建的具备实时性特征的数据仓库。

应用案例 分析物流数据, 提升物流处理效率。 大数据Fl<em></em>ink大屏实时计算深度剖析 阿里巴巴菜鸟网络实时数仓设计大数据Fl<em></em>ink大屏实时计算深度剖析 数仓分层处理架构(流式ETL: ODS -> DWD -> DWS -> ADS ODS(Operation Data Store:操作数据层, 一般为原始采集数据。 DWD(Data Warehouse Detail) :明细数据层, 对数据经过清洗,也称为DWI。 DWS(Data Warehouse Service):汇总数据层,基于DWD层数据, 整合汇总成分析某一个主题域的服 务数据,一般是宽表, 由多个属性关联在一起的表, 比如用户行为日志信息:点赞、评论、收藏等。 ADS(Application Data Store): 应用数据层, 将结果同步至RDS数据库中, 一般做报表呈现使用。 大数据Fl<em></em>ink大屏实时计算深度剖析

  1. IoT数据分析
  1. 什么是IoT 物联网是新一代信息技术,也是未来发展的趋势,英文全称为: Internet of things(IOT,顾名 思义, 物联网就是万物相联。物联网通过智能感知、识别技术与普适计算等通信感知技术,广泛 应用于网络的融合中,也因此被称为继计算机、互联网之后世界信息产业发展的第三次浪潮。
  2. 应用案例 物联网设备运营分析大数据Fl<em></em>ink大屏实时计算深度剖析 华为Iot数据分析平台架构大数据Fl<em></em>ink大屏实时计算深度剖析
  1. 智慧城市 城市中汽车越来越多, 川流不息,高德地图等APP通过技术手段采集了越来越多的摄像头、车流 的数据。 但道路却越来越拥堵,越来越多的城市开始通过大数据技术, 对城市实行智能化管理。 2018年, 杭州采用AI智慧城市,平均通行速度提高15%,监控摄像头日报警次数高达500次,识 别准确率超过92%,AI智慧城市通报占全体95%以上,在中国城市交通堵塞排行榜, 杭州从中国 第5名降至57名。 大数据Fl<em></em>ink大屏实时计算深度剖析 大数据Fl<em></em>ink大屏实时计算深度剖析
  2. 金融风控 风险是金融机构业务固有特性,与金融机构相伴而生。金融机构盈利的来源就是承担风险的风险溢 价。 金融机构中常见的六种风险:市场风险、信用风险、流动性风险、操作风险、声誉风险及法律风 险。其中最主要的是市场风险和信用风险。 线上信贷流程,通过后台大数据系统进行反欺诈和信用评估大数据Fl<em></em>ink大屏实时计算深度剖析
  3. 电商行业 用户在电商的购物网站数据通过实时大数据分析之后, 通过大屏汇总展示, 比如天猫的双11购物 活动,通过大屏, 将全国上亿买家的订单数据可视化,实时性的动态展示,包含总览数据,流式 TopN数据,多维区域统计数据等,极大的增强了对海量数据的可读性。 大数据Fl<em></em>ink大屏实时计算深度剖析 TopN排行大数据Fl<em></em>ink大屏实时计算深度剖析

大数据Flink概述 大数据Flink入门案例

Flink 连接器包含数据源输入与汇聚输出两部分。Flink自身内置了一些基础的连接器,数据源输入包含文件、目录、Socket以及 支持从collections 和 iterators 中读取数据;汇聚输出支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket。 官方地址 Flink还可以支持扩展的连接器,能够与第三方系统进行交互。目前支持以下系统:

Flink还可以支持扩展的连接器,能够与第三方系统进行交互。目前支持以下系统:

  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)
  • Google PubSub (source/sink)
  • JDBC (sink)

常用的是Kafka、ES、HDFS以及JDBC。

Flink Connectors JDBC 如何使用 功能: 将集合数据写入数据库中

 

代码

 

数据表

 

自定义写入数据源 功能:读取Socket数据, 采用流方式写入数据库中。 代码

 

自定义数据源

 

AccessLog:

 

测试数据:注意

 

自定义读取数据源 功能: 读取数据库中的数据, 并将结果打印出来。 代码

 
 

通过Sink写入HDFS数据 功能: 将Socket接收到的数据, 写入至HDFS文件中。

依赖

 

代码

 

数据源模拟实现

 

代码:

 
 

读取HDFS文件数据

 

Hadoop环境安装

  1. 配置免密码登录 生成秘钥
 

将秘钥写入认证文件

 

修改认证文件权限

 
  1. 配置环境变量 将Hadoop安装包解压, 将Hadoop加入环境变量/etc/profile
 

执行生效

 
  1. 修改Hadoop配置文件 1) 修改hadoop-env.sh文件
 

修改JAVA_HOME

 

2)修改core-site.xml文件

 

这里的主机名称是flink。 3)修改hdfs-site.xml文件

 

4)修改mapred-site.xml文件

 

5)修改slaves文件

 

这里配置的是单节点, 指向本机主机名称。 6)修改yarn-site.xml

 
  1. 启动Hadoop服务
 

上传一个文件, 用于测试

 

如果上传失败 1)可能是namenode没有启动,则执行如下命令

 

2)检查/etc/hosts文件配置

 
  1. 访问验证

大数据Fl<em></em>ink大屏实时计算深度剖析 大数据Fl<em></em>ink大屏实时计算深度剖析

ES服务安装

  1. 到官网下载地址下载6.8.1版本的gz压缩包, 不要下载最新版本, Spring Boot等项目可能未及时更新支持。
  2. 解压安装包
 
  1. ElasticSearch不能以Root身份运行, 需要单独创建一个用户
 

执行以上命令,创建一个名为elsearch用户, 并赋予目录权限。 4. 修改配置文件 vi config/elasticsearch.yml, 只需修改以下设置

 
  1. 指定JDK版本 最新版的ElasticSearch需要JDK11版本, 下载JDK11压缩包, 并进行解压。

修改环境配置文件 vi bin/elasticsearch-env 参照以下位置, 追加一行, 设置JAVA_HOME, 指定JDK11路径。

 
  • 关闭ConcMarkSweepGC

JDK9版本以后不建议使用ConcMarkSweepGC, 如果不想出现提示, 可以将其关闭 vi config/jvm.options 将UseConcMarkSweepGC注释

 
  • 启动ElasticSearch
  • 切换用户

su elsearch

  • 以后台常驻方式启动

bin/elasticsearch -d 7. 问题处理 出现max virtual memory areas vm.max_map_count [65530] is too low, increase to at least 错误信息 修改系统配置

 

添加

 

执行生效

 
 

在文件末尾添加

 

重新切换用户即可

 

Flink ES写入功能实现 功能: 将Socket流数据, 写入至ES服务。 依赖

 

代码

 

查看index信息: http://192.168.116.141:9200/_cat/indices?v 查看具体数据: http://192.168.116.141:9200/flink-es/_search

Kafka安装

  1. 下载Kafka_2.12-1.1.1安装包
  2. 将安装包解压
 
  1. 修改kafka配置 只修改绑定IP, 因为是单节点, 其他按默认配置来。
 

如有多个IP地址, 绑定为对外访问的IP。 4. 启动zookeeper服务 kafka安装包内置了zookeeper,可以直接启动。

 
  1. 启动kafka服务
 

Flink Kafka 读取功能 功能: 通过flink读取kafka消息队列数据, 并打印显示。 依赖

 

代码

 

通过kafka生产者命令测试验证

 

扩展点:kafka消息的消费处理策略

 

Flink Kafka 写入功能 功能: 将Socket的流数据,通过flink 写入kafka 消息队列。 代码

 

通过kafka消费者命令测试验证

 

控制消息的发送处理模式:

 

提供了三种消息处理模式

  • Semantic.NONE :Flink 不会有任何语义的保证,产生的记录可能会丢失或重复。
  • Semantic.AT_LEAST_onCE (默认设置:类似 FlinkKafkaProducer010 版本中的setFlushonCheckpoint(true) ,这可以保证不会丢失任何记录(虽然记录可能会重复)。
  • Semantic.EXACTLY_onCE :使用 Kafka 事务提供精准一次的语义。无论何时,在使用事务写入 Kafka时,都要记得为所有消费 Kafka 消息的应用程序设置所需的 isolation.level ( read_committed 或 read_uncommitted - 后者是默认值)。
  • Kafka 的消息可以携带时间戳,指示事件发生的时间或消息写入 Kafka broker 的时间。
 
 

在实际应用场景中, 会存在各种复杂传输对象,同时要求较高的传输处理性能, 这就需要采用自定义的序列化方式做相应实现, 这里以Protobuf为例做讲解。 功能: kafka对同一Topic的生产与消费,采用Protobuf做序列化与反序列化传输, 验证能否正常解析数据。

  1. 通过protobuf脚本生成JAVA文件
 

通过批处理脚本,生成JAVA文件

 

注意, 路径要配置正确。 2. 自定义序列化实现 添加POM依赖

 

AccessLog对象

 

序列话好之后会根据AccessLog对象得到一个序列号的文件大数据Fl<em></em>ink大屏实时计算深度剖析 CustomSerialSchema:

 

3. 通过flink对kafka消息生产者的实现

 

开启Kafka消费者命令行终端,验证生产者的可用性

 
  1. 通过flink对kafka消息订阅者的实现
 

通过flink的kafka生产者消息的发送, 对消费者的功能做测试验证。

大数据Fl<em></em>ink大屏实时计算深度剖析

  • 总览数据

总销售量/总销售金额 TopN: 热销商品/商品类目/商品PV/商品UV

  • 区域/分类数据

不同区域销售排名 不同分类销售排名

  1. 下载安装包 安装包 后台管理包
  2. 解压 解压安装包
 

解压管理包

 
  1. 初始化管理数据库 导入初始化数据脚本
 
  1. 修改MySQL服务同步配置 编辑配置文件
 

增加同步配置

 

重启服务

 

检查同步功能是否开启

 

创建同步用户

 

赋予同步所需权限

 
  1. 修改后台管理配置文件
 

配置内容

 

先启动后台管理服务, 再启动Canal服务, 后台管理服务启动命令

 

访问:http://192.168.116.141:8089/ 登录: admin/123456 6. Canal服务配置

 

配置内容

 

启动Canal服务:

 
  1. 后台管理配置 修改Server管理配置
 

修改Instance配置(如果没有, 则新建,载入模板即可

 

regex同步配置规则: 常见例子

  1. 所有表:.* or …
  2. canal schema下所有表: canal…*
  3. canal下的以canal打头的表:canal.canal.*
  4. canal schema下的一张表:canal.test1
  5. 多个规则组合使用:canal…*,mysql.test1,mysql.test2 (逗号分隔)

功能实现流程

  1. 订单数据源的实现
  2. flink代码功能实现
  3. Flink 与 Spring Boot的集成
  4. 测试验证,比对SQL:
 
  1. 数据呈现
 
 

kibana服务安装 Kibana是一个针对Elasticsearch的开源分析及可视化平台,用来搜索、查看交互存储在Elasticsearch索 引中的数据。 6. 到官网下载, Kibana安装包, 与之对应6.8.1版本, 选择Linux 64位版本下载,并进行解压。 7. Kibana启动不能使用root用户, 使用上面创建的elsearch用户, 进行赋权

 
  1. 修改配置文件 vi config/kibana.yml , 修改以下配置
 
  1. 启动kibana
 

看到以下日志, 代表启动正常

 

如果出现启动失败的情况, 要检查集群各节点的日志, 确保服务正常运行状态

  1. 增加订单地址信息数据源
  2. 创建对应的表与实体 实体: OrderAddress BO: JoinOrderAddress(订单数据与地址数据的合并对象) BO: HotDimensionOrder(ES存储的映射对象, 注意这里的ID唯一性, 如果是按省份统计, ID存储省份信息,如果是按地级市统计, ID则存储为市区信息。
  3. 改造订单数据源, 增加缓存写入, 地址信息数据源增加缓存的读取。
  4. 修改Canal的后台配置, 增加地址数据源的监听队列。
  5. 区域双流统计的核心代码实现: 1)增加双流的kafka配置, 每个流监听不同的数据队列。 2)每个流要加上时间水印, 设定时间窗, 设定值比后面聚合的时间窗稍小一些。 3)根据订单ID做join匹配。 4) 根据区域做汇总统计(省份、城市)。 5) 将数据写入至ES。
  6. 测试验证 验证SQL:
 
 
 
  1. 增加订单支付流水数据源
  2. 创建对应的表与实体 实体: OrderPayment BO: JoinOrderAddress
  3. 修改Canal的后台配置, 增加地址数据源的监听队列。
  4. 核心代码实现: 1)实现订单支付流水数据源的监听处理。 2)定义CEP处理规则,解析出支付成功的订单。
  5. 测试验证 检查订单状态是未支付 -》 已支付的数据
 

检查超时的数据: 初始状态为0, 指定时间之内没有已支付的数据。 6. 拓展实现, 热门商品统计排行,只统计支付成功的数据。

 
 

功能: 统计商品在一段时间内的UV(Unique Visitor) 核心代码

 
 

功能: 统计商品在一段时间内的UV(采用布隆过滤器) 核心代码

    以上就是本篇文章【大数据Flink大屏实时计算深度剖析】的全部内容了,欢迎阅览 ! 文章地址:http://ktsh.xhstdz.com/quote/80952.html 
     栏目首页      相关文章      动态      同类文章      热门文章      网站地图      返回首页 物流园资讯移动站 http://ktsh.xhstdz.com/mobile/ , 查看更多   
发表评论
0评