百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT技术 > 正文

Python文件操作常用库高级应用教程

wptr33 2025-06-15 19:47 3 浏览

本文是在前面《Python文件操作常用库使用教程》的基础上,进一步学习Python文件操作库的高级应用。

一、高级文件系统监控

1.1 watchdog库 - 实时文件系统监控

安装与基本使用

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class FileChangeHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if not event.is_directory:
            print(f"文件被修改: {event.src_path}")

def monitor_directory(path):
    event_handler = FileChangeHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

# 监控当前目录
monitor_directory('.')

功能扩展

class CustomHandler(FileSystemEventHandler):
    def __init__(self, callback):
        self.callback = callback
    
    def on_any_event(self, event):
        event_type = event.event_type  # 'modified', 'created', 'deleted', 'moved'
        self.callback(event_type, event.src_path, 
                     getattr(event, 'dest_path', None))

# 使用示例
def log_change(event_type, src, dest):
    print(f"{event_type}: {src} {f'-> {dest}' if dest else ''}")

monitor_with_callback('.', log_change)

文件监控系统架构




二、高性能文件处理

2.1 内存映射文件(mmap)

大文件处理示例

import mmap
import re

def search_large_file(filename, pattern):
    """在大文件中搜索模式"""
    with open(filename, 'r+b') as f:
        # 内存映射文件
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            # 使用正则搜索
            for match in re.finditer(pattern.encode(), mm):
                yield match.start(), match.group().decode()

# 使用示例
for pos, text in search_large_file('large.log', r'ERROR\s+\d+'):
    print(f"在位置 {pos} 发现错误: {text}")

性能对比测试

import timeit

def test_performance():
    # 传统方式
    def traditional():
        with open('large.file') as f:
            return sum(1 for _ in f)
    
    # mmap方式
    def mmap_way():
        with open('large.file', 'r+b') as f:
            with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
                return sum(1 for _ in mm)
    
    print("传统方式:", timeit.timeit(traditional, number=10))
    print("mmap方式:", timeit.timeit(mmap_way, number=10))

2.2 并行文件处理

多进程文件处理

from multiprocessing import Pool
import os

def process_file_chunk(args):
    """处理文件块"""
    filename, start, end = args
    with open(filename, 'rb') as f:
        f.seek(start)
        chunk = f.read(end - start)
        return len(chunk.splitlines())

def parallel_file_processing(filename, workers=4):
    """并行处理大文件"""
    file_size = os.path.getsize(filename)
    chunk_size = file_size // workers
    ranges = [(filename, i*chunk_size, (i+1)*chunk_size) 
              for i in range(workers)]
    
    with Pool(workers) as pool:
        results = pool.map(process_file_chunk, ranges)
    
    return sum(results)

# 使用示例
total_lines = parallel_file_processing('huge_file.csv')
print(f"文件总行数: {total_lines}")

三、高级压缩与归档处理

3.1 增量压缩与加密

ZIP加密与增量添加

import zipfile
import os
from tqdm import tqdm  # 进度条库

def create_secure_zip(output_zip, files_to_zip, password):
    """创建加密ZIP文件"""
    with zipfile.ZipFile(
        output_zip, 
        'w', 
        compression=zipfile.ZIP_DEFLATED
    ) as zipf:
        zipf.setpassword(password.encode())
        
        for file in tqdm(files_to_zip, desc="压缩进度"):
            if os.path.isfile(file):
                zipf.write(file)
                
def add_to_existing_zip(zip_file, new_files, password=None):
    """向现有ZIP添加文件"""
    temp_zip = f"temp_{zip_file}"
    os.rename(zip_file, temp_zip)
    
    with zipfile.ZipFile(temp_zip, 'r') as zin:
        with zipfile.ZipFile(zip_file, 'w') as zout:
            if password:
                zout.setpassword(password.encode())
            
            # 复制现有条目
            for item in tqdm(zin.infolist(), desc="复制原有文件"):
                zout.writestr(item, zin.read(item))
            
            # 添加新文件
            for file in tqdm(new_files, desc="添加新文件"):
                if os.path.isfile(file):
                    zout.write(file)
    
    os.remove(temp_zip)

