Kafka - 스트림 처리(2): 카프카 스트림즈 예시


  • 카프카 스트림즈를 사용해서 주가의 이동 평균을 계산하는 예시

목차


개발 환경

  • mac os
  • openjdk 17.0.12
  • zookeeper 3.9.2
  • apache kafka: 3.8.0 (스칼라 2.13.0 에서 실행되는 3.8.0 버전)

1. 카프카 스트림즈: 예시

여기서는 카프카 스트림즈 API 를 사용하는 예시에 대해 알아본다.

아파치 카프카는 저수준의 Processor API 와 고수준의 스트림즈 DSL, 2개의 스트림 API 를 제공한다.
여기서는 고수준의 스트림즈 API 를 사용할 것이며, 이 DSL 을 사용하면 스트림에 포함된 이벤트에 일련의 연속적인 변환을 정의함으로써 스트림 처리 애플리케이션을 정의할 수 있다.
여기서 변환은 필터와 같이 단순한 것일 수도 있고, 스트림-스트림 조인처럼 복잡한 것일수도 있다.

저수준 API 는 변환을 직접 생성할 수 있게 해준다.

저수준 API 에 대한 내용은 Processor API 를 참고하세요.

DSL API 를 사용하는 애플리케이션은 항상 StreamsBuilder 를 사용하여 처리 토폴로지를 생성함으로써 시작한다.
처리 토폴로지는 스트림 안의 이벤트에 적용되는 변환을 정점으로 하는 유향 비순환 그래프(DAG, Directed Acyclic Graph) 이다.

  • 유향(Directed)
    • 데이터 흐름이 명확하게 한 방향으로만 이동함
    • 예) Source → Transform → Sink (출발점에서 시작하여 종료점으로 이동)
  • 비순환(Acyclic)
    • 사이클이 존재하지 않음
    • 데이터가 한번 흐르면 같은 지점을 반복해서 방문하지 않음
    • 무한 루프나 데이터 처리의 중복을 방지함
  • 그래프(Graph)
    • 각 노드는 정점(Vertex) 이 되고, 데이터의 흐름(연산)은 간선(Edge) 로 연결됨
    • 예) 필터링, 매핑, 조인, 집계 등의 변환 연산이 각각 정점이 됨

처리 토폴로지를 생성하면 KafkaStreams 실행 객체를 생성하고, KafkaStreams 객체를 시작시키면 스트림 안의 이벤트에 처리 토폴로지를 적용하는 다수의 스레드가 시작된다.
KafkaStreams 객체를 닫으면 처리는 끝난다.

<DAG 의 장점>

  • 명확한 데이터 흐름
    • 데이터가 어떻게 처리되고 이동하는지 한 눈에 파악할 수 있음
  • 최적화 가능
    • KafkaStreams 엔진이 DAG 를 기반으로 병렬 처리 및 최적화를 수행함
  • 에러 방지
    • 순환이 없기 때문에 무한 루프와 같은 문제가 발생되지 않음

<Kafka Streams 의 토폴로지 구성 요소>

  • Source Processor
    • 입력 데이터를 가져오는 시작점
    • 예) 토픽에서 데이터를 읽어오는 부분
  • Processor
    • 데이터를 처리하거나 변환하는 연산이 이루어지는 노드
    • 예) map(), filter(), flatMap() 등의 연산
  • State Store(상태 저장소)
    • 상태를 유지하기 위한 저장소
    • 예) 집계, 조인 사용 시, KTable 의 상태 관리
  • Sink Processor
    • 처리된 데이터를 외부로 출력하는 노드
    • 예) 다른 토픽으로 다시 전송, DB 저장

간단한 DAG 구조

val builder = StreamsBuilder()

// Source Node(입력 데이터 수신)
val sourceStream: KStream<String, String> = builder.stream("input-topic")

// Processor Node(데이터 변환)
val processedStream = sourceStream
  .fileter { _, value -> value.contains("important") }  // 필터링 노드
  .mapValues { value -> value.uppercase() } // 변환 노드

// Sink Node(출력 데이터 전송)
processedStream.to("output-topic")  // 결과 전송 노드

위를 DAG 구조로 표현하면 아래와 같다.

[input-topic] --> [filter] --> [mapValues] --> [output-topic]
  • 정점(Vertex)
    • 각 변환 연산
  • 간선(Edge)
    • 데이터 흐름의 방향성
  • 비순환성(Acyclic)
    • 사이클없이 직선적인 처리 흐름 유지

