消息队列中间件会保存消息,直到收到消费者消费成功并确认,在此之后消息便可以删除,不过什么时候删除,由中间件自己决定
ack消息会一直储存,直到现有的所有订阅都确认了这条消息,在此之后如果要继续保存则需要配置消息保留策略。(所以ack消息默认是写到bookie,只有bookie写失败才会尝试写元数据库)
累积确认不能在 Shared 或 Key_shared 订阅类型中使用,因为 Shared 或 Key_Shared 订阅类型涉及多个有权访问同一订阅的消费者。在共享订阅类型中,消息是单独确认的
除了确认,消费者也可以发送否定确认,即告诉broker某条消息消费失败
!!!确认与否定确认,在不同的订阅模式中起作用或者不起作用,比如灾备模式和独占模式下,累积确认只会确认/否定最后一条消息,默认再次之前的所有消息都否定了,而共享模式下则只能单独确认/否定,详情看文档:https://pulsar.apache.org/docs/3.3.x/concepts-messaging/
如果启用批处理,则一批中的所有消息都会重新发送给消费者。
与确认超时相比,否定确认更可取。首先,超时值很难设置。其次,当消息处理时间超过确认超时时,代理会重新发送消息,但这些消息可能不需要重新使用。
消费者会自动订阅重试队列,如果超过一定次数后还是无法消费成功,那么该消息会被放入死信队列。不过自动订阅retry默认是关闭的,需要启用
!!!pulsar文档说retry队列由消费者管理。这个不太懂。比如消费者消费一条消息,然后返回否定确认,然后又没有开启重试,那么该消息会被怎么处理,开重试又会怎么处理。看文档说如果是否定确认的话则由broker重新投递,超过一定次数丢到死信队列,而开起重试则由消费者把消息发送到重试队列。不过有疑问:消费者发送到重试队列,然后重试队列再发送给消费者,这不多此一举吗?不过客户端是有缓存的。重试队列目前在共享模式下可以用(文档里有代码说是消费者把消息发送给broker,然后broker过一段时间再发送给消费者,即reconsumerLater)
死信主题的功能由消费者实现。您可以决定如何处理死信主题中的消息。
总之这一块疑问很多。
backlog 大小代表的是批次总数,而不是消息总数。
batch消息:一批消息要么都成功,如果一批消息收到一个否定,那么这批消息都会重发。所以最新的pulsar版本增加了批量索引确认,即确认一批消息内的部分消息,全部确认完毕后就认为这批消息成功处理了。同样,批量索引默认是关闭的
chunking 不能与批处理同时启用。在启用分块之前,您需要禁用批处理。
分块消息由消费者端sdk自动进行组装,maxPendingChunkedMessage参数指定用于分块消息的缓存的最大值
游标是持久的,它保留消息并保留当前位置。
如果broker从故障中重新启动,它可以从持久存储(BookKeeper)中恢复游标,以便可以从上次消费的位置继续消费消息
!!!在Apache Pulsar中,puslar怎么实现一个subscription订阅多个topic?我记得dispatcher里面有个根据消费者类型来分发消息,这个地方可以看一下,另外一个cursor对应一个topic,cursor和consumer以及dispatcher以及多个topic之间的关系可以考虑一下
默认情况下没有任何持久订阅的主题的消息将被标记为已删除。如果您想防止消息被标记为已删除,您可以为此主题创建持久订阅。在这种情况下,只有已确认的消息才会被标记为已删除。
非持久主题是 Pulsar 主题,其中消息数据永远不会持久存储到磁盘,仅保存在内存中。
系统主题用于实现某些功能并消除对第三方组件的依赖,例如事务、心跳检测、主题级策略和资源组服务。系统主题使这些功能的实现变得简单、依赖且灵活。以心跳检测为例,您可以利用系统主题进行健康检查,内部让生产者/读取者在heartbeat命名空间下生产/消费消息,从而可以检测当前服务是否还活着。
默认情况下:pulsar立即删除已经被确认的消息以及持久化存储未被确认的消息,但是可以可以配置,不过只能在namespace级配置
如果对未确认消息启用了TTL,随着消息过期(如底部所示),某些消息即使尚未得到确认,也会被删除,因为根据应用于命名空间的 TTL,它们已过期(例如,因为已应用 5 分钟的 TTL,并且消息尚未得到确认,但已过去 10 分钟)。
消息去重是broker实现的,消息去重对精准一次很重要
broker两个组成部分:dispatcher:分发消息;http服务器,帮助producer和consumer找到topic
一个cluster包括zk(或etcd等其他元数据库)、bookie、broker,每个cluster的zk叫做local zk,所有集群还共用一个global zk
cursor也会持久化道bookie中
!!!!pulsar开发的managedLedger包含一个唯一的writer以及多个用于读取的cursor,一个cursor对应一个消费流即一个订阅,也就是说一个ManagedLedger对应一个topic,同时负责对该topic的读写。
分布式系统中client与服务器访问有两种方式:1:client通过serverA获取serverB的地址,然后client直接和serverB通讯,这种情况下client必须可以直接访问serverB的地址,否则会报错;2:clinet要访问serverB,然后client把请求发给serverA,serverA再去serverB,再把结果返回个client,这样client就无须和serverB通信。方式2就是proxy的方式
!!!client,不管是consumer还是producer,第一步都是发一个lookup消息给broker,来获取topic对应的broker的地址
flow默认1000
每当创建一个新的订阅时,默认情况下,该订阅会被定位在主题的末尾。
!!!pulsar除了consumer,还有reader。consumer模型由pulsar来自动管理游标,而reader模型则是由客户端自己控制游标,即pulsar只负责提供消息,而不负责管理游标。
在内部,阅读器接口被实现为消费者,使用对具有随机分配名称的主题的独占、非持久订阅。
与订阅/消费者不同,读取器本质上是非持久性的,并且不会阻止主题中的数据被删除,因此强烈建议配置数据保留。
pulsar负载均衡:核心是pulsar的topic是基于段的,也就是说一个topic分为多个partition,每个partition分为多个seg。1:存储层。一个topic的多个seg会均匀分布到多个bookie节点上,这是bookie层面提供的功能,pulsar只需要调用即可。2:server层(即broker)。broker是无状态的,所以把一个seg调度到另一个broker上时先关掉所有连接,然后转移,然后客户端重新连接。
bundle:一个namespace会被划分成多个bundle,每个bundle都包含多个partition(topic只是一个抽象的概念,partition才是底层真正的topic。)
之所bundle在namespace之下,topic之上,是因为这是一个这种,既保留了必要的信息,又不会需要太大的存储空间。
bundle:一个namespace创建的时候就会被划分成多个bundle,一个topic创建时会被分配到某个bundle。这里的topic是指非分区topic或者分区topic的一个分区
当一个bundle负载过大或者包含的topic过多的时候会导致该bundle进一步被切分为多个bundle。支持自动和手动
bundle有赋值、切分、卸载三个主要流程
采用的是hash,实际的bundle便表示一个区间,topic的hash值位于同一个区间,就属于同一个bundle
bin/pulsar-admin namespaces bundles public/default{"boundaries" : [ "0x00000000", "0x08000000", "0x10000000", "0x20000000", "0x30000000", "0x40000000", "0x50000000", "0x60000000", "0x70000000", "0x80000000", "0x90000000", "0xa0000000", "0xb0000000", "0xc0000000", "0xd0000000", "0xe0000000", "0xf0000000", "0xffffffff" ],"numBundles" : 17
}
更换负载均衡插件:通过配置项,就是说它是通过反射来实现通过配置动态加载类,他就直接去找找这个类路径
pulsar多集群赋值:指a向集群1写一条数据,该数据会被自动复制到指定的集群,支持同步(只有所有指定的集群都返回复制成功后才向客户端返回成功或者指定数量集群写成功)或者异步复制
pulsar通过一些系统topic来传递消息,比如在 Apache Pulsar 中,每个命名空间都有自己的 主题(topic),当配置变更或者有事件发生的时候就会往这个topic发送消息
主题压缩:对于同一个key,只保留最新的消息。
schema:数据以非结构化方式存储在磁盘上,producer和consumer处理时都只能发送和接受结构化的消息,所以就用一个schema来记录消息的类型,这样存储时就能把producer发来的结构化消息存储为非结构化的原始字段,然后consumer解码的时候就通过schema把原始字节翻译成指定结构
事务:https://pulsar.apache.org/docs/3.3.x/txn-how/