vLLM (6) - Scheduler BlockSpaceManager

系列文章目录

vLLM (1) - Qwen2推理&部署
vLLM (2) - 架构总览
vLLM (3) - Sequence & SequenceGroup
vLLM (4) - LLMEngine上篇
vLLM (5) - LLMEngine下篇
vLLM (6) - Scheduler & BlockSpaceManager

文章目录

  • 系列文章目录
  • 前言
  • 一、Scheduler
    • 1.概述
    • 2.Scheduler._schedule_default()回顾
    • 3.Scheduler._schedule_prefills()
    • 4.Scheduler._schedule_running()
    • 5.Scheduler._schedule_swapped()
    • 6.Scheduler._preempt()
    • 7.Scheduler调度举例
    • 8.Scheduler与BlockSpaceManager的关系
  • 二、BlockSpaceManager
    • 1.初始化
    • 2.can_allocate() & allocate()
    • 3.can_append_slots() & append_slots()
    • 4.其他方法
  • 三、CacheEngine
  • 总结


前言

前面两篇展开讲解了LLMEngine,涉及了WorkerCacheEngineModelRunnerScheduler等核心组件,相信大家对其工作原理与流程已经有了相对清晰的认识。受限于篇幅,上一篇我们没有对Scheduler彻底展开,这将会是本篇的重点。
回想一下vLLM等大模型推理框架解决的问题是什么?当大模型开放给用户的时候,会面对大量请求,因此推理框架的目标就是通过高效利用现有资源(比如显存)等方式来增大吞吐量,降低延迟,以便快速给到用户响应。在vLLM中,通过Scheduler合理的调度方式,以及PageAttention优化的GPU使用率,完成了上述目标。而作为Scheduler的重要组成部分,BlockSpaceManager完成了内存块的管理。因而,本篇会围绕Scheduler(调度逻辑)以及BlockSpaceManager这两点展开。


一、Scheduler

1.概述

大模型推理框架中通过合理调度来增大吞吐量并非vLLM首创,通过代码你会发现,vLLM中的调度基于ORCA团队提出的iteration-level scheduling,业界通常称为continuous batching
大模型推理的时候通常追求更大的batch,以便充分发挥GPU的并行性能。之前的大模型推理框架使用的batching策略称为naive batching或者static batching。它是request-level的调度,其执行逻辑是将一个batch的输入喂给模型,需要等到所有序列均完成解码之后一起释放资源,进行下一个batch。下图(来源于Anyscale)描述了这一过程,其中黄色块表示prompt token,蓝色块表示生成的token,左图为第一个迭代,右图表示batch中所有序列完成。由于每个序列生成长度并相同,先完成的(较短的)需要等待其他序列完成后一起释放资源,GPU没有得到充分的利用,如右图中空白区域所示。
在这里插入图片描述
continuous batching则将调度细化到iteration-level,意思是每经过一个迭代(生成一个token)调度一次,对应到vLLM代码就是之前讲过的LLMEngine.step()这个方法。这种调度方式能够避免naive batching等待所有序列完成才能释放资源的问题。如下图所示,在T5时刻S3序列已经完成,T6时刻(next iteration)S3占用的显存被释放,S5加入batch进行prefilldecode。这种策略极大的增加了吞吐量,降低了延时。
在这里插入图片描述

2.Scheduler._schedule_default()回顾

上一篇我们谈过,Scheduler调度的朴素实现集中在self._schedule_default()方法中,我们也已经论述过它的执行逻辑。现简要概括如下(代码和注释参考上一篇):
1)根据调度预算budget,调度self.waitingself.running以及self.swapped
2)只要self.waiting中仍有请求,并且调度预算budget允许,就优先调度self.waiting,去做prefill
3)当2)不满足时,就调度self.runningself.swapped去做decode,当然这两者有优先级之分;所以在vLLM中,要么是做prefill,要么是做decode
接下来我们来仔细看一下self._schedule_prefills()self._schedule_running()以及self._schedule_swapped()等重要方法。

3.Scheduler._schedule_prefills()

该方法的主干逻辑比较简单(代码和注释如下):对于waiting队列中的请求,尽可能全部调度去做prefill,但是受到如下几方面的制约:

  • prompt_limit:调度配置中有两个参数限制了单个seqprompt长度,1)在一个迭代中被处理的tokens的最大数量max_num_batched_tokens(涉及被调度的所有请求);2)模型可处理的一个seq的最大长度max_model_len,包括prompt和生成的文本;取两者之间的更小值作为prompt_limit
  • gpu_blocksgpu_blocks是用来存储KV caches的,没有充足的空闲的gpu_blocks意味着没有空间可以缓存相应的KV caches,因而需要通过self.block_manager.can_allocate(seq_group)判断是否能够调度当前请求;
  • budget:调度预算限制主要体现在1)总的token预算token_budget,也就是上面提到的max_num_batched_tokens;2)最大可处理的seq的数量max_num_seqs

调度完成返回两部分:调度的结果,以及由于上述限制未能完成调度的部分,前者将会经历一次模型的前向过程,也就是做prefill,后者等待下一次迭代的调度。

