大数据-玩转数据-Flink 海量数据实时去重

一、海量数据实时去重说明

借助redis的Set,需要频繁连接Redis,如果数据量过大, 对redis的内存也是一种压力;使用Flink的MapState,如果数据量过大, 状态后端最好选择 RocksDBStateBackend; 使用布隆过滤器,布隆过滤器可以大大减少存储的数据的数据量。

二、海里书实时去重为什么需要布隆过滤器

如果想判断一个元素是不是在一个集合里,一般想到的是将集合中所有元素保存起来,然后通过比较确定。链表、树、散列表(又叫哈希表,Hash table)等等数据结构都是这种思路。
但是随着集合中元素的增加,我们需要的存储空间越来越大。同时检索速度也越来越慢,上述三种结构的检索时间复杂度分别为。
布隆过滤器即可以解决存储空间的问题, 又可以解决时间复杂度的问题.
布隆过滤器的原理是,当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。

三、布隆过滤基本概念

布隆过滤器(Bloom Filter,下文简称BF)由Burton Howard Bloom在1970年提出,是一种空间效率高的概率型数据结构。它专门用来检测集合中是否存在特定的元素。
它实际上是一个很长的二进制向量和一系列随机映射函数。

实现原理
布隆过滤器的原理是,当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。
BF是由一个长度为m比特的位数组(bit array)与k个哈希函数(hash function)组成的数据结构。位数组均初始化为0,所有哈希函数都可以分别把输入数据尽量均匀地散列。
当要插入一个元素时,将其数据分别输入k个哈希函数,产生k个哈希值。以哈希值作为位数组中的下标,将所有k个对应的比特置为1。
当要查询(即判断是否存在)一个元素时,同样将其数据输入哈希函数,然后检查对应的k个比特。如果有任意一个比特为0,表明该元素一定不在集合中。如果所有比特均为1,表明该集合有(较大的)可能性在集合中。为什么不是一定在集合中呢?因为一个比特被置为1有可能会受到其他元素的影响(hash碰撞),这就是所谓“假阳性”(false positive)。相对地,“假阴性”(false negative)在BF中是绝不会出现的。
下图示出一个m=18, k=3的BF示例。集合中的x、y、z三个元素通过3个不同的哈希函数散列到位数组中。当查询元素w时,因为有一个比特为0,因此w不在该集合中。
在这里插入图片描述

优点
1.不需要存储数据本身,只用比特表示,因此空间占用相对于传统方式有巨大的优势,并且能够保密数据;
2.时间效率也较高,插入和查询的时间复杂度均为, 所以他的时间复杂度实际是
3.哈希函数之间相互独立,可以在硬件指令层面并行计算。
缺点
1.存在假阳性的概率,不适用于任何要求100%准确率的情境;
2.只能插入和查询元素,不能删除元素,这与产生假阳性的原因是相同的。我们可以简单地想到通过计数(即将一个比特扩展为计数值)来记录元素数,但仍然无法保证删除的元素一定在集合中。
使用场景
所以,BF在对查准度要求没有那么苛刻,而对时间、空间效率要求较高的场合非常合适.
另外,由于它不存在假阴性问题,所以用作“不存在”逻辑的处理时有奇效,比如可以用来作为缓存系统(如Redis)的缓冲,防止缓存穿透。
假阳性概率的计算
假阳性的概率其实就是一个不在的元素,被k个函数函数散列到的k个位置全部都是1的概率。可以按照如下的步骤进行计算: p = f(m,n,k)
其中各个字母的含义:
1.n :放入BF中的元素的总个数;
2.m:BF的总长度,也就是bit数组的个数
3.k:哈希函数的个数;
4.p:表示BF将一个不在其中的元素错判为在其中的概率,也就是false positive的概率;
A.BF中的任何一个bit在第一个元素的第一个hash函数执行完之后为 0的概率是:

B.BF中的任何一个bit在第一个元素的k个hash函数执行完之后为 0的概率是:

C.BF中的任何一个bit在所有的n元素都添加完之后为 0的概率是:

D.BF中的任何一个bit在所有的n元素都添加完之后为 1的概率是:

E.一个不存在的元素被k个hash函数映射后k个bit都是1的概率是:

结论:在哈数函数个数k一定的情况下
1.比特数组m长度越大, p越小, 表示假阳性率越低
2.已插入的元素个数n越大, p越大, 表示假阳性率越大
经过各种数学推导:
对于给定的m和n,使得假阳性率(误判率)最小的k通过如下公式定义:

四、使用布隆过滤器实现去重

Flink已经内置了布隆过滤器的实现(使用的是google的Guava)

