在 Python 中使用 Spark,主要通过PySpark库来实现。它是 Apache Spark 的 Python API,语法风格与 Pandas 有相似之处,但核心思想是分布式计算和惰性求值。
以下是 PySpark 核心语法的系统性介绍(基于目前主流的 Spark 3.x / 4.x 版本):
1. 入口:SparkSession
所有 PySpark 程序的起点,替代了旧版的SparkContext。
frompyspark.sqlimportSparkSession spark=(SparkSession.builder.appName("MyApp").master("local[*]")# 本地测试用,生产环境由集群管理器指定.config("spark.some.config","value").getOrCreate())2. DataFrame 创建
DataFrame 是 PySpark 最核心的数据结构,等价于分布式表。
| 方式 | 代码示例 |
|---|---|
| 从文件读取 | spark.read.parquet("/path")/.csv()/.json()/.table("db.tbl") |
| 从列表创建 | spark.createDataFrame([(1,"a"),(2,"b")], ["id","name"]) |
| 从 Pandas 转换 | spark.createDataFrame(pandas_df) |
| SQL 查询创建 | spark.sql("SELECT * FROM table WHERE dt='2026-06-30'") |
3. 核心变换语法 (Transformations)
⚠️关键概念:惰性求值。以下操作不会立即执行,只是记录逻辑计划,直到遇到 Action 操作才真正触发计算。
3.1 列操作 (pyspark.sql.functions)
这是 PySpark 最常用的模块,通常简写为F:
frompyspark.sqlimportfunctionsasF df.select(F.col("name"),# 引用列F.lit(2026).alias("year"),# 常量列F.concat_ws("-",F.col("y"),F.col("m")),# 字符串拼接F.when(F.col("age")>18,"adult").otherwise("minor"),# 条件判断F.date_add(F.current_date(),-7),# 日期函数F.explode(F.col("tags"))# 数组展开)3.2 常用 DataFrame 方法
# 过滤df.filter(F.col("amount")>100)df.where("region = 'cn' AND dt = '2026-06-30'")# 也支持SQL表达式字符串# 聚合df.groupBy("region").agg(F.sum("amount").alias("total"),F.countDistinct("user_id").alias("uv"))# 关联df1.join(df2,on="user_id",how="left")df1.join(df2,on=(df1.id==df2.uid)&(df1.dt==df2.dt),how="inner")# 窗口函数frompyspark.sql.windowimportWindow w=Window.partitionBy("dept").orderBy(F.desc("salary"))df.withColumn("rank",F.row_number().over(w))# 其他df.dropDuplicates(["user_id"])# 去重df.na.fill(0,subset=["amount"])# 空值填充df.repartition(10,"dt")# 重分区(影响Shuffle)df.cache()/df.persist()# 缓存到内存/磁盘4. 动作语法 (Actions)
触发实际计算并返回结果到 Driver 端:
df.show(20,truncate=False)# 打印前N行df.count()# 计数df.collect()# ⚠️ 拉取全部数据到Driver,大表慎用!df.toPandas()# ⚠️ 转为Pandas DataFrame,同样慎用df.write.parquet("/output")# 写出文件df.createOrReplaceTempView("tmp")# 注册临时视图供SQL使用5. Spark SQL 语法
如果不习惯 DataFrame API,可以直接写 SQL:
df.createOrReplaceTempView("sales")result=spark.sql(""" SELECT region, SUM(amount) as total FROM sales WHERE dt = '2026-06-30' GROUP BY region HAVING total > 10000 ORDER BY total DESC """)6. UDF(用户自定义函数)
当内置函数无法满足需求时使用,但性能较差(Python 与 JVM 间有序列化开销),应优先使用内置函数或 Pandas UDF。
# 普通 UDF(逐行处理,慢)@F.udf("string")defmask_phone(phone):returnphone[:3]+"****"+phone[7:]ifphoneelseNone# Pandas UDF / Vectorized UDF(向量化,快10-100倍)✅ 推荐@F.pandas_udf("double")defnormalize(s:pd.Series)->pd.Series:return(s-s.mean())/s.std()7. PySpark vs Pandas 语法速查对照
| 操作 | Pandas | PySpark |
|---|---|---|
| 选列 | df[["a","b"]] | df.select("a","b") |
| 过滤 | df[df.age>18] | df.filter(F.col("age")>18) |
| 新增列 | df["new"] = ... | df.withColumn("new", ...) |
| 重命名 | df.rename(columns={}) | df.withColumnRenamed("old","new") |
| 分组聚合 | df.groupby().agg() | df.groupBy().agg() |
| 排序 | df.sort_values() | df.orderBy()/df.sort() |
| 采样 | df.sample(frac=0.1) | df.sample(fraction=0.1) |
💡 最佳实践与避坑指南
- 避免
collect()/toPandas():除非确认数据量很小(< 几GB),否则会导致 Driver OOM。调试时用show()或limit(100).toPandas()。 - 优先用内置函数:
pyspark.sql.functions里的函数在 JVM 端执行,比 Python UDF 快几个数量级。 - 注意 Shuffle:
join、groupBy、repartition、distinct都会触发 Shuffle,是性能瓶颈。尽量用广播 Join(F.broadcast(small_df))减少 Shuffle。 - 合理设置分区数:分区太少导致并行度不足,太多导致任务调度开销大。一般每个分区 128MB~256MB 为宜。
- 类型安全:PySpark 是强类型的,字符串
"123"和整数123不能直接比较/关联,需用F.col().cast()显式转换。 - Spark Connect (Spark 4.x 新特性):如果你使用的是较新版本,推荐使用 Spark Connect 客户端模式,将 Driver 与 Cluster 解耦,支持更好的 IDE 补全和远程开发体验。