亚洲激情专区-91九色丨porny丨老师-久久久久久久女国产乱让韩-国产精品午夜小视频观看

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么使用Apache Spark實現分布式隨機森林

發布時間:2021-12-27 14:09:11 來源:億速云 閱讀:203 作者:iii 欄目:大數據

本篇內容主要講解“怎么使用Apache Spark實現分布式隨機森林”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“怎么使用Apache Spark實現分布式隨機森林”吧!

實驗概述

我們使用公共可用的紐約出租車數據集,并訓練一個隨機森林回歸器,該回歸器可以使用與乘客接送相關的屬性來預測出租車的票價金額。以2017年、2018年和2019年的出租車出行量為訓練集,共計300700143個實例。

硬件

Spark集群使用Amazon EMR進行管理,而Dask/RAPIDS集群則使用Saturn Cloud進行管理。

兩個集群都有20個工作節點,具有以下AWS實例類型:

Spark:r5.2xlarge

  • 8個CPU,64 GB RAM

  • 按需價格:0.504美元/小時

RAPIDS:g4dn.xlarge

  • 4個CPU,16 GB RAM

  • 1個GPU,16 GB GPU RAM(NVIDIA T4)

  • 按需價格:0.526美元/小時

Saturn Cloud也可以用NVIDIA特斯拉V100 GPU來啟動Dask集群,但我們在這個練習中選擇了g4dn.xlarge,保持與Spark集群相似的小時成本概況。

Spark

Apache Spark是一個在Scala中構建的開源大數據處理引擎,它有一個Python接口,可以調用Scala/JVM代碼。

它是Hadoop處理生態系統中的一個重要組成部分,圍繞MapReduce范例構建,并且具有用于數據幀和機器學習的接口。

設置Spark集群不在本文的討論范圍之內,但是一旦準備好集群,就可以在Jupyter Notebook中運行以下命令來初始化Spark:

import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = (SparkSession
        .builder
        .config('spark.executor.memory', '36g')
        .getOrCreate())

findspark包檢測系統上的Spark安裝位置;如果可以知道Spark包的安裝位置,則可能不需要這樣做。

要獲得有性能的Spark代碼,需要設置幾個配置設置,這取決于集群設置和工作流。在這種情況下,我們設置spark.executor.memory以確保我們不會遇到任何內存溢出或Java堆錯誤。

RAPIDS

NVIDIA RAPIDS是一個開源的Python框架,它在gpu而不是cpu上執行數據科學代碼。類似于在訓練深度學習模型時所看到的,這將為數據科學工作帶來巨大的性能提升。

RAPIDS有數據幀、ML、圖形分析等接口。RAPIDS使用Dask來處理與具有多個gpu的機器的并行化,以及每個具有一個或多個gpu的機器集群。

設置GPU機器可能有點棘手,但是Saturn Cloud已經為啟動GPU集群預構建了映像,所以你只需幾分鐘就可以啟動并運行了!要初始化指向群集的Dask客戶端,可以運行以下命令:

from dask.distributed import Client
from dask_saturn import SaturnCluster

cluster = SaturnCluster()
client = Client(cluster)

數據加載

數據文件托管在一個公共的S3 bucket上,因此我們可以直接從那里讀取csv。S3 bucket的所有文件都在同一個目錄中,所以我們使用s3fs來選擇我們想要的文件:

import s3fs
fs = s3fs.S3FileSystem(anon=True)
files = [f"s3://{x}" for x in fs.ls('s3://nyc-tlc/trip data/')
         if 'yellow' in x and ('2019' in x or '2018' in x or '2017' in x)]
         
cols = ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance',
      'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount',
      'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount']

使用Spark,我們需要單獨讀取每個CSV文件,然后將它們組合在一起:

import functools
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import DataFrame

# 手動指定模式,因為read.csv中的inferSchema非常慢
schema = StructType([
    StructField('VendorID', DoubleType()),
    StructField('tpep_pickup_datetime', TimestampType()),
    ...
    # 參考notebook獲得完整對象模式
]) 

def read_csv(path):
    df = spark.read.csv(path,
                        header=True,
                        schema=schema,
                        timestampFormat='yyyy-MM-dd HH:mm:ss',
                       )
    df = df.select(cols)
    return df

dfs = []
for tf in files:
    df = read_csv(tf)
    dfs.append(df)

taxi = functools.reduce(DataFrame.unionAll, dfs)
taxi.count()

使用Dask+RAPIDS,我們可以一次性讀取所有CSV文件:

import dask_cudf

taxi = dask_cudf.read_csv(files, 
                          assume_missing=True,
                          parse_dates=[1,2], 
                          usecols=cols, 
                          storage_options={'anon': True})
len(taxi)

特征工程

我們將根據時間生成一些特征,然后保存數據幀。在這兩個框架中,這將執行所有CSV加載和預處理,并將結果存儲在RAM中(在RAPIDS的情況下是GPU RAM)。我們將用于訓練的特征包括:

features = ['pickup_weekday', 'pickup_hour', 'pickup_minute',
            'pickup_week_hour', 'passenger_count', 'VendorID', 
            'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 
            'DOLocationID']

對于Spark,我們需要將特征收集到向量類中:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.pipeline import Pipeline

taxi = taxi.withColumn('pickup_weekday', F.dayofweek(taxi.tpep_pickup_datetime).cast(DoubleType()))

taxi = taxi.withColumn('pickup_hour', F.hour(taxi.tpep_pickup_datetime).cast(DoubleType()))

taxi = taxi.withColumn('pickup_minute', F.minute(taxi.tpep_pickup_datetime).cast(DoubleType()))

taxi = taxi.withColumn('pickup_week_hour', ((taxi.pickup_weekday * 24) + taxi.pickup_hour).cast(DoubleType()))

taxi = taxi.withColumn('store_and_fwd_flag', F.when(taxi.store_and_fwd_flag == 'Y', 1).otherwise(0))

taxi = taxi.withColumn('label', taxi.total_amount)  
taxi = taxi.fillna(-1)

assembler = VectorAssembler(
    inputCols=features,
    outputCol='features',
)

pipeline = Pipeline(stages=[assembler])
assembler_fitted = pipeline.fit(taxi)
X = assembler_fitted.transform(taxi)
X.cache()
X.count()

對于RAPIDS,我們將所有浮點值轉換為float32,以便進行GPU計算:

from dask import persist
from dask.distributed import wait

taxi['pickup_weekday'] = taxi.tpep_pickup_datetime.dt.weekday
taxi['pickup_hour'] = taxi.tpep_pickup_datetime.dt.hour
taxi['pickup_minute'] = taxi.tpep_pickup_datetime.dt.minute
taxi['pickup_week_hour'] = (taxi.pickup_weekday * 24) + taxi.pickup_hour
taxi['store_and_fwd_flag'] = (taxi.store_and_fwd_flag == 'Y').astype(float)
taxi = taxi.fillna(-1)

X = taxi[features].astype('float32')
y = taxi['total_amount']
X, y = persist(X, y)
_ = wait([X, y])
len(X)

訓練隨機森林

我們只需要幾行代碼就可以訓練隨機森林。

Spark:

from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(numTrees=100, maxDepth=10, seed=42)
fitted = rf.fit(X)

RAPIDS:

from cuml.dask.ensemble import RandomForestRegressor
rf = RandomForestRegressor(n_estimators=100, max_depth=10, seed=42)
_ = rf.fit(X, y)

結果

我們對Spark(CPU)和RAPIDS(GPU)集群上的300700143個紐約出租車數據實例訓練了一個隨機森林模型。兩個集群都有20個工作節點,每小時價格大致相同。以下是工作流每個部分的結果:

TaskSparkRAPIDS
Load/rowcount20.6 seconds25.5 seconds
Feature engineering54.3 seconds23.1 seconds
Random forest36.9 minutes1.02 seconds

怎么使用Apache Spark實現分布式隨機森林

37分鐘的Spark 與1秒的RAPIDS

GPU勝利!想一想,一次擬合你不需要等待37分鐘了,這將加快之后迭代和改進模型的速度。而在CPU上,一旦添加了超參數調優或測試不同的模型,迭代都很容易累積到數小時或數天。

到此,相信大家對“怎么使用Apache Spark實現分布式隨機森林”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

华坪县| 开远市| 临桂县| 城市| 黎平县| 阳西县| 丽江市| 镇安县| 大关县| 漯河市| 阿克陶县| 安化县| 定兴县| 济宁市| 阿拉善右旗| 桂林市| 茂名市| 额敏县| 唐海县| 太谷县| 通辽市| 桃园市| 庄浪县| 潼南县| 田林县| 庆安县| 襄垣县| 丁青县| 余干县| 高邑县| 蒲江县| 长岭县| 邢台县| 公安县| 宁蒗| 仁化县| 武川县| 新泰市| 股票| 鹤山市| 惠来县|