当前位置:首页 > Software > Python > 正文内容

(原创)Python图片爬虫:绕过防盗的巧妙解决方案

chanra1n12个月前 (01-05)Python899

1. 程序概述

介绍这个基于Python的图片爬虫工具,它是一个用于学习和测试的代码,旨在帮助用户从指定网站下载图片。该爬虫具有以下特点:

  • 使用Selenium和BeautifulSoup进行网页解析和内容提取。

  • 利用多线程提高图片下载效率。

  • 实现了图片的去重,避免下载相同的图片。

  • 支持设置爬取深度,防止无限递归。

2. 程序结构

该程序主要由以下几个部分组成:

  • ImageCrawler类: 爬虫的核心类,负责启动爬取和处理每个页面的信息。

  • crawl方法: 递归地爬取页面,提取图片链接和页面链接。

  • download_image方法: 下载图片并保存到本地,实现了图片的去重。

  • is_large_enough方法: 判断图片是否足够大,避免下载过小的图片。

  • update_stats方法: 更新爬取统计信息,显示已爬取的图片数量。

3. 程序运行

程序通过Chrome浏览器驱动实现页面加载,支持设置超时时间。使用连接池发送请求,携带请求头信息,确保下载的是目标图片。

4. 日志记录

引入日志记录,记录页面加载超时、下载错误等异常情况,方便用户追踪和解决问题。

5. 优势和不足

优势:
  • 简单易用:用户只需实例化ImageCrawler类并调用start_crawling方法即可开始爬取。

  • 强大的功能:支持多线程、页面深度控制、图片大小过滤等功能,满足不同需求。

  • 可扩展性:用户可以根据实际需求轻松扩展功能或进行定制。

不足:
  • 依赖性:需要安装Selenium、BeautifulSoup等第三方库,并下载对应浏览器的驱动。

  • 可配置性:虽然提供了一些配置选项,但仍有一些硬编码的地方,不够灵活。

6. 使用示例

用户只需在主程序中实例化ImageCrawler类,并调用start_crawling方法即可开始爬取图片。具体使用方法可参考主程序末尾的if __name__ == "__main__":部分。

7. 总结

该图片爬虫工具是一个简单而强大的学习和测试工具,提供了基本的图片爬取功能,同时具备一些高级特性。用户可以根据实际需求灵活调整配置,快速实现定制化的图片爬取任务。

代码

如果你喜欢使用GUI,请使用这个代码,否则请往下翻。

import logging
import threading
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from bs4 import BeautifulSoup
import hashlib
import os
import re
from PIL import Image, ImageTk
from io import BytesIO
from urllib.parse import urlparse
import tkinter as tk
from tkinter import ttk
import time
from datetime import timedelta

