更新时间:2023年12月06日10时08分 来源:乐鱼电竞 浏览次数:

　　当Spark遇到数据倾斜时，这可能导致作业性能下降。数据倾斜是指数据在分区中分布不均匀，导致部分任务处理了大部分数据而其他任务处理了很少的数据。以下是一些解决数据倾斜的方法：

　　首先，需要确认数据倾斜的来源。可以通过以下方式进行数据探查：
// Inspect the per-key row counts on the suspect column to confirm the skew.
val df = spark.read.parquet("your_data_path")
df.groupBy("column_causing_skew").count().show()
　　如果数据倾斜是由于分区不均匀导致的，尝试增加分区可以缓解这个问题：
// Re-read the data, then hash-repartition it into 100 partitions on the
// skewed column so rows spread across more tasks.
val df = spark.read.option("basePath", "path_to_data").parquet("your_data_path")
val newDF = df.repartition(100, col("column_causing_skew"))
　　通过在连接键中添加随机前缀（加盐）来分散数据：
import org.apache.spark.sql.functions.{array, col, explode, lit, rand}
// Salt the skewed side with a random prefix in [0, 10).
// BUG FIX: the original used lit(Math.random()), which is evaluated ONCE on
// the driver — every row got the same "random" prefix, so no salting occurred.
// rand() produces a fresh per-row value.
val df1 = df.withColumn("random_prefix", (rand() * 10).cast("int"))
// BUG FIX: salting BOTH sides with independent random values makes matching
// rows meet only when the two prefixes happen to coincide (~10% of matches).
// Instead, replicate the other side once per possible prefix so every original
// match survives the salted join.
val df2 = df.withColumn("random_prefix", explode(array((0 until 10).map(lit): _*)))
val joinedDF = df1.join(df2,
  df1("common_key") === df2("common_key") &&
  df1("random_prefix") === df2("random_prefix"))
　　尝试在连接之前进行聚合操作，以减少一侧数据的大小：
// Collapse each side to one row per key before the join, shrinking the
// amount of data that must be shuffled for the join itself.
val aggregatedDF1 = df1.groupBy("common_key").agg(sum("value").as("agg_value"))
val aggregatedDF2 = df2.groupBy("common_key").agg(sum("value").as("agg_value"))
val joinedDF = aggregatedDF1.join(aggregatedDF2, Seq("common_key"))
　　如果其中一个DataFrame很小，可以将其广播到所有节点上避免数据倾斜：
import org.apache.spark.sql.functions.broadcast
// FIX: this example was collapsed onto a single line, so the first inline
// comment swallowed all the remaining code and the snippet did not compile.
val smallDF = ??? // TODO: the smaller DataFrame
val bigDF = ???   // TODO: the larger DataFrame
// Broadcasting the small side ships it to every executor, turning the join
// into a map-side join with no shuffle of the big side — skew on the join
// key no longer matters.
val broadcastSmallDF = broadcast(smallDF)
val joinedDF = bigDF.join(broadcastSmallDF, "common_key")
　　自定义分区策略可以帮助数据更均匀地分布到不同的分区：
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{row_number, col}
/**
 * Spreads the rows of each skewed key across `numPartitions` buckets and
 * repartitions the DataFrame on the computed bucket id.
 *
 * @param df              input DataFrame
 * @param partitionColumn the skewed key column
 * @param numPartitions   number of buckets/partitions to spread rows over
 * @return the input DataFrame with an added `partition_id` column,
 *         physically repartitioned on that column
 */
def customPartition(df: DataFrame, partitionColumn: String, numPartitions: Int): DataFrame = {
  // Number the rows within each skewed key, then bucket them round-robin.
  // row_number() starts at 1, so the ids cover 0 until numPartitions.
  // NOTE(review): assumes "some_unique_column" exists in df — confirm.
  val windowSpec = Window.partitionBy(partitionColumn).orderBy(col("some_unique_column"))
  val withBucket = df.withColumn("partition_id", row_number().over(windowSpec) % numPartitions)
  // BUG FIX: the original only ADDED the column and returned — the physical
  // layout (and therefore the skew) was unchanged. Repartitioning on the
  // bucket id is what actually redistributes the data.
  withBucket.repartition(numPartitions, col("partition_id"))
}
val partitionedDF = customPartition(df, "column_causing_skew", 100)
　　以上方法中的选择取决于数据倾斜的具体情况和数据特点。试验不同的方法，并根据实际情况选择最适合的方法来解决Spark中的数据倾斜问题。
北京校区