当前位置:首页 > Python > 正文内容

(原创)使用Python对任意网站图片进行爬取,仅用于学习

chanra1n1年前 (2023-05-14)Python2844
import os
import time
import argparse
import requests
import re
import io
from urllib.parse import urljoin
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
from PIL import Image
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import ssl

# 设置代理和浏览器UA
proxies = {
    'http': 'http://127.0.0.1:20171',
    'https': 'http://127.0.0.1:20171'
}
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36'
}

# 定义正则表达式,用于匹配图片链接
img_regex = re.compile(r'(http|https)?://[^\s]*\.(jpg|jpeg|png|gif|bmp)')

# 定义函数,用于获取网站中的所有图片链接和页面链接
def get_links(url, timeout=10, use_proxy=False):
    try:
        if use_proxy:
            response = requests.get(url, proxies=proxies, headers=headers, timeout=timeout, verify=False)
        else:
            response = requests.get(url, headers=headers, timeout=timeout, verify=False)
        response.raise_for_status()  # 检查响应状态码,如果不是 200,抛出异常
    except requests.exceptions.RequestException as e:
        print(f"请求 {url} 时出错:{e}")
        return ([], [])

    html = response.text
    soup = BeautifulSoup(html, 'html.parser')
    img_links = []
    page_links = []
    for img in soup.find_all('img'):
        img_links.append(img.get('src'))
    for a in soup.find_all('a', href=True):
        page_links.append(a.get('href'))
    return (img_links, page_links)

# 定义函数,用于下载图片
def download_img(img_url, save_path, timeout=10, use_proxy=False):
    try:
        img_name = os.path.basename(img_url)
        if use_proxy:
            img_data = requests.get(img_url, proxies=proxies, headers=headers, timeout=timeout, verify=False).content
        else:
            img_data = requests.get(img_url, headers=headers, timeout=timeout, verify=False).content
    except requests.exceptions.RequestException as e:
        print(f"下载 {img_url} 时出错:{e}")
        return

    # 校验图片是否完整
    if not is_valid_image(img_data):
        print(f"下载 {img_url} 时出错:图片不完整或者损坏")
        return

    # 获取图片尺寸
    img = Image.open(io.BytesIO(img_data))
    width, height = img.size

    # 过滤掉尺寸小于 224x224 的图片
    if width < 224 or height < 224:
        return

    # 保存图片
    with open(os.path.join(save_path, img_name), 'wb') as f:
        f.write(img_data)

# 定义函数,用于校验图片是否完整
def is_valid_image(img_data):
    try:
        Image.open(io.BytesIO(img_data)).verify()
        return True
    except:
        return False

# 定义函数,用于下载所有页面的图片
def download_all_images(url, save_path, max_depth=3, delay=0.5, timeout=10, use_proxy=False):
    visited_links = set()  # 用集合来保存已经访问过的链接
    download_queue = [(url, 0)]  # 用队列来保存待下载的链接和深度
    page_count = 0  # 记录已经成功访问的页面数量
    img_count = 0  # 记录已经成功下载的图片数量

    # 创建一个 Chrome 浏览器实例
    chrome_options = Options()
    chrome_options.add_argument('--headless')  # 设置无头模式,不显示浏览器窗口
    driver = webdriver.Chrome(options=chrome_options)

    while download_queue:
        url, depth = download_queue.pop(0)
        if depth > max_depth:
            continue
        if url in visited_links:
            continue
        # 使用 selenium 打开页面,让浏览器执行 JavaScript 代码
        try:
            driver.get(url)
            time.sleep(1)
            html = driver.page_source
            soup = BeautifulSoup(html, 'html.parser')
            img_links = []
            for img in soup.find_all('img'):
                img_src = img.get('src')
                if img_src and img_regex.match(img_src):
                    img_links.append(img_src)
        except Exception as e:
            print(f"访问 {url} 时出错:{e}")
            continue

        # 下载当前页面的所有图片
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = []
            for img_url in img_links:
                if not img_url.startswith('http'):
                    img_url = urljoin(url, img_url)
                try:
                    download_img(img_url, save_path, timeout=timeout, use_proxy=use_proxy)
                    img_count += 1
                except requests.exceptions.RequestException:
                    download_img(img_url, save_path, timeout=timeout, use_proxy=True)
                    img_count += 1
                futures.append(executor.submit(download_img, img_url, save_path, timeout=timeout, use_proxy=use_proxy))
            for future in as_completed(futures):
                if future.exception() is not None:
                    print(f"下载图片时出错:{future.exception()}")

        # 将当前页面中的所有链接加入待下载队列
        for page_link in set(get_links(url, use_proxy=use_proxy)[1]):
            if not page_link.startswith('http'):
                page_link = urljoin(url, page_link)
            if page_link not in visited_links:
                download_queue.append((page_link, depth + 1))

        visited_links.add(url)
        page_count += 1
        print(f"已成功访问 {page_count} 个页面,已成功下载 {img_count} 张图片")

        # 暂停一段时间,防止访问过快被封 IP
        time.sleep(delay)

    driver.quit()