# vllm/core/scheduler.py
class Scheduler:def _schedule_prefills(self,waiting_queue: deque,            # waiting队列budget: SchedulingBudget,        # 调度预算curr_loras: Optional[Set[int]],enable_chunking: bool = False,   # 是否使用chunked_prefill -> 我们默认是不用的) -> Tuple[deque, SchedulerPrefillOutputs]:"""调度在预填充(prefill)阶段的sequence groups,抢占(重新计算)会被视作新的prefill,尽可能调度,直至调度预算不够为止budget会跟着调度原位更新"""# 忽略的seq_groups,他们可能太长,直接忽略(但认为是被调度过,并完成了的,对应状态更新为FINISH_IGNORED)ignored_seq_groups: List[SequenceGroup] = []# 成功调度的seq_groupsseq_groups: List[SequenceGroup] = []# 拷贝,防止原位修改waiting_queue = deque([s for s in waiting_queue])# 剩余未能被调度的请求,此项和lora相关,忽略leftover_waiting_sequences: Deque[SequenceGroup] = deque()while self._passed_delay(time.time()) and waiting_queue:# waiting队列中选取第一个seq_group,且该seq_group只有一个seq序列,只包含promptseq_group = waiting_queue[0]waiting_seqs = seq_group.get_seqs(status=SequenceStatus.WAITING)assert len(waiting_seqs) == 1, ("Waiting sequence group should have only one prompt ""sequence.")# 下一时刻需要计算的token数量,应该是整个prompt的长度num_new_tokens = self._get_num_new_tokens(seq_group,SequenceStatus.WAITING,enable_chunking, budget)# 验证下一时刻计算的token数等于prompt token数量if not enable_chunking:num_prompt_tokens = waiting_seqs[0].get_len()assert num_new_tokens == num_prompt_tokens# 超出prompt的长度限制,添加到ignored_seq_groups,序列seq状态设置为FINISHED_IGNOREDprompt_limit = self._get_prompt_limit(seq_group)if num_new_tokens > prompt_limit:logger.warning("Input prompt (%d tokens) is too long"" and exceeds limit of %d", num_new_tokens, prompt_limit)for seq in waiting_seqs:seq.status = SequenceStatus.FINISHED_IGNOREDignored_seq_groups.append(seq_group)waiting_queue.popleft()continue# block_manager确定是否能为该seq_group分配资源can_allocate = self.block_manager.can_allocate(seq_group)if can_allocate == AllocStatus.LATER:breakelif can_allocate == AllocStatus.NEVER:logger.warning("Input prompt (%d tokens) is too long"" and exceeds the capacity of block_manager",num_new_tokens)for seq in waiting_seqs:seq.status = SequenceStatus.FINISHED_IGNOREDignored_seq_groups.append(seq_group)waiting_queue.popleft()continue# lora相关,忽略lora_int_id = 0if self.lora_enabled:lora_int_id = seq_group.lora_int_idassert curr_loras is not Noneassert self.lora_config is not Noneif (self.lora_enabled and lora_int_id > 0and lora_int_id not in curr_lorasand len(curr_loras) >= self.lora_config.max_loras):# We don't have a space for another LoRA, so# we ignore this request for now.leftover_waiting_sequences.appendleft(seq_group)waiting_queue.popleft()continue# 预算已经用完,不能从waiting中调度更多的seq_group去做prefill,结束调度num_new_seqs = seq_group.get_max_num_running_seqs()if (num_new_tokens == 0    # ==0 这个好像没遇到or not budget.can_schedule(num_new_tokens=num_new_tokens,num_new_seqs=num_new_seqs)):break# 调度,并将序列的状态从WAITING改成RUNNINGif curr_loras is not None and lora_int_id > 0:curr_loras.add(lora_int_id)waiting_queue.popleft()self._allocate_and_set_running(seq_group)   # block_managerseq_groups.append(ScheduledSequenceGroup(seq_group=seq_group,token_chunk_size=num_new_tokens))# budget原位更新信息budget.add_num_batched_tokens(seq_group.request_id, num_new_tokens)budget.add_num_seqs(seq_group.request_id, num_new_seqs)# 剩余未能被调度的请求waiting_queue.extendleft(leftover_waiting_sequences)if len(seq_groups) > 0:self.prev_prompt = True# 剩余未能被调度的请求,prefill调度情况return waiting_queue, SchedulerPrefillOutputs(seq_groups=seq_groups,ignored_seq_groups=ignored_seq_groups,num_lookahead_slots=self._get_num_lookahead_slots(is_prefill=True))# num_lookahead_slots=0,我们没有使用lookahead

4.Scheduler._schedule_running()

该方法调度可以分为两种情况:

  • 空闲的gpu_blocks充足:这种情况可以直接调度;
  • 空闲的gpu_blocks不足:假如可用的gpu_blocks数量都小于seqs的数量,那么就无法直接调度(通常假设一个seq至少占用一个block);这时候就涉及到抢占preempt,具体操作是从running_queue中优先级低的开始不断剔除,根据preempt模式,将这些剔除的部分装在preempted或者swapped_out,前者对应的是重计算,而后者则是将这些请求暂时搬到cpu上,等待机会搬回gpu继续计算;

blocks_to_swap_out:由于抢占,需要记录gpu_block和(转去的)cpu_block
blocks_to_copy:对于parallel sampling场景,需要对一个输入采样生成多个输出,此时一个seq_group中就会存在多个seqs。刚开始这些seqs共享相同的prompt,也就可以共享相同的内存空间(blocks);当采样的token开始不一样时,就会在当前block上触发复制机制,申请一块新的physical_block,并且复制当前block上的tokens。上述过程就是所谓的copy-on-write机制;blocks_to_copy是用来记录copy前后的这两个block的。

class Scheduler:# ...def _schedule_running(self,running_queue: deque,             # running队列budget: SchedulingBudget,         # 调度预算curr_loras: Optional[Set[int]],policy: Policy,                   # running队列的排序策略, 默认是fcfsenable_chunking: bool = False,    # 是否使用chunked prefill,默认不使用) -> Tuple[deque, SchedulerRunningOutputs]:"""调度处于running状态的seq_groups"""# 被抢占之后需要swapp_out的block,gpu block number -> cpu block numberblocks_to_swap_out: List[Tuple[int, int]] = []# 当最后一个block与别的seqs共享的时候,需要申请一个新的block,并且复制这些tokensblocks_to_copy: List[Tuple[int, int]] = []# 通常情况下都是已完成prefill,在做decode,也是我们遇到的情况decode_seq_groups: List[ScheduledSequenceGroup] = []# chunked prefill对应的情况,我们这边不涉及,一直都是空prefill_seq_groups: List[ScheduledSequenceGroup] = []# 被抢占的seq_group和被swap的seq_group,对应于两种不同的抢占模式preempted: List[SequenceGroup] = []swapped_out: List[SequenceGroup] = []# NOTE(woosuk): Preemption happens only when there is no available slot# to keep all the sequence groups in the RUNNING state.# In this case, the policy is responsible for deciding which sequence# groups to preempt.now = time.time()running_queue = policy.sort_by_priority(now, running_queue)  # 根据策略优先排序while running_queue:seq_group = running_queue[0]num_running_tokens = self._get_num_new_tokens(seq_group, SequenceStatus.RUNNING, enable_chunking, budget)if num_running_tokens == 0:# token budget不够,跳出循环breakrunning_queue.popleft()while not self._can_append_slots(seq_group):   # 使用block_manager判断# 槽位不够,也就是新生成token存储kv cache的空间不够# 预算中移除相应的seq_group,也就是实时更新bugetbudget.subtract_num_batched_tokens(seq_group.request_id,num_running_tokens)num_running_seqs = seq_group.get_max_num_running_seqs()budget.subtract_num_seqs(seq_group.request_id,num_running_seqs)# lora,忽略if (curr_loras is not None and seq_group.lora_int_id > 0and seq_group.lora_int_id in curr_loras):curr_loras.remove(seq_group.lora_int_id)if running_queue:# 抢占最低优先级的seq_groupvictim_seq_group = running_queue.pop()preempted_mode = self._preempt(victim_seq_group,blocks_to_swap_out) # 根据抢占模式选择重计算还是转到cpu上if preempted_mode == PreemptionMode.RECOMPUTE:preempted.append(victim_seq_group)else:swapped_out.append(victim_seq_group)else:# 抢占当前的seq_group,也就是先直接干掉自己,并跳出循环preempted_mode = self._preempt(seq_group,blocks_to_swap_out)if preempted_mode == PreemptionMode.RECOMPUTE:preempted.append(seq_group)else:swapped_out.append(seq_group)breakelse:# 刚上来一般资源充足,都直接进入这个分支。# 有槽位,为该seq_group的seqs分配新的槽位;同时更新blocks_to_copy;self._append_slots(seq_group, blocks_to_copy)is_prefill = seq_group.is_prefill()if is_prefill:# chunked prefill的情况,忽略prefill_seq_groups.append(ScheduledSequenceGroup(seq_group=seq_group,token_chunk_size=num_running_tokens))else:# 常规decodedecode_seq_groups.append(ScheduledSequenceGroup(seq_group=seq_group,token_chunk_size=1))# 更新budgetbudget.add_num_batched_tokens(seq_group.request_id,num_running_tokens)# 这部分直接忽略# OPTIMIZATION:  Note that get_max_num_running_seqs is# expensive. For the default scheduling chase where# enable_chunking is False, num_seqs are updated before running# this method, so we don't have to update it again here.if enable_chunking:num_running_seqs = seq_group.get_max_num_running_seqs()budget.add_num_seqs(seq_group.request_id, num_running_seqs)if curr_loras is not None and seq_group.lora_int_id > 0:curr_loras.add(seq_group.lora_int_id)return running_queue, SchedulerRunningOutputs(decode_seq_groups=decode_seq_groups,prefill_seq_groups=prefill_seq_groups,preempted=preempted,swapped_out=swapped_out,blocks_to_swap_out=blocks_to_swap_out,blocks_to_copy=blocks_to_copy,num_lookahead_slots=self._get_num_lookahead_slots(is_prefill=False))

5.Scheduler._schedule_swapped()

该方法与Scheduler._shedule_running()非常类似,因为它两本质上是一样的,只是这边需要swap in,代码和注释见下方。

class Scheduler:# ...def _schedule_swapped(self,swapped_queue: deque,            # swapped队列,包含的是被swapped out的请求budget: SchedulingBudget,        # 调度预算curr_loras: Optional[Set[int]],policy: Policy,                  # swapped队列的排序策略enable_chunking: bool = False,   # 是否使用chunked prefill,默认不使用) -> Tuple[deque, SchedulerSwappedInOutputs]:"""调度被swap out的sequence groups"""# 需要swapp_in的block,cpu block number -> gpu block numberblocks_to_swap_in: List[Tuple[int, int]] = []# 当最后一个block与别的seqs共享的时候,需要申请一个新的block,并且复制这些tokensblocks_to_copy: List[Tuple[int, int]] = []# 同 _schedule_running()decode_seq_groups: List[ScheduledSequenceGroup] = []prefill_seq_groups: List[ScheduledSequenceGroup] = []now = time.time()swapped_queue = policy.sort_by_priority(now, swapped_queue)# 不可行的seq_groups,类似于上面的ignored_seq_groupsinfeasible_seq_groups: List[SequenceGroup] = []# 剩余未能被调度的请求leftover_swapped: Deque[SequenceGroup] = deque()while swapped_queue:seq_group = swapped_queue[0]# block_manager确定是否能为该seq_group (swap in)分配资源is_prefill = seq_group.is_prefill()alloc_status = self.block_manager.can_swap_in(seq_group, self._get_num_lookahead_slots(is_prefill))if alloc_status == AllocStatus.LATER:breakelif alloc_status == AllocStatus.NEVER:logger.warning("Failing the request %s because there's not enough kv ""cache blocks to run the entire sequence.",seq_group.request_id)for seq in seq_group.get_seqs():seq.status = SequenceStatus.FINISHED_IGNOREDinfeasible_seq_groups.append(seq_group)swapped_queue.popleft()continue# lora,忽略lora_int_id = 0if self.lora_enabled:lora_int_id = seq_group.lora_int_idassert curr_loras is not Noneassert self.lora_config is not Noneif (lora_int_id > 0 and (lora_int_id not in curr_loras)and len(curr_loras) >= self.lora_config.max_loras):# We don't have a space for another LoRA, so# we ignore this request for now.leftover_swapped.appendleft(seq_group)swapped_queue.popleft()continue# The total number of sequences in the RUNNING state should not# exceed the maximum number of sequences.num_new_seqs = seq_group.get_max_num_running_seqs()num_new_tokens = self._get_num_new_tokens(seq_group,SequenceStatus.SWAPPED,enable_chunking, budget)# budget无法调度, 同 _schedule_prefillsif (num_new_tokens == 0or not budget.can_schedule(num_new_tokens=num_new_tokens,num_new_seqs=num_new_seqs)):breakif lora_int_id > 0 and curr_loras is not None:curr_loras.add(lora_int_id)# 调度,做swap in,并添加槽位swapped_queue.popleft()self._swap_in(seq_group, blocks_to_swap_in)self._append_slots(seq_group, blocks_to_copy)# 同 _schedule_running()is_prefill = seq_group.is_prefill()if is_prefill:  # chunked prefill, 忽略prefill_seq_groups.append(ScheduledSequenceGroup(seq_group,token_chunk_size=num_new_tokens))else:decode_seq_groups.append(ScheduledSequenceGroup(seq_group, token_chunk_size=1))# 更新预算budget.add_num_batched_tokens(seq_group.request_id, num_new_tokens)budget.add_num_seqs(seq_group.request_id, num_new_seqs)# 剩余的无法完成调度的swapped_queueswapped_queue.extendleft(leftover_swapped)return swapped_queue, SchedulerSwappedInOutputs(decode_seq_groups=decode_seq_groups,prefill_seq_groups=prefill_seq_groups,blocks_to_swap_in=blocks_to_swap_in,blocks_to_copy=blocks_to_copy,num_lookahead_slots=self._get_num_lookahead_slots(is_prefill=False),infeasible_seq_groups=infeasible_seq_groups,)

6.Scheduler._preempt()

前面提到,抢占preemptself._schedule_running中的重要环节,它的目的是暂时舍弃优先级低的seqs,确保优先级较高的部分有充足的blocks或者slots使用。根据不同的抢占模式(重计算RECOMPUTE和置换SWAP)执行不同的(被)抢占逻辑,需要记录block的交换情况,更新seq的状态。

    def _preempt(self,seq_group: SequenceGroup,blocks_to_swap_out: List[Tuple[int, int]],preemption_mode: Optional[PreemptionMode] = None,) -> PreemptionMode:# If preemption mode is not specified, we determine the mode as follows:# We use recomputation by default since it incurs lower overhead than# swapping. However, when the sequence group has multiple sequences# (e.g., beam search), recomputation is not currently supported. In# such a case, we use swapping instead.# FIXME(woosuk): This makes our scheduling policy a bit bizarre.# As swapped sequences are prioritized over waiting sequences,# sequence groups with multiple sequences are implicitly prioritized# over sequence groups with a single sequence.# TODO(woosuk): Support recomputation for sequence groups with multiple# sequences. This may require a more sophisticated CUDA kernel.# 根据上面说的,确定抢占模式if self.user_specified_preemption_mode is None:if seq_group.get_max_num_running_seqs() == 1:preemption_mode = PreemptionMode.RECOMPUTEelse:preemption_mode = PreemptionMode.SWAPelif self.user_specified_preemption_mode == "swap":preemption_mode = PreemptionMode.SWAPelse:preemption_mode = PreemptionMode.RECOMPUTE# 发生过多抢占,发出提醒if self.num_cumulative_preemption % 50 == 0:logger.warning("Sequence group %s is preempted by %s mode because there is ""not enough KV cache space. This can affect the end-to-end ""performance. Increase gpu_memory_utilization or ""tensor_parallel_size to provide more KV cache memory. ""total_num_cumulative_preemption=%d", seq_group.request_id,preemption_mode, self.num_cumulative_preemption + 1)self.num_cumulative_preemption += 1# 根据不同的抢占模式执行不同的(被)抢占逻辑if preemption_mode == PreemptionMode.RECOMPUTE:self._preempt_by_recompute(seq_group)elif preemption_mode == PreemptionMode.SWAP:self._preempt_by_swap(seq_group, blocks_to_swap_out)else:raise AssertionError("Invalid preemption mode.")return preemption_modedef _preempt_by_recompute(self,seq_group: SequenceGroup,) -> None:"""重计算"""seqs = seq_group.get_seqs(status=SequenceStatus.RUNNING)assert len(seqs) == 1# 将seqs/seq_group恢复成waiting时期的状态,并将相应的block释放出来for seq in seqs:seq.status = SequenceStatus.WAITINGself.free_seq(seq)seq.reset_state_for_recompute()def _preempt_by_swap(self,seq_group: SequenceGroup,blocks_to_swap_out: List[Tuple[int, int]],) -> None:"""交换至cpu"""self._swap_out(seq_group, blocks_to_swap_out)def _swap_in(self,seq_group: SequenceGroup,blocks_to_swap_in: List[Tuple[int, int]],) -> None:"""cpu -> gpu,在self._schedule_swapped()中被调用"""# 更新swap_in,同时将seq状态设置为RUNNINGmapping = self.block_manager.swap_in(seq_group)blocks_to_swap_in.extend(mapping)for seq in seq_group.get_seqs(status=SequenceStatus.SWAPPED):seq.status = SequenceStatus.RUNNINGdef _swap_out(self,seq_group: SequenceGroup,blocks_to_swap_out: List[Tuple[int, int]],) -> None:"""gpu -> cpu"""if not self.block_manager.can_swap_out(seq_group):# FIXME(woosuk): Abort the sequence group instead of aborting the# entire engine.raise RuntimeError("Aborted due to the lack of CPU swap space. Please increase ""the swap space to avoid this error.")# 更新swap_out,同时将seq状态设置为SWAPPEDmapping = self.block_manager.swap_out(seq_group)blocks_to_swap_out.extend(mapping)for seq in seq_group.get_seqs(status=SequenceStatus.RUNNING):seq.status = SequenceStatus.SWAPPED

7.Scheduler调度举例

为了更形象的说明Scheduler的调度,我们这里举个小例子。假设用户请求有300条,每条的prompt tokens个数是[27, 30, 24, 27, 30, 24, ...]调度预算budgetmax_num_seqs是256,max_num_batched_tokens是2048。
1)self._schedule_prefills()调度self.waiting,从300条请求中按照时间顺序优先级调度了75条去做prefill;为什么是75条?75条的token数量为27 + 30 + 24 + ... = 2025 < 2048,如果是76条就超过了max_num_batched_tokens
2)由于budgetmax_num_seqs是256,而1)中只调度了75条,所以继续执行1)中操作3次(分3个iteration完成),调度请求个数为75 + 75 + 75 + 31 = 256,它们做完prefill之后状态都修改成了RUNNING,剩余44条仍然在self.waiting中;
3)self._schedule_runnings()调度self.running(256条);self.swapped此时还是空的,因此不需要调度;
4)多次执行3),如果存在free gpu blocks不够的情况,则swap out;如果有部分请求已完成,也就是处于RUNNING状态的seq个数少于max_num_seqs,可以继续执行1)做prefill
总结一下:有可以做prefill的请求是尽可能做prefill,否则做decode

