bittorrent-protocol简介
-
bittorrent-protocol 是 BitTorrent 协议的一个底层实现库。
-
此库主要用于管理对等连接和消息传递(握手、数据块请求、数据传输、完成通知)。它可以用于构建 BitTorrent 客户端、种子服务器,或支持 BitTorrent 协议的其他应用。
-
官方提供的
https://github.com/webtorrent/bittorrent-protocol
简单示例如下:
import Protocol from 'bittorrent-protocol' // 导入 bittorrent-protocol 库
import net from 'net' // 导入 Node.js 的 net 模块,用于创建 TCP 服务器// 创建一个 TCP 服务器,监听连接
net.createServer(socket => {const wire = new Protocol() // 创建一个新的 Protocol 实例,用于处理 BitTorrent 协议// 将 socket 和协议 wire 连接起来socket.pipe(wire).pipe(socket)// 将 socket 的数据流通过 wire 进行双向传输// 监听握手事件wire.on('handshake', (infoHash, peerId) => { // 接收到握手信息,infoHash 和 peerId 是十六进制字符串// 向对等方发送我们自己的握手信息wire.handshake('my info hash (hex)', 'my peer id (hex)')})// 监听 unchoke 事件,表示对等方不再阻塞我们。注: bittorrent-protocol可管理连接的状态信息,包括双方的 choke/uninterested 状态、数据块请求状态等。wire.on('unchoke', () => {// 输出对等方的 choke 状态console.log('对等方不再阻塞我们: ' + wire.peerChoking)})
}).listen(6881) // 服务器监听 6881 端口
bittorrent-protocol的扩展机制
-
bittorrent-protocol还允许开发者通过扩展来实现复杂功能,比如元数据交换或自定义消息格式(
ut_metadata
扩展实现直接与其他对等节点交换种子文件的元数据,而无需传统的.torrent
文件。)。 -
当前版本的bittorrent-protocol为一个1300行的js文件。在 Node.js 中安装时,Node提供了
@types 类型定义文件夹
(此文件方便了在TypeScript 项目中使用 JavaScript 库),其中包括bittorrent-protocol中所用到的类型的 TypeScript 方式定义文件bittorrent-protocol/index.d.ts
。代码如下:
实现逻辑
/// <reference types="node" /> // 引入 Node.js 类型定义,以便使用 Node.js 的类型import * as stream from "stream"; // 导入 stream 模块,提供可读和可写流的基本功能// 声明 BittorrentProtocol 的常量,它是 BittorrentProtocol 命名空间中的一个构造函数
declare const BittorrentProtocol: BittorrentProtocol.BittorrentProtocol;// 声明 BittorrentProtocol 命名空间,c++中不用declare关键字namespace BittorrentProtocol {}
declare namespace BittorrentProtocol {// 定义 BittorrentProtocol 接口interface BittorrentProtocol {new(): Wire; // 构造函数,返回一个 Wire 实例(): Wire; // 允许将该接口作为函数调用,同样返回 Wire 实例}// 定义扩展构造函数接口interface ExtensionConstructor {new(wire: Wire): Extension; // 构造函数,接受 Wire 实例作为参数,返回一个扩展实例}// 定义扩展接口interface Extension {onHandshake?(infoHash: string, peerId: string, extensions: { [name: string]: boolean }): void;onExtendedHandshake?(handshake: { [key: string]: any }): void;onMessage?(buf: Buffer): void;name: string; }// 定义请求接口interface Request {piece: number; // 请求的数据块索引offset: number; // 请求的偏移量length: number; // 请求的长度callback(): void; // 回调函数,用于响应请求}// 定义 Wire 接口,继承自 stream.Duplex双向流,Duplex 流通常用于网络通信、数据压缩加密解密处理和数据转换等需要读写双向数据流处理的场景,常见的 Duplex 流有Tcp Scoket、Zlib、Cryptointerface Wire extends stream.Duplex {readonly peerId: string; // 对等方的 ID,十六进制字符串readonly peerIdBuffer: Buffer; // 对等方的 ID,以 Buffer 形式存储readonly type: "webrtc" | "tcpIncoming" | "tcpOutgoing" | "webSeed"; // 连接类型readonly amChoking: boolean; // 是否在阻塞对等方readonly amInterested: boolean; // 是否对对等方感兴趣readonly peerChoking: boolean; // 对等方是否在阻塞我们readonly peerInterested: boolean; // 对等方是否对我们感兴趣readonly requests: Request[]; // 当前请求列表readonly peerRequests: Request[]; // 对等方请求列表readonly extendedMapping: { [key: number]: string }; // 扩展映射readonly peerExtendedMapping: { [key: string]: number }; // 对等方扩展映射setKeepAlive(enable: boolean): void;// 设置保持连接活动,setKeepAlive输入boolean返回voidsetTimeout(ms: number, unref?: boolean): void;// 设置超时destroy(): any;// 销毁连接end(): any;// 结束连接// 使用扩展构造函数use(ext: ExtensionConstructor): void;[key: string]: any; // Wire对象可以有任意多数量的字符串键值属性handshake(infoHash: string | Buffer, peerId: string | Buffer, extensions?: any): void;// 握手方法choke(): void;// 阻塞对等方unchoke(): void;// 解除对等方的阻塞interested(): void;uninterested(): void;have(index: number): void; bitfield(bitfield: Buffer | any): void;// 发送比特字段request<T extends any>(index: number, offset: number, length: number, cb?: (err: Error) => T): T | void;// 请求数据块piece(index: number, offset: number, buffer: Buffer): void;cancel(index: number, offset: number, length: number): void;port(port: number): void;// 设置dht端口信息,wire.port(dhtPort) wire.on('port', dhtPort => {})extended(ext: number | string, obj: any): void;// 发送扩展消息Message: "extended" <len=0005+X><id=20><ext-number><payload>// 监听特定事件,由emit触发的事件on(event: "bitfield", listener: (bitfield: any) => void): this;on(event: "keep-alive" | "choke" | "unchoke" | "interested" | "uninterested" | "timeout",listener: () => void,): this;on(event: "upload" | "have" | "download" | "port", listener: (length: number) => void): this;on(event: "handshake", listener: (infoHash: string, peerId: string, extensions: Extension[]) => void): this;on(event: "request",listener: (index: number, offset: number, length: number, respond: () => void) => void,): this;on(event: "piece", listener: (index: number, offset: number, buffer: Buffer) => void): this;on(event: "cancel", listener: (index: number, offset: number, length: number) => void): this;on(event: "extended", listener: (ext: "handshake" | string, buf: any) => void): void;on(event: "unknownmessage", listener: (buffer: Buffer) => void): this;on(event: string, listener: (...args: any[]) => void): this; // 支持任意事件}
}// 导出 BittorrentProtocol 作为模块
export = BittorrentProtocol;
- 这段代码定义了
BittorrentProtocol
的类型结构,Wire
是这个协议的核心接口,继承自stream.Duplex
,所以它具备双向流的能力。它包含许多属性和方法,用于 BitTorrent 的数据传输、连接控制和扩展支持:
扩展支持:ExtensionConstructor
和 Extension
接口
use()
方法接受一个ExtensionConstructor
,用于注册协议扩展。扩展模块可以增强协议的功能,允许通过Extension
接口自定义连接行为和消息处理。
interface ExtensionConstructor {new(wire: Wire): Extension;// 返回值类型为 Extension
}
interface Extension {// 三个可选方法onHandshake?(infoHash: string, peerId: string, extensions: { [name: string]: boolean }): void;onExtendedHandshake?(handshake: { [key: string]: any }): void;onMessage?(buf: Buffer): void;name: string;
}
ExtensionConstructor
:定义了扩展的构造器接口,扩展实例通过new(wire: Wire)
的形式创建,并接收一个Wire
实例。Extension
:定义扩展实例的接口,提供了几个可选的回调方法:onHandshake
:在协议握手时触发。onExtendedHandshake
:在扩展握手时触发。onMessage
:在接收到扩展消息时触发。
6. 事件监听和处理
Wire
接口定义了许多事件,通过 .on()
方法来监听,如:
- 状态变化事件:
choke
、unchoke
、interested
、uninterested
等。 - 数据事件:
upload
、download
等,用于数据流量的监控。 - 协议事件:
handshake
、request
、piece
、cancel
等,用于数据请求和数据传输。 - 扩展事件:
extended
和unknownmessage
,用于扩展的消息通信和处理未识别消息。
示例:使用扩展
import Protocol from 'bittorrent-protocol';const wire = new Protocol();
wire.use(class ExampleExtension {constructor(wire) {wire.on('handshake', (infoHash, peerId) => {console.log('Handshake received:', infoHash, peerId);});}
});
// 导入所需模块
import Protocol from 'bittorrent-protocol';
import { Buffer } from 'buffer';// 创建一个新的 Protocol 实例
const wire = new Protocol();// 使用自定义的扩展
wire.use(class ExampleExtension {constructor(wire) {// 注册握手事件的监听器wire.on('handshake', (infoHash, peerId) => {console.log('Handshake received:', infoHash.toString('hex'), peerId.toString('utf8'));});}
});// 模拟握手过程
// 假设 infoHash 和 peerId 是从其他节点获得的
const exampleInfoHash = Buffer.from('1234567890abcdef1234567890abcdef12345678', 'hex');
const examplePeerId = Buffer.from('-AZ2060-abcdefghij1234567890abcdef', 'utf8');// 触发握手事件
wire.emit('handshake', exampleInfoHash, examplePeerId);// 其他初始化(如连接到 peers 等)
// wire.connect(peer) // 连接到某个 peer 的示例代码
示例:使用扩展
// !!! 代码来源 https://github.com/sampi/p2p-messages !!!
import { Buffer } from 'safe-buffer';
// var debug = require('debug')('ut_messages');
import { EventEmitter } from 'events';
import inherits from 'inherits';const UT_MESSAGES = 'ut_messages';export default function() {inherits(utMessages, EventEmitter);function utMessages(wire) {EventEmitter.call(this);this._wire = wire;this._id = Math.random();}// Name of the bittorrent-protocol extensionutMessages.prototype.name = UT_MESSAGES;utMessages.prototype.onMessage = function(buf) {console.log("onMessage of utMessage")try {var str = buf.toString();var data = JSON.parse(str);if (data.hack_client_id) {this.emit('hack_client_id', data.hack_client_id);} else {}this.emit('hack_message', data);} catch (err) {// drop invalid messagesconsole.log("some error on utMessage")return;}};utMessages.prototype.handleMessage = function() {};utMessages.prototype.sendMessage = function(msg) {var buf = new Buffer(JSON.stringify(msg));this._wire.extended(UT_MESSAGES, buf);};return utMessages;
}
- clinet
import WebTorrent from 'webtorrent';
import utMessages from './utMessages.js';
import constants from './constants.js';export default function setup(opts = {}) {return new Promise((resolve, reject) => {try {const clientTorrent = new WebTorrent({maxConns: constants.MAX_CONNECTIONS});clientTorrent.add(opts.magnetURI,{maxWebConns: constants.MAX_CONNECTIONS},torrent => {const wire = torrent.wires[0];wire.use(utMessages());wire.ut_messages.on('hack_message', opts.onMessage);wire.ut_messages.on('hack_client_id', clientId =>resolve({clientId,sendMessage: message => {message.clientId = clientId;return wire.ut_messages.sendMessage.apply(wire.ut_messages, [message]);}}));});} catch (e) {console.error('Something went wrong', e);reject(e);}});
}
import setupHost, { sendMessages } from './host.js';
import setup from './client.js';const opts = {magnetURI: 'magnet:?xt=urn:btih:ccbbc5da5ee83d6b15a031fba2ef00e10fd5ffab&dn=28007a0c-8cc8-4cd5-a145-57825396d786&tr=udp%3A%2F%2Fexplodie.org%3A6969&tr=udp%3A%2F%2Ftracker.coppersurfer.tk%3A6969&tr=udp%3A%2F%2Ftracker.empire-js.us%3A1337&tr=udp%3A%2F%2Ftracker.leechers-paradise.org%3A6969&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.fastcast.nz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com',// window.location.hash.substring(1),onMessage: data => {console.log('Message from Host:', data);}
};
setup(opts).then(({ clientId, sendMessage }) => {console.log(`[CLNT] ${clientId} connected to Host`);setInterval(() => sendMessage({ data: 'stuff from client to host every second' }), 1000);
});
- host
import WebTorrent from 'webtorrent';
import uuid from './uuid.js';
import utMessages from './utMessages.js';
import constants from './constants.js';const clients = {};export default function setup(opts = {}) {try {const hostTorrent = new WebTorrent({maxConns: constants.MAX_CONNECTIONS});const id = uuid();const buf = new Buffer(id);buf.name = id;return new Promise((resolve, reject) => {hostTorrent.seed(buf, torrent => {resolve(torrent.magnetURI);torrent.on('wire', (wire, addr) => {let clientId = null;wire.use(utMessages());wire.peerExtendedMapping.ut_messages = 2; //HACK!if (opts.onConnectionRequest()) {clientId = wire.peerId;console.log(123)clients[clientId] = {clientId,sendMessage: wire.ut_messages.sendMessage.bind(wire.ut_messages)};setTimeout(() => {wire.ut_messages.on('hack_message', opts.onMessage);clients[clientId].sendMessage({hack_client_id: clientId});}, constants.CLIENT_SETUP_TIMEOUT);opts.onNewClient(clients[clientId]);} else {console.log('Client rejected from pool.');}wire.on('end', () => {if (clientId) {console.log(`${clientId} disconnected.`);clients[clientId].sendMessage = () => {};}});});torrent.on('error', () => console.log('error', arguments));torrent.on('warn', () => console.log('warn', arguments));});});} catch (e) {console.error('Something went wrong', e);}
}export function sendMessages(data) {Object.keys(clients).forEach(clientId => {clients[clientId].sendMessage(data);});
}export function getClients() {return clients;
}
import setupHost, { sendMessages } from './host.js';
import setupCLient from './client.js';const opts = {onConnectionRequest: () => true,onNewClient: client => {console.log('New client connected', client.clientId);return true;},onMessage: data => console.log('Message from Client:', data)
};
setupHost(opts).then(magnetURI => {console.log('[HOST] Now accepting connections!');console.log('http://localhost:3000/#' + magnetURI);
});setInterval(() => {sendMessages({ data: 'sent to all clients every second' });
}, 1000);setTimeout(() => {console.log('[HOST] 30seconds have passed, not allowing connections anymore!');opts.allowConnections = () => false;
}, 60000);
- torrent.on(‘wire’, function (wire) {}) 是在 WebTorrent 客户端中,当一个新的对等体(peer)连接时,触发的事件。当这个事件被触发时,wire 会作为参数传递给回调函数