系列文章目录
vLLM (1) - Qwen2推理&部署
vLLM (2) - 架构总览
vLLM (3) - Sequence & SequenceGroup
vLLM (4) - LLMEngine上篇
vLLM (5) - LLMEngine下篇
vLLM (6) - Scheduler & BlockSpaceManager
文章目录
- 系列文章目录
- 前言
- 一、Scheduler
- 1.概述
- 2.Scheduler._schedule_default()回顾
- 3.Scheduler._schedule_prefills()
- 4.Scheduler._schedule_running()
- 5.Scheduler._schedule_swapped()
- 6.Scheduler._preempt()
- 7.Scheduler调度举例
- 8.Scheduler与BlockSpaceManager的关系
- 二、BlockSpaceManager
- 1.初始化
- 2.can_allocate() & allocate()
- 3.can_append_slots() & append_slots()
- 4.其他方法
- 三、CacheEngine
- 总结
前言
前面两篇展开讲解了LLMEngine
,涉及了Worker
、CacheEngine
、ModelRunner
和Scheduler
等核心组件,相信大家对其工作原理与流程已经有了相对清晰的认识。受限于篇幅,上一篇我们没有对Scheduler
彻底展开,这将会是本篇的重点。
回想一下vLLM
等大模型推理框架解决的问题是什么?当大模型开放给用户的时候,会面对大量请求,因此推理框架的目标就是通过高效利用现有资源(比如显存)等方式来增大吞吐量,降低延迟,以便快速给到用户响应。在vLLM
中,通过Scheduler
合理的调度方式,以及PageAttention
优化的GPU
使用率,完成了上述目标。而作为Scheduler
的重要组成部分,BlockSpaceManager
完成了内存块的管理。因而,本篇会围绕Scheduler
(调度逻辑)以及BlockSpaceManager
这两点展开。
一、Scheduler
1.概述
大模型推理框架中通过合理调度来增大吞吐量并非vLLM
首创,通过代码你会发现,vLLM
中的调度基于ORCA
团队提出的iteration-level scheduling,业界通常称为continuous batching
。
大模型推理的时候通常追求更大的batch
,以便充分发挥GPU
的并行性能。之前的大模型推理框架使用的batching
策略称为naive batching
或者static batching
。它是request-level
的调度,其执行逻辑是将一个batch
的输入喂给模型,需要等到所有序列均完成解码之后一起释放资源,进行下一个batch
。下图(来源于Anyscale)描述了这一过程,其中黄色块表示prompt token
,蓝色块表示生成的token
,左图为第一个迭代,右图表示batch
中所有序列完成。由于每个序列生成长度并相同,先完成的(较短的)需要等待其他序列完成后一起释放资源,GPU
没有得到充分的利用,如右图中空白区域所示。
continuous batching
则将调度细化到iteration-level
,意思是每经过一个迭代(生成一个token
)调度一次,对应到vLLM
代码就是之前讲过的LLMEngine.step()
这个方法。这种调度方式能够避免naive batching
等待所有序列完成才能释放资源的问题。如下图所示,在T5时刻S3序列已经完成,T6时刻(next iteration
)S3占用的显存被释放,S5加入batch
进行prefill
和decode
。这种策略极大的增加了吞吐量,降低了延时。
2.Scheduler._schedule_default()回顾
上一篇我们谈过,Scheduler
调度的朴素实现集中在self._schedule_default()
方法中,我们也已经论述过它的执行逻辑。现简要概括如下(代码和注释参考上一篇):
1)根据调度预算budget
,调度self.waiting
、self.running
以及self.swapped
;
2)只要self.waiting
中仍有请求,并且调度预算budget
允许,就优先调度self.waiting
,去做prefill
;
3)当2)不满足时,就调度self.running
和self.swapped
去做decode
,当然这两者有优先级之分;所以在vLLM
中,要么是做prefill
,要么是做decode
。
接下来我们来仔细看一下self._schedule_prefills()
、self._schedule_running()
以及self._schedule_swapped()
等重要方法。
3.Scheduler._schedule_prefills()
该方法的主干逻辑比较简单(代码和注释如下):对于waiting
队列中的请求,尽可能全部调度去做prefill
,但是受到如下几方面的制约:
prompt_limit
:调度配置中有两个参数限制了单个seq
的prompt
长度,1)在一个迭代中被处理的tokens
的最大数量max_num_batched_tokens
(涉及被调度的所有请求);2)模型可处理的一个seq
的最大长度max_model_len
,包括prompt
和生成的文本;取两者之间的更小值作为prompt_limit
;gpu_blocks
:gpu_blocks
是用来存储KV caches
的,没有充足的空闲的gpu_blocks
意味着没有空间可以缓存相应的KV caches
,因而需要通过self.block_manager.can_allocate(seq_group)
判断是否能够调度当前请求;budget
:调度预算限制主要体现在1)总的token
预算token_budget
,也就是上面提到的max_num_batched_tokens
;2)最大可处理的seq
的数量max_num_seqs
;
调度完成返回两部分:调度的结果,以及由于上述限制未能完成调度的部分,前者将会经历一次模型的前向过程,也就是做prefill
,后者等待下一次迭代的调度。
# vllm/core/scheduler.py
class Scheduler:def _schedule_prefills(self,waiting_queue: deque, # waiting队列budget: SchedulingBudget, # 调度预算curr_loras: Optional[Set[int]],enable_chunking: bool = False, # 是否使用chunked_prefill -> 我们默认是不用的) -> Tuple[deque, SchedulerPrefillOutputs]:"""调度在预填充(prefill)阶段的sequence groups,抢占(重新计算)会被视作新的prefill,尽可能调度,直至调度预算不够为止budget会跟着调度原位更新"""# 忽略的seq_groups,他们可能太长,直接忽略(但认为是被调度过,并完成了的,对应状态更新为FINISH_IGNORED)ignored_seq_groups: List[SequenceGroup] = []# 成功调度的seq_groupsseq_groups: List[SequenceGroup] = []# 拷贝,防止原位修改waiting_queue = deque([s for s in waiting_queue])# 剩余未能被调度的请求,此项和lora相关,忽略leftover_waiting_sequences: Deque[SequenceGroup] = deque()while self._passed_delay(time.time()) and waiting_queue:# waiting队列中选取第一个seq_group,且该seq_group只有一个seq序列,只包含promptseq_group = waiting_queue[0]waiting_seqs = seq_group.get_seqs(status=SequenceStatus.WAITING)assert len(waiting_seqs) == 1, ("Waiting sequence group should have only one prompt ""sequence.")# 下一时刻需要计算的token数量,应该是整个prompt的长度num_new_tokens = self._get_num_new_tokens(seq_group,SequenceStatus.WAITING,enable_chunking, budget)# 验证下一时刻计算的token数等于prompt token数量if not enable_chunking:num_prompt_tokens = waiting_seqs[0].get_len()assert num_new_tokens == num_prompt_tokens# 超出prompt的长度限制,添加到ignored_seq_groups,序列seq状态设置为FINISHED_IGNOREDprompt_limit = self._get_prompt_limit(seq_group)if num_new_tokens > prompt_limit:logger.warning("Input prompt (%d tokens) is too long"" and exceeds limit of %d", num_new_tokens, prompt_limit)for seq in waiting_seqs:seq.status = SequenceStatus.FINISHED_IGNOREDignored_seq_groups.append(seq_group)waiting_queue.popleft()continue# block_manager确定是否能为该seq_group分配资源can_allocate = self.block_manager.can_allocate(seq_group)if can_allocate == AllocStatus.LATER:breakelif can_allocate == AllocStatus.NEVER:logger.warning("Input prompt (%d tokens) is too long"" and exceeds the capacity of block_manager",num_new_tokens)for seq in waiting_seqs:seq.status = SequenceStatus.FINISHED_IGNOREDignored_seq_groups.append(seq_group)waiting_queue.popleft()continue# lora相关,忽略lora_int_id = 0if self.lora_enabled:lora_int_id = seq_group.lora_int_idassert curr_loras is not Noneassert self.lora_config is not Noneif (self.lora_enabled and lora_int_id > 0and lora_int_id not in curr_lorasand len(curr_loras) >= self.lora_config.max_loras):# We don't have a space for another LoRA, so# we ignore this request for now.leftover_waiting_sequences.appendleft(seq_group)waiting_queue.popleft()continue# 预算已经用完,不能从waiting中调度更多的seq_group去做prefill,结束调度num_new_seqs = seq_group.get_max_num_running_seqs()if (num_new_tokens == 0 # ==0 这个好像没遇到or not budget.can_schedule(num_new_tokens=num_new_tokens,num_new_seqs=num_new_seqs)):break# 调度,并将序列的状态从WAITING改成RUNNINGif curr_loras is not None and lora_int_id > 0:curr_loras.add(lora_int_id)waiting_queue.popleft()self._allocate_and_set_running(seq_group) # block_managerseq_groups.append(ScheduledSequenceGroup(seq_group=seq_group,token_chunk_size=num_new_tokens))# budget原位更新信息budget.add_num_batched_tokens(seq_group.request_id, num_new_tokens)budget.add_num_seqs(seq_group.request_id, num_new_seqs)# 剩余未能被调度的请求waiting_queue.extendleft(leftover_waiting_sequences)if len(seq_groups) > 0:self.prev_prompt = True# 剩余未能被调度的请求,prefill调度情况return waiting_queue, SchedulerPrefillOutputs(seq_groups=seq_groups,ignored_seq_groups=ignored_seq_groups,num_lookahead_slots=self._get_num_lookahead_slots(is_prefill=True))# num_lookahead_slots=0,我们没有使用lookahead
4.Scheduler._schedule_running()
该方法调度可以分为两种情况:
- 空闲的
gpu_blocks
充足:这种情况可以直接调度; - 空闲的
gpu_blocks
不足:假如可用的gpu_blocks
数量都小于seqs
的数量,那么就无法直接调度(通常假设一个seq
至少占用一个block
);这时候就涉及到抢占preempt
,具体操作是从running_queue
中优先级低的开始不断剔除,根据preempt
模式,将这些剔除的部分装在preempted
或者swapped_out
,前者对应的是重计算,而后者则是将这些请求暂时搬到cpu
上,等待机会搬回gpu
继续计算;
blocks_to_swap_out
:由于抢占,需要记录gpu_block
和(转去的)cpu_block
;
blocks_to_copy
:对于parallel sampling
场景,需要对一个输入采样生成多个输出,此时一个seq_group
中就会存在多个seqs
。刚开始这些seqs
共享相同的prompt
,也就可以共享相同的内存空间(blocks
);当采样的token
开始不一样时,就会在当前block
上触发复制机制,申请一块新的physical_block
,并且复制当前block
上的tokens
。上述过程就是所谓的copy-on-write
机制;blocks_to_copy
是用来记录copy
前后的这两个block
的。
class Scheduler:# ...def _schedule_running(self,running_queue: deque, # running队列budget: SchedulingBudget, # 调度预算curr_loras: Optional[Set[int]],policy: Policy, # running队列的排序策略, 默认是fcfsenable_chunking: bool = False, # 是否使用chunked prefill,默认不使用) -> Tuple[deque, SchedulerRunningOutputs]:"""调度处于running状态的seq_groups"""# 被抢占之后需要swapp_out的block,gpu block number -> cpu block numberblocks_to_swap_out: List[Tuple[int, int]] = []# 当最后一个block与别的seqs共享的时候,需要申请一个新的block,并且复制这些tokensblocks_to_copy: List[Tuple[int, int]] = []# 通常情况下都是已完成prefill,在做decode,也是我们遇到的情况decode_seq_groups: List[ScheduledSequenceGroup] = []# chunked prefill对应的情况,我们这边不涉及,一直都是空prefill_seq_groups: List[ScheduledSequenceGroup] = []# 被抢占的seq_group和被swap的seq_group,对应于两种不同的抢占模式preempted: List[SequenceGroup] = []swapped_out: List[SequenceGroup] = []# NOTE(woosuk): Preemption happens only when there is no available slot# to keep all the sequence groups in the RUNNING state.# In this case, the policy is responsible for deciding which sequence# groups to preempt.now = time.time()running_queue = policy.sort_by_priority(now, running_queue) # 根据策略优先排序while running_queue:seq_group = running_queue[0]num_running_tokens = self._get_num_new_tokens(seq_group, SequenceStatus.RUNNING, enable_chunking, budget)if num_running_tokens == 0:# token budget不够,跳出循环breakrunning_queue.popleft()while not self._can_append_slots(seq_group): # 使用block_manager判断# 槽位不够,也就是新生成token存储kv cache的空间不够# 预算中移除相应的seq_group,也就是实时更新bugetbudget.subtract_num_batched_tokens(seq_group.request_id,num_running_tokens)num_running_seqs = seq_group.get_max_num_running_seqs()budget.subtract_num_seqs(seq_group.request_id,num_running_seqs)# lora,忽略if (curr_loras is not None and seq_group.lora_int_id > 0and seq_group.lora_int_id in curr_loras):curr_loras.remove(seq_group.lora_int_id)if running_queue:# 抢占最低优先级的seq_groupvictim_seq_group = running_queue.pop()preempted_mode = self._preempt(victim_seq_group,blocks_to_swap_out) # 根据抢占模式选择重计算还是转到cpu上if preempted_mode == PreemptionMode.RECOMPUTE:preempted.append(victim_seq_group)else:swapped_out.append(victim_seq_group)else:# 抢占当前的seq_group,也就是先直接干掉自己,并跳出循环preempted_mode = self._preempt(seq_group,blocks_to_swap_out)if preempted_mode == PreemptionMode.RECOMPUTE:preempted.append(seq_group)else:swapped_out.append(seq_group)breakelse:# 刚上来一般资源充足,都直接进入这个分支。# 有槽位,为该seq_group的seqs分配新的槽位;同时更新blocks_to_copy;self._append_slots(seq_group, blocks_to_copy)is_prefill = seq_group.is_prefill()if is_prefill:# chunked prefill的情况,忽略prefill_seq_groups.append(ScheduledSequenceGroup(seq_group=seq_group,token_chunk_size=num_running_tokens))else:# 常规decodedecode_seq_groups.append(ScheduledSequenceGroup(seq_group=seq_group,token_chunk_size=1))# 更新budgetbudget.add_num_batched_tokens(seq_group.request_id,num_running_tokens)# 这部分直接忽略# OPTIMIZATION: Note that get_max_num_running_seqs is# expensive. For the default scheduling chase where# enable_chunking is False, num_seqs are updated before running# this method, so we don't have to update it again here.if enable_chunking:num_running_seqs = seq_group.get_max_num_running_seqs()budget.add_num_seqs(seq_group.request_id, num_running_seqs)if curr_loras is not None and seq_group.lora_int_id > 0:curr_loras.add(seq_group.lora_int_id)return running_queue, SchedulerRunningOutputs(decode_seq_groups=decode_seq_groups,prefill_seq_groups=prefill_seq_groups,preempted=preempted,swapped_out=swapped_out,blocks_to_swap_out=blocks_to_swap_out,blocks_to_copy=blocks_to_copy,num_lookahead_slots=self._get_num_lookahead_slots(is_prefill=False))
5.Scheduler._schedule_swapped()
该方法与Scheduler._shedule_running()
非常类似,因为它两本质上是一样的,只是这边需要swap in
,代码和注释见下方。
class Scheduler:# ...def _schedule_swapped(self,swapped_queue: deque, # swapped队列,包含的是被swapped out的请求budget: SchedulingBudget, # 调度预算curr_loras: Optional[Set[int]],policy: Policy, # swapped队列的排序策略enable_chunking: bool = False, # 是否使用chunked prefill,默认不使用) -> Tuple[deque, SchedulerSwappedInOutputs]:"""调度被swap out的sequence groups"""# 需要swapp_in的block,cpu block number -> gpu block numberblocks_to_swap_in: List[Tuple[int, int]] = []# 当最后一个block与别的seqs共享的时候,需要申请一个新的block,并且复制这些tokensblocks_to_copy: List[Tuple[int, int]] = []# 同 _schedule_running()decode_seq_groups: List[ScheduledSequenceGroup] = []prefill_seq_groups: List[ScheduledSequenceGroup] = []now = time.time()swapped_queue = policy.sort_by_priority(now, swapped_queue)# 不可行的seq_groups,类似于上面的ignored_seq_groupsinfeasible_seq_groups: List[SequenceGroup] = []# 剩余未能被调度的请求leftover_swapped: Deque[SequenceGroup] = deque()while swapped_queue:seq_group = swapped_queue[0]# block_manager确定是否能为该seq_group (swap in)分配资源is_prefill = seq_group.is_prefill()alloc_status = self.block_manager.can_swap_in(seq_group, self._get_num_lookahead_slots(is_prefill))if alloc_status == AllocStatus.LATER:breakelif alloc_status == AllocStatus.NEVER:logger.warning("Failing the request %s because there's not enough kv ""cache blocks to run the entire sequence.",seq_group.request_id)for seq in seq_group.get_seqs():seq.status = SequenceStatus.FINISHED_IGNOREDinfeasible_seq_groups.append(seq_group)swapped_queue.popleft()continue# lora,忽略lora_int_id = 0if self.lora_enabled:lora_int_id = seq_group.lora_int_idassert curr_loras is not Noneassert self.lora_config is not Noneif (lora_int_id > 0 and (lora_int_id not in curr_loras)and len(curr_loras) >= self.lora_config.max_loras):# We don't have a space for another LoRA, so# we ignore this request for now.leftover_swapped.appendleft(seq_group)swapped_queue.popleft()continue# The total number of sequences in the RUNNING state should not# exceed the maximum number of sequences.num_new_seqs = seq_group.get_max_num_running_seqs()num_new_tokens = self._get_num_new_tokens(seq_group,SequenceStatus.SWAPPED,enable_chunking, budget)# budget无法调度, 同 _schedule_prefillsif (num_new_tokens == 0or not budget.can_schedule(num_new_tokens=num_new_tokens,num_new_seqs=num_new_seqs)):breakif lora_int_id > 0 and curr_loras is not None:curr_loras.add(lora_int_id)# 调度,做swap in,并添加槽位swapped_queue.popleft()self._swap_in(seq_group, blocks_to_swap_in)self._append_slots(seq_group, blocks_to_copy)# 同 _schedule_running()is_prefill = seq_group.is_prefill()if is_prefill: # chunked prefill, 忽略prefill_seq_groups.append(ScheduledSequenceGroup(seq_group,token_chunk_size=num_new_tokens))else:decode_seq_groups.append(ScheduledSequenceGroup(seq_group, token_chunk_size=1))# 更新预算budget.add_num_batched_tokens(seq_group.request_id, num_new_tokens)budget.add_num_seqs(seq_group.request_id, num_new_seqs)# 剩余的无法完成调度的swapped_queueswapped_queue.extendleft(leftover_swapped)return swapped_queue, SchedulerSwappedInOutputs(decode_seq_groups=decode_seq_groups,prefill_seq_groups=prefill_seq_groups,blocks_to_swap_in=blocks_to_swap_in,blocks_to_copy=blocks_to_copy,num_lookahead_slots=self._get_num_lookahead_slots(is_prefill=False),infeasible_seq_groups=infeasible_seq_groups,)
6.Scheduler._preempt()
前面提到,抢占preempt
是self._schedule_running
中的重要环节,它的目的是暂时舍弃优先级低的seqs
,确保优先级较高的部分有充足的blocks
或者slots
使用。根据不同的抢占模式(重计算RECOMPUTE
和置换SWAP
)执行不同的(被)抢占逻辑,需要记录block
的交换情况,更新seq
的状态。
def _preempt(self,seq_group: SequenceGroup,blocks_to_swap_out: List[Tuple[int, int]],preemption_mode: Optional[PreemptionMode] = None,) -> PreemptionMode:# If preemption mode is not specified, we determine the mode as follows:# We use recomputation by default since it incurs lower overhead than# swapping. However, when the sequence group has multiple sequences# (e.g., beam search), recomputation is not currently supported. In# such a case, we use swapping instead.# FIXME(woosuk): This makes our scheduling policy a bit bizarre.# As swapped sequences are prioritized over waiting sequences,# sequence groups with multiple sequences are implicitly prioritized# over sequence groups with a single sequence.# TODO(woosuk): Support recomputation for sequence groups with multiple# sequences. This may require a more sophisticated CUDA kernel.# 根据上面说的,确定抢占模式if self.user_specified_preemption_mode is None:if seq_group.get_max_num_running_seqs() == 1:preemption_mode = PreemptionMode.RECOMPUTEelse:preemption_mode = PreemptionMode.SWAPelif self.user_specified_preemption_mode == "swap":preemption_mode = PreemptionMode.SWAPelse:preemption_mode = PreemptionMode.RECOMPUTE# 发生过多抢占,发出提醒if self.num_cumulative_preemption % 50 == 0:logger.warning("Sequence group %s is preempted by %s mode because there is ""not enough KV cache space. This can affect the end-to-end ""performance. Increase gpu_memory_utilization or ""tensor_parallel_size to provide more KV cache memory. ""total_num_cumulative_preemption=%d", seq_group.request_id,preemption_mode, self.num_cumulative_preemption + 1)self.num_cumulative_preemption += 1# 根据不同的抢占模式执行不同的(被)抢占逻辑if preemption_mode == PreemptionMode.RECOMPUTE:self._preempt_by_recompute(seq_group)elif preemption_mode == PreemptionMode.SWAP:self._preempt_by_swap(seq_group, blocks_to_swap_out)else:raise AssertionError("Invalid preemption mode.")return preemption_modedef _preempt_by_recompute(self,seq_group: SequenceGroup,) -> None:"""重计算"""seqs = seq_group.get_seqs(status=SequenceStatus.RUNNING)assert len(seqs) == 1# 将seqs/seq_group恢复成waiting时期的状态,并将相应的block释放出来for seq in seqs:seq.status = SequenceStatus.WAITINGself.free_seq(seq)seq.reset_state_for_recompute()def _preempt_by_swap(self,seq_group: SequenceGroup,blocks_to_swap_out: List[Tuple[int, int]],) -> None:"""交换至cpu"""self._swap_out(seq_group, blocks_to_swap_out)def _swap_in(self,seq_group: SequenceGroup,blocks_to_swap_in: List[Tuple[int, int]],) -> None:"""cpu -> gpu,在self._schedule_swapped()中被调用"""# 更新swap_in,同时将seq状态设置为RUNNINGmapping = self.block_manager.swap_in(seq_group)blocks_to_swap_in.extend(mapping)for seq in seq_group.get_seqs(status=SequenceStatus.SWAPPED):seq.status = SequenceStatus.RUNNINGdef _swap_out(self,seq_group: SequenceGroup,blocks_to_swap_out: List[Tuple[int, int]],) -> None:"""gpu -> cpu"""if not self.block_manager.can_swap_out(seq_group):# FIXME(woosuk): Abort the sequence group instead of aborting the# entire engine.raise RuntimeError("Aborted due to the lack of CPU swap space. Please increase ""the swap space to avoid this error.")# 更新swap_out,同时将seq状态设置为SWAPPEDmapping = self.block_manager.swap_out(seq_group)blocks_to_swap_out.extend(mapping)for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):seq.status = SequenceStatus.SWAPPED
7.Scheduler调度举例
为了更形象的说明Scheduler
的调度,我们这里举个小例子。假设用户请求有300条,每条的prompt tokens
个数是[27, 30, 24, 27, 30, 24, ...]
调度预算budget
的max_num_seqs
是256,max_num_batched_tokens
是2048。
1)self._schedule_prefills()
调度self.waiting
,从300条请求中按照时间顺序优先级调度了75条去做prefill
;为什么是75条?75条的token
数量为27 + 30 + 24 + ... = 2025 < 2048
,如果是76条就超过了max_num_batched_tokens
;
2)由于budget
的max_num_seqs
是256,而1)中只调度了75条,所以继续执行1)中操作3次(分3个iteration
完成),调度请求个数为75 + 75 + 75 + 31 = 256
,它们做完prefill
之后状态都修改成了RUNNING
,剩余44条仍然在self.waiting
中;
3)self._schedule_runnings()
调度self.running
(256条);self.swapped
此时还是空的,因此不需要调度;
4)多次执行3),如果存在free gpu blocks
不够的情况,则swap out
;如果有部分请求已完成,也就是处于RUNNING
状态的seq
个数少于max_num_seqs
,可以继续执行1)做prefill
。
总结一下:有可以做prefill
的请求是尽可能做prefill
,否则做decode
。
8.Scheduler与BlockSpaceManager的关系
Scheduler
的调度逻辑基本涵盖在以上几个方法中,但同时需要self.block_manager
对内存进行管理。为了更直观简洁的理解Scheduler
和BlockSpaceManager
之间的关系,我将两者的方法做成了一张表格如下。第一列是Scheduler
的上层方法;第二列是Scheduler
的底层的方法,通常被第一列的上层方法调用;第三列是BlockSpaceManager
的方法,和第二列有明显的对应关系,所以说block_manager
是scheduler
中非常重要的部分。
二、BlockSpaceManager
BlockSpaceManager
是我们用来管理逻辑块和物理块的一个类,BlockSpaceManagerV1
是它的一个子类,也是本次我们所用到的。我们将对其方法展开说一下,也是完备我们对Scheduler
整个过程的理解。
1.初始化
BlockSpaceManagerV1
初始化部分如下,创建了self.gpu_allocator
以及self.cpu_allocator
这两个内存分配器,由于我们这边场景相对简单,上述内存分配器是UncachedBlockAllocator
的实例化对象。那么self.gpu_allocator
的工作是什么呢?在开始的时候根据可用的显存创建物理块physical blocks
,当调度发生时,需要申请物理块,或者释放物理块,统计闲置的物理块,就这么简单。
# vll/core/block_manager_v1.py
class BlockSpaceManagerV1(BlockSpaceManager):"""管理逻辑块和物理块之间的映射"""def __init__(self,block_size: int, # block大小,默认16num_gpu_blocks: int, # 之前获取的gpu块的个数num_cpu_blocks: int, # 之前获取的cpu块的个数watermark: float = 0.01, # 水位线,用于留存一部分blockssliding_window: Optional[int] = None, # 是否使用sliding windowenable_caching: bool = False, # 是否使用chunked prefill) -> None:self.block_size = block_sizeself.num_total_gpu_blocks = num_gpu_blocksself.num_total_cpu_blocks = num_cpu_blocks# 忽略if enable_caching and sliding_window is not None:raise NotImplementedError("Sliding window is not allowed with prefix caching enabled!")self.block_sliding_window = Noneif sliding_window is not None:# Round up to nearest block size to regularize sliding window# allocation sizes.self.block_sliding_window = math.ceil(sliding_window / block_size)self.watermark = watermarkassert watermark >= 0.0self.enable_caching = enable_cachingself.watermark_blocks = int(watermark * num_gpu_blocks)if self.enable_caching:# 该情况下使用CachedBlockAllocator来分配block,我们暂时不涉及logger.info("Automatic prefix caching is enabled.")self.gpu_allocator: BlockAllocatorBase = CachedBlockAllocator(Device.GPU, block_size, num_gpu_blocks)self.cpu_allocator: BlockAllocatorBase = CachedBlockAllocator(Device.CPU, block_size, num_cpu_blocks)else:# 常规情况,使用UncachedBlockAllocatorself.gpu_allocator = UncachedBlockAllocator(Device.GPU, block_size, num_gpu_blocks)self.cpu_allocator = UncachedBlockAllocator(Device.CPU, block_size, num_cpu_blocks)# 记录seq_i到BlockTable的映射关系,也就是该seq使用了哪些blockself.block_tables: Dict[int, BlockTable] = {}# cross-attention涉及的由seqence_group.req_id到BlockTable的映射,只在encoder-decoder模型中才会用到# Note that each SequenceGroup has a unique request IDself.cross_block_tables: Dict[str, BlockTable] = {}
2.can_allocate() & allocate()
在Scheduler._schedule_prefills()
方法中,同时调用了self.can_allocate()
以及self.allocate()
方法。self.can_allocate()
判断当前是否有足够的空闲的gpu_blocks
分配给当前seq_group
做prefill
;在实际计算时,一般不会打满所有的gpu_blocks
而是留存一些,也就是这边的self.watermark_blocks
。
def BlockSpaceManagerV1(BlockSpaceManager):# ...def can_allocate(self, seq_group: SequenceGroup) -> AllocStatus:"""判断当前是否有足够的空闲的gpu_blocks可以分配"""# FIXME(woosuk): Here we assume that all sequences in the group share# the same prompt. This may not be true for preempted sequences.# 对于encoder-decoder模型的一些限制,跳过check_no_caching_or_swa_for_blockmgr_encdec(self, seq_group)# 计算所需的blocks数量self_num_required_blocks = self._get_seq_num_required_blocks(seq_group.get_seqs(status=SequenceStatus.WAITING)[0])cross_num_required_blocks = self._get_seq_num_required_blocks(seq_group.get_encoder_seq())num_required_blocks = self_num_required_blocks + \cross_num_required_blocksif self.block_sliding_window is not None:num_required_blocks = min(num_required_blocks,self.block_sliding_window)# 获取当前空闲的blocks数量num_free_gpu_blocks = self.gpu_allocator.get_num_free_blocks()# 使用水位线来防止频繁的缓存驱逐,留点家底缓冲if (self.num_total_gpu_blocks - num_required_blocks <self.watermark_blocks):# 所需超过总的blocks数量,直接舍弃return AllocStatus.NEVERif num_free_gpu_blocks - num_required_blocks >= self.watermark_blocks:# blocks充裕,可以直接分配return AllocStatus.OKelse:# 空闲的blocks不够,需要等待后续释之后才能分配return AllocStatus.LATER
在self.can_allocate()
返回AllocStatus.OK
,也就是有充足的free_gpu_blocks
的情况下,可使用self.allocate()
方法为当前seq_group
分配gpu blocks
,我们走的分支非常简单(见代码和注释),通过self.gpu_allocator
分配一个free gpu block
,并更新其ref_count
(表示被几个seq
共用)。
def BlockSpaceManagerV1(BlockSpaceManager):# ...def allocate(self, seq_group: SequenceGroup) -> None:"""为seq_group分配gpu blocks"""# 对于encoder-decoder模型的一些限制,跳过is_encoder_decoder = seq_group.is_encoder_decoder()check_no_caching_or_swa_for_blockmgr_encdec(self, seq_group)# Allocate decoder sequencesseq = seq_group.get_seqs(status=SequenceStatus.WAITING)[0]block_table: BlockTable = \self._allocate_sequence(seq,seq_group.num_seqs(),is_encoder_decoder)# Assign the self-attention block tables for each sequence.for seq in seq_group.get_seqs(status=SequenceStatus.WAITING):self.block_tables[seq.seq_id] = block_table.copy()# Allocate encoder sequenceif is_encoder_decoder:# A SequenceGroup has only a single encoder sequence (at most),# thus allocate with a ref count of 1block_table = self._allocate_sequence(seq_group.get_encoder_seq(),1, is_encoder_decoder)# Assign the cross-attention block table for the SequenceGroup.self.cross_block_tables[seq_group.request_id] = block_tabledef _allocate_sequence(self, \seq: Sequence, \ref_count: int, \is_encoder_decoder: bool = True) -> BlockTable:"""为prompt tokens申请新的物理块"""# 根据逻辑块分配/更新物理块num_prompt_blocks = len(seq.logical_token_blocks)block_table: BlockTable = [] # BlockTable = List[PhysicalTokenBlock]for logical_idx in range(num_prompt_blocks):# 前两种情况都不是常规情况,忽略if (self.block_sliding_window is not Noneand logical_idx >= self.block_sliding_window):block = block_table[logical_idx % self.block_sliding_window]# Set the reference counts of the token blocks.block.ref_count = ref_countelif not is_encoder_decoder and self.enable_caching:# 这种对应着CachedBlockAllocatorblock = self.gpu_allocator.allocate(seq.hash_of_block(logical_idx),seq.num_hashed_tokens_of_block(logical_idx))else:# 【走这条分支】非常简单的分配,并且为分配的block更新其ref_count,也就是该block为ref_count个seq共享block = self.gpu_allocator.allocate()block.ref_count = ref_countblock_table.append(block)return block_table
3.can_append_slots() & append_slots()
这两个方法是在Scheduler._schedule_running()
和Scheduler._schedule_swapped()
中使用的,也就是在decode
阶段,每次生成一个token
,那么每个seq
每次需要的是申请一个槽位slot
而不是像prefill
阶段那样申请若干个blocks
。
这边讲一下self.append_slot()
这个方法。seq
的逻辑块logical_token_blocks
的更新是要早于物理块physical_token_blocks
(在下述代码中用block_table
表示)的更新的,调用这个方法的时候,逻辑块已经完成了更新,也就是已经考虑新的token
,而物理块还处理待更新状态,这时候就分为两种情况:
1)当前block_table
中最后一个block
还有槽位,可以直接使用;
2)最后一个block
的block_size
个槽位全部使用完,这时就必须使用self._allocate_last_physical_block(seq)
新开一个物理块;从代码来看,该情况对应的分支就是len(block_table) < len(logical_blocks)
,也就是物理块比逻辑块少一块。
最后要说的是,self.append_slot()
的返回结果是blocks_to_copy
,也就是记录了copy-on-write
的信息。
def BlockSpaceManagerV1(BlockSpaceManager):# ...def can_append_slots(self,seq_group: SequenceGroup,num_lookahead_slots: int = 0) -> bool:"""对于该seq_group,是否有充足的空间(槽位slots)以支持继续生成"""assert (num_lookahead_slots == 0), "lookahead allocation not supported in BlockSpaceManagerV1"# Simple heuristic: If there is at least one free block# for each sequence, we can append.# 当每个seq至少能分配一个空闲的block时,可以选择appendnum_free_gpu_blocks = self.gpu_allocator.get_num_free_blocks()num_seqs = seq_group.num_seqs(status=SequenceStatus.RUNNING)return num_seqs <= num_free_gpu_blocksdef append_slots(self,seq: Sequence,num_lookahead_slots: int = 0,) -> List[Tuple[int, int]]:"""为新的token分配一个槽位"""# seq占用的逻辑块 & 物理块logical_blocks = seq.logical_token_blocksblock_table = self.block_tables[seq.seq_id]# 逻辑块个数大于物理块(比如当前token数是33,block_size=16, 逻辑块刚好开始第3块,物理块第2块刚好被填满,需要申请第3块)if len(block_table) < len(logical_blocks):assert len(block_table) == len(logical_blocks) - 1# 该分支忽略if (self.block_sliding_windowand len(block_table) >= self.block_sliding_window):# reuse a blockblock_table.append(block_table[len(block_table) %self.block_sliding_window])else:# 分配一个新的物理块new_block = self._allocate_last_physical_block(seq)block_table.append(new_block)# 这边返回的是blocks_to_copyreturn []# 在最后一个物理块中分配一个槽位给新的tokenlast_block = block_table[-1]assert last_block.device == Device.GPUif last_block.ref_count == 1:# 当前block没有被其他seqs共享,可以直接分配# 忽略if self.enable_caching:# If the last block is now complete, we may reuse an old block to save memory.maybe_new_block = self._maybe_promote_last_block(seq, last_block)block_table[-1] = maybe_new_blockreturn []else:# 当前block被其他seqs共享,需要使用Copy on Write# 分道扬镳,直接独开一个物理块,作为该seq新的最后一个物理块new_block = self._allocate_last_physical_block(seq)block_table[-1] = new_block# 这边free只是将ref_count减去1,而不是直接完全释放成空闲的blockself.gpu_allocator.free(last_block)# 返回copy的src和dst的block信息return [(last_block.block_number, new_block.block_number)]
4.其他方法
BlockSpaceManagerV1
还包含其他方法,诸如self.swap_out
、self.swap_in()
和self.free()
,此处不再一一展开,感兴趣的可以自行阅读源码。
三、CacheEngine
值得一提的是,BlockSpaceManger
只是做了一个block
的分配,是一种指派或者说是安排。这些block
实际存储指定tokens
的kv caches
是在模型执行部分由CacheEngine
来完成的,这里就不展开了。
总结
本篇展开讲述了Scheduler
以及BlockSpaceManager
的工作原理和过程。至此,相信大家对vLLM
框架的整个流程有了较为完整清晰的认识。当然,vLLM
中还有很多tricks
本系列尚未涉及,比如Prefix Caching
、Speculative Decoding
等等,这些部分可能会单独开一些系列并作为补充链接放在下一篇。