暗无天日

=============>DarkSun的个人博客

读:Python 延迟——time.sleep() 不是万能的

Real Python 有一篇教程讲了 Python 中添加延迟的多种方法。time.sleep() 这东西在普通脚本里用着挺顺手,但程序一旦涉及多线程、异步或者 GUI,它就开始给你添乱了。线程退不掉、事件循环冻死、界面卡住,根子都在一件事上,time.sleep() 阻塞的是整个线程,而你的程序可能在一个线程里同时处理好几件事。

普通脚本,time.sleep() 够用

基本用法

time.sleep() 接受一个秒数参数(支持小数),暂停当前线程。

import time

print("开始")
time.sleep(3)
print("结束")

有两点要留心。一是 time.sleep() 暂停的是 调用它的线程 ,不是整个进程,如果你的程序只有主线程,那就是暂停整个程序。二是精度不保证,你传 3 秒,实际等待几乎总是略长于 3 秒,因为操作系统调度有开销。

精度到底差多少

用 time.perf_counter() 可以精确测量。连跑 5 次 sleep(1),看看实际耗时是多少。

import time

for i in range(5):
    start = time.perf_counter()
    time.sleep(1)
    elapsed = time.perf_counter() - start
    print(f"第 {i+1} 次: sleep(1) 实际耗时 {elapsed:.6f} 秒")
第 1 次: sleep(1) 实际耗时 1.000111 秒
第 2 次: sleep(1) 实际耗时 1.000143 秒
第 3 次: sleep(1) 实际耗时 1.000474 秒
第 4 次: sleep(1) 实际耗时 1.000104 秒
第 5 次: sleep(1) 实际耗时 1.000100 秒

每次都比 1 秒多了 0.0001 到 0.0005 秒。多数场景下这点误差无所谓,但如果在做精确计时或高频轮询(比如每 10 毫秒一次),这个偏差会累积。原因是操作系统的调度有开销,time.sleep() 只能保证"至少等这么久",不能保证"恰好等这么久"。

实战,定时检查网站状态

一个常见的用法是定时轮询。下面这个函数每 60 秒检查一次网站是否在线(为了演示,间隔改为 5 秒,只检查 3 次)。

import time
import urllib.request
import urllib.error

def uptime_bot(url, interval=5, checks=3):
    for _ in range(checks):
        try:
            response = urllib.request.urlopen(url, timeout=10)
            print(f"[{response.status}] {url} 正常")
        except urllib.error.HTTPError as e:
            print(f"[HTTP {e.code}] {url} 返回错误")
        except urllib.error.URLError as e:
            print(f"[URL错误] {url}: {e.reason}")
        time.sleep(interval)

uptime_bot("https://httpbin.org/status/404")
[HTTP 404] https://httpbin.org/status/404 返回错误
[HTTP 404] https://httpbin.org/status/404 返回错误
[HTTP 404] https://httpbin.org/status/404 返回错误

这里用 httpbin 的 404 端点做演示,实际使用时换成你要监控的 URL,间隔改回 60 秒就行。"循环 + sleep"在脚本里没毛病,脚本只有一个线程,不需要响应外部事件。

重试间隔,用装饰器把 time.sleep() 包装起来

有些操作偶尔失败(网络超时、接口没就绪),重试几次就好了,但重试之间得等一等。把"重试 + 等待"的逻辑抽成装饰器,任何函数加一行就能自动重试。

import time

def retry(delay=3, max_retries=3):
    def decorator(func):
        def wrapper(*args, **kwargs):
            last_exc = None
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exc = e
                    print(f"第 {attempt + 1} 次失败,{delay} 秒后重试: {e}")
                    time.sleep(delay)
            raise last_exc
        return wrapper
    return decorator

@retry(delay=2)
def download_file(url):
    raise ConnectionError(f"无法连接 {url}")

try:
    download_file("https://example.com/file.zip")
except ConnectionError as e:
    print(f"最终失败: {e}")
第 1 次失败,2 秒后重试: 无法连接 https://example.com/file.zip
第 2 次失败,2 秒后重试: 无法连接 https://example.com/file.zip
第 3 次失败,2 秒后重试: 无法连接 https://example.com/file.zip
最终失败: 无法连接 https://example.com/file.zip

