引言

  • 人多力量大,一个人干活慢多叫几个人,单线程处理慢多开几个线程是不是就快了?
  • 读取文件速度远远小于文件处理速度,我用多线程来提高效率合适吗?
  • 多线程和异步有什么关系?他们适合的场景怎么区分?
  • 多个方法都想同时访问数据库怎样才能保证一致性?

要想回答这些问题,首先先搞懂提到的几个词儿都是啥意思。

单线程与多线程

Q: 啥是多线程?
A: 顾名思义,多线程就是多个线程协同完成一件事。啥?你问我线程是啥,😓请先百度了解线程基本概念。

以完成一件任务为例,单线程就是一个人一摸黑从头干到尾,多线程就是多个人一起分工干完。
你可能就要说了:这多个人干指定快啊,人多力量大么💪!

请注意多个人干有个条件:分工

回想一下你参与过的团队任务,是否每次分工都是最合理的,当分工不合理时往往达不到最高效率,甚至不如哥们单干来的快。

同理一个程序单线程不一定比多线程慢,当多线程分工合理时不会因为其中某个线程速度跟不上而导致整体进度停滞,当分工不合理时线程间可能会产生资源竞争导致死锁等问题,就和团队合作里有人吵架一样⚔。

所以要权衡单线程和多线程的使用,要想提高效率可以考虑多线程,但是同时要充分解决多线程的资源竞争等问题。

同步与异步

Q: 我程序处理速度杠杠的,但是磁盘读写速度不争气,拖累整体的处理效率,这种情况给他搞多线程有用没🧐?
A: 没多大用,太麻烦,创建销毁线程也要耗费时间和资源,这时候就应该掏出利器:异步

异步又是什么玩意儿?
以约哥们出去玩为例,同步就像两个人打电话,必须等电话接通了讲话才能被对方听到,异步就像两个人发微信,并不需要对方在线,对方在收到微信消息提醒后就知道有人联系他了。
异步有什么好处?
还是上面的例子,异步就是消息发送后就不用管了,可以先去换身衣服,等哥们回消息了就出发,同步就必须把手机带着,接通后对方才知道你打电话干啥。

多线程与异步

Q: 又是单线程多线程又是同步异步都搞混了😵,他们都有什么区别?

概念 多线程 异步编程
承载体 线程 协程
调度方式 线程调度 事件循环调度
并发模式 多个线程并行执行 单个线程内任务切换
主要难点 资源竞争,死锁,同步 异步事件的流程安排
应用场景 CPU密集型 IO密集型
  • 多线程多个线程在一个进程里,异步多个协程在一个线程里。

  • 多个协程可以在一个线程里通过事件循环协作调度。

  • 协程都在一个线程里就天然具备资源共享的优势,不需要系统内核介入。

  • 异步中协程是假并行真并发,同一时间单个线程中只有一个协程在工作,通过程序自己使用await给别的协程让出控制权,多线程是真并行,多核CPU几个核心同时运行,通过系统进行抢占式调度,线程切换也是内核介入切换。

控制权这里就充分体现了异步和多线程的区别:就像排队半业务,异步就说“你的业务简单,你先办,我多等会儿”,多线程就说“我先来的我先办,你一边等着去”

异步概念专业的说法:异步编程允许在单个线程中通过协程实现并发,利用非阻塞IO和协作式调度,在等待IO时切换到其他任务,从而高效利用线程资源

适用场景具体例子:

  • 需要大量计算的任务,例如图像处理,GPU计算的任务,使用多线程能大大提升效率。
  • 需要频繁互相调用的,例如处理大量并发连接,数据库访问,外部网络请求的任务,使用异步编程更加合适。

误区:
Q: 同一个程序部署多套是多线程吗?
A: 同一程序多套部署是多进程不是多线程,虽然也能提高一点处理能力,但多进程的进程之间相互独立,内存隔离,而多线程是发生在单个程序进程中,多个线程共享同一片资源共享区,并且不同的线程之间可以通过较低成本实现线程间通信协作,而进程间要想协作必须通过RPC这种较高成本的方式。

