开发者

Python异步编程入门之实现文件批处理的并发处理方式

开发者 https://www.devze.com 2024-10-29 09:26 出处:网络 作者: engchina
目录引言代码实现1. 日志配置2. 异步批处理类3. 异步批处理入口函数代码解释总结引言
目录
  • 引言
  • 代码实现
    • 1. 日志配置
    • 2. 异步批处理类
    • 3. 异步批处理入口函数
    • 代码解释
  • 总结

    引言

    在现代软件开发中,处理大量文件或数据时,提高处理效率和并发性是非常重要的。

    python 的 asyncio 库提供了一种强大的方式来实现异步编程,从而提高程序的并发处理能力。

    本文将面向 Python 初级程序员,介绍如何使用 asynciologging 模块来实现一个异步批处理文件的并发处理系统。

    代码实现

    1. 日志配置

    首先,我们需要配置日志系统,以便在处理文件时记录日志信息。

    日志配置包括设置日志格式和输出位置。

    import logging
    import os
    
    # 获取当前文件的绝对路径
    current_file = os.path.abspath(__file__)
    
    # 配置日志格式
    log_format = '%(asctime)s - %(levelname)s - %(pathname)s:%(lineno)d - %(message)s'
    logging.basicConfig(format=log_format, level=logging.INFO)
    
    # 创建一个文件处理器,并将日志输出到文件
    file_handler = logging.FileHandler('app.log')
    file_handler.setFormatter(logging.Formatter(log_format))
    logging.getLogger().addHandler(file_handler)

    2. 异步批处理类

    接下来,我们定义一个 AsyncBATchProcessor 类,用于处理批量文件。

    该类使用 asyncio.Semaphore 来控制并发任务的数量。

    import asyncio
    import random
    
    DEFAULT_MAX_CONCURRENT_TASKS = 2  # 最大并发任务数
    MAX_RETRIES = 3  # 最大重试次数
    
    class AsyncBatchProcessor:
        def __init__(self, max_concurrent: int = DEFAULT_MAX_CONCURRENT_TASKS):
            self.max_concurrent = max_concurrent
            self.semphpaphore = asyncio.Semaphore(max_concurrent)
    
        async def process_single_www.devze.comfile(
                self,
                input_file: str,
                retry_count: int = 0
        ) -> None:
            """处理单个文件的异步方法"""
            async with self.semaphore:  # 使用信号量控制并发
                try:
                    logging.info(f"Processing file: {input_file}")
    
                    # 模拟文件处理过程
                    await asyncio.sleep(random.uniform(0.5, 2.0))
    
                    logging.info(f"Successfully processed {input_file}")
    
                except Exception as e:
                    logging.error(f"Error processing {input_file} of Attempt {retry_count}: {str(e)}")
                    if retry_count < MAX_RETRIES:
                        logging.info(f"Retrying {input_file} (Attempt {retry_count + 1})")
                        await asyncio.sleep(1)
                        await self.process_single_file(input_file, retry_count + 1)
                    else:
                        logging.error(f"Failed to process {input_file} after {MAX_RETRIES} attempts")
    
        async def process_batch(
                self,
                file_list: list
        ) -> None:
            total_files = len(file_list)
            logging.info(f"Found {total_files} files to process")
    
            # 创建工作队列
            queue = asyncio.Queue()
    
            # 将所有文件放入队列
            for file_path in file_list:
                await queue.put(file_path)
    
            # 创建工作协程
            async def worker(worker_id: int):
                while True:
                    try:
                        # 非阻塞方式获取任务
                        input_file_path = await queue.get()
                        logging.info(f"Worker {worker_id} processing: {input_file_path}")
    
                        try:
                            await self.process_single_file(input_file_path)
                        except Exception as e:
                            logging.error(f"Error processing {input_file_path}: {str(e)}")
                        finally:
                            queue.task_done()
    
                    except asyncio.QueueEmpty:
                        # 队列为空,工作结束
                        break
                    except Exception as e:
                js        logging.error(f"Worker {worker_id} encountered error: {str(e)}")
                        break
    
            # 创建工作任务
            workers = []
            for i in range(self.max_concurrent):
          android      worker_task = asyncio.create_task(worker(i))
                workers.append(worker_task)
    
            # 等待队列处理完成
            await queue.join()
    
            # 取消所有仍在运行的工作任务
            for w in workers:
                w.cancel()
    
            # 等待所有工作任务完成
            await asyncio.gather(*workeRYfstwUpIurs, return_exceptions=True)

    3. 异步批处理入口函数

    最后,我们定义一个异步批处理入口函数 batch_detect,用于启动批处理任务。

    async def batch_detect(
            file_list: list,
            max_concurrent: int = DEFAULT_MAX_CONCURRENT_TASKS
    ):
        """异步批处理入口函数"""
        processor = AsyncBatchProcessor(max_concurrent)
        await processor.process_batch(file_list)
    
    # 示例调用
    file_list = ["file1.pdf", "file2.pdf", "file3.pdf", "file4.pdf"]
    asyncio.run(batch_detect(file_list))

    代码解释

    1.日志配置

    • 使用 logging 模块记录日志信息,包括时间、日志级别、文件路径和行号、以及日志消息。
    • 日志输出到文件 app.log 中,便于后续查看和分析。

    2.异步批处理类 AsyncBatchProcessor

    • __init__ 方法初始化最大并发任务数和信号量。
    • process_single_file 方法处理单个文件,使用信号量控制并发,模拟文件处理过程,并在失败时重试。
    • process_batch 方法处理批量文件,创建工作队列和协程,控制并发任务的执行。

    3.异步批处理入口函数 batch_detect

    • 创建 AsyncBatchProcessor 实例,并调用 process_batch 方法启动批处理任务。

    总结

    通过使用 asynciologging 模块,我们实现了一个高效的异步批处理文件系统。

    该系统能够并发处理大量文件,并在处理失败时自动重试,直到达到最大重试次数。

    日志系统帮助我们记录每个文件的处理过程,便于后续的调试和分析。

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

    0

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    关注公众号