在java中实现protobuf自定义协议
本文是描述protobuf在netty中的使用,所以不详细说明Netty怎么使用,适合有一定netty基础的人!
为什么要使用自定义协议?
💡 一句话总结:
因为 TCP 是流协议,消息是粘在一起的,Netty 只给你字节流,怎么拆、怎么识别消息,必须你自己定义规则(协议)!
📦 背景:TCP 是流,不是消息!
举个例子:
你发送三条消息:
消息1:hello
消息2:world
消息3:zhuzai
TCP 网络层可能这样组合给你:
收到了:hello wor
收到了:ldzhuzai
你根本不知道哪里是一条完整的消息!
✅ 所以需要“协议”来定义结构
我们需要像这样清清楚楚定义格式:
[魔数 magic] + [长度 length] + [内容 content]
比如:
magic: 5 bytes -> 用来识别这是不是我自己系统的协议(比如 "QJJIM")
length: 4 bytes -> 表示后面内容部分的长度
content: N bytes -> 实际消息(可以是 JSON、Protobuf、XML 等)
这样无论消息怎么被拆包粘包,我们都能自己拼回来!
🛠 自定义协议 = 自定义规则 + 编解码器
Netty 中我们要做的就是两件事:
事情 | 实现方式 |
---|---|
发送时,把对象编码成 ByteBuf(带上 magic、length) | MessageToMessageEncoder |
接收时,从 ByteBuf 中还原对象(识别 magic,读取 length,parse) | MessageToMessageDecoder 或 ByteToMessageDecoder |
🧠 为啥不用现成协议?比如 HTTP?
好问题:
- HTTP 是文本协议,适合浏览器通信,不适合二进制高频场景;
- WebSocket 适合浏览器长连接,Java 服务对接自定义客户端就不一定好用;
- **你的业务协议(如客服系统、游戏、IM)更灵活、效率更高,**需要自定义消息类型、字段;
- 你用了 Protobuf,必须自己决定如何传输 Protobuf 二进制数据。
🚀 小结:什么时候必须自定义协议?
情景 | 是否推荐自定义协议 |
---|---|
高性能、低延迟系统(如 IM、游戏、客服) | ✅ 强烈建议 |
使用 Protobuf / 自定义消息格式 | ✅ 必须自定义 |
用 HTTP 就能满足 | ❌ 不需要 |
内网服务调用(使用 Dubbo、gRPC) | ✅ 通常协议已封装好了 |
protocol使用
1. 安装 Protocol Buffers 编译器 (protoc)
Windows
- 前往 Protobuf GitHub Releases 下载预编译的 Windows 版本(如
protoc-xxx-win64.zip
)。 - 解压后,将
bin/protoc.exe
的路径添加到系统环境变量PATH
中。 - 打开命令提示符,输入
protoc --version
验证是否安装成功。
macOS
使用 Homebrew 安装:
brew install protobuf
验证:
protoc --version
Linux
下载预编译的二进制文件:
# 下载最新版本(替换版本号)
curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v21.12/protoc-21.12-linux-x86_64.zip
unzip protoc-21.12-linux-x86_64.zip -d protoc
sudo mv protoc/bin/protoc /usr/local/bin/
sudo mv protoc/include/* /usr/local/include/
验证:
protoc --version
2. 使用 Maven 插件自动生成代码(推荐)
如果不想手动执行 protoc
,可以通过 Maven 插件自动生成代码。
在 pom.xml
中添加 Protobuf 插件
<build><plugins><plugin><groupId>org.xolstice.maven.plugins</groupId><artifactId>protobuf-maven-plugin</artifactId><version>0.6.1</version><configuration><protocArtifact>com.google.protobuf:protoc:3.21.12:exe:${os.detected.classifier}</protocArtifact><outputDirectory>${project.build.directory}/generated-sources/protobuf</outputDirectory></configuration><executions><execution><goals><goal>compile</goal><goal>test-compile</goal></goals></execution></executions></plugin></plugins>
</build>
重新构建项目
执行 Maven 命令生成代码:
mvn clean compile
#或者
mvn clean install -U
生成的 Java 类会出现在 target/generated-sources/protobuf
目录中。
3. 手动生成代码(如果必须用 protoc
)
如果已安装 protoc
,可以手动生成代码:
# 假设 .proto 文件在 src/main/proto 目录下
protoc --java_out=src/main/java src/main/proto/MyMessage.proto
安装时问题:
- 版本不匹配
Protobuf 的编译器版本(protoc
)必须与 Java 依赖版本兼容。 - 路径错误
.proto
文件需放在src/main/proto
目录(Maven 默认路径),或手动指定路径。 - 依赖冲突
如果使用 Netty,需确保其版本兼容 Protobuf(推荐使用最新稳定版)。
验证
确保生成的 Java 类(如 MyMessageProto.java
)出现在项目中,且无编译错误。
案例
1. 添加依赖
在 pom.xml
中添加 Protobuf 和 Netty 的依赖:
运行
<!-- Protobuf -->
<dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>3.21.12</version>
</dependency><!-- Netty -->
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.86.Final</version>
</dependency>
2.🧩 协议结构设计
字段 | 长度 | 说明 |
---|---|---|
Magic Number | 5 字节 | 固定字符串,如 ZhuZi ,用来快速识别 |
Version | 1 字节 | 协议版本 |
MsgType | 1 字节 | 消息类型枚举(如文本、心跳等) |
Length | 4 字节 | Protobuf 消息体长度 |
Payload | N 字节 | Protobuf 序列化后的内容 |
3. 定义 Protobuf 消息
创建 .proto
文件定义协议格式(如 MyMessage.proto
):
syntax = "proto3";option java_package = "com.qjj.protocol";
option java_outer_classname = "MyMessageProto";message Frame {int32 version = 1;string msgId = 2;MsgType type = 3;bytes payload = 4;enum MsgType {TEXT = 0;HEARTBEAT = 1;ACK = 2;COMMAND = 3;}
}message TextMessage {string from = 1;string to = 2;string content = 3;
}
生成后用 MyMessageProto.Frame
和 TextMessage
这两个类。
4. 编码器(Encoder)
public class ChatMessageEncoder extends MessageToByteEncoder<MyMessageProto.Frame> {@Overrideprotected void encode(ChannelHandlerContext ctx, MyMessageProto.Frame msg, ByteBuf out) throws Exception {// Magicout.writeBytes(new byte[]{'Z', 'h', 'u', 'Z', 'i'});// Versionout.writeByte(msg.getVersion());// MsgTypeout.writeByte(msg.getType().getNumber());// Payloadbyte[] payload = msg.toByteArray();out.writeInt(payload.length); // Lengthout.writeBytes(payload); // Body}
}
5. 解码器(Decoder)
public class ChatMessageDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {// 数据长度不足if (in.readableBytes() < 11) return;in.markReaderIndex();byte[] magic = new byte[5];in.readBytes(magic);if (!(magic[0] == 'Z' && magic[1] == 'h' && magic[2] == 'u' && magic[3] == 'Z' && magic[4] == 'i')) {throw new CorruptedFrameException("Invalid magic number");}byte version = in.readByte();byte msgType = in.readByte();int length = in.readInt();if (in.readableBytes() < length) {in.resetReaderIndex();return;}byte[] payload = new byte[length];in.readBytes(payload);MyMessageProto.Frame frame = MyMessageProto.Frame.parseFrom(payload);out.add(frame);}
}
6. Channel Pipeline 配置示例
pipeline.addLast(new ChatMessageDecoder()); // ByteBuf -> Frame
pipeline.addLast(new ChatMessageEncoder()); // Frame -> ByteBuf
pipeline.addLast(new BusinessHandler()); // 业务逻辑处理
7. 业务处理器(BusinessHandler)
public class BusinessHandler extends SimpleChannelInboundHandler<MyMessageProto.Frame> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MyMessageProto.Frame msg) throws Exception {switch (msg.getType()) {case TEXT:MyMessageProto.TextMessage text = MyMessageProto.TextMessage.parseFrom(msg.getPayload());System.out.println("接收到文本消息:" + text.getContent());break;case HEARTBEAT:System.out.println("心跳包");break;case COMMAND:// 处理命令break;}}
}
8.处理 TCP 粘包/拆包
在 Netty 的 Pipeline 中添加粘包处理(如 LengthFieldBasedFrameDecoder
):
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;public class ServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline()// 处理粘包:最大帧长、长度字段偏移、长度字段长度.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4)).addLast(new LengthFieldPrepender(4))// 自定义编解码器.addLast(new MyMessageCodec())// 业务处理器.addLast(new MyMessageHandler());}
}
ProtobufVarint32FrameDecoder和LengthFieldBasedFrameDecoder的作用
ProtobufVarint32FrameDecoder
是 Netty 内置的一个 消息分帧处理器,它的作用是解决 Protobuf 编码时的粘包 / 拆包问题。
为什么需要它?
在 Netty 中进行基于 TCP 的通信时,TCP 是流式协议,没有消息边界的概念:
- 如果你一次发送了两个 Protobuf 消息,可能会合并成一个包(粘包)
- 或者一个消息被拆成两个半包(拆包)
为了防止这种情况,Netty 提供了 ProtobufVarint32FrameDecoder
,它可以通过 Protobuf 的“变长整型 varint32”格式头,正确地识别每一条完整消息的边界。
📦 它的作用:
在你使用 MyMessageProto
(比如 .proto
生成的类)时,Protobuf 会在每条消息的开头自动加上一个“长度字段”,这个字段是以 Varint32 格式存储的消息长度。
ProtobufVarint32FrameDecoder
会读取这个长度字段,然后正确提取出完整的 Frame。
✅ 使用示例(标准 Protobuf 流式通信):
pipeline.addLast(new ProtobufVarint32FrameDecoder()); // 解决半包问题
pipeline.addLast(new ProtobufDecoder(MyMessageProto.Frame.getDefaultInstance())); // 解码
发送端配套使用:
pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); // 添加长度前缀
pipeline.addLast(new ProtobufEncoder()); // 编码
而我们现在使用是自定义的协议,此时就不适合用
ProtobufVarint32FrameDecoder
,而应该使用你自定义的LengthFieldBasedFrameDecoder
或手写一个MyFrameDecoder
来根据你的魔数 + length 解码。