8.Scheduler与BlockSpaceManager的关系

Scheduler的调度逻辑基本涵盖在以上几个方法中,但同时需要self.block_manager对内存进行管理。为了更直观简洁的理解SchedulerBlockSpaceManager之间的关系,我将两者的方法做成了一张表格如下。第一列是Scheduler的上层方法;第二列是Scheduler的底层的方法,通常被第一列的上层方法调用;第三列是BlockSpaceManager的方法,和第二列有明显的对应关系,所以说block_managerscheduler中非常重要的部分。
在这里插入图片描述

二、BlockSpaceManager

BlockSpaceManager是我们用来管理逻辑块和物理块的一个类,BlockSpaceManagerV1是它的一个子类,也是本次我们所用到的。我们将对其方法展开说一下,也是完备我们对Scheduler整个过程的理解。

1.初始化

BlockSpaceManagerV1初始化部分如下,创建了self.gpu_allocator以及self.cpu_allocator这两个内存分配器,由于我们这边场景相对简单,上述内存分配器是UncachedBlockAllocator的实例化对象。那么self.gpu_allocator的工作是什么呢?在开始的时候根据可用的显存创建物理块physical blocks,当调度发生时,需要申请物理块,或者释放物理块,统计闲置的物理块,就这么简单。

# vll/core/block_manager_v1.py
class BlockSpaceManagerV1(BlockSpaceManager):"""管理逻辑块和物理块之间的映射"""def __init__(self,block_size: int,        # block大小,默认16num_gpu_blocks: int,    # 之前获取的gpu块的个数num_cpu_blocks: int,    # 之前获取的cpu块的个数watermark: float = 0.01,   # 水位线,用于留存一部分blockssliding_window: Optional[int] = None,   # 是否使用sliding windowenable_caching: bool = False,           # 是否使用chunked prefill) -> None:self.block_size = block_sizeself.num_total_gpu_blocks = num_gpu_blocksself.num_total_cpu_blocks = num_cpu_blocks# 忽略if enable_caching and sliding_window is not None:raise NotImplementedError("Sliding window is not allowed with prefix caching enabled!")self.block_sliding_window = Noneif sliding_window is not None:# Round up to nearest block size to regularize sliding window# allocation sizes.self.block_sliding_window = math.ceil(sliding_window / block_size)self.watermark = watermarkassert watermark >= 0.0self.enable_caching = enable_cachingself.watermark_blocks = int(watermark * num_gpu_blocks)if self.enable_caching:# 该情况下使用CachedBlockAllocator来分配block,我们暂时不涉及logger.info("Automatic prefix caching is enabled.")self.gpu_allocator: BlockAllocatorBase = CachedBlockAllocator(Device.GPU, block_size, num_gpu_blocks)self.cpu_allocator: BlockAllocatorBase = CachedBlockAllocator(Device.CPU, block_size, num_cpu_blocks)else:# 常规情况,使用UncachedBlockAllocatorself.gpu_allocator = UncachedBlockAllocator(Device.GPU, block_size, num_gpu_blocks)self.cpu_allocator = UncachedBlockAllocator(Device.CPU, block_size, num_cpu_blocks)# 记录seq_i到BlockTable的映射关系,也就是该seq使用了哪些blockself.block_tables: Dict[int, BlockTable] = {}# cross-attention涉及的由seqence_group.req_id到BlockTable的映射,只在encoder-decoder模型中才会用到# Note that each SequenceGroup has a unique request IDself.cross_block_tables: Dict[str, BlockTable] = {}

2.can_allocate() & allocate()

Scheduler._schedule_prefills()方法中,同时调用了self.can_allocate()以及self.allocate()方法。self.can_allocate()判断当前是否有足够的空闲的gpu_blocks分配给当前seq_groupprefill;在实际计算时,一般不会打满所有的gpu_blocks而是留存一些,也就是这边的self.watermark_blocks

def BlockSpaceManagerV1(BlockSpaceManager):# ...def can_allocate(self, seq_group: SequenceGroup) -> AllocStatus:"""判断当前是否有足够的空闲的gpu_blocks可以分配"""# FIXME(woosuk): Here we assume that all sequences in the group share# the same prompt. This may not be true for preempted sequences.# 对于encoder-decoder模型的一些限制,跳过check_no_caching_or_swa_for_blockmgr_encdec(self, seq_group)# 计算所需的blocks数量self_num_required_blocks = self._get_seq_num_required_blocks(seq_group.get_seqs(status=SequenceStatus.WAITING)[0])cross_num_required_blocks = self._get_seq_num_required_blocks(seq_group.get_encoder_seq())num_required_blocks = self_num_required_blocks + \cross_num_required_blocksif self.block_sliding_window is not None:num_required_blocks = min(num_required_blocks,self.block_sliding_window)# 获取当前空闲的blocks数量num_free_gpu_blocks = self.gpu_allocator.get_num_free_blocks()# 使用水位线来防止频繁的缓存驱逐,留点家底缓冲if (self.num_total_gpu_blocks - num_required_blocks <self.watermark_blocks):# 所需超过总的blocks数量,直接舍弃return AllocStatus.NEVERif num_free_gpu_blocks - num_required_blocks >= self.watermark_blocks:# blocks充裕,可以直接分配return AllocStatus.OKelse:# 空闲的blocks不够,需要等待后续释之后才能分配return AllocStatus.LATER

self.can_allocate()返回AllocStatus.OK,也就是有充足的free_gpu_blocks的情况下,可使用self.allocate()方法为当前seq_group分配gpu blocks,我们走的分支非常简单(见代码和注释),通过self.gpu_allocator分配一个free gpu block,并更新其ref_count(表示被几个seq共用)。

def BlockSpaceManagerV1(BlockSpaceManager):# ...def allocate(self, seq_group: SequenceGroup) -> None:"""为seq_group分配gpu blocks"""# 对于encoder-decoder模型的一些限制,跳过is_encoder_decoder = seq_group.is_encoder_decoder()check_no_caching_or_swa_for_blockmgr_encdec(self, seq_group)# Allocate decoder sequencesseq = seq_group.get_seqs(status=SequenceStatus.WAITING)[0]block_table: BlockTable = \self._allocate_sequence(seq,seq_group.num_seqs(),is_encoder_decoder)# Assign the self-attention block tables for each sequence.for seq in seq_group.get_seqs(status=SequenceStatus.WAITING):self.block_tables[seq.seq_id] = block_table.copy()# Allocate encoder sequenceif is_encoder_decoder:# A SequenceGroup has only a single encoder sequence (at most),# thus allocate with a ref count of 1block_table = self._allocate_sequence(seq_group.get_encoder_seq(),1, is_encoder_decoder)# Assign the cross-attention block table for the SequenceGroup.self.cross_block_tables[seq_group.request_id] = block_tabledef _allocate_sequence(self, \seq: Sequence, \ref_count: int, \is_encoder_decoder: bool = True) -> BlockTable:"""为prompt tokens申请新的物理块"""# 根据逻辑块分配/更新物理块num_prompt_blocks = len(seq.logical_token_blocks)block_table: BlockTable = []   # BlockTable = List[PhysicalTokenBlock]for logical_idx in range(num_prompt_blocks):# 前两种情况都不是常规情况,忽略if (self.block_sliding_window is not Noneand logical_idx >= self.block_sliding_window):block = block_table[logical_idx % self.block_sliding_window]# Set the reference counts of the token blocks.block.ref_count = ref_countelif not is_encoder_decoder and self.enable_caching:# 这种对应着CachedBlockAllocatorblock = self.gpu_allocator.allocate(seq.hash_of_block(logical_idx),seq.num_hashed_tokens_of_block(logical_idx))else:# 【走这条分支】非常简单的分配,并且为分配的block更新其ref_count,也就是该block为ref_count个seq共享block = self.gpu_allocator.allocate()block.ref_count = ref_countblock_table.append(block)return block_table

3.can_append_slots() & append_slots()

这两个方法是在Scheduler._schedule_running()Scheduler._schedule_swapped()中使用的,也就是在decode阶段,每次生成一个token,那么每个seq每次需要的是申请一个槽位slot而不是像prefill阶段那样申请若干个blocks
这边讲一下self.append_slot()这个方法。seq的逻辑块logical_token_blocks的更新是要早于物理块physical_token_blocks(在下述代码中用block_table表示)的更新的,调用这个方法的时候,逻辑块已经完成了更新,也就是已经考虑新的token,而物理块还处理待更新状态,这时候就分为两种情况:
1)当前block_table中最后一个block还有槽位,可以直接使用;
2)最后一个blockblock_size个槽位全部使用完,这时就必须使用self._allocate_last_physical_block(seq)新开一个物理块;从代码来看,该情况对应的分支就是len(block_table) < len(logical_blocks),也就是物理块比逻辑块少一块。
最后要说的是,self.append_slot()的返回结果是blocks_to_copy,也就是记录了copy-on-write的信息。

