这篇文章主要讲解了“怎么用Mars Remote API执行Python函数”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么用Mars Remote API执行Python函数”吧!
Mars 是一个并行和分布式 Python 框架,能轻松把单机大家耳熟能详的的 numpy、pandas、scikit-learn 等库,以及 Python 函数利用多核或者多机加速。这其中,并行和分布式 Python 函数主要利用 Mars Remote API。
启动 Mars 分布式环境可以参考:
命令行方式在集群中部署。
Kubernetes 中部署。
MaxCompute 开箱即用的环境,购买了 MaxCompute 服务的可以直接使用。
使用 Mars Remote API 非常简单,只需要对原有的代码做少许改动,就可以分布式执行。
采用蒙特卡洛方法计算 π 为例。代码如下,我们编写了两个函数,calc_chunk 用来计算每个分片内落在圆内的点的个数,calc_pi 用来把多个分片 calc_chunk 计算的结果汇总最后得出 π 值。
from typing import List import numpy as np def calc_chunk(n: int, i: int): # 计算n个随机点(x和y轴落在-1到1之间)到原点距离小于1的点的个数 rs = np.random.RandomState(i) a = rs.uniform(-1, 1, size=(n, 2)) d = np.linalg.norm(a, axis=1) return (d < 1).sum() def calc_pi(fs: List[int], N: int): # 将若干次 calc_chunk 计算的结果汇总,计算 pi 的值 return sum(fs) * 4 / N N = 200_000_000 n = 10_000_000 fs = [calc_chunk(n, i) for i in range(N // n)] pi = calc_pi(fs, N) print(pi)
%%time 下可以看到结果:
3.1416312 CPU times: user 9.47 s, sys: 2.62 s, total: 12.1 s Wall time: 12.3 s
在单机需要 12.3 s。
要让这个计算使用 Mars Remote API 并行起来,我们不需要对函数做任何改动,需要变动的仅仅是最后部分。
import mars.remote as mr # 函数调用改成 mars.remote.spawn fs = [mr.spawn(calc_chunk, args=(n, i)) for i in range(N // n)] # 把 spawn 的列表传入作为参数,再 spawn 新的函数 pi = mr.spawn(calc_pi, args=(fs, N)) # 通过 execute() 触发执行,fetch() 获取结果 print(pi.execute().fetch())
%%time 下看到结果:
3.1416312 CPU times: user 29.6 ms, sys: 4.23 ms, total: 33.8 ms Wall time: 2.85 s
结果一模一样,但是却有数倍的性能提升。
可以看到,对已有的 Python 代码,Mars remote API 几乎不需要做多少改动,就能有效并行和分布式来加速执行过程。
为了让读者理解 Mars Remote API 的作用,我们从另一个例子开始。现在我们有一个数据集,我们希望对它们做一个分类任务。要做分类,我们有很多算法和库可以选择,这里我们用 RandomForest、LogisticRegression,以及 XGBoost。
困难的地方是,除了有多个模型选择,这些模型也会包含多个超参,那哪个超参效果最好呢?对于调参不那么有经验的同学,跑过了才知道。所以,我们希望能生成一堆可选的超参,然后把他们都跑一遍,看看效果。
准备数据
这个例子里我们使用 otto 数据集。
首先,我们准备数据。读取数据后,我们按 2:1 的比例把数据分成训练集和测试集。
import pandas as pd from sklearn.preprocessing import LabelEncoder from sklearn.model_selection import train_test_split def gen_data(): df = pd.read_csv('otto/train.csv') X = df.drop(['target', 'id'], axis=1) y = df['target'] label_encoder = LabelEncoder() label_encoder.fit(y) y = label_encoder.transform(y) return train_test_split(X, y, test_size=0.33, random_state=123) X_train, X_test, y_train, y_test = gen_data()
模型
接着,我们使用 scikit-learn 的 RandomForest 和 LogisticRegression 来处理分类。
RandomForest:
from sklearn.ensemble import RandomForestClassifier def random_forest(X_train: pd.DataFrame, y_train: pd.Series, verbose: bool = False, **kw): model = RandomForestClassifier(verbose=verbose, **kw) model.fit(X_train, y_train) return model
接着,我们生成供 RandomForest 使用的超参,我们用 yield 的方式来迭代返回。
def gen_random_forest_parameters(): for n_estimators in [50, 100, 600]: for max_depth in [None, 3, 15]: for criterion in ['gini', 'entropy']: yield { 'n_estimators': n_estimators, 'max_depth': max_depth, 'criterion': criterion }
LogisticRegression 也是这个过程。我们先定义模型。
from sklearn.linear_model import LogisticRegression def logistic_regression(X_train: pd.DataFrame, y_train: pd.Series, verbose: bool = False, **kw): model = LogisticRegression(verbose=verbose, **kw) model.fit(X_train, y_train) return model
接着生成供 LogisticRegression 使用的超参。
def gen_lr_parameters(): for penalty in ['l2', 'none']: for tol in [0.1, 0.01, 1e-4]: yield { 'penalty': penalty, 'tol': tol }
XGBoost 也是一样,我们用 XGBClassifier 来执行分类任务。
from xgboost import XGBClassifier def xgb(X_train: pd.DataFrame, y_train: pd.Series, verbose: bool = False, **kw): model = XGBClassifier(verbosity=int(verbose), **kw) model.fit(X_train, y_train) return model
生成一系列超参。
def gen_xgb_parameters(): for n_estimators in [100, 600]: for criterion in ['gini', 'entropy']: for learning_rate in [0.001, 0.1, 0.5]: yield { 'n_estimators': n_estimators, 'criterion': criterion, 'learning_rate': learning_rate }
验证
接着我们编写验证逻辑,这里我们使用 log_loss 来作为评价函数。
from sklearn.metrics import log_loss def metric_model(model, X_test: pd.DataFrame, y_test: pd.Series) -> float: if isinstance(model, bytes): model = pickle.loads(model) y_pred = model.predict_proba(X_test) return log_loss(y_test, y_pred) def train_and_metric(train_func, train_params: dict, X_train: pd.DataFrame, y_train: pd.Series, X_test: pd.DataFrame, y_test: pd.Series, verbose: bool = False ): # 把训练和验证封装到一起 model = train_func(X_train, y_train, verbose=verbose, **train_params) metric = metric_model(model, X_test, y_test) return model, metric
做好准备工作后,我们就开始来跑模型了。针对每个模型,我们把每次生成的超参们送进去训练,除了这些超参,我们还把 n_jobs 设成 -1,这样能更好利用单机的多核。
results = [] # ------------- # Random Forest # ------------- for params in gen_random_forest_parameters(): print(f'calculating on {params}') # fixed random_state params['random_state'] = 123 # use all CPU cores params['n_jobs'] = -1 model, metric = train_and_metric(random_forest, params, X_train, y_train, X_test, y_test) print(f'metric: {metric}') results.append({'model': model, 'metric': metric}) # ------------------- # Logistic Regression # ------------------- for params in gen_lr_parameters(): print(f'calculating on {params}') # fixed random_state params['random_state'] = 123 # use all CPU cores params['n_jobs'] = -1 model, metric = train_and_metric(logistic_regression, params, X_train, y_train, X_test, y_test) print(f'metric: {metric}') results.append({'model': model, 'metric': metric}) # ------- # XGBoost # ------- for params in gen_xgb_parameters(): print(f'calculating on {params}') # fixed random_state params['random_state'] = 123 # use all CPU cores params['n_jobs'] = -1 model, metric = train_and_metric(xgb, params, X_train, y_train, X_test, y_test) print(f'metric: {metric}') results.append({'model': model, 'metric': metric})
运行一下,需要相当长时间,我们省略掉一部分输出内容。
calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'gini'} metric: 0.6964123781828575 calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'entropy'} metric: 0.6912312790832288 # 省略其他模型的输出结果 CPU times: user 3h 41min 53s, sys: 2min 34s, total: 3h 44min 28s Wall time: 31min 44s
从 CPU 时间和 Wall 时间,能看出来这些训练还是充分利用了多核的性能。但整个过程还是花费了 31 分钟。
现在我们尝试使用 Remote API 通过分布式方式加速整个过程。
集群方面,我们使用最开始说的第三种方式,直接在 MaxCompute 上拉起一个集群。大家可以选择其他方式,效果是一样的。
n_cores = 8 mem = 2 * n_cores # 16G # o 是 MaxCompute 入口,这里创建 10 个 worker 的集群,每个 worker 8核16G cluster = o.create_mars_cluster(10, n_cores, mem, image='extended')
为了方便在分布式读取数据,我们对数据处理稍作改动,把数据上传到 MaxCompute 资源。对于其他环境,用户可以考虑 HDFS、Aliyun OSS 或者 Amazon S3 等存储。
if not o.exist_resource('otto_train.csv'): with open('otto/train.csv') as f: # 上传资源 o.create_resource('otto_train.csv', 'file', fileobj=f) def gen_data(): # 改成从资源读取 df = pd.read_csv(o.open_resource('otto_train.csv')) X = df.drop(['target', 'id'], axis=1) y = df['target'] label_encoder = LabelEncoder() label_encoder.fit(y) y = label_encoder.transform(y) return train_test_split(X, y, test_size=0.33, random_state=123)
稍作改动之后,我们使用 mars.remote.spawn 方法来让 gen_data 调度到集群上运行。
import mars.remote as mr # n_output 说明是 4 输出 # execute() 执行后,数据会读取到 Mars 集群内部 data = mr.ExecutableTuple(mr.spawn(gen_data, n_output=4)).execute() # remote_ 开头的都是 Mars 对象,这时候数据在集群内,这些对象只是引用 remote_X_train, remote_X_test, remote_y_train, remote_y_test = data
目前 Mars 能正确序列化 numpy ndarray、pandas DataFrame 等,还不能序列化模型,所以,我们要对 train_and_metric 稍作改动,把模型 pickle 了之后再返回。
def distributed_train_and_metric(train_func, train_params: dict, X_train: pd.DataFrame, y_train: pd.Series, X_test: pd.DataFrame, y_test: pd.Series, verbose: bool = False ): model, metric = train_and_metric(train_func, train_params, X_train, y_train, X_test, y_test, verbose=verbose) return pickle.dumps(model), metric
后续 Mars 支持了序列化模型后可以直接 spawn 原本的函数。
接着我们就对前面的执行过程稍作改动,把函数调用全部都用 mars.remote.spawn 来改写。
import numpy as np tasks = [] models = [] metrics = [] # ------------- # Random Forest # ------------- for params in gen_random_forest_parameters(): # fixed random_state params['random_state'] = 123 task = mr.spawn(distributed_train_and_metric, args=(random_forest, params, remote_X_train, remote_y_train, remote_X_test, remote_y_test), kwargs={'verbose': 2}, n_output=2 ) tasks.extend(task) # 把模型和评价分别存储 models.append(task[0]) metrics.append(task[1]) # ------------------- # Logistic Regression # ------------------- for params in gen_lr_parameters(): # fixed random_state params['random_state'] = 123 task = mr.spawn(distributed_train_and_metric, args=(logistic_regression, params, remote_X_train, remote_y_train, remote_X_test, remote_y_test), kwargs={'verbose': 2}, n_output=2 ) tasks.extend(task) # 把模型和评价分别存储 models.append(task[0]) metrics.append(task[1]) # ------- # XGBoost # ------- for params in gen_xgb_parameters(): # fixed random_state params['random_state'] = 123 # 再指定并发为核的个数 params['n_jobs'] = n_cores task = mr.spawn(distributed_train_and_metric, args=(xgb, params, remote_X_train, remote_y_train, remote_X_test, remote_y_test), kwargs={'verbose': 2}, n_output=2 ) tasks.extend(task) # 把模型和评价分别存储 models.append(task[0]) metrics.append(task[1]) # 把顺序打乱,目的是能分散到 worker 上平均一点 shuffled_tasks = np.random.permutation(tasks) _ = mr.ExecutableTuple(shuffled_tasks).execute()
可以看到代码几乎一致。
运行查看结果:
CPU times: user 69.1 ms, sys: 10.9 ms, total: 80 ms Wall time: 1min 59s
时间一下子从 31 分钟多来到了 2 分钟,提升 15x+。但代码修改的代价可以忽略不计。
细心的读者可能注意到了,分布式运行的代码中,我们把模型的 verbose 给打开了,在分布式环境下,因为这些函数远程执行,打印的内容只会输出到 worker 的标准输出流,我们在客户端不会看到打印的结果,但 Mars 提供了一个非常有用的接口来让我们查看每个模型运行时的输出。
以第0个模型为例,我们可以在 Mars 对象上直接调用 fetch_log 方法。
print(models[0].fetch_log())
输出我们简略一部分。
building tree 1 of 50 building tree 2 of 50 building tree 3 of 50 building tree 4 of 50 building tree 5 of 50 building tree 6 of 50 # 中间省略 building tree 49 of 50 building tree 50 of 50
要看哪个模型都可以通过这种方式。试想下,如果没有 fetch_log API,你确想看中间过程的输出有多麻烦。首先这个函数在哪个 worker 上执行,不得而知;然后,即便知道是哪个 worker,因为每个 worker 上可能有多个函数执行,这些输出就可能混杂在一起,甚至被庞大日志淹没了。fetch_log 接口让用户不需要关心在哪个 worker 上执行,也不用担心日志混合在一起。
感谢各位的阅读,以上就是“怎么用Mars Remote API执行Python函数”的内容了,经过本文的学习后,相信大家对怎么用Mars Remote API执行Python函数这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。