Data Engineering/Databricks & Delta Lake

[TIL] Delta Table에 upsert 하기

minjiwoo 2024. 2. 14. 23:35
728x90

https://docs.databricks.com/en/delta/merge.html#language-python

 

Databricks documentation

 

docs.databricks.com

 

데이터를 증분적재해야 하는 경우 merge() 혹은 MERGE INTO sql 문을 사용할 수 있다. 

[Python Code Snippet]

(targetDF.alias("t") # DeltaTable 이 Target이 되어야 한다. 
  .merge(sourceDF.alias("s"), "s.key = t.key") # merge 조건을 정한다. Source는 DataFrame이다. 
  .whenMatchedUpdateAll() # key값이 동일한 경우 모두 변경 반영 
  .whenNotMatchedInsertAll() # key값이 없는 경우 모두 insert 
  .whenNotMatchedBySourceDelete() # source에 없는 경우 delete
  .execute()
)

단, 여기서 targetDF는 DeltaTable 이어야하고, UPSERT 를 하고 싶은 데이터는 sourceDF로 DataFrame이어야 한다. 생각해 보면 작업하기 편하도록 merge 함수를 만들었다고 이해할 수 있다. 업데이트 및 적재의 타겟이 되는 것은 DeltaTable이 될 것이고, 새롭게 배치 작업을 통해 들어오게 되는 데이터는 DataFrame으로 읽어들여서 DeltaTable에 Upsert 연산을 할 것이기 때문이다. 

동일한 작업에 대해 SQL 으로 작성하는 경우는 아래와 같다. 

[SQL Code Snippet]

MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE
728x90