Python中类的组合使用-组件

组合:在一个类的属性中调用了另一个类,将另一个类的对象作为数据属性,称为类的组合。

这种组合使用的好处在于,它允许你将复杂的功能分解为更小、更易于管理的部分。每个类可以专注于自己的职责,从而提高代码的可读性和可维护性。

这种模式允许你创建一个对象,这个对象可以与创建它的对象进行交互。这是一种常见的设计模式,特别是在复杂的软件系统中,它允许你将不同的功能组织成不同的类,同时保持这些类之间的协作和通信。

简单来说,这种初始化方式是说:“我(当前类的实例)需要一个组件,所以我创建了一个实例,并且把我自己(self)作为参数传给了它,这样它就可以使用我的功能和数据了。”

再来一个例子

# 1、组合实例
class Ojb_1:
    '''假设Ojb_1是一个装备库类,func_name是其中一件装备,装备后加1000战力。'''
    def __init__(self, agg):
        self.agg = agg

    def func_name(self):
        self.agg += 1000                # 在本身的战力上+1000
        return self.agg                 # 返回最终战力

class Ojb_2:
    '''假设Ojb_2是一个角色类,每个角色都有名称等信息'''
    def __init__(self, name, agg):
        self.name = name                # 角色名称
        self.agg = agg                  # 本身的战力
        self.Ojb_1 = Ojb_1(self.agg)    # 重点在这里,将该人物原来的战力传到装备库类,把自己作为参数传给ojb_1的这个实例,这样他能使用自己的数据

if __name__ == '__main__':
    '''假设广深小龙原来战力=500'''
    r1 = Ojb_2('广深小龙', 500)
    res = r1.Ojb_1.func_name()
    print(res)

①Ojb_1是装备库类,func_name是一件装备,人物装备后会增加1000的战力,self.agg=原人物的战力

②Ojb_2是角色类,有角色的名称与原角色的战力等信息,self.Ojb_1是将原人物的战力先传至装备库,只要人物调用装备了func_name就会在原战力基础上增加1000

下面来一个完整的例子:

class ResearchConductor:
    def __init__(self, researcher):
        self.researcher = researcher

    async def conduct_research(self):
        # 这里使用 researcher 对象的属性和方法来执行研究
        print(f"Starting research on: {self.researcher.query}")
        # 假设这里进行了一些异步研究操作
        # 返回研究结果
        return f"Research results for {self.researcher.query}"


class ContextManager:
    def __init__(self, researcher):
        self.researcher = researcher

    async def get_context(self):
        # 获取研究上下文的逻辑
        return "Context for research"


class ReportGenerator:
    def __init__(self, researcher):
        self.researcher = researcher

    async def generate_report(self, context):
        # 生成报告的逻辑
        return f"Report based on context: {context}"


class Researcher:
    def __init__(self, query, config):
        self.query = query
        self.config = config
        self.research_conductor = ResearchConductor(self)  # 组合使用 ResearchConductor
        self.context_manager = ContextManager(self)  # 组合使用 ContextManager
        self.report_generator = ReportGenerator(self)  # 组合使用 ReportGenerator

    async def perform_research(self):
        # 执行研究流程
        context = await self.context_manager.get_context()  # 从 ContextManager 获取上下文
        research_results = await self.research_conductor.conduct_research()  # 从 ResearchConductor 获取研究结果
        report = await self.report_generator.generate_report(research_results)  # 从 ReportGenerator 生成报告
        return report


# 假设这是主程序
import asyncio

async def main():
    # 创建 Researcher 实例
    researcher = Researcher(query="How to learn Python", config={"setting": "value"})
    # 执行研究并生成报告
    report = await researcher.perform_research()
    print(report)

# 运行主程序
asyncio.run(main())

在这个例子中:

  • Researcher 类是主类,它代表了一个研究者,拥有查询和配置。
  • ResearchConductor 类是 Researcher 的一个组件,负责执行研究任务。
  • ContextManager 类是另一个组件,负责获取研究的上下文。
  • ReportGenerator 类是第三个组件,负责基于研究结果生成报告。

每个组件都接收一个 researcher 对象作为参数,并在它们的构造函数中保存这个引用。这样,每个组件都可以访问主 Researcher 实例的属性和方法。

Researcher 类中的 perform_research 方法展示了如何使用这些组件来执行一个完整的研究流程:获取上下文、进行研究、生成报告。

最后,main 函数创建了一个 Researcher 实例,并调用 perform_research 方法来执行研究流程,并打印出生成的报告。这个例子展示了类的组合如何在实际的异步编程中被用来构建模块化的系统。

理解初始化 def __init__(self, researcher):
self.researcher = researcher 你将 self(即当前 Researcher 实例的引用)传递给 ResearchConductor 的构造函数。这意味着 ResearchConductor 类需要一个参数来初始化,这个参数是 Researcher 的一个实例。在 ResearchConductor 类中,这个参数被赋值给 self.researcher,所以,当你在 Researcher 类中创建 ResearchConductor 的实例时,你是在告诉 ResearchConductor:“嘿,这个 Researcher 实例是你的 researcher 对象。”这样,ResearchConductor 类就可以通过 self.researcher 访问 Researcher 实例的所有属性和方法

再来一个例子

class Engine:
    """这是一个简单的引擎类"""
    def __init__(self, power):
        self.power = power
 
    def start(self):
        print(f"引擎启动,功率:{self.power}")
 
class Car:
    """这是一个简单的汽车类,它组合了Engine类"""
    def __init__(self, make, model, engine):
        self.make = make
        self.model = model
        self.engine = engine  # 引擎作为Car类的一个属性
 
    def start_car(self):
        self.engine.start()  # 使用Engine类的start方法
        print(f"汽车 {self.make} {self.model} 启动了。")
 
# 创建一个Engine实例
engine = Engine(200)
 
# 创建一个Car实例,并使用上面创建的Engine实例
car = Car("Ford", "Mustang", engine)
 
# 启动汽车
car.start_car()

在这个例子中,Engine 类定义了一个引擎的基本属性和方法。Car 类则通过将 Engine 实例作为其属性来组合 Engine 类,从而拥有启动汽车所需的功能。当你创建一个 Car 实例并调用 start_car 方法时,它将使用内部 Engine 实例的 start 方法来输出引擎启动的信息。

Selenium 隐藏浏览器指纹特征的几种方式

对一些做了反爬的网站,做了特征检测,用来阻止一些恶意爬虫