class ImageCrawlerGUI:
    def __init__(self):
        self.root = tk.Tk()
        self.root.title("Image Crawler GUI")

        # Variables for user inputs
        self.base_url_var = tk.StringVar()
        self.base_url_var.set("https://abc.abc")
        self.image_directory_var = tk.StringVar()
        self.image_directory_var.set("downloaded_images")
        self.max_depth_var = tk.IntVar()
        self.max_depth_var.set(5)
        self.page_threads_var = tk.IntVar()
        self.page_threads_var.set(3)
        self.image_threads_var = tk.IntVar()
        self.image_threads_var.set(3)

        # Labels and Entry for user inputs
        tk.Label(self.root, text="Base URL:").grid(row=0, column=0, sticky=tk.W, padx=5, pady=5)
        tk.Entry(self.root, textvariable=self.base_url_var, width=50).grid(row=0, column=1, columnspan=5, pady=5)
        tk.Label(self.root, text="Image Directory:").grid(row=1, column=0, sticky=tk.W, padx=5, pady=5)
        tk.Entry(self.root, textvariable=self.image_directory_var, width=50).grid(row=1, column=1, columnspan=5, pady=5)
        tk.Label(self.root, text="Max Depth:").grid(row=2, column=0, sticky=tk.W, padx=5, pady=5)
        tk.Entry(self.root, textvariable=self.max_depth_var, width=5).grid(row=2, column=1, pady=5)
        tk.Label(self.root, text="Page Threads:").grid(row=2, column=2, sticky=tk.W, padx=5, pady=5)
        tk.Entry(self.root, textvariable=self.page_threads_var, width=5).grid(row=2, column=3, pady=5)
        tk.Label(self.root, text="Image Threads:").grid(row=2, column=4, sticky=tk.W, padx=5, pady=5)
        tk.Entry(self.root, textvariable=self.image_threads_var, width=5).grid(row=2, column=5, pady=5)

        # Start button
        tk.Button(self.root, text="Start Crawling", command=self.start_crawling).grid(row=3, column=0, columnspan=6, pady=10)

        # Stats label
        self.stats_label = tk.Label(self.root, text="已爬取图片数: 0", padx=5, pady=5)
        self.stats_label.grid(row=4, column=0, columnspan=6, pady=5)

        # Running time label
        self.time_label = tk.Label(self.root, text="运行时间: 0秒", padx=5, pady=5)
        self.time_label.grid(row=5, column=0, columnspan=6, pady=5)

        self.start_time = 0

    def start_crawling(self):
        self.start_time = time.time()
        self.stats_label.config(text="已爬取图片数: 0")
        self.time_label.config(text="运行时间: 0秒")

        crawler = ImageCrawler(
            base_url=self.base_url_var.get(),
            image_directory=self.image_directory_var.get(),
            max_depth=self.max_depth_var.get(),
            page_threads=self.page_threads_var.get(),
            image_threads=self.image_threads_var.get(),
            debug_mode=True,
            gui=self
        )
        threading.Thread(target=crawler.start_crawling).start()

    def update_stats(self, num_images):
        elapsed_time = time.time() - self.start_time
        elapsed_time_str = str(timedelta(seconds=round(elapsed_time)))
        self.stats_label.config(text=f"已爬取图片数: {num_images}")
        self.time_label.config(text=f"运行时间: {elapsed_time_str}")

