Python中常用的装饰器@classmethod、@abstractmethod、@property和@staticmethod

在Python编程中,装饰器是一种强大而灵活的工具,可以在不修改源代码的情况下修改函数或类的行为。本文将介绍几个常用的装饰器,包括@classmethod@abstractmethod@property@staticmethod,并提供代码示例,以帮助你更好地理解它们的用法。

@classmethod

@classmethod装饰器用于定义类方法(classmethods)。类方法与普通方法不同,它在类层级上操作,而不是在实例层级上。通过类方法,我们可以直接通过类名调用方法,而无需创建类的实例。

以下是一个使用@classmethod装饰器定义类方法的示例:

class MathUtils:
    @classmethod
    def multiply(cls, a, b):
        return a * b

result = MathUtils.multiply(5, 3)
print(result)  # 输出: 15

在上面的示例中,MathUtils类定义了一个类方法multiply,通过@classmethod装饰器标记。类方法的第一个参数通常被命名为cls,它指向类本身。通过类方法,我们可以直接在类层级上进行操作,而无需实例化类。

@abstractmethod

@abstractmethod装饰器用于定义抽象方法(abstract methods)。抽象方法在基类中声明但没有具体实现,需要在派生类中进行实现。如果一个类中包含抽象方法,那么该类必须声明为抽象类,无法实例化。

以下是一个使用@abstractmethod装饰器定义抽象方法的示例:

from abc import ABC, abstractmethod

class Shape(ABC):
    @abstractmethod
    def area(self):
        pass

class Circle(Shape):
    def __init__(self, radius):
        self.radius = radius
    
    def area(self):
        return 3.14 * self.radius ** 2

# 创建 Circle 对象
circle = Circle(5)
print(circle.area())  # 输出: 78.5

在上面的示例中,Shape类是一个抽象基类,其中包含一个抽象方法area。通过使用@abstractmethod装饰器,我们可以声明area方法为抽象方法,无需提供具体实现。派生类Circle继承了Shape类,并实现了area方法,使其具有特定的功能。

@property

@property装饰器用于将一个类方法转换为只读属性(read-only property)。通过使用@property装饰器,我们可以定义一个特殊的方法,使其在使用点符号访问时,像访问属性一样,而不是通过函数调用。

以下是一个使用@property装饰器定义属性的示例:

class Person:
    def __init__(self, name):
        self._name = name
    
    @property
    def name(self):
        return self._name

# 创建 Person 对象
person = Person("John")
print(person.name)  # 输出: John

在上面的示例中,Person类定义了一个属性name,使用@property装饰器将name方法转换为只读属性。这样,我们可以通过属性方式访问name,而无需显式调用方法。

@staticmethod

@staticmethod装饰器用于定义静态方法(staticmethods)。静态方法在类的命名空间中定义,与类的实例无关,因此不需要通过实例来调用。静态方法可以直接通过类名调用。

以下是一个使用@staticmethod装饰器定义静态方法的示例:

class MathUtils:
    @staticmethod
    def add(a, b):
        return a + b

result = MathUtils.add(5, 3)
print(result)  # 输出: 8

在上面的示例中,MathUtils类定义了一个静态方法add,通过@staticmethod装饰器标记。静态方法可以直接通过类名调用,无需实例化类。

总结

装饰器是Python中强大而灵活的工具,可以优化代码结构、提供额外功能,并提高代码的可读性。本文介绍了@classmethod@abstractmethod@property@staticmethod这几个装饰器的使用方法,并提供了相应的代码示例。

希望通过本文的介绍,你能更好地理解这些装饰器的作用,并在自己的代码中灵活应用它们。

Python设计模式-组合模式

组合模式(Composite Pattern)是一种结构型设计模式,它允许你将对象组合成树形结构来表示“部分-整体”的层次结构。组合模式使得客户端可以统一地处理单个对象和对象组合。

组合模式的结构
组合模式主要包含以下几个角色:

组件(Component):定义对象的接口,并实现一些默认行为。声明一个接口,用于访问和管理Leaf和Composite中的子组件。
叶子(Leaf):代表树的叶子节点,叶子节点没有子节点。
组合(Composite):定义有子部件的那些部件的行为,存储子部件。并在组件接口中实现与子部件有关的操作,如添加、删除等。
组合模式的示例
假设我们有一个图形绘制系统,可以绘制简单的形状如圆和方块,也可以将这些形状组合成复杂的图形。我们可以使用组合模式来实现这一需求。

定义组件

from abc import ABC, abstractmethod

class Graphic(ABC):
    @abstractmethod
    def draw(self):
        pass

    def add(self, graphic):
        raise NotImplementedError("This method is not supported")

    def remove(self, graphic):
        raise NotImplementedError("This method is not supported")

    def get_child(self, index):
        raise NotImplementedError("This method is not supported")

定义叶子

class Circle(Graphic):
    def draw(self):
        print("Drawing a circle")

class Square(Graphic):
    def draw(self):
        print("Drawing a square")


定义组合
class CompositeGraphic(Graphic):
    def __init__(self):
        self.children = []

    def draw(self):
        for child in self.children:
            child.draw()

    def add(self, graphic):
        self.children.append(graphic)

    def remove(self, graphic):
        self.children.remove(graphic)

    def get_child(self, index):
        return self.children[index]

使用组合模式
def main():
    # 创建叶子节点
    circle1 = Circle()
    circle2 = Circle()
    square1 = Square()

    # 创建组合节点
    composite1 = CompositeGraphic()
    composite2 = CompositeGraphic()

    # 组合图形
    composite1.add(circle1)
    composite1.add(circle2)

    composite2.add(square1)
    composite2.add(composite1)

    # 绘制组合图形
    composite2.draw()

if __name__ == "__main__":
    main()

在这个示例中,Graphic是抽象组件类,定义了绘制方法。Circle和Square是叶子类,分别实现了绘制方法。CompositeGraphic是组合类,实现了管理子组件的方法,并重写了绘制方法来递归绘制子组件。客户端通过组合叶子节点和组合节点来创建复杂的图形结构,并统一调用draw方法进行绘制。

组合模式的优缺点
优点
统一处理单个对象和组合对象:组合模式使得客户端可以统一地处理单个对象和对象组合,提高了代码的灵活性和可扩展性。
简化客户端代码:客户端代码可以一致地使用组件接口,而不需要关心处理的是单个对象还是组合对象。
符合开闭原则:可以通过增加新的叶子和组合类来扩展系统,而不需要修改现有代码。
缺点
增加复杂性:组合模式会增加系统中类和对象的数量,可能会使系统变得复杂。
难以限制组合层次:有时需要对组合层次进行限制,但组合模式本身没有提供这样的机制。
组合模式的适用场景
表示部分-整体层次结构:当需要表示对象的部分-整体层次结构时,可以使用组合模式。
统一处理单个对象和组合对象:当需要统一处理单个对象和组合对象时,可以使用组合模式。
构建递归结构:当需要构建递归结构(如树形结构)时,可以使用组合模式。
总结
组合模式是一种结构型设计模式,通过将对象组合成树形结构来表示“部分-整体”的层次结构,使得客户端可以统一地处理单个对象和对象组合。组合模式适用于表示部分-整体层次结构、统一处理单个对象和组合对象以及构建递归结构的场景。合理应用组合模式,可以提高系统的灵活性和可扩展性,简化客户端代码。理解并掌握组合模式,有助于在实际开发中构建高效、灵活的系统。

                        
原文链接:https://blog.csdn.net/weixin_55252589/article/details/139074443

Python设计模式-工厂方法模式

工厂方法模式(Factory Method Pattern)是一种创建型设计模式,它定义了一个用于创建对象的接口,让子类决定实例化哪一个类。工厂方法使得一个类的实例化延迟到其子类。通过使用工厂方法模式,可以将对象的创建过程与使用过程分离,从而提高代码的灵活性和可扩展性。

工厂方法模式的结构
工厂方法模式主要包括以下几个角色:

抽象产品(Product):定义产品的接口。
具体产品(ConcreteProduct):实现抽象产品接口的具体产品类。
抽象工厂(Creator):声明工厂方法,用于返回一个产品对象。可以定义一个工厂方法的默认实现。
具体工厂(ConcreteCreator):实现抽象工厂接口,重定义工厂方法以返回一个具体产品实例。
示例
假设我们有一个日志系统,可以记录日志到控制台或文件。我们可以使用工厂方法模式来实现不同日志记录方式的选择和创建。

定义抽象产品和具体产品

from abc import ABC, abstractmethod

class Logger(ABC):
    @abstractmethod
    def log(self, message: str):
        pass

class ConsoleLogger(Logger):
    def log(self, message: str):
        print(f"Console: {message}")

class FileLogger(Logger):
    def __init__(self, filename: str):
        self.filename = filename

    def log(self, message: str):
        with open(self.filename, 'a') as f:
            f.write(f"File: {message}\n")

定义抽象工厂和具体工厂

class LoggerFactory(ABC):
    @abstractmethod
    def create_logger(self) -> Logger:
        pass

class ConsoleLoggerFactory(LoggerFactory):
    def create_logger(self) -> Logger:
        return ConsoleLogger()

class FileLoggerFactory(LoggerFactory):
    def __init__(self, filename: str):
        self.filename = filename

    def create_logger(self) -> Logger:
        return FileLogger(self.filename)

使用工厂方法模式

def main():
    # 创建控制台日志记录器
    console_factory = ConsoleLoggerFactory()
    console_logger = console_factory.create_logger()
    console_logger.log("This is a console log message.")

    # 创建文件日志记录器
    file_factory = FileLoggerFactory("app.log")
    file_logger = file_factory.create_logger()
    file_logger.log("This is a file log message.")

if __name__ == "__main__":
    main()

在这个示例中,Logger是抽象产品,ConsoleLogger和FileLogger是具体产品。LoggerFactory是抽象工厂,ConsoleLoggerFactory和FileLoggerFactory是具体工厂。通过工厂方法模式,我们可以灵活地选择和创建不同类型的日志记录器,而不需要修改客户端代码。

工厂方法模式的优缺点
优点
遵循开闭原则:可以在不修改现有代码的情况下增加新产品。
提高灵活性:可以根据需要在运行时选择和创建具体的产品。
封装对象创建过程:将对象的创建过程封装在工厂类中,减少了客户端代码的复杂性。
缺点
增加代码复杂性:引入更多的类和接口,增加了代码的复杂性。
难以管理:当产品种类增多时,可能会导致工厂类的数量增加,管理起来较为困难。
工厂方法模式的适用场景
创建对象需要较复杂的过程:对象的创建过程较为复杂,包含多个步骤或涉及多个依赖时,可以使用工厂方法模式。
需要灵活地创建不同类型的对象:根据不同的条件或环境,在运行时选择和创建不同类型的对象。
遵循开闭原则:需要在不修改现有代码的情况下增加新产品。
工厂方法模式与简单工厂模式的区别
简单工厂模式:由一个工厂类负责创建所有产品,工厂类通常包含一个静态方法,根据传入的参数来创建具体产品。简单工厂模式不符合开闭原则。
工厂方法模式:将对象创建的职责分散到多个具体工厂类中,每个具体工厂类负责创建一种具体产品。工厂方法模式符合开闭原则。
总结
工厂方法模式是一种创建型设计模式,通过定义一个用于创建对象的接口,将对象的创建过程延迟到子类,从而提高代码的灵活性和可扩展性。通过使用工厂方法模式,可以在不修改现有代码的情况下增加新产品,减少了代码耦合,提高了系统的可维护性。合理应用工厂方法模式,可以显著提升代码质量和设计水平。

原文链接:https://blog.csdn.net/weixin_55252589/article/details/139072230

Python设计模式-简单工厂模式

简单工厂模式(Simple Factory Pattern)是一种创建型设计模式,它通过专门定义一个工厂类来负责创建其他类的实例,而不是在客户端代码中直接实例化对象。这样可以将对象创建的过程与使用对象的过程分离,提高代码的可维护性和可扩展性。

简单工厂模式的结构
简单工厂模式包含以下角色:

工厂类(Factory):负责创建对象的类。根据不同的条件,实例化并返回不同类型的对象。
产品类(Product):由工厂创建的对象,所有创建的对象应实现相同的接口或继承相同的基类。
客户端(Client):使用工厂类来获取产品对象,而不直接实例化产品类。
示例
假设我们要创建一个简单的工厂类来生成不同类型的动物对象。首先,我们定义一个Animal基类,然后定义两个具体的产品类Dog和Cat,最后创建一个工厂类AnimalFactory来生成这些对象。

定义产品类

class Animal:
    def speak(self):
        pass

class Dog(Animal):
    def speak(self):
        return "Woof!"

class Cat(Animal):
    def speak(self):
        return "Meow!"

定义工厂类

class AnimalFactory:
    @staticmethod
    def create_animal(animal_type: str) -> Animal:
        if animal_type == 'dog':
            return Dog()
        elif animal_type == 'cat':
            return Cat()
        else:
            raise ValueError(f"Unknown animal type: {animal_type}")

使用工厂类

def main():
    factory = AnimalFactory()

    dog = factory.create_animal('dog')
    print(dog.speak())  # 输出:Woof!

    cat = factory.create_animal('cat')
    print(cat.speak())  # 输出:Meow!

    try:
        unknown = factory.create_animal('bird')
    except ValueError as e:
        print(e)  # 输出:Unknown animal type: bird

if __name__ == "__main__":
    main()

简单工厂模式的优缺点
优点
单一职责原则:工厂类负责对象的创建,客户端代码负责使用对象,各自关注自己的职责。
提高可维护性:将对象创建的逻辑集中在一个地方,便于修改和维护。
提高可扩展性:如果需要添加新的产品,只需修改工厂类而不需要修改客户端代码。
缺点
违反开闭原则:每次添加新产品时,都需要修改工厂类,增加了工厂类的复杂性。
单一工厂类过于复杂:随着产品种类的增加,工厂类可能变得臃肿,难以维护。
简单工厂模式的适用场景
对象创建过程复杂:如果对象的创建过程较为复杂,或者需要根据不同条件创建不同类型的对象,可以使用简单工厂模式。
客户端不需要知道具体产品类:客户端只需要使用工厂类来获取对象,不需要了解具体的产品类。
总结
简单工厂模式是一种创建型设计模式,通过定义一个工厂类来创建不同类型的对象,将对象创建的过程与使用对象的过程分离,提高代码的可维护性和可扩展性。尽管简单工厂模式有一些缺点,如违反开闭原则,但在某些场景下仍然非常有用。通过合理使用简单工厂模式,可以有效地简化对象的创建过程,提升代码的质量。

原文链接:https://blog.csdn.net/weixin_55252589/article/details/139070003

Python中类的组合使用-组件

组合:在一个类的属性中调用了另一个类,将另一个类的对象作为数据属性,称为类的组合。

这种组合使用的好处在于,它允许你将复杂的功能分解为更小、更易于管理的部分。每个类可以专注于自己的职责,从而提高代码的可读性和可维护性。

这种模式允许你创建一个对象,这个对象可以与创建它的对象进行交互。这是一种常见的设计模式,特别是在复杂的软件系统中,它允许你将不同的功能组织成不同的类,同时保持这些类之间的协作和通信。

简单来说,这种初始化方式是说:“我(当前类的实例)需要一个组件,所以我创建了一个实例,并且把我自己(self)作为参数传给了它,这样它就可以使用我的功能和数据了。”

再来一个例子

# 1、组合实例
class Ojb_1:
    '''假设Ojb_1是一个装备库类,func_name是其中一件装备,装备后加1000战力。'''
    def __init__(self, agg):
        self.agg = agg

    def func_name(self):
        self.agg += 1000                # 在本身的战力上+1000
        return self.agg                 # 返回最终战力

