百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT技术 > 正文

Python教程(二十五):装饰器–函数的高级用法

wptr33 2025-08-05 21:49 2 浏览

今天您将学习什么

  • 什么是装饰器以及如何创建装饰器
  • 函数装饰器和类装饰器
  • 带参数的装饰器
  • 装饰器的实际应用
  • 真实世界示例:日志记录、性能监控、缓存、权限验证

什么是装饰器?

装饰器是Python中的一种设计模式,它允许您在不修改原函数代码的情况下,为函数添加新的功能。装饰器本质上是一个函数,它接受一个函数作为参数,并返回一个新的函数。

装饰器的优势:

  • 代码复用:相同的功能可以应用到多个函数
  • 代码分离:核心逻辑和横切关注点分离
  • 可读性:使用@语法使代码更清晰
  • 灵活性:可以动态地添加或移除功能

1. 基本装饰器

简单装饰器

def timer_decorator(func):
    """计时装饰器"""
    import time
    
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"{func.__name__} 执行时间:{end_time - start_time:.4f}秒")
        return result
    
    return wrapper

def log_decorator(func):
    """日志装饰器"""
    def wrapper(*args, **kwargs):
        print(f"调用函数:{func.__name__}")
        print(f"参数:args={args}, kwargs={kwargs}")
        result = func(*args, **kwargs)
        print(f"返回值:{result}")
        return result
    
    return wrapper

# 使用装饰器
@timer_decorator
@log_decorator
def fibonacci(n):
    """计算斐波那契数列"""
    if n <= 1:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

# 等价于:
# fibonacci = timer_decorator(log_decorator(fibonacci))

# 测试装饰器
print("计算斐波那契数列第5项:")
result = fibonacci(5)
print(f"结果:{result}")

保留函数元信息

from functools import wraps

def preserve_metadata(func):
    """保留函数元信息的装饰器"""
    @wraps(func)  # 保留原函数的元信息
    def wrapper(*args, **kwargs):
        print(f"调用函数:{func.__name__}")
        return func(*args, **kwargs)
    return wrapper

@preserve_metadata
def greet(name):
    """问候函数"""
    return f"你好,{name}!"

# 测试元信息保留
print(f"函数名:{greet.__name__}")
print(f"函数文档:{greet.__doc__}")
print(f"函数调用:{greet('张三')}")

2. 带参数的装饰器

装饰器工厂