class ImageCrawler:
    def __init__(self, base_url, image_directory, max_depth, page_threads, image_threads, debug_mode=True, gui=None):
        self.base_url = base_url
        self.image_directory = image_directory
        self.max_depth = max_depth
        self.page_threads = page_threads
        self.image_threads = image_threads
        self.visited_urls = set()
        self.image_hashes = set()
        self.debug_mode = debug_mode
        self.gui = gui

        logging.basicConfig(filename='crawler_log.txt', level=logging.ERROR, format='%(asctime)s - %(levelname)s: %(message)s')

        self.session = requests.Session()
        retries = Retry(total=10, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
        self.session.mount('http://', HTTPAdapter(max_retries=retries))
        self.session.mount('https://', HTTPAdapter(max_retries=retries))

        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Referer': self.base_url,
        }

        if not os.path.exists(self.image_directory):
            os.makedirs(self.image_directory)

    def start_crawling(self):
        print("开始爬取...")
        self.crawl(self.base_url, 0)

    def crawl(self, url, depth):
        if depth > self.max_depth or url in self.visited_urls:
            return

        try:
            self.visited_urls.add(url)

            response = self.session.get(url, headers=self.headers, timeout=10)
            if self.debug_mode:
                self.log_request_info(response.request)

            soup = BeautifulSoup(response.text, 'html.parser')

            img_pattern = re.compile(r'(https?://[^\s]+?\.(?:jpg|jpeg|png|gif|bmp|svg|webp))', re.IGNORECASE)
            page_pattern = re.compile(r'href=["\'](https?://[^\s]+?)["\']', re.IGNORECASE)

            image_urls = img_pattern.findall(str(soup))
            page_urls = page_pattern.findall(str(soup))

            thread_pool = []

            for img_url in image_urls:
                if not img_url.startswith(("data:image", "javascript")):
                    thread = threading.Thread(target=self.download_image, args=(img_url, url))
                    thread.start()
                    thread_pool.append(thread)

                    # Limit the number of threads
                    if len(thread_pool) >= self.image_threads:
                        for t in thread_pool:
                            t.join()
                        thread_pool = []

            for next_url in page_urls:
                if next_url:
                    base_domain = urlparse(self.base_url).hostname
                    next_domain = urlparse(next_url).hostname

                    if next_domain == base_domain:
                        self.crawl(next_url, depth + 1)

            # Join any remaining threads
            for t in thread_pool:
                t.join()

        except Exception as e:
            error_msg = f"Error crawling {url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def download_image(self, img_url, referer_url):
        try:
            if not img_url.startswith(("http", "https")):
                img_url = self.base_url.rstrip('/') + '/' + img_url.lstrip('/')

            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Referer': referer_url,
            }

            response = self.session.get(img_url, headers=headers, verify=False)
            if self.debug_mode:
                self.log_request_info(response.request)

            image_data = response.content

            if self.is_large_enough(image_data):
                image_hash = hashlib.md5(image_data).hexdigest()
                if image_hash not in self.image_hashes:
                    self.image_hashes.add(image_hash)
                    image_extension = img_url.split('.')[-1]
                    image_name = f"{image_hash}.{image_extension}"
                    image_path = os.path.join(self.image_directory, image_name)

                    threading.Thread(target=self._save_image, args=(image_data, image_path, img_url)).start()

        except Exception as e:
            error_msg = f"Error downloading image {img_url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def _save_image(self, image_data, image_path, img_url):
        try:
            with open(image_path, 'wb') as f:
                f.write(image_data)

            print(f"Downloaded image: {img_url}")
            self.update_stats()

        except Exception as e:
            error_msg = f"Error saving image {img_url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def is_large_enough(self, image_data):
        try:
            image = Image.open(BytesIO(image_data))
            width, height = image.size
            return width >= 300 and height >= 300
        except Exception as e:
            print(f"Error checking image dimensions for image data: {str(e)}")
            return False

    def update_stats(self):
        if self.gui:
            num_images = len(self.image_hashes)
            self.gui.update_stats(num_images)

    def log_request_info(self, request):
        print(f"Request URL: {request.url}")
        print("Request Headers:")
        for key, value in request.headers.items():
            print(f"{key}: {value}")
        print("\n")

if __name__ == "__main__":
    gui = ImageCrawlerGUI()
    gui.root.mainloop()

如果你不喜欢使用GUI的,请使用下面的代码

开发版 V2.1
import logging
import threading
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from bs4 import BeautifulSoup
import hashlib
import os
import re
from PIL import Image
from io import BytesIO
from urllib.parse import urlparse