class Ojb_2:
    '''假设Ojb_2是一个角色类,每个角色都有名称等信息'''
    def __init__(self, name, agg):
        self.name = name                # 角色名称
        self.agg = agg                  # 本身的战力
        self.Ojb_1 = Ojb_1(self.agg)    # 重点在这里,将该人物原来的战力传到装备库类,把自己作为参数传给ojb_1的这个实例,这样他能使用自己的数据

if __name__ == '__main__':
    '''假设广深小龙原来战力=500'''
    r1 = Ojb_2('广深小龙', 500)
    res = r1.Ojb_1.func_name()
    print(res)

①Ojb_1是装备库类,func_name是一件装备,人物装备后会增加1000的战力,self.agg=原人物的战力

②Ojb_2是角色类,有角色的名称与原角色的战力等信息,self.Ojb_1是将原人物的战力先传至装备库,只要人物调用装备了func_name就会在原战力基础上增加1000

下面来一个完整的例子:

class ResearchConductor:
    def __init__(self, researcher):
        self.researcher = researcher

    async def conduct_research(self):
        # 这里使用 researcher 对象的属性和方法来执行研究
        print(f"Starting research on: {self.researcher.query}")
        # 假设这里进行了一些异步研究操作
        # 返回研究结果
        return f"Research results for {self.researcher.query}"


class ContextManager:
    def __init__(self, researcher):
        self.researcher = researcher

    async def get_context(self):
        # 获取研究上下文的逻辑
        return "Context for research"


class ReportGenerator:
    def __init__(self, researcher):
        self.researcher = researcher

    async def generate_report(self, context):
        # 生成报告的逻辑
        return f"Report based on context: {context}"


class Researcher:
    def __init__(self, query, config):
        self.query = query
        self.config = config
        self.research_conductor = ResearchConductor(self)  # 组合使用 ResearchConductor
        self.context_manager = ContextManager(self)  # 组合使用 ContextManager
        self.report_generator = ReportGenerator(self)  # 组合使用 ReportGenerator

    async def perform_research(self):
        # 执行研究流程
        context = await self.context_manager.get_context()  # 从 ContextManager 获取上下文
        research_results = await self.research_conductor.conduct_research()  # 从 ResearchConductor 获取研究结果
        report = await self.report_generator.generate_report(research_results)  # 从 ReportGenerator 生成报告
        return report


# 假设这是主程序
import asyncio

async def main():
    # 创建 Researcher 实例
    researcher = Researcher(query="How to learn Python", config={"setting": "value"})
    # 执行研究并生成报告
    report = await researcher.perform_research()
    print(report)

# 运行主程序
asyncio.run(main())

在这个例子中:

  • Researcher 类是主类,它代表了一个研究者,拥有查询和配置。
  • ResearchConductor 类是 Researcher 的一个组件,负责执行研究任务。
  • ContextManager 类是另一个组件,负责获取研究的上下文。
  • ReportGenerator 类是第三个组件,负责基于研究结果生成报告。

每个组件都接收一个 researcher 对象作为参数,并在它们的构造函数中保存这个引用。这样,每个组件都可以访问主 Researcher 实例的属性和方法。

Researcher 类中的 perform_research 方法展示了如何使用这些组件来执行一个完整的研究流程:获取上下文、进行研究、生成报告。

最后,main 函数创建了一个 Researcher 实例,并调用 perform_research 方法来执行研究流程,并打印出生成的报告。这个例子展示了类的组合如何在实际的异步编程中被用来构建模块化的系统。

理解初始化 def __init__(self, researcher):
self.researcher = researcher 你将 self(即当前 Researcher 实例的引用)传递给 ResearchConductor 的构造函数。这意味着 ResearchConductor 类需要一个参数来初始化,这个参数是 Researcher 的一个实例。在 ResearchConductor 类中,这个参数被赋值给 self.researcher,所以,当你在 Researcher 类中创建 ResearchConductor 的实例时,你是在告诉 ResearchConductor:“嘿,这个 Researcher 实例是你的 researcher 对象。”这样,ResearchConductor 类就可以通过 self.researcher 访问 Researcher 实例的所有属性和方法

再来一个例子

class Engine:
    """这是一个简单的引擎类"""
    def __init__(self, power):
        self.power = power
 
    def start(self):
        print(f"引擎启动,功率:{self.power}")
 
class Car:
    """这是一个简单的汽车类,它组合了Engine类"""
    def __init__(self, make, model, engine):
        self.make = make
        self.model = model
        self.engine = engine  # 引擎作为Car类的一个属性
 
    def start_car(self):
        self.engine.start()  # 使用Engine类的start方法
        print(f"汽车 {self.make} {self.model} 启动了。")
 
# 创建一个Engine实例
engine = Engine(200)
 
# 创建一个Car实例,并使用上面创建的Engine实例
car = Car("Ford", "Mustang", engine)
 
# 启动汽车
car.start_car()

在这个例子中,Engine 类定义了一个引擎的基本属性和方法。Car 类则通过将 Engine 实例作为其属性来组合 Engine 类,从而拥有启动汽车所需的功能。当你创建一个 Car 实例并调用 start_car 方法时,它将使用内部 Engine 实例的 start 方法来输出引擎启动的信息。

Selenium 隐藏浏览器指纹特征的几种方式

对一些做了反爬的网站,做了特征检测,用来阻止一些恶意爬虫

https://bot.sannysoft.com这个网站可以检测指纹特征

本篇文章将介绍几种常用的隐藏浏览器指纹特征的方式

1. 直接爬取

目标对象:

aHR0cHM6Ly9xaWthbi5jcXZpcC5jb20vUWlrYW4vU2VhcmNoL0FkdmFuY2U=

我们使用 Selenium 直接爬取目标页面

# selenium 直接爬取

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
import time

chrome_options = Options()

s = Service(r"chromedriver.exe路径")

driver = webdriver.Chrome(service=s, options=chrome_options)

driver.get(url='URL')

driver.save_screenshot('result.png')

# 保存
source = driver.page_source
with open('result.html', 'w') as f:
    f.write(source)

time.sleep(200)

页面明显做了反爬,网页返回直接返回空白内容

图片

2. CDP

CDP 全称为 Chrome Devtools-Protocol

https://chromedevtools.github.io/devtools-protocol

通过执行 CDP 命令,可以在网页加载前运行一段代码,进而改变浏览器的指纹特征

比如,window.navigator.webdriver 在 Selenium 直接打开网页时返回结果为 true;而手动打开网页时,该对象值为 undefined

因此,我们可以利用 CDP 命令修改该对象的值,达到隐藏指纹特征的目的

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
import time

chrome_options = Options()

s = Service(r"chromedriver.exe路径")

driver = webdriver.Chrome(service=s, options=chrome_options)

# 执行cdp命令,修改(window.navigator.webdriver )对象的值
driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
    "source": """
            Object.defineProperty(navigator, 'webdriver', {
              get: () => undefined
            })
            """
})

driver.get(url='URL')

driver.save_screenshot('result.png')

# 保存
source = driver.page_source
with open('result.html', 'w', encoding='utf-8') as f:
    f.write(source)

time.sleep(200)

需要指出的是,浏览器的指纹特征很多,使用该方法存在一些局限性

3. stealth.min.js

该文件包含了常用的浏览器特征,我们只需要读取该文件,然后执行 CDP 命令即可

下载地址:

https://github.com/berstend/puppeteer-extra/tree/stealth-js
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
import time

chrome_options = Options()

# 无头模式
# chrome_options.add_argument("--headless")

# 添加请求头
chrome_options.add_argument(
    'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36')

s = Service(r"chromedriver.exe路径")

driver = webdriver.Chrome(service=s, options=chrome_options)

# 利用stealth.min.js隐藏浏览器指纹特征,先下载,并与文件放同一目录下
# stealth.min.js下载地址:https://github.com/berstend/puppeteer-extra/tree/stealth-js
with open('./stealth.min.js') as f:
    driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
        "source": f.read()
    })