# 定义函数,用于从 txt 文件中读取要下载图片的网站 URL
def read_urls_from_file(file_path):
    urls = []
    with open(file_path, 'r') as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            urls.append(line)
    return urls

# 定义命令行参数
parser = argparse.ArgumentParser(description='Download all images from a website.')
parser.add_argument('-u', '--url', help='The URL of the website to download images from.')
parser.add_argument('-f', '--file', help='The path to a file containing URLs of websites to download images from.')
parser.add_argument('-d', '--depth', type=int, default=3, help='The maximum depth to crawl.')
parser.add_argument('-o', '--output', default='images', help='The output directory for the downloaded images.')
parser.add_argument('-t', '--timeout', type=int, default=10, help='The timeout for requests.')
parser.add_argument('-p', '--proxy', action='store_true', help='Use proxy to download images.')
args = parser.parse_args()

# 读取要下载图片的网站 URL
urls = []
if args.url:
    urls.append(args.url)
elif args.file:
    urls = read_urls_from_file(args.file)
else:
    print('请指定要下载图片的网站 URL 或者包含网站 URL 的文件路径')

# 创建输出目录
if not os.path.exists(args.output):
    os.makedirs(args.output)

# 爬取所有网站中的图片
for url in urls:
    print(f'开始爬取 {url} 中的图片...')
    download_all_images(url, args.output, max_depth=args.depth, timeout=args.timeout, use_proxy=args.proxy)
    print(f'已完成 {url} 中的图片爬取')

使用方法:

python download_images.py -u

也可以使用-d指定爬取深度,默认是3层。

扫描二维码推送至手机访问。

版权声明:本文由我的FPGA发布,如需转载请注明出处。

本文链接:https://myfpga.cn/index.php/post/304.html

分享给朋友:

“(原创)使用Python对任意网站图片进行爬取,仅用于学习” 的相关文章

for循环

for循环

range()函数range(start,end,step)range()函数返回一个可迭代对象(可理解为一个序列,序列中的数包括start,不包括end)例如range(1,101),返回1-100的序列。range(101),范围0-100的序列。range(1,100,2),返回1,3,5.....

  索引运算符【】

索引运算符【】

选择字符串的子序列语法【start:finish】        start:子序列开始位置的索引值        finish:子序列结束位置的下一个字符的索引值如果不提供start或者finish,默认start为第一个字符,finish为最后一个字符。例如>>>my_str=...

顺序查找

顺序查找

如果需要查找某个特定值的位置(以便能够替换或删除它),可以直接使用index方法。searchedValue=100 #values是之前定义好的一个列表 if searchedValue in walues:     pos=values.index(searchedValue)     ...

列表作为函数参数

列表作为函数参数

列表作为函数参数,函数中可以修改原列表def multiply(values,factor):     for i in range(len(values)):        values[i]*=factor          aList=[1,2,3,4,5]  multiply(aL...

anaconda打不开的解决方法

anaconda打不开的解决方法

报错内容Navigator Error An unexpected error occurred on Navigator start-up Report Please report this ...

Python自动清理错误图片,深度学习训练数据集准备

Python自动清理错误图片,深度学习训练数据集准备

使用python运行from PIL import Image from pathlib import Path import os   path = r'.'  ...