Data Engineering/Databricks & Delta Lake
[TIL] Delta Table에 upsert 하기
minjiwoo
2024. 2. 14. 23:35
728x90
https://docs.databricks.com/en/delta/merge.html#language-python
Databricks documentation
docs.databricks.com
데이터를 증분적재해야 하는 경우 merge() 혹은 MERGE INTO sql 문을 사용할 수 있다.
[Python Code Snippet]
(targetDF.alias("t") # DeltaTable 이 Target이 되어야 한다.
.merge(sourceDF.alias("s"), "s.key = t.key") # merge 조건을 정한다. Source는 DataFrame이다.
.whenMatchedUpdateAll() # key값이 동일한 경우 모두 변경 반영
.whenNotMatchedInsertAll() # key값이 없는 경우 모두 insert
.whenNotMatchedBySourceDelete() # source에 없는 경우 delete
.execute()
)
단, 여기서 targetDF는 DeltaTable 이어야하고, UPSERT 를 하고 싶은 데이터는 sourceDF로 DataFrame이어야 한다. 생각해 보면 작업하기 편하도록 merge 함수를 만들었다고 이해할 수 있다. 업데이트 및 적재의 타겟이 되는 것은 DeltaTable이 될 것이고, 새롭게 배치 작업을 통해 들어오게 되는 데이터는 DataFrame으로 읽어들여서 DeltaTable에 Upsert 연산을 할 것이기 때문이다.
동일한 작업에 대해 SQL 으로 작성하는 경우는 아래와 같다.
[SQL Code Snippet]
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
UPDATE SET *
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE
728x90