1. <em id="2qvri"><tr id="2qvri"></tr></em>
      1. 首頁»Python»改善 Python 程序的 91 個建議(六)

        改善 Python 程序的 91 個建議(六)

        來源:馭風者 發布時間:2017-05-17 閱讀次數:

        建議 87:充分利用 set 的優勢

        Python 中集合是通過 Hash 算法實現的無序不重復的元素集。

        我們來做一些測試:

        $ python -m timeit -n 1000 "[x for x in range(1000) if x in range(600, 1000)]"
        1000 loops, best of 3: 6.44 msec per loop
        $ python -m timeit -n 1000 "set(range(100)).intersection(range(60, 100))"   
        1000 loops, best of 3: 9.18 usec per loop
        

        實際上 set 的 union、intersection、difference 等操作要比 list 的迭代要快。因此如果涉及求 list 交集、并集或者差等問題可以轉換為 set 來操作。

        建議 88:使用 multiprocess 克服 GIL 的缺陷

        多進程 Multiprocess 是 Python 中的多進程管理包,在 Python2.6 版本中引進的,主要用來幫助處理進程的創建以及它們之間的通信和相互協調。它主要解決了兩個問題:一是盡量縮小平臺之間的差異,提供高層次的 API 從而使得使用者忽略底層 IPC 的問題;二是提供對復雜對象的共享支持,支持本地和遠程并發。

        類 Process 是 multiprocess 中較為重要的一個類,用戶創建進程,其構造函數如下:

        Process([group[, target[, name[, args[, kwargs]]]]])

        其中,參數 target 表示可調用對象;args 表示調用對象的位置參數元組;kwargs 表示調用對象的字典;name 為進程的名稱;group 一般設置為 None。該類提供的方法與屬性基本上與 threading.Thread 一致,包括 is_alive()、join([timeout])、run()、start()、terminate()、daemon(要通過 start() 設置)、exitcode、name、pid 等。

        不同于線程,每個進程都有其獨立的地址空間,進程間的數據空間也相互獨立,因此進程之間數據的共享和傳遞不如線程來得方便。慶幸的是 multiprocess 模塊中都提供了相應的機制:如進程間同步操作原語 Lock、Event、Condition、Semaphore,傳統的管道通信機制 pipe 以及隊列 Queue,用于共享資源的 multiprocess.Value 和 multiprocess.Array 以及 Manager 等。

        Multiprocessing 模塊在使用上需要注意以下幾個要點:

        1. 進程之間的的通信優先考慮 Pipe 和 Queue,而不是 Lock、Event、Condition、Semaphore 等同步原語。進程中的類 Queue 使用 pipe 和一些 locks、semaphores 原語來實現,是進程安全的。該類的構造函數返回一個進程的共享隊列,其支持的方法和線程中的 Queue 基本類似,除了方法 task_done()和 join() 是在其子類 JoinableQueue 中實現的以外。需要注意的是,由于底層使用 pipe 來實現,使用 Queue 進行進程之間的通信的時候,傳輸的對象必須是可以序列化的,否則 put 操作會導致 PicklingError。此外,為了提供 put 方法的超時控制,Queue 并不是直接將對象寫到管道中而是先寫到一個本地的緩存中,再將其從緩存中放入 pipe 中,內部有個專門的線程 feeder 負責這項工作。由于 feeder 的存在,Queue 還提供了以下特殊方法來處理進程退出時緩存中仍然存在數據的問題。

          • close():表明不再存放數據到 queue 中。一旦所有緩沖的數據刷新到管道,后臺線程將退出。

          • join_thread():一般在 close 方法之后使用,它會阻止直到后臺線程退出,確保所有緩沖區中的數據已經刷新到管道中。

          • cancel_join_thread():需要立即退出當前進程,而無需等待排隊的數據刷新到底層管道的時候可以使用該方法,表明無須阻止到后臺進程的退出。

          Multiprocessing 中還有個 SimpleQueue 隊列,它是實現了鎖機制的 pipe,內部去掉了 buffer,但沒有提供 put 和 get 的超時處理,兩個動作都是阻塞的。

          除了 multiprocessing.Queue 之外,另一種很重要的通信方式是 multiprocessing.Pipe。它的構造函數為 multiprocess.Pipe([duplex]),其中 duplex 默認為 True,表示為雙向管道,否則為單向。它返回一個 Connection 對象的組(conn1, conn2),分別表示管道的兩端。Pipe 不支持進程安全,因此當有多個進程同時對管道的一端進行讀操作或者寫操作的時候可能會導致數據丟失或者損壞。因此在進程通信的時候,如果是超過 2 個以上的線程,可以使用 queue,但對于兩個進程之間的通信而言 Pipe 性能更快。

          from multiprocessing import Process, Pipe, Queue
          import time
          
          def reader_pipe(pipe):
              output_p, input_p = pipe    # 返回管道的兩端
              inout_p.close()
              while True:
                  try:
                      msg = output_p.recv()    # 從 pipe 中讀取消息
                  except EOFError:
                          break
          
          def writer_pipe(count, input_p):    # 寫消息到管道中
              for i in range(0, count):
                  input_p.send(i)                # 發送消息
          
          def reader_queue(queue):            # 利用隊列來發送消息
              while True:
                  msg = queue.get()            # 從隊列中獲取元素
                  if msg == "DONE":
                      break
          
          def writer_queue(count, queue):
              for ii in range(0, count):
                  queue.put(ii)                # 放入消息隊列中
              queue.put("DONE")
          
          if __name__ == "__main__":
              print("testing for pipe:")
              for count in [10 ** 3, 10 ** 4, 10 ** 5]:
                  output_p, input_p = Pipe()
                  reader_p = Process(target=reader_pipe, args=((output_p, input_p),))
                  reader_p.start()            # 啟動進程
                  output_p.close()
                  _start = time.time()
                  writer_pipe(count, input_p)    # 寫消息到管道中
                  input_p.close()
                  reader_p.join()                # 等待進程處理完畢
                  print("Sending {} numbers to Pipe() took {} seconds".format(count, (time.time() - _start)))
          
              print("testsing for queue:")
              for count in [10 ** 3, 10 ** 4, 10 ** 5]:
                  queue = Queue()                # 利用 queue 進行通信
                  reader_p = Process(target=reader_queue, args=((queue),))
                  reader_p.daemon = True
                  reader_p.start()
          
                  _start = time.time()
                  writer_queue(count, queue)    # 寫消息到 queue 中
                  reader_p.join()
                  print("Seding {} numbers to Queue() took {} seconds".format(count, (time.time() - _start)))
          

          上面代碼分別用來測試兩個多線程的情況下使用 pipe 和 queue 進行通信發送相同數據的時候的性能,從函數輸出可以看出,pipe 所消耗的時間較小,性能更好。

        2. 盡量避免資源共享。相比于線程,進程之間資源共享的開銷較大,因此要盡量避免資源共享。但如果不可避免,可以通過 multiprocessing.Value 和 multiprocessing.Array 或者 multiprocessing.sharedctpyes 來實現內存共享,也可以通過服務器進程管理器 Manager() 來實現數據和狀態的共享。這兩種方式各有優勢,總體來說共享內存的方式更快,效率更高,但服務器進程管理器 Manager() 使用起來更為方便,并且支持本地和遠程內存共享。

          # 示例一
          import time
          from multiprocessing import Process, Value
          
          def func(val):    # 多個進程同時修改 val
              for i in range(10):
                  time.sleep(0.1)
                  val.value += 1
          
          if __name__ == "__main__":
              v = Value("i", 0)    # 使用 value 來共享內存
              processList = [Process(target=func, args=(v,)) for i in range(10)]
              for p in processList: p.start()
              for p in processList: p.join()
              print v.value
          # 修改 func 函數,真正控制同步訪問
          def func(val):
              for i in range(10):
                  time.sleep(0.1)
                  with val.get_lock():    # 仍然需要使用 get_lock 方法來獲取鎖對象
                      val.value += 1
          # 示例二
          import multiprocessing
          def f(ns):
              ns.x.append(1)
              ns.y.append("a")
          
          if __name__ == "__main__":
              manager = multiprocessing.Manager()
              ns = manager.Namespace()
              ns.x = []    # manager 內部包括可變對象
              ns.y = []
          
              print("before process operation: {}".format(ns))
              p = multiprocessing.Process(target=f, args=(ns,))
              p.start()
              p.join()
              print("after process operation {}".format(ns))    # 修改根本不會生效
          # 修改
          import multiprocessing
          def f(ns, x, y):
              x.append(1)
              y.append("a")
              ns.x = x    # 將可變對象也作為參數傳入
              ns.y = y
          
          if __name__ == "__main__":
              manager = multiprocessing.Manager()
              ns = manager.Namespace()
              ns.x = []    # manager 內部包括可變對象
              ns.y = []
          
              print("before process operation: {}".format(ns))
              p = multiprocessing.Process(target=f, args=(ns, ns.x, ns.y))
              p.start()
              p.join()
              print("after process operation {}".format(ns))
          
        3. 注意平臺之間的差異。由于 Linux 平臺使用 fork() 來創建進程,因此父進程中所有的資源,如數據結構、打開的文件或者數據庫的連接都會在子進程中共享,而 Windows 平臺中父子進程相對獨立,因此為了更好地保持平臺的兼容性,最好能夠將相關資源對象作為子進程的構造函數的參數傳遞進去。要避免如下方式:

          f = None
          def child(f):
              # do something
          
          if __name__ == "__main__":
              f = open(filename, mode)
              p = Process(target=child)
              p.start()
              p.join()
          # 推薦的方式
          def child(f):
              print(f)
          
          if __name__ == "__main__":
              f = open(filename, mode)
              p = Process(target=child, args=(f, ))    # 將資源對象作為構造函數參數傳入
              p.start()
              p.join()
          

          需要注意的是,Linux 平臺上 multiprocessing 的實現是基于 C 庫中的 fork(),所有子進程與父進程的數據是完全相同,因此父進程中所有的資源,如數據結構、打開的文件或者數據庫的連接都會在子進程中共享。但 Windows 平臺上由于沒有 fork() 函數,父子進程相對獨立,因此保持了平臺的兼容性,最好在腳本中加上 if __name__ == "__main__" 的判斷,這樣可以避免出現 RuntimeError 或者死鎖。

        4. 盡量避免使用 terminate() 方式終止進程,并且確保 pool.map 中傳入的參數是可以序列化的。

          import multiprocessing
          def unwrap_self_f(*args, **kwargs):
              return calculate.f(*args, **kwargs)    # 返回一個對象
          
          class calculate(object):
              def f(self, x):
                  return x * x
              def run(self):
                  p = multiprocessing.Pool()
                  return p.map(unwrap_self_f, zip([self] * 3, [1, 2, 3]))
          
          if __name__ == "__main__":
              c1 = calculate()
              print(c1.run())
          

        建議 89:使用線程池提高效率

        我們知道線程的生命周期分為 5 個狀態:創建、就緒、運行、阻塞和終止。自線程創建到終止,線程便不斷在運行、就緒和阻塞這 3 個狀態之間轉換直至銷毀。而真正占有 CPU 的只有運行、創建和銷毀這 3 個狀態。一個線程的運行時間由此可以分為 3 部分:線程的啟動時間(Ts)、線程體的運行時間(Tr)以及線程的銷毀時間(Td)。在多線程處理的情境中,如果線程不能夠被重用,就意味著每次創建都需要經過啟動、銷毀和運行這 3 個過程。這必然會增加系統的相應時間,降低效率。而線程體的運行時間 Tr 不可控制,在這種情況下要提高線程運行的效率,線程池便是一個解決方案。

        線程池通過實現創建多個能夠執行任務的線程放入池中,所要執行的任務通常被安排在隊列中。通常情況下,需要處理的任務比線程的數目要多,線程執行完當前任務后,會從隊列中取下一個任務,直到所有的任務已經完成。

        由于線程預先被創建并放入線程池中,同時處理完當前任務之后并不銷毀而是被安排處理下一個任務,因此能夠避免多次創建線程,從而節省線程創建和銷毀的開銷,帶來更好的性能和系統穩定性。線程池技術適合處理突發性大量請求或者需要大量線程來完成任務、但任務實際處理時間較短的應用場景,它能有效避免由于系統中創建線程過多而導致的系統性能負載過大、響應過慢等問題。

        Python 中利用線程池有兩種解決方案:一是自己實現線程池模式,二是使用線程池模塊。

        先來看一個線程池模式的簡單實現:線程池代碼

        本段代碼偏長,編輯到知乎上始終無法保存,只好貼我的博客地址,各位可轉到博客去看。

        自行實現線程,需要定義一個 Worker 處理工作請求,定義 WorkerManager 來進行線程池的管理和創建,它包含一個工作請求隊列和執行結果隊列,具體的下載工作通過 download_file 方法實現。

        相比自己實現的線程池模型,使用現成的線程池模塊往往更簡單。Python 中線程池模塊的下載地址。該模塊提供了以下基本類和方法:

        • threadpool.ThreadPool:線程池類,主要的作用是用來分派任務請求和收集運行結果。主要有以下方法:

          • __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):建立線程池,并啟動對應 num_workers 的線程;q_size 表示任務請求隊列的大小,resq_size 表示存放運行結果隊列的大小。

          • createWorkers(self, num_workers, poll_timeout=5):將 num_workers 數量對應的線程加入線程池中。

          • dismissWorkers(self, num_workers, do_join=False):告訴 num_workers 數量的工作線程當執行完當前任務后退出

          • joinAllDismissedWorkers(self):在設置為退出的線程上執行 Thread.join

          • putRequest(self, request, block=True, timeout=None):將工作請求放入隊列中

          • poll(self, block=False):處理任務隊列中新的請求wait(self):阻塞用于等待所有執行結果。注意當所有執行結果返回后,線程池內部的線程并沒有銷毀,而是在等待新的任務。因此,wait() 之后仍然可以再次調用 pool.putRequests()往其中添加任務

        • threadpool.WorkRequest:包含有具體執行方法的工作請求類

        • threadpool.WorkerThread:處理任務的工作線程,主要有 run() 方法以及 dismiss() 方法。

        • makeRequests(callable_, args_list, callback=None, exec_callback=_handle_thread_exception):主要函數,作用是創建具有相同的執行函數但參數不同的一系列工作請求。

        再來看一個線程池實現的例子:

        import urllib2
        import os
        import time
        import threadpool
        
        def download_file(url):
            print("begin download {}".format(url ))
            urlhandler = urllib2.urlopen(url)
            fname = os.path.basename(url) + ".html"
            with open(fname, "wb") as f:
                while True:
                    chunk = urlhandler.read(1024)
                    if not chunk:
                        break
                    f.write(chunk)
        
        urls = ["http://wiki.python.org/moni/WebProgramming",
               "https://www.createspace.com/3611970",
               "http://wiki.python.org/moin/Documention"]
        pool_size = 2
        pool = threadpool.ThreadPool(pool_size)    # 創建線程池,大小為 2
        requests = threadpool.makrRequests(download_file, urls)    # 創建工作請求
        [pool.putRequest(req) for req in requests]
        
        print("putting request to pool")
        pool.putRequest(threadpool.WorkRequest(download_file, args=["http://chrisarndt.de/projects/threadpool/api/",]))    # 將具體的請求放入線程池
        pool.putRequest(threadpool.WorkRequest(download_file, args=["https://pypi.python.org/pypi/threadpool",]))
        pool.poll()    # 處理任務隊列中的新的請求
        pool.wait()
        print("destory all threads before exist")
        pool.dismissWorkers(pool_size, do_join=True)    # 完成后退出
        

        建議 90:使用 C/C++ 模塊擴展高性能

        Python 具有良好的可擴展性,利用 Python 提供的 API,如宏、類型、函數等,可以讓 Python 方便地進行 C/C++ 擴展,從而獲得較優的執行性能。所有這些 API 卻包含在 Python.h 的頭文件中,在編寫 C 代碼的時候引入該頭文件即可。

        來看一個簡單的例子:

        1、先用 C 實現相關函數,實現素數判斷,也可以直接使用 C 語言實現相關函數功能后再使用 Python 進行包裝。

        #include "Python.h"
        static PyObject * pr_isprime(PyObject, *self, PyObject * args) {
          int n, num;
          if (!PyArg_ParseTuple(args, "i", &num))    // 解析參數
            return NULL;
          if (num < 1) {
            return Py_BuildValue("i", 0);    // C 類型的數據結構轉換成 Python 對象
          }
          n = num - 1;
          while (n > 1) {
            if (num % n == 0) {
              return Py_BuildValue("i", 0);
              n--;
            }
          }
          return Py_BuildValue("i", 1);
        }
        
        static PyMethodDef PrMethods[] = {
          {"isPrime", pr_isprime, METH_VARARGS, "check if an input number is prime or not."},
          {NULL, NULL, 0, NULL}
        };
        
        void initpr(void) {
          (void) Py_InitModule("pr", PrMethods);
        }
        

        上面的代碼包含以下 3 部分:

        • 導出函數:C 模塊對外暴露的接口函數 pr_isprime,帶有 self 和 args 兩個參數,其中參數 args 中包含了 Python 解釋器要傳遞給 C 函數的所有參數,通常使用函數 PyArg_ParseTuple() 來獲得這些參數值

        • 初始化函數:以便 Python 解釋器能夠對模塊進行正確的初始化,初始化時要以 init 開頭,如 initp

        • 方法列表:提供給外部的 Python 程序使用的一個 C 模塊函數名稱映射表 PrMethods。它是一個 PyMethodDef 結構體,其中成員依次表示方法名、導出函數、參數傳遞方式和方法描述。看下面的例子:

         struct PyMethodDef {
            char * m1_name;        // 方法名
            PyCFunction m1_meth;    // 導出函數
            int m1_flags;            // 參數傳遞方法
            char * m1_doc;        // 方法描述
         }
        

        參數傳遞方法一般設置為 METH_VARARGS,如果想傳入關鍵字參數,則可以將其與 METH_KEYWORDS 進行或運算。若不想接受任何參數,則可以將其設置為 METH_NOARGS。該結構體必須與 {NULL, NULL, 0, NULL} 所表示的一條空記錄來結尾。

        2、編寫 setup.py 腳本。

        from distutils.core import setup, Extension
        module = Extension("pr", sources=["testextend.c"])
        setup(name="Pr test", version="1.0", ext_modules=[module])
        

        3、使用 python setup.py build 進行編譯,系統會在當前目錄下生成一個 build 子目錄,里面包含 pr.so 和 pr.o 文件。

        4、將生成的文件 py.so 復制到 Python 的 site_packages 目錄下,或者將 pr.so 所在目錄的路徑添加到 sys.path 中,就可以使用 C 擴展的模塊了。

        更多關于 C 模塊擴展的內容請讀者參考

        建議 91:使用 Cython 編寫擴展模塊

        Python-API 讓大家可以方便地使用 C/C++ 編寫擴展模塊,從而通過重寫應用中的瓶頸代碼獲得性能提升。但是,這種方式仍然有幾個問題:

        1. 掌握 C/C++ 編程語言、工具鏈有巨大的學習成本

        2. 即便是 C/C++ 熟手,重寫代碼也有非常多的工作,比如編寫特定數據結構、算法的 C/C++ 版本,費時費力還容易出錯

        所以整個 Python 社區都在努力實現一個 ”編譯器“,它可以把 Python 代碼直接編譯成等價的 C/C++ 代碼,從而獲得性能提升,如 Pyrex、Py2C 和 Cython 等。而從 Pyrex 發展而來的 Cython 是其中的集大成者。

        Cython 通過給 Python 代碼增加類型聲明和直接調用 C 函數,使得從 Python 代碼中轉換的 C 代碼能夠有非常高的執行效率。它的優勢在于它幾乎支持全部 Python 特性,也就是說,基本上所有的 Python 代碼都是有效的 Cython 代碼,這使得將 Cython 技術引入項目的成本降到最低。除此之外,Cython 支持使用 decorator 語法聲明類型,甚至支持專門的類型聲明文件,以使原有的 Python 代碼能夠繼續保持獨立,這些特性都使得它得到廣泛應用,比如 PyAMF、PyYAML 等庫都使用它編寫自己的高效率版本。

        # 安裝
        $ pip install -U cython
        # 生成 .c 文件
        $ cython arithmetic.py
        # 提交編譯器
        $ gcc -shared -pthread -fPIC -fwrapv -02 -Wall -fno-strict-aliasing -I /usr/include/python2.7 -o arithmetic.so arithmetic.c
        # 這時生成了 arithmetic.so 文件
        # 我們就可以像 import 普通模塊一樣使用它
        

        每一次都需要編譯、等待有點麻煩,所以 Cython 很體貼地提供了無需顯式編譯的方案:pyximport。只要將原有的 Python 代碼后綴名從 .py 改為 .pyx 即可。

        $ cp arithmetic.py arithmetic.pyx
        $ cd ~
        $ python
        >>> import pyximport; pyximport.install()
        >>> import arithmetic
        >>> arithmetic.__file__
        

        從 __file__ 屬性可以看出,這個 .pyx 文件已經被編譯鏈接為共享庫了,pyximport 的確方便啊!

        接下來我們看看 Cython 是如何提升性能的。

        在 GIS 中,經常需要計算地球表面上兩點之間的距離:

        import math
        def great_circle(lon1, lat1, lon2, lat2):
            radius = 3956    # miles
            x = math.pi / 180.0
            a = (90.0 - lat1) * (x)
            b = (90.0 - lat2) * (x)
            theta = (lon2 - lon1) * (x)
            c = math.acos(math.cos(a) * math.cos(b)) + (math.sin(a) * math.sin(b) * math.cos(theta))
            return radius * c
        

        接下來嘗試 Cython 進行改寫:

        import math
        def great_circle(float lon1, float lat1, float lon2, float lat2):
            cdef float radius = 3956.0
            cdef float pi = 3.14159265
            cdef float x = pi / 180.0
            cdef float a, b, theta, c
            a = (90.0 - lat1) * (x)
            b = (90.0 - lat2) * (x)
            theta = (lon2 - lon1) * (x)
            c = math.acos(math.cos(a) * math.cos(b)) + (math.sin(a) * math.sin(b) * math.cos(theta))
            return radius * c
        

        通過給 great_circle 函數的參數、中間變量增加類型聲明,Cython 代碼業務邏輯代碼一行沒改。使用 timeit 庫可以測定提速將近 2 成,說明類型聲明對性能提升非常有幫助。這時候,還有一個性能瓶頸,調用的 math 庫是一個 Python 庫,性能較差,可以直接調用 C 函數來解決:

        cdef extern from "math.h":
            float cosf(float theta)
            float sinf(float theta)
            float acosf(float theta)
        
        def greate_circle(float lon1, float lat1, float lon2, float lat2):
            cdef float radius = 3956.0
            cdef float pi = 3.14159265
            cdef float x = pi / 180.0
            cdef float a, b, theta, c
            a = (90.0 - lat1) * (x)
            b = (90.0 - lat2) * (x)
            theta = (lon2 - lon1) * (x)
            c = acosf((cosf(a) * cosf(b)) + (sinf(a) * sinf(b) * cosf(theta)))
            return radius * c
        

        Cython 使用 cdef extern from 語法,將 math.h 這個 C 語言庫頭文件里聲明的 cofs、sinf、acosf 等函數導入代碼中。因為減少了 Python 函數調用和調用時產生的類型轉換開銷,使用 timeit 測試這個版本的代碼性能提升了 5 倍有余。

        通過這個例子,可以掌握 Cython 的兩大技能:類型聲明和直接調用 C 函數。比起直接使用 C/C++ 編寫擴展模塊,使用 Cython 的方法方便得多。

        除了使用 Cython 編寫擴展模塊提升性能之外,Cython 也可以用來把之前編寫的 C/C++ 代碼封裝成 .so 模塊給 Python 調用(類似 boost.python/SWIG 的功能),Cython 社區已經開發了許多自動化工具。

        QQ群:WEB開發者官方群(515171538),驗證消息:10000
        微信群:加小編微信 849023636 邀請您加入,驗證消息:10000
        提示:更多精彩內容關注微信公眾號:全棧開發者中心(fsder-com)
        網友評論(共1條評論) 正在載入評論......
        理智評論文明上網,拒絕惡意謾罵 發表評論 / 共1條評論
        登錄會員中心
        江苏快3投注技巧