package com.lyh.flink12;import com.atguigu.flink.java.chapter_6.UserBehavior;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;
import org.apache.flink.shaded.guava18.com.google.common.hash.Funnels;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;public class Flink02_UV_BoomFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 创建WatermarkStrategyWatermarkStrategy<UserBehavior> wms = WatermarkStrategy.<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {@Overridepublic long extractTimestamp(UserBehavior element, long recordTimestamp) {return element.getTimestamp() * 1000L;}});env.readTextFile("input/UserBehavior.csv").map(line -> { // 对数据切割, 然后封装到POJO中String[] split = line.split(",");return new UserBehavior(Long.valueOf(split[0]), Long.valueOf(split[1]), Integer.valueOf(split[2]), split[3], Long.valueOf(split[4]));}).filter(behavior -> "pv".equals(behavior.getBehavior())) //过滤出pv行为.assignTimestampsAndWatermarks(wms).keyBy(UserBehavior::getBehavior).window(TumblingEventTimeWindows.of(Time.minutes(60))).process(new ProcessWindowFunction<UserBehavior, String, String, TimeWindow>() {private ValueState<Long> countState;private ValueState<BloomFilter<Long>> bfState;@Overridepublic void open(Configuration parameters) throws Exception {countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("countState", Long.class));bfState = getRuntimeContext().getState(new ValueStateDescriptor<BloomFilter<Long>>("bfState", TypeInformation.of(new TypeHint<BloomFilter<Long>>() {})));}@Overridepublic void process(String key,Context context,Iterable<UserBehavior> elements, Collector<String> out) throws Exception {countState.update(0L);// 在状态中初始化一个布隆过滤器// 参数1: 漏斗, 存储的类型// 参数2: 期望插入的元素总个数// 参数3: 期望的误判率(假阳性率)BloomFilter<Long> bf = BloomFilter.create(Funnels.longFunnel(), 1000000, 0.001);bfState.update(bf);for (UserBehavior behavior : elements) {// 查布隆if (!bfState.value().mightContain(behavior.getUserId())) {// 不存在 计数+1countState.update(countState.value() + 1L);// 记录这个用户di, 表示来过bfState.value().put(behavior.getUserId());}}out.collect("窗口: " + context.window() + " 的uv是: " + countState.value());}}).print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/148027.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

结构型设计模式——桥接模式

摘要 桥接模式(Bridge pattern): 使用桥接模式通过将实现和抽象放在两个不同的类层次中而使它们可以独立改变。 一、桥接模式的意图 将抽象与实现分离开来&#xff0c;使它们可以独立变化。 二、桥接模式的类图 Abstraction: 定义抽象类的接口Implementor: 定义实现类接口 …

【Pytorch笔记】4.梯度计算

深度之眼官方账号 - 01-04-mp4-计算图与动态图机制 前置知识&#xff1a;计算图 可以参考我的笔记&#xff1a; 【学习笔记】计算机视觉与深度学习(2.全连接神经网络) 计算图 以这棵计算图为例。这个计算图中&#xff0c;叶子节点为x和w。 import torchw torch.tensor([1.]…

使用关键字interface来声明使用接口-PHP8知识详解

继承特性简化了对象、类的创建&#xff0c;增加了代码的可重用性。但是php8只支持单继承&#xff0c;如果想实现多继承&#xff0c;就需要使用接口。PHP8可以实现多个接口。 接口类通过关键字interface来声明&#xff0c;接口中不能声明变量&#xff0c;只能使用关键字const声明…

机器人中的数值优化|【六】线性共轭梯度法,牛顿共轭梯度法

机器人中的数值优化|【六】线性共轭梯度法&#xff0c;牛顿共轭梯度法 往期回顾 机器人中的数值优化|【一】数值优化基础 机器人中的数值优化|【二】最速下降法&#xff0c;可行牛顿法的python实现&#xff0c;以Rosenbrock function为例 机器人中的数值优化|【三】无约束优化…

stm32 - 中断

stm32 - 中断 概念中断向量表NVIC 嵌套中断向量控制器优先级 中断EXTI概念基本结构例子- 对射式红外传感器计次例子 - 旋转编码器 概念 stm32 支持的中断资源&#xff08;都属于外设&#xff09; EXTITIMADCUSARtSPII2C stm32支持的中断 内核中断 外设中断 中断通道与优先级 一…

C# 读取Execl文件3种方法

方法 1&#xff0c;使用OLEDB可以对excel文件进行读取 1.1C#提供的数据连接有哪些 对于不同的.net数据提供者&#xff0c;ADO.NET采用不同的Connection对象连接数据库。这些Connection对我们屏蔽了具体的实现细节&#xff0c;并提供了一种统一的实现方法。 Connection类有四…

【Linux】线程池

目录 一、线程池1.什么是线程池2.线程池图解3.实现代码 二、单例模式1.单例模式的概念2.饿汉方式实现单例模式3.懒汉方式实现单例模式4.懒汉方式实现单例模式的线程池 一、线程池 1.什么是线程池 线程虽然比进程轻量了很多&#xff0c;但是每创建一个线程时&#xff0c;需要向…

UCOS的任务创建和删除

一、任务创建和删除的API函数 1、任务创建和删除本质就是调用uC/OS的函数 API函数 描述 OSTaskCreate() 创建任务 OSTaskDel() 删除任务 注意&#xff1a; 1&#xff0c;使用OSTaskCreate() 创建任务&#xff0c;任务的任务控制块以及任务栈空间所需的内存&#xff0c…

算法——买卖股票问题

309. 买卖股票的最佳时机含冷冻期 - 力扣&#xff08;LeetCode&#xff09; 一、 究其就是个动态规划的问题 算法实现图 初始化 由于有三个阶段&#xff0c;买入&#xff0c;可交易&#xff0c;冷冻期&#xff0c;那么用dp表表示现在为止的最大利润&#xff0c;则有 dp[0][…

asp.net core 远程调试

大概说下过程&#xff1a; 1、站点发布使用Debug模式 2、拷贝到远程服务器&#xff0c;以及iis创建站点。 3、本地的VS2022的安装目录&#xff1a;C:\Program Files\Microsoft Visual Studio\2022\Professional\Common7\IDE下找Remote Debugger 你的服务器是64位就拷贝x64的目…

详解Linux的系统调用fork()函数

在Linux系统中&#xff0c;fork()是一个非常重要的系统调用&#xff0c;它的作用是创建一个新的进程。具体来说&#xff0c;fork()函数会在当前进程的地址空间中复制一份子进程&#xff0c;并且这个子进程几乎完全与父进程相同&#xff0c;包括进程代码、数据、堆栈以及打开的文…

WebSocket实战之四WSS配置

一、前言 上一篇文章WebSocket实战之三遇上PAC &#xff0c;碰到的问题只能上安全的WebSocket&#xff08;WSS&#xff09;才能解决&#xff0c;配置证书还是挺麻烦的&#xff0c;主要是每年都需要重新更新证书&#xff0c;我配置过的证书最长有效期也只有两年&#xff0c;搞不…

ElasticSearch第四讲:ES详解:ElasticSearch和Kibana安装

ElasticSearch第四讲&#xff1a;ES详解&#xff1a;ElasticSearch和Kibana安装 本文是ElasticSearch第四讲&#xff1a;ElasticSearch和Kibana安装&#xff0c;主要介绍ElasticSearch和Kibana的安装。了解完ElasticSearch基础和Elastic Stack生态后&#xff0c;我们便可以开始…

ctfshow—1024系列练习

1024 柏拉图 有点像rce远程执行&#xff0c;有四个按钮&#xff0c;分别对应四份php文件&#xff0c;开始搞一下。一开始&#xff0c;先要试探出 文件上传到哪里&#xff1f; 怎么读取上传的文件&#xff1f; 第一步&#xff1a;试探上传文件位置 直接用burp抓包&#xff0c;…

力扣练习——链表在线OJ

目录 提示&#xff1a; 一、移除链表元素 题目&#xff1a; 解答&#xff1a; 二、反转链表 题目&#xff1a; 解答&#xff1a; 三、找到链表的中间结点 题目&#xff1a; 解答&#xff1a; 四、合并两个有序链表&#xff08;经典&#xff09; 题目&#xff1a; 解…

【数据结构---排序】很详细的哦

本篇文章介绍数据结构中的几种排序哦~ 文章目录 前言一、排序是什么&#xff1f;二、排序的分类 1.直接插入排序2.希尔排序3.选择排序4.冒泡排序5.快速排序6.归并排序总结 前言 排序在我们的生活当中无处不在&#xff0c;当然&#xff0c;它在计算机程序当中也是一种很重要的操…

聊聊常见的IO模型 BIO/NIO/AIO 、DIO、多路复用等IO模型

聊聊常见的IO模型 BIO/NIO/AIO/DIO、IO多路复用等IO模型 文章目录 一、前言1. 什么是IO模型2. 为什么需要IO模型 二、常见的IO模型1. 同步阻塞IO&#xff08;Blocking IO&#xff0c;BIO&#xff09;2. 同步非阻塞IO&#xff08;Non-blocking IO&#xff0c;NIO&#xff09;3.…

排序算法之【希尔排序】

&#x1f4d9;作者简介&#xff1a; 清水加冰&#xff0c;目前大二在读&#xff0c;正在学习C/C、Python、操作系统、数据库等。 &#x1f4d8;相关专栏&#xff1a;C语言初阶、C语言进阶、C语言刷题训练营、数据结构刷题训练营、有感兴趣的可以看一看。 欢迎点赞 &#x1f44d…

八大排序源码(含优化)

文章目录 1、直接插入排序2、希尔排序3、选择排序4、冒泡排序5、堆排序6、快速排序快速排序递归实现霍尔法挖坑法前后指针法快速排序小区间优化 快速排序非递归实现 7、归并排序归并排序递归实现归并排序非递归 8、计数排序 大家好&#xff0c;我是纪宁&#xff0c;这篇文章是关…

java Spring Boot 自动启动热部署 (别再改点东西就要重启啦)

上文 java Spring Boot 手动启动热部署 我们实现了一个手动热部署的代码 但其实很多人会觉得 这叫说明热开发呀 这么捞 写完还要手动去点一下 很不友好 其实我们开发人员肯定是希望重启这种事不需要自己手动去做 那么 当然可以 我们就让它自己去做 Build Project 这个操作 我们…