Author: wedo实验君 | 2021-09-09 15:45:17

Source: Python中文社区 (ID: python-china)
Distributed computing frameworks are familiar territory: Hadoop (MapReduce) and Spark for batch processing, Storm and Flink for stream processing. These frameworks generally depend on a stack of other big-data components, so installing and deploying them is relatively involved.
In Python, Celery (covered in an earlier post) already provides distributed computing. Today I want to introduce another open-source distributed computing framework: Ray. Ray is a high-performance distributed execution framework from UC Berkeley's RISELab. It delivers better computational performance than Spark, is simpler to deploy and to adapt existing code to, and supports distributed training for machine learning and deep learning with the mainstream frameworks (PyTorch, TensorFlow, Keras, etc.).
Ray's architecture is described in the original paper, Ray: A Distributed Framework for Emerging AI Applications.
Ray works for any distributed computing task, including distributed training. I have recently been using it to train a large number of time-series forecasting models and to serve their predictions online.
Ray's own libraries currently cover hyperparameter tuning (Ray Tune), distributed gradient descent (Ray SGD), inference serving (Ray Serve), distributed data (Dataset), and distributed reinforcement learning (RLlib), and a number of third-party libraries build on top of Ray as well.
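As a quick taste of the ecosystem, here is a minimal Ray Tune sketch; the toy trainable function, the lr grid, and the score metric are made up for illustration, and the Ray 1.x tune.run / tune.report API is assumed:

from ray import tune

# A toy "trainable": Tune calls it once per hyperparameter combination.
def trainable(config):
    score = -(config["lr"] - 0.1) ** 2  # pretend higher is better
    tune.report(score=score)            # report the metric back to Tune

analysis = tune.run(
    trainable,
    config={"lr": tune.grid_search([0.01, 0.05, 0.1, 0.5])},
)
print(analysis.get_best_config(metric="score", mode="max"))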
3.1 Installation and Deployment
pip install --upgrade pip
# pip install ray
pip install ray==1.6.0
# If you hit: ImportError: cannot import name 'deep_mapping' from 'attr.validators'
# pip install attr==19.1.0
 
3.2 Single-Machine Usage
import time
import ray

ray.init(num_cpus=4)  # Specify that this machine has 4 CPUs.

@ray.remote
def do_some_work(x):
    time.sleep(1)  # Replace this with the work you need to do.
    return x

start = time.time()
results = ray.get([do_some_work.remote(x) for x in range(4)])
print("duration =", time.time() - start)
print("results = ", results)
# duration = 1.0107324123382568
# results = [0, 1, 2, 3]
 
.remote() does not return the actual result but an object reference, e.g. ObjectRef(7f10737098927148ffffffff0100000001000000); the value is retrieved with ray.get. Note that ray.get is a blocking call, so do not write [ray.get(do_some_work.remote(x)) for x in range(4)]: each ray.get waits for its task to finish before the next task is even submitted, which serializes the work.
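A minimal sketch of the difference, reusing do_some_work from above (timings are approximate):

# Anti-pattern: ray.get inside the comprehension blocks on each task in turn,
# so the four 1-second tasks take about 4 seconds in total.
results = [ray.get(do_some_work.remote(x)) for x in range(4)]

# Better: submit all tasks first, then block once on the whole batch;
# the four tasks run in parallel and finish in about 1 second.
results = ray.get([do_some_work.remote(x) for x in range(4)])

Also keep in mind that every remote call carries scheduling overhead, so extremely short tasks can end up slower than running them locally, as the next snippet with 100,000 tiny tasks shows: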
@ray.remote
def tiny_work(x):
    time.sleep(0.0001)  # Replace this with the work you need to do.
    return x

start = time.time()
result_ids = [tiny_work.remote(x) for x in range(100000)]
results = ray.get(result_ids)
print("duration =", time.time() - start)
 
# ray.put stores an object in Ray's shared object store and returns an ObjectRef;
# ray.get fetches the value back.
num = ray.put(10)
ray.get(num)  # 10
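ray.put is most useful for large objects that several tasks need: store the object once and pass the reference around instead of shipping a fresh copy with every call. A minimal sketch, with a made-up column_sum task and an arbitrary array size:

import numpy as np

@ray.remote
def column_sum(matrix):
    return matrix.sum(axis=0)

big = np.random.rand(1000, 1000)
big_ref = ray.put(big)  # stored once in the object store

# All four tasks read the same stored copy instead of each receiving
# its own freshly serialized copy of `big`.
results = ray.get([column_sum.remote(big_ref) for _ in range(4)])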
 
When some tasks finish much earlier than others, use ray.wait(): it returns two lists, the references of tasks that have completed and the references of tasks still pending, so you can start processing finished results while the remaining tasks keep running.
import random

@ray.remote
def do_some_work(x):
    time.sleep(random.uniform(0, 4))  # Replace this with the work you need to do.
    return x

def process_incremental(sum, result):
    time.sleep(1)  # Replace this with some processing code.
    return sum + result

start = time.time()
result_ids = [do_some_work.remote(x) for x in range(4)]
sum = 0
while len(result_ids):
    done_id, result_ids = ray.wait(result_ids)
    sum = process_incremental(sum, ray.get(done_id[0]))
print("duration =", time.time() - start, "\nresult = ", sum)
# duration = 5.270821809768677
# result =  6
 
3.3 Cluster Deployment
Ray follows a head/worker (master-slave) architecture: the head node acts as the master and the other nodes are workers. To deploy a cluster, first start the head node with ray start --head, then start each worker and point it at the head node's address, e.g. ray start --address 10.8.xx.3:6379.
To shut the cluster down, run ray stop on every machine.
# To start a head node:
# ray start --head --num-cpus=<NUM_CPUS> --num-gpus=<NUM_GPUS>
ray start --head --node-ip-address 10.8.xx.3 --port=6379

# To start a non-head (worker) node:
# ray start --address=<head-node-address> --num-cpus=<NUM_CPUS> --num-gpus=<NUM_GPUS>
ray start --address 10.8.xx.3:6379 --node-ip-address 10.8.xx.3 --num-cpus 10 --temp-dir={your temp path}
 
import ray
ray.init(address='10.8.xx.3:6379')  # Connect this driver process to the running cluster.
 
import numpy as np

# Define two remote functions. Invocations of these functions create tasks
# that are executed remotely.
@ray.remote
def multiply(x, y):
    return np.dot(x, y)

@ray.remote
def zeros(size):
    return np.zeros(size)

# Start two tasks in parallel. These immediately return futures and the
# tasks are executed in the background.
x_id = zeros.remote((100, 100))
y_id = zeros.remote((100, 100))

# Start a third task. This will not be scheduled until the first two
# tasks have completed.
z_id = multiply.remote(x_id, y_id)

# Get the result. This will block until the third task completes.
z = ray.get(z_id)
print(z)
 
# A class decorated with @ray.remote is an actor: a stateful remote worker.
@ray.remote
class Counter(object):
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1

    def read(self):
        return self.n

counters = [Counter.remote() for i in range(4)]

# Each increment call adds 1 to that actor's state, so running this block
# repeatedly keeps increasing every counter.
[c.increment.remote() for c in counters]
futures = [c.read.remote() for c in counters]
print(ray.get(futures))
# After one round:  [1, 1, 1, 1]
# After 11 rounds:  [11, 11, 11, 11]
 
Tasks can also be composed into map-reduce style patterns. Here is a simple example showing both a flat reduce and a tree reduce:
@ray.remote
def map(obj, f):
    return f(obj)

@ray.remote
def sum_results(*elements):
    return np.sum(elements)

items = list(range(100))
map_func = lambda i: i * 2
remote_elements = [map.remote(i, map_func) for i in items]

# simple reduce
remote_final_sum = sum_results.remote(*remote_elements)
result = ray.get(remote_final_sum)

# tree reduce
intermediate_results = [sum_results.remote(*remote_elements[i * 20: (i + 1) * 20])
                        for i in range(5)]
remote_final_sum = sum_results.remote(*intermediate_results)
result = ray.get(remote_final_sum)
 
For using Ray with PyTorch for distributed training, see https://docs.ray.io/en/latest/using-ray-with-pytorch.html.
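Even without Ray's dedicated training libraries, plain remote tasks are enough to train many small models in parallel, which is the pattern behind the time-series use case mentioned earlier. Below is a minimal sketch with a toy linear model and synthetic data; train_one is a made-up helper, not a Ray or PyTorch API:

import numpy as np
import ray
import torch
import torch.nn as nn

ray.init(num_cpus=4)

@ray.remote
def train_one(series_id, x, y, epochs=100):
    # Train one independent tiny model per series; each call runs in its own worker.
    model = nn.Linear(1, 1)
    opt = torch.optim.SGD(model.parameters(), lr=0.01)
    loss_fn = nn.MSELoss()
    x_t = torch.tensor(x, dtype=torch.float32).reshape(-1, 1)
    y_t = torch.tensor(y, dtype=torch.float32).reshape(-1, 1)
    for _ in range(epochs):
        opt.zero_grad()
        loss = loss_fn(model(x_t), y_t)
        loss.backward()
        opt.step()
    return series_id, loss.item()

data = {i: (np.arange(20), np.arange(20) * 2.0) for i in range(8)}
futures = [train_one.remote(i, x, y) for i, (x, y) in data.items()]
print(ray.get(futures))  # one (series_id, final_loss) pair per model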
This post introduced Ray, an efficient distributed computing framework for Python. To summarize:
- Ray installs with a single pip command and a cluster is just ray start --head on the head node plus ray start --address on the workers.
- Ordinary functions become distributed tasks via @ray.remote and .remote(); ray.get fetches results (blocking), ray.wait consumes results as they finish, and ray.put shares objects through the object store.
- @ray.remote classes are actors that keep state across calls, and tasks can depend on each other's ObjectRefs, which is enough to express map-reduce style computations.
- Ray also supports distributed machine learning and deep learning training through libraries such as Ray Tune, Ray SGD, and RLlib.
I hope you find it useful.