KTable 조인의 DAG 구조 예시는 아래와 같다.

[user-topic] ----
                  --> [join] --> [output-topic]
[order-topic] ---

집계와 필터링 DAG 구조 예시는 아래와 같다.

[input-topic] --> [filter] --> [groupByKey] --> [aggregate] --> [output-topic]

1.1. 단어 개수 세기: 맵/필터 패턴, 간단한 집계 연산

소스는 github 에 있습니다.

여기서는 카프카 스트림즈를 이용하여 단어 개수를 세는 예시에 대해 알아본다.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.assu.study</groupId>
        <artifactId>kafka_me</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <artifactId>chap14_1</artifactId>

    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <kafka.version>3.8.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>

    </dependencies>
</project>

스트림 처리 애플리케이션을 개발하기 위해 가장 먼저 할 일은 카프카 스트림즈를 설정하는 것이다.

package com.assu.study.chap14_1;

import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class WordCountExample {
  public static void main(String[] args) {
    Properties props = new Properties();

    // 모든 카프카 스트림즈 애플리케이션은 애플리케이션 ID 를 가짐
    // 애플리케이션 ID 는 내부에서 사용하는 로컬 저장소와 여기 연관된 토픽에 이름을 정할때도 사용됨
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");

    // 카프카 스트림즈 애플리케이션은 항상 카프카 토픽에서 데이터를 읽어서 결과를 카프카 토픽에 씀
    // 카프카 스트림즈 애플리케이션은 인스턴스끼리 서로 협력하도록 할 때도 카프카를 사용하므로 애플리케이션이 카프카를 찾을 방법을 지정해주어야 함
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    // 데이터를 읽고 쓸 때 애플리케이션은 직렬화/역직렬화를 해야하므로 기본값으로 쓰일 Serde 클래스 지정
    // 필요하다면 스트림즈 토폴로지를 생성할 때 재정의할 수도 있음
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
  }
}

설정을 했으니 이제 스트림즈 토폴로지를 생성한다.

// StreamBuilder 객체를 생성하고, 앞으로 입력으로 사용할 토픽 지정
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("wordcount-input");

final Pattern pattern = Pattern.compile("\\W+");
KStream counts =
    source
        // 입력 토픽에서 읽어오는 각 이벤트는 단어들로 이루어진 문자열 한 줄임
        // 정규식을 사용하여 이 문자열을 다수의 단어들로 분할한 후 현재 이벤트 레코드의 밸류값인 각 단어를 가져다가 이벤트 레코드 키로 넣어줌으로써 그룹화에 사용될 수 있도록 함 
        .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))  // KStream<String, String> 반환
        .map((key, value) -> new KeyValue<Object, Object>(value, value))  // KStream<Object, Object> 반환
        // 단어 "the" 를 필터링함 (필터링을 이렇게 쉽게 할 수 있음)
        .filter((key, value) -> (!value.equals("the"))) // KStream<Object, Object> 반환
        // 키 값을 기준으로 그룹화함으로써 각 단어별로 이벤트의 집합을 얻어냄
        .groupByKey() // KGroupedStream<Object, Object> 반환
        .count()  // KTable<Object, Long> 반환
        // 각 집합에 얼마나 많은 이벤트가 포함되어 있는지 셈
        // 계산 결과는 Long 타입임
        .mapValues(value -> Long.toString(value))
        .toStream();

// 결과를 카프카에 씀
counts.to("wordcount-output");

애플리케이션을 수행할 변환의 흐름을 정의했으니 이제 실행시킨다.

// 위에서 정의한 토폴로지와 설정값을 기준으로 KafkaStreams 객체 정의
KafkaStreams streams = new KafkaStreams(builder.build(), props);

// 동작을 재설정하기 위한 것임
// 프로덕션에서는 절대 사용하지 말 것
// 시작할 때마다 애플리케이션이 Kafka 의 상태를 재로드함
streams.cleanUp();

// 카프카 스트림즈 시작
streams.start();

Thread.sleep(5_000L);

// 잠시 뒤 멈춤
streams.close();

groupBy() 를 사용해서 데이터를 리파티션한 뒤 각 단어의 개수를 셀 때마다 각 단어를 키 값으로 갖는 레코드의 개수를 저장하는 단순한 로컬 상태를 유지한다.
만일 입력 토픽이 여러 개의 파티션을 갖고 있다면 애플리케이션 인스턴스를 여러 개 띄움으로써 카프카 스트림즈 처리 클러스터를 구성할 수 있다.

