What to Do When Spark Encounters Data Skew?

Updated: December 6, 2023, 10:08


When Spark encounters data skew, job performance can degrade sharply. Data skew means the data is unevenly distributed across partitions, so a handful of tasks end up processing most of the data while the rest process very little. Below are several methods for resolving data skew:


1. Data Exploration

First, confirm where the skew comes from. You can explore the data as follows:

import org.apache.spark.sql.functions.desc
val df = spark.read.format("parquet").load("your_data_path")
// Count rows per key; the heaviest keys at the top are the skew culprits.
df.groupBy("column_causing_skew").count().orderBy(desc("count")).show()
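
Key frequencies reveal the logical skew; to see how rows are actually spread across physical partitions, the built-in spark_partition_id function can be used. A minimal sketch, assuming the same df as above:

import org.apache.spark.sql.functions.{spark_partition_id, desc}

// Rows per physical partition; a few oversized partitions confirm the skew.
df.groupBy(spark_partition_id().alias("partition")).count().orderBy(desc("count")).show()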

2. Increase the Number of Partitions

If the skew is caused by uneven partitioning, increasing the number of partitions may alleviate the problem:

val df = spark.read.format("parquet").option("basePath", "path_to_data").load("your_data_path")

// Round-robin repartitioning spreads rows evenly across 100 partitions. Note that
// repartitioning by the skewed column itself would preserve the skew, because all
// rows with the same key would still hash to the same partition.
val newDF = df.repartition(100)
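
As a related option not covered above: on Spark 3.x, Adaptive Query Execution can detect and split skewed partitions at runtime, which often removes the need for manual repartitioning. A minimal sketch of the relevant settings (the values shown are illustrative):

// Enable runtime re-optimization and automatic skew-join splitting (Spark 3.x).
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
// A partition counts as skewed when it exceeds both this size and
// skewedPartitionFactor times the median partition size.
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")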

3. Use Random Prefixes (Salting)

Spread the hot keys by adding a random prefix (a "salt") to the join key:

import org.apache.spark.sql.functions.{array, col, concat_ws, explode, lit, rand}

// Salt the skewed side with a per-row random prefix in 0..9. Note that rand()
// is evaluated per row, whereas lit(Math.random()) would bake in one constant.
val salted1 = df1.withColumn("salted_key",
  concat_ws("_", col("common_key"), (rand() * 10).cast("int")))

// Replicate the other side once per possible prefix so every salted key can match.
val salted2 = df2.withColumn("prefix", explode(array((0 until 10).map(lit): _*)))
  .withColumn("salted_key", concat_ws("_", col("common_key"), col("prefix")))

val joinedDF = salted1.join(salted2, "salted_key")
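
The salt range (10 here) is a trade-off: the replicated side grows tenfold, so choose a range just large enough to flatten the hottest keys.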

4. Aggregate, Then Join

Try aggregating before the join to reduce the size of the data being joined:

import org.apache.spark.sql.functions.sum

// Collapsing each key to a single row per side shrinks the shuffle dramatically.
val aggregatedDF1 = df1.groupBy("common_key").agg(sum("value") as "agg_value1")
val aggregatedDF2 = df2.groupBy("common_key").agg(sum("value") as "agg_value2")

val joinedDF = aggregatedDF1.join(aggregatedDF2, "common_key")
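
If the aggregation shrinks one side enough, Spark may then satisfy the join with a broadcast instead of a shuffle, which eliminates the skew entirely (see the next method).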

5. Broadcast the Small Table

If one of the DataFrames is small, you can broadcast it to every node and avoid the skewed shuffle:

import org.apache.spark.sql.functions.broadcast

val smallDF = ... // the small DataFrame
val bigDF = ...   // the large DataFrame

// The small table is shipped whole to every executor, so the large side is
// never shuffled and its hot keys cannot pile up in a single task.
val joinedDF = bigDF.join(broadcast(smallDF), "common_key")
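
Spark also broadcasts automatically when one side is smaller than spark.sql.autoBroadcastJoinThreshold (10 MB by default). A minimal sketch, with an illustrative value:

// Raise the auto-broadcast cutoff so larger dimension tables still qualify.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100MB")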

6. Custom Partitioning

A custom partitioning strategy can help distribute the data more evenly across partitions:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Number the rows within each skewed key, deal them into numPartitions buckets,
// and repartition by the derived bucket id so each hot key is split up.
def customPartition(df: DataFrame, partitionColumn: String, numPartitions: Int): DataFrame = {
  val windowSpec = Window.partitionBy(partitionColumn).orderBy(col("some_unique_column"))
  df.withColumn("partition_id", row_number().over(windowSpec) % numPartitions)
    .repartition(numPartitions, col("partition_id"))
}

val partitionedDF = customPartition(df, "column_causing_skew", 100)
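
Here "some_unique_column" stands in for any column that gives a stable ordering within a key. Be aware that the window itself still sends each hot key through a single task to assign row numbers; the even distribution pays off in the stages that follow.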

Which of these methods to choose depends on the specific shape of the skew and the characteristics of your data. Experiment with the different approaches and pick the one that best fits your situation to resolve data skew in Spark.
