Data Engineering/Apache Spark

[Spark] Spark Data Skew의 발생 원인과 해결방법

minjiwoo 2025. 3. 16. 11:02
728x90

1. Spark Data Skew 란?

Spark 클러스터에서, Data Skew 는 특정 키 또는 파티션에 데이터가 쏠려서 불균형이 일어나는 현상이다. 여기서 특정 키 (Key) 라는 의미는 주로 Join, GroupBy, Aggregation 같은 연산에서 특정 키에 과도한 데이터가 집중되는 것을 의미한다. 또한 파티션 (Partition) 이란, Spark 가 데이터를 나누어 저장하고 처리하는 최소 단위이다. Spark 는 각 파티션을 개별 태스크에서 처리하게 된다. 

Data Skew가 발생하면 다음과 같은 문제가 발생할 수 있다.

  • OOM (Out of Memory) : 특정 파티션에 과도하게 데이터가 몰리게 되면, 해당 파티션을 처리하는 태스크(Task) 가 많은 메모리를 소비하게 된다. Spark 는 기본적으로 JVM 메모리를 사용하여 연산을 수행하는데, Data Skew가 발생하게 되면 Spark 가 모든 메모리에 데이터를 유지하지 못하고 JVM Heap 메모리가 부족해져서, OOM 이 발생하여 Spark 어플리케이션이 비정상적으로 종료된다. 
  • 전체 작업 지연 : 특정 태스크 (Task)에 불균형한 데이터가 할당되어 다른 태스크보다 훨씬 비정상적으로 오래 실행되게 된다. Spark 는 기본적으로 모든 태스크가 끝나야 다음 스테이지로 넘어가게 된다. 따라서 이런 태스크가 병목이 되어 전체 작업이 지연되는 원인이 된다. 
  • Disk Spilling : Spark 는 기본적으로는 메모리 내에서 연산을 수행한다. 그렇지만, 메모리가 부족하면 데이터의 일부를 디스크에 저장하게 된다. 즉 데이터가 넘쳐서 메모리에서 디스크로 엎질러지게 (Spilling) 된다. 디스크에 데이터를 저장하게 되면 디스크에서 다시 읽어와야 하므로 IO 오버헤드가 증가하여 성능이 저하된다.

JVM 에서의 메모리 영역은 위와 같다. 메모리 영역이 제대로 관리되지 않는 경우, Spark 어플리케이션이 비정상 종료될 수 있다. 출처 : https://www.devkuma.com/docs/jvm/memory-structure/

2. Data Skew 의 원인 

  • 데이터의 불균형한 분산 : 실제 real-world 에서는, 데이터가 항상 고르게 분포하지 않은 경우가 더 많다. 데이터가 편향된 외부 소스에서 유입되는 경우 이미 데이터가 불균형한 상태일 수 있다. 2.1 의 예시와 비슷한 경우이다. 
  • Join 연산 / GroupBy 연산 : Join 연산이 발생할 때, 특정 key 에 데이터가 집중되면, 해당 key를 처리하는 태스크에 과부하가 발생할 수 있다. 특히 Shuffle 이 발생하는 Join 연산에서 문제가 될 수 있으며, 반면 Broadcast Join 을 사용하는 경우에는 Data Skew 문제를 완화할 수 있다. 
  • 기본 파티셔닝 전략 문제 : Spark 는 기본적으로 Hash Partitioning 을 사용하는데, 일부 키가 해시 충돌을 일으키거나 데이터가 균등하지 않으면 특정 파티션이 커질 수 있다. 

3. Data Skew 가 일어났는지 확인하기 

3.1 특정 컬럼 (Key) 값의 데이터 개수 분포를 확인

from pyspark.sql.functions import count, col


# 샘플 데이터 생성
data = [
    ("A", 10), ("A", 20), ("A", 30), ("A", 40), ("A", 50),  # 'A' 값이 많음 (Data Skew)
    ("B", 60), ("C", 70), ("D", 80), ("E", 90)  # 다른 키들은 데이터가 적음
]

df = spark.createDataFrame(data, ["skewed_column", "value"])

# 특정 키별 데이터 개수 확인
df.groupBy("skewed_column").agg(count("*").alias("count")).orderBy(col("count").desc()).show()

A 라는 key 값이 다른 값들보다 많으므로 이런 경우 Data Skew 발생 가능성이 높은것으로 볼 수 있다. 

+-------------+-----+
| skewed_column |count|
+-------------+-----+
|           A |   5 |
|           B |   1 |
|           C |   1 |
|           D |   1 |
|           E |   1 |
+-------------+-----+

