1. Spark Data Skew 란?
Spark 클러스터에서, Data Skew 는 특정 키 또는 파티션에 데이터가 쏠려서 불균형이 일어나는 현상이다. 여기서 특정 키 (Key) 라는 의미는 주로 Join, GroupBy, Aggregation 같은 연산에서 특정 키에 과도한 데이터가 집중되는 것을 의미한다. 또한 파티션 (Partition) 이란, Spark 가 데이터를 나누어 저장하고 처리하는 최소 단위이다. Spark 는 각 파티션을 개별 태스크에서 처리하게 된다.
Data Skew가 발생하면 다음과 같은 문제가 발생할 수 있다.
- OOM (Out of Memory) : 특정 파티션에 과도하게 데이터가 몰리게 되면, 해당 파티션을 처리하는 태스크(Task) 가 많은 메모리를 소비하게 된다. Spark 는 기본적으로 JVM 메모리를 사용하여 연산을 수행하는데, Data Skew가 발생하게 되면 Spark 가 모든 메모리에 데이터를 유지하지 못하고 JVM Heap 메모리가 부족해져서, OOM 이 발생하여 Spark 어플리케이션이 비정상적으로 종료된다.
- 전체 작업 지연 : 특정 태스크 (Task)에 불균형한 데이터가 할당되어 다른 태스크보다 훨씬 비정상적으로 오래 실행되게 된다. Spark 는 기본적으로 모든 태스크가 끝나야 다음 스테이지로 넘어가게 된다. 따라서 이런 태스크가 병목이 되어 전체 작업이 지연되는 원인이 된다.
- Disk Spilling : Spark 는 기본적으로는 메모리 내에서 연산을 수행한다. 그렇지만, 메모리가 부족하면 데이터의 일부를 디스크에 저장하게 된다. 즉 데이터가 넘쳐서 메모리에서 디스크로 엎질러지게 (Spilling) 된다. 디스크에 데이터를 저장하게 되면 디스크에서 다시 읽어와야 하므로 IO 오버헤드가 증가하여 성능이 저하된다.
2. Data Skew 의 원인
- 데이터의 불균형한 분산 : 실제 real-world 에서는, 데이터가 항상 고르게 분포하지 않은 경우가 더 많다. 데이터가 편향된 외부 소스에서 유입되는 경우 이미 데이터가 불균형한 상태일 수 있다. 2.1 의 예시와 비슷한 경우이다.
- Join 연산 / GroupBy 연산 : Join 연산이 발생할 때, 특정 key 에 데이터가 집중되면, 해당 key를 처리하는 태스크에 과부하가 발생할 수 있다. 특히 Shuffle 이 발생하는 Join 연산에서 문제가 될 수 있으며, 반면 Broadcast Join 을 사용하는 경우에는 Data Skew 문제를 완화할 수 있다.
- 기본 파티셔닝 전략 문제 : Spark 는 기본적으로 Hash Partitioning 을 사용하는데, 일부 키가 해시 충돌을 일으키거나 데이터가 균등하지 않으면 특정 파티션이 커질 수 있다.
3. Data Skew 가 일어났는지 확인하기
3.1 특정 컬럼 (Key) 값의 데이터 개수 분포를 확인
from pyspark.sql.functions import count, col
# 샘플 데이터 생성
data = [
("A", 10), ("A", 20), ("A", 30), ("A", 40), ("A", 50), # 'A' 값이 많음 (Data Skew)
("B", 60), ("C", 70), ("D", 80), ("E", 90) # 다른 키들은 데이터가 적음
]
df = spark.createDataFrame(data, ["skewed_column", "value"])
# 특정 키별 데이터 개수 확인
df.groupBy("skewed_column").agg(count("*").alias("count")).orderBy(col("count").desc()).show()
A 라는 key 값이 다른 값들보다 많으므로 이런 경우 Data Skew 발생 가능성이 높은것으로 볼 수 있다.
+-------------+-----+
| skewed_column |count|
+-------------+-----+
| A | 5 |
| B | 1 |
| C | 1 |
| D | 1 |
| E | 1 |
+-------------+-----+
3.2 파티션 별 데이터 개수를 확인하기
df = df.repartition(4, "skewed_column") # 4개 파티션으로 분할
# 각 파티션의 데이터 개수 확인
df.rdd.mapPartitions(lambda partition: [len(list(partition))]).collect()
특정 파티션에만 데이터가 집중되어 있으므로, 과부하 발생 가능성이 있다.
[1, 1, 1, 5]
4. Data Skew 의 원인
- 데이터의 불균형한 분산 : 실제 real-world 에서는, 데이터가 항상 고르게 분포하지 않은 경우가 더 많다. 데이터가 편향된 외부 소스에서 유입되는 경우 이미 데이터가 불균형한 상태일 수 있다. 2.1 의 예시와 비슷한 경우이다.
- Join 연산 / GroupBy 연산 : Join 연산이 발생할 때, 특정 key 에 데이터가 집중되면, 해당 key를 처리하는 태스크에 과부하가 발생할 수 있다.
5. Data Skew 핸들링하기
5.1 AQE (Adaptive Query Execution) 의 동작
AQE 는 Adaptive Query Execution 으로, Spark 3.0 버전 이후부터 동적으로 최적화 작업을 해주는 프레임 워크이다. Spark 3.2부터는 SQE enabled 설정이 디폴트로 True가 되었다. AQE 는 shuffle 이 끝난 이후 partition 을 적절하게 병합해주는 기능을 한다.
spark.conf.set("spark.sql.adaptive.enabled", True)
AQE는 다음과 같은 기능을 제공한다.
- Skewed Join Optimization : 최적화된 Join 을 적용하는 기능이다. Join 연산에서 특정 키에 데이터가 집중되면, AQE가 자동으로 작은 파티션으로 분리하여 최적화한다.
- Dynamic Coalescing : Partition의 수를 줄여주는 기능이다. 너무 많은 Partition 은 많은 Task 를 필요하거나 I/O 를 발생시킨다. (1 Partition = 1 Task). Spark 연산 실행 중 작은 파티션을 합쳐서 병렬처리를 최적화한다.
5.2 Repartitioning
Spark 의 디폴트 파티셔닝 값에서 직접 repartitioning 을 통해서, 파티션을 조정할 수 있다. 비교적 고르게 분산된 numeric한 key 값 컬럼을 이용하여 repartitioning 하는 전략을 세울 수 있다.
df = df.repartition(100, "column_a")
지정한 key (column) 을 기준으로 데이터를 다시 파티셔닝해서 특정 노드에 부하가 집중되는 현상을 완화할 수 있다.
5.3 Join 시 salting 기법
Join 연산 시에 특정 key 에 데이터가 몰리는 경우, Salting 기법을 사용해서 데이터 분포를 인위적으로 균등하게 만드는 방법이다. 새로운 salt 작업용 컬럼을 추가하고, 균등하게 분포되도록 값을 지정해준다. 그리고 join 연산에 join 할 대상 컬럼과 salting 된 컬럼을 포함하여 조인한다.
from pyspark.sql.functions import monotonically_increasing_id
# Salting 기법 적용 - salt 라는 컬럼을 새로 추가한다.
# salt 컬럼에 들어가는 값은 골고루 분포하도록 인위적으로 조정한다.
def add_salt(df, column, salt_range=10):
return df.withColumn("salt", (monotonically_increasing_id() % salt_range))
fact_df = add_salt(fact_df, "join_key")
dim_df = add_salt(dim_df, "join_key")
# Salting된 컬럼을 포함하여 조인
joined_df = fact_df.join(dim_df, ["join_key", "salt"], "inner")
5.4 Join 시 Broadcasting Join 기법 사용하기
spark broadcasting join 을 사용하는 경우, 작은 테이블을 모든 spark worker node에 복제하게 된다. 따라서 shuffle 현상을 방지해서 data skew를 방지할 수 있다. 단, 대상 테이블이 작을 경우에 효과적이다.
from pyspark.sql.functions import broadcast
joined_df = fact_df.join(broadcast(dim_df), "join_key", "inner")
6. 결론
Data Skew 는 Spark 작업 성능을 저하시키는 중요한 문제 중 하나여서, 관련된 Spark 튜닝 기법에 대해 정리해보았다. Data Skew 현상을 잘 해결해야 Spark 분산처리의 이점을 극대화시킬 수 있다.
Reference
'Data Engineering > Apache Spark' 카테고리의 다른 글
Spark 성능 튜닝 기법 정리 (0) | 2024.07.07 |
---|---|
[Spark] Spark JDBC 연결시 발생하는 data skew 현상 해결하기 (0) | 2024.05.10 |
[Spark] Spark Streaming 과 Structure Streaming 비교하기 (0) | 2024.03.03 |
[Spark] Ch.15 클러스터에서 스파크 실행하기 (0) | 2024.01.04 |
[Spark] 스파크 완벽 가이드 Ch09. 데이터 소스 (0) | 2023.11.27 |