Java NIO实现高性能HTTP代理

NIO采用多路复用IO模型,相比传统BIO(阻塞IO),通过轮询机制检测注册的Channel是否有事件发生,可以实现一个线程处理客户端的多个连接,极大提升了并发性能。
在5年前,本人出于对HTTP正向代理的好奇新,那时候也在学习JAVA,了解到了NIO,就想用NIO写一个正向代理软件,当时虽然实现了正向代理,但是代码逻辑及其混乱,而且没有经过测试也许有不少的bug
近期因为找工作,又复习起了以往的一些JAVA知识,包括JVM内存模型、GC垃圾回收机制等等,其中也包括NIO。现在回头再看NIO,理解也更深刻了一点。
在多路复用IO模型中,会有一个线程不断去轮询多个socket的状态,只有当socket真正有读写事件时,才真正调用实际的IO读写操作。因为在多路复用IO模型中,只需要使用一个线程就可以管理多个socket,系统不需要建立新的进程或者线程,也不必维护这些线程和进程,并且只有在真正有socket 读写事件进行时,才会使用IO资源,所以它大大减少了资源占用。在Java NIO中,是通过selector.select()去查询每个通道是否有到达事件,如果没有事件,则一直阻塞在那里,因此这种方式会导致用户线程的阻塞。多路复用IO模式,通过一个线程就可以管理多个socket,只有当socket 真正有读写事件发生才会占用资源来进行实际的读写操作。因此,多路复用IO比较适合连接数比较多的情况。
本HTTP代理软件只能代理HTTP和HTTPS协议,分享出来共广大网友参考和学习
1.Bootstrap类
此类用于创建和启动一个HTTP代理服务
package org.example;import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;public class Bootstrap {private final Logger logger = LogManager.getLogger(Bootstrap.class);private AbstractEventLoop serverEventLoop;private int port;public Bootstrap() {port = 8888;serverEventLoop = new ServerEventLoop(this);}public Bootstrap bindPort(int port) {try {this.port = port;this.serverEventLoop.bind(port);} catch (Exception e) {logger.error("open server socket channel error.", e);}return this;}public void start() {serverEventLoop.getSelector().wakeup();logger.info("Proxy server started at port {}.", port);}public AbstractEventLoop getServerEventLoop() {return serverEventLoop;}
}
2.ServerEventLoop
事件循环,单线程处理事件循环。包括客户端的连接和读写请求,目标服务器的连接和读写事件,在同一个事件循环中处理。
package org.example;import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.example.common.HttpRequestParser;import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;public class ServerEventLoop extends AbstractEventLoop {private final Logger logger = LogManager.getLogger(ServerEventLoop.class);public ServerEventLoop(Bootstrap bootstrap) {super(bootstrap);}@Overrideprotected void processSelectedKey(SelectionKey key) {if (key.isValid() && key.isAcceptable()) {if (key.attachment() instanceof Acceptor acceptor) {acceptor.accept();}}if (key.isValid() && key.isReadable()) {if (key.attachment() instanceof ChannelHandler channelHandler) {channelHandler.handleRead();}}if (key.isValid() && key.isConnectable()) {key.interestOpsAnd(~SelectionKey.OP_CONNECT);if (key.attachment() instanceof ChannelHandler channelHandler) {channelHandler.handleConnect();}}if (key.isValid() && key.isWritable()) {key.interestOpsAnd(~SelectionKey.OP_WRITE);if (key.attachment() instanceof ChannelHandler channelHandler) {channelHandler.handleWrite();}}}@Overridepublic void bind(int port) throws Exception {ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);SelectionKey key = serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);key.attach(new Acceptor(serverSocketChannel));serverSocketChannel.bind(new InetSocketAddress(port));}class Acceptor {ServerSocketChannel ssc;public Acceptor(ServerSocketChannel ssc) {this.ssc = ssc;}public void accept() {try {SocketChannel socketChannel = ssc.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ, new ClientChannelHandler(socketChannel));logger.info("accept client connection");} catch (IOException e) {logger.error("accept error");}}}abstract class ChannelHandler {Logger logger;SocketChannel channel;ByteBuffer writeBuffer;public ChannelHandler(SocketChannel channel) {this.logger = LogManager.getLogger(this.getClass());this.channel = channel;this.writeBuffer = null;}abstract void handleRead();public void handleWrite() {doWrite();}public abstract void onChannelClose();public ByteBuffer doRead() {ByteBuffer buffer = ByteBuffer.allocate(4096);try {int len = channel.read(buffer);if (len == -1) {logger.info("read end-of-stream, close channel {}", channel);channel.close();onChannelClose();}if (len > 0) {buffer.flip();}} catch (IOException e) {logger.error("read channel error");try {channel.close();onChannelClose();} catch (IOException ex) {logger.error("close channel error.");}}return buffer;}public void doWrite() {if (writeBuffer != null) {try {while (writeBuffer.hasRemaining()) {channel.write(writeBuffer);}} catch (IOException e) {logger.error("write channel error.");try {channel.close();onChannelClose();} catch (IOException ex) {logger.error("close channel error");}}writeBuffer = null;}}public void handleConnect() {}}class ClientChannelHandler extends ChannelHandler {HttpRequestParser requestParser;private SelectableChannel proxyChannel;public ClientChannelHandler(SocketChannel sc) {super(sc);this.channel = sc;this.requestParser = new HttpRequestParser();this.proxyChannel = null;}@Overridepublic void handleRead() {if (requestParser.isParsed()) {if (proxyChannel != null) {SelectionKey proxyKey = proxyChannel.keyFor(selector);if (proxyKey != null && proxyKey.isValid() && proxyKey.attachment() instanceof ProxyChannelHandler proxyHandler) {//需要等待ProxyHandler的写入缓存为空后才可读取客户端的数据if (proxyHandler.writeBuffer == null) {ByteBuffer buffer = doRead();if (buffer.hasRemaining() && proxyKey.isValid()) {proxyHandler.writeBuffer = buffer;proxyKey.interestOpsOr(SelectionKey.OP_WRITE);}}}}} else {ByteBuffer buffer = doRead();requestParser.putFromByteBuffer(buffer);if (requestParser.isParsed()) {//连接到目标服务器ByteBuffer buf = null;if (requestParser.getMethod().equals(HttpRequestParser.HTTP_METHOD_CONNECT)) {//回写客户端连接成功SelectionKey clientKey = channel.keyFor(selector);if (clientKey != null && clientKey.isValid() && clientKey.attachment() instanceof ClientChannelHandler clientHandler) {clientHandler.writeBuffer = ByteBuffer.wrap((requestParser.getProtocol() + " 200 Connection Established\r\n\r\n").getBytes());clientKey.interestOpsOr(SelectionKey.OP_WRITE);}} else {//将缓存的客户端的数据通过代理转发byte[] allBytes = requestParser.getAllBytes();buf = ByteBuffer.wrap(allBytes);}this.proxyChannel = connect(requestParser.getAddress(), buf);}}}@Overridepublic void onChannelClose() {try {if (proxyChannel != null) {proxyChannel.close();}} catch (IOException e) {logger.error("close channel error");}}private SocketChannel connect(String address, ByteBuffer buffer) {String host = address;int port = 80;if (address.contains(":")) {host = address.split(":")[0].trim();port = Integer.parseInt(address.split(":")[1].trim());}SocketAddress target = new InetSocketAddress(host, port);SocketChannel socketChannel = null;SelectionKey proxyKey = null;int step = 0;try {socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);step = 1;ProxyChannelHandler proxyHandler = new ProxyChannelHandler(socketChannel);proxyHandler.setClientChannel(channel);proxyHandler.writeBuffer = buffer;proxyKey = socketChannel.register(selector, SelectionKey.OP_CONNECT, proxyHandler);proxyKey.interestOpsOr(SelectionKey.OP_WRITE);step = 2;socketChannel.connect(target);} catch (IOException e) {logger.error("connect error.");switch (step) {case 2:proxyKey.cancel();case 1:try {socketChannel.close();} catch (IOException ex) {logger.error("close channel error.");}socketChannel = null;break;}}return socketChannel;}}class ProxyChannelHandler extends ChannelHandler {private SelectableChannel clientChannel;public ProxyChannelHandler(SocketChannel sc) {super(sc);clientChannel = null;}@Overridepublic void handleConnect() {try {if (channel.isConnectionPending() && channel.finishConnect()) {SelectionKey proxyKey = channel.keyFor(selector);proxyKey.interestOpsOr(SelectionKey.OP_READ);}} catch (IOException e) {try {channel.close();onChannelClose();} catch (IOException ex) {logger.error("close channel error.");}logger.error("finish connection error.");}}@Overridepublic void handleRead() {if (clientChannel != null) {SelectionKey clientKey = clientChannel.keyFor(selector);if (clientKey != null && clientKey.isValid() && clientKey.attachment() instanceof ClientChannelHandler clientHandler) {if (clientHandler.writeBuffer == null) {ByteBuffer buffer = doRead();if (buffer.hasRemaining() && clientKey.isValid()) {clientHandler.writeBuffer = buffer;clientKey.interestOpsOr(SelectionKey.OP_WRITE);}}}}}@Overridepublic void onChannelClose() {try {if (clientChannel != null) {clientChannel.close();}} catch (IOException e) {logger.error("close channel error");}}public void setClientChannel(SocketChannel client) {this.clientChannel = client;}}
}
3.AbstractEventLoop
事件循环的抽象类
package org.example;import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executors;public abstract class AbstractEventLoop implements Runnable {private final Logger logger = LogManager.getLogger(AbstractEventLoop.class);protected Selector selector;protected Bootstrap bootstrap;public AbstractEventLoop(Bootstrap bootstrap) {this.bootstrap = bootstrap;openSelector();Executors.newSingleThreadExecutor().submit(this);}public void bind(int port) throws Exception {throw new Exception("not support");}@Overridepublic void run() {while (true) {try {if (selector.select() > 0) {processSelectedKeys();}} catch (Exception e) {logger.error("select error.", e);}}}private void processSelectedKeys() {Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> iterator = keys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();processSelectedKey(key);}}protected abstract void processSelectedKey(SelectionKey key);public Selector openSelector() {try {this.selector = Selector.open();return this.selector;} catch (IOException e) {logger.error("open selector error.", e);}return null;}public Selector getSelector() {return selector;}
}
4.HttpRequestParser
用于解析HTTP请求报文中的请求头,可以获取主机和端口号
package org.example.common;import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;public class HttpRequestParser {private final Logger logger = LogManager.getLogger(HttpRequestParser.class);public static final String COLON = ":";public static final String REQUEST_HEADER_HOST_PREFIX = "host:";private UnboundedByteBuffer requestBytes = new UnboundedByteBuffer();private List<String> headers = new ArrayList<>();public static final String HTTP_METHOD_GET = "GET";public static final String HTTP_METHOD_POST = "POST";public static final String HTTP_METHOD_PUT = "PUT";public static final String HTTP_METHOD_DELETE = "DELETE";public static final String HTTP_METHOD_TRACE = "TRACE";public static final String HTTP_METHOD_OPTIONS = "OPTIONS";public static final String HTTP_METHOD_HEAD = "HEAD";public static final String HTTP_METHOD_CONNECT = "CONNECT";private String address;private String protocol;private String method;private boolean parsed = false;private StringBuffer reqHeaderBuffer = new StringBuffer();public void putFromByteBuffer(ByteBuffer buffer) {for (; buffer.hasRemaining(); ) {byte b = buffer.get();requestBytes.addByte(b);reqHeaderBuffer.append((char) b);if (b == '\n' && reqHeaderBuffer.charAt(reqHeaderBuffer.length() - 2) == '\r') {if (reqHeaderBuffer.length() == 2) {parsed = true;logger.debug("Request header line end.");break;}String headerLine = reqHeaderBuffer.substring(0, reqHeaderBuffer.length() - 2);logger.debug("Request header line parsed {}", headerLine);headers.add(headerLine);if (headerLine.startsWith(HTTP_METHOD_GET)|| headerLine.startsWith(HTTP_METHOD_POST)|| headerLine.startsWith(HTTP_METHOD_PUT)|| headerLine.startsWith(HTTP_METHOD_DELETE)|| headerLine.startsWith(HTTP_METHOD_TRACE)|| headerLine.startsWith(HTTP_METHOD_OPTIONS)|| headerLine.startsWith(HTTP_METHOD_HEAD)|| headerLine.startsWith(HTTP_METHOD_CONNECT)) {this.protocol = headerLine.split(" ")[2].trim();this.method = headerLine.split(" ")[0].trim();} else if (headerLine.toLowerCase().startsWith(REQUEST_HEADER_HOST_PREFIX)) {this.address = headerLine.toLowerCase().replace(REQUEST_HEADER_HOST_PREFIX, "").trim();}reqHeaderBuffer.delete(0, reqHeaderBuffer.length());}}}public boolean isParsed() {return parsed;}public String getAddress() {return address;}public String getProtocol() {return protocol;}public String getMethod() {return method;}public byte[] getAllBytes() {return requestBytes.toByteArray();}
}
5.UnboundedByteBuffer
无界的字节缓冲区,每次会以两倍的容量扩容,可以用于追加存入客户端的请求数据,实现粘包
package org.example.common;public class UnboundedByteBuffer {private byte[] bytes;private int size;private int cap;private final int DEFAULT_CAP = 4096;private final int MAX_CAP = 1 << 30;public UnboundedByteBuffer() {this.cap = DEFAULT_CAP;this.bytes = new byte[this.cap];this.size = 0;}public void addBytes(byte[] data) {ensureCapacity(data.length);System.arraycopy(data, 0, bytes, size, data.length);this.size += data.length;}private void ensureCapacity(int scale) {if (scale + this.size > this.cap) {int tmpCap = this.cap;while (scale + this.size > tmpCap) {tmpCap = tmpCap << 1;}if (tmpCap > MAX_CAP) {return;}byte[] newBytes = new byte[tmpCap];System.arraycopy(this.bytes, 0, newBytes, 0, this.size);this.bytes = newBytes;}}public byte[] toByteArray() {byte[] ret = new byte[this.size];System.arraycopy(this.bytes, 0, ret, 0, this.size);return ret;}public void addByte(byte b) {ensureCapacity(1);this.bytes[this.size++] = b;}
}
以上实现是在单个事件循环线程中处理所有事件,一个更好的方案是将客户端的Channel和代理服务器与目标服务器的Channel区分开,分别在两个事件循环中处理。基本实现也和本文中的代码大体一致,两者在理论上应该存在性能差距,实际经过本人测试可以每秒处理客户端的上千个连接。代码传送门

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/8795.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

知识课堂之域名系统中实现动态代理

怎么在域名系统中解析动态ip&#xff0c;这一直是一个需要解决的问题&#xff0c;人们对与网络的稳定连接与灵活运用已经成为生活和工作中不可或缺的一部分&#xff0c;因此这样的问题的解决迫在眉睫。 大家对于动态ip是什么&#xff0c;应该都有所了解了&#xff0c;所谓的动…

【Go语言】| 第1课:Golang安装+环境配置+Goland下载

&#x1f60e; 作者介绍&#xff1a;我是程序员洲洲&#xff0c;一个热爱写作的非著名程序员。CSDN全栈优质领域创作者、华为云博客社区云享专家、阿里云博客社区专家博主。 &#x1f913; 同时欢迎大家关注其他专栏&#xff0c;我将分享Web前后端开发、人工智能、机器学习、深…

程序猿要失业了,一行代码没写,1小时嘴搓了一个图片分割插件(好看又好用)

如题&#xff0c;一行代码没写&#xff0c;使用 AI 编程工具实现了一个浏览器图片分割插件的开发&#xff0c;先看效果吧&#xff08; Chrome商店上架审核中~ &#xff09; 支持点击&#xff0c;拖拽&#xff0c;直接粘贴&#xff0c;还支持预览&#xff0c;次数统计&#xff0…

基于SpringBoot+Vue实现新零售商城系统

作者主页&#xff1a;编程千纸鹤 作者简介&#xff1a;Java领域优质创作者、CSDN博客专家 、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO特邀作者、多年架构师设计经验、多年校企合作经验&#xff0c;被多个学校常年聘为校外企业导师&#xff0c;指导学生毕业设计并参…

【湖南】《湖南省省直单位政府投资信息化项目预算编制与财政评审工作指南(试行)》湘财办〔2024〕10号-省市费用标准解读系列06

2024年4月12日&#xff0c;湖南省财政厅发布实施《湖南省省直单位政府投资信息化项目预算编制与财政评审工作指南&#xff08;试行&#xff09;》湘财办〔2024〕10号&#xff08;以下简称“10号文”&#xff09;&#xff0c;该文件旨在指导提高湖南省直单位政府投资信息化项目预…

攻防靶场(28):通过SNMP进行信息收集 JOY

目录 1.侦查 1.1 获取目标网络信息&#xff1a;IP地址 1.2 主动扫描&#xff1a;扫描IP地址块 1.3 收集受害者主机信息&#xff1a;软件 2. 数据窃取 2.1 通过备用协议窃取&#xff1a;通过未加密的非C2协议窃取 2.2 通过备用协议窃取&#xff1a;通过未加密的非C2协议窃取 3. …

DCDC-LLC谐振电路Q值与系统增益变化相反的原因

1.谐振电路的Q值定义 LLC电路的Q值定义: 它表述的是整个电路的能量存储与耗散的关系。损耗越小&#xff0c;Q值越大&#xff0c;损耗越大&#xff0c;Q值越小。 Q的另一种写法是&#xff1a; 这个公式来由&#xff0c;因为谐振频率&#xff1a; 所以&#xff1a; 所以&#…

【JAVA毕业设计】基于Vue和SpringBoot的图书馆管理系统

本文项目编号 T 044 &#xff0c;文末自助获取源码 \color{red}{T044&#xff0c;文末自助获取源码} T044&#xff0c;文末自助获取源码 目录 一、系统介绍二、演示录屏三、启动教程四、功能截图五、文案资料5.1 选题背景5.2 国内外研究现状5.3 可行性分析5.4 用例设计 六、核…

VirtualBox7.0的“网络地址转换(NAT)“ 笔记2400727

VirtualBox7.0的"网络地址转换(NAT)" 每台虚拟机的默认ip都是 10.0.2.15 虚拟机之间不能互通虚拟机能通过主机各ip访问主机(延时高,不稳定), 主机不能直接访问虚拟机, 可以通过端口转发(端口映射)虚拟机能访问宿主机的局域网的其它机器,虚拟机能访问网络, 但不能反过…

PyQt5 详细安装与配置教程及使用

文章目录 Part1&#xff1a;安装 PyQt5Part2&#xff1a;配置 PyQt5 的依赖工具 QtDesigner 和 PyUICPart3&#xff1a;使用QtDesigner设计界面Part4&#xff1a;使用PyUIC将设计好的界面转换为.py文件Part5&#xff1a;通过代码显示ui界面 Part1&#xff1a;安装 PyQt5 需要安…

10.31.2024刷华为OD C题型

文章目录 HJ26HJ27语法知识记录 10.24.2024刷华为OD C题型&#xff08;四) - HJ26 HJ27 def get_dict(str1: str):dic_0 {}for ch in str1:if ch not in dic_0:dic_0[ch] 1else:dic_0[ch] 1return dic_0temp input().split() n int(temp[0]) list [] for i in range(n):l…

基于springboot+mybatis美术馆预约管理系统设计和实现以及文档报告

基于springbootmybatis美术馆预约管理系统设计和实现以及文档报告 &#x1f345; 作者主页 网顺技术团队 &#x1f345; 欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; &#x1f345; 文末获取源码联系方式 &#x1f4dd; &#x1f345; 查看下方微信号获取联系方式 承接各…

气象监测软件的程序设计

老师留了个作业&#xff0c;感觉挺有意思&#xff0c;记录一下 文章目录 气象监测软件的程序设计项目指导书&#xff08;一&#xff09;基本信息&#xff08;二&#xff09;项目目标&#xff08;三&#xff09;任务描述&#xff08;四&#xff09;指导内容任务 1&#xff1a;根…

漫谈MCU优化:从硬件设计优化到可靠性挑战

1.关于MCU 微控制器&#xff08;Microcontroller Unit, MCU&#xff09;&#xff0c;是以微处理器为基础&#xff0c;加上存储器以及计数器、I2C、UART等外设模块与接口电路整合的单芯片微型计算机。 ▲MCU实物图 MCU拥有性能好、可编程、灵活度高、功耗低等优点&#xff0c;…

“动态波动”,背离信号与主力动向的完美结合 源码

使用技巧 精准捕捉买卖信号 这款公式它不仅能够精准捕捉买入信号&#xff0c;还能敏锐识别卖出时机。尤其在市场波动不定的震荡行情中&#xff0c;动态波动展现出了其非凡的价值。 多维信号分析 如图所示&#xff0c;动态波动公式全面覆盖了“买”、“卖”、“洗盘”、“阻…

网站架构知识之密钥认证(day020)

1.密钥认证 两个节点&#xff0c;通过密钥形式进行访问&#xff0c;不要输入密码&#xff0c;单向。 应用场景&#xff1a;部分服务使用前要求我们做密钥认证。 1.使用rsa算法创建公钥私钥 ssh-keygen -t rsa /root/.ssh/id_rsa&#xff0c; 私钥地址 /root/.ssh/…

基于Python的智能旅游推荐系统设计与实现

一、摘要 本毕业设计的内容是设计并且实现一个基于Python技术的智能旅游推荐系统。它是在Windows下&#xff0c;以MYSQL为数据库开发平台&#xff0c;使用Python技术进行设计。智能旅游推荐系统的功能已基本实现&#xff0c;主要实现首页&#xff0c;个人中心&#xff0c;用户…

深入计算机语言之C++:模板初阶

&#x1f511;&#x1f511;博客主页&#xff1a;阿客不是客 &#x1f353;&#x1f353;系列专栏&#xff1a;从C语言到C语言的渐深学习 欢迎来到泊舟小课堂 &#x1f618;博客制作不易欢迎各位&#x1f44d;点赞⭐收藏➕关注 一、泛型编程 1.1 引入 C 语言中实现两数交换&a…

O-RAN前传Spilt Option 7-2x

Spilt Option 7-2x 下行比特处理上行比特处理相关文章&#xff1a; Open Fronthaul wrt ORAN 联盟被称为下层拆分(LLS)&#xff0c;其目标是提高电信市场的灵活性和竞争力。下层拆分是指无线电单元(RU) 和分布式单元(DU) 之间的拆分。 O-RAN前传接口可以在 eCPRI 上传输。eCPR…