def BlockSpaceManagerV1(BlockSpaceManager):# ...def can_append_slots(self,seq_group: SequenceGroup,num_lookahead_slots: int = 0) -> bool:"""对于该seq_group,是否有充足的空间(槽位slots)以支持继续生成"""assert (num_lookahead_slots == 0), "lookahead allocation not supported in BlockSpaceManagerV1"# Simple heuristic: If there is at least one free block# for each sequence, we can append.# 当每个seq至少能分配一个空闲的block时,可以选择appendnum_free_gpu_blocks = self.gpu_allocator.get_num_free_blocks()num_seqs = seq_group.num_seqs(status=SequenceStatus.RUNNING)return num_seqs <= num_free_gpu_blocksdef append_slots(self,seq: Sequence,num_lookahead_slots: int = 0,) -> List[Tuple[int, int]]:"""为新的token分配一个槽位"""# seq占用的逻辑块 & 物理块logical_blocks = seq.logical_token_blocksblock_table = self.block_tables[seq.seq_id]# 逻辑块个数大于物理块(比如当前token数是33,block_size=16, 逻辑块刚好开始第3块,物理块第2块刚好被填满,需要申请第3块)if len(block_table) < len(logical_blocks):assert len(block_table) == len(logical_blocks) - 1# 该分支忽略if (self.block_sliding_windowand len(block_table) >= self.block_sliding_window):# reuse a blockblock_table.append(block_table[len(block_table) %self.block_sliding_window])else:# 分配一个新的物理块new_block = self._allocate_last_physical_block(seq)block_table.append(new_block)# 这边返回的是blocks_to_copyreturn []# 在最后一个物理块中分配一个槽位给新的tokenlast_block = block_table[-1]assert last_block.device == Device.GPUif last_block.ref_count == 1:# 当前block没有被其他seqs共享,可以直接分配# 忽略if self.enable_caching:# If the last block is now complete, we may reuse an old block to save memory.maybe_new_block = self._maybe_promote_last_block(seq, last_block)block_table[-1] = maybe_new_blockreturn []else:# 当前block被其他seqs共享,需要使用Copy on Write# 分道扬镳,直接独开一个物理块,作为该seq新的最后一个物理块new_block = self._allocate_last_physical_block(seq)block_table[-1] = new_block# 这边free只是将ref_count减去1,而不是直接完全释放成空闲的blockself.gpu_allocator.free(last_block)# 返回copy的src和dst的block信息return [(last_block.block_number, new_block.block_number)]

