【p2p、分布式,区块链笔记 Torrent】bittorrent-protocol对等连接管理和bittorrent-protocol扩展实现

bittorrent-protocol简介

  • bittorrent-protocol 是 BitTorrent 协议的一个底层实现库。

  • 此库主要用于管理对等连接和消息传递(握手、数据块请求、数据传输、完成通知)。它可以用于构建 BitTorrent 客户端、种子服务器,或支持 BitTorrent 协议的其他应用。

  • 官方提供的https://github.com/webtorrent/bittorrent-protocol简单示例如下:

import Protocol from 'bittorrent-protocol'  // 导入 bittorrent-protocol 库
import net from 'net'  // 导入 Node.js 的 net 模块,用于创建 TCP 服务器// 创建一个 TCP 服务器,监听连接
net.createServer(socket => {const wire = new Protocol()  // 创建一个新的 Protocol 实例,用于处理 BitTorrent 协议// 将 socket 和协议 wire 连接起来socket.pipe(wire).pipe(socket)// 将 socket 的数据流通过 wire 进行双向传输// 监听握手事件wire.on('handshake', (infoHash, peerId) => { // 接收到握手信息,infoHash 和 peerId 是十六进制字符串// 向对等方发送我们自己的握手信息wire.handshake('my info hash (hex)', 'my peer id (hex)')})// 监听 unchoke 事件,表示对等方不再阻塞我们。注: bittorrent-protocol可管理连接的状态信息,包括双方的 choke/uninterested 状态、数据块请求状态等。wire.on('unchoke', () => {// 输出对等方的 choke 状态console.log('对等方不再阻塞我们: ' + wire.peerChoking)})
}).listen(6881)  // 服务器监听 6881 端口

bittorrent-protocol的扩展机制

  • bittorrent-protocol还允许开发者通过扩展来实现复杂功能,比如元数据交换或自定义消息格式( ut_metadata扩展实现直接与其他对等节点交换种子文件的元数据,而无需传统的 .torrent 文件。)。

  • 当前版本的bittorrent-protocol为一个1300行的js文件。在 Node.js 中安装时,Node提供了 @types 类型定义文件夹(此文件方便了在TypeScript 项目中使用 JavaScript 库),其中包括bittorrent-protocol中所用到的类型的 TypeScript 方式定义文件bittorrent-protocol/index.d.ts 。代码如下:

实现逻辑