3.2 多卷压缩文件

分卷压缩实现

import zipfile
import math

def split_zip(input_files, output_prefix, max_size_mb):
    """创建分卷ZIP文件"""
    max_size = max_size_mb * 1024 * 1024
    part_num = 1
    current_size = 0
    current_zip = None
    
    for file in input_files:
        file_size = os.path.getsize(file)
        
        # 如果当前ZIP不存在或需要新分卷
        if current_zip is None or current_size + file_size > max_size:
            if current_zip is not None:
                current_zip.close()
            
            zip_name = f"{output_prefix}.zip.{part_num:03d}"
            current_zip = zipfile.ZipFile(
                zip_name, 'w', zipfile.ZIP_DEFLATED)
            part_num += 1
            current_size = 0
        
        current_zip.write(file)
        current_size += file_size
    
    if current_zip is not None:
        current_zip.close()
    
    return part_num - 1  # 返回分卷数量

四、云存储集成

4.1 boto3 - AWS S3集成

S3文件操作示例

import boto3
from botocore.exceptions import ClientError

class S3Manager:
    def __init__(self, bucket_name):
        self.s3 = boto3.client('s3')
        self.bucket = bucket_name
    
    def upload_file(self, local_path, s3_key):
        """上传文件到S3"""
        try:
            self.s3.upload_file(
                local_path, 
                self.bucket, 
                s3_key,
                ExtraArgs={
                    'ACL': 'bucket-owner-full-control',
                    'StorageClass': 'INTELLIGENT_TIERING'
                }
            )
            return True
        except ClientError as e:
            print(f"S3上传错误: {e}")
            return False
    
    def download_file(self, s3_key, local_path):
        """从S3下载文件"""
        try:
            self.s3.download_file(self.bucket, s3_key, local_path)
            return True
        except ClientError as e:
            print(f"S3下载错误: {e}")
            return False
    
    def generate_presigned_url(self, s3_key, expires=3600):
        """生成预签名URL"""
        try:
            return self.s3.generate_presigned_url(
                'get_object',
                Params={'Bucket': self.bucket, 'Key': s3_key},
                ExpiresIn=expires
            )
        except ClientError as e:
            print(f"生成URL错误: {e}")
            return None

4.2 阿里云OSS集成

OSS文件操作示例

import oss2
from oss2.exceptions import OssError

class OSSManager:
    def __init__(self, access_key, secret_key, endpoint, bucket_name):
        auth = oss2.Auth(access_key, secret_key)
        self.bucket = oss2.Bucket(auth, endpoint, bucket_name)
    
    def upload_with_progress(self, local_path, oss_key):
        """带进度显示的上传"""
        def progress_callback(consumed_bytes, total_bytes):
            if total_bytes:
                percent = 100 * (consumed_bytes / total_bytes)
                print(f"\r上传进度: {percent:.2f}%", end='')
        
        try:
            result = self.bucket.put_object_from_file(
                oss_key, 
                local_path,
                progress_callback=progress_callback
            )
            print("\n上传完成")
            return result.status == 200
        except OssError as e:
            print(f"\nOSS上传错误: {e}")
            return False
    
    def download_large_file(self, oss_key, local_path, part_size=10*1024*1024):
        """分片下载大文件"""
        try:
            oss2.resumable_download(
                self.bucket, 
                oss_key, 
                local_path,
                part_size=part_size,
                num_threads=4
            )
            return True
        except OssError as e:
            print(f"OSS下载错误: {e}")
            return False

五、文件内容高级处理

5.1 二进制文件解析

结构化二进制文件解析

import struct
from collections import namedtuple

def parse_binary_file(filename):
    """解析自定义二进制格式文件"""
    # 定义文件头结构
    Header = namedtuple('Header', 'magic version num_records')
    header_format = '<4sII'  # 4字节魔数,2个无符号整数
    
    # 定义记录结构
    Record = namedtuple('Record', 'id timestamp value')
    record_format = '<Qdf'  # 无符号长整型,双精度浮点,单精度浮点
    
    with open(filename, 'rb') as f:
        # 读取文件头
        header_data = f.read(struct.calcsize(header_format))
        header = Header._make(struct.unpack(header_format, header_data))
        
        # 验证魔数
        if header.magic != b'BIN1':
            raise ValueError("无效的文件格式")
        
        # 读取记录
        records = []
        for _ in range(header.num_records):
            record_data = f.read(struct.calcsize(record_format))
            records.append(Record._make(struct.unpack(record_format, record_data)))
    
    return header, records