4.其他方法

BlockSpaceManagerV1还包含其他方法,诸如self.swap_outself.swap_in()self.free(),此处不再一一展开,感兴趣的可以自行阅读源码。

三、CacheEngine

值得一提的是,BlockSpaceManger只是做了一个block的分配,是一种指派或者说是安排。这些block实际存储指定tokenskv caches是在模型执行部分由CacheEngine来完成的,这里就不展开了。


总结

本篇展开讲述了Scheduler以及BlockSpaceManager的工作原理和过程。至此,相信大家对vLLM框架的整个流程有了较为完整清晰的认识。当然,vLLM中还有很多tricks本系列尚未涉及,比如Prefix CachingSpeculative Decoding等等,这些部分可能会单独开一些系列并作为补充链接放在下一篇。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1549485.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

区块链可投会议CCF C--FC 2025 截止10.8 附录用率

Conference&#xff1a;Financial Cryptography and Data Security (FC) CCF level&#xff1a;CCF C Categories&#xff1a;network and information security Year&#xff1a;2025 Conference time&#xff1a;14–18 April 2025, Miyakojima, Japan 录用率&#xff1…

Elasticsearch学习笔记(1)

初识 Elasticsearch 认识和安装 Elasticsearch 是由 Elastic 公司开发的一套强大的搜索引擎技术&#xff0c;属于 Elastic 技术栈的一部分。完整的技术栈包括&#xff1a; Elasticsearch&#xff1a;用于数据存储、计算和搜索。Logstash/Beats&#xff1a;用于数据收集。Kib…

