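How can you parallelize pandas apply? This article walks through the question with analysis and worked examples, in the hope of helping anyone who runs into it find a simpler, more practical approach.
For a simple use case with a pandas DataFrame df and a function func to apply, just replace the classic apply with parallel_apply.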
from pandarallel import pandarallel
# Initialization
pandarallel.initialize()
# Standard pandas apply
df.apply(func)
# Parallel apply
df.parallel_apply(func)
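Note that the classic apply method is still available if you do not want to parallelize a computation.
You can also pass progress_bar=True to the initialize function to display one progress bar per worker CPU.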
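The two notes above can be combined into one small helper. This is only a sketch under stated assumptions: square is a hypothetical stand-in for func, the parallel flag and the helper name are made up for illustration, and the quiet fallback to plain apply when pandarallel is not installed is a design choice of this example, not something pandarallel does itself.

```python
import pandas as pd


def square(x):
    # Hypothetical stand-in for the article's func.
    return x * x


def apply_maybe_parallel(series, func, parallel=True):
    """Apply func to each element, in parallel when requested and possible."""
    if parallel:
        try:
            from pandarallel import pandarallel
        except ImportError:
            # Assumption of this sketch: fall back to plain apply if pandarallel is missing.
            parallel = False
    if not parallel:
        # The classic apply is still available and needs no initialization.
        return series.apply(func)
    # progress_bar=True asks pandarallel to show one progress bar per worker.
    pandarallel.initialize(progress_bar=True)
    return series.parallel_apply(func)
```

Calling apply_maybe_parallel(df['col'], square, parallel=False) gives the serial result, which is handy for checking that the parallel path returns the same values.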
https://pypi.python.org/pypi/joblib
# Embarrassingly parallel helper: to make it easy to write readable parallel code and debug it quickly
import time
from math import sqrt
from joblib import Parallel, delayed

def test():
    start = time.time()
    # Run all tasks serially in the current process
    result1 = Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10000))
    end = time.time()
    print(end - start)
    # Run the same tasks on 8 workers
    result2 = Parallel(n_jobs=8)(delayed(sqrt)(i**2) for i in range(10000))
    end2 = time.time()
    print(end2 - end)
------- Output -------
0.4434356689453125
0.6346755027770996
import multiprocessing as mp
with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(f, df['col'])
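When the per-element function is cheap, sending elements to workers one at a time can cost more than the work itself, so it may help to hand them over in batches. The sketch below shows the idea with the standard library's concurrent.futures executors instead of multiprocessing.Pool (a deliberate swap, so that the executor can be passed in). The names double and map_column and the default chunksize of 256 are assumptions made for this example.

```python
import pandas as pd


def double(x):
    # Hypothetical stand-in for the article's f. It lives at module level
    # so that worker processes can pickle a reference to it.
    return x * 2


def map_column(df, col, func, executor, chunksize=256):
    """Map func over df[col] on the given executor, keeping df's index."""
    # With a ProcessPoolExecutor, chunksize batches the elements sent to each
    # worker; a ThreadPoolExecutor accepts the argument but ignores it.
    values = list(executor.map(func, df[col], chunksize=chunksize))
    return pd.Series(values, index=df.index)
```

A typical call would be df['newcol'] = map_column(df, 'col', double, executor) inside a with ProcessPoolExecutor() as executor: block, placed under an if __name__ == '__main__': guard. multiprocessing.Pool.map takes a chunksize argument as well, so the same batching can be tried on the original snippet.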
multiprocessing.cpu_count()
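Returns the number of CPUs in the system.
This number is not the same as the number of CPUs the current process can use. The number of usable CPUs can be obtained with len(os.sched_getaffinity(0)), which is only available on some Unix platforms.
May raise NotImplementedError.
See also os.cpu_count().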
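The difference between the system CPU count and the usable CPU count matters when sizing a pool, for example inside a container that is pinned to a few cores. A minimal sketch, assuming a helper name (usable_cpu_count) that does not appear in the article:

```python
import os


def usable_cpu_count():
    """Return how many CPUs this process may run on (at least 1)."""
    if hasattr(os, "sched_getaffinity"):
        # Only on some Unix platforms: the set of CPUs this process is allowed to use.
        return len(os.sched_getaffinity(0))
    # Elsewhere, fall back to the system-wide count; os.cpu_count() may return None.
    return os.cpu_count() or 1
```

The result could be passed as the pool size in place of mp.cpu_count(), for example mp.Pool(usable_cpu_count()).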
(1) Code
import time
import pandas as pd
import multiprocessing as mp
from joblib import Parallel, delayed
from pandarallel import pandarallel
from tqdm import tqdm

def get_url_len(url):
    # Count the dot-separated parts of the URL
    url_list = url.split(".")
    time.sleep(0.01)  # sleep for 0.01 s to simulate expensive logic
    return len(url_list)
def test1(data):
    """
    Baseline: plain apply, no optimization
    """
    start = time.time()
    data['len'] = data['url'].apply(get_url_len)
    end = time.time()
    cost_time = end - start
    res = sum(data['len'])
    print("res:{}, cost time:{}".format(res, cost_time))
def test_mp(data):
    """
    Parallelize with a multiprocessing pool
    """
    start = time.time()
    with mp.Pool(mp.cpu_count()) as pool:
        data['len'] = pool.map(get_url_len, data['url'])
    end = time.time()
    cost_time = end - start
    res = sum(data['len'])
    print("test_mp \t res:{}, cost time:{}".format(res, cost_time))
def test_pandarallel(data):
    """
    Parallelize with pandarallel
    """
    start = time.time()
    pandarallel.initialize()
    data['len'] = data['url'].parallel_apply(get_url_len)
    end = time.time()
    cost_time = end - start
    res = sum(data['len'])
    print("test_pandarallel \t res:{}, cost time:{}".format(res, cost_time))
def test_delayed(data):
    """
    Parallelize with joblib's Parallel and delayed
    """
    def key_func(subset):
        subset["len"] = subset["url"].apply(get_url_len)
        return subset
    start = time.time()
    data_grouped = data.groupby(data.index)
    # data_grouped is iterable, so tqdm can wrap it to show a progress bar
    results = Parallel(n_jobs=8)(delayed(key_func)(group) for name, group in tqdm(data_grouped))
    data = pd.concat(results)
    end = time.time()
    cost_time = end - start
    res = sum(data['len'])
    print("test_delayed \t res:{}, cost time:{}".format(res, cost_time))
if __name__ == '__main__':
    columns = ['title', 'url', 'pub_old', 'pub_new']
    temp = pd.read_csv("./input.csv", names=columns, nrows=10000)
    data = temp
    """
    # Repeat the data 100 times for the larger runs
    # (DataFrame.append was removed in pandas 2.0, so use pd.concat)
    for i in range(99):
        data = pd.concat([data, temp])
    """
    print(len(data))
    """
    test1(data)
    test_mp(data)
    test_pandarallel(data)
    """
    test_delayed(data)
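One detail of test_delayed is worth noting: it groups by data.index, so with a default, unique index every row becomes its own task. A common alternative is to cut the frame into a few contiguous blocks and send one block per task. The following is a sketch of that idea, not the article's method: split_frame, add_len and apply_in_chunks are hypothetical names, the sleep is left out of get_url_len, and the executor is passed in so that the same code works with a ProcessPoolExecutor or a ThreadPoolExecutor from concurrent.futures.

```python
import pandas as pd


def get_url_len(url):
    # Same logic as the article's function, without the simulated delay.
    return len(url.split("."))


def add_len(chunk):
    # Work on a copy so the caller's frame is not modified in place.
    chunk = chunk.copy()
    chunk["len"] = chunk["url"].apply(get_url_len)
    return chunk


def split_frame(df, n_chunks):
    """Split df into at most n_chunks contiguous row blocks of near-equal size."""
    n_chunks = max(1, min(n_chunks, len(df)))
    size, extra = divmod(len(df), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        # The first `extra` blocks get one additional row each.
        stop = start + size + (1 if i < extra else 0)
        chunks.append(df.iloc[start:stop])
        start = stop
    return chunks


def apply_in_chunks(df, executor, n_chunks):
    """Run add_len on each block via the executor and stitch the results together."""
    results = list(executor.map(add_len, split_frame(df, n_chunks)))
    return pd.concat(results)
```

With 8 workers this would create 8 tasks instead of one per row, for example apply_in_chunks(data, executor, 8) inside a with ProcessPoolExecutor(max_workers=8) as executor: block. Whether that beats the timings reported below depends on the machine and the data, so it should be measured rather than assumed.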
(2) Output
1k rows
res:4338, cost time:0.0018074512481689453
test_mp res:4338, cost time:0.2626469135284424
test_pandarallel res:4338, cost time:0.3467681407928467
10k rows
res:42936, cost time:0.008773326873779297
test_mp res:42936, cost time:0.26111721992492676
test_pandarallel res:42936, cost time:0.33237743377685547
100k rows
res:426742, cost time:0.07944369316101074
test_mp res:426742, cost time:0.294996976852417
test_pandarallel res:426742, cost time:0.39208269119262695
1M rows
res:4267420, cost time:0.8074917793273926
test_mp res:4267420, cost time:0.9741342067718506
test_pandarallel res:4267420, cost time:0.6779992580413818
10M rows
res:42674200, cost time:8.027287006378174
test_mp res:42674200, cost time:7.751036882400513
test_pandarallel res:42674200, cost time:4.404983282089233
After adding a sleep call to get_url_len (to simulate complex logic), the results for 1k rows are as follows:
1k rows
res:4338, cost time:10.054503679275513
test_mp res:4338, cost time:0.35697126388549805
test_pandarallel res:4338, cost time:0.43415403366088867
test_delayed res:4338, cost time:2.294757843017578
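These numbers can be sanity-checked with a little arithmetic. 1,000 rows at 0.01 s each is about 10 s of sleeping, which matches the 10.05 s of the serial run. test_delayed uses n_jobs=8, so its best case is 125 rows per worker, or about 1.25 s; the measured 2.29 s leaves roughly 1 s of overhead. The sketch below spells that out. The function names are made up for this example, and the model assumes rows are spread evenly and ignores all startup and transfer cost. The worker counts of test_mp and test_pandarallel depend on the machine's CPU count, which the article does not state, so the same estimate cannot be made for them.

```python
def ideal_parallel_seconds(n_rows, seconds_per_row, n_workers):
    """Lower bound on wall time: rows spread evenly, zero overhead."""
    rows_per_worker = -(-n_rows // n_workers)  # ceiling division
    return rows_per_worker * seconds_per_row


def overhead_seconds(observed_seconds, n_rows, seconds_per_row, n_workers):
    """Time not explained by the ideal model (startup, pickling, scheduling)."""
    return observed_seconds - ideal_parallel_seconds(n_rows, seconds_per_row, n_workers)


# Serial baseline: about 10 s for 1,000 rows at 0.01 s each.
serial_estimate = ideal_parallel_seconds(1000, 0.01, 1)
# test_delayed with 8 workers: about 1.25 s in the best case.
delayed_estimate = ideal_parallel_seconds(1000, 0.01, 8)
# Overhead implied by the measured 2.29 s: roughly 1 s.
delayed_overhead = overhead_seconds(2.294757843017578, 1000, 0.01, 8)
```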
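(1) When the data set is small, parallel processing is slower than a single serial run: in the results above, plain apply is fastest up to 100k rows, pandarallel only pulls ahead at 1M rows, and the multiprocessing pool at 10M rows.
(2) When the function passed to apply is simple, parallel processing is slower than a single serial run, because the fixed cost of starting workers and moving data outweighs the work itself. Once each call is expensive (the 0.01 s sleep), even 1k rows run far faster in parallel (about 0.36 s versus 10.05 s).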
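Both conclusions suggest a simple rule of thumb: time the function on a few rows, extrapolate to the whole column, and go parallel only if the estimate clearly exceeds the fixed cost of starting workers. The sketch below is one way to write that down. The function names and the 0.5 s default threshold are assumptions for illustration; the threshold is loosely based on the 0.26 s to 0.43 s that the parallel runs above cost even on small inputs, and it should be tuned per machine.

```python
import time


def estimate_serial_seconds(values, func, sample_size=20):
    """Estimate the serial run time by timing func on the first few values."""
    sample = values[:sample_size]
    if len(sample) == 0:
        return 0.0
    start = time.perf_counter()
    for value in sample:
        func(value)
    per_item = (time.perf_counter() - start) / len(sample)
    return per_item * len(values)


def should_parallelize(values, func, overhead_seconds=0.5, sample_size=20):
    """Return True when the estimated serial time exceeds the assumed fixed overhead."""
    return estimate_serial_seconds(values, func, sample_size) > overhead_seconds
```

For a pandas column, values could be data['url'].tolist(). Note that the sample is actually executed, so func should be free of side effects, and that a small sample can be misleading when the cost varies a lot from row to row.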