/// <reference types="node" />  // 引入 Node.js 类型定义,以便使用 Node.js 的类型import * as stream from "stream";  // 导入 stream 模块,提供可读和可写流的基本功能// 声明 BittorrentProtocol 的常量,它是 BittorrentProtocol 命名空间中的一个构造函数
declare const BittorrentProtocol: BittorrentProtocol.BittorrentProtocol;// 声明 BittorrentProtocol 命名空间,c++中不用declare关键字namespace BittorrentProtocol {}
declare namespace BittorrentProtocol {// 定义 BittorrentProtocol 接口interface BittorrentProtocol {new(): Wire;  // 构造函数,返回一个 Wire 实例(): Wire;     // 允许将该接口作为函数调用,同样返回 Wire 实例}// 定义扩展构造函数接口interface ExtensionConstructor {new(wire: Wire): Extension;  // 构造函数,接受 Wire 实例作为参数,返回一个扩展实例}// 定义扩展接口interface Extension {onHandshake?(infoHash: string, peerId: string, extensions: { [name: string]: boolean }): void;onExtendedHandshake?(handshake: { [key: string]: any }): void;onMessage?(buf: Buffer): void;name: string; }// 定义请求接口interface Request {piece: number;  // 请求的数据块索引offset: number; // 请求的偏移量length: number; // 请求的长度callback(): void; // 回调函数,用于响应请求}// 定义 Wire 接口,继承自 stream.Duplex双向流,Duplex 流通常用于网络通信、数据压缩加密解密处理和数据转换等需要读写双向数据流处理的场景,常见的 Duplex 流有Tcp Scoket、Zlib、Cryptointerface Wire extends stream.Duplex {readonly peerId: string; // 对等方的 ID,十六进制字符串readonly peerIdBuffer: Buffer; // 对等方的 ID,以 Buffer 形式存储readonly type: "webrtc" | "tcpIncoming" | "tcpOutgoing" | "webSeed"; // 连接类型readonly amChoking: boolean; // 是否在阻塞对等方readonly amInterested: boolean; // 是否对对等方感兴趣readonly peerChoking: boolean; // 对等方是否在阻塞我们readonly peerInterested: boolean; // 对等方是否对我们感兴趣readonly requests: Request[]; // 当前请求列表readonly peerRequests: Request[]; // 对等方请求列表readonly extendedMapping: { [key: number]: string }; // 扩展映射readonly peerExtendedMapping: { [key: string]: number }; // 对等方扩展映射setKeepAlive(enable: boolean): void;// 设置保持连接活动,setKeepAlive输入boolean返回voidsetTimeout(ms: number, unref?: boolean): void;// 设置超时destroy(): any;// 销毁连接end(): any;// 结束连接// 使用扩展构造函数use(ext: ExtensionConstructor): void;[key: string]: any; // Wire对象可以有任意多数量的字符串键值属性handshake(infoHash: string | Buffer, peerId: string | Buffer, extensions?: any): void;// 握手方法choke(): void;// 阻塞对等方unchoke(): void;// 解除对等方的阻塞interested(): void;uninterested(): void;have(index: number): void; bitfield(bitfield: Buffer | any): void;// 发送比特字段request<T extends any>(index: number, offset: number, length: number, cb?: (err: Error) => T): T | void;// 请求数据块piece(index: number, offset: number, buffer: Buffer): void;cancel(index: number, offset: number, length: number): void;port(port: number): void;// 设置dht端口信息,wire.port(dhtPort) wire.on('port', dhtPort => {})extended(ext: number | string, obj: any): void;// 发送扩展消息Message: "extended" <len=0005+X><id=20><ext-number><payload>// 监听特定事件,由emit触发的事件on(event: "bitfield", listener: (bitfield: any) => void): this;on(event: "keep-alive" | "choke" | "unchoke" | "interested" | "uninterested" | "timeout",listener: () => void,): this;on(event: "upload" | "have" | "download" | "port", listener: (length: number) => void): this;on(event: "handshake", listener: (infoHash: string, peerId: string, extensions: Extension[]) => void): this;on(event: "request",listener: (index: number, offset: number, length: number, respond: () => void) => void,): this;on(event: "piece", listener: (index: number, offset: number, buffer: Buffer) => void): this;on(event: "cancel", listener: (index: number, offset: number, length: number) => void): this;on(event: "extended", listener: (ext: "handshake" | string, buf: any) => void): void;on(event: "unknownmessage", listener: (buffer: Buffer) => void): this;on(event: string, listener: (...args: any[]) => void): this; // 支持任意事件}
}// 导出 BittorrentProtocol 作为模块
export = BittorrentProtocol;
  • 这段代码定义了 BittorrentProtocol 的类型结构,Wire 是这个协议的核心接口,继承自 stream.Duplex,所以它具备双向流的能力。它包含许多属性和方法,用于 BitTorrent 的数据传输、连接控制和扩展支持:

扩展支持:ExtensionConstructorExtension 接口

  • use() 方法接受一个 ExtensionConstructor,用于注册协议扩展。扩展模块可以增强协议的功能,允许通过 Extension 接口自定义连接行为和消息处理。
