一、使用jit让python的速度快100倍
NumPy的创始人Travis,创建了CONTINUUM,致力于将Python大数据处理方面的应用。
推出的Numba项目能够将处理NumPy数组的Python函数JIT编译为==机器码执行==,从而上百倍的提高程序的运算速度。
 1import time
 2from numba import jit
 3
 4@jit
 5def foo(x,y):
 6    tt = time.time()
 7    s = 0
 8    for i in range(x,y):
 9        s += i
10    print('Time used: {} sec'.format(time.time()-tt))
11    return s
12
13> print(foo(1,100000000))
numba中提供了一些修饰器,它们可以将其修饰的函数JIT编译成机器码函数,并返回一个可在Python中调用机器码的包装对象。为了能将Python函数编译成能高速执行的机器码,我们需要告诉JIT编译器函数的各个参数和返回值的类型。我们可以通过多种方式指定类型信息,在上面的例子中,类型信息由一个字符串’f8(f8[:])’指定。其中’f8’表示8个字节双精度浮点数,括号前面的’f8’表示返回值类型,括号里的表示参数类型,’[:]’表示一维数组。
 1import time
 2import numba as nb
 3from numba import jit
 4
 5@jit('f8(f8[:])')
 6def sum1d(array):
 7    s = 0.0
 8    n = array.shape[0]
 9    for i in range(n):
10        s += array[i]
11    return s
12
13import numpy as np
14tt = time.time()
15array = np.random.random(100000000)
16sum1d(array)
17np.sum(array)
18sum(array)
19print('Time used: {} sec'.format(time.time()-tt))
JIT能针对所有类型的参数进行运算,可以使用autojit:
 1from numba import autojit
 2@autojit
 3def sum1d2(array):
 4s = 0.0
 5n = array.shape[0]
 6for i in range(n):
 7s += array[i]
 8return s
 9