카프카 스트림즈 API 를 사용하면 단순히 애플리케이션 인스턴스를 여러 개 띄우는 것만으로도 처리 클러스터를 구성할 수 있다.


1.2. 주식 시장 통계: 윈도우 집계

소스는 github 에 있습니다.

주식을 프로듀싱하는 코드는 Git:: 주식 시장 통계 전체 코드 를 참고하세요.

여기서는 주식의 종목 콛, 호가와 수량을 포함하는 주식 시장 거래 이벤트 스트림을 읽어오는 예시에 대해 알아본다.

  • 매도 호가(ask price): 매도자가 팔고자 하는 가격
  • 매수 호가(bid price): 매수자가 사고자 하는 가격
  • 매도량(ask size): 매도자가 지정된 가격으로 팔고자 하는 주식의 양

여기서는 단순화를 위해 매수 쪽은 완전히 무시하고, 데이터에 타임스탬프도 포함하지 않고 카프카 프로듀서가 부여하는 이벤트 시간을 대신 사용한다.

아래와 같은 윈도우가 적용된 통계값을 포함하는 결과 스트림을 생성한다.

  • 5초 단위 시간 윈도우별로 가장 좋은(최저) 매도가
  • 5초 단위 시간 윈도우별 거래량
  • 5초 단위 시간 윈도우별 평균 매도가

모든 통계값은 매초 갱신되며, 단순화를 위해 10개의 종목만 거래되고 있다고 가정한다.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.assu.study</groupId>
        <artifactId>kafka_me</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>

    <artifactId>chap14_2</artifactId>

    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <kafka.version>3.8.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.12.1</version>
        </dependency>
    </dependencies>
</project>

먼저 카프카 스트림즈를 설정한다.

Chap142Application.java

// input 은 거래의 스트림임
// output 은 2개의 스트림임
// - 10초마다 최소 및 평균 매도 호가를 가짐
// - 매분 최소 매도 호가가 가장 낮은 상위 3개 주식
public class Chap142Application {

  public static void main(String[] args) throws IOException {
    // ===== 애플리케이션 설정
    Properties props;
    if (args.length == 1) {
      props = LoadConfig.loadConfig(args[0]);
    } else {
      props = LoadConfig.loadConfig();
    }
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stockstat-2");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, TradeSerde.class.getName());
  }

  // 키는 문자열이지만 밸류는 종목 코드, 매도 호가, 매도량을 포함하는 Trade 를 사용할 것이므로 이 객체를 
  // 직렬화/역직렬화하기 위해 Gson 라이브러리를 사용해서 자바 객체에 대한 시리얼라이저/디시리얼라이저 생성
  public static final class TradeSerde extends CustomWrapperSerde<Trade> {
    public TradeSerde() {
      super(new CustomJsonSerializer<Trade>(), new CustomJsonDeserializer<>(Trade.class));
    }
  }
}

1.1. 단어 개수 세기: 맵/필터 패턴, 간단한 집계 연산 에서는 키와 밸류 모두 문자열을 사용했기 때문에 시리얼라이저와 디시리얼라이저 둘 다 Serde.String() 클래스를 사용했다.
여기서는 키는 여전히 문자열이지만 밸류는 종목 코드, 매도 호가, 매도량을 포함하는 Trade 를 사용할 것이기 때문에 이 객체를 직렬화/역직렬화하기 위해 커스텀 시리얼라이저https://assu10.github.io/dev/2024/06/22/kafka-producer-2/#11-%EC%BB%A4%EC%8A%A4%ED%85%80-%EC%8B%9C%EB%A6%AC%EC%96%BC%EB%9D%BC%EC%9D%B4%EC%A0%80와 디시리얼라이저를 생성한다.

편한 작업을 위해 Gson, Avro, Protobuf 와 같은 라이브러리를 사용해서 Serde 를 생성하는 것이 좋다.

LoadConfig.java

package com.assu.study.chap14_2;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

/**
 * 파일에서 구성 로드
 * 주로 연결 정보를 위한 것으로 다시 컴파일하지 않고도 클러스터 간에 전환할 수 있음
 */
public class LoadConfig {
    private static final String DEFAULT_CONFIG_FILE =
            System.getProperty("user.home") + File.separator + ".ccloud" + File.separator + "config";

    static Properties loadConfig() throws IOException {
        return loadConfig(DEFAULT_CONFIG_FILE);
    }

