1.配景先容
PV/UV统计是流式说明一个常见的场景。通过PV可以对会见的网站做流量或热门说明,譬喻告白主可以通过PV值预估痛蚨枫告网页所带来的流量以及告白收入。其它一些场景必要对会见的用户作说明,好比说明用户的网页点击举动,此时就必要对UV做统计。
行使Spark Streaming SQL,并团结Redis可以很利便举办PV/UV的统计。本文将先容通过Streaming SQL斲丧Loghub中存储的用户会见信息,对已往1分钟内的数据举办PV/UV统计,将功效存入Redis中。
2.筹备事变
- 建设E-MapReduce 3.23.0以上版本的Hadoop集群。
- 下载并编译E-MapReduce-SDK包
- git clone git@github.com:aliyun/aliyun-emapreduce-sdk.git
- cd aliyun-emapreduce-sdk
- git checkout -b master-2.x origin/master-2.x
- mvn clean package -DskipTests
编译完后, assembly/target目次下会天生emr-datasources_shaded_${version}.jar,个中${version}为sdk的版本。
数据源
本文回收Loghub作为数据源,有关日记收罗、日记理会请参考日记处事。
3.统计PV/UV
一样平常场景下必要将统计出的PV/UV以及响应的统计时刻存入Redis。其他一些营业场景中,也会只生涯最新功效,用新的功效不绝包围更新旧的数据。以下起首先容第一种环境的操纵流程。
3.1启动客户端
呼吁行启动streaming-sql客户端
- streaming-sql --master yarn-client --num-executors 2 --executor-memory 2g --executor-cores 2 --jars emr-datasources_shaded_2.11-${version}.jar --driver-class-path emr-datasources_shaded_2.11-${version}.jar
也可以建设SQL语句文件,通过streaming-sql -f的方法运行。
3.1界说数据表
数据源表界说如下
- CREATE TABLE loghub_source(user_ip STRING, __time__ TIMESTAMP)
- USING loghub
- OPTIONS(
- sls.project=${sls.project},
- sls.store=${sls.store},
- access.key.id=${access.key.id},
- access.key.secret=${access.key.secret},
- endpoint=${endpoint});
个中,数据源表包括user_ip和__time__两个字段,别离代表用户的IP地点和loghub上的时刻列。OPTIONS中设置项的值按照现实设置。
功效表界说如下
- CREATE TABLE redis_sink
- USING redis
- OPTIONS(
- table='statistic_info',
- host=${redis_host},
- key.column='interval');
个中,statistic_info为Redis存储功效的表名,interval对应统计功效中的interval字段;设置项${redis_host}的值按照现实设置。
3.2建设流功课
- CREATE SCAN loghub_scan
- ON loghub_source
- USING STREAM
- OPTIONS(
- watermark.column='__time__',
- watermark.delayThreshold='10 second');
-
- CREATE STREAM job
- OPTIONS(
- checkpointLocation=${checkpoint_location})
- INSERT INTO redis_sink
- SELECT COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval
- FROM loghub_scan
- GROUP BY TUMBLING(__time__, interval 1 minute), window;
4.3查察统计功效
最终的统计功效如下图所示

可以看到,每隔一分钟城市天生一条数据,key的情势为表名:interval,value为pv和uv的值。
3.4实现包围更新
将功效表的设置项key.column修改为一个牢靠的值,譬喻界说如下
- CREATE TABLE redis_sink
- USING redis
- OPTIONS(
- table='statistic_info',
- host=${redis_host},
- key.column='statistic_type');
建设流功课的SQL改为
- CREATE STREAM job
- OPTIONS(
- checkpointLocation='/tmp/spark-test/checkpoint')
- INSERT INTO redis_sink
- SELECT "PV_UV" as statistic_type,COUNT(user_ip) AS pv, approx_count_distinct( user_ip) AS uv, window.end AS interval
- FROM loghub_scan
- GROUP BY TUMBLING(__time__, interval 1 minute), window;
最终的统计功效如下图所示

可以看到,Redis中值保存了一个值,这个值每分钟都被更新,value包括pv、uv和interval的值。
4.总结
本文扼要先容了行使Streaming SQL团结Redis实现流式处理赏罚中统计PV/UV的需求。后续文章,我将先容Spark Streaming SQL的更多内容。 (编辑:湖南网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|