
Python Async


asyncio
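asyncio is the standard-library event loop behind all of the libraries below: coroutines defined with async def, awaited I/O, and helpers such as gather for running work concurrently. A minimal sketch (fetch and main are illustrative names):

import asyncio


async def fetch(n: int) -> int:
    # Simulate an I/O-bound operation
    await asyncio.sleep(1)
    return n * 2


async def main():
    # The three coroutines run concurrently, so this takes ~1s instead of ~3s
    results = await asyncio.gather(fetch(1), fetch(2), fetch(3))
    print(results)  # [2, 4, 6]


if __name__ == '__main__':
    asyncio.run(main())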

aiohttp
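aiohttp provides an async HTTP client (and server) on top of asyncio. A minimal client sketch; the URL is a placeholder:

import asyncio
import aiohttp


async def main():
    # Reuse a single ClientSession for all requests in the application
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com") as resp:
            print(resp.status)
            body = await resp.text()
            print(len(body))


if __name__ == '__main__':
    asyncio.run(main())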

aiokafka

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from .setting import Settings
import json
import asyncio

settings = Settings()


async def main_producer():
    producer = AIOKafkaProducer(
        bootstrap_servers=settings.kafka_bootstrap_servers,
    )
    msg = {
        'order_id': '1234567890',
        'order_date': '2021-01-01',
    }
    try:
        await producer.start()
        # send_and_wait returns RecordMetadata (topic, partition, offset)
        result = await producer.send_and_wait(
            topic=settings.topic,
            value=json.dumps(msg).encode('utf-8')
        )
        print(result)
    except Exception as e:
        print(str(e))
    finally:
        await producer.stop()


async def main_consumer():
    consumer = AIOKafkaConsumer(
        settings.topic,  # topics are passed positionally
        bootstrap_servers=settings.kafka_bootstrap_servers,
        group_id=settings.group_id,
        auto_offset_reset='earliest',
        enable_auto_commit=False,  # required for the manual commit() below
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        session_timeout_ms=settings.session_timeout_ms,
        heartbeat_interval_ms=settings.heartbeat_interval_ms,
        max_poll_interval_ms=settings.max_poll_interval_ms
    )
    try:
        await consumer.start()
        async for msg in consumer:
            data = msg.value
            print(data)
            await consumer.commit()
    except Exception as e:
        print(str(e))
    finally:
        await consumer.stop()


if __name__ == '__main__':
    asyncio.run(main_producer())
    # or: asyncio.run(main_consumer())
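The Settings class imported from .setting above is not shown in this post. A minimal sketch of what it might contain, based on the fields the code uses (the concrete values are illustrative assumptions):

# setting.py - illustrative sketch, not the author's actual configuration
from dataclasses import dataclass


@dataclass
class Settings:
    kafka_bootstrap_servers: str = "localhost:9092"
    topic: str = "orders"
    group_id: str = "order-consumers"
    session_timeout_ms: int = 30000
    heartbeat_interval_ms: int = 10000
    max_poll_interval_ms: int = 300000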

aiobotocore

An asynchronous version of the AWS SDK for Python (botocore)

  • Provides an async interface to AWS cloud services (e.g. S3, SQS, EC2)
import asyncio
from aiobotocore.session import get_session

import config  # config.qcloud_info: the credentials/endpoint dict shown below


async def main():
    session = get_session()
    bucket = "bucket-name"
    key = "file-key"

    # Create an S3 client (also works with S3-compatible services such as Tencent Cloud COS)
    async with session.create_client("s3", **config.qcloud_info) as s3_client:
        # Download an object
        response = await s3_client.get_object(Bucket=bucket, Key=key)
        async with response["Body"] as stream:
            data = await stream.read()

        # Upload an object
        await s3_client.put_object(Bucket=bucket, Key=key, Body=b"this is example data")


if __name__ == '__main__':
    asyncio.run(main())
# Contents of config.qcloud_info referenced above (Tencent Cloud COS, S3-compatible endpoint)
qcloud_info = {
    "aws_access_key_id": "your_access_key",
    "aws_secret_access_key": "your_secret_key",
    "endpoint_url": "https://cos.ap-beijing-1.myqcloud.com",
    "region_name": "ap-beijing-1"
}

aioredis

Creating a connection

import aioredis

# Basic connection
redis = aioredis.from_url("redis://localhost")

# Connection with parameters
redis = aioredis.from_url(
    "redis://localhost:6379",
    encoding="utf-8",
    decode_responses=True,
    password="password",
    db=0
)

# Limit the size of the underlying connection pool
redis = aioredis.from_url(
    "redis://localhost:6379",
    max_connections=20
)

# Close the connection
await redis.close()

Basic operations

# Set and get a key-value pair
await redis.set("key", "value")
value = await redis.get("key")

# Set a key that expires in 60 seconds (1 minute)
await redis.setex("key", 60, "value")

# Check whether a key exists
exists = await redis.exists("key")

# Delete a key
await redis.delete("key")

Hash operations

# Store multiple fields in a hash
await redis.hset("user:1", mapping={
    "name": "alice",
    "age": "25"
})

# Read a single field or the whole hash
name = await redis.hget("user:1", "name")
user_data = await redis.hgetall("user:1")
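The aioredis snippets above use await at the top level for brevity; in a script they need to run inside an event loop. A minimal sketch tying the pieces together, assuming a local Redis on the default port:

import asyncio
import aioredis


async def main():
    redis = aioredis.from_url("redis://localhost:6379", decode_responses=True)
    try:
        await redis.set("key", "value")
        print(await redis.get("key"))

        await redis.hset("user:1", mapping={"name": "alice", "age": "25"})
        print(await redis.hgetall("user:1"))
    finally:
        await redis.close()


if __name__ == '__main__':
    asyncio.run(main())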