【教学类-18-04】20240508《蒙德里安“黑白格子画” 七款图案挑选》

背景需求 最近有2位客户买了蒙德里安黑白格子画的素材&#xff0c;其中一位问是否是1000张。 【教学类-18-03】20240508《蒙德里安“红黄蓝黑格子画”-A4横版》&#xff08;大小格子&#xff09;_processing简单图形画蒙德里安-CSDN博客文章浏览阅读1.1k次&#xff0c;点赞35次…

Python自动收发邮件的详细步骤与使用方法?

Python自动收发邮件教程&#xff1f;Python怎么实现收发邮件&#xff1f; Python作为一种强大的编程语言&#xff0c;提供了丰富的库和工具&#xff0c;使得自动收发邮件变得简单而高效。AokSend将详细介绍如何使用Python自动收发邮件&#xff0c;帮助读者掌握这一实用技能。 …

测试开发面试题大全(含答案+文档)

&#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 某基金管理公司线下测试开发面试题总结。 测开题目如下 可以尝试自己先写&#xff0c;写完之后再去看参考解法哦 ~ 1、编写一段代码&#xff0c;把 list 的数平…

安装 Nacos 启动报错 java.lang.IllegalArgumentException: db.num is null

java.io.IOException: java.lang.IllegalArgumentException: db.num is nullat com.alibaba.nacos.config.server.service.datasource.ExternalDataSourceServiceImpl.reload(ExternalDataSourceServiceImpl.java:130)解决办法&#xff1a; 编辑 startup.cmd 文件 找到 set MO…

