Golang优雅关闭gRPC实践

本文主要讨论了在 Go 语言中实现gRPC服务优雅关闭的技术和方法,从而确保所有连接都得到正确处理,防止数据丢失或损坏。原文: Go Concurrency — Graceful Shutdown

alt
问题

我在上次做技术支持的时候,遇到了一个有趣的错误。我们的服务在 Kubernetes 上运行,有一个容器在重启时不断出现以下错误信息--"Error bind: address already in use"。对于大多数程序员来说,这是一个非常熟悉的错误信息,表明一个进程正试图绑定到另一个进程正在使用的端口上。

背景

我的团队维护一个 Go 服务,启动时会在各自的 goroutine 中生成大量不同的 gRPC 服务。

Goroutine - Go 运行时管理的轻量级线程,运行时只需要几 KB 内存,是 Go 并发性的基础。

以下是我们服务架构的简化版本,以及以前启动和停止服务器时所执行的任务。

package main

type GrpcServerInterface interface{
  Run(stopChan chan <-struct{})
}

type Server struct {
  ServerA GrpcServerIface
  ServerB GrpcServerIface
}

func NewServer() *Server {
  return &NewServer{
    ServerA: NewServerA,
    ServerB: NewServerB,
  }
}

// Start runs each of the grpc servers
func (s *Server) Start(stopChan <-chan struct{}){
  go ServerA.Run(stopChan)
  go ServerB.Run(stopChan)
  <- stopChan
}

func main() {
  stopChan := make(chan struct{})
  server := NewServer()
  server.Start(stopChan)
 
  // Wait for program to terminate and then signal servers to stop
  ch := make(chan os.Signal, 1)
  signal.Notify(c, os.Interrupt, syscall.SIGTERM)
  <-ch
  close(stopChan)
}
package internal

type ServerA struct {
  stopChan <-chan struct{}
}

// Start runs each of the grpc servers
func (s *ServerA) Run(stopChan <-chan struct{}){
  grpcServer := grpc.NewServer()
  
  var listener net.Listener
  ln, err := net.Listen("tcp"":8080")
  if err != nil {
   // handle error
  }
  
  for {
   err := grpcServer.Serve(listener)
   if err != nil {
     return 
   }
  }
  
  <- stopChan
  grpcServer.Stop() // Gracefully terminate connections and close listener
}

我首先想到这可能是 Docker 或 Kubernetes 运行时的某种偶发性错误。这个错误让我觉得很奇怪,原因如下:1.)查看代码,我们似乎确实在主程序退出时关闭了所有监听,端口怎么可能在重启时仍在使用?2.)错误信息持续出现了几个小时,以至于需要人工干预。我原以为在最坏情况下,操作系统会在尝试重启容器之前为我们清理资源。或许是清理速度不够快?

团队成员建议我们再深入调查一下。

解决方案

经过仔细研究,发现我们的代码实际上存在一些问题...

通道(Channel)与上下文(Context)

通道用于在程序之间发送信号,通常以一对一的方式使用,当一个值被发送到某个通道时,只能从该通道读取一次。在我们的代码中,使用的是一对多模式。我们将在 main 中创建的通道传递给多个不同的 goroutine,每个 goroutine 都在等待 main 关闭通道,以便知道何时运行清理函数。

从 Go 1.7 开始,上下文被认为是向多个 goroutine 广播信号的标准方式。虽然这可能不是我们遇到问题的根本原因(我们是在等待通道关闭,而不是试图让每个 goroutine 从通道中读取相同的值),但考虑到这是最佳实践,还是希望采用这种模式。

以下是从通道切换到上下文后更新的代码。

package internal

type ServerA struct {}