数据库多连接

Q: 实际开发中,因为线程间需要协作,因此往往会发生资源竞争,例如竞争同一个数据库连接,竞争同一个文件的读写,怎么应对这种竞争?
A: 建立共用的连接池对连接进行管理,或者每个线程建立各自的连接

连接池

使用队列建立一个连接池,下面是一个测试的样例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from queue import Queue
from threading import Thread
from time import sleep

def create_db_connection():
# 在这创建新的数据库连接
num = 10
return num

class ConnectionPool:
"""
连接池
"""
def __init__(self, max_connections):
self._pool = Queue(maxsize=max_connections)
# 初始化时往连接池放入maxsize个连接
for _ in range(max_connections):
self._pool.put(create_db_connection())

def get_connection(self):
conn = self._pool.get()
return conn

def release_connection(self, conn):
self._pool.put(conn)

# ----------------测试-------------------------
pool = ConnectionPool(2)

def worker(name):
conn = pool.get_connection()
print(f"{name} 获取连接")
for i in range(conn):
print(f"name={name},i={i}")
sleep(1)
pool.release_connection(conn)
print(f"{name} 释放连接")

# 启动 4 个线程(maxsize=2,应有 2 个等待)
threads = [Thread(target=worker, args=(f"Thread-{i}",)) for i in range(4)]
for t in threads:
t.start()
for t in threads:
t.join()

上面例子中,使用队列建立连接池,连接数可设置,当无连接可用时阻塞线程。

注意多线程和多进程的区别,这里的连接池是为同一个进程的不同线程服务的,不同的进程中的连接池是互不影响的,例如在进程A中和进程B中都调用连接池中初始化的pool对象,但实际上是在进程A和B中分别创建了一个pool对象,这两个对象的连接数是没有关系的,可以通过在程序中print(id(pool))来验证是不是相同的对象。

连接池大小建议:(核心线程数 × 2) + 磁盘数(根据实际压力测试调整)

独立连接

在线程中创建连接,而不是在进程中创建一个全局的连接,给不同的方法来共享使用。

创建全局连接被不同的线程使用可能会造成以下后果:

  • SQL命令交叉执行

    线程A执行SELECT * FROM employees后尚未获取结果
    线程B的SELECT * FROM departments已发送到连接
    导致线程A获取到部门数据而非员工数据

  • 事务状态冲突

    线程A开启事务(BEGIN)但未提交
    线程B尝试提交(COMMIT)会意外提交线程A的事务
    或线程B的查询看到线程A未提交的中间状态(脏读)

  • 游标对象混乱

    Oracle连接在同一时间只能激活一个游标
    多线程竞争会导致cursor.fetchall()读取到错误数据

连接池示例

Q: 利用线程池来管理Oracle连接怎么做?
A: 使用python提供的oracle操作包cx_Oracle,这个包中实现了SessionPool线程池类,下面是使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import cx_Oracle
import threading

# 创建连接池
pool = cx_Oracle.SessionPool(
user="your_username",
password="your_password",
dsn="localhost/orclpdb1", # Oracle服务名
min=2, # 最小连接数
max=5, # 最大连接数
increment=1, # 每次新增连接数
threaded=True, # 明确启用线程安全模式
encoding="UTF-8"
)

def query_from_pool(sql):
# 从池中获取连接,使用with确保离开代码块释放连接,代码块发生异常也会释放连接
with pool.acquire() as conn:
cursor = conn.cursor()
cursor.execute(sql)
result = cursor.fetchall()
cursor.close()
return result

# 多线程测试
def thread_worker(sql):
print(query_from_pool(sql))