    static Properties loadConfig(String configFile) throws IOException {
        if (!Files.exists(Paths.get(configFile))) {
            throw new RuntimeException(configFile + "does not exists.");
        }

        final Properties cfg = new Properties();

        try (InputStream inputStream = new FileInputStream(configFile)) {
            cfg.load(inputStream);
        }
        return cfg;
    }
}

/model/Trade.java

package com.assu.study.chap14_2.model;

public class Trade {
  String type;
  String ticker;
  double price;
  int size;

  public Trade(String type, String ticker, double price, int size) {
    this.type = type;
    this.ticker = ticker;
    this.price = price;
    this.size = size;
  }

  @Override
  public String toString() {
    return "Trade{"
        + "type='"
        + type
        + '\''
        + ", ticker='"
        + ticker
        + '\''
        + ", price="
        + price
        + ", size="
        + size
        + '}';
  }
}

/serde/CustomJsonSerializer.java

package com.assu.study.chap14_2.serde;

import com.google.gson.Gson;
import java.nio.charset.Charset;
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

public class CustomJsonSerializer<T> implements Serializer<T> {
  private Gson gson = new Gson();

  @Override
  public void configure(Map<String, ?> map, boolean b) {}

  @Override
  public byte[] serialize(String topic, T t) {
    return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
  }

  @Override
  public void close() {}
}

/serde/CustomJsonDeserializer.java

package com.assu.study.chap14_2.serde;

import com.google.gson.Gson;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

public class CustomJsonDeserializer<T> implements Deserializer<T> {
  private Gson gson = new Gson();
  private Class<T> deserializedClass;

  public CustomJsonDeserializer(Class<T> deserializedClass) {
    this.deserializedClass = deserializedClass;
  }

  public CustomJsonDeserializer() {}

  @Override
  @SuppressWarnings("unchecked")
  public void configure(Map<String, ?> map, boolean b) {
    if (deserializedClass == null) {
      deserializedClass = (Class<T>) map.get("serializedClass");
    }
  }

  @Override
  public T deserialize(String s, byte[] bytes) {
    if (bytes == null) {
      return null;
    }

    return gson.fromJson(new String(bytes), deserializedClass);
  }

  @Override
  public void close() {}
}

/serde/CustomWrapperSerde.java

package com.assu.study.chap14_2.serde;

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class CustomWrapperSerde<T> implements Serde<T> {
    final private Serializer<T> serializer;
    final private Deserializer<T> deserializer;

    public CustomWrapperSerde(Serializer<T> serializer, Deserializer<T> deserializer) {
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        serializer.configure(configs, isKey);
        deserializer.configure(configs, isKey);
    }

    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    @Override
    public Serializer<T> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<T> deserializer() {
        return deserializer;
    }
}

설정이 끝났으면 이제 토폴로지를 생성한다.

전체 코드

package com.assu.study.chap14_2;

import com.assu.study.chap14_2.model.Trade;
import com.assu.study.chap14_2.model.TradeStats;
import com.assu.study.chap14_2.serde.CustomJsonDeserializer;
import com.assu.study.chap14_2.serde.CustomJsonSerializer;
import com.assu.study.chap14_2.serde.CustomWrapperSerde;
import java.io.IOException;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.state.WindowStore;

// input 은 거래의 스트림임
// output 은 2개의 스트림임
// - 10초마다 최소 및 평균 매도 호가를 가짐
// - 매분 최소 매도 호가가 가장 낮은 상위 3개 주식
public class Chap142Application {

