当前位置:首页 > 信息安全 > 正文内容

(原创)通过Python编写网络爬虫程序并保存结果(SearX) 仅用于学习和方法研究

chanra1n1年前 (2023-09-28)信息安全5975

简介

在今天的数字时代,网络上的信息量庞大,但有时我们需要从多个网站中收集数据。在这篇博客中,我们将介绍如何使用Python编写一个网络爬虫程序,以自动从多个网站中抓取数据,并将结果保存到文件中。我们将使用一个名为SearxCheckList的示例程序来演示这个过程。

第一步:准备工作

在开始编写网络爬虫程序之前,我们需要做一些准备工作。首先,确保你已经安装了Python和所需的依赖库。我们将使用requests库来发送HTTP请求,re库来处理正则表达式,以及socks库来设置代理服务器。你可以使用pip来安装这些库:

pip install requests
pip install pysocks

第二步:编写爬虫程序

接下来,我们将编写一个Python程序来执行网络爬虫任务。以下是示例程序的主要部分:

推荐使用本程序,实时有1000+ SearX节点,嘎嘎好用!请勿用于非法用途,仅用于学习。

import re
import socks
import requests
import urllib.parse
import json
import sys
import time

print('''
 __  __       _____ ____   ____    _                
|  \/  |_   _|  ___|  _ \ / ___|  / \     ___ _ __  
| |\/| | | | | |_  | |_) | |  _  / _ \   / __| '_ \
| |  | | |_| |  _| |  __/| |_| |/ ___ \ | (__| | | |
|_|  |_|\__, |_|   |_|    \____/_/   \_(_)___|_| |_|
        |___/                                      
      SearX search tool {1.7.1#main}
''')

def modify_url(base_url, param1, param2, page):
    modified_url = base_url.replace('test', f'{param1}_{param2}') + f'&pageno={page}'
    return modified_url

def fetch_retVal(url, headers, timeout=10):  # 设置超时时间,默认为10秒
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        if response.status_code == 200:
            page_content = response.text
            regex = r'<a href="([^"]+)" class="url_wrapper"'
            extracted_urls = [urllib.parse.unquote(match.group(1).replace("&amp;", "&")) for match in re.finditer(regex, page_content, re.I | re.S)]
            if len(extracted_urls) < 2:
                return None
            else:
                return extracted_urls
    except requests.exceptions.RequestException as e:
        print(f'ERROR: 请求超时 - {str(e)}')
    return None

# 请求头信息
headers = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
    "Accept-Encoding": "gzip, deflate, br",
    "Accept-Language": "zh-CN,zh;q=0.9",
    "Cache-Control": "no-cache",
    "Dnt": "1",
    "Pragma": "no-cache",
    "Sec-Ch-Ua": '"Not.A/Brand";v="8", "Chromium";v="114", "Google Chrome";v="114"',
    "Sec-Ch-Ua-Mobile": "?0",
    "Sec-Ch-Ua-Platform": '"Windows"',
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "same-origin",
    "Sec-Fetch-User": "?1",
    "Upgrade-Insecure-Requests": "1",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
}

# 获取命令行参数
if len(sys.argv) == 2:
    param1 = sys.argv[1]
    param2 = ""  # 如果只提供一个参数,则将param2设置为空字符串
elif len(sys.argv) == 3:
    param1 = sys.argv[1]
    param2 = sys.argv[2]
else:
    print("Usage:python script.py <param1> [<param2>]")
    sys.exit(1)

# 获取域名列表
url_domain_list = 'https://data.myfpga.cn/searx.txt'
try:
    response = requests.get(url_domain_list, headers=headers)
    if response.status_code == 200:

        domain_names = response.text.split('\n')

    else:
        print("ERROR:无法获取域名列表,请联系管理员")
        sys.exit(1)
except Exception as e:
    print(f"ERROR:无法获取域名列表:{str(e)}")
    sys.exit(1)

results = []  # 存储所有页面的结果
found_result = False  # 标志变量,用于表示是否已经找到一个可用的域名