driver.get(url='URL')
# driver.get(url='https://bot.sannysoft.com/')

# 保存图片
driver.save_screenshot('result.png')

time.sleep(200)

4. undetected_chromedriver

这是一个防止浏览器指纹特征被识别的依赖库,可以自动下载驱动配置再运行

项目地址:

https://github.com/ultrafunkamsterdam/undetected-chromedriver

使用步骤也很方便

首先,我们安装依赖库

# 安装依赖
pip3 install undetected-chromedriver

然后,通过下面几行代码就能完美隐藏浏览器的指纹特征

from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
import time
import undetected_chromedriver as uc

chrome_options = Options()
# chrome_options.add_argument("--headless")

s = Service(r"chromedriver.exe")

driver = uc.Chrome(service=s, options=chrome_options)

driver.get(url='URL')
# driver.get(url='https://bot.sannysoft.com/')

driver.save_screenshot('result.png')
time.sleep(100)

5. 操作已开启的浏览器

最后一种方式上篇文章已经介绍过

如何利用 Selenium 对已打开的浏览器进行爬虫!

我们只需要通过命令行启动一个浏览器

import subprocess

# 1、打开浏览器
# 指定端口号为:1234
# 配置用户数据路径:--user-data-dir
cmd = 'C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe --remote-debugging-port=1234 --user-data-dir="C:\\selenum\\user_data"'

subprocess.run(cmd)

然后,利用 Selenium 直接操作上面的浏览器即可模拟正常操作浏览器的行为

import time
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service

# 操作上面已经打开的浏览器,进行百度搜索
chrome_options = Options()

# 指定已经打开浏览器的地址及端口号
chrome_options.add_experimental_option("debuggerAddress", "127.0.0.1:1234")

# 注意:chrome版本与chromedirver驱动要保持一致
# 下载地址:http://chromedriver.storage.googleapis.com/index.html
s = Service(r"chromedriver.exe")

driver = webdriver.Chrome(service=s, options=chrome_options)

# 打开目标网站
driver.get(url="URL")

time.sleep(200)

实操:自动化秒杀抢购

一文读懂QUIC 协议:更快、更稳、更高效的网络通信

原文

你是否也有这样的困扰:打开APP巨耗时、刷剧一直在缓冲、追热搜打不开页面、信号稍微差点就直接加载失败……

如果有一个协议能让你的上网速度,在不需要任何修改的情况下就能提升20%,特别是网络差的环境下能够提升30%以上;如果有一个协议可以让你在WiFi和蜂窝数据切换时,网络完全不断开、直播不卡顿、视频不缓冲;你愿意去了解一下它吗?它就是QUIC协议。本文将从QUIC的背景、原理、实践部署等方面来详细介绍。

一:网络协议栈  

1.1 什么叫网络协议?  

类似于我们生活中签署的合同一样,比如买卖合同是为了约束买卖双方的行为按照合同的要求履行,网络协议是为了约束网络通信过程中各方(客户端、服务端及中间设备)必须按照协议的规定进行通信,它制定了数据包的格式、数据交互的过程等等,网络中的所有设备都必须严格遵守才可以全网互联。

在网络协议栈中,是有分层的,每一层负责不同的事务。我们讨论最多的有三个:应用层、传输层、网络层。应用层主要是针对应用软件进行约束,比如你访问网站需要按照HTTP协议格式和要求进行,你发送电子邮件需要遵守SMTP等邮件协议的格式和要求;传输层主要负责数据包在网络中的传输问题,比如如何保证数据传输的时候的安全性和可靠性、数据包丢了怎么处理;网络层,也叫路由转发层,主要负责数据包从出发地到目的地,应该怎样选择路径才能更快的到达。合理的网络协议能够让用户上网更快!    

1.2 HTTP/3协议  

HTTP/3是第三个主要版本的HTTP协议。与其前任HTTP/1.1和HTTP/2不同,在HTTP/3中,弃用TCP协议,改为使用基于UDP协议的QUIC协议实现。所以,HTTP/3的核心在于QUIC协议。显然,HTTP/3属于应用层协议,而它使用的QUIC协议属于传输层协议。

1.3 我们需要HTTP/3协议吗  

很多人可能都会有这样一个疑问,为什么在 2015 年才标准化了 HTTP/2 ,这么快就需要 HTTP/3?

我们知道,HTTP/2通过引入“流”的概念,实现了多路复用。简单来说,假设你访问某个网站需要请求10个资源,你使用HTTP1.1协议只能串行地发请求,资源1请求成功之后才能发送资源2的请求,以此类推,这个过程是非常耗时的。如果想10个请求并发,不需要串行等待的话,在HTTP1.1中,应用就需要为一个域名同时建立10个TCP连接才行(一般浏览器不允许建立这么多),这无疑是对资源的极大的浪费。HTTP/2的多路复用解决了这一问题,能使多条请求并发。

但现实很残酷,为什么很多业务用了HTTP/2,反倒不如HTTP1.1呢?

第一:多流并发带来了请求优先级的问题,因为有的请求客户端(比如浏览器)希望它能尽快返回,有的请求可以晚点返回;又或者有的请求需要依赖别的请求的资源来展示。流的优先级表示了这个请求被处理的优先级,比如客户端请求的关键的CSS和JS资源是必须高优先级返回的,图片视频等资源可以晚一点响应。         
流的优先级的设置是一个难以平衡或者难以做到公平合理的事情,如果设置稍微不恰当,就会导致有些请求很慢,这在用户看来,就是用了HTTP/2之后,怎么有的请求变慢了。    

第二:HTTP/2解决了HTTP协议层面的队头阻塞,但是TCP的队头阻塞仍然没有解决,所有的流都在一条TCP连接上,如果万一序号小的某个包丢了,那么TCP为了保证到达的有序性,必须等这个包到达后才能滑动窗口,即使后面的序号大的包已经到达了也不能被应用程序读取。这就导致了在多条流并发的时候,某条流的某个包丢了,序号在该包后面的其他流的数据都不能被应用程序读取。这种情况下如果换做HTTP1.1,由于HTTP1.1是多条连接,某个连接上的请求丢包了,并不影响其他连接。所以在丢包比较严重的情况下,HTTP/2整体效果大概率不如HTTP1.1

事实上,我们并不是真的需要新的 HTTP 版本,而是需要对底层传输控制协议(TCP) 进行升级。

1.4 QUIC协议栈  

图片

图0-QUIC协议栈    

QUIC协议实现在用户态,建立在内核态的UDP的基础之上,集成了TCP的可靠传输特性,集成了TLS1.3协议,保证了用户数据传输的安全。

二:QUIC协议的优秀特性  

2.1 建连快  

数据的发送和接收,要想保证安全和可靠,一定是需要连接的。TCP需要,QUIC也同样需要。连接到底是什么?连接是一个通道,是在一个客户端和一个服务端之间的唯一一条可信的通道,主要是为了安全考虑,建立了连接,也就是建立了可信通道,服务器对这个客户端“很放心”,对于服务器来说:你想跟我进行通信,得先让我认识一下你,我得先确认一下你是好人,是有资格跟我通信的。那么这个确认对方身份的过程,就是建立连接的过程。

传统基于TCP的HTTPS的建连过程为什么如此慢?它需要TCP和TLS两个建连过程。如图1所示(传统HTTPS请求流程图):    

图片

图1-传统HTTPS请求流程图

对于一个小请求(用户数据量较小)而言,传输数据只需要1个RTT,但是光建连就花掉了3个RTT,这是非常不划算的,这里建连包括两个过程:TCP建连需要1个RTT,TLS建连需要2个RTT。RTT:Round Trip Time,数据包在网络上一个来回的时间。