func (s *ServerA) Run(ctx context.Context){
  grpcServer := grpc.NewServer()
  var listener net.Listener
  ln, err := net.Listen("tcp"":8080")
  if err != nil {
   log.Fatal("ServerA - Failed to create listener")
  }
  
  for {
   err := grpcServer.Serve(listener)
   if err != nil {
     log.Fatal("ServerA - Failed to start server"
   }
  }
  
  <- ctx.Done()
  // Clean up logic 
  grpcServer.Stop() // Gracefully terminate connections and close listener
}
package main

type GrpcServerInterface interface{
 Run(stopChan chan <-struct{})
}

type Server struct {
 ServerA GrpcServerIface
 ServerB GrpcServerIface
 stopServer context.CancelFunc
 serverCtx context.Context
}

func NewServer() *Server {
 return &NewServer{
    ServerA: NewServerA,
    ServerB: NewServerB,
  }
}

// Start runs each of the grpc servers
func (s *Server) Start(ctx context.Context){
  // create new context from parent context
  s.serverCtx, stopServer := context.WithCancel(ctx) 
  go ServerA.Run(s.serverCtx)
  go ServerB.Run(s.serverCtx)
}

func (s *Server) Stop() {
  s.stopServer() // close server context to signal spawned goroutines to stop
}

func main() {
 ctx, cancel := context.withCancel()
 server := NewServer()
 server.Start(ctx)
 // Wait for program to terminate and then signal servers to stop
 ch := make(chan os.Signal, 1)
 signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 
 <-ch
 cancel() // close main context on terminate signal
 server.Stop() // clean up server resources
}
基于等待组(WaitGroup)的优雅停机

虽然我们通过取消主上下文向 goroutine 发出了退出信号,但并没有等待它们完成工作。当主程序收到退出信号时,即使我们发送了取消信号,也不能保证它会等待生成的 goroutine 完成工作。因此我们必须明确等待每个 goroutine 完成工作,以避免任何泄漏,为此我们使用了 WaitGroup。

WaitGroup 是一种计数器,用于阻止函数(或者说是 goroutine)的执行,直到其内部计数器变为 0。

package internal

type ServerA struct {}

func (s *ServerA) Run(ctx context.Context, wg *sync.WaitGroup){
  wg.Add(1// Add the current function to the parent's wait group
  defer wg.Done() // Send "done" signal upon function exit
  
  grpcServer := grpc.NewServer()
  var listener net.Listener
  ln, err := net.Listen("tcp"":8080")
  if err != nil {
   log.Fatal("ServerA - Failed to create listener")
  }
  
  for {
   err := grpcServer.Serve(listener)
   if err != nil {
     log.Fatal("ServerA - Failed to start server"
   }
  }
  
  <- ctx.Done()
  // Clean up logic 
  grpcServer.Stop() // Gracefully terminate connections and close listener
  fmt.Println("ServerA has stopped")
}
package main

type GrpcServerInterface interface{
 Run(stopChan chan <-struct{})
}

type Server struct {
 ServerA GrpcServerIface
 ServerB GrpcServerIface
 wg sync.WaitGroup
 stopServer context.CancelFunc
 serverCtx context.Context
}

func NewServer() *Server {
 return &NewServer{
    ServerA: NewServerA,
    ServerB: NewServerB,
  }
}

// Start runs each of the grpc servers
func (s *Server) Start(ctx context.Context){
  s.serverCtx, stopServer := context.WithCancel(ctx)
  go ServerA.Run(s.serverCtx, &s.wg)
  go ServerB.Run(s.serverCtx, &s.wg)
}

func (s *Server) Stop() {
  s.stopServer() // close server context to signal spawned goroutines to stop
  s.wg.Wait()  // wait for all goroutines to exit before returning
  fmt.Println("Main Server has stopped")
}

func main() {
 ctx, cancel := context.withCancel()
 server := NewServer()
 server.Start(ctx)
 // Wait for program to terminate and then signal servers to stop
 ch := make(chan os.Signal, 1)
 signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 
 <-ch
 cancel() // close main context on terminate signal
 server.Stop() // clean up server resources
}
基于通道的启动信号

在测试过程中,又发现了一个隐藏错误。我们未能在接受流量之前等待所有服务端启动,而这在测试中造成了一些误报,即流量被发送到服务端,但没有实际工作。为了向主服务发送所有附属服务都已准备就绪的信号,我们使用了通道。

package internal

type ServerA struct {
  startChan  
}

func (s *ServerA) Run(ctx context.Context, wg *sync.WaitGroup){
  wg.Add(1// Add the current function to the parent's wait group
  defer wg.Done() // Send "done" signal upon function exit
   
  go func(){
    grpcServer := grpc.NewServer()
    
    var listener net.Listener
    ln, err := net.Listen("tcp"":8080")
    if err != nil {
     log.Fatal("ServerA - Failed to create listener")
    }
    
    for {
     err := grpcServer.Serve(listener)
     if err != nil {
       log.Fatal("ServerA - Failed to start server"
     }
    }
    close(s.startChan) // Signal that we are done starting server to exit function
    // Wait in the background for mina program to exit
    <- ctx.Done()
    // Clean up logic 
    grpcServer.Stop() // Gracefully terminate connections and close listener
    fmt.Println("ServerA has stopped")
  }()
  <- s.StartChan // Wait for signal before exiting function
  fmt.Println("ServerA has started")
}
package main

type GrpcServerInterface interface{
 Run(stopChan chan <-struct{})
}

type Server struct {
 ServerA GrpcServerIface
 ServerB GrpcServerIface
 wg sync.WaitGroup
 stopServer context.CancelFunc
 serverCtx context.Context
 startChan chan <-struct{}
}

func NewServer() *Server {
 return &NewServer{
    ServerA: NewServerA,
    ServerB: NewServerB,
    startChan: make(chan <-struct{}),
  }
}

// Start runs each of the grpc servers
func (s *Server) Start(ctx context.Context){
  s.serverCtx, stopServer := context.WithCancel(ctx)
  ServerA.Run(s.serverCtx, &s.wg)
  ServerB.Run(s.serverCtx, &s.wg)
  close(s.startChan)
  <- s.startChan // wait for each server to Start before returning
  fmt.Println("Main Server has started")
}

func (s *Server) Stop() {
  s.stopServer() // close server context to signal spawned goroutines to stop
  s.wg.Wait()  // wait for all goroutines to exit before returning
  fmt.Println("Main Server has stopped")
}

func main() {
 ctx, cancel := context.withCancel()
 server := NewServer()
 server.Start(ctx)
 // Wait for program to terminate and then signal servers to stop
 ch := make(chan os.Signal, 1)
 signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 
 <-ch
 cancel() // close main context on terminate signal
 server.Stop() // clean up server resources
}
结论

不瞒你说,刚开始学习 Go 时,并发会让你头疼不已。调试这个问题让我有机会看到这些概念的实际用途,并强化了之前不确定的主题,建议你自己尝试简单的示例!


你好,我是俞凡,在Motorola做过研发,现在在Mavenir做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。为了方便大家以后能第一时间看到文章,请朋友们关注公众号"DeepNoMind",并设个星标吧,如果能一键三连(转发、点赞、在看),则能给我带来更多的支持和动力,激励我持续写下去,和大家共同成长进步!

本文由 mdnice 多平台发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/1548554.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

webshell-HTTP常见特征

一、总体特点 二、蚁剑 数据中可以看到一些明文字符串函数&#xff0c;响应中可以看到响应的明文数据。 ant特征以及对数据base64可以解码 chr类别的会出现大量的chr编码 大量的百分号字符 三、哥斯拉 第一个请求包很大 响应为0 密钥被拆分到数据前后 响应包cookie带&#xf…

SUP-NeRF-ECCV2024: 单目3D对象重建的新突破

2024-09-25&#xff0c;由Bosch Research North America和Michigan State University联合发布的SUP-NeRF&#xff0c;是一个基于单目图像进行3D对象重建的新型方法。一个无缝集成姿态估计和物体重建的统一网格。 ECCV&#xff1a;欧洲计算机视觉会议的缩写&#xff0c;它是计算…

Rust 语言开发 ESP32C3 并在 Wokwi 电子模拟器上运行(esp-hal 非标准库、LCD1602、I2C)

文章目录 esp-rs 简介GithubRust 包仓库Wokwi 电子模拟器开发环境Rust 环境esp-rs 环境创建 ESP32C3 项目项目结构编译项目命令运行模拟器ESP32C3 烧录 esp-rs 简介 esp-rs 是一个专注于为 Espressif 系列芯片&#xff08;如 ESP32、ESP32-S2、ESP32-C3 等&#xff09;提供 Ru…

学日语必备神器!这4款翻译APP你用过吗?

小伙伴们&#xff0c;你们有没有在日常生活或工作中遇到过需要翻译日语的场景呢&#xff1f;无论是阅读日本原著、工作文档还是和日本小伙伴交流&#xff0c;一个好的翻译工具绝对能成为你的贴心小助手&#xff1b;今天&#xff0c;我就来跟大家分享几款我个人非常喜欢的日语翻…

2024年合肥市职业院校技能大赛(中职组)赛 网络安全 竞赛样题

2024年合肥市职业院校技能大赛(中职组)赛 网络安全 竞赛样题 (总分100分) 培训、环境、资料、考证 公众号&#xff1a;Geek极安云科 网络安全群&#xff1a;624032112 网络系统管理群&#xff1a;223627079 网络建设与运维群&#xff1a;870959784 极安云科专注于技能提升&am…

数据结构与算法——Java实现 19.队列

目录 一、概述 二、链表实现队列 接口定义 接口实现类 测试类 三、环形数组实现队列 优点 下标计算 判满和判空 判满 判空 辅助变量size判空和判满 方法1 接口定义 接口实现类 测试类 方式2 接口定义 接口实现类 测试类 方法3 接口定义 接口实现类 测试类 生活鲜少给人留下退…

Nginx配置及部署前端项目,安排!

哈喽小伙伴们大家好&#xff01;我是程序媛小李&#xff0c;一位正在往全栈方向发展的前端小姐姐&#xff0c;今天给大家分享一下在日常编码中我们是怎么使用nginx来部署前端项目的&#xff01; Nginx安装 浏览器输入nginx&#xff0c;来到官网 右边找到doewnload&#xff0c…

短剧向左,体育向右,快手前途未卜?

最近&#xff0c;辗转于多项业务的快手收到了来自于市场“寓褒于贬”的评价。 麦格理发表报告表示&#xff0c;短剧业务正成为快手近期新的增长动力&#xff0c;亦维持对快手的正面看法&#xff0c;给予“跑赢大市”评级&#xff0c;预期上市前投资者出售2%股份对基本面没有太…

掌握AI提示词的艺术:应用、防护与成为提示词专家的策略

掌握好提示词的编写&#xff0c;可以用来做的事情&#xff1a; 写简历、输出面试题、输出ppt、思维导图、提取摘要、翻译、总结会议纪要、总结审计报告、数据分析、写广告/营销/请假等跟文字相关的文案、爆款文章、小说、写周报/月报。 如何写提示词 4大原则 1、 指令要精简…

Python精选200Tips:176-180

针对图像的经典卷积网络结构进化史及可视化 P176--LeNet-5【1988】模型结构说明模型结构代码模型结构可视化 P177--AlexNet【2012】模型结构及创新性说明模型结构代码模型结构可视化 P178--VGGNet【2014】VGG19模型结构及创新性说明VGG19模型结构代码VGG19模型结构可视化 P179-…

广东自闭症寄宿学校:为大龄自闭症儿童提供全寄宿制教育

在广东这片温暖的土地上&#xff0c;有一类特殊的孩子&#xff0c;他们以自己独特的方式感知世界&#xff0c;却往往因为自闭症的障碍而在成长的道路上步履维艰。为了给予这些大龄自闭症儿童更加全面、专业的关怀与教育&#xff0c;广东地区涌现出了一批自闭症寄宿学校&#xf…

Java中的Junit、类加载时机与机制、反射、注解及枚举

目录 Java中的Junit、类加载时机与机制、反射、注解及枚举 Junit Junit介绍与使用 Junit注意事项 Junit其他注解 类加载时机与机制 类加载时机 类加载器介绍 获取类加载器对象 双亲委派机制和缓存机制 反射 获取类对象 获取类对象的构造方法 使用反射获取的构造方法创建对象 获…

从0-1搭建海外社媒矩阵,详细方案深度拆解

做买卖&#xff0c;好的产品质量固然是关键&#xff0c;但如何让更多的消费者知道&#xff1f;营销推广是必不可少的。在互联网时代&#xff0c;通过社交媒体推广已经成为跨境电商卖家常用的广告手段。 如何通过海外社交媒体矩阵扩大品牌影响力&#xff0c;实现营销目标&#…

【开源项目】数字孪生智慧停车场——开源工程及源码

飞渡科技数字孪生停车场管理平台&#xff0c;基于国产数字孪生3D渲染引擎&#xff0c;结合数字孪生、物联网IOT&#xff0c;以及车牌自动识别、视频停车诱导等技术&#xff0c;实现停车场的自动化、可视化和无人化值守管理。 以3D可视化技术为基础&#xff0c;通过三维场景完整…

240927-各种卷积最清晰易懂blender动画展示

240927-一些常用卷积清晰易懂的blender动画展示&#xff08;Conv、GConv、DWConv、1*1Conv、Shuffle&#xff09; 在几个月前&#xff0c;写过一篇关于卷积过程中输入图像维度变化的博客240627_关于CNN中图像维度变化问题_图像的尺寸为什么又四个维度-CSDN博客&#xff0c;但是…

猫鱼分干(模拟---拆分步骤)

算法分析&#xff1a; 注意&#xff1a;总是更新遍历方向上的元素&#xff08;eg. 左 i-1 和 i &#xff1a;更新i&#xff09;区分水平和分配量从左向右&#xff1a;只要右侧水平大于左侧&#xff0c;即右侧等于左侧值加一从右向左&#xff1a;若左侧水平大于右侧&#xf…

一次实践:给自己的手机摄像头进行相机标定

文章目录 1. 问题引入2. 准备工作2.1 标定场2.2 相机拍摄 3. 基本原理3.1 成像原理3.2 畸变校正 4. 标定解算4.1 代码实现4.2 详细解析4.2.1 解算实现4.2.2 提取点位 4.3 解算结果 5. 问题补充 1. 问题引入 不得不说&#xff0c;现在的计算机视觉技术已经发展到足够成熟的阶段…

c++day08

思维导图 栈 #include <iostream>using namespace std;template <typename T> class Stack { private:static const size_t MAX 100; // 定义固定容量T data[MAX]; // 存储栈元素的数组size_t len; // 当前栈的大小public:…

浅谈电气火灾监控系统在变电所的应用

摘要&#xff1a;阐述电气火灾监控系统在变电所的应用&#xff0c;电气火灾监控系统的管理措施&#xff0c;包括运行标准、运行模式、运行原则、警报阈值、监控显示。安科瑞叶西平1870*6160015 关键词:监控系统&#xff1b;警报阀值&#xff1b;运行模式&#xff1b;医院&…

高效免费!PDF秒变Word,在线免费转换工具推荐!!!

#创作灵感# 工作中&#xff0c;总是需要将pdf文件转换成word文件&#xff0c;便于后期编辑、处理、使用&#xff0c;但是又没有wps会员&#xff0c;虽然去淘宝买&#xff0c;一天也就8毛钱左右&#xff0c;但是转换文件的工作几乎每天都需要做&#xff0c;长此以往&#xff0c;…