def repeat(times):
    """重复执行装饰器"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for i in range(times):
                print(f"第{i+1}次执行:")
                result = func(*args, **kwargs)
            return result
        return wrapper
    return decorator

def retry(max_attempts=3, delay=1):
    """重试装饰器"""
    import time
    
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        print(f"最终失败:{e}")
                        raise
                    print(f"第{attempt + 1}次尝试失败:{e},{delay}秒后重试...")
                    time.sleep(delay)
        return wrapper
    return decorator

@repeat(3)
def say_hello(name):
    """问候函数"""
    print(f"你好,{name}!")
    return "问候完成"

@retry(max_attempts=3, delay=1)
def risky_function():
    """有风险的函数"""
    import random
    if random.random() < 0.7:
        raise ValueError("随机错误")
    return "成功!"

# 测试带参数的装饰器
print("=== 重复执行装饰器 ===")
say_hello("李四")

print("\n=== 重试装饰器 ===")
try:
    result = risky_function()
    print(f"结果:{result}")
except Exception as e:
    print(f"最终失败:{e}")

真实世界示例1:性能监控装饰器

import time
import functools
from collections import defaultdict

class PerformanceMonitor:
    """性能监控器"""
    
    def __init__(self):
        self.stats = defaultdict(list)
    
    def monitor(self, func):
        """性能监控装饰器"""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            start_memory = self._get_memory_usage()
            
            try:
                result = func(*args, **kwargs)
                success = True
            except Exception as e:
                result = None
                success = False
                raise
            finally:
                end_time = time.time()
                end_memory = self._get_memory_usage()
                
                execution_time = end_time - start_time
                memory_used = end_memory - start_memory
                
                self.stats[func.__name__].append({
                    'execution_time': execution_time,
                    'memory_used': memory_used,
                    'success': success,
                    'timestamp': time.time()
                })
            
            return result
        return wrapper
    
    def _get_memory_usage(self):
        """获取内存使用量(简化版)"""
        import psutil
        try:
            return psutil.Process().memory_info().rss / 1024 / 1024  # MB
        except ImportError:
            return 0
    
    def get_stats(self, func_name=None):
        """获取统计信息"""
        if func_name:
            return self.stats.get(func_name, [])
        return dict(self.stats)
    
    def print_summary(self):
        """打印统计摘要"""
        print("性能监控摘要:")
        for func_name, calls in self.stats.items():
            if calls:
                times = [call['execution_time'] for call in calls]
                memories = [call['memory_used'] for call in calls]
                success_count = sum(1 for call in calls if call['success'])
                
                print(f"\n{func_name}:")
                print(f"  调用次数:{len(calls)}")
                print(f"  成功次数:{success_count}")
                print(f"  平均执行时间:{sum(times)/len(times):.4f}秒")
                print(f"  最大执行时间:{max(times):.4f}秒")
                print(f"  平均内存使用:{sum(memories)/len(memories):.2f}MB")

# 创建性能监控器实例
monitor = PerformanceMonitor()

@monitor.monitor
def slow_function():
    """慢函数"""
    time.sleep(0.1)
    return "完成"

@monitor.monitor
def fast_function():
    """快函数"""
    return "快速完成"

@monitor.monitor
def error_function():
    """会出错的函数"""
    raise ValueError("测试错误")

# 测试性能监控
print("=== 性能监控测试 ===")
for i in range(5):
    slow_function()
    fast_function()

try:
    error_function()
except:
    pass

# 打印统计信息
monitor.print_summary()

真实世界示例2:缓存装饰器

import functools
import time
from collections import OrderedDict

class LRUCache:
    """LRU缓存装饰器"""
    
    def __init__(self, max_size=128, ttl=None):
        self.max_size = max_size
        self.ttl = ttl  # 生存时间(秒)
        self.cache = OrderedDict()
    
    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # 创建缓存键
            key = self._make_key(args, kwargs)
            
            # 检查缓存
            if key in self.cache:
                value, timestamp = self.cache[key]
                
                # 检查TTL
                if self.ttl is None or time.time() - timestamp < self.ttl:
                    # 移动到末尾(最近使用)
                    self.cache.move_to_end(key)
                    print(f"缓存命中:{func.__name__}")
                    return value
                else:
                    # 过期,删除
                    del self.cache[key]
            
            # 计算新值
            result = func(*args, **kwargs)
            
            # 存储到缓存
            self.cache[key] = (result, time.time())
            
            # 如果缓存满了,删除最旧的
            if len(self.cache) > self.max_size:
                self.cache.popitem(last=False)
            
            print(f"缓存未命中:{func.__name__}")
            return result
        
        return wrapper
    
    def _make_key(self, args, kwargs):
        """创建缓存键"""
        # 简化版:使用字符串表示
        key_parts = [str(arg) for arg in args]
        key_parts.extend(f"{k}={v}" for k, v in sorted(kwargs.items()))
        return "|".join(key_parts)
    
    def clear(self):
        """清空缓存"""
        self.cache.clear()
    
    def get_stats(self):
        """获取缓存统计"""
        return {
            'size': len(self.cache),
            'max_size': self.max_size,
            'keys': list(self.cache.keys())
        }

# 使用LRU缓存
@LRUCache(max_size=10, ttl=60)
def fibonacci_cached(n):
    """带缓存的斐波那契函数"""
    if n <= 1:
        return n
    return fibonacci_cached(n-1) + fibonacci_cached(n-2)

@LRUCache(max_size=5)
def expensive_calculation(x, y):
    """昂贵的计算"""
    time.sleep(0.1)  # 模拟耗时计算
    return x * y + x + y

# 测试缓存
print("=== 缓存测试 ===")
print("计算斐波那契数列:")
for i in range(10):
    result = fibonacci_cached(i)
    print(f"fibonacci({i}) = {result}")

print("\n昂贵计算测试:")
for i in range(3):
    for j in range(3):
        result = expensive_calculation(i, j)
        print(f"calc({i}, {j}) = {result}")

# 获取缓存统计
cache_decorator = fibonacci_cached.__closure__[0].cell_contents
print(f"\n缓存统计:{cache_decorator.get_stats()}")

真实世界示例3:权限验证装饰器

from functools import wraps
from enum import Enum

class Permission(Enum):
    """权限枚举"""
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"

class User:
    """用户类"""
    
    def __init__(self, username, permissions=None):
        self.username = username
        self.permissions = permissions or []
    
    def has_permission(self, permission):
        """检查是否有指定权限"""
        return permission in self.permissions or Permission.ADMIN in self.permissions

class PermissionDecorator:
    """权限验证装饰器"""
    
    def __init__(self, required_permission):
        self.required_permission = required_permission
    
    def __call__(self, func):
        @wraps(func)
        def wrapper(user, *args, **kwargs):
            if not isinstance(user, User):
                raise ValueError("第一个参数必须是User对象")
            
            if not user.has_permission(self.required_permission):
                raise PermissionError(
                    f"用户 {user.username} 没有 {self.required_permission.value} 权限"
                )
            
            print(f"用户 {user.username} 执行 {func.__name__}")
            return func(user, *args, **kwargs)
        
        return wrapper

class FileManager:
    """文件管理器"""
    
    @PermissionDecorator(Permission.READ)
    def read_file(self, user, filename):
        """读取文件"""
        return f"读取文件:{filename}"
    
    @PermissionDecorator(Permission.WRITE)
    def write_file(self, user, filename, content):
        """写入文件"""
        return f"写入文件:{filename},内容:{content}"
    
    @PermissionDecorator(Permission.DELETE)
    def delete_file(self, user, filename):
        """删除文件"""
        return f"删除文件:{filename}"
    
    @PermissionDecorator(Permission.ADMIN)
    def system_info(self, user):
        """系统信息(管理员权限)"""
        return "系统信息:正常运行"

# 创建用户
admin_user = User("admin", [Permission.ADMIN])
editor_user = User("editor", [Permission.READ, Permission.WRITE])
reader_user = User("reader", [Permission.READ])

# 创建文件管理器
file_manager = FileManager()

# 测试权限验证
print("=== 权限验证测试 ===")

# 管理员测试
print("\n管理员操作:")
try:
    print(file_manager.read_file(admin_user, "test.txt"))
    print(file_manager.write_file(admin_user, "test.txt", "Hello"))
    print(file_manager.delete_file(admin_user, "test.txt"))
    print(file_manager.system_info(admin_user))
except Exception as e:
    print(f"错误:{e}")

# 编辑者测试
print("\n编辑者操作:")
try:
    print(file_manager.read_file(editor_user, "test.txt"))
    print(file_manager.write_file(editor_user, "test.txt", "Hello"))
    try:
        print(file_manager.delete_file(editor_user, "test.txt"))
    except PermissionError as e:
        print(f"权限不足:{e}")
    try:
        print(file_manager.system_info(editor_user))
    except PermissionError as e:
        print(f"权限不足:{e}")
except Exception as e:
    print(f"错误:{e}")

# 读者测试
print("\n读者操作:")
try:
    print(file_manager.read_file(reader_user, "test.txt"))
    try:
        print(file_manager.write_file(reader_user, "test.txt", "Hello"))
    except PermissionError as e:
        print(f"权限不足:{e}")
    try:
        print(file_manager.delete_file(reader_user, "test.txt"))
    except PermissionError as e:
        print(f"权限不足:{e}")
    try:
        print(file_manager.system_info(reader_user))
    except PermissionError as e:
        print(f"权限不足:{e}")
except Exception as e:
    print(f"错误:{e}")

装饰器的最佳实践

推荐做法:

  • 使用functools.wraps保留函数元信息
  • 保持装饰器的简单性和可读性
  • 合理使用装饰器参数
  • 考虑装饰器的执行顺序

避免的做法:

  • 创建过于复杂的装饰器
  • 忽略函数元信息的保留
  • 在装饰器中产生副作用
  • 过度使用装饰器

高级装饰器特性

类装饰器

class Singleton:
    """单例装饰器"""
    
    def __init__(self, cls):
        self.cls = cls
        self.instance = None
    
    def __call__(self, *args, **kwargs):
        if self.instance is None:
            self.instance = self.cls(*args, **kwargs)
        return self.instance

@Singleton
class Database:
    """数据库连接类"""
    
    def __init__(self):
        print("创建数据库连接...")
        self.connection = "数据库连接"
    
    def query(self, sql):
        return f"执行查询:{sql}"

# 测试单例装饰器
db1 = Database()
db2 = Database()
print(f"db1 is db2: {db1 is db2}")  # True

装饰器链

def bold(func):
    """加粗装饰器"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        return f"**{func(*args, **kwargs)}**"
    return wrapper