3.2 파티션 별 데이터 개수를 확인하기 

df = df.repartition(4, "skewed_column")  # 4개 파티션으로 분할

# 각 파티션의 데이터 개수 확인
df.rdd.mapPartitions(lambda partition: [len(list(partition))]).collect()

특정 파티션에만 데이터가 집중되어 있으므로, 과부하 발생 가능성이 있다. 

[1, 1, 1, 5]

4. Data Skew 의 원인 

  • 데이터의 불균형한 분산 : 실제 real-world 에서는, 데이터가 항상 고르게 분포하지 않은 경우가 더 많다. 데이터가 편향된 외부 소스에서 유입되는 경우 이미 데이터가 불균형한 상태일 수 있다. 2.1 의 예시와 비슷한 경우이다. 
  • Join 연산 / GroupBy 연산 : Join 연산이 발생할 때, 특정 key 에 데이터가 집중되면, 해당 key를 처리하는 태스크에 과부하가 발생할 수 있다. 

5. Data Skew 핸들링하기 

5.1 AQE (Adaptive Query Execution) 의 동작

AQE 는 Adaptive Query Execution 으로, Spark 3.0 버전 이후부터 동적으로 최적화 작업을 해주는 프레임 워크이다. Spark 3.2부터는 SQE enabled 설정이 디폴트로 True가 되었다. AQE 는 shuffle 이 끝난 이후 partition 을 적절하게 병합해주는 기능을 한다. 

spark.conf.set("spark.sql.adaptive.enabled", True)

AQE는 다음과 같은 기능을 제공한다. 

  • Skewed Join Optimization : 최적화된 Join 을 적용하는 기능이다. Join 연산에서 특정 키에 데이터가 집중되면, AQE가 자동으로 작은 파티션으로 분리하여 최적화한다. 
  • Dynamic Coalescing : Partition의 수를 줄여주는 기능이다. 너무 많은 Partition 은 많은 Task 를 필요하거나 I/O 를 발생시킨다. (1 Partition = 1 Task). Spark 연산 실행 중 작은 파티션을 합쳐서 병렬처리를 최적화한다. 

5.2 Repartitioning 

Spark 의 디폴트 파티셔닝 값에서 직접 repartitioning 을 통해서, 파티션을 조정할 수 있다. 비교적 고르게 분산된 numeric한 key 값 컬럼을 이용하여 repartitioning 하는 전략을 세울 수 있다. 

df = df.repartition(100, "column_a")

지정한 key (column) 을 기준으로 데이터를 다시 파티셔닝해서 특정 노드에 부하가 집중되는 현상을 완화할 수 있다. 

5.3 Join 시 salting 기법 

Join 연산 시에 특정 key 에 데이터가 몰리는 경우, Salting 기법을 사용해서 데이터 분포를 인위적으로 균등하게 만드는 방법이다. 새로운 salt 작업용 컬럼을 추가하고, 균등하게 분포되도록 값을 지정해준다. 그리고 join 연산에 join 할 대상 컬럼과 salting 된 컬럼을 포함하여 조인한다. 

from pyspark.sql.functions import monotonically_increasing_id

# Salting 기법 적용 - salt 라는 컬럼을 새로 추가한다. 
# salt 컬럼에 들어가는 값은 골고루 분포하도록 인위적으로 조정한다. 
def add_salt(df, column, salt_range=10):
    return df.withColumn("salt", (monotonically_increasing_id() % salt_range))

fact_df = add_salt(fact_df, "join_key")
dim_df = add_salt(dim_df, "join_key")

# Salting된 컬럼을 포함하여 조인
joined_df = fact_df.join(dim_df, ["join_key", "salt"], "inner")

5.4 Join 시 Broadcasting Join 기법 사용하기 

spark broadcasting join 을 사용하는 경우, 작은 테이블을 모든 spark worker node에 복제하게 된다. 따라서 shuffle 현상을 방지해서 data skew를 방지할 수 있다. 단, 대상 테이블이 작을 경우에 효과적이다. 

from pyspark.sql.functions import broadcast

joined_df = fact_df.join(broadcast(dim_df), "join_key", "inner")

broadcast join

6. 결론 

Data Skew 는 Spark 작업 성능을 저하시키는 중요한 문제 중 하나여서, 관련된 Spark 튜닝 기법에 대해 정리해보았다. Data Skew 현상을 잘 해결해야 Spark 분산처리의 이점을 극대화시킬 수 있다. 

 

Reference 

728x90