time.sleep() 用在这里挺合适的,重试逻辑本身就是同步的,不需要响应外部事件。装饰器的好处,就是把重试逻辑从业务代码里抽出来,省得每次都手写 for 循环加 try/except。

以上场景 time.sleep() 都没问题。下面三个场景,它就开始坑人了。

多线程,time.sleep() 让线程退不掉,改用 Event.wait()

你有个后台线程在做数据迁移,每批操作之间要等 5 秒,免得压垮数据库。time.sleep(5) 能实现等待,但麻烦来了,你想停掉这个线程的时候(比如用户按了 Ctrl+C),线程得等 sleep 结束才能响应退出信号。sleep 剩 4 秒,你就得干等 4 秒。

举个更具体的例子。你的程序有个后台线程,每隔 10 秒往日志里写一条心跳记录。你用的是 time.sleep(10):

import threading
import time

def heartbeat():
    for i in range(2):
        print(f"心跳 {i} [{time.strftime('%H:%M:%S')}]")
        time.sleep(5)

# 注意:没有设 daemon=True,所以是普通线程
t = threading.Thread(target=heartbeat)
t.start()
time.sleep(2)
print(f"主程序要退出了... [{time.strftime('%H:%M:%S')}]")
# 线程刚发完心跳 0,正卡在 sleep(5) 的第 2 秒
# Python 会等非 daemon 线程结束才退出,还得等约 8 秒
心跳 0 [16:53:45]
主程序要退出了... [16:53:47]
心跳 1 [16:53:50]
(程序总共跑了 10 秒才退出,而不是主线程希望的 2 秒)

主程序想退出,线程正卡在 sleep 中间,Python 要等非 daemon 线程跑完才退出。上面这个例子,主线程 2 秒就想走了,但实际等了 10 秒。这在生产环境里很烦人,特别是 sleep 时间长的时候(比如 60 秒的监控间隔)。

threading.Event 的 wait() 方法就是干这个的,也能等待指定时间,但可以立刻被中断。

import threading
import logging
import time

logging.basicConfig(level=logging.DEBUG, format="%(threadName)s: %(message)s")

def worker(interval, stop_event):
    while not stop_event.is_set():
        logging.debug("工作中...")
        stop_event.wait(timeout=interval)
    logging.debug("退出")

stop_event = threading.Event()
t1 = threading.Thread(target=worker, args=(2, stop_event), name="Worker-1")
t2 = threading.Thread(target=worker, args=(3, stop_event), name="Worker-2")
t1.start()
t2.start()

try:
    while not stop_event.wait(timeout=5):
        logging.debug("主线程仍在运行")
except KeyboardInterrupt:
    logging.debug("收到中断信号")
    stop_event.set()

t1.join()
t2.join()
logging.debug("所有线程已退出")
Worker-1: 工作中...
Worker-2: 工作中...
Worker-1: 工作中...
Worker-2: 工作中...
Worker-1: 工作中...
MainThread: 主线程仍在运行
MainThread: 收到中断信号
Worker-1: 退出
Worker-2: 退出
MainThread: 所有线程已退出

Event.wait(timeout=N):N 秒内 event 被 set() 了,它立刻返回 True,线程马上退出循环。超时了就返回 False,线程继续下一轮。比 time.sleep() 优雅得多,不用等 sleep 跑完才能停。

异步代码,time.sleep() 冻结整个事件循环,改用 asyncio.sleep()

异步编程的思路:一个线程同时处理多个任务,哪个任务在等 IO 就先挂起它,去跑其他任务。time.sleep() 会把整个线程(也就是整个事件循环)冻住,所有任务都卡住,异步的优势全没了。

asyncio.sleep() 只挂起当前协程,不阻塞事件循环,其他协程可以继续跑。下面这个例子用时间戳直观展示两者的区别。

import asyncio
import time

async def task(name, delay):
    print(f"{time.strftime('%H:%M:%S')} {name} 开始")
    await asyncio.sleep(delay)
    print(f"{time.strftime('%H:%M:%S')} {name} 结束(等了 {delay} 秒)")