5.2 实时日志文件分析

实时日志分析器

import re
from collections import defaultdict
import time

class LogAnalyzer:
    def __init__(self, log_file, patterns):
        self.log_file = log_file
        self.patterns = patterns
        self.position = 0
        self.stats = defaultdict(int)
    
    def analyze(self):
        """分析日志文件"""
        with open(self.log_file, 'r', encoding='utf-8', errors='ignore') as f:
            # 跳到上次读取位置
            f.seek(self.position)
            
            for line in f:
                for name, pattern in self.patterns.items():
                    if re.search(pattern, line):
                        self.stats[name] += 1
            
            # 更新读取位置
            self.position = f.tell()
        
        return dict(self.stats)
    
    def continuous_monitor(self, interval=5):
        """持续监控日志文件"""
        try:
            while True:
                stats = self.analyze()
                if stats:
                    print(f"\n[{time.ctime()}] 统计结果:")
                    for name, count in stats.items():
                        print(f"{name}: {count}")
                time.sleep(interval)
        except KeyboardInterrupt:
            print("\n监控停止")

# 使用示例
patterns = {
    'ERROR': r'ERROR',
    'WARNING': r'WARNING',
    'HTTP_500': r'HTTP/1.\d"\s+500'
}
analyzer = LogAnalyzer('app.log', patterns)
analyzer.continuous_monitor()

六、安全文件操作

6.1 安全文件写入模式

原子写入模式

import os
import tempfile

def atomic_write(filename, data, mode='w', encoding='utf-8'):
    """原子性写入文件"""
    # 创建临时文件
    temp_dir = os.path.dirname(os.path.abspath(filename))
    with tempfile.NamedTemporaryFile(
        mode=mode,
        dir=temp_dir,
        prefix='.tmp_',
        delete=False,
        encoding=encoding
    ) as tmp:
        tmp.write(data)
        tmp.flush()
        os.fsync(tmp.fileno())
        tempname = tmp.name
    
    # 原子替换
    try:
        os.replace(tempname, filename)
    except:
        os.unlink(tempname)
        raise

6.2 文件权限管理

安全权限设置

import os
import stat

def set_secure_permissions(filename):
    """设置安全文件权限"""
    # 获取当前权限
    current_mode = os.stat(filename).st_mode
    
    # 移除组和其他用户的写权限
    new_mode = current_mode & ~stat.S_IWGRP & ~stat.S_IWOTH
    
    # 如果是敏感文件,移除所有组和其他用户的权限
    if filename.endswith(('.key', '.pem', '.env')):
        new_mode = new_mode & ~stat.S_IRGRP & ~stat.S_IROTH
    
    os.chmod(filename, new_mode)

def create_secure_file(filename, content):
    """创建安全文件"""
    # 使用原子写入
    atomic_write(filename, content)
    
    # 设置安全权限
    set_secure_permissions(filename)
    
    # 验证权限
    mode = os.stat(filename).st_mode
    if mode & (stat.S_IWGRP | stat.S_IWOTH):
        raise RuntimeError("文件权限设置失败")

七、总结

表2:文件操作性能优化技术

场景

优化技术

适用条件

大文件读取

内存映射(mmap)

需要随机访问的大文件

批量小文件

并行处理(multiprocessing)

大量独立小文件

顺序处理

缓冲读写(大缓冲区)

顺序读写场景

网络存储

断点续传/分片上传

不稳定网络环境

实时处理

增量读取(记录位置)

日志监控等场景

压缩文件

流式处理(不解压)

大型压缩文件

建议

  1. 避免频繁的小IO操作,尽量批量处理
  2. 使用with语句确保资源释放
  3. 对大文件考虑内存映射技术
  4. 大量文件处理使用并行技术
  5. 网络存储操作添加重试机制
  6. 关键操作添加原子性保证