【Python】Pythonic Data Structures and Algorithms:深入浅出数据结构与算法的 Python 实现

Pythonic Data Structures and Algorithms 是一个开源项目&#xff0c;汇集了各种经典数据结构和算法的 Python 实现。该项目旨在为开发者提供丰富的学习资源&#xff0c;帮助他们通过 Python 代码理解和掌握数据结构与算法的核心原理和应用。项目中的算法涵盖了排序、搜索、图…

添加vscode插件C/C++ snippets,快速生成LVGL .c/.h文件模版

文章目录 一、安装插件二、在安装目录下添加c.json和cpp.json文件①在 C:/Users/yourname/AppData/Roaming/Code/User/snippets/ 目录下创建 c.json 并填入如下内容&#xff1a;②在 C:/Users/yourname/AppData/Roaming/Code/User/snippets/ 目录下创建 cpp.json 并填入如下内容…

一文上手SpringSecurity【七】

之前我们在测试的时候,都是使用的字符串充当用户名称和密码,本篇将其换成MySQL数据库. 一、替换为真实的MySQL 1.1 引入依赖 <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</v…

QTday1代码的形式实现登录框

代码注释 main.cpp #include "widget.h"#include <QApplication>int main(int argc, char *argv[]) {QApplication a(argc, argv);//调用应用程序类的有参构造的实例化对象Widget w;//调用自定义的有参构造实例化的对象w.show();//调用该类的父类里的成员函数…

