Python异步库asyncio使用方法
一、异步编程概述
1、异步编程的概念和优势
异步编程是一种编写能够在单线程中同时处理多个任务的编程方式。与传统的同步编程相比,异步编程的主要优势在于能够提高程序的并发性和响应性,尤其适用于IO密集型任务,如网络通信、数据库访问等。
asyncio
是Python 3.4版本引入的标准库,用于实现异步编程。它基于事件循环(Event Loop)模型,通过协程(Coroutines)来实现任务的切换和调度。在asyncio
中,我们可以使用async
关键字定义协程函数,并使用await
关键字挂起协程的执行。
异步编程的核心思想是避免阻塞,即当一个任务在等待某个操作完成时,可以让出CPU执行权,让其他任务继续执行。这样可以充分利用CPU的时间片,提高程序的整体效率。
2、异步编程示例
import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1)
print("World")
async def main():
await asyncio.gather(
hello(),
hello()
)
asyncio.run(main())
定义了两个协程函数hello()
,每个函数打印一条消息,并通过await asyncio.sleep(1)
实现了一个模拟的耗时操作。在main()
函数中,使用asyncio.gather()
方法并发运行两个协程任务。最后,通过asyncio.run()
来运行主函数。
这个示例展示了异步编程的基本特点:协程函数能够在等待耗时操作时挂起执行,让其他任务继续执行,从而实现并发执行多个任务。
二、协程基础
1、协程的概念和特点
协程(Coroutines)是异步编程中的一种特殊函数,它可以在运行过程中暂停并记录当前状态,然后在需要时恢复执行。协程非常适合处理那些可能会被中断和恢复的任务,如IO操作、计算密集型任务等。
2、async
和await
关键字的用法
在Python中,可以通过使用async
关键字定义协程函数,使用await
关键字挂起协程的执行,等待耗时操作完成后再恢复执行。
import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1)
print("World")
asyncio.run(hello())
在上述示例中,我们定义了一个协程函数hello()
,它打印一条消息,然后通过await asyncio.sleep(1)
实现了一个模拟的耗时操作。最后,通过asyncio.run()
来运行协程函数。
Hello
World
可以看到,在协程函数中,通过await
关键字挂起协程的执行,等待耗时操作完成。这样,协程函数可以在等待期间让出CPU执行权,让其他任务继续执行。
需要注意的是,协程函数必须通过await
关键字与具体的异步操作进行配合使用。await
后面可以跟一个协程对象、一个实现了__await__()
方法的对象(如asyncio.sleep()
方法),或者一些其他的异步操作。
协程函数的定义方式与普通函数相同,只需在函数声明处使用async
关键字即可。在协程函数中,可以使用return
返回结果,也可以不返回任何内容(即不使用return
语句)。
三、事件循环(Event Loop)基础
1、事件循环的概念和原理
事件循环是异步编程的核心机制,它负责协调和调度协程的执行,以及处理IO操作和定时器等事件。它会循环监听事件的发生,并根据事件的类型选择适当的协程进行调度。
在asyncio
库中,可以通过asyncio.get_event_loop()
方法获取默认的事件循环对象,也可以使用asyncio.new_event_loop()
方法创建新的事件循环对象。
2、事件循环的核心方法
import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1)
print("World")
loop = asyncio.get_event_loop()
loop.run_until_complete(hello())
loop.close()
在上述示例中,我们首先通过asyncio.get_event_loop()
方法获取默认的事件循环对象,然后使用loop.run_until_complete()
方法运行协程函数。最后,通过loop.close()
关闭事件循环。
在事件循环中,协程函数会按照调度规则进行执行。当遇到await
关键字时,协程函数会暂时挂起,并将控制权让给其他协程。当await
后面的耗时操作完成后,事件循环会恢复被挂起的协程的执行。
需要注意的是,run_until_complete()
方法接受一个可等待对象作为参数,可以是协程对象、任务对象或者Future对象。它会持续运行事件循环,直到可等待对象完成。
另外,使用asyncio
库创建和运行事件循环还有其他的方式,例如使用asyncio.run()
方法来简化上述代码:
import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1)
print("World")
asyncio.run(hello())
四、异步IO操作
1、异步IO操作的概念
在异步编程中,IO操作(如网络请求、文件读写等)通常是耗时的操作,阻塞式的IO操作会导致程序在等待IO完成时无法做其他事情。而异步IO操作则可以在等待IO完成时让出CPU执行权,以便同时处理其他任务。
在Python中,可以通过asyncio
库来实现异步IO操作。asyncio
提供了一系列的函数和类来管理异步IO操作,例如使用asyncio.open()
来异步打开文件、使用asyncio.wait()
来等待多个异步任务完成等。
2、IO操作方法
import asyncio
async def read_file(file_path):
async with asyncio.open(file_path, 'r') as file:
data = await file.read()
print(data)
asyncio.run(read_file('example.txt'))
在上述示例中,我们定义了一个协程函数read_file()
,它使用asyncio.open()
来异步打开文件,并使用await file.read()
来异步读取文件内容。最后,通过asyncio.run()
运行协程函数。
需要注意的是,asyncio.open()
返回一个AsyncIOModeReader
对象,可以使用await
关键字来读取文件内容。另外,在使用asyncio.open()
时,也可以通过指定文件的编码方式和其他参数来进行相关配置。
除了使用asyncio.open()
来进行异步文件操作外,asyncio
还提供了许多其他的异步IO操作函数和类,如asyncio.create_connection()
用于异步创建网络连接、asyncio.start_server()
用于异步启动服务器等。可以根据具体的需求选择合适的方法来进行异步IO操作。
除了读取文件,我们还可以使用异步的文件对象执行异步写入操作。
import asyncio
async def write_file(file_path, content):
try:
async with asyncio.open_file(file_path, 'w') as file:
await file.write(content)
print("文件写入成功")
except OSError:
print("写入文件失败")
async def main():
await write_file("myfile.txt", "Hello, world!")
asyncio.run(main())
在上述示例中,我们定义了一个write_file()
函数,该函数用于异步写入文件内容。我们使用asyncio.open_file()
函数来打开文件,并指定文件路径和打开模式(例如'w'表示写入模式)。
然后,我们使用await file.write(content)
来异步将内容写入文件。最后,我们打印输出写入文件成功的消息。
五、定时器和延迟执行
在异步编程中,我们常常需要在一定的时间间隔内执行某些任务,或者延迟一段时间后执行某些操作。asyncio
库提供了定时器和延迟执行的功能,使我们能够方便地实现这些需求。
要创建一个定时器,可以使用asyncio.sleep()
函数。该函数会暂停当前任务的执行,并在指定的时间间隔之后恢复执行。
import asyncio
async def task():
print("开始任务")
await asyncio.sleep(3)
print("任务完成")
async def main():
print("程序开始")
await task()
print("程序结束")
asyncio.run(main())
在上述示例中,我们定义了一个task()
函数,其中使用await asyncio.sleep(3)
来创建一个定时器,指定了3秒的时间间隔。
当我们运行main()
函数时,会输出"程序开始",然后调用task()
函数。在调用await asyncio.sleep(3)
后,程序会暂停3秒钟,然后才会继续执行后续的代码。
因此,会输出"开始任务",然后等待3秒后输出"任务完成",最后输出"程序结束"。
除了使用定时器外,我们还可以使用asyncio.ensure_future()
函数和asyncio.gather()
函数来实现延迟执行任务。
asyncio.ensure_future()
函数接受一个协程对象作为参数,并将其封装为一个Task
对象,表示一个可等待的任务。
import asyncio
async def task():
print("执行任务")
async def main():
print("程序开始")
future = asyncio.ensure_future(task())
await asyncio.sleep(3)
await future
print("程序结束")
asyncio.run(main())
在上述示例中,我们使用asyncio.ensure_future()
函数将task()
函数封装为一个Task
对象,并将其赋值给变量future
。
然后,我们使用await asyncio.sleep(3)
创建一个定时器来延迟执行任务,等待3秒钟。
最后,使用await future
来等待任务的完成,即延迟执行的任务。
另外,asyncio.gather()
函数可以同时运行多个协程,并等待它们全部完成。
import asyncio
async def task1():
print("任务1开始")
await asyncio.sleep(2)
print("任务1完成")
async def task2():
print("任务2开始")
await asyncio.sleep(3)
print("任务2完成")
async def main():
print("程序开始")
await asyncio.gather(task1(), task2())
print("程序结束")
asyncio.run(main())
在上述示例中,我们定义了两个task1()
和task2()
函数,分别表示两个需要执行的任务。
在main()
函数中,使用await asyncio.gather(task1(), task2())
来同时运行这两个任务,并等待它们全部完成。
六、异步网络通信
1、介绍异步网络编程的概念
在异步编程中,异步网络请求是一种常见的任务。它允许我们以非阻塞的方式发送网络请求并同时执行其他任务,而无需等待响应返回。
异步网络请求的概念是通过在发送请求时立即返回一个可等待对象,在后台进行网络通信,并在响应可用时进行处理。
在Python中,我们可以使用asyncio
库来实现异步网络请求。asyncio
提供了一些函数和类来进行异步的网络通信,如aiohttp
库。
2、异步网络通信的方法
import asyncio
import aiohttp
async def fetch_data(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.text()
print(data)
async def main():
await fetch_data("https://www.example.com")
asyncio.run(main())
在上述示例中,我们定义了一个fetch_data()
函数,用于异步获取指定URL的数据。
首先,我们创建一个异步的HTTP客户端会话,使用aiohttp.ClientSession()
。然后,我们使用session.get()
方法发送GET请求并获取响应对象。
接下来,我们使用await response.text()
来异步获取响应的文本数据,并将其存储在变量data
中。
最后,我们可以根据需要对获取到的数据进行处理,这里只是简单地将其打印输出。
需要注意的是,在使用完异步的HTTP客户端会话后,我们使用async with
语句来自动关闭会话,以释放相关资源。
当我们运行main()
函数时,会执行异步获取数据的操作。而在等待数据返回的过程中,程序可以继续执行其他任务。
七、异步数据库操作
在异步编程中,与数据库的交互也是一项常见的任务。asyncio
库提供了一些函数和类来实现异步的数据库操作。
使用asyncio
封装一个关于Oracle操作的类时,你可以创建一个OracleHandler
类,并在其中定义增删改查等操作的方法。下面是一个示例代码,其中包含了插入、删除、更新和查询方法的实现:
import asyncio
import cx_Oracle
class OracleHandler:
def __init__(self, username, password, hostname, port, sid):
self.username = username
self.password = password
self.hostname = hostname
self.port = port
self.sid = sid
async def connect(self):
"""建立与Oracle数据库的连接"""
connection = await cx_Oracle.connect(f"{self.username}/{self.password}@{self.hostname}:{self.port}/{self.sid}")
return connection
async def disconnect(self, connection):
"""断开与Oracle数据库的连接"""
await connection.close()
async def insert(self, table_name, data):
"""插入数据"""
connection = await self.connect()
try:
cursor = await connection.cursor()
query = f"INSERT INTO {table_name} (column1, column2, ...) VALUES (:value1, :value2, ...)"
await cursor.execute(query, data)
await connection.commit()
finally:
await self.disconnect(connection)
async def delete(self, table_name, condition):
"""删除数据"""
connection = await self.connect()
try:
cursor = await connection.cursor()
query = f"DELETE FROM {table_name} WHERE {condition}"
await cursor.execute(query)
await connection.commit()
finally:
await self.disconnect(connection)
async def update(self, table_name, data, condition):
"""更新数据"""
connection = await self.connect()
try:
cursor = await connection.cursor()
query = f"UPDATE {table_name} SET column1 = :value1, column2 = :value2, ... WHERE {condition}"
await cursor.execute(query, data)
await connection.commit()
finally:
await self.disconnect(connection)
async def select(self, table_name, columns, condition):
"""查询数据"""
connection = await self.connect()
try:
cursor = await connection.cursor()
query = f"SELECT {columns} FROM {table_name} WHERE {condition}"
await cursor.execute(query)
result = await cursor.fetchall()
return result
finally:
await self.disconnect(connection)
async def main():
handler = OracleHandler("username", "password", "hostname", "port", "sid")
data = {"value1": "foo", "value2": "bar"}
await handler.insert("table_name", data)
condition = "column1 = 'foo'"
await handler.delete("table_name", condition)
data = {"value1": "new_value"}
condition = "column2 = 'bar'"
await handler.update("table_name", data, condition)
columns = "column1, column2"
condition = "column1 = 'foo'"
result = await handler.select("table_name", columns, condition)
print(result)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
在上述代码中,你需要根据实际情况替换OracleHandler
类的构造函数中的用户名、密码、主机名、端口和SID等参数。然后,你可以在main()
函数中调用OracleHandler
类的方法来执行对应的操作。
八、参考
1、https://zhuanlan.zhihu.com/p/59621713