  public static void main(String[] args)
      throws IOException, ExecutionException, InterruptedException {
    // ===== 애플리케이션 설정
    Properties props;
    if (args.length == 1) {
      props = LoadConfig.loadConfig(args[0]);
    } else {
      props = LoadConfig.loadConfig();
    }
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stockstat-2");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, TradeSerde.class.getName());

    // 미리 로드된 동일한 데이터를 데모 코드로 재실행하기 위해 오프셋을 earliest 로 설정
    // 데모를 재실행하려면 오프셋 재설정 도구를 사용해야 함
    // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // 집계 윈도우의 시간 간격(ms)
    long windowSize = 5000; // (5s)

    // AdminClient 를 생성하고 클러스터의 브로커 수를 확인하여 원하는 replicas 수를 알 수 있음
    AdminClient adminClient = AdminClient.create(props);
    DescribeClusterResult describeClusterResult = adminClient.describeCluster();
    int clusterSize = describeClusterResult.nodes().get().size();

    if (clusterSize < 3) {
      props.put("replication.factor", clusterSize);
    } else {
      props.put("replication.factor", 3);
    }

    // ===== 스트림즈 토폴로지 생성
    // StreamBuilder 객체를 생성하고, 앞으로 입력으로 사용할 토픽 지정
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, Trade> source = builder.stream(Constants.STOCK_TOPIC);

    // 토폴로지 생성
    KStream<Windowed<String>, TradeStats> stats =
        source
            // 입력 토픽에서 이벤트를 읽어오지만 그룹화는 하지 않음
            // 대신 이벤트 스트림이 레코드 키 기준으로 파티셔닝되도록 해줌
            // 이 경우 토픽에 데이터를 쓸 때의 키 값을 가지는 데이터를 사용하고, groupByKey() 를 호출하기 전에 변경하지 않았으므로
            // 데이터는 여전히 키 값을 기준으로 파티셔닝되어 있음
            .groupByKey() // KGroupedStream<String, Trade>
            // 윈도우 정의
            // 이 경우 윈도우는 5초의 길이를 갖고 있으며, 매초 전진함
            .windowedBy(
                TimeWindows.of(Duration.ofMillis(windowSize))
                    .advanceBy(Duration.ofSeconds(1))) // TimeWindowedKStream<Stream<String, Trade>>
            // 데이터가 원하는대로 파티셔닝되고 윈도우도 적용되었으면 집계 작업을 시작함
            // aggregate() 는 스트림을 서로 중첩되는 윈도우들도 나눈 뒤(여기서는 1초마다 겹치는 5초 길기의 시간 윈도우)
            // 각 윈도우에 배정된 모든 이벤트에 대해 집계 연산을 적용함
            .<TradeStats>aggregate(
                // 첫 번째 파라메터는 집계 결과를 저장할 새로운 객체를 받음, 여기서는 TradeStats
                // 이 객체는 각 시간 윈도우에서 알고자하는 모든 통계를 포함하기 위해 생성한 객체로 최저 매도가, 평균 매도가, 거래량을 포함함
                () -> new TradeStats(),
                // 실제로 집계를 수행하는 메서드 지정
                // 여기서는 새로운 레코드를 생성함으로써 해당 윈도우에서의 최저 매도가, 거래량, 매도 총량을 업데이트하기 위해 TradeStats.add() 를
                // 사용함
                (k, v, tradestats) -> tradestats.add(v),
                Materialized
                    // 윈도우가 적용된 집계 작업에서는 상태를 저장할 로컬 저장소를 유지할 필요가 있음
                    // aggregate() 의 마지막 파라메터는 상태 저장소 설정임
                    // Materialized 는 저장소를 설정하는데 사용되는 객체로서 여기서는 저장소의 이름을 trade-aggregates 로 함
                    .<String, TradeStats, WindowStore<Bytes, byte[]>>as("trade-aggregates")
                    // 상태 저장소 설정의 일부로서 집계 결과인 Tradestats 를 직렬화/역직렬화하기 위한 Serde 객체를 지정해주어야 함
                    .withValueSerde(new TradeStatsSerde())) // KTable<Windowed<String>, TradeStats>
            // 집계 결과는 종목 기호와 시간 윈도우를 기본키로, 집계 결과를 밸류값으로 하는 테이블임
            // 이 테이블을 이벤트 스트림으로 되돌릴것임
            .toStream() // KStream<Windowed<String>, TradeStats>
            // 평균 가격을 갱신해 줌
            // 현재 시점에서 집계 결과는 가격과 거래량의 합계를 포함함
            // 이 레코드를 사용하여 평균 가격을 계산한 뒤 출력 스트림으로 내보냄
            .mapValues((trade) -> trade.computeAvgPrice());

    // 결과를 stockstats-output 스트림에 씀
    // 결과물이 윈도우 작업의 일부이므로 결과물을 윈도우 타임스탬프와 함께 윈도우가 적용된 데이터 형식으로 저장하는 WindowedSerde 를 생성해줌
    // 윈도우 크기가 직렬화 과정에서 사용되는 것은 아니지만 출력 토픽에 윈도우의 시작 시간이 저장되기 때문에 역직렬화는 윈도우의 크기를 필요로하므로 Serde 의 일부로서
    // 전달함
    stats.to(
        "stockstats-output",
        Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class, windowSize)));

    // ===== 스트림 객체 생성 후 실행
    Topology topology = builder.build();
    KafkaStreams streams = new KafkaStreams(topology, props);
    System.out.println(topology.describe());
    // 동작을 재설정하기 위한 것임
    // 프로덕션에서는 절대 사용하지 말 것
    // 시작할 때마다 애플리케이션이 Kafka 의 상태를 재로드함
    streams.cleanUp();

    streams.start();

    // SIGTERM 에 응답하고 카프카 스트림즈를 gracefully 하게 종료시키는 shutdown hook 추가
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }

  // 키는 문자열이지만 밸류는 종목 코드, 매도 호가, 매도량을 포함하는 Trade 를 사용할 것이므로 이 객체를
  // 직렬화/역직렬화하기 위해 Gson 라이브러리를 사용해서 자바 객체에 대한 시리얼라이저/디시리얼라이저 생성
  public static final class TradeSerde extends CustomWrapperSerde<Trade> {
    public TradeSerde() {
      super(new CustomJsonSerializer<Trade>(), new CustomJsonDeserializer<>(Trade.class));
    }
  }

  public static final class TradeStatsSerde extends CustomWrapperSerde<TradeStats> {
    public TradeStatsSerde() {
      super(new CustomJsonSerializer<>(), new CustomJsonDeserializer<>());
    }
  }
}