10%timeit sum1d2(array)
11print sum1d2(np.ones(10, dtype=np.int32))
12print sum1d2(np.ones(10, dtype=np.float32))
13print sum1d2(np.ones(10, dtype=np.float64)
二、NumPy、Numba和Python异步编程的高性能大数据分析与对比
(1)NumPy是用于科学计算的基础Python包。它提供了强大的N维数组对象和复杂的(广播)功能。”导入NumPy库之后,Python程序的性能更好、执行速度更快、更容易保证一致性并能方便地使用大量的数学运算和矩阵功能。也许正因为如此,我们不再需要使用Python List对象了?重要的是,许多Python数据生态系统库都基于NumPy之上,像Pandas、SciPy、Matplotlib等等。
(2)asyncio应用场景应该是高IO负载下减少多线程切换的代价,由于PYTHON的特点,不管是用多线程,还是asyncio,都无法提高纯计算类任务的执行效率。            
(3)Numba提供了由Python直接编写的高性能函数来加速应用程序的能力。通过几个注释,面向数组和数学计算较多的Python代码就可以被实时编译为原生机器指令。而且Numba拥有类似于C、C++和FORTRAN的性能,无需切换语言或Python解释器。
asyncio
asyncio是Python3.4版本引入的标准库,直接内置了对异步IO的支持。
asyncio的编程模型就是一个消息循环。我们从asyncio模块中直接获取一个EventLoop的引用,然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO。
 1import asyncio
 2
 3@asyncio.coroutine
 4def hello():
 5    print("Hello world!")
 6    # 异步调用asyncio.sleep(1):
 7    r = yield from asyncio.sleep(1)
 8    print("Hello again!")
 9
10# 获取EventLoop:
11loop = asyncio.get_event_loop()
12# 执行coroutine
13loop.run_until_complete(hello())
14loop.close()
@asyncio.coroutine把一个generator标记为coroutine类型,然后,我们就把这个coroutine扔到EventLoop中执行。
hello()会首先打印出Hello world!,然后,yield from语法可以让我们方便地调用另一个generator。由于asyncio.sleep()也是一个coroutine,所以线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环。当asyncio.sleep()返回时,线程就可以从yield from拿到返回值(此处是None),然后接着执行下一行语句。
把asyncio.sleep(1)看成是一个耗时1秒的IO操作,在此期间,主线程并未等待,而是去执行EventLoop中其他可以执行的coroutine了,因此可以实现并发执行。
 1import threading
 2import asyncio
 3
 4@asyncio.coroutine
 5def hello():
 6    print('Hello world! (%s)' % threading.currentThread())
 7    yield from asyncio.sleep(1)
 8    print('Hello again! (%s)' % threading.currentThread())
 9
10loop = asyncio.get_event_loop()
11tasks = [hello(), hello()]
12loop.run_until_complete(asyncio.wait(tasks))
13loop.close()
然后在coroutine内部用yield from调用另一个coroutine实现异步操作。也可以使用async和await来异步操作
1async def hello():
2    print("Hello world!")
3    r = await asyncio.sleep(1)
4    print("Hello again!")
分布式进程
在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。
Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。
举个例子:如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现?
原有的Queue可以继续使用,但是,通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。
我们先看服务进程,服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务:
 1# task_master.py
 2
 3import random, time, queue
 4from multiprocessing.managers import BaseManager
 5
 6# 发送任务的队列:
 7task_queue = queue.Queue()
 8# 接收结果的队列:
 9result_queue = queue.Queue()
10
11# 从BaseManager继承的QueueManager:
12class QueueManager(BaseManager):
13    pass
14
15# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
16QueueManager.register('get_task_queue', callable=lambda: task_queue)
17QueueManager.register('get_result_queue', callable=lambda: result_queue)
18# 绑定端口5000, 设置验证码'abc':
19manager = QueueManager(address=('', 5000), authkey=b'abc')
20# 启动Queue:
21manager.start()
22# 获得通过网络访问的Queue对象:
23task = manager.get_task_queue()
24result = manager.get_result_queue()
25# 放几个任务进去:
26for i in range(10):
27    n = random.randint(0, 10000)
28    print('Put task %d...' % n)
29    task.put(n)
30# 从result队列读取结果:
31print('Try get results...')
32for i in range(10):
33    r = result.get(timeout=10)
34    print('Result: %s' % r)
35# 关闭:
36manager.shutdown()
37print('master exit.')
Master/Worker模型有什么用?其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,比如把计算n*n的代码换成发送邮件,就实现了邮件队列的异步发送。
 1# task_worker.py
 2
 3import time, sys, queue
 4from multiprocessing.managers import BaseManager
 5
 6# 创建类似的QueueManager:
 7class QueueManager(BaseManager):
 8    pass
 9
10# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
11QueueManager.register('get_task_queue')
12QueueManager.register('get_result_queue')
13
14# 连接到服务器,也就是运行task_master.py的机器:
15server_addr = '127.0.0.1'
16print('Connect to server %s...' % server_addr)
17# 端口和验证码注意保持与task_master.py设置的完全一致:
18m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
19# 从网络连接:
20m.connect()
21# 获取Queue的对象:
22task = m.get_task_queue()
23result = m.get_result_queue()
24# 从task队列取任务,并把结果写入result队列:
25for i in range(10):
26    try:
27        n = task.get(timeout=1)
28        print('run task %d * %d...' % (n, n))
29        r = '%d * %d = %d' % (n, n, n*n)
30        time.sleep(1)
31        result.put(r)
32    except Queue.Empty:
33        print('task queue is empty.')
34# 处理结束:
35print('worker exit.')
Python的分布式进程接口简单,封装良好,适合需要把繁重任务分布到多台机器的环境下。
注意Queue的作用是用来传递任务和接收结果,每个任务的描述数据量要尽量小。比如发送一个处理日志文件的任务,就不要发送几百兆的日志文件本身,而是发送日志文件存放的完整路径,由Worker进程再去共享的磁盘上读取文件。