threads = [
threading.Thread(target=thread_worker, args=("SELECT * FROM employees WHERE rownum <= 5",))
for _ in range(3)
]
for t in threads: t.start()
for t in threads: t.join()

# 关闭连接池(应用退出时执行)
pool.close()

SessionPool的参数说明:

参数 可选值 含义
min Integer 连接池中保持的最小连接数(即使空闲)
max Integer 连接池中允许同时进行的最大连接数(超出后阻塞或报错)
increment Integer 需要新建连接时一次性新建连接数
threaded True/False 是否多线程使用(多线程会保证线程安全)

对于上面提到的超出后阻塞还是报错使用getmode参数:

getmode值 含义
cx_Oracle.SPOOL_ATTRVAL_WAIT 默认阻塞
cx_Oracle.SPOOL_ATTRVAL_NOWAIT 立即报错
  • 默认阻塞就是达到最大连接数后还没有分配到连接时就阻塞等待的模式,生产中用的比较多;
  • 立即报错就是达到最大连接数后还没有分配到连接时立即报错返回,常用在测试数据量等场景。

永远不要跨线程共享Oracle连接对象

FastAPI的异步支持

Q: 上面的例子中都使用了多线程的手法,来个异步的示例瞧瞧?
A: FastAPI中使用async和await搭配来实现异步,示例说来就来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

from fastapi import FastAPI
import asyncio
import uvicorn

# 创建 FastAPI 应用
app = FastAPI(title="异步示例API", description="一个演示FastAPI异步特性的示例")

# 这里模拟一个异步网络请求
async def mock_external_call():
# 实际项目中这里可能是 await httpx.get(...)
await asyncio.sleep(10) # 模拟网络延迟
# time.sleep(10)
return {"data": "来自外部API的响应"}

# 模拟异步外部API调用
@app.get("/external-data/")
async def fetch_external_data():
"""模拟异步调用外部API"""
response = await mock_external_call()
return {"status": "success", "result": response}

@app.get("/say-hello/")
async def sayHello():
"""模拟异步调用外部API"""
return {"status": "success", "result": "hello world"}

if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8000)

代码理解:

  • 路由函数fetch_external_data()使用async修饰表示它是一个协程,可以非阻塞地处理其他路由请求
  • 在fetch_external_data()中我们异步调用mock_external_call(),在此方法中使用asyncio.sleep(10)来模拟外部响应花费了10s
  • 可以通过浏览器或Postman同时访问external-data和say-hello服务,可以看到在external-data服务等待结果时say-hello服务也能正常返回

Q: 我注意到上面示例代码中模拟网络延迟使用的是asyncio.sleep()方法,为什么不使用常用的time.sleep()方法?

A: time.sleep()方法是同步阻塞的的方法,会完全阻塞当前线程,让操作系统将线程挂起,而在异步方法中所有的异步协程都运行在同一个线程的事件循环上。事件循环管理同一个线程上的不同写成来实现异步,当阻塞了当前线程事件循环就无法调度其他协程了。
当然也提供了异步阻塞的方法asyncio.sleep()方法,暂停当前协程,不影响事件循环调度其他的协程完成异步任务。
可以尝试用time.sleep(10)替换await asyncio.sleep(10),同样地访问两个服务,看say-hello服务能否在external-data服务执行期间得到返回结果。

同样的禁止在协程中直接使用的还有:

  • time.sleep()
  • 同步文件操作 (open().read())
  • requests.get()
  • 同步数据库驱动 (psycopg2, pymysql)

必须替换为:

  • await asyncio.sleep()
  • aiofiles
  • httpx.AsyncClient
  • 异步数据库驱动 (asyncpg, aiomysql)

Q: 如何检测代码是否阻塞事件循环?

A: 设置日志打印,如果看到Executing ... took ... seconds,说明有阻塞

1
2
import logging
logging.basicConfig(level=logging.DEBUG)

最后回顾引言的几个问题,想必你都有了自己的答案🎉🎉🎉!