python异步
python异步
asyncio
aiohttp
aiokafka
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from .setting import Settings
import json
import asyncio
# Shared settings object; the functions below read kafka_bootstrap_servers,
# topic, group_id and the consumer timeout values from it.
settings = Settings()
async def main_producer():
    """Send one hard-coded sample order to the configured Kafka topic.

    The order is serialised as UTF-8 encoded JSON. Any exception raised
    while starting the producer or sending the message is printed rather
    than propagated, and the producer is stopped in every case.
    """
    kafka_producer = AIOKafkaProducer(
        bootstrap_servers=settings.kafka_bootstrap_servers,
    )
    order = dict(order_id='1234567890', order_date='2021-01-01')
    try:
        await kafka_producer.start()
        # Kafka message values are bytes, so encode the JSON text first.
        payload = json.dumps(order).encode('utf-8')
        send_result = await kafka_producer.send_and_wait(
            topic=settings.topic,
            value=payload,
        )
        print(send_result)
    except Exception as exc:
        print(str(exc))
    finally:
        await kafka_producer.stop()
async def main_consumer():
    """Consume JSON messages from the configured topic and commit each one.

    Every received message value is decoded from UTF-8 JSON by the
    ``value_deserializer``, printed, and then the consumer offsets are
    committed. Any exception raised while starting or consuming is printed
    rather than propagated, and the consumer is stopped in every case.
    """
    consumer = AIOKafkaConsumer(
        # AIOKafkaConsumer takes the topics to subscribe to as positional
        # arguments (*topics); it has no ``topic=`` keyword.
        settings.topic,
        bootstrap_servers=settings.kafka_bootstrap_servers,
        group_id=settings.group_id,
        auto_offset_reset='earliest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8')),
        session_timeout_ms=settings.session_timeout_ms,
        heartbeat_interval_ms=settings.heartbeat_interval_ms,
        max_poll_interval_ms=settings.max_poll_interval_ms,
    )
    try:
        await consumer.start()
        async for msg in consumer:
            data = msg.value
            print(data)
            # Commit after each message has been handled.
            await consumer.commit()
    except Exception as e:
        print(str(e))
    finally:
        # Always stop the consumer so its connections are closed and it
        # leaves the consumer group, mirroring main_producer's cleanup.
        await consumer.stop()
if __name__ == '__main__':
    # Script entry point: only the producer is run here; main_consumer()
    # is defined above but has to be started separately.
    asyncio.run(main_producer())
aiobotocore
异步版本的 AWS SDK for Python
- 提供异步接口访问 AWS 云服务(如 S3、SQS、EC2 等)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import aiobotocore
boto_session = aiobotocore.get_session()
bucket = "bucket-name"
key = "file-key"
# 创建s3客户端(支持腾讯云COS)
async with boto_session.create_client("s3", **config.qcloud_info) as s3_client:
# 下载文件
response = await s3_client.get_object(Bucket=bucket, Key=key)
async with response["Body"] as stream:
data = await stream.read()
# 上传文件
data = "this is a data"
await s3_client.put_object(Bucket=bucket, Key=key, Body=data)
1
2
3
4
5
6
# Keyword arguments for creating the S3-compatible client.
# The credential values are placeholders and must be replaced.
qcloud_info = dict(
    aws_access_key_id="your_access_key",
    aws_secret_access_key="your_secret_key",
    endpoint_url="https://cos.ap-beijing-1.myqcloud.com",
    region_name="ap-beijing-1",
)
aioredis
创建连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import aioredis
# 基本连接
redis = aioredis.from_url("redis://localhost")
# 带参数的连接
redis = aioredis.from_url(
"redis://localhost:6379",
encoding="utf-8",
decode_responses=True,
password="password",
db=0
)
# 使用连接池
redis = aioredis.from_url(
"redis://localhost:6379"m
max_connection=20
)
# 关闭连接
await redis.close()
基本操作
1
2
3
4
5
6
7
8
9
10
11
12
# Set a key/value pair, then read it back
await redis.set("key", "value")
value = await redis.get("key")
# Set a key/value pair with an expiry time
await redis.setex("key", 60, "value") # expires after 60 seconds (1 minute)
# Check whether the key exists
exists = await redis.exists("key")
# Delete the key
await redis.delete("key")
哈希操作
1
2
3
4
5
6
7
# Write several fields of the hash stored at "user:1" in one call
await redis.hset("user:1", mapping={
"name": "alice",
"age": "25"
})
# Read a single field of the hash
name = await redis.hget("user:1", "name")
# Read every field of the hash
user_data = await redis.hgetall("user:1")
This post is licensed under CC BY 4.0 by the author.