for domain in domain_names:
    domain = domain.strip()  # 去除域名两端的空格
    if not domain:
        continue  # 忽略空白域名

    print(f'INFO:尝试更换域名为 {domain}')
    if found_result:
        break  # 如果已经找到一个可用的域名,则停止搜索其他域名
    page = 1
    url = modify_url(f'{domain}/search?q=test&language=auto&time_range=&safesearch=0&theme=simple', param1, param2, page)

    # 获取 retVal
    try:
        retVal = fetch_retVal(url, headers)
    except Exception as e:
        print(f'ERROR:确认域名 {domain} 不可用 - {str(e)}')

    if retVal is not None:
        found_result = True  # 找到可用的域名,将标志变量设置为True
        while retVal:
            results.extend(retVal)  # 将结果添加到结果列表
            page += 1
            url = modify_url(f'https://{domain}/search?q=test&language=auto&time_range=&safesearch=0&theme=simple', param1, param2, page)
            print(f"INFO:正在获取第{page}页数据")
            retVal = fetch_retVal(url, headers)

if found_result:
    # 输出结果为JSON格式
    result_json = json.dumps(results, ensure_ascii=False, indent=4)
    print(result_json)

    # 保存结果到文件
    timestamp = int(time.time())
    output_filename = f'SearxCheckList_{timestamp}.txt'
    with open(output_filename, 'w') as output_file:
        for result in results:
            output_file.write(result + '\n')
    print(f"INFO:结果已保存到文件:{output_filename}")
else:
    print("ERROR:程序无法获取结果,请联系管理员")

以下是本地searx.txt的版本,

import re
import socks
import requests
import urllib.parse
import json
import sys

import time


print('''
 __  __       _____ ____   ____    _                
|  \/  |_   _|  ___|  _ \ / ___|  / \     ___ _ __  
| |\/| | | | | |_  | |_) | |  _  / _ \   / __| '_ \
| |  | | |_| |  _| |  __/| |_| |/ ___ \ | (__| | | |
|_|  |_|\__, |_|   |_|    \____/_/   \_(_)___|_| |_|
        |___/                                      
      SearX search tool {1.6.0#main}
''')


def modify_url(base_url, param1, param2, page):
    modified_url = base_url.replace('test', f'{param1}_{param2}') + f'&pageno={page}'
    return modified_url

def extract_domain_names(filename):
    domain_names = []
    with open(filename, 'r') as file:
        for line in file:
            domain_names.append(line.strip())
    return domain_names

def fetch_retVal(url, headers):
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        page_content = response.text
        regex = r'<a href="([^"]+)" class="url_wrapper"'
        extracted_urls = [urllib.parse.unquote(match.group(1).replace("&amp;", "&")) for match in re.finditer(regex, page_content, re.I | re.S)]
        if len(extracted_urls) < 2:
            return None
        else:
            return extracted_urls
    else:
        return None

# 请求头信息
headers = {
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
    "Accept-Encoding": "gzip, deflate, br",
    "Accept-Language": "zh-CN,zh;q=0.9",
    "Cache-Control": "no-cache",
    "Dnt": "1",
    "Pragma": "no-cache",
    "Sec-Ch-Ua": '"Not.A/Brand";v="8", "Chromium";v="114", "Google Chrome";v="114"',
    "Sec-Ch-Ua-Mobile": "?0",
    "Sec-Ch-Ua-Platform": '"Windows"',
    "Sec-Fetch-Dest": "document",
    "Sec-Fetch-Mode": "navigate",
    "Sec-Fetch-Site": "same-origin",
    "Sec-Fetch-User": "?1",
    "Upgrade-Insecure-Requests": "1",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
}

# 获取命令行参数
if len(sys.argv) != 3:
    print("Usage: python script.py <param1> <param2>")
    sys.exit(1)

param1 = sys.argv[1]
param2 = sys.argv[2]

# 从文件提取域名列表
domain_names = extract_domain_names('searx.txt')

results = []  # 存储所有页面的结果
found_result = False  # 标志变量,用于表示是否已经找到一个可用的域名

for domain in domain_names:
    print("域名不可用,正在更换域名")
    if found_result:
        break  # 如果已经找到一个可用的域名,则停止搜索其他域名
    page = 1
    url = modify_url(f'https://{domain}/search?q=test&language=auto&time_range=&safesearch=0&theme=simple', param1, param2, page)

    # 获取 retVal
    try:
        retVal = fetch_retVal(url, headers)
    except:
        print("域名访问错误,正在更换")

    if retVal is not None:
        found_result = True  # 找到可用的域名,将标志变量设置为True
        while retVal:
            results.extend(retVal)  # 将结果添加到结果列表
            page += 1
            url = modify_url(f'https://{domain}/search?q=test&language=auto&time_range=&safesearch=0&theme=simple', param1, param2, page)
            retVal = fetch_retVal(url, headers)