Constants.java

package com.assu.study.chap14_2;

public class Constants {
  public static final String STOCK_TOPIC = "stocks";
}

/model/TradeStats.java

package com.assu.study.chap14_2.model;

public class TradeStats {
  String type;
  String ticker;
  int countTrades; //  평균 단가를 계산하기 위함
  double sumPrice;
  double minPrice;
  double avgPrice;

  public TradeStats add(Trade trade) {
    if (trade.type == null || trade.ticker == null) {
      throw new IllegalArgumentException("Invalid trade to aggregate: " + trade.toString());
    }

    if (this.type == null) {
      this.type = trade.type;
    }
    if (this.ticker == null) {
      this.ticker = trade.ticker;
    }

    if (!this.type.equals(trade.type) || !this.ticker.equals(trade.ticker)) {
      throw new IllegalArgumentException(
          "Aggregating stats for trade type: " + this.type + "and ticker: " + this.ticker);
    }

    if (countTrades == 0) {
      this.minPrice = trade.price;
    }

    this.countTrades = this.countTrades + 1;
    this.sumPrice = this.sumPrice + trade.price;
    this.minPrice = this.minPrice < trade.price ? this.minPrice : trade.price;

    return this;
  }

  // 평균 단가 계산
  public TradeStats computeAvgPrice() {
    this.avgPrice = this.sumPrice / this.countTrades;
    return this;
  }
}

이 예시는 윈도우가 적용된 집계 연산을 스트림에 대해 수행하는 방법을 보여주며, 아마도 이 예시가 가장 많이 사용되는 스트림 처리 사례일 것이다.
집계 작업의 로컬 상태를 유지하기 위해 단지 Serde 와 상태 저장소의 이름만 지정해주면 된다.

이 애플리케이션은 여러 인스턴스로 확장이 가능하면서도 각 인스턴스에 장애가 발생할 경우, 일부 파티션에 대한 처리 작업을 다른 인스턴스로 이전함으로써 자동으로 복구된다.

장애 발생 시 자동으로 복구되는 내용에 대해서는 추후 다룰 예정입니다. (p. 450)


1.3. 클릭 스트림 확장: 스트리밍 조인

소스는 github 에 있습니다.

여기서는 웹사이트 클릭 스트림을 확장하는 스트리밍 조인에 대해 알아본다.

모의 클릭 스트림, 가상의 프로필 DB 테이블에 대한 업데이트 스트림, 웹 검색 스트림을 생성한 후 사용자 행동에 대한 종합적인 뷰를 얻기 위해 이 셋을 조인할 것이다.
이러한 조인은 사용자들이 무엇을 검색하고, 검색 결과 중 무엇을 클릭하는지 등 분석 작업을 위한 풍부한 데이터 집합을 제공한다.
상품 추천은 보통 이러한 종류의 정보에 기반한다.

package com.assu.study.chap14_3;

import com.assu.study.chap14_3.model.PageView;
import com.assu.study.chap14_3.model.Search;
import com.assu.study.chap14_3.model.UserActivity;
import com.assu.study.chap14_3.model.UserProfile;
import com.assu.study.chap14_3.serde.CustomJsonDeserializer;
import com.assu.study.chap14_3.serde.CustomJsonSerializer;
import com.assu.study.chap14_3.serde.CustomWrapperSerde;
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.StreamJoined;

public class Chap143Application {

