Python Coroutine 池化实现-程序员宅基地

技术标签: python  大数据  

池化介绍

在当今计算机科学和软件工程的领域中,池化技术如线程池、连接池和对象池等已经成为优化资源利用率和提高软件性能的重要工具。然而,在 Python 的协程领域,我们却很少见到类似于 ThreadPoolExecutor 的 CoroutinePoolExecutor。为什么会这样呢?

首先,Python Coroutine 的特性使得池化技术在协程中的应用相对较少。与像 Golang 这样支持有栈协程的语言不同,Python Coroutine 是无栈的,无法跨核执行,从而限制了协程池发挥多核优势的可能性。

其次,Python Coroutine 的轻量级和快速创建销毁的特性,使得频繁创建和销毁协程并不会带来显著的性能损耗。这也解释了为什么 Python 官方一直没有引入 CoroutinePoolExecutor。

然而,作为开发者,我们仍然可以在特定场景下考虑协程的池化。虽然 Python Coroutine 轻量,但在一些需要大量协程协同工作的应用中,池化技术能够提供更方便、统一的调度子协程的方式。尤其是在涉及到异步操作的同时需要控制并发数量时,协程池的优势就显而易见了。

关于 Python 官方是否会在未来引入类似于 TaskGroup 的 CoroutinePoolExecutor,这或许是一个悬而未决的问题。考虑到 Python 在异步编程方面的快速发展,我们不能排除未来可能性的存在。或许有一天,我们会看到 TaskGroup 引入一个 max_workers 的形参,以更好地支持对协程池的需求。

在实际开发中,我们也可以尝试编写自己的 CoroutinePoolExecutor,以满足特定业务场景的需求。通过合理的设计架构和对数据流的全局考虑,我们可以最大程度地发挥协程池的优势,提高系统的性能和响应速度。

在接下来的文章中,我们将探讨如何设计和实现一个简单的 CoroutinePoolExecutor,以及在实际项目中的应用场景。通过深入理解协程池的工作原理,我们或许能更好地利用这一技术,使我们的异步应用更为高效。

如何开始编写

如何开始编写 CoroutinePoolExecutor,首先我们要明确出其适用范畴、考虑到使用方式和其潜在的风险点:

  • 它并不适用于 Mult Thread + Mult Event Loop 的场景,因此它并非线程安全的。
  • 应当保持和 ThreadPoolExecutor 相同的调用方式。
  • 不同于 Mult Thread 中子线程不依赖于主线程的运行,而在 Mult Coroutine 中子协程必须依赖于主协程,因此主协程在子协程没有全部运行完毕之前不能直接 done 掉。这也解释了为什么 TaskGroup 官方实现中没有提供类似于 shutdown 之类的方法,而是只提供上下文管理的运行方式。

有了上述 3 点的考量,我们决定将 ThreadPoolExecutor 平替成 CoroutinePoolExecutor。这样的好处在于,作为学习者一方面可以了解 ThreadPoolExecutor 的内部实现机制,另一方面站在巨人肩膀上的编程借鉴往往会事半功倍,对于自我的提升也是较为明显的。

在考虑这些因素的同时,我们将继续深入研究协程池的设计和实现。通过对适用范围和使用方式的明确,我们能更好地把握 CoroutinePoolExecutor 的潜在优势,为异步应用的性能提升做出更有针对性的贡献。

具体代码实现

在这里我先贴出完整的代码实现,其中着重点已经用注释标明。

以下是 CoroutinePoolExecutor 的代码实现:

import os
import asyncio
import weakref
import logging
import itertools


async def _worker(executor_reference: "CoroutinePoolExecutor", work_queue: asyncio.Queue):
    try:
        while True:
            work_item = await work_queue.get()

            if work_item is not None:
                await work_item.run()
                del work_item

                executor = executor_reference()
                if executor is not None:
                    # Notify available coroutines
                    executor._idle_semaphore.release()
                del executor
                continue

            # Notifies the next coroutine task that it is time to exit
            await work_queue.put(None)
            break

    except Exception as exc:
        logging.critical('Exception in worker', exc_info=True)


class _WorkItem:
    def __init__(self, future, coro):
        self.future = future
        self.coro = coro

    async def run(self):
        try:
            result = await self.coro
        except Exception as exc:
            self.future.set_exception(exc)
        else:
            self.future.set_result(result)