if found_result:
    # 输出结果为JSON格式
    result_json = json.dumps(results, ensure_ascii=False, indent=4)
    print(result_json)

    # 保存结果到文件
    timestamp = int(time.time())
    output_filename = f'SearxCheckList_{timestamp}.txt'
    with open(output_filename, 'w') as output_file:
        for result in results:
            output_file.write(result + '\n')
    print(f"结果已保存到文件: {output_filename}")
else:
    print("无法获取页面内容")

这个程序的主要功能包括:

  • 构建HTTP请求的请求头信息,以模拟浏览器请求。

  • 从命令行参数中获取用户输入的参数。

  • 从文件中提取域名列表。

  • 遍历域名列表,发送HTTP请求并提取数据。

  • 将提取的数据保存到结果列表中。

  • 最后,将结果以JSON格式输出到控制台,并将结果按行保存到一个文本文件中,文件名带有时间戳以确保唯一性。

第三步:运行程序

现在,我们已经完成了编写程序的工作,可以运行程序来收集数据。在命令行中执行以下命令:

python script.py <param1> <param2>

请替换<param1>和<param2>为你的实际参数。之所以设置两个参数,这是为了配合Sql搜集,懂得都懂。

总结

通过这个示例程序,我们展示了如何使用Python编写一个简单的网络爬虫程序,以从多个网站中抓取数据,并将结果保存到文件中。这是一个强大的工具,可用于许多实际应用,如数据采集、分析和报告生成。希望这篇博客能帮助你开始自己的网络爬虫项目!

扫描二维码推送至手机访问。

版权声明:本文由我的FPGA发布,如需转载请注明出处。

本文链接:http://myfpga.cn/index.php/post/336.html

分享给朋友:

“(原创)通过Python编写网络爬虫程序并保存结果(SearX) 仅用于学习和方法研究” 的相关文章

密码学绪论

密码学绪论

密码学(在密码学(在西欧语文中,源于希腊语kryptós“隐藏的”,和gráphein“书写”)是研究如何隐密地传递信息的学科。在现代特别指对信息以及其传输的数学性研究,常被认为是数学和计算机科学的分支,和信息论也密切相关。著名的密码学者Ron Rivest解释道:“密码学是关于如何在敌人存在的环境...

(原创)基于SQLMAP和Google实现自动挖洞 仅用于学习和方法研究

(原创)基于SQLMAP和Google实现自动挖洞 仅用于学习和方法研究

注意,本程序仅用于学习和方法研究,不得进行实际使用。下面的程序是一个Python脚本,用于执行sqlmap命令的多线程批处理工具。以下是对程序的主要特点和功能的介绍:文件检查和初始化:程序开始时会检查是否存在keywords.txt和type.txt文件。如果这两个文件不存在,程序将输出错误消息并退...

(原创)通过Python编写Xray批量测试脚本 仅用于学习和方法研究

(原创)通过Python编写Xray批量测试脚本 仅用于学习和方法研究

简介: Xray 批量控制工具是一款功能强大的网络扫描工具,旨在帮助安全专业人员发现和评估网络应用程序中的漏洞。本工具结合了高度定制化的扫描功能和多线程处理,使用户能够高效地扫描多个目标并快速获取漏洞信息。在这篇介绍中,我们将深入了解Xray批量控制工具的主要特点和用途。主要特点:多线程扫描: Xr...

(原创)Xray多目标检测控制 Python实现 带继续扫描 目标去重 WebHook上传

(原创)Xray多目标检测控制 Python实现 带继续扫描 目标去重 WebHook上传

Web安全是互联网时代一个非常重要的话题。为了确保网站和应用程序的安全性,经常需要进行漏洞扫描和渗透测试。Xray 是一款强大的开源工具,可以用于自动化漏洞扫描。在本篇博客中,我们将介绍如何使用 Python 编写一个自动化脚本,将 Xray 与 Python 结合起来,以实现对多个目标网站的自动化...

(原创)关于钓鱼邮件的识别和溯源分析  本文仅用于学习和分析

(原创)关于钓鱼邮件的识别和溯源分析 本文仅用于学习和分析

首先我想说明,网络不是法外之地,我奉劝某些人好自为之!一天到晚被攻击,从代码审计到SQL注入,从XSS到扫描....没完没了。是真的服了,我一个搞电子的,建个网站分享知识,还得被迫学习信息安全。。。真的是全方位,不同方面来提高我在信息安全方面的认识和理解。近期收到一个邮件,简单分析发现问题,发件方没...