  public static void main(String[] args) throws InterruptedException {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "clicks");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.BROKER);

    // 스트림의 각 단계는 서로 다른 객체가 포함되기 때문에 기본 Serde 사용 불가

    // 미리 로드된 동일한 데이터를 데모 코드로 재실행하기 위해 오프셋을 earliest 로 설정
    // 데모를 재실행하려면 오프셋 재설정 도구를 사용해야 함
    // https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    StreamsBuilder builder = new StreamsBuilder();

    // ===== 다수의 스트림을 조인하는 토폴로지

    // 조인하고자 하는 2개의 스트림 객체인 클릭과 검색 스트림 생성
    // 스트림 객체 생성 시 입력 토픽 뿐 아니라 토픽 데이터를 읽어서 객체로 역직렬화할 때 사용될 키, 밸류에 대한 Serde 역시 지정해주어야 함
    KStream<Integer, PageView> views =
        builder.stream(
            Constants.PAGE_VIEW_TOPIC, Consumed.with(Serdes.Integer(), new PageViewSerde()));

    KStream<Integer, Search> searches =
        builder.stream(Constants.SEARCH_TOPIC, Consumed.with(Serdes.Integer(), new SearchSerde()));

    // 사용자 프로필을 저장할 KTable 객체 정의
    // KTable 은 변경 스트림에 의해 갱신되는 구체화된 저장소(materialized store)임
    KTable<Integer, UserProfile> profiles =
        builder.table(
            Constants.USER_PROFILE_TOPIC, Consumed.with(Serdes.Integer(), new ProfileSerde()));

    // 스트림-테이블 조인
    KStream<Integer, UserActivity> viewsWithProfile =
        // 클릭 스트림을 사용자 프로필 정보 테이블과 조인함으로써 확장함
        // 스트림-테이블 조인에서 스트림의 각 이벤트는 프로필 테이블의 캐시된 사본에서 정보를 받음
        // left join 이므로 해당하는 사용자 정보가 없는 클릭도 보존됨
        views.leftJoin(
            profiles,
            (page, profile) -> {
              if (profile != null) {
                // 조인 메서드임
                // 스트림과 레코드에서 하나씩 값을 받아서 또 다른 값을 리턴함
                // 두 개의 값을 결합해서 어떻게 하나의 결과로 만들지 여기서 결정해야 함
                // 여기서는 사용자 프로필과 페이지 뷰 둘 다 포함하는 하나의 UserActivity 객체를 생성함
                return new UserActivity(
                    profile.getUserId(),
                    profile.getUserName(),
                    profile.getZipcode(),
                    profile.getInterests(),
                    "",
                    page.getPage());
              } else {
                return new UserActivity(-1, "", "", null, "", page.getPage());
              }
            });

    // 스트림-스트림 조인
    KStream<Integer, UserActivity> userActivityKStream =
        // 같은 사용자에 의해 수행된 클릭 정보와 검색 정보 조인
        // 이번엔 스트림을 테이블에 조인하는 것이 아니라 두 개의 스트림을 조인하는 것임
        viewsWithProfile.leftJoin(
            searches,
            (userActivity, search) -> {
              if (search != null) {
                // 조인 메서드임
                // 단순히 맞춰지는 모든 페이지 뷰에 검색어들을 추가해줌
                userActivity.updateSearch(search.getSearchTerms());
              } else {
                userActivity.updateSearch("");
              }
              return userActivity;
            },
            // 스트림-스트림 조인은 시간 윈도우를 사용하는 조인이므로 각 사용자의 모든 클릭과 검색을 조인하는 것은 적절하지 않음
            // 검색 이후 짧은 시간 안에 발생한 클릭을 조인함으로써 검색과 거기 연관된 클릭만을 조인해야 함
            // 따라서 1초 길이의 조인 윈도우를 정의(= 검색 전과 후의 1초 길이의 윈도우)한 뒤 0초 간격으로 before() 를 호출해서 검색 후 1초 동안
            // 발생한 클릭만 조인함
            // 이 때 검색 전 1초는 제외됨
            // 그 결과는 관련이 있는 클릭과 검색어, 사용자 프로필을 포함하게 됨
            // 즉, 검색과 그 결과 전체에 대해 분석 수행이 가능해짐
            JoinWindows.of(Duration.ofSeconds(1)).before(Duration.ofSeconds(0)),
            // 조인 결과에 대한 Serde 를 정의함
            // 조인 양쪽에 공통인 키 값에 대한 Serde 와 조인 결과에 포함될 양쪽의 밸류값에 대한 Serde 를 포함함
            // 여기서는 키는 사용자 id 이므로 단순한 Integer 형 Serde 를 사용함
            StreamJoined.with(Serdes.Integer(), new UserActivitySerde(), new SearchSerde()));

    userActivityKStream.to(
        Constants.USER_ACTIVITY_TOPIC, Produced.with(Serdes.Integer(), new UserActivitySerde()));

    KafkaStreams streams = new KafkaStreams(builder.build(), props);

    // 동작을 재설정하기 위한 것임
    // 프로덕션에서는 절대 사용하지 말 것
    // 시작할 때마다 애플리케이션이 Kafka 의 상태를 재로드함
    streams.cleanUp();

    streams.start();

    Thread.sleep(60_000L);

    // 잠시 뒤 멈춤
    streams.close();
  }

  public static final class PageViewSerde extends CustomWrapperSerde<PageView> {
    public PageViewSerde() {
      super(new CustomJsonSerializer<>(), new CustomJsonDeserializer<>(PageView.class));
    }
  }

  public static final class ProfileSerde extends CustomWrapperSerde<UserProfile> {
    public ProfileSerde() {
      super(new CustomJsonSerializer<>(), new CustomJsonDeserializer<>(UserProfile.class));
    }
  }

  public static final class SearchSerde extends CustomWrapperSerde<Search> {
    public SearchSerde() {
      super(new CustomJsonSerializer<>(), new CustomJsonDeserializer<>(Search.class));
    }
  }

  public static final class UserActivitySerde extends CustomWrapperSerde<UserActivity> {
    public UserActivitySerde() {
      super(new CustomJsonSerializer<>(), new CustomJsonDeserializer<>(UserActivity.class));
    }
  }
}