https://bot.sannysoft.com这个网站可以检测指纹特征

本篇文章将介绍几种常用的隐藏浏览器指纹特征的方式

1. 直接爬取

目标对象:

aHR0cHM6Ly9xaWthbi5jcXZpcC5jb20vUWlrYW4vU2VhcmNoL0FkdmFuY2U=

我们使用 Selenium 直接爬取目标页面

# selenium 直接爬取

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
import time

chrome_options = Options()

s = Service(r"chromedriver.exe路径")

driver = webdriver.Chrome(service=s, options=chrome_options)

driver.get(url='URL')

driver.save_screenshot('result.png')

# 保存
source = driver.page_source
with open('result.html', 'w') as f:
    f.write(source)

time.sleep(200)

页面明显做了反爬,网页返回直接返回空白内容

图片

2. CDP

CDP 全称为 Chrome Devtools-Protocol

https://chromedevtools.github.io/devtools-protocol

通过执行 CDP 命令,可以在网页加载前运行一段代码,进而改变浏览器的指纹特征

比如,window.navigator.webdriver 在 Selenium 直接打开网页时返回结果为 true;而手动打开网页时,该对象值为 undefined

因此,我们可以利用 CDP 命令修改该对象的值,达到隐藏指纹特征的目的

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
import time

chrome_options = Options()

s = Service(r"chromedriver.exe路径")

driver = webdriver.Chrome(service=s, options=chrome_options)

# 执行cdp命令,修改(window.navigator.webdriver )对象的值
driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
    "source": """
            Object.defineProperty(navigator, 'webdriver', {
              get: () => undefined
            })
            """
})

driver.get(url='URL')

driver.save_screenshot('result.png')

# 保存
source = driver.page_source
with open('result.html', 'w', encoding='utf-8') as f:
    f.write(source)

time.sleep(200)

需要指出的是,浏览器的指纹特征很多,使用该方法存在一些局限性

3. stealth.min.js

该文件包含了常用的浏览器特征,我们只需要读取该文件,然后执行 CDP 命令即可

下载地址:

https://github.com/berstend/puppeteer-extra/tree/stealth-js
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
import time

chrome_options = Options()

# 无头模式
# chrome_options.add_argument("--headless")

# 添加请求头
chrome_options.add_argument(
    'user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36')

s = Service(r"chromedriver.exe路径")

driver = webdriver.Chrome(service=s, options=chrome_options)

# 利用stealth.min.js隐藏浏览器指纹特征,先下载,并与文件放同一目录下
# stealth.min.js下载地址:https://github.com/berstend/puppeteer-extra/tree/stealth-js
with open('./stealth.min.js') as f:
    driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
        "source": f.read()
    })

driver.get(url='URL')
# driver.get(url='https://bot.sannysoft.com/')

# 保存图片
driver.save_screenshot('result.png')

time.sleep(200)

4. undetected_chromedriver

这是一个防止浏览器指纹特征被识别的依赖库,可以自动下载驱动配置再运行

项目地址:

https://github.com/ultrafunkamsterdam/undetected-chromedriver

使用步骤也很方便

首先,我们安装依赖库

# 安装依赖
pip3 install undetected-chromedriver

然后,通过下面几行代码就能完美隐藏浏览器的指纹特征

from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
import time
import undetected_chromedriver as uc

chrome_options = Options()
# chrome_options.add_argument("--headless")

s = Service(r"chromedriver.exe")

driver = uc.Chrome(service=s, options=chrome_options)

driver.get(url='URL')
# driver.get(url='https://bot.sannysoft.com/')

driver.save_screenshot('result.png')
time.sleep(100)

5. 操作已开启的浏览器

最后一种方式上篇文章已经介绍过

如何利用 Selenium 对已打开的浏览器进行爬虫!

我们只需要通过命令行启动一个浏览器

import subprocess

# 1、打开浏览器
# 指定端口号为:1234
# 配置用户数据路径:--user-data-dir
cmd = 'C:\\Program Files\\Google\\Chrome\\Application\\chrome.exe --remote-debugging-port=1234 --user-data-dir="C:\\selenum\\user_data"'

subprocess.run(cmd)

然后,利用 Selenium 直接操作上面的浏览器即可模拟正常操作浏览器的行为

import time
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service

# 操作上面已经打开的浏览器,进行百度搜索
chrome_options = Options()

# 指定已经打开浏览器的地址及端口号
chrome_options.add_experimental_option("debuggerAddress", "127.0.0.1:1234")

# 注意:chrome版本与chromedirver驱动要保持一致
# 下载地址:http://chromedriver.storage.googleapis.com/index.html
s = Service(r"chromedriver.exe")

driver = webdriver.Chrome(service=s, options=chrome_options)

# 打开目标网站
driver.get(url="URL")

time.sleep(200)

实操:自动化秒杀抢购

getattr详解

一、基础用法
getattr() 是 Python 内置的一个函数,可以用来获取一个对象的属性值或方法。其基本语法为:

getattr(object, name[, default])
1
其中,object 是要获取属性值或方法的对象;name 是要获取的属性名或方法名;default 是可选参数,当指定的属性或方法不存在时,会返回 default 的值。

getattr() 可以通过对象实例或类名来获取属性值或方法,也可以获取内置函数、内置类型和标准库中的属性和方法。

下面是一些常见的使用 getattr() 的案例:

获取对象的属性值
class MyClass:
def init(self):
self.x = 1
self.y = 2

obj = MyClass()

print(getattr(obj, ‘x’)) # 输出 1
print(getattr(obj, ‘y’)) # 输出 2

运行运行
获取对象的方法
class MyClass:
def my_method(self):
print(‘Hello, world!’)

obj = MyClass()

method = getattr(obj, ‘my_method’)
method() # 输出 “Hello, world!”

运行运行
获取内置函数和类型
func = getattr(builtins, ‘abs’)
print(func(-1)) # 输出 1

type_name = ‘str’
type_obj = getattr(builtins, type_name)
print(type_obj(‘Hello, world!’)) # 输出 “Hello, world!”

运行运行
获取标准库中的属性和方法
import datetime

now = datetime.datetime.now()

attr_name = ‘year’
attr_value = getattr(now, attr_name)
print(attr_value) # 输出当前年份