通过本教程的高级技巧,我们可以在实际项目中实现更高效、更安全的文件操作,满足企业级应用的需求。


持续更新Python编程学习日志与技巧,敬请关注!



#编程# #python# #在头条记录我的2025#

相关推荐

Python自动化脚本应用与示例(python办公自动化脚本)

Python是编写自动化脚本的绝佳选择,因其语法简洁、库丰富且跨平台兼容性强。以下是Python自动化脚本的常见应用场景及示例,帮助你快速上手:一、常见自动化场景文件与目录操作...

Python文件操作常用库高级应用教程

本文是在前面《Python文件操作常用库使用教程》的基础上,进一步学习Python文件操作库的高级应用。一、高级文件系统监控1.1watchdog库-实时文件系统监控安装与基本使用:...

Python办公自动化系列篇之六:文件系统与操作系统任务

作为高效办公自动化领域的主流编程语言,Python凭借其优雅的语法结构、完善的技术生态及成熟的第三方工具库集合,已成为企业数字化转型过程中提升运营效率的理想选择。该语言在结构化数据处理、自动化文档生成...

14《Python 办公自动化教程》os 模块操作文件与文件夹

在日常工作中,我们经常会和文件、文件夹打交道,比如将服务器上指定目录下文件进行归档,或将爬虫爬取的数据根据时间创建对应的文件夹/文件,如果这些还依靠手动来进行操作,无疑是费时费力的,这时候Pyt...

python中os模块详解(python os.path模块)

os模块是Python标准库中的一个模块,它提供了与操作系统交互的方法。使用os模块可以方便地执行许多常见的系统任务,如文件和目录操作、进程管理、环境变量管理等。下面是os模块中一些常用的函数和方法:...

21-Python-文件操作(python文件的操作步骤)

在Python中,文件操作是非常重要的一部分,它允许我们读取、写入和修改文件。下面将详细讲解Python文件操作的各个方面,并给出相应的示例。1-打开文件...

轻松玩转Python文件操作:移动、删除

哈喽,大家好,我是木头左!Python文件操作基础在处理计算机文件时,经常需要执行如移动和删除等基本操作。Python提供了一些内置的库来帮助完成这些任务,其中最常用的就是os模块和shutil模块。...

Python 初学者练习:删除文件和文件夹

在本教程中,你将学习如何在Python中删除文件和文件夹。使用os.remove()函数删除文件...

引人遐想,用 Python 获取你想要的“某个人”摄像头照片

仅用来学习,希望给你们有提供到学习上的作用。1.安装库需要安装python3.5以上版本,在官网下载即可。然后安装库opencv-python,安装方式为打开终端输入命令行。...

Python如何使用临时文件和目录(python目录下文件)

在某些项目中,有时候会有大量的临时数据,比如各种日志,这时候我们要做数据分析,并把最后的结果储存起来,这些大量的临时数据如果常驻内存,将消耗大量内存资源,我们可以使用临时文件,存储这些临时数据。使用标...

Linux 下海量文件删除方法效率对比,最慢的竟然是 rm

Linux下海量文件删除方法效率对比,本次参赛选手一共6位,分别是:rm、find、findwithdelete、rsync、Python、Perl.首先建立50万个文件$testfor...

Python 开发工程师必会的 5 个系统命令操作库

当我们需要编写自动化脚本、部署工具、监控程序时,熟练操作系统命令几乎是必备技能。今天就来聊聊我在实际项目中高频使用的5个系统命令操作库,这些可都是能让你效率翻倍的"瑞士军刀"。一...

Python常用文件操作库使用详解(python文件操作选项)

Python生态系统提供了丰富的文件操作库,可以处理各种复杂的文件操作需求。本教程将介绍Python中最常用的文件操作库及其实际应用。一、标准库核心模块1.1os模块-操作系统接口主要功能...

11. 文件与IO操作(文件io和网络io)

本章深入探讨Go语言文件处理与IO操作的核心技术,结合高性能实践与安全规范,提供企业级解决方案。11.1文件读写11.1.1基础操作...

Python os模块的20个应用实例(python中 import os模块用法)

在Python中,...