读:Python 延迟——time.sleep() 不是万能的
目录
Real Python 有一篇教程讲了 Python 中添加延迟的多种方法。time.sleep() 这东西在普通脚本里用着挺顺手,但程序一旦涉及多线程、异步或者 GUI,它就开始给你添乱了。线程退不掉、事件循环冻死、界面卡住,根子都在一件事上,time.sleep() 阻塞的是整个线程,而你的程序可能在一个线程里同时处理好几件事。
普通脚本,time.sleep() 够用
基本用法
time.sleep() 接受一个秒数参数(支持小数),暂停当前线程。
import time print("开始") time.sleep(3) print("结束")
有两点要留心。一是 time.sleep() 暂停的是 调用它的线程 ,不是整个进程,如果你的程序只有主线程,那就是暂停整个程序。二是精度不保证,你传 3 秒,实际等待几乎总是略长于 3 秒,因为操作系统调度有开销。
精度到底差多少
用 time.perf_counter() 可以精确测量。连跑 5 次 sleep(1),看看实际耗时是多少。
import time for i in range(5): start = time.perf_counter() time.sleep(1) elapsed = time.perf_counter() - start print(f"第 {i+1} 次: sleep(1) 实际耗时 {elapsed:.6f} 秒")
第 1 次: sleep(1) 实际耗时 1.000111 秒 第 2 次: sleep(1) 实际耗时 1.000143 秒 第 3 次: sleep(1) 实际耗时 1.000474 秒 第 4 次: sleep(1) 实际耗时 1.000104 秒 第 5 次: sleep(1) 实际耗时 1.000100 秒
每次都比 1 秒多了 0.0001 到 0.0005 秒。多数场景下这点误差无所谓,但如果在做精确计时或高频轮询(比如每 10 毫秒一次),这个偏差会累积。原因是操作系统的调度有开销,time.sleep() 只能保证"至少等这么久",不能保证"恰好等这么久"。
实战,定时检查网站状态
一个常见的用法是定时轮询。下面这个函数每 60 秒检查一次网站是否在线(为了演示,间隔改为 5 秒,只检查 3 次)。
import time import urllib.request import urllib.error def uptime_bot(url, interval=5, checks=3): for _ in range(checks): try: response = urllib.request.urlopen(url, timeout=10) print(f"[{response.status}] {url} 正常") except urllib.error.HTTPError as e: print(f"[HTTP {e.code}] {url} 返回错误") except urllib.error.URLError as e: print(f"[URL错误] {url}: {e.reason}") time.sleep(interval) uptime_bot("https://httpbin.org/status/404")
[HTTP 404] https://httpbin.org/status/404 返回错误 [HTTP 404] https://httpbin.org/status/404 返回错误 [HTTP 404] https://httpbin.org/status/404 返回错误
这里用 httpbin 的 404 端点做演示,实际使用时换成你要监控的 URL,间隔改回 60 秒就行。"循环 + sleep"在脚本里没毛病,脚本只有一个线程,不需要响应外部事件。
重试间隔,用装饰器把 time.sleep() 包装起来
有些操作偶尔失败(网络超时、接口没就绪),重试几次就好了,但重试之间得等一等。把"重试 + 等待"的逻辑抽成装饰器,任何函数加一行就能自动重试。
import time def retry(delay=3, max_retries=3): def decorator(func): def wrapper(*args, **kwargs): last_exc = None for attempt in range(max_retries): try: return func(*args, **kwargs) except Exception as e: last_exc = e print(f"第 {attempt + 1} 次失败,{delay} 秒后重试: {e}") time.sleep(delay) raise last_exc return wrapper return decorator @retry(delay=2) def download_file(url): raise ConnectionError(f"无法连接 {url}") try: download_file("https://example.com/file.zip") except ConnectionError as e: print(f"最终失败: {e}")
第 1 次失败,2 秒后重试: 无法连接 https://example.com/file.zip 第 2 次失败,2 秒后重试: 无法连接 https://example.com/file.zip 第 3 次失败,2 秒后重试: 无法连接 https://example.com/file.zip 最终失败: 无法连接 https://example.com/file.zip
time.sleep() 用在这里挺合适的,重试逻辑本身就是同步的,不需要响应外部事件。装饰器的好处,就是把重试逻辑从业务代码里抽出来,省得每次都手写 for 循环加 try/except。
以上场景 time.sleep() 都没问题。下面三个场景,它就开始坑人了。
多线程,time.sleep() 让线程退不掉,改用 Event.wait()
你有个后台线程在做数据迁移,每批操作之间要等 5 秒,免得压垮数据库。time.sleep(5) 能实现等待,但麻烦来了,你想停掉这个线程的时候(比如用户按了 Ctrl+C),线程得等 sleep 结束才能响应退出信号。sleep 剩 4 秒,你就得干等 4 秒。
举个更具体的例子。你的程序有个后台线程,每隔 10 秒往日志里写一条心跳记录。你用的是 time.sleep(10):
import threading import time def heartbeat(): for i in range(2): print(f"心跳 {i} [{time.strftime('%H:%M:%S')}]") time.sleep(5) # 注意:没有设 daemon=True,所以是普通线程 t = threading.Thread(target=heartbeat) t.start() time.sleep(2) print(f"主程序要退出了... [{time.strftime('%H:%M:%S')}]") # 线程刚发完心跳 0,正卡在 sleep(5) 的第 2 秒 # Python 会等非 daemon 线程结束才退出,还得等约 8 秒
心跳 0 [16:53:45] 主程序要退出了... [16:53:47] 心跳 1 [16:53:50] (程序总共跑了 10 秒才退出,而不是主线程希望的 2 秒)
主程序想退出,线程正卡在 sleep 中间,Python 要等非 daemon 线程跑完才退出。上面这个例子,主线程 2 秒就想走了,但实际等了 10 秒。这在生产环境里很烦人,特别是 sleep 时间长的时候(比如 60 秒的监控间隔)。
threading.Event 的 wait() 方法就是干这个的,也能等待指定时间,但可以立刻被中断。
import threading import logging import time logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s") def worker(interval, stop_event): while not stop_event.is_set(): logging.debug("工作中...") stop_event.wait(timeout=interval) logging.debug("退出") stop_event = threading.Event() t1 = threading.Thread(target=worker, args=(2, stop_event), name="Worker-1") t2 = threading.Thread(target=worker, args=(3, stop_event), name="Worker-2") t1.start() t2.start() try: while not stop_event.wait(timeout=5): logging.debug("主线程仍在运行") except KeyboardInterrupt: logging.debug("收到中断信号") stop_event.set() t1.join() t2.join() logging.debug("所有线程已退出")
Worker-1: 工作中... Worker-2: 工作中... Worker-1: 工作中... Worker-2: 工作中... Worker-1: 工作中... MainThread: 主线程仍在运行 MainThread: 收到中断信号 Worker-1: 退出 Worker-2: 退出 MainThread: 所有线程已退出
Event.wait(timeout=N):N 秒内 event 被 set() 了,它立刻返回 True,线程马上退出循环。超时了就返回 False,线程继续下一轮。比 time.sleep() 优雅得多,不用等 sleep 跑完才能停。
异步代码,time.sleep() 冻结整个事件循环,改用 asyncio.sleep()
异步编程的思路:一个线程同时处理多个任务,哪个任务在等 IO 就先挂起它,去跑其他任务。time.sleep() 会把整个线程(也就是整个事件循环)冻住,所有任务都卡住,异步的优势全没了。
asyncio.sleep() 只挂起当前协程,不阻塞事件循环,其他协程可以继续跑。下面这个例子用时间戳直观展示两者的区别。
import asyncio import time async def task(name, delay): print(f"{time.strftime('%H:%M:%S')} {name} 开始") await asyncio.sleep(delay) print(f"{time.strftime('%H:%M:%S')} {name} 结束(等了 {delay} 秒)") async def main(): # task A 等 2 秒,task B 等 1 秒 # 如果是串行,总共要 3 秒;并发执行只需 2 秒 await asyncio.gather(task("A", 2), task("B", 1)) asyncio.run(main())
14:44:23 A 开始 14:44:23 B 开始 14:44:24 B 结束(等了 1 秒) 14:44:25 A 结束(等了 2 秒)
B 在第 1 秒就结束了,不用等 A。两个任务是并发执行的,总共只花了 2 秒而不是串行的 3 秒。
如果换成 time.sleep(),效果是这样的:
import time # 串行版本(time.sleep 在 async 中的效果) print("=== 串行 ===") start = time.perf_counter() for name, delay in [("A", 2), ("B", 1)]: print(f"{time.strftime('%H:%M:%S')} {name} 开始") time.sleep(delay) print(f"{time.strftime('%H:%M:%S')} {name} 结束(等了 {delay} 秒)") print(f"串行总耗时: {time.perf_counter() - start:.1f} 秒")
=== 串行 === 16:40:28 A 开始 16:40:30 A 结束(等了 2 秒) 16:40:30 B 开始 16:40:31 B 结束(等了 1 秒) 串行总耗时: 3.0 秒
对比一下,串行版本里 B 必须等 A 跑完才开始,总共 3 秒。而用 asyncio.sleep() 的并发版本,两个任务同时开始,B 先结束,总共只要 2 秒。差异来自 time.sleep() 把整个线程焊死了,事件循环没法切换到其他协程。
说个实际的场景,调 API 有速率限制,需要在请求之间加间隔。用 asyncio.sleep() 可以在不阻塞事件循环的前提下控制请求频率,同时其他协程(比如处理用户输入)不受影响。
GUI 程序,time.sleep() 冻死界面,改用 .after()
GUI 框架(Tkinter、wxPython)在主线程跑一个事件循环,处理用户点击、窗口重绘这些事。在事件循环里调用 time.sleep(),会导致界面直接冻住,按钮点不动,窗口拖不了,Windows 上甚至会弹出"程序未响应"。
先看错误示范。
import tkinter as tk import time root = tk.Tk() root.title("冻结演示") root.geometry("300x150") def on_click(): print("按钮被点击") time.sleep(3) print("3 秒到了") btn = tk.Button(root, text="点我(会冻结 3 秒)", command=on_click) btn.pack(pady=50) root.mainloop()
点击按钮后,整个窗口冻住 3 秒,什么都做不了。
正确做法是用 Tkinter 的 .after() 方法。它接受两个参数,等待的毫秒数和回调函数。调用后立刻返回,不阻塞事件循环。
import tkinter as tk root = tk.Tk() root.title("非阻塞演示") root.geometry("300x150") def delayed_action(): print("3 秒到了") def on_click(): root.after(3000, delayed_action) print("已安排 3 秒后执行,界面不会卡") btn = tk.Button(root, text="点我(不会冻结)", command=on_click) btn.pack(pady=50) root.mainloop()
已安排 3 秒后执行,界面不会卡 (3 秒后) 3 秒到了 (此输出为描述,未经实际验证——验证环境缺少 python3-tk)
用 .after() 之后,按钮可以反复点击,每次点击都安排一个延迟回调,界面始终流畅。wxPython 的等价方法是 wx.CallLater(),思路一样,安排延迟回调而不是阻塞线程。
决策速查表
| 场景 | 用什么 | 为什么 |
|---|---|---|
| 普通脚本(包括重试间隔) | time.sleep() | 只有一个线程,不怕阻塞 |
| 多线程,工作间隔 | Event.wait() | 可以被 set() 立即中断,不用等 sleep 跑完 |
| 异步代码,协程暂停 | asyncio.sleep() | 只挂起当前协程,不冻结事件循环 |
| GUI,延迟操作 | .after() / wx.CallLater() | 不阻塞 GUI 事件循环,界面保持响应 |
说到底就一句话,time.sleep() 阻塞的是整个线程。只要你的程序在一个线程里跑多个任务(多个协程共享一个线程、GUI 事件循环在主线程处理用户操作),就不能用它,得换成不阻塞或可中断的方案。