class CoroutinePoolExecutor:
    """
    Coroutine pool implemented based on ThreadPoolExecutor
    Different from ThreadPoolExecutor, because the running of sub-coroutine depends on the main coroutine
    So you must use the shutdown method to wait for all subtasks and wait for them to complete execution
    """

    # Used to assign unique thread names when coroutine_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers, coroutine_name_prefix=""):

        if max_workers is None:
            max_workers = min(32, (os.cpu_count() or 1) + 4)
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = asyncio.Queue()
        self._idle_semaphore = asyncio.Semaphore(0)
        self._coroutines = set()
        self._shutdown = False
        self._shutdown_lock = asyncio.Lock()
        self._coroutine_name_prefix = (coroutine_name_prefix or (
            f"{__class__.__name__}-{self._counter()}"
        ))

    async def submit(self, coro):
        async with self._shutdown_lock:
            # When the executor is closed, new coroutine tasks should be rejected, otherwise it will cause the problem that the newly added tasks cannot be executed.
            # This is because after shutdown, all sub-coroutines will end their work
            # one after another. Even if there are new coroutine tasks, they will not
            # be reactivated.
            if self._shutdown:
                raise RuntimeError('cannot schedule new coroutine task after shutdown')

            f = asyncio.Future()
            w = _WorkItem(
                f,
                coro
            )
            await self._work_queue.put(w)
            await self._adjust_coroutine_count()
            return f

    async def _adjust_coroutine_count(self):

        try:
            # 2 functions:
            # - When there is an idle coroutine and the semaphore is not 0, there is no need to create a new sub-coroutine.
            # - Prevent exceptions from modifying self._coroutines members when the for loop self._coroutines and await task in shutdown are modified
            # Since the Semaphore provided by asyncio does not have a timeout
            # parameter, you can choose to use it with wait_for.
            if await asyncio.wait_for(
                    self._idle_semaphore.acquire(),
                    0
            ):
                return
        except TimeoutError:
            pass

        num_coroutines = len(self._coroutines)
        if num_coroutines < self._max_workers:
            coroutine_name = f"{self._coroutine_name_prefix or self}_{num_coroutines}"
            t = asyncio.create_task(
                coro=_worker(
                    weakref.ref(self),
                    self._work_queue
                ),
                name=coroutine_name
            )

            self._coroutines.add(t)

    async def shutdown(self, wait=True, *, cancel_futures=False):
        async with self._shutdown_lock:
            self._shutdown = True

            if cancel_futures:
                while True:
                    try:
                        work_item = self._work_queue.get_nowait()
                    except asyncio.QueueEmpty:
                        break
                    if work_item is not None:
                        work_item.future.cancel()

            # None is an exit signal, given by the shutdown method, when the shutdown method is called
            # will notify the sub-coroutine to stop working and exit the loop
            await self._work_queue.put(None)

        if wait:
            for t in self._coroutines:
                await t

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.shutdown(wait=True)
        return False

以下是 CoroutinePoolExecutor 的使用方式:

import asyncio

from coroutinepoolexecutor import CoroutinePoolExecutor


async def task(i):
    await asyncio.sleep(1)
    print(f"task-{i}")


async def main():
    async with CoroutinePoolExecutor(2) as executor:
        for i in range(10):
            await executor.submit(task(i))

if __name__ == "__main__":
    asyncio.run(main())

我们知道,在线程池中,工作线程一旦创建会不断的领取新的任务并执行,除开 shutdown() 调用,否则对于静态的线程池来讲工作线程不会自己结束。

在上述协程池代码实现中,CoroutinePoolExecutor 类包含了主要的对外调用功能的接口、内部提供了存储 task 的 Queue、工作协程自动生成 name 的计数器、保障协程的信号量锁等等。

而 _worker 函数是工作协程的运行函数,其会在工作协程启动后,不断的从 CoroutinePoolExecutor 的 Queue 中得到 _WorkItem 并由 _WorkItem 具体执行 coro task。

剩下的 _WorkItem 是一个 future 对象与 coro task 的封装器,其功能是解耦 future 对象和 coro task、并在 coro task 运行时和运行后设置 future 的结果。

对于异步循环的思考

在此 CoroutinePoolExecutor 实现后,我其实又有了一个新的思考。Python 的 EventLoop 相较于 Node.js 的 EventLoop 来说其实更加的底层,它有感的暴露了出来。

具体体现在当 Python Event Loop 启动后,如果 main coroutine 停止运行,那么所有的 subtask coroutine 也会停止运行,尤其是对于一些需要清理资源的操作、如 aiohttp 的 close session、CoroutinePoolExecutor 的 shutdown 等都会在某些情况显得无措,说的更具体点就是不知道在什么时候调用。

对于这些问题,我们可以继承 BaseEventLoop 自己手动对 EventLoop 的功能进行扩展,如在事件循环关闭之前添加 hook function,甚至可以限制整个 EventLoop 的 max_workers 或者做成动态的可调节 coroutine 数量的 EventLoop 都行。

无论如何,只要心里有想法,就可以去将它实现 .. 学习本身就是一个不断挑战的过程。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/douluo998/article/details/137833899

智能推荐

NIOS开发积累_iord-程序员宅基地

文章浏览阅读389次。1、IORD/IOWR读写函数原型:IORD (BASE, REGNUM) 输入参数:BASE为寄存器的基地址,REGNUM为寄存器的偏移量 函数说明:从基地址为BASE的设备中读取寄存器中偏移量为REGNUM的单元里面的值。寄存器的值在地址总线的范围之内。 返回值: 有 函数原型:IOWR (BASE, REGNUM, DATA) 输入参数:BASE为寄存器的基地址,REGNUM..._iord

MacBook Pro安装纯windows系统教程_mac装windows教程-程序员宅基地

文章浏览阅读1.2k次,点赞39次,收藏19次。你是否和我一样有一个废弃的MacBook,来一起给它变成windows重新发光发热!!!_mac装windows教程