method_name = ‘strftime’
method_args = [‘%Y-%m-%d %H:%M:%S’]
method = getattr(now, method_name)
formatted = method(*method_args)
print(formatted) # 输出格式化后的时间字符串,如 “2023-05-06 10:30:00”

运行运行
在实际开发中,getattr() 还可以用于实现动态调用函数或方法的功能,以及在需要处理大量类似属性或方法的代码时,简化代码的编写。

二、复杂案例
getattr() 还可以结合 import_module() 函数,实现动态执行某个文件中某个类的方法的功能。

下面是一个稍微复杂一些的例子。

假设我们有一个 data.py 文件,其中定义了一个 Data 类和一些数据操作方法,代码如下:

data.py

class Data:
def init(self, data):
self.data = data

def get(self):
    return self.data

def add(self, value):
    self.data.append(value)

def remove(self, value):
    self.data.remove(value)

def load_data():
return [1, 2, 3]


运行运行
现在我们需要在另一个模块中,动态加载 data.py 文件,并使用 Data 类的实例对象进行数据操作,同时也需要使用 load_data() 函数进行数据的读操作。

示例代码如下:

import importlib

module_name = ‘data’
class_name = ‘Data’
method_name = ‘get’

module = importlib.import_module(module_name)
class_obj = getattr(module, class_name)(module.load_data())
method = getattr(class_obj, method_name)
print(method()) # 输出 [1, 2, 3]

value = 4
method_name = ‘add’
method = getattr(class_obj, method_name)
method(value)

method_name = ‘get’
method = getattr(class_obj, method_name)
print(method()) # 输出 [1, 2, 3, 4]

value = 2
method_name = ‘remove’
method = getattr(class_obj, method_name)
method(value)

method_name = ‘get’
method = getattr(class_obj, method_name)
print(method()) # 输出 [1, 3, 4]




在上述示例中,我们首先使用 import_module() 函数导入了 data.py 文件,然后使用 getattr() 函数获取了 Data 类的对象。接着,我们使用 () 运算符执行了 get()、add() 和 remove() 方法,从而动态进行了数据操作。

importlib是Python的一个标准库,用于处理模块的导入过程。它提供了对Python导入机制的低级访问,允许你在运行时动态地导入和重载模块。

这个例子中,我们动态地加载了一个 Python 模块,并使用 getattr() 函数获取了模块中的类和函数对象。然后,我们使用这些对象来动态地进行数据操作。这种动态加载和执行的方式,可以使代码更加灵活和可扩展。

三、import_module介绍
import_module() 函数得到的是一个模块对象,即一个表示 Python 模块的对象。模块对象是一个包含模块中所有定义的类、函数、变量等的命名空间,我们可以使用模块对象来访问这些定义。

例如,我们可以使用 module 对象来访问 data.py 文件中定义的 Data 类和 load_data() 函数,代码如下:

import importlib

module_name = ‘data’

module = importlib.import_module(module_name)
data = module.Data([1, 2, 3])
loaded_data = module.load_data()
print(loaded_data)

1
2
3
4
5
6
7
8
9
在上述代码中,我们首先使用 import_module() 函数导入了 data.py 文件,然后通过 module 对象访问了 Data 类和 load_data() 函数。我们还使用 () 运算符创建了 Data 类的实例对象 data,并调用了 load_data() 函数,获取了数据列表 loaded_data。

关于加了括号后相当于调用了相应的方法,这是因为在 Python 中,函数和方法都是可调用对象。我们可以通过使用 () 运算符来调用函数和方法。

例如,在上述代码中,我们使用 () 运算符调用了 Data 类的构造函数和 load_data() 函数,从而创建了 data 对象和获取了 loaded_data 数据。这与直接调用类或函数的方式是相同的,例如:

data = Data([1, 2, 3])
loaded_data = load_data()
print(loaded_data)
1
2
3
在这种情况下,我们直接调用了类和函数,并通过它们来创建了对象和获取了数据,而不需要使用模块对象来访问它们。

原文链接:https://blog.csdn.net/mall_lucy/article/details/130531793

任务队列 ModelQueue

提供一个示例的 model_queue 类的实现,并解释其工作原理。请注意,由于您没有提供 model_queue 的具体实现,我将基于常见的异步任务队列模式进行说明。

示例 model_queue 类

以下是一个可能的 model_queue 类的实现示例

import asyncio

class ModelQueue:
    def __init__(self):
        self.queue = asyncio.Queue()  # 使用 asyncio.Queue 来管理任务

    async def add_task(self, model, task):
        """
        将任务添加到队列并执行。

        Args:
            model: 要使用的模型。
            task: 要执行的异步任务。

        Returns:
            任务的结果。
        """
        # 将任务放入队列
        await self.queue.put((model, task))
        return await self.process_queue()  # 处理队列中的任务

    async def process_queue(self):
        """
        处理队列中的任务。

        Returns:
            任务的结果。
        """
        while not self.queue.empty():
            model, task = await self.queue.get()  # 从队列中获取任务
            try:
                result = await task()  # 执行任务
                return result  # 返回任务的结果
            except Exception as e:
                print(f"Error processing task for model {model}: {e}")
            finally:
                self.queue.task_done()  # 标记任务完成

解释

队列初始化:self.queue = asyncio.Queue():使用 asyncio.Queue 来管理异步任务。这个队列可以存储多个任务,并在需要时逐个处理。

添加任务:async def add_task(self, model, task):这个方法将任务添加到队列中,并调用 process_queue 方法来处理队列中的任务。

await self.queue.put((model, task)):将模型和任务元组放入队列。

处理队列:async def process_queue(self):这个方法处理队列中的任务。

while not self.queue.empty():循环直到队列为空。

model, task = await self.queue.get():从队列中获取任务。

result = await task():执行任务并等待其完成。

self.queue.task_done():标记任务完成。

IP自动切换

未有测试,要用再测试,https://www.xiequ.cn/index.html每天有免费代理

https://englishcode.lanzoul.com/i5B6422srfxi

图片

🎈主要功能和源码

可以识别JSON的API接口,可以获取到IP和端口与地址信息,如果地址信息获取不到也没事,IP和端口一般都能获取到,我用了大概两家的JSON 都是不同的格式的,都可以正常代理 谐趣代理每天免费1000个代理,还可以

可以设置自动切换IP时间间隔 自定义API接口地址

import requests
import json
import os
import subprocess
import time
import sys
import logging
import tkinter as tk
from tkinter import scrolledtext
from threading import Thread, Event

# 设置输出编码为 UTF-8


