介绍

Python 不乏并发选项,标准库包括对线程、进程和异步 I/O 的支持。在许多情况下,Python 通过创建异步、线程和子进程等高级模块,消除了使用这些各种并发方法的困难。在标准库之外,还有第三种解决方案,例如twisted、stackless 和处理模块,仅举几例。本文使用实践示例专门关注 Python 中的线程处理。网上有很多很好的资源来记录线程 API,但本文试图提供常见线程使用模式的实践示例。

首先定义进程和线程之间的区别很重要。线程与进程的不同之处在于它们共享状态、内存和资源。这个简单的区别对于线程来说既是优点也是缺点。一方面,线程是轻量级的并且易于通信,但另一方面,它们带来了一系列问题,包括死锁、竞争条件和纯粹的复杂性。幸运的是,由于 GIL 和排队模块,Python 中的线程实现起来比其他语言要简单得多。

你好 Python 线程

接下来,我假设你已经安装了 Python 2.5 或更高版本,因为许多示例将使用 Python 语言的更新功能,这些功能至少出现在 Python2.5 中。要开始使用 Python 中的线程,我们将从一个简单的“Hello World”示例开始:

清单 1. hello_threads_example
        import threading
        import datetime
        
        class ThreadClass(threading.Thread):
          def run(self):
            now = datetime.datetime.now()
            print "%s says Hello World at time: %s" % 
            (self.getName(), now)
        
        for i in range(2):
          t = ThreadClass()
          t.start()

如果你运行这个例子,你会得到以下输出:

      #python hello_threads.py 
      Thread‑1 says Hello World at time: 2008‑05‑13 13:22:50.252069
      Thread‑2 says Hello World at time: 2008‑05‑13 13:22:50.252576

查看此输出,你可以看到你收到了来自两个带有日期戳的线程的 Hello World 语句。如果你查看实际代码,会发现有两个 import 语句;一个导入 datetime 模块,另一个导入 threading 模块。该类ThreadClass继承自threading.Thread,因此,您需要定义一个 run 方法来执行您在线程内运行的代码。在 run 方法中唯一需要注意的重要事项self.getName()是该方法将标识线程的名称。

最后三行代码实际上调用了类并启动了线程。如果您注意到,t.start()实际上是启动线程的。线程模块在设计时就考虑到了继承性,实际上是建立在较低级别的线程模块之上的。在大多数情况下,继承自 被认为是最佳实践threading.Thread,因为它为线程编程创建了一个非常自然的 API。

使用带线程的队列

正如我之前提到的,当线程需要共享数据或资源时,线程处理可能会很复杂。线程模块确实提供了许多同步原语,包括信号量、条件变量、事件和锁。虽然存在这些选项,但最好的做法是专注于使用队列。队列更容易处理,并使线程编程更加安全,因为它们有效地将所有对资源的访问集中到单个线程,并允许更清晰、更易读的设计模式。

在下一个示例中,你将首先创建一个程序,该程序将依次或一个接一个地获取网站的 URL,并打印出页面的前 1024 个字节。这是使用线程可以更快地完成某些事情的经典示例。首先,让我们使用urllib2模块一次抓取这些页面,并对代码进行计时:

清单 2. URL 获取序列
        import urllib2
        import time
        
        hosts = "http://yahoo.com", "http://google.com", "http://amazon.com",
        "http://ibm.com", "http://apple.com"        
        start = time.time()
        #grabs urls of hosts and prints first 1024 bytes of page
        for host in hosts:
          url = urllib2.urlopen(host)
          print url.read(1024)
        
        print "Elapsed Time: %s" % (time.time() ‑ start)

当你运行它时,你会得到大量输出到标准输出,因为页面被部分打印。但你会在最后得到这个:

        Elapsed Time: 2.40353488922

让我们稍微看一下这段代码。你只导入两个模块。首先,urllib2模块是承担重任并抓取网页的东西。其次,你通过调用​time.time()​创建一个开始时间值,然后再次调用它并减去初始值以确定程序执行所需的时间。最后,从程序的速度来看,“两秒半”的结果并不可怕,但如果你有数百个网页要检索,考虑到当前的平均值,大约需要 50 秒。看看创建线程版本如何加快速度:

清单 3. URL 获取线程
          #!/usr/bin/env python
          import Queue
          import threading
          import urllib2
          import time
          
          hosts = "http://yahoo.com", "http://google.com", "http://amazon.com",
          "http://ibm.com", "http://apple.com"          
          queue = Queue.Queue()
          
          class ThreadUrl(threading.Thread):
          """Threaded Url Grab"""
            def init(self, queue):
              threading.Thread.init(self)
              self.queue = queue
          
            def run(self):
              while True:
                #grabs host from queue
                host = self.queue.get()
            
                #grabs urls of hosts and prints first 1024 bytes of page
                url = urllib2.urlopen(host)
                print url.read(1024)
            
                #signals to queue job is done
                self.queue.task_done()
          
          start = time.time()
          def main():
          
            #spawn a pool of threads, and pass them queue instance 
            for i in range(5):
              t = ThreadUrl(queue)
              t.setDaemon(True)
              t.start()
              
           #populate queue with data   
              for host in hosts:
                queue.put(host)
           
           #wait on the queue until everything has been processed     
           queue.join()
          
          main()
          print "Elapsed Time: %s" % (time.time() ‑ start)