async def main():
    # task A 等 2 秒,task B 等 1 秒
    # 如果是串行,总共要 3 秒;并发执行只需 2 秒
    await asyncio.gather(task("A", 2), task("B", 1))

asyncio.run(main())
14:44:23 A 开始
14:44:23 B 开始
14:44:24 B 结束(等了 1 秒)
14:44:25 A 结束(等了 2 秒)

B 在第 1 秒就结束了,不用等 A。两个任务是并发执行的,总共只花了 2 秒而不是串行的 3 秒。

如果换成 time.sleep(),效果是这样的:

import time

# 串行版本(time.sleep 在 async 中的效果)
print("=== 串行 ===")
start = time.perf_counter()
for name, delay in [("A", 2), ("B", 1)]:
    print(f"{time.strftime('%H:%M:%S')} {name} 开始")
    time.sleep(delay)
    print(f"{time.strftime('%H:%M:%S')} {name} 结束(等了 {delay} 秒)")
print(f"串行总耗时: {time.perf_counter() - start:.1f} 秒")
=== 串行 ===
16:40:28 A 开始
16:40:30 A 结束(等了 2 秒)
16:40:30 B 开始
16:40:31 B 结束(等了 1 秒)
串行总耗时: 3.0 秒

对比一下,串行版本里 B 必须等 A 跑完才开始,总共 3 秒。而用 asyncio.sleep() 的并发版本,两个任务同时开始,B 先结束,总共只要 2 秒。差异来自 time.sleep() 把整个线程焊死了,事件循环没法切换到其他协程。

说个实际的场景,调 API 有速率限制,需要在请求之间加间隔。用 asyncio.sleep() 可以在不阻塞事件循环的前提下控制请求频率,同时其他协程(比如处理用户输入)不受影响。

GUI 程序,time.sleep() 冻死界面,改用 .after()

GUI 框架(Tkinter、wxPython)在主线程跑一个事件循环,处理用户点击、窗口重绘这些事。在事件循环里调用 time.sleep(),会导致界面直接冻住,按钮点不动,窗口拖不了,Windows 上甚至会弹出"程序未响应"。

先看错误示范。

import tkinter as tk
import time

root = tk.Tk()
root.title("冻结演示")
root.geometry("300x150")

def on_click():
    print("按钮被点击")
    time.sleep(3)
    print("3 秒到了")

btn = tk.Button(root, text="点我(会冻结 3 秒)", command=on_click)
btn.pack(pady=50)
root.mainloop()

点击按钮后,整个窗口冻住 3 秒,什么都做不了。

正确做法是用 Tkinter 的 .after() 方法。它接受两个参数,等待的毫秒数和回调函数。调用后立刻返回,不阻塞事件循环。

import tkinter as tk

root = tk.Tk()
root.title("非阻塞演示")
root.geometry("300x150")

def delayed_action():
    print("3 秒到了")

def on_click():
    root.after(3000, delayed_action)
    print("已安排 3 秒后执行,界面不会卡")

btn = tk.Button(root, text="点我(不会冻结)", command=on_click)
btn.pack(pady=50)
root.mainloop()
已安排 3 秒后执行,界面不会卡
(3 秒后)
3 秒到了
(此输出为描述,未经实际验证——验证环境缺少 python3-tk)

用 .after() 之后,按钮可以反复点击,每次点击都安排一个延迟回调,界面始终流畅。wxPython 的等价方法是 wx.CallLater(),思路一样,安排延迟回调而不是阻塞线程。

决策速查表

场景 用什么 为什么
普通脚本(包括重试间隔) time.sleep() 只有一个线程,不怕阻塞
多线程,工作间隔 Event.wait() 可以被 set() 立即中断,不用等 sleep 跑完
异步代码,协程暂停 asyncio.sleep() 只挂起当前协程,不冻结事件循环
GUI,延迟操作 .after() / wx.CallLater() 不阻塞 GUI 事件循环,界面保持响应

说到底就一句话,time.sleep() 阻塞的是整个线程。只要你的程序在一个线程里跑多个任务(多个协程共享一个线程、GUI 事件循环在主线程处理用户操作),就不能用它,得换成不阻塞或可中断的方案。

python : sleep : threading : asyncio : GUI : 并发