多线程
threading.Thread
线程对象,通过该对象创建线程。用户可以继承该对象以实现自定义的线程类。也可以传递函数来创建线程。
模块的常用方法
threading.active_count()
判断当前线程数量
threading.current_thread()
获取当前控制的线程对象
threading.get_ident()
获取当前线程的线程标识符,线程销毁后,标识符可被复用。
threading.enumerate()
获取当前线程对象列表
threading.main_thread()
获取主线程对象
threading.stack_size([size])
返回创建线程时的堆栈大小
通过传递函数来构造线程
import threading
import time
def test(id=1):
time.sleep(1)
print("threading-%s start"%id)
th1 = threading.Thread(target=test)
th2 = threading.Thread(target=test, args=(2,))
th1.start()
th2.start()
print("all done")
可以注意到,当线程在等待1秒的期间,最后一句打印会被执行,因为程序不会阻塞。
通过继承Thread创建线程
import threading
import time
class MyThread(threading.Thread):
def __init__(self, id):
self.id = id
def run(self):
time.sleep(1)
print("threading-%s start"%id)
th1 = MyThread(1)
th2 = MyThread(2)
th1.start()
th2.start()
print("all done")
线程对象的常用属性及方法
start()
开启线程活动,若调用该方法时,默认执行线程类中的run()方法。同一个线程对象只能调用一次。
join(timeout=None)
让当前线程等待,直到线程结束。可以被调用多次。
th = threading.Thread(target=func)
th.start()
th.join()
print("all done")
最后一句输出将会在线程结束后才调用。
is_alive()
判断线程是否存活,返回布尔值。
ident
获取线程的标识符
daemon
判断该线程是否是守护线程,所有主线程创建的线程默认都为非守护线程
threading.Lock
锁定当前线程,其他线程处于等待获取锁的状态。用于保护多线程下出现的不同线程获取到脏数据的情况。当锁被释放后,处于等待的线程可以获取锁。
锁不一定需要获取它的线程来释放。
常用方法
acquire(blocking=True, timeout=-1)
获取锁,返回布尔值。timeout=-1表示无限阻塞等待。blocking=True表示timeout参数有效。
release()
释放锁。
示例
import threading
import time
lock = threading.Lock()
GLIST = [None] * 5
def test(param):
lock.acquire()
global GLIST
for i in range(len(GLIST)):
GLIST[i] = param
print(GLIST)
lock.release()
th1 = threading.Thread(target=test, args=('th1', ))
th2 = threading.Thread(target=test, args=('th2', ))
th1.start()
th2.start()
如果单个线程连续两次请求获取锁,将会造成死锁。
threading.RLock
递归锁与普通锁的区别在于加入了所属线程和递归等级的概念。
- 所属线程:锁的释放必须由获取锁的线程来完成。
- 递归等级:当前线程再次获取锁时,锁的递归等级加1。初始等级为1
递归锁可以有效地解决死锁问题
示例
import threading
import time
rlock_hi = rlock_hello = threading.RLock()
def test_thread_hi():
rlock_hi.acquire()
print("threading test_thread_hi got lock rlock_hi")
time.sleep(2)
rlock_hello.acquire()
print("threading test_thread_hi got lock rlock_hello")
rlock_hello.release()
rlock_hi.release()
def test_thread_hello():
rlock_hello.acquire()
print("threading test_thread_hello got lock rlock_hello")
rlock_hi.acquire()
print("threading test_thread_hi got lock rlock_hi")
rlock_hi.release()
rlock_hello.release()
thread_hi = threading.Thread(target=test_thread_hi)
thread_hello = threading.Thread(target=test_thread_hello)
thread_hi.start()
thread_hello.start()
threading.Condition
Condition允许一个或多个线程等待,直到被另一个线程通知。
常用方法
acquire(*args)
请求锁
release()
释放锁
wait(timeout=None)
释放锁,阻塞线程,直到另一个线程使用notify()或notify_all()方法唤醒。被唤醒后会重新获取锁
wait_for(predicate, timeout=None)
相当于:
while not predicate:
threading.Condition().wait()
该方法先调用predicate对象,如果predicate返回False,表示上面的while循环条件为True,执行wait(),释放锁并等待。
notify(n=1)
唤醒一个正在等待的线程。n=1表示唤醒n个正在等待的线程。
该方法并不释放锁,所以当前线程如果调用了该方法,处于等待的线程也不能立刻获取锁,只有当前线程调用了release()方法释放了锁,等待的线程才能继续执行。
notify_all()
唤醒所有等待的线程。
示例
import threading
import time
condition_lock = threading.Condition()
PRE = 0
def pre():
print(PRE)
return PRE
def test_thread_hi():
condition_lock.acquire()
print("wait for threading test_thread_hello's notify")
condition_lock.wait_for(pre)
# 相当于:
# while not pre:
# condition_lock.wait()
# condition_lock.wait()
print("continue execute")
condition_lock.release()
def test_thread_hello():
time.sleep(1)
condition_lock.acquire()
global PRE
PRE = 1
print("modify PRE to 1")
print("notify thread test_thread_hello that it can prepare to get lock")
condition_lock.notify()
print("hello is ready to release lock")
condition_lock.release()
print("come and get lock")
thread_hi = threading.Thread(target=test_thread_hi)
thread_hello = threading.Thread(target=test_thread_hello)
thread_hi.start()
thread_hello.start()
threading.Semaphore
信号量对象通过一个内部计数器管理同一时间可执行的最大线程数。获取锁时计数器减1,释放锁时计数器加一,当计数器为0时,阻塞线程,知道有线程通过释放锁使计数器增加,处于阻塞的线程才能获取锁。
示例
import threading
import time
# 计数器初始值为3,表示同意之间只能有3个线程运行
semaphore3 = threading.Semaphore(3)
def thread_semaphore(index):
semaphore3.acquire()
time.sleep(2)
print("thread_%s is running..." % index)
semaphore3.release()
for index in range(9):
threading.Thread(target=thread_semaphore, args={index,}).start()
threading.Event
事件对象用于线程间分享状态,通过一个内部标识来实现。内部标识默认为False。
常用方法
is_set()
返回内部标识
set()
设置内部标识
clear()
将内部标识设置为False
wait(timeout=None)
阻塞线程,直到内部标识为True
示例
import threading
import time
event = threading.Event()
def student_exam(id):
print("student %s waiting for teacher to send papers" % id)
event.wait()
print("exam is begining")
def teacher():
time.sleep(5)
print("time up, begin to exam")
event.set()
for id in range(3):
threading.Thread(target=student_exam, args=(id,)).start()
threading.Thread(target=teacher).start()
threading.Barrier
栅栏对象。阻塞调用了wait()方法的线程,直到所有目标线程都调用了wait()方法。然后同时释放所有线程。
threading.Barrier(parties, action=None, timeout=None)
"""
parties: 指定线程数
action: 可调用对象。所有线程被释放前,随机抽取一个线程调用该方法
timeout: wait()的超时时间
"""
常用方法及属性
wait(timeout=None)
阻塞线程,若超时,抛出BrokenBarrierError异常
reset()
将栅栏对象重置为默认的初始态
abort()
主动抛出BrokenBarrierError异常。
broken
判断栅栏对象的的破损状态
n_waiting
处于等待中的线程数量
parties
通过栅栏的线程数量
示例
import threading
import time
def test_action():
print("call the func before all barrier threading released")
barrier = threading.Barrier(3, test_action)
def barrier_thread(sleep):
time.sleep(sleep)
print("barrier thread-%s wait" % sleep)
barrier.wait()
print("barrier thread-%s end" % sleep)
for sleep in range(6):
threading.Thread(target=barrier_thread, args=(sleep,)).start()
threading.local
TLS
线程局部存储。允许线程访问同一个全局变量,每个线程会为自己局部存储一个副本,相互之间互不影响。
如果全局变量本身需要维护,那么需要加锁。
flask的上下文就基于此实现。
示例:
import threading
userName = threading.local()
def SessionThread(userName_in):
userName.val = userName_in
print(userName.val)
Session1 = threading.Thread(target=SessionThread("User1"))
Session2 = threading.Thread(target=SessionThread("User2"))
Session1.start()
Session2.start()
Session1.join()
Session2.join()
通过实现一个全局字典,key为线程ID,value为全局变量副本,当线程访问全局变量时,其实是根据线程ID得到了对应的副本。