# 自定义日志处理器
class ScrolledTextHandler(logging.Handler):
    def __init__(self, text_widget):
        super().__init__()
        self.text_widget = text_widget

    def emit(self, record):
        msg = self.format(record)

        def append():
            self.text_widget.insert(tk.END, msg + '\n')
            self.text_widget.see(tk.END)

        self.text_widget.after(0, append)


# 通用函数递归提取代理信息
def extract_proxy_info(data):
    logging.debug(f"解析返回的数据: {data}")

    possible_keys = {
        "ip": ["IP", "ip", "IpAddress", "ip_address"],
        "port": ["Port", "port"],
        "address": ["Address", "address", "Location", "location", "IpAddress", "ip_address"]
    }

    def recursive_extract(data, keys):
        if isinstance(data, dict):
            for key, value in data.items():
                if key in keys:
                    return value
                elif isinstance(value, (dict, list)):
                    result = recursive_extract(value, keys)
                    if result is not None:
                        return result
        elif isinstance(data, list):
            for item in data:
                result = recursive_extract(item, keys)
                if result is not None:
                    return result
        return None

    ip = recursive_extract(data, possible_keys["ip"])
    port = recursive_extract(data, possible_keys["port"])
    address = recursive_extract(data, possible_keys["address"])

    if not ip or not port:
        raise Exception("无法从返回的数据中提取IP和端口")

    return ip, port, address


# 从API获取代理IP和端口
def get_proxy(api_url):
    response = requests.get(api_url)
    response.encoding = 'utf-8'  # 确保正确解析中文

    # 添加详细的日志记录
    logging.debug(f"API返回的原始数据: {response.text}")

    try:
        data = response.json()
        logging.debug(f"API返回的数据: {data}")

        ip, port, address = extract_proxy_info(data)
        logging.info(f"获取到的代理IP: {ip}, 端口: {port}, 地址: {address if address else '未知地址'}")
        return ip, port, address
    except Exception as e:
        logging.error(f"解析数据时发生错误: {e}")
        logging.error(f"无法解析的原始数据: {response.text}")
        raise


# 配置HTTP代理
def set_http_proxy(ip, port):
    os.environ['HTTP_PROXY'] = f"http://{ip}:{port}"
    os.environ['HTTPS_PROXY'] = f"http://{ip}:{port}"


# 关闭HTTP代理
def disable_http_proxy():
    if 'HTTP_PROXY' in os.environ:
        del os.environ['HTTP_PROXY']
    if 'HTTPS_PROXY' in os.environ:
        del os.environ['HTTPS_PROXY']
    logging.info("已关闭Python HTTP代理")


# 设置Windows全局代理
def set_windows_proxy(ip, port):
    proxy = f"{ip}:{port}"
    command_enable = [
        'reg', 'add', 'HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Internet Settings', '/v', 'ProxyServer',
        '/t', 'REG_SZ', '/d', proxy, '/f'
    ]
    command_enable_proxy = [
        'reg', 'add', 'HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Internet Settings', '/v', 'ProxyEnable',
        '/t', 'REG_DWORD', '/d', '1', '/f'
    ]
    startupinfo = subprocess.STARTUPINFO()
    startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
    subprocess.run(command_enable, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, startupinfo=startupinfo)
    subprocess.run(command_enable_proxy, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, startupinfo=startupinfo)
    logging.info("已设置Windows全局代理")


# 关闭Windows全局代理
def disable_windows_proxy():
    command_disable_proxy = [
        'reg', 'add', 'HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Internet Settings', '/v', 'ProxyEnable',
        '/t', 'REG_DWORD', '/d', '0', '/f'
    ]
    startupinfo = subprocess.STARTUPINFO()
    startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
    subprocess.run(command_disable_proxy, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, startupinfo=startupinfo)
    logging.info("已关闭Windows全局代理")


# 访问百度并检查HTTP状态码
def check_baidu():
    url = "http://www.baidu.com"
    try:
        response = requests.get(url)
        logging.info(f"HTTP状态码: {response.status_code}")
        logging.info(f"响应头: {response.headers}")
        if response.status_code == 200:
            logging.info("访问成功")
            return True
        else:
            logging.warning("访问失败: HTTP状态码不为200")
            return False
    except Exception as e:
        logging.error(f"访问失败: {e}")
        return False


def run_proxy(api_url, stop_event, switch_interval):
    try:
        while not stop_event.is_set():
            try:
                disable_windows_proxy()
                disable_http_proxy()
                ip, port, address = get_proxy(api_url)
                set_http_proxy(ip, port)
                if check_baidu():
                    set_windows_proxy(ip, port)
                for _ in range(switch_interval):
                    if stop_event.is_set():
                        break
                    time.sleep(1)
            except Exception as e:
                logging.error(f"发生错误: {e}")
                for _ in range(switch_interval):
                    if stop_event.is_set():
                        break
                    time.sleep(1)
    finally:
        disable_windows_proxy()
        disable_http_proxy()
        logging.info("代理已停止,所有代理设置已清除")


class App:
    def __init__(self, root):
        self.root = root
        self.root.title("IP代理")

        self.api_label = tk.Label(root, text="API地址:")
        self.api_label.grid(row=0, column=0, padx=10, pady=10)

        self.api_entry = tk.Entry(root, width=50)
        self.api_entry.grid(row=0, column=1, padx=10, pady=10)

        self.interval_label = tk.Label(root, text="切换间隔(秒):")
        self.interval_label.grid(row=0, column=2, padx=10, pady=10)

        self.interval_entry = tk.Entry(root, width=10)
        self.interval_entry.insert(0, "10")  # 默认值为10秒
        self.interval_entry.grid(row=0, column=3, padx=10, pady=10)

        self.start_button = tk.Button(root, text="启动代理", command=self.toggle_proxy)
        self.start_button.grid(row=0, column=4, padx=10, pady=10)

        self.console = scrolledtext.ScrolledText(root, width=100, height=30)
        self.console.grid(row=1, column=0, columnspan=5, padx=10, pady=10)

        # 添加自定义日志处理器
        handler = ScrolledTextHandler(self.console)
        handler.setLevel(logging.INFO)
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logging.getLogger().addHandler(handler)

        # 设置日志记录器的级别
        logging.getLogger().setLevel(logging.DEBUG)

        self.proxy_thread = None
        self.stop_event = Event()

    def toggle_proxy(self):
        if self.proxy_thread is None or not self.proxy_thread.is_alive():
            api_url = self.api_entry.get()
            try:
                switch_interval = int(self.interval_entry.get())
                if api_url and switch_interval > 0:
                    self.stop_event.clear()
                    self.proxy_thread = Thread(target=self.run_proxy_thread, args=(api_url, switch_interval),
                                               daemon=True)
                    self.proxy_thread.start()
                    self.start_button.config(text="停止代理")
                    logging.info("代理已启动")
                else:
                    logging.error("请提供有效的API地址和切换间隔")
            except ValueError:
                logging.error("请提供有效的切换间隔")
        else:
            self.stop_event.set()
            self.root.after(100, self.check_thread_status)

    def check_thread_status(self):
        if self.proxy_thread.is_alive():
            self.root.after(100, self.check_thread_status)
        else:
            self.start_button.config(text="启动代理")
            logging.info("代理已停止")

    def run_proxy_thread(self, api_url, switch_interval):
        run_proxy(api_url, self.stop_event, switch_interval)


