(原创)通过Python编写Xray批量测试脚本 仅用于学习和方法研究
以上介绍来自本站自建的GPT服务,自打网站被黑之后,一直在研究如何加强网站的防御,以及如何对网站进行尽可能的测试,所以写了这一系列的文章,能够使得一些工具实现串成串的效果。
代码如下:
新版(1、支持跨平台操作2、线程数决定于CPU数量):
# -*- coding: utf-8 -*- import os import re import time import sys import platform import multiprocessing from queue import Queue from threading import Thread print(''' __ __ _____ ____ ____ _ | \/ |_ _| ___| _ \ / ___| / \ ___ _ __ | |\/| | | | | |_ | |_) | | _ / _ \ / __| '_ \ | | | | |_| | _| | __/| |_| |/ ___ \ | (__| | | | |_| |_|\__, |_| |_| \____/_/ \_(_)___|_| |_| |___/ Xray Batch Control tool {1.7.2#main} *Internal version code sensitive and confidential* ''') def readfile(): # 创建一个队列用于存储扫描目标 target_queue = Queue() # 从文件 'targets.txt' 读取目标并加入队列 targets_list = [line.strip() for line in open('targets.txt', encoding='utf-8')] for target in targets_list: target_queue.put(target) return target_queue def format_time(seconds): # 将秒数格式化为时:分:秒的形式 hours, remainder = divmod(int(seconds), 3600) minutes, seconds = divmod(remainder, 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d}" def scan(target_queue, xray_path, total_targets): start_time = time.time() while True: try: target = target_queue.get() # 从目标中提取域名部分 domain_match = re.search(r'(.*?//)?(.*)', target) if domain_match: domain = domain_match.group(2) # 替换域名中的特殊字符,用于生成输出文件名 domain = domain.replace(":", "_").replace("/", "_") # 构建扫描命令 command = f"{xray_path} webscan --basic-crawler {target} --html-output ./output/{domain}.html" output = os.popen(command) processed_targets = total_targets - target_queue.qsize() elapsed_time = time.time() - start_time targets_per_second = processed_targets / elapsed_time remaining_targets = target_queue.qsize() estimated_time = remaining_targets / targets_per_second if targets_per_second > 0 else 0 print(f"[{time.strftime('%H:%M:%S', time.localtime(time.time()))}] [INFO] 正在处理目标:{processed_targets}/{total_targets},估计还需要 {format_time(estimated_time)}。") try: result = output.read() if "[Vuln:" in result: print(f"[{time.strftime('%H:%M:%S', time.localtime(time.time()))}] [INFO] 发现漏洞:{result}") except UnicodeDecodeError: pass finally: target_queue.task_done() def get_platform_info(): system = platform.system() # 获取操作系统名称 architecture = platform.architecture() # 获取架构和位数信息 if system == 'Linux': # 在Linux上,可以使用os.uname()获取更详细的架构信息 uname = os.uname() machine = uname.machine if 'aarch64' in machine: return f"Linux Arm64 {architecture[0]}" elif 'arm' in machine: return f"Linux Arm {architecture[0]}" elif 'x86_64' in machine: return f"Linux x86_64 {architecture[0]}" elif 'i386' in machine: return f"Linux i386 {architecture[0]}" else: return f"Linux {machine} {architecture[0]}" elif system == 'Windows': machine = platform.machine() if machine == 'AMD64': return f"Windows x64 {architecture[0]}" elif machine == 'i386': return f"Windows x86 {architecture[0]}" else: return f"Windows {machine} {architecture[0]}" else: return f"Unsupported OS: {system}" def select_xray_executable(): platform_info = get_platform_info() if "Linux" in platform_info: if "Arm64" in platform_info: return "xray_linux_arm64" elif "i386" in platform_info: return "xray_linux_386" elif "x86_64" in platform_info: return "xray_linux_amd64" elif "Windows" in platform_info: if "i386" in platform_info: return "xray_windows_386.exe" elif "x64" in platform_info: return "xray_windows_amd64.exe" elif "Darwin" in platform_info: if "Arm64" in platform_info: return "xray_darwin_arm64" elif "x86_64" in platform_info: return "xray_darwin_amd64" return "Unsupported platform" if __name__ == "__main__": # 根据操作系统选择 xray 可执行文件路径 xray_path = select_xray_executable() target_queue = readfile() total_targets = target_queue.qsize() # 使用适量的线程来并行扫描目标 usr_threads = multiprocessing.cpu_count() #线程数,可以固定修改为10等 for i in range(usr_threads): t = Thread(target=scan, args=(target_queue, xray_path, total_targets)) t.daemon = True t.start() target_queue.join() print(f"[{time.strftime('%H:%M:%S', time.localtime(time.time()))}] [INFO] 所有任务已完成!")
历史版本:
# -*- coding: utf-8 -*- import os import re import time import sys from queue import Queue from threading import Thread print(''' __ __ _____ ____ ____ _ | \/ |_ _| ___| _ \ / ___| / \ ___ _ __ | |\/| | | | | |_ | |_) | | _ / _ \ / __| '_ \ | | | | |_| | _| | __/| |_| |/ ___ \ | (__| | | | |_| |_|\__, |_| |_| \____/_/ \_(_)___|_| |_| |___/ Xray Batch Control tool {1.7.1#main} *Internal version code sensitive and confidential* ''') def readfile(): # 创建一个队列用于存储扫描目标 target_queue = Queue() # 从文件 'targets.txt' 读取目标并加入队列 targets_list = [line.strip() for line in open('targets.txt', encoding='utf-8')] for target in targets_list: target_queue.put(target) return target_queue def format_time(seconds): # 将秒数格式化为时:分:秒的形式 hours, remainder = divmod(int(seconds), 3600) minutes, seconds = divmod(remainder, 60) return f"{hours:02d}:{minutes:02d}:{seconds:02d}" def scan(target_queue, xray_path, total_targets): start_time = time.time() while True: try: target = target_queue.get() # 从目标中提取域名部分 domain_match = re.search(r'(.*?//)?(.*)', target) if domain_match: domain = domain_match.group(2) # 替换域名中的特殊字符,用于生成输出文件名 domain = domain.replace(":", "_").replace("/", "_") # 构建扫描命令 command = f"{xray_path} webscan --basic-crawler {target} --html-output ./output/{domain}.html" output = os.popen(command) processed_targets = total_targets - target_queue.qsize() elapsed_time = time.time() - start_time targets_per_second = processed_targets / elapsed_time remaining_targets = target_queue.qsize() estimated_time = remaining_targets / targets_per_second if targets_per_second > 0 else 0 print(f"[{time.strftime('%H:%M:%S', time.localtime(time.time()))}] [INFO] 正在处理目标:{processed_targets}/{total_targets},估计还需要 {format_time(estimated_time)}。") try: result = output.read() if "[Vuln:" in result: print(f"[{time.strftime('%H:%M:%S', time.localtime(time.time()))}] [INFO] 发现漏洞:{result}") except UnicodeDecodeError: pass finally: target_queue.task_done() if __name__ == "__main__": # 根据操作系统选择 xray 可执行文件路径 xray_path = "xray_windows_amd64.exe" if sys.platform == 'win32' else "xray_linux_amd64" target_queue = readfile() total_targets = target_queue.qsize() # 使用适量的线程来并行扫描目标 for i in range(10): t = Thread(target=scan, args=(target_queue, xray_path, total_targets)) t.daemon = True t.start() target_queue.join() print(f"[{time.strftime('%H:%M:%S', time.localtime(time.time()))}] [INFO] 所有任务已完成!")