Unity Dots学习_unity disableautocreation-程序员宅基地

文章浏览阅读765次。我用的Unity版本是Unity 2019.4.19f1c1 (64-bit)Unity中的PacketManager中下载Entities,再下载Hybrid Rer用于显示。Dots入门1.打印一个数字先创建一个Componentusing System.Collections;using System.Collections.Generic;using Unity.Entities;using UnityEngine;public struct PrintComponentData_unity disableautocreation

Swift学习笔记(一)基础语法_swift 学习-程序员宅基地

文章浏览阅读1.9k次,点赞2次,收藏6次。浅学一下Swift,这篇笔记做个记录//引入UIKit框架 import UIKit //定义一个变量,赋值为字符串“hello,playground” var greeting = "Hello, playground"_swift 学习

office运行时错误,部分系统文件可能丢失或已损坏(错误代码:0x80040154)-程序员宅基地

文章浏览阅读1.6w次。在使用Office进行文件操作时,经常会出现如图下所示office运行时错误,部分系统文件可能丢失或已损坏(错误代码:0x80040154)出现这种情况多数是由于Office的安全机制导致的解决步骤:打开左上角 文件 菜单找到 选项 菜单打开后找到 信任中心 同时点击 信任中心设置找到 受保护的视图 ,将右侧选项全部取消勾选即可..._0x80040154

Python|每日一练|整数数组|非重复子集(幂集)|递归:子集 II_给你一个整数数组 nums ,其中可能包含重复元素,请你返回该数组所有可能的子集(幂-程序员宅基地

文章浏览阅读266次。给你一个整数数组nums,其中可能包含重复元素,请你返回该数组所有可能的子集(幂集)。解集不能包含重复的子集。返回的解集中,子集可以按任意顺序排列。_给你一个整数数组 nums ,其中可能包含重复元素,请你返回该数组所有可能的子集(幂

随便推点

linux重定向输入和输出_linux重定向输出原则-程序员宅基地

文章浏览阅读842次。输出重定向将命令的输出发送到一个文件中,会覆盖原有内容date > date.txtcat date.txt2017年 09月 12日 星期二 21:42:30 CST追加who >> date.txtcat date.txt2017年 09月 12日 星期二 21:42:30 CSTroot pts/0 2017-09-12 21:09 (49.77.150.29)输入重_linux重定向输出原则

AI五子棋——超强改进版本_超强五子棋ai-程序员宅基地

文章浏览阅读4.3k次,点赞5次,收藏6次。【代码】AI五子棋。_超强五子棋ai

Android 内存泄漏总结_android内存泄漏-程序员宅基地

文章浏览阅读132次。#Android 内存泄漏总结内存管理的目的就是让我们在开发中怎么有效的避免我们的应用出现内存泄漏的问题。内存泄漏大家都不陌生了,简单粗俗的讲,就是该被释放的对象没有释放,一直被某个或某些实例所持有却不再被使用导致 GC 不能回收。最近自己阅读了大量相关的文档资料,打算做个 总结 沉淀下来跟大家一起分享和学习,也给自己一个警示,以后 coding 时怎么避免这些情况,提高应用的体验和质量。_android内存泄漏

基于相对熵优化VMD的非局部均值去噪方法_变分模态分解与样本熵-程序员宅基地

文章浏览阅读7.1k次,点赞49次,收藏148次。利用K-L散度(相对熵)确定VMD分解信号的K值和惩罚因子alpha,得到一组信号分量;计算各个分量的样本熵,根据样本熵的值,选取出噪声主导分量和有效分量;对噪声主导信号进行非局部均值(NLM)去噪;将去噪后的信号分量与剩余的有效信号分量进行重构得到去噪信号。_变分模态分解与样本熵

[STM32F1]一文说清STM32F103双通道ADC_DMA采集,可拓展多通道-程序员宅基地

文章浏览阅读6k次,点赞2次,收藏56次。12位是相对于二进制数来说,也就是“111111111111”,转换为十进制就是4095,其实是0-4095,实际上是4096个数,STM32F103的引脚电压是0-3.3V,12位的ADC就会把0-3.3V切割成4096份。因为采用两个通道,所以需要两个缓冲保存数据,100*2=200,DMA在运行是就会采集200个ADC值保存在数组中,100个通道4的值,100个通道5的值。对于STM32F103来说,它的ADC是12位,一共18个通道,其中16个外部通道,2个内部通道。我这里只用了双通道,所以为2..

加载.npz文件时,出现错误:OSError: Failed to interpret file ‘xxx.npz‘ as a pickle-程序员宅基地

文章浏览阅读9.9k次,点赞4次,收藏6次。.npz文件的内容是怎样的,怎么打开?加载.npz文件时,出现错误:OSError: Failed to interpret file ‘xxx.npz’ as a pickle首先了解pickle的定义:pickle: 用于python特有的类型和python的数据类型间进行转换pickle提供四个功能:dumps,dump,loads,loadpickle可以存储所有python支持的原生类型(bool,int,float,string,byte,none等),由任何原生类型组成的列表、元_as a pickle

推荐文章

热门文章

相关标签