目录
包的概览
加载和转换数据
在前文中,我们学习了如何为建模准备数据。在本文中,我们将实际使用这些知识,使用 PySpark 的 MLlib 包构建一个分类模型。
MLlib 代表机器学习库。尽管 MLlib 现在处于维护模式,即它不再积极开发(并且很可能会在未来被弃用),但至少覆盖库的一些特性是有必要的。此外,MLlib 目前是唯一支持流式训练模型的库。
在这一部分中,你将学习如何执行以下操作:
- 使用 MLlib 为建模准备数据
- 执行统计测试
- 使用逻辑回归预测婴儿的生存机会
- 选择最可预测的特征并训练一个随机森林模型
包的概览
在高层次上,MLlib 提供了三个核心的机器学习功能:
- 数据准备:特征提取、转换、选择、分类特征的哈希以及一些自然语言处理方法
- 机器学习算法:实现了一些流行和先进的回归、分类和聚类算法
- 实用工具:描述性统计、卡方测试、线性代数(稀疏和密集矩阵和向量)以及模型评估方法等统计方法
正如你看到的,可用功能的范围允许你执行几乎所有基本的数据科学任务。
我们将构建两个分类模型:线性回归和随机森林。我们将使用我们从 http://www.cdc.gov/nchs/data_access/vitalstatsonline.htm 下载的 2014 年和 2015 年美国出生数据的一部分;在总共 300 个变量中,我们选择了 85 个特征来构建我们的模型。此外,在总共近 799 万条记录中,我们选择了一个平衡的样本,共有 45,429 条记录:22,080 条报告婴儿死亡的记录和 23,349 条婴儿存活的记录。
加载和转换数据
尽管 MLlib 设计时以 RDD 和 DStreams 为重点,为了便于转换数据,我们将读取数据并将其转换为 DataFrame。
我们首先指定数据集的模式。
这是代码:
import pyspark.sql.types as typ
labels = [('INFANT_ALIVE_AT_REPORT', typ.StringType()),('BIRTH_YEAR', typ.IntegerType()),('BIRTH_MONTH', typ.IntegerType()),('BIRTH_PLACE', typ.StringType()),('MOTHER_AGE_YEARS', typ.IntegerType()),('MOTHER_RACE_6CODE', typ.StringType()),('MOTHER_EDUCATION', typ.StringType()),('FATHER_COMBINED_AGE', typ.IntegerType()),('FATHER_EDUCATION', typ.StringType()),('MONTH_PRECARE_RECODE', typ.StringType()),...('INFANT_BREASTFED', typ.StringType())
]
schema = typ.StructType([typ.StructField(e[0], e[1], False) for e in labels])
接下来,我们加载数据。.read.csv(...) 方法可以读取未压缩或(像我们的情况)GZipped 逗号分隔值。将 header 参数设置为 True 表示第一行包含标题,我们使用 schema 指定正确的数据类型:
births = spark.read.csv('births_train.csv.gz', header=True, schema=schema)
我们的数据集中有许多以字符串形式表示的特征。这些大多是我们需要以某种方式转换为数字形式的分类变量。
我们将首先指定我们的重新编码字典:
recode_dictionary = {'YNU': {'Y': 1,'N': 0,'U': 0}
}
我们这一章的目标是预测 'INFANT_ALIVE_AT_REPORT' 是否为 1 或 0。因此,我们将丢弃所有与婴儿相关的特征,并将仅基于与其母亲、父亲和出生地相关的特征来尝试预测婴儿的生存机会:
selected_features = ['INFANT_ALIVE_AT_REPORT', 'BIRTH_PLACE', 'MOTHER_AGE_YEARS', 'FATHER_COMBINED_AGE', 'CIG_BEFORE', 'CIG_1_TRI', 'CIG_2_TRI', 'CIG_3_TRI', 'MOTHER_HEIGHT_IN', 'MOTHER_PRE_WEIGHT', 'MOTHER_DELIVERY_WEIGHT', 'MOTHER_WEIGHT_GAIN', 'DIABETES_PRE', 'DIABETES_GEST', 'HYP_TENS_PRE', 'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM'
]
births_trimmed = births.select(selected_features)
在我们的数据集中,有许多特征具有是/否/未知的值;我们只会将“是”编码为 1;其他所有值将被设置为 0。
母亲的吸烟数量编码也有一个小问题:0 表示母亲在怀孕前或怀孕期间没有吸烟,1-97 表示实际吸烟的香烟数量,98 表示 98 或更多,而 99 标识未知;我们将假设未知为 0 并相应地重新编码。
接下来,我们将指定我们的重新编码方法:
import pyspark.sql.functions as func
def recode(col, key):return recode_dictionary[key][col]
def correct_cig(feat):return func \.when(func.col(feat) != 99, func.col(feat))\.otherwise(0)
rec_integer = func.udf(recode, typ.IntegerType())
重新编码方法查找 recode_dictionary 中的正确键(给定键)并返回更正后的值。correct_cig 方法检查特征 feat 的值是否不等于 99,并(在那种情况下)返回特征的值;如果值等于 99,我们得到 0,否则。
我们不能直接在 DataFrame 上使用重新编码函数;它需要被转换为 Spark 能理解的 UDF。rec_integer 就是这样一个函数:通过传递我们指定的 recode 函数并指定返回值数据类型,然后我们就可以使用它来编码我们的是/否/未知特征。
那么,让我们开始吧。首先,我们将更正与吸烟数量相关的特征:
births_transformed = births_trimmed \.withColumn('CIG_BEFORE', correct_cig('CIG_BEFORE'))\.withColumn('CIG_1_TRI', correct_cig('CIG_1_TRI'))\.withColumn('CIG_2_TRI', correct_cig('CIG_2_TRI'))\.withColumn('CIG_3_TRI', correct_cig('CIG_3_TRI'))
.withColumn(...) 方法将列名作为其第一个参数,转换作为第二个参数。在前面的案例中,我们没有创建新列,而是重用了相同的列。
现在,我们将专注于更正是/否/未知特征。首先,我们将找出这些特征,如下所示:
cols = [(col.name, col.dataType) for col in births_trimmed.schema]
YNU_cols = []
for i, s in enumerate(cols):if s[1] == typ.StringType():dis = births.select(s[0]) \.distinct() \.rdd \.map(lambda row: row[0]) \.collect() if 'Y' in dis:YNU_cols.append(s[0])
首先,我们创建了一个包含列名和相应数据类型的元组列表(cols)。接下来,我们遍历所有这些并计算所有字符串列的不同值;如果返回的列表中有 'Y',我们将列名添加到 YNU_cols 列表中。
DataFrame 可以批量转换特征,同时选择特征。为了说明这个想法,考虑以下示例:
births.select(['INFANT_NICU_ADMISSION', rec_integer('INFANT_NICU_ADMISSION', func.lit('YNU')) \.alias('INFANT_NICU_ADMISSION_RECODE')]).take(5)
这是我们得到的返回结果:
我们选择 'INFANT_NICU_ADMISSION' 列,并将特征名称传递给 rec_integer 方法。我们还重命名新转换的列为 'INFANT_NICU_ADMISSION_RECODE'。这样,我们还将确认我们的 UDF 是否按预期工作。
所以,为了一次性转换所有的 YNU_cols,我们将创建这样的转换列表,如下所示:
exprs_YNU = [rec_integer(x, func.lit('YNU')).alias(x) if x in YNU_cols else x for x in births_transformed.columns
]
births_transformed = births_transformed.select(exprs_YNU)
让我们检查一下我们是否正确得到了它:
births_transformed.select(YNU_cols[-5:]).show(5)
这是我们得到的:
看起来一切都按照我们的预期工作,那么让我们更好地了解我们的数据。