
Введение
Dask представляет собой коллекцию инструментов, которые позволяют использовать возможности параллельных вычислений, что особенно ценно при работе с объемными данными или при создании эффективных конвейеров на основе модулей scikit-learn с помощью параллельных процессов. В этой статье рассматривается, как применять Dask для масштабируемой обработки данных, даже если аппаратные ресурсы ограничены.
Пошаговое руководство
Хотя этот набор данных не является огромным, калифорнийский датасет по жилью имеет достаточный объем, чтобы послужить подходящим примером для демонстрации совместного использования Dask и scikit-learn в обработке данных в масштабе.
Модуль dataframe в Dask имитирует многие функции объектов DataFrame из Pandas, что помогает справляться с большими наборами данных, которые могут не помещаться полностью в оперативную память. Мы загрузим данные из CSV-файла в репозитории GitHub с помощью структуры Dask DataFrame следующим образом:
import dask.dataframe as dd
url = "https://raw.githubusercontent.com/gakudo-ai/open-datasets/refs/heads/main/housing.csv"
df = dd.read_csv(url)
df.head()
Важный момент: чтобы узнать размеры датасета — количество строк и столбцов — нельзя просто применить df.shape. Вместо этого нужно использовать подход вроде такого:
num_rows = df.shape[0].compute()
num_cols = df.shape[1]
print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_cols}")Результат:
Number of rows: 20640
Number of columns: 10Здесь мы применили compute() из Dask для отложенного вычисления количества строк, но не для столбцов. Метаданные датасета позволяют сразу получить число столбцов (признаков), в то время как определение числа строк в потенциально распределенном датасете, который может превышать объем памяти и быть разделенным на части, требует распределенных вычислений. Метод compute() берет это на себя автоматически.
Предобработка данных обычно предшествует созданию модели машинного обучения или оценщика. Прежде чем перейти к этой части, поскольку основное внимание в статье уделяется демонстрации применения Dask для обработки данных, сначала подготовим и очистим набор.
Один из типичных этапов подготовки — работа с пропущенными значениями. В Dask это делается так же просто, как в Pandas. Например, следующий код удаляет строки с пропусками в любых атрибутах:
df = df.dropna()
num_rows = df.shape[0].compute()
num_cols = df.shape[1]
print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_cols}")Теперь датасет уменьшился более чем на 200 записей и содержит в общей сложности 20433 строки.
Далее можно нормализовать числовые признаки, используя StandardScaler из scikit-learn или другой подходящий метод масштабирования:
from sklearn.preprocessing import StandardScaler
numeric_df = df.select_dtypes(include=["number"])
X_pd = numeric_df.drop("median_house_value", axis=1).compute()
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_pd)Обратите внимание: для цепочки операций над датасетом в Dask, таких как удаление строк с пропусками и последующее исключение целевой колонки "median_house_value", необходимо добавить compute() в конце последовательности. Это связано с тем, что преобразования датасета в Dask выполняются отложенно. После вызова compute() результат цепочки материализуется в виде DataFrame из Pandas (Dask опирается на Pandas, поэтому импорт Pandas не требуется, если вы не используете исключительно его функции).
Если требуется обучить модель машинного обучения, то следует выделить целевую переменную "median_house_value" и применить тот же принцип для преобразования в объект Pandas:
y = df["median_house_value"]
y_pd = y.compute()Дальнейшие шаги — разделение датасета на обучающую и тестовую выборки, обучение регрессионной модели вроде RandomForestRegressor и оценка ошибки на тестовых данных — полностью аналогичны стандартному подходу с Pandas и scikit-learn. Поскольку модели на основе деревьев не чувствительны к масштабированию признаков, можно использовать как ненормализованные (X_pd), так и нормализованные (X_scaled) признаки. Ниже приведен пример с нормализованными:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
import numpy as np
# Use the scaled feature matrix produced earlier
X_train, X_test, y_train, y_test = train_test_split(X_scaled, y_pd, test_size=0.2, random_state=42)
model = RandomForestRegressor(n_estimators=100, random_state=42, n_jobs=-1)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
print(f"RMSE: {rmse:.2f}")Результат:
RMSE: 49673.99Заключение
Комбинация Dask и scikit-learn позволяет реализовывать масштабируемые параллельные рабочие процессы для обработки данных, например, для эффективной предобработки больших наборов перед построением моделей машинного обучения. В статье показано, как загружать, очищать, подготавливать и преобразовывать данные с помощью Dask, а затем применять стандартные инструменты scikit-learn для моделирования — все это с оптимизацией использования памяти и ускорением конвейера при работе с обширными датасетами.