if __name__ == "__main__":
    root = tk.Tk()
    app = App(root)
    root.mainloop()

git

安装及配置
下载地址: https://github.com/git-for-windows/git/releases/download/v2.41.0.windows.3/Git-2.41.0.3-64-bit.exe

安装注意事项:

傻瓜式安装,一直下一步就好.

安装目录不要有中文.

尽量也不要有空格.

配置环境变量:

找到安装目录,将bin目录配置到path目录即可;

win + r, 输入cmd. 打开黑窗口. git \–-version,如果不报错,表示安装成功;

1.2 配置基本信息
配置你的用户名称和邮箱:

git config –global user.name “Your Name” #用户名

git config –global user.email “email@example.com” #邮箱

git config -l # 检查一下配置是否成功了.

1.3 初始化本地仓库
新建一个文件夹「目录」, 通过打开黑窗口

git init 初始化一个空的git仓库,操作后这个文件夹下会多一个隐藏的.git

1.4 向git仓库添加文件或者目录
git add 文件名称, 一般情况添加所有:

git add . // 为注意这个.g表示目录下的所有文件/目录

1.5 提交到本地仓库
git commit -m ‘提交日志‘

提交日志,之后咱们再说,这个提交日志非常的重要,它并不是随便写的.一般情况.公司都有要求.如果写这个提交日志.如果公司没有要求,各位自己参照一些好的写法.规范起来.

C:\Users\ldcig\Desktop\git study>git commit -m ‘第一次提交哦’ # 提交
hint: core.useBuiltinFSMonitor=true is deprecated;please set core.fsmonitor=true instead
hint: Disable this message with “git config advice.useCoreFSMonitorConfig false”
[master (root-commit) 74346b7] ‘第一次提交哦’ # 7434b… 表示生成了一个版本号.
4 files changed, 0 insertions(+), 0 deletions(-) # 提交文件的详细信息.
create mode 100644 hehe/222.txt
create mode 100644 hello.txt
create mode 100644 tom.txt
create mode 100644 “\344\270\215\350\246\201\347\235\241\350\247\211\345\225\246.txt”
git status

表示暂存区没有东西可以被提交了.

一般情况下,我们可以这样做:

初始化git仓库: git init

提交文件到暂存区: git add .

提交暂存区的文件到本地仓库: git commit – m ‘提交日志‘

查看当前仓库的状态: git status

查看日志: git log, 查看日志

远程仓库
github.com

gitlab

gitee.com , // 以它为例子,说说如何使用;

2.1 gitee使用
注册一个.

gitee.com

新建一个仓库, 用于关联我的本地仓库.

2.2 查看本地仓库是否已经关联了远程仓库
git remote -v # 查看本地仓库是否关联了远程仓库.
2.3 本地仓库关联远程库
git remote -v # 查看一下.
git remote add origin 远程仓库地址; # 将本地库和指定的远程关联起来
2.4 推送到已经关联的远程仓库上
把本地仓库的所有内容,推送到远程仓库上去;

git push origin master
注意事项:

你们第一次操作,肯定会让你填写用户名称和密码.「Gitee的用户名称和密码.」

如果出现了各种问题,百度一下.

一个本地仓库可以关联上多个远程仓库.

2.5 查看帮助手册
-h, 跟在咱们的命令后边,表示查看命令的帮助手册.

git remote -h # 查看remote命令如何使用.
2.6 分支操作
查看分支, 当前在哪个分支上操作呢:

git branch

创建分支

git branch 分支名称

切换分支

git checkout 分支名称

合并分支

git merge 分支名称

删除本地支

git branch -d 分支名称 # 删除已经合并的分支

git branch -D 分支名称 # 不管你合没合并,都能删除.

切换分支,如果发现切换的分支没有存在,则新建一个分支

git checkout -b 分支名称

删除远程分支

git push origin –delete 你要删除的分支名称

2.7 版本回退
git reset –hard HEAD~1, 表示回退一个版本号.

git reset –hard HEAD~3, 表示回退三个版本号.

python 依赖注入_Dependency Injection-依赖注入详解

使用Dependency Injector可以将应用程序结构保存在一个地方。这个地方叫做集装箱。您可以使用容器来管理应用程序的所有组件。所有组件依赖关系都是显式定义的。这提供了对应用程序结构的控制。它很容易理解和改变。

容器就像应用程序的映射。你总是知道什么取决于什么


什么是依赖注入,有什么好处?
依赖注入是一种强大的设计模式,可以帮助使软件更加模块化、更灵活、更容易测试。这种设计思想可以实现低耦合,高内聚。

我的现在的理解就是你先写好一个函数结构,但参数什么的不写死在里面,而是通过依赖注入后面再传入进来(记时的理解,不一定对)

设计案例:
我们之前的写法:

import os
 
 
class ApiClient:
 
    def __init__(self) -> None:
        self.api_key = os.getenv("API_KEY")  # <-- dependency
        self.timeout = int(os.getenv("TIMEOUT"))  # <-- dependency
 
 
class Service:
 
    def __init__(self) -> None:
        self.api_client = ApiClient()  # <-- dependency
 
 
def main() -> None:
    service = Service()  # <-- dependency
    ...
 
 
if __name__ == "__main__":
    main()

依赖注入的写法:(通过函数传参方式)


import os
 
 
class ApiClient:
 
    def __init__(self, api_key: str, timeout: int) -> None:
        self.api_key = api_key  # <-- dependency is injected
        self.timeout = timeout  # <-- dependency is injected
 
 
