python3效率优化笔记
多进程:multiprocessing库
创建一个进程
import multiprocessing
import time
def worker(interval):
n=5
while n>0:
print('当前时间:{}'.format(time.ctime()))
time.sleep(interval)
n-=1
if __name__=="__main__":
#Process([group [, target [, name [, args [, kwargs]]]]])参数说明:
#group:线程组,目前还没有实现,库引用中提示必须是None;
#target:表示调用对象。
#args:表示调用对象的位置参数元组。
#name:别名
#kwargs:表示调用对象的字典。
p=multiprocessing.Process(target=worker,args=(3,)
#进程开始
p.start()
#p.pid进程的ID号
print('p.pid:{}'.format(p.pid))
#p.name进程的默认名
print('p.name:{}'.format(p.name))
#is_alive()判断进程的状态
print('p.is_alive:{}'.format(p.is_alive()))
#将进程定义为类
import multiprocessing
import time
class ClockProces(multiprocessing.Process):
def __init__(self,interval):
super(ClockProces,self).__init__()
self.interval=interval
def run(self):
n=6
while n>1:
print('当前时间为:{}'.format(time.ctime()))
time.sleep(self.interval)
n=-1
if __name__=='__main__':
p=ClockProces(3)
p.start()
daemon属性
import multiprocessing
import time
def worker(interval):
print("work start:{0}".format(time.ctime()));
time.sleep(interval)
print("work end:{0}".format(time.ctime()));
if __name__ == "__main__":
p = multiprocessing.Process(target = worker, args = (3,))
#默认为False,打开后该进程变为守护进程,当主进程运行完后,守护进程同时关闭
p.daemon = True
p.start()
#如果此时执行join()方法,主线程则会等待守护线程结束后再关闭程序
#p.join()
print "end!"
进程锁
import multiprocessing
import sys
def worker_with(lock,f):
with lock:
fs=open(f,'a+')
n=5
while n>1:
fs.write('Lockd acquired via with\n')
m-=1
fs.close()
def worker_nowith(lock,f):
lock.acquire('Lock acquired directly\n')
try:
fs=open(f,'a+')
n=5
while n>1:
fs.write()
fs.close()
finally:
lock.release()
if __name__='__main__':
lock=multiprocessing.lock()
f=file.text
w=multiprocessing.Process(target=worker_with,args=(lock,f))
nw=multiprocessing.Process(target=worker_nowith,args=(lock,f))
w.start()
nw.start()
print('end')
信号量semaphore限制同时执行的进程数
import multiprocessing
import time
def worker(s,i):
s.acquire()
print(multiprocessing.current_process().name+"acquire")
time.sleep(i)
print(multiprocessing.current_process().name+"release\n")
s.release()
if __name__=='__main__':
s=multiprocessing.Semaphore(2)
for i in range(5):
p=multiprocessing.Process(target=worker,args=(s,i*2))
p.start()
多个进程
import multiprocessing
import time
def worker_1(interval):
print('worker_1')
time.sleep(interval)
print('end worker_1')
def worker_2(interval):
print('worker_2')
time.sleep(interval)
print('end worker_2')
def worker_3(interval):
print('worker_3')
time.sleep(interval)
print('end worker_3')
if __name__=='__main__':
p1=multiprocessing.Process(target=worker_1,args=(2,))
p2=multiprocessing.Process(target=worker_1,args=(3,))
p3=multiprocessing.Process(target=worker_1,args=(4,))
p1.start()
p2.start()
p3.start()
#cpu_count()用来获得当前的CPU的核数,可以用来设置接下来子进程的个数。
print("The number of CPU is:" + str(multiprocessing.cpu_count()))
#active_children()用来获得当前所有的子进程,包括daemon和非daemon子进程。
for p in multiprocessing.active_children():
print("child p.name:" + p.name + "\tp.id" + str(p.pid))
print("END!!!!!!!!!!!!!!!!!")
锁Lock,针对资源访问做限制
import multiprocessing
def worker_with(lock,f):
with lock:
fs=open(f,'a+')
n=10
while n>1:
fs.write('Lockd acquired via with\n')
n=-1
fs.close()
def worker_no_with(lock,f):
#获取锁,防止其他进程对资源同时进行操作进行
lock.acquire()
try:
fs=open(f,'a+')
n=10
while n>1:
fs.write('Lock acquired directly\n')
n=-1
fs.close()
finally:
#释放锁后,其他进程正常访问资源
lock.release()
if __name__=='__main__':
lock=multiprocess.Lock()
f='file.text'
w=multiprocess.Process(target=worker_with,args=(lock,f))
nw=multiprocess.Process(target=worker_no_with,args=(lock,f))
w.start()
nw.start()
print('end')
信号量-Semaphore(针对同时运行的进程数做限制)
#Semaphore管理一个内置的计数器,
#每当调用acquire()时内置计数器-1;
#调用release() 时内置计数器+1;
#计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
import multiprocessing
import time
def work(s,i)
s.acquire()
print(multiprocessing.current_process().name + "acquire")
time.sleep(i)
print(multiprocessing.current_process().name + "release\n")
s.release()
if __name__='__main__':
s=multipocessing.Semaphore(2)
for i in range(5):
p=multiprocessing.Process(target = worker, args=(s, i*2))
p.start()
进程之间的通信标志:信号标志Event(针对进程的运行顺序做限制)
#Event内部包含了一个标志位,初始的时候为false。
#可以使用使用set()来将其设置为true;
#或者使用clear()将其从新设置为false;
#可以使用is_set()来检查标志位的状态;
#函数就是wait(timeout=None),用来阻塞当前线程,直到event的内部标志位被设置为true或者timeout超
#时。超时之后wait(t)依旧会返回False值,可以用来记录当前被阻塞线程的状态
import multiprocessing
import time
def work_for_event(e):
print('wait_for_event: starting')
e.wait()
print('wait_for_event:e.is_set()->'+str(e.is_set()))
def wait_for_event_timeout(e, t):
print("wait_for_event_timeout:starting")
e.wait(t)
print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))
if __name__=='__main__':
e=multiprocessing.Event()
w1 = multiprocessing.Process(name = "block",
target = wait_for_event,
args = (e,))
w2 = multiprocessing.Process(name = "non-block",
target = wait_for_event_timeout,
args = (e, 2))
w1.start()
w2.start()
time.sleep(3)
e.set()
print("main: event is set")
数据结构
队列Queue
import multiprocessing
def writer_pro(q):
try:
#put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为
#True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如
#果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
q.put(1,block=False)
except:
pass
def reader_pro(q):
try:
#get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为
#True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。
#如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出
#Queue.Empty异常。
#get后不需要task_gone()
print(q.get(block=False))
except:
pass
#put block参数是一种保护机制,默认为true,当队列满的时候,进程就好堵塞并自动等待空出新的位置否
#则的话为false,则不管队列状态,如果此时队列已满,则抛出Queue.Full
#get block参数,默认为true,当队列为空的时候,进程就会被置于阻塞状态,自动等待队列中有新的元素
#可取,否则为ifalse,则不管队列状态,如果此时队列为空,则抛出Queue.Empty
if __name__=='__main__':
q=multiprocessing.Queue()
writer=multiprocessing.Process(target=writer_pro,args=(q,))
reader=multiprocessing.Process(target=reader_pro,args=(q,))
writer.start()
reader.start()
#调用join()方法后,主程序会被阻塞,直到调用该方法的函数完成运行,join方法有参数timeout,如果
#时间超过timeout,则该函数会被主动终止,并返回None
writer.join()
reader.join()
通道 pipe
import multiprocessing
import time
def pro1(pipe):
while True:
for i in range(10000):
print('pro1 send:{}'.format(i))
pipe.send(i)
time.sleep(1)
def pro2(pipe):
while True:
print('pro2 rev:',pipe.recv())
time.sleep(1)
def pro3(pipe):
while True:
print('pro3 rev:',pipe.recv())
time.sleep(1)
if __name__=='__main__':
#Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为
#True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,
#conn1只负责接受消息,conn2只负责发送消息。
pipe=multiprocessing.Pipe()
p1=multiprocessing.Process(target=pro1,args=(pipe[0],))
p2 = multiprocessing.Process(target=pro2, args=(pipe[1],))
p3 = multiprocessing.Process(target=pro3, args=(pipe[1],))
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
进程池Pool
import multiprocessing
import time
def func(msg):
print('msg',msg)
time.sleep(3)
print('end')
if __name__=='__main__':
pool=multiprocessing.Pool(3)
for i in range(4):
msg='hello {}'.format(i)
#apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[,
#kwds]])是阻塞的
pool.apply_async(func,(msg,))
print("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~")
#close() 关闭pool,使其不在接受新的任务。
#terminate() 结束工作进程,不在处理未完成的任务。
pool.close()
#join()使主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。
#调用join之前,一定要先调用close() 函数,否则会出错, close()执行后不会有新的进程加入到
#pool,join函数等待素有子进程结束
print ('All subprocesses done.')
pool.join()
print ("Sub-process(es) done.")