为什么需要两个过程?可恶就可恶在这个地方,TCP和TLS没办法合并,因为TCP是在内核里完成的,TLS是在用户态。也许有人会说把干掉内核里的TCP,把TCP挪出来放到用户态,然后就可以和TLS一起处理了。首先,你干不掉内核里的TCP,TCP太古老了,全世界的服务器的TCP都固化在内核里了。所以,既然干不掉TCP,那我不用它了,我再自创一个传输层协议,放到用户态,然后再结合TLS,这样不就可以把两个建连过程合二为一了吗?是的,这就是QUIC。

2.1.1 QUIC的1-RTT建连    

如图2所示,是QUIC的连接建立过程:初次建连只需要1个RTT即可完成建连。后续再次建连就可以使用0-RTT特性

图片

图2-QUIC建连过程图

QUIC的1-RTT建连:客户端与服务端初次建连(之前从未进行通信过),或者长时间没有通信过(0-RTT过期了),只能进行1-RTT建连。只有先进行一次完整的1-RTT建连,后续一段时间内的通信才可以进行0-RTT建连。

如图3所示:QUIC的1-RTT建连可以分成两个部分。QUIC连接信息部分和TLS1.3握手部分。    

图片

图3-QUIC建连抓包

QUIC连接:协商QUIC版本号、协商quic传输参数、生成连接ID、确定Packet Number等信息,类似于TCP的SYN报文;保证通信的两端确认过彼此,是对的人。

TLS1.3握手:标准协议,非对称加密,目的是为了协商出 对称密钥,然后后续传输的数据使用这个对称密钥进行加密和解密,保护数据不被窃取。

我们重点看QUIC的TLS1.3握手过程。    

图片

图4-QUIC的1-RTT握手流程

我们通过图4可以看到,整个握手过程需要 2次握手(第三次握手是带了数据的),所以整个握手过程只需要1-RTT(RTT是指数据包在网络上的一个来回)的时间。

1-RTT的握手主要包含两个过程:

1.客户端发送Client Hello给服务端;

2.服务端回复Server Hello给客户端;

我们通过下图中图5和图6来看Client Hello和Server Hello具体都做了啥:

第一次握手(Client Hello报文)    

图片

图5-Client Hello报文

首先,Client Hello在扩展字段里标明了支持的TLS版本(Supported Version:TLS1.3)。值得注意的是Version字段必须要是TLS1.2,这是因为TLS1.2已经在互联网上存在了10年。网络中大量的网络中间设备都十分老旧,这些网络设备会识别中间的TLS握手头部,所以TLS1.3的出现如果引入了未知的TLS Version 必然会存在大量的握手失败。    

图片

图6-Client Hello报文

其次,ClientHello中包含了非常重要的key_share扩展:客户端在发送之前,会自己根据DHE算法生成一个公私钥对。发送Client Hello报文的时候会把这个公钥发过去,那么这个公钥就存在于key_share中,key_share还包含了客户端所选择的曲线X25519。总之,key_share是客户端提前生成好的公钥信息。

最后,Client Hello里还包括了:客户端支持的算法套、客户端所支持的椭圆曲线以及签名算法、psk的模式等等,一起发给服务端。    

图片

图7-Client Hello报文

第二次握手:(Server Hello报文)    

图片

图8-Server Hello报文

服务端自己根据DHE算法也生成了一个公私钥对,同样的,Key_share扩展信息中也包含了 服务端的公钥信息。服务端通过ServerHello报文将这些信息发送给客户端。

至此为止,双方(客户端服务端)都拿到了对方的公钥信息,然后结合自己的私钥信息,生成pre-master key,在这里官方的叫法是(client_handshake_traffic_secret和server_handshake_traffic_secret),然后根据以下算法进行算出key和iv,使用key和iv对Server Hello之后所有的握手消息进行加密。

注意:在握手完成之后,服务端会发送一个New Session Ticket报文给客户端,这个包非常重要,这是0-RTT实现的基础。    

图片

图9-New Session Ticket报文

2.1.2 QUIC的0-RTT握手  

这个功能类似于TLS1.2的会话复用,或者说0-RTT是基于会话复用功能的。

图片

图10- QUIC的0-RTT流程图

通过上面图10我们可以看到,client和server在建连时,仍然需要两次握手,仍然需要1个rtt,但是为什么我们说这是0-rtt呢,是因为client在发送第一个包client hello时,就带上了数据(HTTP 请求),从什么时候开始发送数据这个角度上来看,的确是0-RTT。

我们通过抓包来看0-RTT的过程:

图片

图11- QUIC的0-RTT抓包

所以真正在实现0-RTT的时候,请求的数据并不会跟Initial报文(内含Client Hello)一起发送,而是单独一个数据包(0-RTT包)进行发送,只不过是跟Initial包同时进行发送而已。

图片

图12- QUIC的0-RTT包    

我们单独看Initial报文发现,除了pre_share_key、early-data标识等信息与1-RTT时不同,其他并无区别。

2.1.3 QUIC建连需要注意的问题  

第一,QUIC实现的时候,必须缓存收到的乱序加密帧,这个缓存至少要大于4096字节。当然可以选择缓存更多的数据,更大的缓存上限意味着可以交换更大的密钥或证书。终端的缓存区大小不必在整个连接生命周期内保持不变。这里记住:乱序帧一定要缓存下来。如果不缓存,会导致连接失败。如果终端的缓存区不够用了,则其可以通过暂时扩大缓存空间确保握手完成。如果终端不扩大其缓存,则其必须以错误码CRYPTO_BUFFER_EXCEEDED关闭连接。

第二,0-RTT存在前向安全问题,请慎用!

2.2连接迁移  

QUIC通过连接ID实现了连接迁移。

我们经常需要在WiFi和4G之间进行切换,比如我们在家里时使用WiFi,出门在路上,切换到4G或5G,到了商场,又连上了商场的WiFi,到了餐厅,又切换到了餐厅的WiFi,所以我们的日常生活中需要经常性的切换网络,那每一次的切换网络,都将导致我们的IP地址发生变化。

传统的TCP协议是以四元组(源IP地址、源端口号、目的ID地址、目的端口号)来标识一条连接,那么一旦四元组的任何一个元素发生了改变,这条连接就会断掉,那么这条连接中正在传输的数据就会断掉,切换到新的网络后可能需要重新去建立连接,然后重新发送数据。这将会导致用户的网络会“卡”一下。    

但是,QUIC不再以四元组作为唯一标识,QUIC使用连接ID来标识一条连接,无论你的网络如何切换,只要连接ID不变,那么这条连接就不会断,这就叫连接迁移!

图片

图13-QUIC连接迁移介绍

2.2.1连接ID  

每条连接拥有一组连接标识符,也就是连接ID,每个连接ID都能标识这条连接。连接ID是由一端独立选择的,每个端(客户端和服务端统称为端)选择连接ID供对端使用。也就是说,客户端生成的连接ID供服务端使用(服务端发送数据时使用客户端生成的连接ID作为目的连接ID),反过来一样的。

连接ID的主要功能是确保底层协议(UDP、IP及更底层的协议栈)发生地址变更(比如IP地址变了,或者端口号变了)时不会导致一个QUIC连接的数据包被传输到错误的QUIC终端(客户端和服务端统称为终端)上。

2.2.2 QUIC的连接迁移过程    

QUIC限制连接迁移为仅客户端可以发起,客户端负责发起所有迁移。如果客户端接收到了一个未知的服务器发来的数据包,那么客户端必须丢弃这些数据包。

如图14所示,连接迁移过程总共需要四个步骤。

1.连接迁移之前,客户端使用IP1和服务端进行通信;

2.客户端IP变成IP2,并且使用IP2发送非探测帧给服务端;

3.启动路径验证(双方都需要互相验证),通过PATH_CHANLLENGE帧和PATH_RESPONSE帧进行验证。

4.验证通过后,使用IP2进行通信。

图片

图14- 连接迁移流程图   

2.3 解决TCP队头阻塞问题  

在HTTP/2中引入了流的概念。目的是实现 多个请求在同一个连接上并发,从而提升网页加载的效率。