위 예시는 스트림 처리에서 가능한 2가지 서로 다른 조인 패턴을 보여준다.

  • 스트림과 테이블을 조인함으로써 스트림 이벤트를 테이블에 저장된 정보로 확장시킴
  • 시간 윈도우를 기준으로 2개의 스트림을 조인함

/model/PageView.java

package com.assu.study.chap14_3.model;

public class PageView {
  int userId;
  String page;

  public PageView(int userId, String page) {
    this.userId = userId;
    this.page = page;
  }

  public int getUserId() {
    return userId;
  }

  public String getPage() {
    return page;
  }
}

/model/Search.java

package com.assu.study.chap14_3.model;

public class Search {
  int userId;
  String searchTerms;

  public Search(int userId, String searchTerms) {
    this.userId = userId;
    this.searchTerms = searchTerms;
  }

  public int getUserId() {
    return userId;
  }

  public String getSearchTerms() {
    return searchTerms;
  }
}

/model/UserActivity.java

package com.assu.study.chap14_3.model;

public class UserActivity {
  int userId;
  String userName;
  String zipcode;
  String[] interests;
  String searchTerm;
  String page;

  public UserActivity(
      int userId,
      String userName,
      String zipcode,
      String[] interests,
      String searchTerm,
      String page) {
    this.userId = userId;
    this.userName = userName;
    this.zipcode = zipcode;
    this.interests = interests;
    this.searchTerm = searchTerm;
    this.page = page;
  }

  public UserActivity updateSearch(String searchTerm) {
    this.searchTerm = searchTerm;
    return this;
  }
}

/model/UserProfile.java

package com.assu.study.chap14_3.model;

public class UserProfile {
  int userId;
  String userName;
  String zipcode;
  String[] interests;

  public UserProfile(int userId, String userName, String zipcode, String[] interests) {
    this.userId = userId;
    this.userName = userName;
    this.zipcode = zipcode;
    this.interests = interests;
  }

  public UserProfile update(String zipcode, String[] interests) {
    this.zipcode = zipcode;
    this.interests = interests;
    return this;
  }

  public int getUserId() {
    return userId;
  }

  public String getUserName() {
    return userName;
  }

  public String getZipcode() {
    return zipcode;
  }

  public String[] getInterests() {
    return interests;
  }
}

/model/UserWindow.java

package com.assu.study.chap14_3.model;

public class UserWindow {
  int userId;
  long timestamp;
}

참고 사이트 & 함께 보면 좋은 사이트

본 포스트는 김병부 저자의 O’REILLY 카프카 핵심 가이드 2판를 기반으로 스터디하며 정리한 내용들입니다.






© 2020.08. by assu10

Powered by assu10