Kafka - 스트림 처리(3): 토폴로지, 스트림 처리 프레임워크
- 스트림 처리의 다른 모범적인 활용 사례
- 카프카와 함께 사용할 스트림 처리 프레임워크를 선택하기 위한 기준들
목차
개발 환경
- mac os
- openjdk 17.0.12
- zookeeper 3.9.2
- apache kafka: 3.8.0 (스칼라 2.13.0 에서 실행되는 3.8.0 버전)
1. 카프카 스트림즈: 아키텍처
Kafka - 스트림 처리(2): 카프카 스트림즈 예시 에서 카프카 스트림즈 API 를 사용해서 잘 알려진 스트림 처리 디자인 패턴을 구현하는 방법에 대해 알아보았다.
여기서는 카프카 스트림즈 라이브러리가 실제로 어떻게 동작하고 규모를 확장시키는지에 대해 더 잘 이해하기 위해 내부를 열고 디자인 원칙을 이해해본다.
1.1. 토폴로지 생성
모든 스트림즈 애플리케이션은 하나의 토폴로지를 구현하고 생성한다.
토폴로지는 DAG(Directed Acyclic Graph) 라고도 불리며, 모든 이벤트가 입력에서 출력으로 이동하는 동안 수행되는 작업과 변환 처리의 집합이다.
아래는 1.1. 단어 개수 세기: 맵/필터 패턴, 간단한 집계 연산 예시의 토폴로지이다.
토폴로지는 프로세스들로 구성되는데 위 그림에서 프로세서는 타원으로 표현하였다.
대부분의 프로세서는 필터, 맵, 집계 연산과 같은 데이터에 대한 처리 작업을 구현한다.
토픽으로부터 데이터를 읽어와서 넘겨주는 소스 프로세서와 데이터를 넘겨받아서 토픽에 쓰는 싱크 프로세스도 있다.
토폴로지는 항상 하나 이상의 소스 프로세서로 시작해서 하나 이상의 싱크 프로세스로 끝난다.
1.2. 토폴로지 최적화: StreamsConfig.TOPOLOGY_OPTIMIZATION
카프카 스트림즈는 DSL API 를 사용해서 개발된 애플리케이션의 각 DSL 메서드를 독립적으로 저수준 API 로 변환하여 실행한다. 즉, 카프카 스트림즈 DSL 은 직관적인 고수준 API 를 제공하지만 실행 시엔 저수준 Processor API 로 변환된다.
필요 시 Processor API 로 세밀한 제어도 가능하며, 더 정밀한 제어가 필요하다면 직접 Processor API 를 사용하는 것도 고려할 수 있다.
예를 들어 아래와 같은 DSL 코드가 있다고 해보자.
val builder = StreamsBuilder()
val wordCounts = builder.stream<String, String>("input-topic")
.flatMapValues { text -> text.split(" ") }
.groupBy({ _, word -> word })
.count()
wordCounts.toStream().to("output-topic")
내부적으로 변환되는 Processor 토폴로지 개념은 아래와 같다.
- 소스 노드
- 카프카로부터 데이터를 읽어옴 (input-topic)
- 프로세서 노드
- flatMapValues 로 단어 분리
- 프로세서 노드
- groupBy 로 단어 그룹화
- 상태 저장(State Store)
- count 로 연산 결과 저장
- 싱크 노드
- 결과를 카프카 토픽에 기록(output-topic)
카프카 스트림즈 애플리케이션의 실행은 아래 3단계로 이루어진다.
KStream
,KTable
객체를 생성하고 여기에 필터, 조인과 같은 DSL 작업을 수행함으로써 논리적 토폴로지 정의StreamsBuilder.build()
메서드가 논리적 토폴로지로부터 물리적 토폴로지를 생성함- 최적화가 수행되는 곳
KafkaStreams.start()
가 토폴로지를 실행시킴- 데이터를 읽고 처리하고 쓰는 곳이 여기임
최적화를 하기 위해서 StreamsConfig.TOPOLOGY_OPTIMIZATION
설정을 StreamsConfig.OPTIMIZE
로 설정한 뒤 build(props)
를 호출하면 된다.
이 설정 없이 build()
를 호출하면 최적화는 적용되지 않는다.
애플리케이션을 테스트할 때는 최적화된 것과 안된 것을 비교해서 실행 시간과 카프카에 쓰여지는 데이터의 양을 비교해보는 것이 좋다.
최적화를 하면 이점만 있을 것 같지만 모든 상황에서 그렇지 않기 때문에 비교 테스트가 필요하다.
- 최적화가 항상 성능 향상을 보장하지 않음
- 카프카 스트림즈의 최적화는 특정 패턴이나 구조에서만 성능 개선이 가능함
- 예를 들어 최적화된 토폴로지가 내부적으로 연산을 병합하거나 중복 연산을 제거하는 등의 작업을 하지만 모든 애플리케이션이 그런 구조로 짜여있지는 않음
- 오히려 추가적인 최적화 과정이 CPU 오버헤드를 발생시켜서 성능 저하가 발생할 수도 있음
- 디버깅과 유지 보수의 어려움
- 최적화를 활성화하면 토폴로지의 구조가 변경됨
- 최적화된 토폴로지는 개발자가 작성한 코드와는 다르게 실행될 수 있기 때문에 디버깅이 어려워질 수 있음
- 문제가 발생했을 때 최적화가 적용된 상태에서는 로직의 흐름을 추적하기가 더 복잡해질 수 있음
- 메모리 사용량과 자원 소비 차이
- 최적화된 경우 연산이 병합되거나 캐싱 전략이 달라질 수 있기 때문에 메모리 사용량이라 디스크 I/O 패턴이 예상과 다르게 나타날 수 있음
- 최적화의 효과를 파악하기 위한 기준 필요
- 최적화를 적용하기 전과 후의 실행 시간, 처리량, 리소스 사용량 등을 비교해야 최적화가 실제로 이득이 되는지 명확히 판단할 수 있음
- 이는 성능 테스트의 baseline 을 설정하는 과정으로 항상 필수적임
따라서 무조건 최적화를 하는 것이 아니라 최적화 적용 전후를 비교한 뒤 적용하는 것이 안정적이고 최적의 결과를 얻을 수 있는 방법이다.
1.3. 토폴로지 테스트: TopologyTestDriver
카프카 스트림즈 애플리케이션의 주된 테스트 툴은 TopologyTestDriver
이다.
입력 데이터를 정의하고, 목업 입력 토픽에 데이터를 쓰고, 테스트 드라이버를 써서 토폴로지를 실행시키고, 목업 출력 토픽에서 결과를 읽은 뒤 예상하는 결과와 비교 검증하는 것이다.
하지만 이 툴은 카프카 스트림즈의 캐시 기능을 시뮬레이션 해주지는 않으므로 찾을 수 없는 에러도 많다.
통합 테스트를 할 때 카프카 스트림즈의 경우 EmbeddedKafkaCluster
와 Testcontainers
두 통합 테스트 프레임워크가 자주 사용된다.
EmbeddedKafkaCluster
는 JVM 상에 카프카 브로커를 하나 띄워주는 방식이고, Testcontainers
는 도커 컨테이너를 사용하여 카프카 브로커와 기타 테스트에 필요한 다른 요소들을 띄워주는 방식이다.
도커를 사용해서 카프카와 그 의존성, 사용되는 리소스를 테스트 애플리케이션으로부터 완전히 격리시키기 때문에 Testcontainers
를 더 권장한다.
토폴로지 테스트에 대한 상세한 예제 코드는 Doc:: Testing Kafka Streams – A Deep Dive 를 참고하세요.
1.4. 토폴로지 규모 확장
카프카 스트림즈는 하나의 애플리케이션 인스턴스 안에 다수의 스레드가 실행될 수 있게 함으로써 규모 확장과 서로 다른 애플리케이션 인스턴스 간에 부하 분산이 이루어지도록 한다.
카프카 스트림즈 엔진은 토폴로지 실행을 다수의 태스크로 분할함으로써 병렬 처리한다.
스트림즈 엔진은 애플리케이션이 처리하는 토픽의 파티션 수에 따라 태스크의 수를 결정하고, 각 태슼는 전체 파티션 중 일부를 처리한다.
즉, 각 태스크는 자신이 담당하는 파티션들을 구독해서 이벤트를 읽어온 후, 이벤트를 읽어올 때마다 이 파티션에 적용될 모든 처리 단계를 실행시킨 다음에 결과를 싱크에 쓴다.
이러한 태스크들은 서로 완전히 독립적으로 실행될 수 있기 때문에 카프카 스트림즈에서 병렬 처리의 기본 단위가 된다.
개발자는 애플리케이션 인스턴스가 실행시킬 스레드의 수를 결정할 수 있다.
만일 다수의 스레드를 활용할 수 있다면, 각 스레드는 해당 애플리케이션이 생성하는 전체 태스크의 일부를 실행하게 될 것이고, 다수의 애플리케이션 인스턴스가 다수의 서버에서 실행될 경우 각 서버의 스레드별로 서로 다른 태스크가 실행될 것이다.
- 하나의 인스턴스에 다수의 스레드 활용(멀티스레딩)
- 각 스레드는 병렬적으로 태스크 처리
- 멀티스레딩은 CPU 코어를 더 효율적으로 사용하고, 대기 시간이 긴 I/O 작업을 병렬 실행하는데 유용함
- 다수의 애플리케이션 인스턴스(스케일 아웃)
- 각 인스턴스는 서로 다른 프로세스로 동작하며, 독립적인 작업을 수행함
- 카프카 스트림즈는 여러 개의 애플리케이션 인스턴스를 실행하면 파티션 단위로 워크로드가 자동으로 분배됨
- 다수의 서버에서 실행(분산 시스템)
- 다수의 서버에서 동일한 애플리케이션이 실행되면 각 서버의 스레드들은 서로 다른 태스크들을 병렬로 처리함
- 카프카, Spark, Flink 같은 스트리밍 프레임워크에서는 파티션을 기반으로 워크로드를 자동으로 분배함
- 이를 통해 장애 복구 및 부하 분산이 용이해 짐
즉, 하나의 애플리케이션 인스턴스 내에서는 여러 스레드가 병렬로 작업이 가능하고, 여러 애플리케이션 인스턴스를 실행하면 서로 다른 서버에서 독립적으로 실행되며 병렬 처리가 가능하다.
최적의 성능을 위해서는 멀티스레딩 + 다중 인스턴스 + 여러 서버의 조합을 고려하는 것이 좋다.
처리하는 토픽의 파티션 수만큼의 태스크를 생성하는 것이 스트리밍 애플리케이션이 규모를 확장하는 방식이다.
만일 더 빨리 처리하고 싶다면 스레드 수를 늘리고, 서버의 자원이 고갈되었다면 다른 서버에 추가 인스턴스를 띄우면 된다.
카프카는 자동으로 작업을 코디네이션 한다. 즉, 카프카가 각 태스크에 파티션을 나눠서 할당해주면 각 태스크는 자신이 할당받은 파티션에서 독립적으로 이벤트를 받아와서 처리하고, 토폴로지에 정의된 집계 연산에 관련된 로컬 상태를 유지한다.
1.4.1. 태스크 간 의존 관계
1.4.1.1. 다수의 파티션에서 입력을 가져와서 처리해야 하는 경우
예를 들어 1.3. 클릭 스트림 확장: 스트리밍 조인 처럼 두 스트림을 조인해야 한다면 결과를 내놓기 위해 각 파티셔으로부터 데이터를 읽어와야 한다.
카프카 스트림즈는 각 조인 작업에 필요한 모든 파티션들을 하나의 태스크에 할당함으로써 해당 태스크가 필요한 파티션 전부로부터 데이터를 읽어온 뒤 독립적으로 조인을 수행할 수 있도록 함으로써 이러한 상황을 해결한다.
카프카 스트림즈가 조인 작업에 사용될 모든 토픽에 대해 동일한 조인 키로 파티션된 동일한 수의 파티션을 가질 것을 요구하는 이유가 바로 이 때문이다.
1.4.1.2. 애플리케이션이 리파티셔닝을 해야하는 경우
클릭 스트림 확장의 예시에서 모든 이벤트는 userId 를 키 값으로 갖는데 만일 이를 우편번호별로 통계를 생성하고 싶다면 카프카 스트림즈를 데이터를 우편번호 기준으로 리파티션 한 뒤에 새로운 파티션을 가지고 집계 연산을 실행시켜야 한다.
태스크 1이 파티션 1의 데이터를 처리한 후 데이터 리파티셔닝을 수행하는 프로세스(= groupBy)가 뒤따른다고 하자.
이 경우 셔플을 하거나 다른 태스크로 이벤트를 보내야한다.
카프카 스트림즈는 리파티션이 호출되면 새로운 키와 파티션을 가지고 새로운 토픽에 이벤트를 쓰고, 이후에 오는 태스크들이 새로운 토픽에서 이벤트를 읽어와서 처리를 계속한다.
따라서 리파티셔닝은 전체 토폴로지를 2개의 서브 토폴로지로 분할한다.
두 번째 서브 토폴로지는 첫 번째 서브 토폴로지에 의존하게 되지만 첫 번째 태스크 집합은 자기의 속도대로 데이터를 토픽에 쓰고, 두 번째 태스크 집합 역시 자기 속도대로 토픽에서 데이터를 읽어와서 처리하기 때문에 두 태스크 집합은 여전히 독립적이고 병렬로 실행된다.
태스크 사이에 공유된 리소스가 없기 때문에 동일한 스레드나 서버에서 실행될 필요도 없다.
이렇게 파이프라인의 서로 다른 부분 사이에 의존성을 줄여주는 점이 카프카의 장점 중 하나이다.
1.5. 장애 처리
애플리케이션에 장애가 발생해서 재시작이 필요한 경우, 장애가 발생하기 전 마지막 커밋된 오프셋을 가져옴으로써 스트림의 마지막으로 처리된 지점부터 처리를 재개할 수 있다.
로컬 상태 저장소를 갖고 있던 서버를 교체한다던지의 로컬 상태 저장소가 유실되었을 경우엔 스트림즈 애플리케이션은 항상 카프카로부터 체인지로그를 읽어옴으로써 로컬 상태 저장소를 복구한다.
카프카 스트림즈는 태스크 고가용성을 위해 카프타의 컨슈머 코디네이터 기능을 사용한다.
만일 태스크에 장애가 발생하면 해다아 태스크는 사용 가능한 다른 스레드에서 재시작하게 된다.
카프카의 ‘정확히 한 번’ 의미구조, 정적 그룹 멤버십, 협력적 리밸런스와 같은 카프카 컨슈머 그룹 코디네이션의 기능이 카프카 스트림즈에도 적용된다.
한 가지 주의할 점은 복구 속도이다.
장애가 발생한 스레드에서 실행되고 있던 태스크를 다른 스레드가 넘겨받아 처리를 시작해야 할 때, 가장 먼저 해야할 일ㄹ은 현재 집계중인 윈도우 등 저장된 상태를 복구시키는 것이다.
카프카에 저장된 내부 토픽을 다시 읽어와서 카프카 스트림즈의 상태 저장소를 업데이트하는 식으로 복구할 수도 있지만, 이 작업을 수행하는 동안 일부 데이터에 대해서는 스트림 처리 작업이 진행되지 않을 것이고 그만큼 가용성이 줄어들게 된다.
복구 시간을 줄이는 방법은 카프카 스트림즈 토픽에 매우 강력한 압착 설정을 걸어놓는 것이다.
min.compaction.lag.ms
는 낮추고, 세그먼트 크기는 기본값이 1GB 대신 100MB 정도로 낮추면 된다.
장애 복구를 가장 빠르게 하는 방법은 스탠바이 레플리카를 설정하는 것으로 이 방법을 사용할 것을 권장한다.
스탠바이 레플리카는 스트림 처리 애플리케이션에서 현재 작동 중인 태스크를 단순히 따라가기만 하는 태스크로써, 다른 서버에서 현재의 상태를 유지하는 역할을 한다.
장애가 발생하면 이 태스크는 이미 최신 상태이므로 중단 시간이 거의 없이 바로 처리를 재개할 수 있다.
카프카 스트림즈의 규모 확장성과 고가용성에 대한 좀 더 상세한 내용은 Doc:: State restoration during workload rebalance 를 참고하세요.
2. 스트림 처리 프레임워크
스트림 처리 프레임워크를 선택할 때는 어떤 형태의 애플리케이션을 개발하고자 하는지를 고려해야 한다.
개발하고자 하는 애플리케이션의 종류에 따라 사용해야 하라 스트림 처리 솔루션 종류 역시 달라진다.
- 데이터 수집
- 하나의 시스템에서 다른 시스템으로 데이터를 전달하는 것이 목적임
- 대상 시스템에 맞춰 약간의 변형을 가할 수 있음
- 스트림 처리 시스템에 필요한 것인지 아니면 좀 더 단순한, 수집에 최적화된 카프카 커텍트같은 시스템이 필요한 것인지 고민할 필요가 있음
- 밀리초 단위 작업
- 거의 즉각적인 응답을 필요로 함
- 사기 탐지 활용 사례의 상당수가 여기에 속함
- 카프카 스트림즈를 사용하는 것 자체를 다시 생각해봐야 함
- 이 경우 대체로 요청-응답 패턴이 더 나음
- 비동기 마이크로서비스
- 큰 비즈니스의 일부로서 재고 변경 등 단일한 기능을 수행함
- 성능 향상을 위해 로컬 상태에 이벤트 캐시를 유지할 수 있어야 함
- 사용하는 메시지 버스(예-카프카)와 잘 통합되고, 업스트림의 변경 사항을 마이크로서비스의 로컬 상태에 쉽게 반영할 수 있으며 로컬 상태를 캐시 혹은 구체화된 뷰 형태로 활용 가능한 스트림 처리 시스템이 필요함
- 준 실시간 데이터 분석
- 이러한 애플리케이션들은 데이터를 작게 분할해서 비즈니스에 유용한 인사이트를 얻어내기 위해 복잡한 집계 연산과 조인을 수행함
- 로컬 저장소를 잘 지원하는 스트림 처리 시스템이 필요함
- 로컬 캐시나 구체화된 뷰를 유지하는 것 보다는 로컬 캐시 없이는 구현하기 까다로운 복잡한 집계 연산, 윈도우, 조인 등을 잘 지원하는 것이 더 중요함
- 커스텀 집계, 윈도우, 다양한 조인 타입을 지원하는 API 도 필요함
정리하며..
참고 사이트 & 함께 보면 좋은 사이트
본 포스트는 김병부 저자의 O’REILLY 카프카 핵심 가이드 2판를 기반으로 스터디하며 정리한 내용들입니다.
- 카프카 핵심 가이드
- 예제 코드 & 오탈자
- Doc:: Kafka
- Git:: Kafka
- Doc:: Testing Kafka Streams – A Deep Dive
- Doc:: State restoration during workload rebalance