기존 서비스의 문제점
누구나 리포터 Lab 에서 카카오톡 챗봇 개발을 AWS Serverless 서비스들을 이용해서 개발하고 있다. 그런데 기존에 비동기적으로 요청을 처리하지 않아서 AI Agent 와 요청 / 응답을 원활하게 주고받을 수 없었다. 그래서, AI Agent 가 보내는 응답이 유실되는 것 처럼 보였다. 이러한 문제점을 개선하기 위해서 Message Queue 를 도입하는 것이 필요했다.
Message Queue 란
메시지 큐는 queue 라는 선입선출 (First In First Out) 구조를 가진 자료구조를 채택하여 메세지를 전달하는 시스템이다. queue 라는 자료구조가 두개의 끝을 가지고 있고 각각의 끝은 입구와 출구가 된다. 메세지 큐를 통해서 메세지를 전달하는 시스템에서, 메세지를 생성하고 전달하는 부분을 Producer라고 하며 메세지 큐로부터 메세지를 받아서 소비하는 부분을 Consumer라고 한다. 메세지 큐는 Producer와 Consumer 사이의 미들웨어 역할을 하는 시스템으로, 둘 사이에서 메세지를 안전하게 전달하고 관리하는 중간 브로커 역할을 한다.
Message Queue 의 역할
1. 메세지의 비동기 처리 : MQ(Message Queue) 는 Producer가 보낸 메시지를 바로 Consumer 에게 즉시 전달하지 않고, queue 에 저장한다. 이를 통해서 Producer와 Consumer가 비동기적으로 작업을 처리할 수 있고, 시스템의 부하를 분산할 수 있다. 따라서 챗봇의 요청을 분산처리할 수 있으므로 답변 성능 (속도) 를 최적화할 수 있다.
2. 시스템 간의 결합도 감소 (Decoupling) : Producer와 Consumer 가 직접 통신하지 않기 때문에, 시스템 간의 결합도를 줄일 수 있다. 따라서 Producer와 Consumer가 서로의 상태에 의존하지 않고 독립적으로 개발 및 확장될 수 있다. 특히 현재 개발중인 시스템도 lambda 함수를 기반으로, 기능별로 나뉘어 개발 / 배포 / 운영되고 있다. 독립적으로 분리된 lambda 함수들이 message 를 주고 받기 위해서, MQ를 활용하면 확장성있는 처리가 가능하다.
Message Queue 의 종류
MQ 의 종류중 대표적인 서비스 및 오픈소스로는 Amazon SQS, Apache Kafka, Rabbit MQ가 있다. 비교하면 차이점은 다음과 같다.
Amazon SQS
- AWS에서 모든 인프라, 스케일링을 관리하는 완전 관리형 (managed) 서비스이다.
- 기본적으로 Queue 기반 메시징 모델을 지원하며, 간단한 메시지 큐로 사용하기 적합하다.
- 메세지 순서 보장은 FIFO 큐 옵션에서만 가능하고, 추가적인 비용이 발생한다.
- 서버리스 어플리케이션, 비동기처리, MSA에서 사용된다.
Apache Kafka
- 분산 스트리밍 플랫폼으로, 빠른 데이터 스트리밍과 로그 저장에 최적화된 플랫폼이다.
- publish-subsribe 모델로, 메세지를 여러 소비자가 동시에 구독할 수 있다.
- 초고속 처리가 가능 (초당 수백만개 메시지 처리) 하며, 파티션과 브로커를 추가하여 수평 스케일링이 가능하다.
- 실시간 데이터 스트리밍 (ex. IoT, 주식거래), 로그 수집 및 분석 등의 사례에서 사용된다.
Rabbit MQ
- Queue와 publish-subscribe 모델을 모두 지원한다.
- 다양한 라우팅 기능을 제공한다.
- 실시간 통신 시스템, 복잡한 라우팅 처리, MSA에 적합하다.
Lambda 서버리스 아키텍처에서, Amazon SQS 서비스가 적합한 이유
Amazon SQS (Simple Queue Service) 는 마이크로서비스, 분산 시스템 및 서버리스 어플리케이션을 위한 완전 관리형 메시지 대기열이다.
현재 서비스에서 사용중인 Lambda 서버리스 아키텍처와 통합이 쉽다. SQS에 데이터가 들어오는 이벤트를 기반으로 lambda 함수가 트리거 될 수 있다. Lambda 함수 간 단순한 비동기 통신을 구현하기에도 적합했다. 또한 이 서비스를 이어 받아서 개발하게될 누구나 데이터 팀에서 개발 인력이 없는 상황이어서, 인프라 유지 보수에 대한 공수가 거의 없는 완전 관리형 서비스가 적합하다.
Amazon SQS 로 비동기 처리 개선하기
초기에는 lambda 함수간 직접적으로 메세지를 주고받았으나, SQS를 도입한 이후 위와 같이 아키텍처가 확장되었다. kakao chatbot 을 통해, 사용자가 질문 (요청)을 하면, 이 요청에 대해서 Chat Request Lambda에서 처리한다. Chat Request Lambda 는 chat에 식별자를 부여한 후, on-message-request-sqs에 전송한다. Redis에는 해당 chat 이 처리중임을 알리기 위해서 lock 상태로 변환 한 후 저장한다.
export const sendMessage = async (
queueName: string,
message: SQSMessage,
): Promise<void> => {
const queueUrl = `${ENDPOINT}/${queueName}`;
console.log("Sending message to queue:", queueUrl);
try {
const params = {
QueueUrl: queueUrl,
MessageBody: JSON.stringify(message),
};
const command = new SendMessageCommand(params);
const response = await sqsClient.send(command);
console.log("Message sent successfully:", response);
} catch (error) {
console.error("Error sending message:", error);
throw error;
}
};
on-message-request-sqs 에 새로운 message가 들어오면, 이 event에 의해서 두번째 lambda 함수인 AIAgentRequestLambda가 실행된다. Event 처리는 SQSEvent 타입을 통해 처리할 수 있다. on-message-request-sqs에서 message를 꺼낸다. SQS는 완전 실시간으로 하나씩 메세지를 가져오는것이 아니라, batch 로 이벤트를 가져오므로, Records 라는 배열로 전달된다. Records를 순회하면서 메세지를 하나씩 처리해주어야 한다.
import type {
SQSEvent,
APIGatewayProxyStructuredResultV2,
Context
} from "aws-lambda";
export const handler: Handler = async (
event: SQSEvent,
_context: Context,
): Promise<APIGatewayProxyStructuredResultV2> => {
// 1. [on-message-request-sqs] 에서 record polling
console.log(`polling records from SQS queue : ${MESSAGE_REQUEST_HANDLE_QUEUE}`)
if (!event.Records || event.Records.length === 0) {
console.log("No SQS record received", event);
}
// 2. parsing SQS messages
const chatMessages: ChatSQSMessage[] = [];
for (const record of event.Records) {
if (!record.body) {
console.log("No record body received");
continue;
}
try {
const parsedRecord = JSON.parse(record.body);
const chatMessage = mapToChatSQSMessage(parsedRecord);
console.log("Received SQS [on-message-request-sqs] message: ", chatMessage);
if (chatMessage) {
chatMessages.push(chatMessage);
}
} catch (error) {
console.error("[Error] parsing [on-message-request-sqs] message failed: ", error);
console.log(record.body);
}
}
}
메세지에 담긴 사용자의 발화를 AIAgent로 전달하여 응답을 받는다. 그리고 이 응답을 on-message-send-sqs에 넣는다.
// 3. Request ai agent
for (const [id, chatMessage] of chatMessages.entries()) {
if (!chatMessage.payload) {
console.log("Payload is not available!:", chatMessage);
continue;
}
try {
// 하나의 payload 에 대해서 하나의 응답을 받음
const response = await requestAIAgent(AI_AGENT_ENDPOINT, chatMessage.chatId, chatMessage.payload, id);
// response 를 SQS [on-message-send-sqs] queue 에 전송
await sendSQSAIAgentMessage(MESSAGE_SEND_HANDLE_QUEUE, chatMessage.chatId, response.data.messages);
} catch (error) {
console.log(error);
}
}
마지막으로 callbackurl 을 통해서 카카오톡 챗봇 callback 함수를 사용하는 Chat Callback Response Lambda 함수에서 on-message-send-sqs 에 새로운 message가 들어온 이벤트를 감지하여 kakao talk chatbot으로 사용자에게 답변을 제공한다.
SQS를 통해 비동기로 전환한 과정과 MQ에 대한 기본적인 내용을 정리해보았다. 대세인 LLM 으로 챗봇을 만드는 프로젝트들이 많은데, LLM 특성상 프롬프트가 정교할 수록 응답 시간이 길게 걸린다. 비동기 처리를 통해 성능을 개선하는 것이 많이 도움이 될 것 같다.
'프로젝트 > 테크포임팩트' 카테고리의 다른 글
[테크포임팩트] KakaoTalk 챗봇 개발을 위한 서버리스 서비스 (AWS Lambda) 기반 아키텍처 (2) | 2024.12.22 |
---|