目录
一、Row对象常见操作
二、Row、RDD、DataFrame互相转换
1、RDD—>DataFrame
2、DataFrame—>RDD
3、DataFrame—>Row
4、Row—>DataFrame
一、Row对象常见操作
from pyspark.sql import Row# 创建一个Row对象
row = Row(name="张三", age=25)# 使用索引、字段名访问字段
print(row[0], row.name) # 修改Row对象(通过转换为字典的方式进行修改)
dict_ = row.asDict()
dict_['age'] = 26
del dict_['name']
dict_['姓名'] = "李四"
new_row = Row(**dict_)# 值迭代
for field in row:print(field)#判断是否包含某个字段
print("name" in row)# 获取字段数量
len(row)
二、Row、RDD、DataFrame互相转换
1、RDD—>DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import Row# 初始化SparkSession
spark = SparkSession.builder.appName("test").getOrCreate()# 创建一个RDD
rdd = sc.parallelize([("Alice", 25), ("Bob", 30)])# 将RDD的元素转换为Row对象
row_rdd = rdd.map(lambda x: Row(name=x[0], age=x[1]))# 将Row RDD转换为DataFrame
df = spark.createDataFrame(row_rdd)
df.show()
2、DataFrame—>RDD
# 从DataFrame获取RDD
rdd_from_df = df.rdd# 进一步将RDD的元素转换为元组或其他格式
rdd_as_tuples = rdd_from_df.map(lambda row: (row.name, row.age))
rdd_as_tuples.collect()
3、DataFrame—>Row
DataFrame的每一行都是一个Row对象。
# 迭代DataFrame获取Row
for row in df.collect():print(f"name:{row.name} age:{row.age}")# 以下都会生成Row对象
df.limit(1)
df.first
4、Row—>DataFrame
# Row对象列表
rows = [Row(name="Alice", age=25), Row(name="Bob", age=30)]# 创建DataFrame
df = spark.createDataFrame(rows)
df.show()