图片

图15-QUIC解决TCP队头阻塞问题

由图15来看,假设有两个请求同时发送,红色的是请求1,蓝色的是请求2,这两个请求在两条不同的流中进行传输。假设在传输过程中,请求1的某个数据包丢了,如果是TCP,即使请求2的所有数据包都收到了,但是也只能阻塞在内核缓冲区中,无法交给应用层。但是QUIC就不一样了,请求1的数据包丢了只会阻塞请求1,请求2不会受到阻塞。

有些人不禁发问,不是说HTTP2也有流的概念吗,为什么只有QUIC才能解决呢,这个根本原因就在于,HTTP2的传输层用的TCP,TCP的实现是在内核态的,而流是实现在用户态度,TCP是看不到“流”的,所以在TCP中,它不知道这个数据包是请求1还是请求2的,只会根据seq number来判断包的先后顺序。

2.4 更优的拥塞控制算法  

拥塞控制算法中最重要的一个参数是 RTT,RTT的准确性决定了拥塞控制算法的准确性;然而,TCP的RTT测量往往不准确,QUIC的RTT测量是准确的。    

图片

图16-TCP计算RTT

如图16所示:由于网络中经常出现丢包,需要重传,在TCP协议中,初始包和重传包的序号是一样的,拥塞控制算法进行计算RTT的时候,无法区别是初始包还是重传包,这将导致RTT的计算值要么偏大,要么偏小。

图片

图17-QUIC计算RTT    

如图17所示:QUIC通过Packet Number来标识包的序号,而且规定Packet Number只能单调递增,这也就解决了初始包和重传包的二义性。从而保证RTT的值是准确的。

另外,不同于TCP,QUIC的拥塞控制算法是可插拔的,由于其实现在用户态,服务可以根据不同的业务,甚至不同的连接灵活选择使用不同的拥塞控制算法。(Reno、New Reno、Cubic、BBR等算法都有自己适合的场景)

2.5 QUIC的两级流量控制  

很多人搞不清楚流量控制与拥塞控制的区别。二者有本质上的区别。

流量控制要解决的问题是:接收方控制发送方的数据发送的速度,就是我的接收能力就那么大点,你别发太快了,你发太快了我承受不住,会给你丢掉 你还得重新发。         
拥塞控制要解决的问题是:数据在网络的传输过程中,是否网络有拥塞,是否有丢包,是否有乱序等问题。如果中间传输的时候网络特别卡,数据包丢在中间了,发送方就需要重传,那么怎么判断是否拥塞了,重传要怎么重传法,按照什么算法进行发送数据才能尽可能避免数据包在中间路径丢掉,这是拥塞控制的核心。

所以,流量控制解决的是接收方的接收能力问题,一般采用滑动窗口算法;拥塞控制要解决的是中间传输的时候网络是否拥堵的问题,一般采用慢启动、拥塞避免、拥塞恢复、快速重传等算法。                       

图片

图18-QUIC流量控制

QUIC是双级流控,不仅有连接这一个级别的流控,还有流这个级别的流控。如下图所示,每个流都有自己的可用窗口,可用窗口的大小取决于最大窗口数减去发送出去的最大偏移数,跟中间已经发送出去的数据包,是否按顺序收到了对端的ACK 无关。

3.QUIC协议如何优化  

QUIC协议定义了很多优秀的功能,但是在实现的过程中,我们会遇到很多问题导致无法达到预期的性能,比如0-RTT率很低,连接迁移失败率很高等等。

3.1 QUIC的0-RTT成功率不高  

导致0-RTT成功率不高的原因一般有如下几个:

1.服务端一般都是集群,对于客户端来说,处理请求的服务端是不固定的,新的请求到来时,如果当前client没有请求过该服务器,则服务器上没有相关会话信息,会把该请求当做一个新的连接来处理,重新走1-RTT。    

针对此种情况,我们可以考虑集群中所有的服务器使用相同的ticket文件。

2.客户端IP不是固定的,在发生连接迁移时,服务端下发的token融合了客户端的IP,这个IP变化了的话,携带token服务端校验不过,0-RTT会失败。

针对这个问题,我们可以考虑采用如图19所示的方法,使用设备信息或者APP信息来生成token,唯一标识一个客户端。

图片

图19- 使用设备信息提高0-RTT的成功率

3.Session Ticket过期时间默认是2天,超过2天后就会导致0-RTT失败,然后降级走1-RTT。可以考虑增长过期时间。

3.2 实现连接迁移并不容易  

连接迁移的实现,不可避开的两个问题:一个是四层负载均衡器对连接迁移的影响,一个是七层负载均衡器对连接迁移的影响。

四层负载均衡器的影响:LVS、DPVS等四层负载均衡工具基于四元组进行转发,当连接迁移发生时,四元组会发生变化,该组件就会把同一个请求的数据包发送到不同的后端服务器上,导致连接迁移失败;    

七层负载均衡器的影响(QUIC服务器多核的影响):由于多核的影响,一般服务器会有多个QUIC服务端进程,每个进程负载处理不同的连接。内核收到数据包后,会根据二元组(源IP、源port)选择已经存在的连接,并把数据包交给对应的socket。在连接迁移发生时,源地址发生改变,可能会让接下来的数据包去到不同的进程,影响socket数据的接收。

如何解决以上两个问题?DPVS要想支持QUIC的连接迁移,就不能再以四元组进行转发,需要以连接ID进行转发,需要建立 连接ID与对应的后端服务器的对应关系;

QUIC服务器也是一样的,内核就不能用四元组来进行查找socket,四元组查找不到时,就必须使用连接ID进行查找socket。但是内核代码又不能去修改(不可能去更新所有服务器的内核版本),那么我们可以使用eBPF的方法进行解决。如下图20所示:

图片

图20-多核QUIC服务器解决连接迁移问题    

3.3 UDP被限速或禁闭  

业内统计数据全球有7%地区的运营商对UDP有限速或者禁闭,除了运营商还有很多企业、公共场合也会限制UDP流量甚至禁用UDP。这对使用UDP来承载QUIC协议的场景会带来致命的伤害。对此,我们可以采用多路竞速的方式使用TCP和QUIC同时建连。除了在建连进行竞速以外,还可以对网络QUIC和TCP的传输延时进行实时监控和对比,如果有链路对UDP进行了限速,可以动态从QUIC切换到TCP。

图片

图21-QUIC和TCP协议竞速       

3.4 QUIC对CPU消耗大  

相对于TCP,为什么QUIC更消耗资源?

1.QUIC在用户态实现,需要更多的内核空间与用户空间的数据拷贝和上下文切换;

2.QUIC的ACK报文也是加密的,TCP是明文的。

3.内核知道TCP连接的状态,不用为每一个数据包去做诸如查找目的路由、防火墙规则等操作,只需要在tcp连接建立的时候做一次即可,然而QUIC不行;

总的来说,QUIC服务端消耗CPU的地方主要有三个:密码算法的开销;udp收发包的开销;协议栈的开销;    

针对这些,我们可以适当采取优化措施来:

1.使用Intel硬件加速卡卸载TLS握手

2.开启GSO功能。

3.数据在传输过程中,可以将一轮中所有的ACK解析后再同时进行处理,避免处理大量的ACK。

4.适当将QUIC的包长限制调高(比如从默认的1200调到1400个字节)

5.减少协议栈的内存拷贝

4.QUIC的性能  

从公开的数据来看,国内各个厂(腾讯、阿里、字节、华为、OPPO、网易等等)使用了QUIC协议后,都有很大的提升,比如网易上了QUIC后,响应速度提升45%,请求错误率降低50%;比如字节火山引擎使用QUIC后,建连耗时降低20%~30%;比如腾讯使用QUIC后,在腾讯会议、直播、游戏等场景耗时也降低30%;

总结  