class Service:
 
    def __init__(self, api_client: ApiClient) -> None:
        self.api_client = api_client  # <-- dependency is injected
 
 
def main(service: Service) -> None:  # <-- dependency is injected
    ...
 
 
if __name__ == "__main__":
    main(
        service=Service(
            api_client=ApiClient(
                api_key=os.getenv("API_KEY"),
                timeout=int(os.getenv("TIMEOUT")),
            ),
        ),
    )

但是这种方式实现的话,函数调用时需要创建很多对象,不便于管理和维护。

这个时候就需要依赖注入器(dependency_injector 框架)来帮助组建对象和函数。大概形式如下所示:

from dependency_injector import containers, providers
from dependency_injector.wiring import Provide, inject
 
class Container(containers.DeclarativeContainer):
 
    config = providers.Configuration()
 
    api_client = providers.Singleton(
        ApiClient,
        api_key=config.api_key,
        timeout=config.timeout,
    )
 
    service = providers.Factory(
        Service,
        api_client=api_client,
    )
 
 
@inject
def main(service: Service = Provide[Container.service]) -> None:
    print('main',service.api_client.api_key)
 
 
if __name__ == "__main__":
    container = Container()
    container.config.api_key.from_env("API_KEY", default='api_key', required=True)
    container.config.timeout.from_env("TIMEOUT", as_=int, default=5)
    container.wire(modules=[__name__])
 
    main()  # <-- dependency is injected automatically
 
    with container.api_client.override(mock.Mock()):
        main()  # <-- overridden dependency is injected automatically

这样实现有几个好处:

  1. 依赖自动组装和注入
  2. 可以通过container.api_client.override(mock.Mock()) 使用测试函数
  3. 可以方便配置多套不同的开发环境的依赖注入。
  4. 函数的结构更加显示,容易看出入参的结构。

dependency_injector框架主要构成
工具集 提供的方法/类 备注
Providers

Factory, Singleton, Callable, Coroutine, Object, List, Dict, Configuration, Resource, Dependency等方法

  1. 帮助组装对象;
  2. 方便Overriding覆盖

Overriding
container.api_client_factory.override
container.api_client_factory.reset_override
覆盖对象改变注入
Configuration 提供了从多种源读取配置:yaml、ini、json、env等
Resource 提供一种类似Singleton的资源加载方式,提供初始化,和卸载资源方法。
container 容器,里面定义了多个实例
Wiring 提供@inject装饰器,指示需要被装饰的函数



解密Tenacity:Python中最强大的重试库

在编写应用程序时,经常需要处理与外部服务通信或其他不稳定操作相关的问题。这些问题可能包括网络错误、服务不可用、超时等。在这些情况下,重试操作是一种常见的解决方案。Tenacity是Python中一个强大且灵活的重试库,它可以帮助你有效地处理这些问题。

这篇文章将介绍Tenacity重试库的使用,包括如何安装和配置Tenacity,以及如何在不同场景下使用它来处理重试操作。还有Tenacity的各种功能和选项,并提供丰富的示例代码来帮助你更好地理解如何应用它。

安装Tenacity

首先,安装Tenacity库。使用pip来安装Tenacity:

pip install tenacity

基本用法

Tenacity的基本思想是定义一个装饰器,该装饰器可以应用于函数或方法,以实现自动重试。

下面是一个简单的示例:

from tenacity import retry, stop_after_attempt

@retry(stop=stop_after_attempt(3))
def do_something():
    print("Doing something...")
    raise Exception("Something went wrong!")

try:
    do_something()
except Exception as e:
    print(f"Exception: {e}")

在上面的示例中,使用@retry装饰器来修饰do_something函数。配置了重试策略,即在前三次尝试后停止重试(stop_after_attempt(3))。在do_something函数中,模拟了一个失败的操作,触发了异常。由于配置了重试,Tenacity将在异常发生时自动重试该函数,最多重试3次。

配置选项

Tenacity提供了许多配置选项,可以满足不同场景的需求。以下是一些常用的配置选项:

  • wait:定义重试之间的等待时间,可以是固定的时间间隔或根据指数递增的时间间隔。
  • stop:定义何时停止重试,可以根据尝试次数、总时间或其他条件停止。
  • retry:定义在哪些异常情况下执行重试,可以根据异常类型、自定义条件或自定义回调函数执行。
  • before_sleep:在每次重试之前执行的操作,可以用于执行清理或日志记录等任务。
  • reraise:是否重新引发异常,如果设置为True,则在达到最大重试次数后会引发原始异常。

示例代码

以下是更多示例代码,演示了Tenacity的不同用法:

自定义重试条件

from tenacity import retry, stop_after_attempt, retry_if_exception_type

@retry(
    stop=stop_after_attempt(5),
    retry=retry_if_exception_type(IOError)
)
def open_file(file_path):
    print(f"Opening file: {file_path}")
    raise IOError("File not found")

try:
    open_file("example.txt")
except IOError as e:
    print(f"Exception: {e}")

在上面的示例中,定义了自定义的重试条件,仅当捕获到IOError异常时才重试,最多重试5次。

配置等待时间

from tenacity import retry, wait_fixed

@retry(wait=wait_fixed(2))
def slow_function():
    print("Slow function running...")
    raise Exception("Something went wrong!")

try:
    slow_function()
except Exception as e:
    print(f"Exception: {e}")

这个示例中,配置了一个固定的等待时间为2秒,表示在每次重试之间等待2秒。

使用before_sleep回调

from tenacity import retry, wait_fixed, before_sleep_log

@retry(wait=wait_fixed(2), before_sleep=before_sleep_log(logger))
def some_operation():
    print("Doing some operation...")
    raise Exception("Failed!")

try:
    some_operation()
except Exception as e:
    print(f"Exception: {e}")

在这个示例中,使用了before_sleep回调函数,它会在每次重试之前执行,并通过日志记录等待时间。这有助于更好地理解Tenacity的工作方式。

高级用法

Tenacity提供了许多高级功能,增强了其灵活性和适用性。

下面简要介绍一些高级用法:

  1. Jitter配置:

Tenacity支持配置Jitter,这是一种随机性的等待时间,有助于避免所有重试操作同时进行。通过配置Jitter,可以使重试操作在一定的时间范围内随机分散执行,减轻了服务的负载。