interface ExtensionConstructor {new(wire: Wire): Extension;// 返回值类型为 Extension
}
interface Extension {// 三个可选方法onHandshake?(infoHash: string, peerId: string, extensions: { [name: string]: boolean }): void;onExtendedHandshake?(handshake: { [key: string]: any }): void;onMessage?(buf: Buffer): void;name: string;
}
  • ExtensionConstructor:定义了扩展的构造器接口,扩展实例通过 new(wire: Wire) 的形式创建,并接收一个 Wire 实例。
  • Extension:定义扩展实例的接口,提供了几个可选的回调方法:
    • onHandshake:在协议握手时触发。
    • onExtendedHandshake:在扩展握手时触发。
    • onMessage:在接收到扩展消息时触发。

6. 事件监听和处理

Wire 接口定义了许多事件,通过 .on() 方法来监听,如:

  • 状态变化事件chokeunchokeinteresteduninterested 等。
  • 数据事件uploaddownload 等,用于数据流量的监控。
  • 协议事件handshakerequestpiececancel 等,用于数据请求和数据传输。
  • 扩展事件extendedunknownmessage,用于扩展的消息通信和处理未识别消息。

示例:使用扩展

import Protocol from 'bittorrent-protocol';const wire = new Protocol();
wire.use(class ExampleExtension {constructor(wire) {wire.on('handshake', (infoHash, peerId) => {console.log('Handshake received:', infoHash, peerId);});}
});
// 导入所需模块
import Protocol from 'bittorrent-protocol';
import { Buffer } from 'buffer';// 创建一个新的 Protocol 实例
const wire = new Protocol();// 使用自定义的扩展
wire.use(class ExampleExtension {constructor(wire) {// 注册握手事件的监听器wire.on('handshake', (infoHash, peerId) => {console.log('Handshake received:', infoHash.toString('hex'), peerId.toString('utf8'));});}
});// 模拟握手过程
// 假设 infoHash 和 peerId 是从其他节点获得的
const exampleInfoHash = Buffer.from('1234567890abcdef1234567890abcdef12345678', 'hex');
const examplePeerId = Buffer.from('-AZ2060-abcdefghij1234567890abcdef', 'utf8');// 触发握手事件
wire.emit('handshake', exampleInfoHash, examplePeerId);// 其他初始化(如连接到 peers 等)
// wire.connect(peer)  // 连接到某个 peer 的示例代码

示例:使用扩展

// !!! 代码来源 https://github.com/sampi/p2p-messages !!!
import { Buffer } from 'safe-buffer';
// var debug = require('debug')('ut_messages');
import { EventEmitter } from 'events';
import inherits from 'inherits';const UT_MESSAGES = 'ut_messages';export default function() {inherits(utMessages, EventEmitter);function utMessages(wire) {EventEmitter.call(this);this._wire = wire;this._id = Math.random();}// Name of the bittorrent-protocol extensionutMessages.prototype.name = UT_MESSAGES;utMessages.prototype.onMessage = function(buf) {console.log("onMessage of utMessage")try {var str = buf.toString();var data = JSON.parse(str);if (data.hack_client_id) {this.emit('hack_client_id', data.hack_client_id);} else {}this.emit('hack_message', data);} catch (err) {// drop invalid messagesconsole.log("some error on utMessage")return;}};utMessages.prototype.handleMessage = function() {};utMessages.prototype.sendMessage = function(msg) {var buf = new Buffer(JSON.stringify(msg));this._wire.extended(UT_MESSAGES, buf);};return utMessages;
}
  • clinet
import WebTorrent from 'webtorrent';
import utMessages from './utMessages.js';
import constants from './constants.js';export default function setup(opts = {}) {return new Promise((resolve, reject) => {try {const clientTorrent = new WebTorrent({maxConns: constants.MAX_CONNECTIONS});clientTorrent.add(opts.magnetURI,{maxWebConns: constants.MAX_CONNECTIONS},torrent => {const wire = torrent.wires[0];wire.use(utMessages());wire.ut_messages.on('hack_message', opts.onMessage);wire.ut_messages.on('hack_client_id', clientId =>resolve({clientId,sendMessage: message => {message.clientId = clientId;return wire.ut_messages.sendMessage.apply(wire.ut_messages, [message]);}}));});} catch (e) {console.error('Something went wrong', e);reject(e);}});
}
import setupHost, { sendMessages } from './host.js';
import setup from './client.js';const opts = {magnetURI: 'magnet:?xt=urn:btih:ccbbc5da5ee83d6b15a031fba2ef00e10fd5ffab&dn=28007a0c-8cc8-4cd5-a145-57825396d786&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com',// window.location.hash.substring(1),onMessage: data => {console.log('Message from Host:', data);}
};
setup(opts).then(({ clientId, sendMessage }) => {console.log(`[CLNT] ${clientId} connected to Host`);setInterval(() => sendMessage({ data: 'stuff from client to host every second' }), 1000);
});
  • host
import WebTorrent from 'webtorrent';
import uuid from './uuid.js';
import utMessages from './utMessages.js';
import constants from './constants.js';const clients = {};export default function setup(opts = {}) {try {const hostTorrent = new WebTorrent({maxConns: constants.MAX_CONNECTIONS});const id = uuid();const buf = new Buffer(id);buf.name = id;return new Promise((resolve, reject) => {hostTorrent.seed(buf, torrent => {resolve(torrent.magnetURI);torrent.on('wire', (wire, addr) => {let clientId = null;wire.use(utMessages());wire.peerExtendedMapping.ut_messages = 2; //HACK!if (opts.onConnectionRequest()) {clientId = wire.peerId;console.log(123)clients[clientId] = {clientId,sendMessage: wire.ut_messages.sendMessage.bind(wire.ut_messages)};setTimeout(() => {wire.ut_messages.on('hack_message', opts.onMessage);clients[clientId].sendMessage({hack_client_id: clientId});}, constants.CLIENT_SETUP_TIMEOUT);opts.onNewClient(clients[clientId]);} else {console.log('Client rejected from pool.');}wire.on('end', () => {if (clientId) {console.log(`${clientId} disconnected.`);clients[clientId].sendMessage = () => {};}});});torrent.on('error', () => console.log('error', arguments));torrent.on('warn', () => console.log('warn', arguments));});});} catch (e) {console.error('Something went wrong', e);}
}export function sendMessages(data) {Object.keys(clients).forEach(clientId => {clients[clientId].sendMessage(data);});
}export function getClients() {return clients;
}
import setupHost, { sendMessages } from './host.js';
import setupCLient from './client.js';const opts = {onConnectionRequest: () => true,onNewClient: client => {console.log('New client connected', client.clientId);return true;},onMessage: data => console.log('Message from Client:', data)
};
setupHost(opts).then(magnetURI => {console.log('[HOST] Now accepting connections!');console.log('http://localhost:3000/#' + magnetURI);
});setInterval(() => {sendMessages({ data: 'sent to all clients every second' });
}, 1000);setTimeout(() => {console.log('[HOST] 30seconds have passed, not allowing connections anymore!');opts.allowConnections = () => false;
}, 60000);
  • torrent.on(‘wire’, function (wire) {}) 是在 WebTorrent 客户端中,当一个新的对等体(peer)连接时,触发的事件。当这个事件被触发时,wire 会作为参数传递给回调函数

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.xdnf.cn/news/5430.html

如若内容造成侵权/违法违规/事实不符,请联系一条长河网进行投诉反馈,一经查实,立即删除!

相关文章

使用Rust实现http/https正向代理

相关库的安装 利用vcpkg安装openssl库 vcpkg install openssl:x64-windows并设置openssl库位置的环境变量 $Env:OPENSSL_DIR"D:/vcpkg/packages/openssl_x64-windows/"安装openssl软件&#xff0c;因为需要利用openssl生成自签名证书 Cargo依赖 [dependencies] …

vue3如何使用pinia设置全局状态,附常见面试题

1. stores/index.ts 文件 在 index.ts 中创建 store 实例并封装了注册逻辑&#xff0c;这样可以方便地将整个 Pinia 实例注册到 Vue 应用中。代码如下&#xff1a; import type { App } from vue import { createPinia } from piniaconst store createPinia()// 全局注册 st…

【微知】Nvida Mellanox网卡中速率SDR、DDR、QDR、FDR、EDR、HDR、NDR全称与速率?

文章目录 综述背景全称早期速率&#xff1a;中期当前 其他 综述 Single Data Rate (SDR) 10Gbps Double Data Rate (DDR) 20Gbps Quad Data Rate (QDR) 40Gbps Fourteen Data Rate (FDR) 56Gbps Enhanced Data Rate (EDR) 100Gbps High Data Rate (HDR) 200Gbps Next Data Rat…

融合虚拟化与容器技术,打造灵活又安全的AI算力服务

随着人工智能技术的不断进步&#xff0c;AI企业在迅速推进大模型业务时&#xff0c;往往会倾向于采用容器化的轻量部署方案。相较于传统的虚拟机部署&#xff0c;容器化在快速部署、资源利用、环境一致性和自动化编排等方面具备显著优势。 然而&#xff0c;容器技术所固有的隔…

Hunyuan-Large:推动AI技术进步的下一代语言模型

腾讯近期推出了基于Transformer架构的混合专家&#xff08;MoE&#xff09;模型——Hunyuan-Large&#xff08;Hunyuan-MoE-A52B&#xff09;。该模型目前是业界开源的最大MoE模型之一&#xff0c;拥有3890亿总参数和520亿激活参数&#xff0c;展示了极强的计算能力和资源优化优…

岛屿数量 广搜版BFS C#

和之前的卡码网深搜版是一道题 力扣第200题 99. 岛屿数量 题目描述 给定一个由 1&#xff08;陆地&#xff09;和 0&#xff08;水&#xff09;组成的矩阵&#xff0c;你需要计算岛屿的数量。岛屿由水平方向或垂直方向上相邻的陆地连接而成&#xff0c;并且四周都是水域。…

本地使用conda创建django虚拟环境

1、首先本地安装好conda。 2、创建django的虚拟环境 conda create -n django # 这里的 django只是虚拟的名称&#xff0c;自己随便名字就行&#xff0c;只要你自己知道这个是django的虚拟环境就行。 3、安装成功&#xff0c;查看虚拟环境 conda env list 4、激活虚拟环境…

rabbitMQ

官网&#xff1a;https://www.rabbitmq.com/ 一 介绍与安装 1 安装 我们同样基于Docker来安装RabbitMQ&#xff0c;使用下面的命令即可&#xff1a; docker run \-e RABBITMQ_DEFAULT_USERitheima \-e RABBITMQ_DEFAULT_PASS123321 \-v mq-plugins:/plugins \--name rabbi…

reg注册表研究与物理Hack

reg注册表研究与物理Hack 声明&#xff1a;内容的只是方便各位师傅学习知识&#xff0c;以下网站只涉及学习内容&#xff0c;其他的都与本人无关&#xff0c;切莫逾越法律红线&#xff0c;否则后果自负。 目录 reg注册表研究与物理HackWindows注册表修改注册表实现应用程序开机…

【黑盒测试】等价类划分法及实例

本文主要介绍黑盒测试之等价类划分法&#xff0c;如什么是等价类划分法&#xff0c;以及如何划分&#xff0c;设计等价类表。以及关于三角形案例的等价类划分法。 文章目录 一、什么是等价类划分法 二、划分等价类和列出等价类表 三、确定等价类的原则 四、建立等价类表 …

适用于个人或团队的文档管理和知识库系统,NAS快速部署『BookStack』

适用于个人或团队的文档管理和知识库系统&#xff0c;NAS快速部署『BookStack』 哈喽小伙伴们好&#xff0c;我是Stark-C~ 知识库对于很多需要和文字打交道的个人或者团队都不陌生对吧&#xff1f;对于我们个人来说&#xff0c;它可以将常用的学习资料、工作笔记、项目计划和…

delphi fmx android 自动更新(一)

12.2 android10测试通过 一,安卓权限设置 1,REQUEST_INSTALL_PACKAGES 权限 2,INTERNET 权限 3,READ_EXTERNAL_STORAGE 权限 4,WRITE_EXTERNAL_STORAGE 权限 5,READ_PHONE_STATE 二,安卓下载过程 一般是从http下载安装包 apk 所以,如果是http 则,manife…

《JVM第7课》堆区

文章目录 1.概念2.指定堆大小3.新生代和老年代3.1 新生代3.2 老年代3.3 动画演示 4.分代收集理念 1.概念 堆是JVM中最重要的一块区域&#xff0c;JVM规范中规定所有的对象和数组都应该存放在堆中&#xff0c;在执行字节码指令时&#xff0c;会把创建的对象存入堆中&#xff0c…

【笔记】自动驾驶预测与决策规划_Part6_不确定性感知的决策过程

文章目录 0. 前言1. 部分观测的马尔可夫决策过程1.1 POMDP的思想以及与MDP的联系1.1.1 MDP的过程回顾1.1.2 POMDP定义1.1.3 与MDP的联系及区别POMDP 视角MDP 视角决策次数对最优解的影响 1.2 POMDP的3种常规解法1.2.1 连续状态的“Belief MDP”方法1. 信念状态的定义2. Belief …

Spring Boot框架下的知识管理与多维分类

4 系统设计 系统分析接下来的操作步骤就是系统的设计&#xff0c;这部分内容也是不能马虎对待的。因为生活都是在不断产生变化&#xff0c;人们需求也是在不断改变&#xff0c;开发技术也是在不断升级&#xff0c;所以程序也需要考虑在今后可以方便进行功能扩展&#xff0c;完成…

LeetCode17. 电话号码的字母组合(2024秋季每日一题 59)

给定一个仅包含数字 2-9 的字符串&#xff0c;返回所有它能表示的字母组合。答案可以按 任意顺序 返回。 给出数字到字母的映射如下&#xff08;与电话按键相同&#xff09;。注意 1 不对应任何字母。 示例 1&#xff1a; 输入&#xff1a;digits “23” 输出&#xff1a;[“…

Nature Methods | 基于流形约束的RNA速度推断精准解析细胞周期动态调节规律

生信碱移 VeloCycle算法 VeloCycle&#xff1a;基于流形约束的RNA速度推断在细胞周期动态中的精准解析 今天给各位老铁们分享一篇于2024年10月31号发表在 Nature Methods [IF: 36.1] 的文章&#xff1a;"Statistical inference with a manifold-constrained RNA velocity…

Spring挖掘:(AOP篇)

学习AOP时,我们首先来了解一下何为AOP 一. 概念 AOP&#xff08;面向切面编程&#xff0c;Aspect Oriented Programming&#xff09;是一种编程技术&#xff0c;旨在通过预编译方式或运行期动态代理实现程序功能的统一管理和增强。AOP的主要目标是在不改变原有业务逻辑代码的…

【机器学习】k最近邻分类

&#x1f4dd;本文介绍 本文为作者阅读鸢尾花书籍以及一些其他资料的k最近邻分类后&#xff0c;所作笔记 &#x1f44b;作者简介&#xff1a;一个正在积极探索的本科生 &#x1f4f1;联系方式&#xff1a;943641266(QQ) &#x1f6aa;Github地址&#xff1a;https://github.com…

《深度学习》bert自然语言处理框架

目录 一&#xff0c;关于bert框架 1、什么是bert 2、模型结构 自注意力机制&#xff1a; 3、预训练任务 4、双向性 5、微调&#xff08;Fine-tuning&#xff09; 6、表现与影响 二、Transformer 1、传统RNN网络计算时存在的问题 1&#xff09;串联 2&#xff09;并行…