QUIC协议的出现,为HTTP/3奠定了基础。这是近些年在web协议上最大的变革,也是最优秀的一次实践。面对新的协议,我们总是有着各种各样的担忧,诚然,QUIC协议在稳定性上在成熟度上,的确还不如TCP协议,但是经过近几年的发展,成熟度已经相当不错了,Nginx近期也发布了1.25.0版本,支持了QUIC协议。所以面对这样优秀的协议,我们希望更多的公司,更多的业务参与进来使用QUIC,推动QUIC更好的发展,推动用户上网速度更快!         

getattr详解

一、基础用法
getattr() 是 Python 内置的一个函数,可以用来获取一个对象的属性值或方法。其基本语法为:

getattr(object, name[, default])
1
其中,object 是要获取属性值或方法的对象;name 是要获取的属性名或方法名;default 是可选参数,当指定的属性或方法不存在时,会返回 default 的值。

getattr() 可以通过对象实例或类名来获取属性值或方法,也可以获取内置函数、内置类型和标准库中的属性和方法。

下面是一些常见的使用 getattr() 的案例:

获取对象的属性值
class MyClass:
def init(self):
self.x = 1
self.y = 2

obj = MyClass()

print(getattr(obj, ‘x’)) # 输出 1
print(getattr(obj, ‘y’)) # 输出 2

运行运行
获取对象的方法
class MyClass:
def my_method(self):
print(‘Hello, world!’)

obj = MyClass()

method = getattr(obj, ‘my_method’)
method() # 输出 “Hello, world!”

运行运行
获取内置函数和类型
func = getattr(builtins, ‘abs’)
print(func(-1)) # 输出 1

type_name = ‘str’
type_obj = getattr(builtins, type_name)
print(type_obj(‘Hello, world!’)) # 输出 “Hello, world!”

运行运行
获取标准库中的属性和方法
import datetime

now = datetime.datetime.now()

attr_name = ‘year’
attr_value = getattr(now, attr_name)
print(attr_value) # 输出当前年份

method_name = ‘strftime’
method_args = [‘%Y-%m-%d %H:%M:%S’]
method = getattr(now, method_name)
formatted = method(*method_args)
print(formatted) # 输出格式化后的时间字符串,如 “2023-05-06 10:30:00”

运行运行
在实际开发中,getattr() 还可以用于实现动态调用函数或方法的功能,以及在需要处理大量类似属性或方法的代码时,简化代码的编写。

二、复杂案例
getattr() 还可以结合 import_module() 函数,实现动态执行某个文件中某个类的方法的功能。

下面是一个稍微复杂一些的例子。

假设我们有一个 data.py 文件,其中定义了一个 Data 类和一些数据操作方法,代码如下:

data.py

class Data:
def init(self, data):
self.data = data

def get(self):
    return self.data

def add(self, value):
    self.data.append(value)

def remove(self, value):
    self.data.remove(value)

def load_data():
return [1, 2, 3]


运行运行
现在我们需要在另一个模块中,动态加载 data.py 文件,并使用 Data 类的实例对象进行数据操作,同时也需要使用 load_data() 函数进行数据的读操作。

示例代码如下:

import importlib

module_name = ‘data’
class_name = ‘Data’
method_name = ‘get’

module = importlib.import_module(module_name)
class_obj = getattr(module, class_name)(module.load_data())
method = getattr(class_obj, method_name)
print(method()) # 输出 [1, 2, 3]

value = 4
method_name = ‘add’
method = getattr(class_obj, method_name)
method(value)

method_name = ‘get’
method = getattr(class_obj, method_name)
print(method()) # 输出 [1, 2, 3, 4]

value = 2
method_name = ‘remove’
method = getattr(class_obj, method_name)
method(value)

method_name = ‘get’
method = getattr(class_obj, method_name)
print(method()) # 输出 [1, 3, 4]




在上述示例中,我们首先使用 import_module() 函数导入了 data.py 文件,然后使用 getattr() 函数获取了 Data 类的对象。接着,我们使用 () 运算符执行了 get()、add() 和 remove() 方法,从而动态进行了数据操作。

importlib是Python的一个标准库,用于处理模块的导入过程。它提供了对Python导入机制的低级访问,允许你在运行时动态地导入和重载模块。

这个例子中,我们动态地加载了一个 Python 模块,并使用 getattr() 函数获取了模块中的类和函数对象。然后,我们使用这些对象来动态地进行数据操作。这种动态加载和执行的方式,可以使代码更加灵活和可扩展。

三、import_module介绍
import_module() 函数得到的是一个模块对象,即一个表示 Python 模块的对象。模块对象是一个包含模块中所有定义的类、函数、变量等的命名空间,我们可以使用模块对象来访问这些定义。

例如,我们可以使用 module 对象来访问 data.py 文件中定义的 Data 类和 load_data() 函数,代码如下:

import importlib

module_name = ‘data’

module = importlib.import_module(module_name)
data = module.Data([1, 2, 3])
loaded_data = module.load_data()
print(loaded_data)

1
2
3
4
5
6
7
8
9
在上述代码中,我们首先使用 import_module() 函数导入了 data.py 文件,然后通过 module 对象访问了 Data 类和 load_data() 函数。我们还使用 () 运算符创建了 Data 类的实例对象 data,并调用了 load_data() 函数,获取了数据列表 loaded_data。

关于加了括号后相当于调用了相应的方法,这是因为在 Python 中,函数和方法都是可调用对象。我们可以通过使用 () 运算符来调用函数和方法。

例如,在上述代码中,我们使用 () 运算符调用了 Data 类的构造函数和 load_data() 函数,从而创建了 data 对象和获取了 loaded_data 数据。这与直接调用类或函数的方式是相同的,例如:

data = Data([1, 2, 3])
loaded_data = load_data()
print(loaded_data)
1
2
3
在这种情况下,我们直接调用了类和函数,并通过它们来创建了对象和获取了数据,而不需要使用模块对象来访问它们。

原文链接:https://blog.csdn.net/mall_lucy/article/details/130531793

任务队列 ModelQueue

提供一个示例的 model_queue 类的实现,并解释其工作原理。请注意,由于您没有提供 model_queue 的具体实现,我将基于常见的异步任务队列模式进行说明。

示例 model_queue 类

以下是一个可能的 model_queue 类的实现示例

import asyncio

class ModelQueue:
    def __init__(self):
        self.queue = asyncio.Queue()  # 使用 asyncio.Queue 来管理任务

    async def add_task(self, model, task):
        """
        将任务添加到队列并执行。

        Args:
            model: 要使用的模型。
            task: 要执行的异步任务。

        Returns:
            任务的结果。
        """
        # 将任务放入队列
        await self.queue.put((model, task))
        return await self.process_queue()  # 处理队列中的任务

    async def process_queue(self):
        """
        处理队列中的任务。

        Returns:
            任务的结果。
        """
        while not self.queue.empty():
            model, task = await self.queue.get()  # 从队列中获取任务
            try:
                result = await task()  # 执行任务
                return result  # 返回任务的结果
            except Exception as e:
                print(f"Error processing task for model {model}: {e}")
            finally:
                self.queue.task_done()  # 标记任务完成

解释

队列初始化:self.queue = asyncio.Queue():使用 asyncio.Queue 来管理异步任务。这个队列可以存储多个任务,并在需要时逐个处理。

添加任务:async def add_task(self, model, task):这个方法将任务添加到队列中,并调用 process_queue 方法来处理队列中的任务。

await self.queue.put((model, task)):将模型和任务元组放入队列。

处理队列:async def process_queue(self):这个方法处理队列中的任务。

while not self.queue.empty():循环直到队列为空。

model, task = await self.queue.get():从队列中获取任务。

result = await task():执行任务并等待其完成。

self.queue.task_done():标记任务完成。

IP自动切换

未有测试,要用再测试,https://www.xiequ.cn/index.html每天有免费代理

https://englishcode.lanzoul.com/i5B6422srfxi

图片

🎈主要功能和源码

