一、多线程化选择
并行化一个代码有两大选择:multithread 和 multiprocess。
Multithread,多线程,同一个进程(process)可以开启多个线程执行计算。每个线程代表了一个 CPU 核心,这么多线程可以访问同样的内存地址(所谓共享内存),实现了线程之间的通讯,算是最简单的并行模型。
Multiprocess,多进程,则相当于同时开启多个 Python 解释器,每个解释器有自己独有的数据,自然不会有数据冲突。
二、并行化思想
并行化的基本思路是把 dataframe 用 np.array_split 方法切割成多个子 dataframe。再调用 Pool.map 函数并行地执行。注意到顺序执行的 pandas.DataFrame.apply 是如何转化成 Pool.map 然后并行执行的。
Pool 对象是一组并行的进程,开源Pool类
开源Pool类定义
def Pool(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None):
'''Returns a process pool object'''
from .pool import Pool
return Pool(processes, initializer, initargs, maxtasksperchild,
context=self.get_context())
设置进程初始化函数
def init_process(global_vars): global a a = global_vars
设置进程初始化函数
Pool(processes=8,initializer=init_process,initargs=(a,))
其中,指定产生 8 个进程,每个进程的初始化需运行 init_process函数,其参数为一个 singleton tuple a. 利用 init_process 和 initargs,我们可以方便的设定需要在进程间共享的全局变量(这里是 a)。
with 关键词是 context manager,避免写很繁琐的处理开关进程的逻辑。
with Pool(processes=8,initializer=init_process,initargs=(a,)) as pool:
result_parts = pool.map(apply_f,df_parts)
三、多线程化应用
多线程时间比较和多线程的几种apply应用
import numpy as np
import pandas as pd
import time
from multiprocessing import Pool
def f(row):
#直接对某列进行操作
return sum(row)+a
def f1_1(row):
#对某一列进行操作,我这里的columns=range(0,2),此处是对第0列进行操作
return row[0]**2
def f1_2(row1):
#对某一列进行操作,我这里的columns=range(0,2),此处是对第0列进行操作
return row1**2
def f2_1(row):
#对某两列进行操作,我这里的columns=range(0,2),此处是对第0,2列进行操作
return pd.Series([row[0]**2,row[1]**2],index=['1_1','1_2'])
def f2_2(row1,row2):
#对某两列进行操作,我这里的columns=range(0,2),此处是对第0,2列进行操作
return pd.Series([row1**2,row2**2],index=['2_1','2_2'])
def apply_f(df):
return df.apply(f,axis=1)
def apply_f1_1(df):
return df.apply(f1_1,axis=1)
def apply_f1_2(df):
return df[0].apply(f1_2)
def apply_f2_1(df):
return df.apply(f2_1,axis=1)
def apply_f2_2(df):
return df.apply(lambda row :f2_2(row[0],row[1]),axis=1)
def init_process(global_vars):
global a
a = global_vars
def time_compare():
'''直接调用和多线程调用时间对比'''
a = 2
np.random.seed(0)
df = pd.DataFrame(np.random.rand(10**5,2),columns=range(0,2))
print(df.columns)
t1= time.time()
result_serial = df.apply(f,axis=1)
t2 = time.time()
print("Serial time =",t2-t1)
print(result_serial.head())
df_parts=np.array_split(df,20)
print(len(df_parts),type(df_parts[0]))
with Pool(processes=8,initializer=init_process,initargs=(a,)) as pool:
#with Pool(processes=8) as pool:
result_parts = pool.map(apply_f,df_parts)
result_parallel= pd.concat(result_parts)
t3 = time.time()
print("Parallel time =",t3-t2)
print(result_parallel.head())
def apply_fun():
'''多种apply函数的调用'''
a = 2
np.random.seed(0)
df = pd.DataFrame(np.random.rand(10**5,2),columns=range(0,2))
print(df.columns)
df_parts=np.array_split(df,20)
print(len(df_parts),type(df_parts[0]))
with Pool(processes=8,initializer=init_process,initargs=(a,)) as pool:
#with Pool(processes=8) as pool:
res_part0 = pool.map(apply_f,df_parts)
res_part1 = pool.map(apply_f1_1,df_parts)
res_part2 = pool.map(apply_f1_2,df_parts)
res_part3 = pool.map(apply_f2_1,df_parts)
res_part4 = pool.map(apply_f2_2,df_parts)
res_parallel0 = pd.concat(res_part0)
res_parallel1 = pd.concat(res_part1)
res_parallel2 = pd.concat(res_part2)
res_parallel3 = pd.concat(res_part3)
res_parallel4 = pd.concat(res_part4)
print("f:\n",res_parallel0.head())
print("f1:\n",res_parallel1.head())
print("f2:\n",res_parallel2.head())
print("f3:\n",res_parallel3.head())
print("f4:\n",res_parallel4.head())
df=pd.concat([df,res_parallel0],axis=1)
df=pd.concat([df,res_parallel1],axis=1)
df=pd.concat([df,res_parallel2],axis=1)
df=pd.concat([df,res_parallel3],axis=1)
df=pd.concat([df,res_parallel4],axis=1)
print(df.head())
if __name__ == '__main__':
time_compare()
apply_fun()
参考网址
https://blog.fangzhou.me/posts/20170702-python-parallelism/
https://docs.python.org/3.7/library/multiprocessing.html
免责声明:本站文章均来自网站采集或用户投稿,网站不提供任何软件下载或自行开发的软件! 如有用户或公司发现本站内容信息存在侵权行为,请邮件告知! 858582#qq.com
《魔兽世界》大逃杀!60人新游玩模式《强袭风暴》3月21日上线
暴雪近日发布了《魔兽世界》10.2.6 更新内容,新游玩模式《强袭风暴》即将于3月21 日在亚服上线,届时玩家将前往阿拉希高地展开一场 60 人大逃杀对战。
艾泽拉斯的冒险者已经征服了艾泽拉斯的大地及遥远的彼岸。他们在对抗世界上最致命的敌人时展现出过人的手腕,并且成功阻止终结宇宙等级的威胁。当他们在为即将于《魔兽世界》资料片《地心之战》中来袭的萨拉塔斯势力做战斗准备时,他们还需要在熟悉的阿拉希高地面对一个全新的敌人──那就是彼此。在《巨龙崛起》10.2.6 更新的《强袭风暴》中,玩家将会进入一个全新的海盗主题大逃杀式限时活动,其中包含极高的风险和史诗级的奖励。
《强袭风暴》不是普通的战场,作为一个独立于主游戏之外的活动,玩家可以用大逃杀的风格来体验《魔兽世界》,不分职业、不分装备(除了你在赛局中捡到的),光是技巧和战略的强弱之分就能决定出谁才是能坚持到最后的赢家。本次活动将会开放单人和双人模式,玩家在加入海盗主题的预赛大厅区域前,可以从强袭风暴角色画面新增好友。游玩游戏将可以累计名望轨迹,《巨龙崛起》和《魔兽世界:巫妖王之怒 经典版》的玩家都可以获得奖励。