
Intro
현재 글또 9기에서 만난 팀원들과 같이 Resumait 라는 제품을 만들고 있다. CS 스터디를 진행하다가 뜻이 맞아서 사이드 프로젝트까지 개발을 하게 되었다. Resumait 은 LLM 을 기반으로, 사용자의 이력서 작성을 도와주는 서비스이다. Resumait는 특히 주니어 및 경력 개발자를 주요 타겟으로하고 있다. 개발자 채용 시장 특성상 하나의 이력서를 기반으로 여러 회사의 공고에 지원하게 되기 때문이다. 뿐만 아니라, 종종 개발자에게도 자기소개서를 요구하는 경우가 많다.
Resumait 에서는 Credit 이 서비스를 사용할 수 있는 단위이다. 사용자는 본인의 이력서를 기반으로 자기소개서 글을 작성할 때 Credit 을 소비하게 된다. 그런데 유의할 점은 이 Credit은 사용자가 Resumait에서 서비스를 사용할 수 있는 할당량과도 같기 때문에,
Credit 을 사용하거나 충전하는 과정에서 Credit 값에 항상 문제가 없어야 한다. 예를 들어서, Credit 을 사용하는 과정에서 API 통신에 문제가 생겨 두번 사용되게 하면 문제가 발생한다. 즉, Credit 을 사용한 결제 API 에서는 '멱등성'을 보장해야 한다. 멱등성을 어떻게 보장하며, 멱등성이라는 개념이 정확하게 무엇인지 정리해보았다!
멱등성(Idempotency)이란?
멱등성(Idempotency)이란, 연산을 여러번 실행하더라도 결과가 달라지지 않는 성질을 의미한다. 멱등성은 개발할 때에도 종종 등장하게 되는 개념인데, 개념 자체는 동일하지만 멱등성을 보장해야 하는 다양한 케이스가 있다.
1. HTTPS 메서드에서의 멱등성
흔히 멱등성이라는 단어를 HTTP 통신에서 많이 듣게 된다. HTTP 메서드에서 멱등성을 보장하는 메서드도 있고, 그렇지 않은 메서드도 있다. 예를 들어서 GET 메서드의 경우에는 리소스를 조회하기만 한다. 따라서 여러번 GET 요청을 보내더라도, 동일한 값을 리턴값으로 받게 될 것이다. 이러한 경우 GET메서드는 멱등성을 보장한다 라고 할 수 있다.
HTTP 메서드 종류 | 멱등성 유무 |
GET | O |
POST | X |
PUT | O |
PATCH | X |
DELETE | O |
GET 메서드야 당연히 값에 대해 '조회' 만하니까 멱등성을 보장한다는 것이 쉽게 이해되는데 다른 메소드의 경우 헷갈리기 도한다. PUT의 경우 값에 대해 업데이트하는 메서드이다. 그렇지만 여러번 호출하게 되어도 항상 같은 값에 대해서 업데이트 하기 때문에, 이 경우에도 멱등성을 보장한다고 할 수 있다.
DELETE의 경우에도 값을 삭제한 후, 다시 요청을 보내더라도 값이 이미 삭제된 상태는 동일할 것이다. 따라서 DELETE도 멱등하다.
2. ETL 파이프라인에서의 멱등성
데이터 ETL 파이프라인을 구축할 때에도 멱등성은 중요하다. 내가 데이터엔지니어링 업무를 하며 경험한 ETL 파이프라인의 성격은 크게 2가지였다.
- 마스터 성 테이블에 대한 ETL 작업: 일정한 주기 (ex. 일 단위) 마다 테이블의 전체 데이터에 대해서 모든 값이 update 하는 ETL 작업
- 증분 적재가 필요한 테이블에 대한 ETL 작업: 일정한 주기마다 테이블에 데이터를 누적하여 쌓아가는 ETL 작업
특히, 2번의 증분적재가 필요한 테이블인 경우에 table row의 식별값이 되는 key 컬럼들을 기준으로 데이터가 증분적재 될 것이다. 그런데 파이프라인에 장애가 일어나서 그날 파이프라인을 2번 실행시켰을 때 데이터가 중복되어서 2번 들어가게 되면 문제가 된다. 즉, ETL 파이프라인에서도 멱등성을 보장해 주어야 하는 것이다. 주로 ETL 파이프라인에서 멱등성을 보장해주기 위해서 UPSERT 트랜잭션을 지원한다면 UPSERT 연산을 사용한다.
아래는 DataLake에서 ACID 트랜잭션을 지원하는 DeltaTable 에서 pyspark 를 사용한 UPSERT 예시이다. key 컬럼을 기준으로 동일한 값에 대해서 모두 업데이트하고, 없는 경우에는 insert 하게 된다.
아래의 코드에서는 target_table이 데이터를 증분적재하는 대상 테이블이며, sourceDF가 새로 들어온 데이터에 해당한다.
from delta.tables import DeltaTable
target_table = DeltaTable.forName(spark, "existing delta table name")
(target_table.alias("t")
.merge(sourceDF.alias("s"), "s.key = t.key")
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute())
연산을 지원하지 않는 경우에는 key 컬럼 값들을 사용하여 이미 타겟 테이블에 데이터가 적재가 되어있어서 중복되는 값들을 DELETE 한 후에, 타겟 테이블에 그날 증분 적재할 데이터를 INSERT 하게 된다.
-- 1. 날짜 범위에 해당하는 중복되는 데이터 삭제
DELETE FROM target_table
WHERE key_column IN (
SELECT key_column
FROM source_table
WHERE 1=1
AND key_column >= '2024-10-12'
AND key_column < '2024-10-13'
);
-- 2. 새로운 증분 데이터를 타겟 테이블에 삽입
INSERT INTO target_table
SELECT *
FROM source_table
WHERE 1=1
AND key_column >= '2024-10-12'
AND key_column < '2024-10-13';
3. 결제 (Credit) 시스템에서 멱등성
결제 시스템에서 멱등성을 보장하기 위해서 토스 페이먼츠 결제 시스템의 글을 많이 참고하였다. 멱등성 key API에 포함시켜서, 멱등성을 보장하고 안전한 결제 요청을 할 수 있게 하였다.
이에 따라서 Litestar Application의 Credit API 를 개발할 때에도 멱등키 (idemponency_key) 라는 값을 만들게 되었다. 그리고 API 요청이 들어올 때, 멱등키가 포함되어 있는지 확인한다. 이 멱등키는 멱등키만을 저장하기 위한 table을 만들었다. 멱등키가 포함된 요청이 들어왔을 때, 멱등키 table에 해당 멱등키와 매칭되는 요청 기록이 있는지 확인해서 중복되는 처리를 방지하였다.
Resumait에서 LLM 과 관련된 비즈니스 로직을 개발하기 위한 웹 개발 프레임워크로는 Litestar 를 사용했다. 우선, Credit API 작업을 위한 entity 정의는 아래와 같이 했다. Wallet 이라는 개념이 있으며, Wallet Entity 에 credit 값을 저장한다. CreditHistory는 credit을 사용한 내용을 로깅하기 위한 Entity이다. 마지막으로 IdempotencyKey가 멱등성 키를 저장하기 위한 테이블과 연동된다.
from uuid import UUID
from advanced_alchemy.base import UUIDAuditBase
from sqlalchemy import Integer, String
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.dialects.postgresql import UUID as psql_UUID
from typing import Literal
class CreditWallet(UUIDAuditBase):
user_id: Mapped[UUID] = mapped_column(psql_UUID(as_uuid=True), unique=True, nullable=False)
credit: Mapped[int] = mapped_column(Integer, nullable=False)
class CreditHistory(UUIDAuditBase):
user_id: Mapped[UUID] = mapped_column(psql_UUID(as_uuid=True), nullable=False)
current_credit: Mapped[int] = mapped_column(Integer, nullable=False)
transaction_type: Mapped[Literal['use', 'charge']] = mapped_column(String, nullable=False)
amount: Mapped[str] = mapped_column(Integer, nullable=False)
class IdempotencyKey(UUIDAuditBase):
idempotency_key: Mapped[UUID] = mapped_column(psql_UUID(as_uuid=True), unique=True, nullable=False)
Credit 을 사용하여 충전, 사용하는 비즈니스 로직은 다음과 같다.
class CreditWalletImpl(SQLAlchemyAsyncRepositoryService[CreditWallet]):
repository_type = CreditWalletRepo
async def check(self
, user_id: UUID) -> CreditWallet | None:
"""check current credit"""
credit = await self.repository.get_one_or_none(user_id=user_id)
if credit is None:
raise HTTPException(detail="Not Found", status_code=404)
return credit
async def create_wallet(self
, user_id:UUID) -> CreditWallet:
"""create wallet entity"""
wallet = CreditWallet(user_id=user_id, credit=1500)
await self.repository.add(wallet)
await self.repository.session.commit()
await self.repository.session.refresh(wallet)
return wallet
async def charge(self, user_id: UUID, amount: int) -> bool:
"""charge credit"""
wallet = await self.repository.get_one_or_none(user_id=user_id)
if wallet is None:
raise HTTPException(detail="Not Found", status_code=404)
wallet.credit += amount
await self.repository.update(wallet)
await self.repository.session.commit()
return True
async def use(self, user_id: UUID, amount: int) -> bool:
"""use credit"""
wallet = await self.repository.get_one_or_none(user_id=user_id)
if wallet and wallet.credit >= amount:
wallet.credit -= amount
await self.repository.update(wallet)
return True
return False
async def delete_wallet(self, user_id:UUID) -> bool:
"""delete wallet entity when the user account deleted"""
wallet = await self.repository.get_one_or_none(user_id=user_id)
if wallet is None:
raise HTTPException(detail="Not Found", status_code=404)
await self.repository.delete(wallet.id)
await self.repository.session.commit()
return True
다음으로, 멱등성키를 조회하고 추가하는 로직이다.
class IdempotencyKeyImpl(SQLAlchemyAsyncRepositoryService[IdempotencyKey]):
repository_type = IdempotencyKeyRepo
async def get_by_key(self, idempotency_key:UUID) -> IdempotencyKey | None:
"""search by idempotency key"""
return await self.repository.get_one_or_none(idempotency_key=idempotency_key)
async def add_new_key(self, idempotency_key:UUID) -> IdempotencyKey:
"""add a new idempotency key"""
key = IdempotencyKey(idempotency_key=idempotency_key)
await self.repository.add(key)
await self.repository.session.commit()
await self.repository.session.refresh(key)
return key
멱등성 키를 사용하여, 중복되는 요청 처리를 방지하는 서비스 레이어의 로직은 다음과 같다. charge_credit 함수(credit 충전)와 use_credit 함수(credit 사용)에서 멱등성 키를 parameter로 받고 있다.
@dataclass
class CreditService:
wallet_impl: CreditWalletImpl
history_impl: CreditHistoryImpl
idempotency_impl: IdempotencyKeyImpl
async def check_credit_wallet(self, user_id:UUID) -> CreditWalletSchema:
wallet = await self.wallet_impl.check(user_id=user_id)
return self.wallet_impl.to_schema(wallet, schema_type=CreditWalletSchema)
async def create_credit_wallet(self, user_id:UUID) -> CreditWalletSchema:
wallet = await self.wallet_impl.create_wallet(user_id=user_id)
return self.wallet_impl.to_schema(wallet, schema_type=CreditWalletSchema)
async def charge_credit(self
, user_id:UUID
, amount: int
, idempotency_key:UUID) -> CreditHistorySchema | dict[str:str]:
existing_key = await self.idempotency_impl.get_by_key(idempotency_key=idempotency_key)
if existing_key:
"""이미 처리된 요청"""
return {"message": "request was already processed"}
await self.idempotency_impl.add_new_key(idempotency_key=idempotency_key)
result = await self.wallet_impl.charge(user_id=user_id, amount=amount)
if result:
obj = await self.wallet_impl.check(user_id=user_id)
history = await self.history_impl.record_history(user_id=user_id
, amount=amount
, current=obj.credit
, transaction_type="charge")
return self.history_impl.to_schema(history, schema_type=CreditHistorySchema)
else:
return {"message": "user_id is not valid !"}
async def use_credit(self
, user_id:UUID
, amount: int
, idempotency_key:UUID) -> CreditHistorySchema | dict[str:str]:
existing_key = await self.idempotency_impl.get_by_key(idempotency_key=idempotency_key)
if existing_key:
"""이미 처리된 요청"""
return {"message": "request was already processed"}
await self.idempotency_impl.add_new_key(idempotency_key=idempotency_key)
result = await self.wallet_impl.use(user_id=user_id, amount=amount)
if result:
obj = await self.wallet_impl.check(user_id=user_id)
history = await self.history_impl.record_history(user_id=user_id
, amount=amount
, current=obj.credit
, transaction_type="use")
return self.history_impl.to_schema(history, schema_type=CreditHistorySchema)
else:
obj = await self.wallet_impl.check(user_id=user_id)
if obj:
return {"message": "insufficient credit !"}
return {"message": "user_id is not valid !"}
async def delete_credit_wallet(self, user_id:UUID) -> bool:
return await self.wallet_impl.delete_wallet(user_id=user_id)
멱등성에 관한 개념과 예시, 그리고 우리 서비스 Resumait의 Credit API 에 멱등성을 보장하고 있는 예시에 대해서 소개했다. Resumait의 유저들이 생성된 자기소개서를 잘 받아보고, Credit이 잘못 쓰이는 문제들이 없었으면 좋겠다

Reference
https://docs.tosspayments.com/blog/what-is-idempotency
멱등성이 뭔가요? | 토스페이먼츠 개발자센터
생소한 표현이지만 알고 보면 쉬워요. 멱등성에 대해 이해하고 API를 멱등하게 제공하기 위한 방법도 함께 알아봐요.
docs.tosspayments.com
'Backend' 카테고리의 다른 글
[sqlalchemy] Entity.metadata.create_all() 자동으로 테이블 생성하기 (0) | 2024.08.08 |
---|---|
[GitHub] REMOTE HOST IDENTIFICATION HAS CHANGED 해결방법 (0) | 2023.03.26 |