class ImageCrawler:
    def __init__(self, debug_mode=True):
        self.base_url = "https://www.abc.abc"
        self.image_directory = "downloaded_images"
        self.max_depth = 5
        self.visited_urls = set()
        self.image_hashes = set()
        self.debug_mode = debug_mode

        logging.basicConfig(filename='crawler_log.txt', level=logging.ERROR, format='%(asctime)s - %(levelname)s: %(message)s')

        self.session = requests.Session()
        retries = Retry(total=10, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
        self.session.mount('http://', HTTPAdapter(max_retries=retries))
        self.session.mount('https://', HTTPAdapter(max_retries=retries))

        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Referer': self.base_url,
        }

        if not os.path.exists(self.image_directory):
            os.makedirs(self.image_directory)

    def start_crawling(self):
        print("开始爬取...")
        self.crawl(self.base_url, 0)

    def crawl(self, url, depth):
        if depth > self.max_depth or url in self.visited_urls:
            return

        try:
            self.visited_urls.add(url)

            response = self.session.get(url, headers=self.headers, timeout=10)
            if self.debug_mode:
                self.log_request_info(response.request)

            soup = BeautifulSoup(response.text, 'html.parser')

            img_pattern = re.compile(r'(https?://[^\s]+?\.(?:jpg|jpeg|png|gif|bmp|svg|webp))', re.IGNORECASE)
            page_pattern = re.compile(r'href=["\'](https?://[^\s]+?)["\']', re.IGNORECASE)

            image_urls = img_pattern.findall(str(soup))
            page_urls = page_pattern.findall(str(soup))

            for img_url in image_urls:
                if not img_url.startswith(("data:image", "javascript")):
                    self.download_image(img_url, url)

            for next_url in page_urls:
                if next_url:
                    base_domain = urlparse(self.base_url).hostname
                    next_domain = urlparse(next_url).hostname

                    if next_domain == base_domain:
                        self.crawl(next_url, depth + 1)

        except Exception as e:
            error_msg = f"Error crawling {url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def download_image(self, img_url, referer_url):
        try:
            if not img_url.startswith(("http", "https")):
                img_url = self.base_url.rstrip('/') + '/' + img_url.lstrip('/')

            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Referer': referer_url,
            }

            response = self.session.get(img_url, headers=headers, verify=False)
            if self.debug_mode:
                self.log_request_info(response.request)

            image_data = response.content

            if self.is_large_enough(image_data):
                image_hash = hashlib.md5(image_data).hexdigest()
                if image_hash not in self.image_hashes:
                    self.image_hashes.add(image_hash)
                    image_extension = img_url.split('.')[-1]
                    image_name = f"{image_hash}.{image_extension}"
                    image_path = os.path.join(self.image_directory, image_name)

                    threading.Thread(target=self._save_image, args=(image_data, image_path, img_url)).start()

        except Exception as e:
            error_msg = f"Error downloading image {img_url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def _save_image(self, image_data, image_path, img_url):
        try:
            with open(image_path, 'wb') as f:
                f.write(image_data)

            print(f"Downloaded image: {img_url}")
            self.update_stats()

        except Exception as e:
            error_msg = f"Error saving image {img_url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def is_large_enough(self, image_data):
        try:
            image = Image.open(BytesIO(image_data))
            width, height = image.size
            return width >= 300 and height >= 300
        except Exception as e:
            print(f"Error checking image dimensions for image data: {str(e)}")
            return False

    def update_stats(self):
        print(f"已爬取图片数: {len(self.image_hashes)}")

    def log_request_info(self, request):
        print(f"Request URL: {request.url}")
        print("Request Headers:")
        for key, value in request.headers.items():
            print(f"{key}: {value}")
        print("\n")

if __name__ == "__main__":
    crawler = ImageCrawler(debug_mode=True)
    crawler.start_crawling()


稳定版 V1.1
import logging
import threading
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from selenium import webdriver
from bs4 import BeautifulSoup
import hashlib
import os
import re
from PIL import Image
from io import BytesIO
from urllib.parse import urlparse