from tenacity import retry, wait_random

@retry(wait=wait_random(min=1, max=5))
def operation_with_jitter():
    print("Operation with Jitter...")
    raise Exception("Failed!")

try:
    operation_with_jitter()
except Exception as e:
    print(f"Exception: {e}")
  1. 等待可重试条件:

可以定义自定义的可重试条件,以满足特定的应用场景。例如,可以在某个状态满足时才触发重试。

from tenacity import retry, retry_if_result, stop_after_attempt

def should_retry(result):
    return result is not None

@retry(retry=retry_if_result(should_retry), stop=stop_after_attempt(3))
def operation_with_custom_retry_condition():
    result = do_operation()
    return result

def do_operation():
    print("Doing operation...")
    return None

try:
    operation_with_custom_retry_condition()
except Exception as e:
    print(f"Exception: {e}")
  1. 自定义停止策略: Tenacity允许

自定义停止策略,以便在特定条件下停止重试。这可以是基于异常类型、尝试次数、总时间或其他条件。

from tenacity import retry, stop_after_delay, retry_if_exception

def custom_stop_predicate(retry_state):
    return retry_state.outcome.exception is not None

@retry(stop=stop_after_delay(10) | stop_after_attempt(5), retry=retry_if_exception())
def operation_with_custom_stop():
    print("Operation with Custom Stop...")
    raise Exception("Failed!")

try:
    operation_with_custom_stop()
except Exception as e:
    print(f"Exception: {e}")

总结

在开发Python应用程序时,处理不稳定的操作和错误是一个常见的挑战。Tenacity是一个强大的重试库,可以帮助你优雅地应对各种失败和异常情况。通过合理配置Tenacity的参数,可以实现灵活的重试策略,适应不同的应用场景。

这篇文章介绍了Tenacity的基本用法,包括如何装饰函数以启用重试、如何配置重试的等待策略、如何处理特定的异常类型等。还分享了Tenacity的高级功能,如Jitter配置、自定义可重试条件和停止策略,能够更好地适应复杂的应用需求。

无论是处理网络请求、文件操作还是其他可能出现错误的情况,Tenacity都可以帮助你提高应用程序的可靠性。它是一个非常有价值的工具,特别适用于需要处理不稳定操作的应用程序,如分布式系统、微服务和API调用。

通过掌握Tenacity,可以更好地保护你应用程序免受意外错误的影响,提供更好的用户体验。

使用pycharm+conda配置虚拟环境详细步骤

最近用pycharm使用seleniumwire,但在python3.12版本出错,所以要用3.9版本,所以就用到anaconda来管理版本,推荐第一种使用已有的conda环境,可以比较方便的管理版本

一.pycharm使用已有的conda环境

相较于使用pycharm创建虚拟环境,其实使用conda创建更为简单,在创建后只需要在pycharm中添加该虚拟环境即可。

1.创建虚拟环境

首先,打开conda prompt,并创建新的虚拟环境,设置环境名称以及相应python版本。

conda create -n env_name python=3.6.0

创建之后可以通过conda命令查看并激活环境,

conda activate tf1.15

激活进入环境后添加包。

conda install pandas
如果conda安装不成功可试下pip install selenium-wire

2、如果是pycharm新建项目使用刚建的虚拟环境

  1. 第一行的位置一栏填写索要创建项目的目标文件夹位置。
  2. 解释器类型选custom environment
  3. 环境选择已存在的
  4. 类型选择conda
  5. 再选择刚刚自己创建的虚拟环境

就可以使用了

3.在pycharm中已有项目使用虚拟环境

依次打开

  • 文件
  • 设置
  • python解释器
  • 图标
  • 全部显示
  • 加号
  • conda环境

点击现有环境,在解释器中选择anaconda文件夹下的envs文件夹下的刚建好的虚拟环境中的python.exe文件。

然后conda可执行文件中选择anaconda文件夹下的”Scripts”文件夹下的conda.exe,点击确认。

此时就会多出选定的解释器,可在软件包中查看当前环境中的资源包,并点击应用,等待一段时间后即可完成配置。

可以从界面右下角看到已经使用了刚刚创建的虚拟环境下的解释器,此时可以运行一下main函数测试是否配置完成。

输出正常,则配置完成。

还可以参考https://blog.csdn.net/qq_40968179/article/details/128990022把旧电脑的虚拟环境直接迁移到新电脑上用。