windows系统中后台运行java程序

在windows系统中后台运行java程序&#xff0c;就是在启动java程序后&#xff0c;关闭命令行行窗口执行。 1、命令行方式 命令行方式运行java程序 启动脚本如下&#xff1a; echo off start java -jar app.jar exit启动后的结果如下 这种方式下&#xff0c;会马上启动一个命…

vue3 实现文本内容超过N行折叠并显示“...展开”组件

1. 实现效果 组件内文字样式取决与外侧定义 组件大小发生变化时,文本仍可以省略到指定行数 文本不超过时, 无展开,收起按钮 传入文本发生改变后, 组件展示新的文本 2. 代码 文件名TextEllipsis.vue <template><div ref"compRef" class"wq-text-ellip…

Android性能优化相关的10个经典面试题

本文首发于公众号“AntDream”&#xff0c;欢迎微信搜索“AntDream”或扫描文章底部二维码关注&#xff0c;和我一起每天进步一点点 以下是一些Android性能优化面试问题&#xff0c;包括问题和参考解答&#xff1a; 1. 如何优化Android应用的启动速度&#xff1f; 答案&#…

腾讯云SDK价格总览

计费组成 SDK 授权费用&#xff1a;购买 License 或者套餐包&#xff0c;以获得音视频终端 SDK 的使用授权所需费用。 其他相关云服务费用&#xff1a;配合腾讯云其他云产品使用 SDK 时&#xff0c;产生的对应云服务费用&#xff0c;未使用相关服务不会产生费用。 计费概述 计…

鸿蒙媒体开发系列12——音频输入设备管理(AudioRoutingManager)

如果你也对鸿蒙开发感兴趣&#xff0c;加入“Harmony自习室”吧&#xff01;扫描下方名片&#xff0c;关注公众号&#xff0c;公众号更新更快&#xff0c;同时也有更多学习资料和技术讨论群。 有时设备同时连接多个音频输入设备&#xff0c;需要指定音频输入设备进行音频录制&a…

LaTex符号不好记忆?

总结在Matlab中常用的LaTeX符号如下&#xff1a; 1. **希腊字母**&#xff1a; - \alpha 表示 α - \beta 表示 β - \gamma 表示 γ - \delta 表示 δ - \epsilon 表示 ε - \zeta 表示 ζ - \eta 表示 η - \theta 表示 θ - \iota 表示 ι -…

【代码】Zotero|用文章标题更新 Zotero 的参考文献引用条目信息的 Quicker 动作

如题。 目前只支持期刊和会议文章&#xff0c;并且只支持谷歌学术或 DBLP 能搜到的文章&#xff0c;知网的不支持&#xff0c;如果有人有需要我可以去试着写&#xff0c;但我很懒我看大家也没这个需求。 很早就写完了&#xff0c;一直忘记推了。 刚写完的时候心情是很激动的&a…

Python 学习笔记1 - 认识Python

一、什么是Python 1989 年圣诞节期间&#xff0c;荷兰数学和计算机科学研究学会的Guido van Rossum&#xff08;吉多.范罗苏姆&#xff09;决心开发一个新的解释程序&#xff0c;作为 ABC 语言的替代品。这门ABC语言的替代语言被取名为Python,命名来自Guido爱看的的电视剧Mont…

Secret Configmap

应用启动过程中可能需要一些敏感信息&#xff0c;比如访问数据库的用户名&#xff0c;密码或者秘钥&#xff0c;讲这些信息直接保存在容器镜像中显然不合适&#xff0c;kubernetes提供的解决方案就是Secret Secret会以密文的方式存储数据&#xff0c;避免了直接在配置文件中保…

图说数集相等定义表明“R各元x的对应x+0.0001的全体=R“是几百年重大错误

黄小宁 设集A&#xff5b;x&#xff5d;表A各元均由x代表&#xff0c;&#xff5b;x&#xff5d;中变量x的变域是A。其余类推。因各数x可是数轴上点的坐标故x∈R变为实数yx1的几何意义可是&#xff1a;一维空间“管道”g内R轴上的质点x∈R(x是点的坐标)沿“管道”g平移变为点y…