def italic(func):
    """斜体装饰器"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        return f"*{func(*args, **kwargs)}*"
    return wrapper

@bold
@italic
def greet(name):
    """问候函数"""
    return f"你好,{name}!"

# 测试装饰器链
print(greet("世界"))  # **你好,世界!**

相关推荐

python数据容器之列表、元组、字符串

数据容器分为5类,分别是:列表(list)、元组(tuple)、字符串(str)、集合(set)、字典(dict)list#字面量[元素1,元素2,元素3,……]...

深入理解 PYTHON 虚拟机:令人拍案叫绝的字节码设计

深入理解PYTHON虚拟机:令人拍案叫绝的字节码设计在本篇文章当中主要给大家介绍cpython虚拟机对于字节码的设计以及在调试过程当中一个比较重要的字段co_lnotab的设计原理!PYT...

Python快速学习第一天!

第一天:Python是一种解释型的、面向对象的、带有动态语义的高级程序设计语言一、运行Python:1、在交互式环境下,直接输入Python进入Python编程环境[root@tanggao/]#...

Java 程序员的第一套Python代码

选择的Web组件是Python里面的Django,这不一定是一个最佳的框架或者最快的框架,当时他应该算是一个最成熟的框架。...

Python 中 必须掌握的 20 个核心函数及其含义,不允许你不会

以下是Python中必须掌握的20个核心函数及其含义...

Python代码:按和值奇偶比对号码进行组合

Python代码:按和值奇偶比对号码进行组合不少朋友在选定号码以后,会按照一定的和值来组号,比如大乐透常见和值有626372737481108116等我们不用固定在一个数上,我们可以给定...

30天学会Python编程:16. Python常用标准库使用教程

16.1collections模块16.1.1高级数据结构16.1.2示例...

Python强大的内置模块collections

1.模块说明collections是Python的一个内置模块,所谓内置模块的意思是指Python内部封装好的模块,无需安装即可直接使用。...

Python自动化办公应用学习笔记31—全局变量和局部变量

一个Python程序中的变量包括两类:全局变量和局部变量。一、全局变量·...

精通Python可视化爬虫:Selenium实战全攻略

在数据驱动的时代,爬虫技术成为获取信息的强大武器。而Python作为编程界的“瑞士军刀”,搭配Selenium库,更是让我们在动态网页抓取领域如鱼得水。本文将带你深入探索PythonSelenium...

Python中的数据类型操作

...

Python教程(二十五):装饰器–函数的高级用法

今天您将学习什么...

玩转Python列表/字典:增删改查与高效遍历技巧

为什么列表和字典是Python的灵魂?你是否遇到过这样的场景?想存储学生成绩,用列表却发现查找某个学生的分数像大海捞针?用字典存储购物车商品,却不知道如何高效批量修改价格?遍历数据时,传统循环写得...

Python列表操作

Python添加列表4分钟阅读在Python操作列表有各种方法。例如–简单地将一个列表的元素附加到...

充分利用Python多进程提高并发

在计算机编程中,我们经常需要同时执行多个任务。然而,传统的单线程方式无法充分利用计算机的多核处理器,导致程序的执行效率低下。Python中的多进程编程技术可以帮助我们解决这个问题,通过同时运行多个进程...