(selenium-wire出错提示optmanager里 #import blinker._saferef)

二.使用pycharm创建虚拟环境

首先,打开点击pycharm左上方的“文件”中的“新建项目”。

1.新建项目选项填写

  1. 第一行的位置一栏填写索要创建项目的目标文件夹位置。
  2. 使用此工具新建环境处选择”Conda”
  3. 第二个位置是指存储新建的虚拟环境中的资源包的位置。
  4. python副本根据项目需求进行选择。
  5. conda可执行文件是在自己的anaconda文件夹下的”Scripts”文件夹下的conda.exe。
  6. 可根据自身需求选择是否选择用于所有项目。

在创建之后可能会出现报错,无视即可,直接进入下一步操作。
文件中找到设置,在设置中找到并点击python解释器,点击图中的图标。

点击后选择“展示全部”,在python解释器选择界面点击下图中加号,并进入添加python解释器界面。

点击“新环境”,在conda可执行文件中选择自己的anaconda文件夹下Scripts文件下的conda.exe文件,并点击“确定”。

之后软件会自动安装相应解释器,虚拟环境创建成功。

2.安装所需资源包

我个人比较习惯在conda prompt 中进行资源包的安装(在安装前一定要确认所安装资源包与python解释器版本是否对应,不然后续会很麻烦)。

打开桌面菜单,点击anaconda中的conda prompt。

使用conda指令查看目前所拥有的的conda环境(对应anaconda文件夹下的env文件夹)

1conda envlist 

激活你所创建的虚拟环境

1conda activate tf1.15

此时便进入到了相应的虚拟环境中,会在下图位置有所提示:

此时便可以通过conda命令添加相应所需要的资源包,比如pandas,例:

1conda installpandas

查看当前环境中的资源包:

1conda list

此时则已经安装成功,诸如此类可以安装其他的资源包。

3.可能存在的问题及应对

若你的pycharm因为某些原因无法创建python解释器,则可以通过conda prompt进行创建,并在pycharm中引用此环境,此处可参考后续部分二的内容,

原文

什么是回调函数,同步,异步

1 单线程是对多线程的,对于多线程的开发语言,有一个请求就可以开一个线程处理。那,对于单线程语言,只有通过异步调用程序。【事件 (事件循环机制), 回调】

2 异步是对同步说的, 最大区别就是同步需要等待,异步这不需要等待。

“同步模式”就是指后一个任务等待前一个任务结束,然后再执行,程序的执行顺序与任务的排列顺序是一致的、同步的。
“异步模式”则完全不同,每一个任务有一个或多个回调函数(callback),前一个任务结束后,不是执行后一个任务,而是执行回调函数,后一个任务则是不等前一个任务结束就执行,所以程序的执行顺序与任务的排列顺序是不一致的、异步的。

回调是一个函数,它作为参数传递给另一个函数,并在其父函数完成后执行。

你到一个商店买东西,刚好你要的东西没有货,于是你在店员那里留下了你的电话,过了几天店里有货了,店员就打了你的电话,然后你接到电话后就到店里去取了货。在这个例子里,你的电话号码就叫回调函数,你把电话留给店员就叫登记回调函数,店里后来有货了叫做触发了回调关联的事件,店员给你打电话叫做调用回调函数,你到店里去取货叫做响应回调事件。



我们绕点远路来回答这个问题。

编程分为两类:系统编程(system programming)和应用编程(application programming)。所谓系统编程,简单来说,就是编写;而应用编程就是利用写好的各种库来编写具某种功用的程序,也就是应用。系统程序员会给自己写的库留下一些接口,即API(application programming interface,应用编程接口),以供应用程序员使用。所以在抽象层的图示里,库位于应用的底下。

当程序跑起来时,一般情况下,应用程序(application program)会时常通过API调用库里所预先备好的函数。但是有些库函数(library function)却要求应用先传给它一个函数,好在合适的时候调用,以完成目标任务。这个被传入的、后又被调用的函数就称为回调函数(callback function)。

打个比方,有一家旅馆提供叫醒服务,但是要求旅客自己决定叫醒的方法。可以是打客房电话,也可以是派服务员去敲门,睡得死怕耽误事的,还可以要求往自己头上浇盆水。这里,“叫醒”这个行为是旅馆提供的,相当于库函数,但是叫醒的方式是由旅客决定并告诉旅馆的,也就是回调函数。而旅客告诉旅馆怎么叫醒自己的动作,也就是把回调函数传入库函数的动作,称为登记回调函数(to register a callback function)。如下图所示(图片来源:维基百科):

可以看到,回调函数通常和应用处于同一抽象层(因为传入什么样的回调函数是在应用级别决定的)。而回调就成了一个高层调用底层,底层再过头来用高层的过程。(我认为)这应该是回调最早的应用之处,也是其得名如此的原因。

回调机制的优势

从上面的例子可以看出,回调机制提供了非常大的灵活性。请注意,从现在开始,我们把图中的库函数改称为中间函数了,这是因为回调并不仅仅用在应用和库之间。任何时候,只要想获得类似于上面情况的灵活性,都可以利用回调。

这种灵活性是怎么实现的呢?乍看起来,回调似乎只是函数间的调用,但仔细一琢磨,可以发现两者之间的一个关键的不同:在回调中,我们利用某种方式,把回调函数像参数一样传入中间函数。可以这么理解,在传入一个回调函数之前,中间函数是不完整的。换句话说,程序可以在运行时,通过登记不同的回调函数,来决定、改变中间函数的行为。这就比简单的函数调用要灵活太多了。请看下面这段Python写成的回调的简单示例:

`even.py`

#回调函数1
#生成一个2k形式的偶数
def double(x):
    return x * 2
    
#回调函数2
#生成一个4k形式的偶数
def quadruple(x):
    return x * 4

`callback_demo.py`

from even import *

#中间函数
#接受一个生成偶数的函数作为参数
#返回一个奇数
def getOddNumber(k, getEvenNumber):
    return 1 + getEvenNumber(k)
    
#起始函数,这里是程序的主函数
def main():    
    k = 1
    #当需要生成一个2k+1形式的奇数时
    i = getOddNumber(k, double)
    print(i)
    #当需要一个4k+1形式的奇数时
    i = getOddNumber(k, quadruple)
    print(i)
    #当需要一个8k+1形式的奇数时
    i = getOddNumber(k, lambda x: x * 8)
    print(i)
    
if __name__ == "__main__":
    main()

运行`callback_demp.py`,输出如下:

3
5
9

上面的代码里,给`getOddNumber`传入不同的回调函数,它的表现也不同,这就是回调机制的优势所在。值得一提的是,上面的第三个回调函数是一个匿名函数。

易被忽略的第三方

通过上面的论述可知,中间函数和回调函数是回调的两个必要部分,不过人们往往忽略了回调里的第三位要角,就是中间函数的调用者。绝大多数情况下,这个调用者可以和程序的主函数等同起来,但为了表示区别,我这里把它称为起始函数(如上面的代码中注释所示)。

之所以特意强调这个第三方,是因为我在网上读相关文章时得到一种印象,很多人把它简单地理解为两个个体之间的来回调用。譬如,很多中文网页在解释“回调”(callback)时,都会提到这么一句话:“If you call me, I will call you back.”我没有查到这句英文的出处。我个人揣测,很多人把起始函数和回调函数看作为一体,大概有两个原因:第一,可能是“回调”这一名字的误导;第二,给中间函数传入什么样的回调函数,是在起始函数里决定的。实际上,回调并不是“你我”两方的互动,而是ABC的三方联动。有了这个清楚的概念,在自己的代码里实现回调时才不容易混淆出错。

另外,回调实际上有两种:阻塞式回调和延迟式回调。两者的区别在于:阻塞式回调里,回调函数的调用一定发生在起始函数返回之前;而延迟式回调里,回调函数的调用有可能是在起始函数返回之后。这里不打算对这两个概率做更深入的讨论,之所以把它们提出来,也是为了说明强调起始函数的重要性。网上的很多文章,提到这两个概念时,只是笼统地说阻塞式回调发生在主调函数返回之前,却没有明确这个主调函数到底是起始函数还是中间函数,不免让人糊涂,所以这里特意说明一下。另外还请注意,本文中所举的示例均为阻塞式回调。延迟式回调通常牵扯到多线程,我自己还没有完全搞明白,所以这里就不多说了。

链接:https://www.zhihu.com/question/19801131/answer/27459821