大数据-玩转数据-Flink SQL编程实战 (热门商品TOP N)

一、需求描述

每隔30min 统计最近 1hour的热门商品 top3, 并把统计的结果写入到mysql中。

二、需求分析

  • 1.统计每个商品的点击量, 开窗
  • 2.分组窗口分组
  • 3.over窗口

三、需求实现

3.1、创建数据源示例

input/UserBehavior.csv

543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
315321,942195,4339722,pv,1511658000
625915,1162383,570735,pv,1511658000
578814,176722,982926,pv,1511658000
873335,1256540,1451783,pv,1511658000
429984,4625350,2355072,pv,1511658000
866796,534083,4203730,pv,1511658000
937166,321683,2355072,pv,1511658000
156905,2901727,3001296,pv,1511658000
758810,5109495,1575622,pv,1511658000
107304,111477,4173315,pv,1511658000
452437,3255022,5099474,pv,1511658000
813974,1332724,2520771,buy,1511658000
524395,3887779,2366905,pv,1511658000

3.2、创建目标表

CREATE DATABASE flink_sql; //创建flink_sql库
USE flink_sql;
DROP TABLE IF EXISTS `hot_item`;
CREATE TABLE `hot_item` (`w_end` timestamp NOT NULL,`item_id` bigint(20) NOT NULL,`item_count` bigint(20) NOT NULL,`rk` bigint(20) NOT NULL,PRIMARY KEY (`w_end`,`rk`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3.3、导入JDBC Connector依赖

<!-- 导入JDBC Connector依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>

3.4、代码实现

package com.atguigu.flink.java.chapter_12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** @Author lizhenchao@atguigu.cn* @Date 2021/1/31 9:11*/
public class Flink01_HotItem_TopN {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 使用sql从文件读取数据tenv.executeSql("create table user_behavior(" +"   user_id bigint, " +"   item_id bigint, " +"   category_id int, " +"   behavior string, " +"   ts bigint, " +"   event_time as to_timestamp(from_unixtime(ts, 'yyyy-MM-dd HH:mm:ss')), " +"   watermark for event_time as  event_time - interval '5' second " +")with(" +"   'connector'='filesystem', " +"   'path'='input/UserBehavior.csv', " +"   'format'='csv')");// 每隔 10m 统计一次最近 1h 的热门商品 top// 1. 计算每每个窗口内每个商品的点击量Table t1 = tenv.sqlQuery("select " +"   item_id, " +"   hop_end(event_time, interval '10' minute, interval '1' hour) w_end," +"   count(*) item_count " +"from user_behavior " +"where behavior='pv' " +"group by hop(event_time, interval '10' minute, interval '1' hour), item_id");tenv.createTemporaryView("t1", t1);// 2. 按照窗口开窗, 对商品点击量进行排名Table t2 = tenv.sqlQuery("select " +"   *," +"   row_number() over(partition by w_end order by item_count desc) rk " +"from t1");tenv.createTemporaryView("t2", t2);// 3. 取 top3Table t3 = tenv.sqlQuery("select " +"   item_id, w_end, item_count, rk " +"from t2 " +"where rk<=3");// 4. 数据写入到mysql// 4.1 创建输出表tenv.executeSql("create table hot_item(" +"   item_id bigint, " +"   w_end timestamp(3), " +"   item_count bigint, " +"   rk bigint, " +"   PRIMARY KEY (w_end, rk) NOT ENFORCED)" +"with(" +"   'connector' = 'jdbc', " +"   'url' = 'jdbc:mysql://hadoop162:3306/flink_sql?useSSL=false', " +"   'table-name' = 'hot_item', " +"   'username' = 'root', " +"   'password' = 'aaaaaa' " +")");// 4.2 写入到输出表t3.executeInsert("hot_item");}
}

执行结果:
在这里插入图片描述

四、总结

Flink 使用 OVER 窗口条件和过滤条件相结合以进行 Top-N 查询。利用 OVER 窗口的 PARTITION BY 子句的功能,Flink 还支持逐组 Top-N 。 例如,每个类别中实时销量最高的前五种产品。批处理表和流处理表都支持基于SQL的 Top-N 查询。
流处理模式需注意: TopN 查询的结果会带有更新。 Flink SQL 会根据排序键对输入的流进行排序;若 top N 的记录发生了变化,变化的部分会以撤销、更新记录的形式发送到下游。 推荐使用一个支持更新的存储作为 Top-N 查询的 sink 。另外,若 top N 记录需要存储到外部存储,则结果表需要拥有与 Top-N 查询相同的唯一键。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/149058.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

基于阶梯碳交易的含P2G-CCS耦合和燃气掺氢的虚拟电厂优化调度(matlab代码)

目录 1 主要内容 系统结构图 P2G-CCS 耦合模型 其他算例对比 2 部分代码 3 下载链接 1 主要内容 该程序复现《基于阶梯碳交易的含P2G-CCS耦合和燃气掺氢的虚拟电厂优化调度》模型&#xff0c;以碳交易和碳封存成本、燃煤机组启停和煤耗成本、弃风成本、购气成本之和为目标…

vertx的学习总结6

Beyond the event bus 一、章节覆盖&#xff1a; 如何在事件总线之上公开服务 verticles和事件总线服务的异步测试 动态代理&#xff1a; MyService 接口 package porxy.test;import io.vertx.codegen.annotations.ProxyGen;ProxyGen public interface MyService {void he…

智慧公厕:城市公共厕所的未来之路

随着城市化进程的不断推进&#xff0c;人们对城市环境质量的要求也越来越高。在城市管理中&#xff0c;公厕作为一个必不可少的公共设施&#xff0c;不仅关乎城市的文明形象&#xff0c;还与市民的生活质量密切相关。为了解决传统公厕存在的问题&#xff0c;智慧公厕应运而生。…

Go-Python-Java-C-LeetCode高分解法-第八周合集

前言 本题解Go语言部分基于 LeetCode-Go 其他部分基于本人实践学习 个人题解GitHub连接&#xff1a;LeetCode-Go-Python-Java-C 欢迎订阅CSDN专栏&#xff0c;每日一题&#xff0c;和博主一起进步 LeetCode专栏 我搜集到了50道精选题&#xff0c;适合速成概览大部分常用算法 突…

大模型部署手记(3)通义千问+Windows GPU

1.简介 组织机构&#xff1a;阿里 代码仓&#xff1a;GitHub - QwenLM/Qwen: The official repo of Qwen (通义千问) chat & pretrained large language model proposed by Alibaba Cloud. 模型&#xff1a;Qwen/Qwen-7B-Chat-Int4 下载&#xff1a;http://huggingface…

【AI视野·今日Sound 声学论文速览 第十八期】Wed, 4 Oct 2023

AI视野今日CS.Sound 声学论文速览 Wed, 4 Oct 2023 Totally 4 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Sound Papers Mel-Band RoFormer for Music Source Separation Authors Ju Chiang Wang, Wei Tsung Lu, Minz Won最近&#xff0c;基于多频段频谱图的方法…

windows server 2012 服务器打开系统远程功能

服务器上开启远程功能 进入服务器&#xff0c;选择“添加角色和功能” 需要选择安装的服务器类型&#xff0c;如图所示 然后在服务器池中选择你需要使用的服务器。 选择完成后&#xff0c;在图示列表下勾选“远程桌面服务” 再选择需要安装的功能和角色服务。 选择完成确认内容…

大模型部署手记(4)MOSS+Jetson AGX Orin

1.简介 组织机构&#xff1a;复旦大学 代码仓&#xff1a;GitHub - OpenLMLab/MOSS: An open-source tool-augmented conversational language model from Fudan University 模型&#xff1a;fnlp/moss-moon-003-sft-int4 下载&#xff1a;https://huggingface.co/fnlp/mos…

C++_pen_友元

友元&#xff08;破坏封装&#xff09; 我故意让别人能使用我的私有成员 友元类 friend class B;友元函数 friend void func();友元成员函数 friend void A::func();例 #include <stdio.h>class A;class C{ public:void CprintA(A &c); };class B{ public:void Bpri…

qt 5.15.2 安卓 macos

macos环境安卓配置 我的系统是monterey12.5.1 打开qt的配置界面 这里版本是java1.8&#xff0c;注意修改这个json文件&#xff0c;显示包内容 {"common": {"sdk_tools_url": {"linux": "https://dl.google.com/android/repository/comm…

lv7 嵌入式开发-网络编程开发 07 TCP服务器实现

目录 1 函数介绍 1.1 socket函数 与 通信域 1.2 bind函数 与 通信结构体 1.3 listen函数 与 accept函数 2 TCP服务端代码实现 3 TCP客户端代码实现 4 代码优化 5 练习 1 函数介绍 其中read、write、close在IO中已经介绍过&#xff0c;只需了解socket、bind、listen、acc…

Python爬虫案例入门教程(纯小白向)——夜读书屋小说

Python爬虫案例——夜读书屋小说 前言 如果你是python小白并且对爬虫有着浓厚的兴趣&#xff0c;但是面对网上错综复杂的实战案例看也看不懂&#xff0c;那么你可以尝试阅读我的文章&#xff0c;我也是从零基础python开始学习爬虫&#xff0c;非常清楚在过程中所遇到的困难&am…

简单查找重复文本文件

声明这是最初 我的提问给个文本分类清单input查找文件夹下 .py .txt .excel .word 一模一样的文本不是找文件名 找相同格式下的文件文本是否一样 文件单独复制到文件夹下两个文件全部复制到文件夹下 print 打印相同文本文件的名字 比如查找到了3.py与4.5.是.py文件中的文本文件…

Scala第一章节

Scala第一章节 scala总目录 章节目标 理解Scala的相关概述掌握Scala的环境搭建掌握Scala小案例: 做最好的自己 1. Scala简介 1.1 概述 ​ Scala(斯嘎拉)这个名字来源于"Scalable Language(可伸缩的语言)", 它是一门基于JVM的多范式编程语言, 通俗的说: Scala是一…

JAVAWeb业务层开发->普通和基于MP

普通方式业务层开发 service定义接口&#xff08;主要实现逻辑层面的业务功能&#xff09; serviceImpl实现该接口 注意事项&#xff1a; 逻辑判断的代码可以使用&#xff1e;号&#xff0c;使得返回结果为布尔类型。 小结&#xff1a;每一个接口写完都要写测试类去检测&#…

JMeter的详细使用及相关问题

一、中文乱码问题 如果出现乱码&#xff0c;需要修改编码集&#xff0c;&#xff08;版本问题有的不需要修改&#xff0c;就不用管&#xff09; 修改后保存重启就好了。 JMeter5.5版本的按照如下修改&#xff1a; 二、JMeter的启动 ①建议直接用ApacheJMeter.jar双击启动…

<一>Qt斗地主游戏开发:开发环境搭建--VS2019+Qt5.15.2

1. 开发环境概述 对于Qt的开发环境来说&#xff0c;主流编码IDE界面一般有两种&#xff1a;Qt Creator或VSQt。为了简单起见&#xff0c;这里的操作系统限定为windows&#xff0c;编译器也通用VS了。Qt版本的话自己选择就可以了&#xff0c;当然VS的版本也是依据Qt版本来选定的…

QT4.8.7安装详细教程

QT4.8.7安装详细教程&#xff08;MinGW 4.8.2和QTCreator4.2.0&#xff09; 1.下载及安装2.配置环境 此文是在下方链接博文的基础上&#xff0c;按自己的理解整理的https://blog.csdn.net/xiaowanzi199009/article/details/104119265 1.下载及安装 这三个文件&#xff0c;顺序是…

Swift SwiftUI CoreData 过滤数据 1

Xcode: Version 14.3.1 (14E300c) iOS: 16 预览&#xff1a; Code: import SwiftUI import CoreDatastruct TodosSearch: View {State private var search_title "测试"FetchRequest var todos_search: FetchedResults<Todo>init() {let request: NSFetchReq…

Cortex-A9 架构

一、Cortex-A 处理器运行模式 Cortex-A9处理器有 9中处理模式&#xff0c;如下表所示&#xff1a; 九种运行模式 在上表中&#xff0c;除了User(USR)用户模式以外&#xff0c;其它8种运行模式都是特权模式&#xff0c;在特权模式下&#xff0c;程序可以访问所有的系统资源。这…