class ImageCrawler:
    def __init__(self):
        self.base_url = "https://www.abc.abc/" #测试链接 
        self.image_directory = "downloaded_images" #下载路径
        self.max_depth = 5
        self.visited_urls = set()
        self.image_hashes = set()

        # 添加日志配置
        logging.basicConfig(filename='crawler_log.txt', level=logging.ERROR, format='%(asctime)s - %(levelname)s: %(message)s')

        # 创建一个带连接池的 Session
        self.session = requests.Session()
        retries = Retry(total=10, backoff_factor=1, status_forcelist=[500, 502, 503, 504])
        self.session.mount('http://', HTTPAdapter(max_retries=retries))
        self.session.mount('https://', HTTPAdapter(max_retries=retries))

    def start_crawling(self):
        print("开始爬取...")
        self.crawl(self.base_url, 0)

    def crawl(self, url, depth):
        if depth > self.max_depth or url in self.visited_urls:
            return

        try:
            self.visited_urls.add(url)
            driver = webdriver.Chrome()
            driver.set_page_load_timeout(10)  # 设置加载超时时间为10秒

            try:
                driver.get(url)
            except Exception as timeout_exception:
                print(f"Page load timed out for {url}: {str(timeout_exception)}")
                logging.error(f"Page load timed out for {url}: {str(timeout_exception)}")
                driver.quit()
                return

            soup = BeautifulSoup(driver.page_source, 'html.parser')
            driver.quit()

            # 使用正则表达式提取图片链接和页面链接
            img_pattern = re.compile(r'(https?://[^\s]+?\.(?:jpg|jpeg|png|gif|bmp|svg|webp))', re.IGNORECASE)
            page_pattern = re.compile(r'href=["\'](https?://[^\s]+?)["\']', re.IGNORECASE)

            image_urls = img_pattern.findall(str(soup))
            page_urls = page_pattern.findall(str(soup))

            for img_url in image_urls:
                if not img_url.startswith(("data:image", "javascript")):
                    self.download_image(img_url,url)

            for next_url in page_urls:
                if next_url:
                    # 提取根域名
                    base_domain = urlparse(self.base_url).hostname
                    next_domain = urlparse(next_url).hostname

                    # 检查根域名是否一致
                    if next_domain == base_domain:
                        self.crawl(next_url, depth + 1)

        except Exception as e:
            error_msg = f"Error crawling {url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def download_image(self, img_url,page_url_now):
        try:
            if not img_url.startswith(("http", "https")):
                img_url = self.base_url.rstrip('/') + '/' + img_url.lstrip('/')

            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                'Referer': page_url_now,
                # Add more headers if needed
            }

            # 使用连接池发送请求,携带请求头信息
            response = self.session.get(img_url, headers=headers, verify=False)
            image_data = response.content

            if self.is_large_enough(image_data):
                image_hash = hashlib.md5(image_data).hexdigest()
                if image_hash not in self.image_hashes:
                    self.image_hashes.add(image_hash)
                    image_extension = img_url.split('.')[-1]  # 获取图片文件扩展名
                    image_name = f"{image_hash}.{image_extension}"
                    image_path = os.path.join(self.image_directory, image_name)

                    threading.Thread(target=self._save_image, args=(image_data, image_path, img_url)).start()

        except Exception as e:
            error_msg = f"Error downloading image {img_url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def _save_image(self, image_data, image_path, img_url):
        try:
            with open(image_path, 'wb') as f:
                f.write(image_data)

            print(f"Downloaded image: {img_url}")
            self.update_stats()

        except Exception as e:
            error_msg = f"Error saving image {img_url}: {str(e)}"
            print(error_msg)
            logging.error(error_msg)

    def is_large_enough(self, image_data):
        try:
            image = Image.open(BytesIO(image_data))
            width, height = image.size
            return width >= 300 and height >= 300
        except Exception as e:
            # Handle image processing errors (e.g., non-image content)
            print(f"Error checking image dimensions for image data: {str(e)}")
            return False

    def update_stats(self):
        print(f"已爬取图片数: {len(self.image_hashes)}")

if __name__ == "__main__":
    crawler = ImageCrawler()
    crawler.start_crawling()


扫描二维码推送至手机访问。

版权声明:本文由我的FPGA发布,如需转载请注明出处。

本文链接:http://myfpga.cn/index.php/post/347.html

分享给朋友:

“(原创)Python图片爬虫:绕过防盗的巧妙解决方案” 的相关文章

0.Python环境的搭建

0.Python环境的搭建

   请打开网页  https://www.python.org/downloads/windows/    Windows环境下的Python        这里我选择...

  索引运算符【】

索引运算符【】

选择字符串的子序列语法【start:finish】        start:子序列开始位置的索引值        finish:子序列结束位置的下一个字符的索引值如果不提供start或者finish,默认start为第一个字符,finish为最后一个字符。例如>>>my_str=...

random库

random库

random()            生成一个[0.0,1.0)之间的随机小数randint(a,b)     生成一个[a,b]之间的整数uniform(a,b)     生成一个[a,b]之间的随机小数对random库的引用方法与math库一样,采用下面两种方式实现:import random...

列表实例

列表实例

随机生成100个小写字母存入一个列表中,统计26个字母的出现次数。import random def getRandomLetter():     code_a=ord('a')     code_z=ord('z')     x=random.randint(code_a,code_z)...

(原创)使用Python递归获取网页内的所有URL,并进行清洗

(原创)使用Python递归获取网页内的所有URL,并进行清洗

import argparse import time from urllib.parse import urljoin, urlparse from selenium import webdriver...