这个例子有更多的代码需要解释,但由于使用了排队模块,它并没有比第一个线程示例复杂多少。这种模式是在 Python 中使用线程的一种非常常见且推荐的方式。步骤描述如下:

  1. 创建一个​Queue.Queue()​实例,然后用数据填充它。
  2. 将填充数据的实例传递到从​threading.Thread​继承而创建的​Thread​类中。
  3. 产生一个守护线程池。
  4. 一次从队列中拉出一项,并在线程内部使用该数据(即 run 方法)来完成这项工作。
  5. 工作完成后,向​queue.task_done()​队列发送任务已完成的信号。
  6. 加入队列,这实际上意味着等到队列为空,然后退出主程序。

关于此模式的注意事项:通过将守护线程设置为 true,它允许主线程或程序在只有守护线程处于活动状态时退出。这创建了一种控制程序流程的简单方法,因为你可以在退出之前加入队列,或等到队列为空。确切的过程在队列模块的文档中得到了最好的描述,如右侧的资源部分所示:

join()

阻塞,直到队列中的所有项目都被获取和处理。每当将项目添加到队列时,未完成任务的计数就会增加。每当使用者线程调用 task_done() 以指示该项目已被检索并且其上的所有工作已完成时,未完成任务的计数就会下降。当未完成任务的数量降至零时, join()解锁。

使用多个队列

因为上面演示的模式非常有效,所以通过将额外的线程池与队列链接来扩展它是相对简单的。在上面的示例中,你只是打印出网页的第一部分。下一个示例返回每个线程抓取的整个网页,然后将其放入另一个队列。然后设置另一个加入第二个队列的线程池,然后在网页上工作。本示例中执行的工作涉及使用名为 Beautiful Soup 的第三方 Python 模块解析网页。仅使用几行代码,使用此模块,你将提取标题标签并为你访问的每个页面打印出来。

清单 4. 多队列数据挖掘网站
import Queue
import threading
import urllib2
import time
from BeautifulSoup import BeautifulSoup

hosts = "http://yahoo.com", "http://google.com", "http://amazon.com",
        "http://ibm.com", "http://apple.com"
queue = Queue.Queue()
outqueue = Queue.Queue()

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def init(self, queue, outqueue):
        threading.Thread.init(self)
        self.queue = queue
        self.outqueue = outqueue

    def run(self):
        while True:
            #grabs host from queue
            host = self.queue.get()

            #grabs urls of hosts and then grabs chunk of webpage
            url = urllib2.urlopen(host)
            chunk = url.read()

            #place chunk into out queue
            self.out_queue.put(chunk)

            #signals to queue job is done
            self.queue.task_done()

class DatamineThread(threading.Thread):
    """Threaded Url Grab"""
    def __init(self, out_queue):
        threading.Thread.__init(self)
        self.out_queue = out_queue

    def run(self):
        while True:
            #grabs host from queue
            chunk = self.out_queue.get()

            #parse the chunk
            soup = BeautifulSoup(chunk)
            print soup.findAll(['title'])

            #signals to queue job is done
            self.out_queue.task_done()

start = time.time()
def main():

    #spawn a pool of threads, and pass them queue instance
    for i in range(5):
        t = ThreadUrl(queue, out_queue)
        t.setDaemon(True)
        t.start()

    #populate queue with data
    for host in hosts:
        queue.put(host)

    for i in range(5):
        dt = DatamineThread(out_queue)
        dt.setDaemon(True)
        dt.start()


    #wait on the queue until everything has been processed
    queue.join()
    out_queue.join()

main()
print "Elapsed Time: %s" % (time.time() ‑ start)

如果你运行此版本的脚本,你将获得以下输出:

  #python url_fetch_threaded_part2.py 

  <title>Google</title>  <title>Yahoo!</title>  <title>Apple</title>  <title>IBM United States</title>  <title>Amazon.com: Online Shopping for Electronics, Apparel,
 Computers, Books, DVDs & more</title>  Elapsed Time: 3.75387597084

在查看代码时,你可以看到我们添加了另一个队列实例,然后将该队列传递给第一个线程池类ThreadURL. 接下来,你几乎为下一个线程池类复制了完全相同的结构DatamineThread。在这个类的run方法中,从每个线程的队列中抓取网页,chunk,然后用Beautiful Soup处理这个chunk。在这种情况下, 你可以使用 Beautiful Soup 来简单地从每个页面中提取标题标签并打印出来。这个例子可以很容易地变成更有用的东西,因为你拥有基本搜索引擎或数据挖掘工具的核心。一个想法是使用 Beautiful Soup 从每个页面中提取链接,然后关注它们。

总结

本文探讨了 Python 中的线程,并展示了使用队列来减轻复杂性和细微错误以及提高可读代码的最佳实践。虽然这个基本模式相对简单,但它可以通过将队列和线程池链接在一起来解决大量问题。在最后一部分,您开始探索创建一个更复杂的处理管道,作为未来项目的模型。在资源部分有很多关于并发和线程的优秀资源。

最后,重要的是要指出线程并不是所有问题的解决方案,而且进程可以非常适合许多情况。如果你只需要分叉多个进程并监听响应,那么标准库 ​​subprocess 模块尤其可以更简单地处理。