可以识别JSON的API接口,可以获取到IP和端口与地址信息,如果地址信息获取不到也没事,IP和端口一般都能获取到,我用了大概两家的JSON 都是不同的格式的,都可以正常代理 谐趣代理每天免费1000个代理,还可以

可以设置自动切换IP时间间隔 自定义API接口地址

import requests
import json
import os
import subprocess
import time
import sys
import logging
import tkinter as tk
from tkinter import scrolledtext
from threading import Thread, Event

# 设置输出编码为 UTF-8


# 自定义日志处理器
class ScrolledTextHandler(logging.Handler):
    def __init__(self, text_widget):
        super().__init__()
        self.text_widget = text_widget

    def emit(self, record):
        msg = self.format(record)

        def append():
            self.text_widget.insert(tk.END, msg + '\n')
            self.text_widget.see(tk.END)

        self.text_widget.after(0, append)


# 通用函数递归提取代理信息
def extract_proxy_info(data):
    logging.debug(f"解析返回的数据: {data}")

    possible_keys = {
        "ip": ["IP", "ip", "IpAddress", "ip_address"],
        "port": ["Port", "port"],
        "address": ["Address", "address", "Location", "location", "IpAddress", "ip_address"]
    }

    def recursive_extract(data, keys):
        if isinstance(data, dict):
            for key, value in data.items():
                if key in keys:
                    return value
                elif isinstance(value, (dict, list)):
                    result = recursive_extract(value, keys)
                    if result is not None:
                        return result
        elif isinstance(data, list):
            for item in data:
                result = recursive_extract(item, keys)
                if result is not None:
                    return result
        return None

    ip = recursive_extract(data, possible_keys["ip"])
    port = recursive_extract(data, possible_keys["port"])
    address = recursive_extract(data, possible_keys["address"])

    if not ip or not port:
        raise Exception("无法从返回的数据中提取IP和端口")

    return ip, port, address


# 从API获取代理IP和端口
def get_proxy(api_url):
    response = requests.get(api_url)
    response.encoding = 'utf-8'  # 确保正确解析中文

    # 添加详细的日志记录
    logging.debug(f"API返回的原始数据: {response.text}")

    try:
        data = response.json()
        logging.debug(f"API返回的数据: {data}")

        ip, port, address = extract_proxy_info(data)
        logging.info(f"获取到的代理IP: {ip}, 端口: {port}, 地址: {address if address else '未知地址'}")
        return ip, port, address
    except Exception as e:
        logging.error(f"解析数据时发生错误: {e}")
        logging.error(f"无法解析的原始数据: {response.text}")
        raise


# 配置HTTP代理
def set_http_proxy(ip, port):
    os.environ['HTTP_PROXY'] = f"http://{ip}:{port}"
    os.environ['HTTPS_PROXY'] = f"http://{ip}:{port}"


# 关闭HTTP代理
def disable_http_proxy():
    if 'HTTP_PROXY' in os.environ:
        del os.environ['HTTP_PROXY']
    if 'HTTPS_PROXY' in os.environ:
        del os.environ['HTTPS_PROXY']
    logging.info("已关闭Python HTTP代理")


# 设置Windows全局代理
def set_windows_proxy(ip, port):
    proxy = f"{ip}:{port}"
    command_enable = [
        'reg', 'add', 'HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Internet Settings', '/v', 'ProxyServer',
        '/t', 'REG_SZ', '/d', proxy, '/f'
    ]
    command_enable_proxy = [
        'reg', 'add', 'HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Internet Settings', '/v', 'ProxyEnable',
        '/t', 'REG_DWORD', '/d', '1', '/f'
    ]
    startupinfo = subprocess.STARTUPINFO()
    startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
    subprocess.run(command_enable, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, startupinfo=startupinfo)
    subprocess.run(command_enable_proxy, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, startupinfo=startupinfo)
    logging.info("已设置Windows全局代理")


# 关闭Windows全局代理
def disable_windows_proxy():
    command_disable_proxy = [
        'reg', 'add', 'HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Internet Settings', '/v', 'ProxyEnable',
        '/t', 'REG_DWORD', '/d', '0', '/f'
    ]
    startupinfo = subprocess.STARTUPINFO()
    startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
    subprocess.run(command_disable_proxy, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, startupinfo=startupinfo)
    logging.info("已关闭Windows全局代理")


# 访问百度并检查HTTP状态码
def check_baidu():
    url = "http://www.baidu.com"
    try:
        response = requests.get(url)
        logging.info(f"HTTP状态码: {response.status_code}")
        logging.info(f"响应头: {response.headers}")
        if response.status_code == 200:
            logging.info("访问成功")
            return True
        else:
            logging.warning("访问失败: HTTP状态码不为200")
            return False
    except Exception as e:
        logging.error(f"访问失败: {e}")
        return False


def run_proxy(api_url, stop_event, switch_interval):
    try:
        while not stop_event.is_set():
            try:
                disable_windows_proxy()
                disable_http_proxy()
                ip, port, address = get_proxy(api_url)
                set_http_proxy(ip, port)
                if check_baidu():
                    set_windows_proxy(ip, port)
                for _ in range(switch_interval):
                    if stop_event.is_set():
                        break
                    time.sleep(1)
            except Exception as e:
                logging.error(f"发生错误: {e}")
                for _ in range(switch_interval):
                    if stop_event.is_set():
                        break
                    time.sleep(1)
    finally:
        disable_windows_proxy()
        disable_http_proxy()
        logging.info("代理已停止,所有代理设置已清除")


class App:
    def __init__(self, root):
        self.root = root
        self.root.title("IP代理")

        self.api_label = tk.Label(root, text="API地址:")
        self.api_label.grid(row=0, column=0, padx=10, pady=10)

        self.api_entry = tk.Entry(root, width=50)
        self.api_entry.grid(row=0, column=1, padx=10, pady=10)

        self.interval_label = tk.Label(root, text="切换间隔(秒):")
        self.interval_label.grid(row=0, column=2, padx=10, pady=10)

        self.interval_entry = tk.Entry(root, width=10)
        self.interval_entry.insert(0, "10")  # 默认值为10秒
        self.interval_entry.grid(row=0, column=3, padx=10, pady=10)

        self.start_button = tk.Button(root, text="启动代理", command=self.toggle_proxy)
        self.start_button.grid(row=0, column=4, padx=10, pady=10)

        self.console = scrolledtext.ScrolledText(root, width=100, height=30)
        self.console.grid(row=1, column=0, columnspan=5, padx=10, pady=10)

        # 添加自定义日志处理器
        handler = ScrolledTextHandler(self.console)
        handler.setLevel(logging.INFO)
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logging.getLogger().addHandler(handler)

        # 设置日志记录器的级别
        logging.getLogger().setLevel(logging.DEBUG)

        self.proxy_thread = None
        self.stop_event = Event()

    def toggle_proxy(self):
        if self.proxy_thread is None or not self.proxy_thread.is_alive():
            api_url = self.api_entry.get()
            try:
                switch_interval = int(self.interval_entry.get())
                if api_url and switch_interval > 0:
                    self.stop_event.clear()
                    self.proxy_thread = Thread(target=self.run_proxy_thread, args=(api_url, switch_interval),
                                               daemon=True)
                    self.proxy_thread.start()
                    self.start_button.config(text="停止代理")
                    logging.info("代理已启动")
                else:
                    logging.error("请提供有效的API地址和切换间隔")
            except ValueError:
                logging.error("请提供有效的切换间隔")
        else:
            self.stop_event.set()
            self.root.after(100, self.check_thread_status)

    def check_thread_status(self):
        if self.proxy_thread.is_alive():
            self.root.after(100, self.check_thread_status)
        else:
            self.start_button.config(text="启动代理")
            logging.info("代理已停止")

    def run_proxy_thread(self, api_url, switch_interval):
        run_proxy(api_url, self.stop_event, switch_interval)


if __name__ == "__main__":
    root = tk.Tk()
    app = App(root)
    root.mainloop()