Apache Kafka - 높은 처리량과 실시간으로 대량의 데이터를 취급하는 카프카

Apache Kafka는 여러 대의 분산 서버에서 대량의 데이터를 처리하는 분산 메시징 시스템입니다. 메시지를 받고, 받은 메시지를 다른 시스템이나 장치에 보내기 위해 사용하는데요, `높은 처리량과 실시간으로 대량의 데이터를 취급`하기 위해 초점을 맞췄으나 이제는 기능과 신뢰성을 향상시켜 현재는 데이터가 들어오는 대로 순차 실행하는 플랫폼이 되고 있습니다. 애플, 드롭박스, 넷플릭스, 트위터, 우버 등에서 카프카를 사용하고 있습니다.

오픈소스인 카프카는 링크드인에서 개발했습니다. 링크드인 웹사이트에서 생성되는 로그를 처리하여 웹사이트 활동을 추적하는 것을 목적으로 개발했습니다. 링크드인에서 누군가의 프로필을 조회하거나 아티클을 보거나 특정 키워드로 검색하거나 혹은 광고를 클릭하는 등의 모든 행위가 낳는 로그들은 매우 방대했습니다. 우리는 이것을 빅데이터라고 부릅니다. 이 빅데이터 파이프라인에서 카프카는 주요 멤버로 자리매김했습니다.

이처럼 방대한 양의 데이터를 카프카는 비동기로 처리할 수 있는 시스템으로 주목받고 있습니다. 현재 재직 중인 회사에서도 트래픽이 증가하고 새로운 프로세스들이 추가되면서 다양한 정보, 이벤트 로그가 기하급수적으로 늘어나 데이터 처리가 점점 방대해지면서 기존 RabbitMQ에서 Kafka로 마이그레이션 하면서 카프카를 접했습니다.

카프카를 사용하면서 주요하다고 생각한 개념과 실전 경험을 요약하여 정리했습니다.
그럼 함께 살펴보시죠 :)

카프카 특징

  • 높은 처리량으로 실시간 처리
    • 방대한 엑세스 데이터를 처리해야 하기에 처리량이 우수해야 함
    • 사용자의 활동을 신속하게 파악하거나 사용자의 활동에 따라 즉시 피드백하기 위함
    • 실시간 처리는 수집부터 시작해 수백 밀리초에서 수 초 안에 데이터가 처리돼야 함
  • 임의의 타이밍에서 데이터를 읽음
    • 실시간이 아니어도 배치 처리를 할 수 있어야 함
  • 다양한 제품과 시스템에 쉽게 연동
    • 데이터베이스나 데이터 웨어하우스 등 다른 제품과의 연결이 쉬워야 함
  • 메시지를 잃지 않아야 함
    • 약간의 중복이 있더라도 데이터를 잃지 않는 것이 중요 / At Least Once
      • 건마다 엄격하게 관리하면 처리 오버헤드가 커짐
    • 높은 처리량으로 실시간 처리라는 요건과의 균형을 가미함

카프카는 높은 처리량으로 실시간 처리하고 임의의 타이밍에 데이터를 읽고 다양한 제품과 시스템에 쉽게 연동하기 위해 메시징 모델을 채용합니다. 일반적인 메시징 모델은 다음 세 가지 요소로 구성됩니다.

  • Producer : 메시지 생산자
  • Broker : 메시지 수집/전달 역할
  • Consumer : 메시지 소비자

카프카는 Message Queuing Model과 Pub/Sub Model의 특징을 겸비했습니다. 큐잉 모델에서 실현한 여러 컨슈머가 분산 처리로 메시지를 소비하는 모델을 도입했고, 펍/섭 메시징 모델에서 실현한 여러 subscriber들에게 동일한 메시지를 전달하고 토픽 기반으로 전달 내용을 변경하는 모델은 카프카에서 컨슈머 그룹이라는 개념을 도입하게 되었습니다.

카프카는 디스크 영속화를 함에도 높은 처리량을 제공하는 특징입니다. 데이터를 받아들이면서 한 묶음으로 장기 보존을 목적으로 영속화하기 때문에 카프카를 스토리지 시스템으로도 간주 할 수 있습니다. 그래서 카프카는 메시지를 잃지 않는 전달 보증이 특징으로 Ack와 Offset Commit 개념이 같이 생겼습니다.

  • Ack
    • 브로커가 메시지를 수신했을 때 프로듀서에게 수신 완료했다는 응답
  • Offset Commit
    • 컨슈머가 브로커로부터 메시지를 받을 때 컨슈머가 어디까지 메시지를 받았는지 관리


카프카 5가지 구성 요소

  • Broker
    • 데이터를 수신, 전달하는 서비스
  • Message
    • 카프카에서 다루는 데이터의 최소 단위
    • 메시지는 Key와 Value를 갖게 되며 나중에 언급할 메시지 전송할 때 파티셔닝에 이용
  • Producer
    • 데이터의 생산자이며 브로커에 메시지를 보내는 애플리케이션
  • Consumer
    • 브로커에서 메시지를 취득하는 애플리케이션
  • Topic
    • 메시지를 종류별로 관리하는 스토리지
    • 브로커에 배치되어 관리됨
    • 프로듀서와 컨슈머는 특정 토픽을 지정하여 메시지를 송수신함으로써 단일 카프카 클러스터에서 여러 종류의 메시지를 중계함

더 상세히 살펴보는 카프카 구성 요소

Broker / 브로커

브로커는 하나의 서버(또는 인스턴스) 당 하나의 데몬 프로세스로 동작하여 메시지 수신/전달 요청을 받아들입니다.
이것을 여러 대의 클러스터로 구성할 수 있으며 브로커를 추가함으로써 수신/전달의 처리량 향상, 스케일 아웃이 가능합니다.
브로커에서 받은 데이터는 모두 디스크로 내보내기(영속화)가 이루어져 디스크의 총 용량에 따라 장기간 데이터를 보존할 수 있습니다.

Partition / 파티션

토픽에 대한 대량의 메시지 입출력을 지원하기 위해, 브로커 상의 데이터를 읽고 쓰는 것은 파티션이라는 단위로 분할합니다.
토픽을 구성하는 파티션은 브로커 클러스터 안에 분산 배치되며 프로듀서로부터 메시지 수신, 컨슈머에게 배달을 분산 실시함으로써 하나의 토픽에 대한 대규모 데이터 수신과 전달을 지원하게 됩니다.

  • 적정 파티션 수
    • 구성 및 요구 사항에 따라 다르기 때문에 시스템을 설계할 때 고려
      • 메시지 처리 속도, 컨슈머 그룹 내 컨슈머 개수, 컨슈머내 스레드 수 등을 동시에 고려해야 함
    • 파티션 수는 증가할 수는 있지만 한 번 증가한 파티션 수는 다시 줄일 수 없음

Producer / 프로듀서

프로듀서는 프로듀서 API를 이용하여 브로커에 데이터를 송신하기 위해 구현된 애플리케이션입니다.
각종 로그 전송 및 미들웨어와 연동하여 동작하기 때문에 프로듀서 API를 내포한 도구, 미들웨어를 통해 이용하는 형태 등으로 다양합니다.

  • 프로듀서가 토픽의 파티션에 메시지를 송신할 때 버퍼 기능처럼 프로듀서의 메모리를 이용하여 일정량을 축적 후 송신
  • 데이터의 송신에 대해서는 지정한 크기까지 메시지가 축적되거나
    • batch.size
  • 지정한 대기 시간에 도달하는 것 중 하나를 트리거로 전송
    • linger.ms
  • 토픽에 메시지 전송 시 파티셔닝
    • Key의 해시값을 사용한 송신
      • 메시지는 Key와 Value로 이뤄져 있는데 이 Key를 이용하여 송신처 파티션을 결정
      • 동일한 Key를 가진 메시지는 동일한 ID를 가진 파티션에 송신
      • partitionkey를 이용하면 메시지 순서 보장이 가능하나 대신 imbalance를 챙겨야 함
    • 라운드 로빈에 의한 송신
      • 메시지 Key를 지정하지 않고 Null로 한 경우 여러 파티션으로 메시지 송신을 라운드 로빈 방식으로 실행
  • callback을 통해 브로커로 메시지 송신 결과를 비동기 처리 할 수 있음
    • future.get()을 하면 브로커 상태와 설정에 따라 쓰레드가 무한 점유될 수 있으므로 매우 위험
    • CompletableFuture를 이용

Consumer

컨슈머 API를 이용해 브로커에서 메시지를 취득하도록 구현된 애플리케이션입니다. 브로커는 메시지를 디스크에 영속화하기 위해 브로커에 도달하는 즉시 컨슈머에서 취득해야 하는 제약이 없어 디스크에 보관된 동안은 메시지 취득이 가능합니다. 일정 기간 데이터를 축적한 스토리지에서 데이터 추출 및 시간 처리를 위한 애플리케이션의 데이터 입력 등으로 이용됩니다.

  • RetryTemplate 등을 통해 컨슈머 장애시 재처리를 시도하도록 해야함
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class FirstAppProducer {

    private static String topicName = "first-app";

    public static void main(String[] args) {

        // 1. KafkaProducer에 필요한 설정
        Properties conf = new Properties();
        conf.setProperty("bootstrap.servers", "kafka-broker01:9092,kafka-broker02:9092,kafka-broker03:9092");
        conf.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        conf.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Kafka 클러스터에 메시지를 송신(produce)하는 객체를 생성
        Producer<Integer, String> producer = new KafkaProducer<>(conf);

        int key;
        String value;
        for(int i = 1; i <= 100; i++) {
            key = i;
            value = String.valueOf(i);

            // 3. 송신할 메시지를 생성
            ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, key, value);

            // 4. 메시지를 송신하고, Ack을 받았을 때에 실행할 작업(Callback)을 등록한다
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if( metadata != null) {
                        // 송신에 성공한 경우의 처리
                        String infoString = String.format("Success partition:%d, offset:%d", metadata.partition(), metadata.offset());
                        System.out.println(infoString);
                    } else {
                        // 송신에 실패한 경우의 처리
                        String infoString = String.format("Failed:%s", e.getMessage());
                        System.err.println(infoString);
                    }
                }
            });
        }

        // 5. KafkaProducer를 클로즈하여 종료
        producer.close();
    }
}

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

public class FirstAppConsumer {

    private static String topicName = "first-app";

    public static void main( String[] args ) {

        // 1. KafkaConsumer에 필요한 설정
        Properties conf = new Properties();
        conf.setProperty("bootstrap.servers", "kafka-broker01:9092,kafka-broker02:9092,kafka-broker03:9092");
        conf.setProperty("group.id", "FirstAppConsumerGroup");
        conf.setProperty("enable.auto.commit", "false");
        conf.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        conf.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 2. Kafka클러스터에서 Message를 수신(Consume)하는 객체를 생성
        Consumer<Integer, String> consumer = new KafkaConsumer<>(conf);

        // 3. 수신(subscribe)하는 Topic을 등록
        consumer.subscribe(Collections.singletonList(topicName));

        for(int count = 0; count < 300; count++) {
            // 4. Message를 수신하여, 콘솔에 표시한다
            ConsumerRecords<Integer, String> records = consumer.poll(1);
            for(ConsumerRecord<Integer, String> record: records) {
                String msgString = String.format("key:%d, value:%s", record.key(), record.value());
                System.out.println(msgString);

                // 5. 처리가 완료한 Message의 Offset을 Commit한다
                TopicPartition tp = new TopicPartition(record.topic(), record.partition());
                OffsetAndMetadata oam = new OffsetAndMetadata(record.offset() + 1);
                Map<TopicPartition, OffsetAndMetadata> commitInfo = Collections.singletonMap(tp, oam);
                consumer.commitSync(commitInfo);
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ex ) {
                ex.printStackTrace();
            }
        }

        // 6. KafkaConsumer를 클로즈하여 종료
        consumer.close();
    }
}

Consumer Group

카프카에서는 컨슈머가 카프카 클러스터에서 메시지를 얻어 처리합니다. 이때 컨슈머는 컨슈머그룹이라 불리는 하나 이상의 컨슈머들로 이루어진 그룹을 형성하여 메시지를 얻습니다. 컨슈머 그룹은 Group ID라는 ID로 구분됩니다. 이 Group ID는 KafkaConsumer를 생성할 떄 지정하는 옵션으로 group.id라는 파라미터로 지정하며, 특정 컨슈머는 여러 컨슈머 그룹에 속하지 않고 항상 하나의 컨슈머 그룹에 속합니다.
카프카 클러스터에서 수신할 메시지는 컨슈머 그룹 안에서 어느 하나의 컨슈머가 수신합니다. 즉, 카프카 클러스터에서 수신할 메시지를 동일 컨슈머 그룹에 속하는 컨슈머 사이에서 분산하여 수신합니다. 이는 컨슈머에서 분산 스트림 처리도 고려해 설계된 것입니다.

메시지를 컨슈머 그룹의 어느 컨슈머가 수신하는가에 대한 할당은 수신할 토픽에 존재하는 파티션과 그룹 내 컨슈머를 매핑함으로써 가능합니다. 컨슈머와 파티션의 매핑은 각 파티션에 반드시 하나의 컨슈머가 매핑됩니다. 반대로 파티션 수에 따라 하나의 컨슈머에 여러 파티션이 할당되는 경우가 있습니다. 컨슈머 그룹에서 기대한 대로 분산하여 메시지를 수신하기 위해서는 파티션 수가 적어도 각 컨슈머 그룹에 속하는 컨슈머보다 많아야 합니다. 토픽의 파티션보다 컨슈머 쪽이 많을 경우 파티션이 할당되지 않은 컨슈머가 발생할 수 있습니다.

컨슈머를 생성할 때 partition.assignment.strategy 옵션을 활용하면 Assignor1를 선택할 수 있습니다.

Offset

  • 각 파티션에서 수신한 메시지에는 각각 일려번호를 부여
  • 파티션 단위로 메시지 위치를 나타내는 오프셋이라는 관리 정보를 이용해 컨슈머가 취득하는 메시지의 범위 및 재시도를 제어
  • 오프셋 설정 값 종류
    • Log-End-Offsset(LEO) : 파티션 데이터의 끝
    • Current Offset : 컨슈머가 어디까지 메시지를 읽었는가를 나타냄
    • Commit Offset : 컨슈머가 어디까지 커밋했는지를 나타냄
      • 컨슈머는 메시지에 대해 확인했음을 다시 브로커에게 알리는데 이것이 바로 Commit Offset임
  • 자동 오프셋 리셋
    • 컨슈머가 시작할 때 오프셋 커밋 기록이 존재하지 않거나 기록되어 있어도 유효하지 않은 경우 사용하는 정책
      • 유효하지 않은 경우는 메시지가 retention이 지나 없는 경우
    • 컨슈머 애플리케이션에서 컨슈머를 생성할 때 지정하는 auto.offset.reset=earliest 옵션으로 설정
    • 정책자동 오프셋 리셋의 동작
      latest- 해당 파티션의 가장 새로운 오프셋으로 초기화- 카프카 클러스터에 이미 존재하는 메시지는 처리 되지 않음
      earliest- 해당 파티션에 존재하는 가장 오래된 오프셋으로 초기화- 카프카 클러스터에 이미 존재하는 메시지 모두에 대해 처리 실시
      none- 유효한 오프셋 커밋 정보가 없는 경우에 예외를 반환- seek() 메서드 등으로 명시적으로 오프셋을 지정해야 함

Offset Commit

컨슈머는 어느 메시지까지 처리를 완료했는지 카프카 클러스터에 기록을 남길 수 있습니다. 정확하게는 다음 수신 및 처리해야 할 메시지의 오프셋 기록입니다.
오프셋 커밋의 기록은 컨슈머 그룹 단위로 이루어집니다. 컨슈머 그룹마다 각 토픽의 파티션에서 어느 오프셋까지 처리 완료했는지 정보를 기록합니다. 오프셋 커밋은 처리 완료 여부를 메시지마다 기록하는 것이 아니라 처리를 완료한 메시지 중에서 최대의 오프셋을 기록하는 형태로 이루어집니다.
이것은 카프카가 임의로 메시지를 처리하는 것이 아니라 파티션 안의 메시지를 연속적으로 처리하는 것을 가정하고 있기 때문입니다.

  • 중간에 메시지가 처리되지 않을 경우 무한 루프가 돌 수 있음 (수기로 메시지 커밋 하는 경우)
  • __consumer_offsets 라는 전용 토픽에 기록
  • Auto Offset Commit
    • 자동 오프셋 커밋은 일정 간격마다 자동으로 오프셋 커밋을 하는 방식
    • enable.auto.commit=true 설정
    • auto.commit.interval.ms=5000 설정을 통해 오프셋 커밋의 간격 설정. 기본 5초
    • 장점 : 컨슈머는 오프셋 커밋을 명시적으로 실시할 필요가 없음
    • 단점 : 컨슈머 장애가 발생했을 때 메시지가 손실되거나 메시지 중복이 발생할 수 있음
  • Manual Offset Commit
    • enable.auto.commit=false 설정
    • commitSync() 메서드를 통해 오프셋을 수동으로 커밋할 수 있음
    • 애플리케이션 안에서 언제라도 오프셋 커밋을 수행할 수 있음
    • 카프카 클러스터에서 메시지 취득 후 메시지 처리가 완료한 시점에서 커밋
    • 장점
      • 해당 메시지 처리는 반드시 완료되어 있기 때문에 메시지 손실이 발생하지 않음
      • 컨슈머 장애 발생 시 메시지 중복을 최소화 할 수 있음
    • 단점
      • 메시지 양에 따라 다르지만, 수동 오프셋 커밋이 자주 커밋 처리를 하므로 카프카 클러스터 부하가 높아진다는 점에는 주의가 필요

ZooKeeper

카프카의 브로커에 있어 분산 처리를 위한 관리 도구로 Apache ZooKeeper가 필요합니다. 주키퍼는 하둡 등 병렬 분산 처리용 OSS에 있어서 설정 관리, 이름 관리, 동기화를 위한 잠금 관리를 하는 구조로 자주 사용됩니다. 카프카에 있었어서는 분산 메시징의 메타 데이터(토픽과 파티션 등)를 관리하기 위한 구성 요소로 기능합니다. 주키퍼 클러스터는 주키퍼 앙상블이라고도 하며 구조상 3,5 처럼 홀수로 구성하는 것이 일반적입니다.

카프카 클러스터

카프카는 여러 대의 브로커 서버, 주키퍼 서버로 이루어진 클러스터링의 메시지 중계 기능과 메시지 송수신을 위한 라이브러리 그룹으로 구성됩니다.

카프카 데이터 견고함을 담보하는 복제의 구조

Replica / 레플리카

카프카는 메시지를 중계함과 동시에 서버가 고장 났을 때 수신한 메시지를 잃지 않기 위해 복제(Replication) 구조를 갖추고 있습니다. 레플리카 중 하나는 Leader이며, 나머지는 Follower라고 불립니다. Follower는 그 이름대로 Leader로부터 메시지를 계속적으로 취득하여 복제를 유지하도록 동작합니다. 다만 프로듀서/컨슈머와의 데이터 교환은 Leader가 맡고 있습니다.

Leader Replica의 복제 상태를 유지하고 있는 레플리카는 In-Sync Replica(ISR)로 분류됩니다. 또한 복제 수와는 독립적으로 최소 ISR 수(min.insync.replicas) 설정이 가능합니다. 고장 등으로 인한 일시적인 동기 지연을 허용하여 전체 읽고 쓰기를 계속하는 것이 가능합니다.

복제 사용 시 오프셋 관리에는 LEO(Log End Offset) 이외에 High Watermark라는 개념이 있습니다. Hight Watermark는 복제가 완료된 오프셋이며, 그 성질에서 반드시 LEO와 동일하거나 오래된 오프셋을 나타냅니다. 컨슈머는 High Watermark까지 기록된 메시지를 취득할 수 있습니다.

Ack

복제에 대한 중요한 구성 요소로 프로듀서의 메시지 송신 시 Ack 설정에 대해 알아보겠습니다. 브로커에서 프로듀서로 메시지가 송신된 것을 나타내는 Ack를 어느 타이밍에 송신할 것인지를 제어하는 것은 성능과 내장애성(브로커 서버 고장 시 데이터 분실 방지)에 큰 영향을 줍니다.

Ack 설정설명
0프로듀서는 메시지 송신 시 Ack를 기다리지 않고 다음 메시지를 송신함
1Leader Replica에 메시지가 전달되면 Ack를 반환
All모든 ISR의 수만큼 복제되면 Ack를 반환

프로듀서는 타임아웃 설정으로 Ack가 돌아오지 않고 타임아웃된 Send 처리를 ‘송신 실패’로 감지합니다. 참고로 Ack를 1 또는 all로 설정했을 경우 변환 타이밍이 의미하는 것은 각 복제에 ‘메시지가 전달’된 것으로 판단해 Ack를 반환하는 타이밍입니다. 이 타이밍에는 메시지가 디스크에 flush되는 것이 아니라 메모리(OS 버퍼)에 기록됩니다. 디스크에 flush하는 영속화 타이밍은 다른 속성에서 제어합니다.

In-Sync Replica

min.insync.replicas 설정은 서버 고장 시 ‘메시지를 잃지 않는 것’과 ‘메시징 시스템을 포함한 전체 시스템의 처리를 계속하는 것’사이의 균형을 조정합니다.
아래 예시 중 어느 것이 뛰어나다는 것이 아니라 시스템 요구 사항과 제약 조건에 의해 결정돼야 한다는 점에 주의해야 합니다.

In-Sync Replica와 Ack = all, 쓰기 계속성의 관계
브로커는 4대 레플리카 수는 3으로 브로커 1대가 고장나 레플리카를 하나 잃어버린 경우
설정상황
min.insync.replicas=3 Ack=all인 경우브로커 서버가 1대 고장난 경우 프로듀서는 비정상 상태로 간주하여잃어버린 레플리카가 ISR로 복귀할 때까지 데이터를 쓸 수 없음
min.insync.replicas=2 Ack=all인 경우- 브로커 서버가 1대 고장난 경우에도 Ack를 반환하고 처리하고 처리를 계속함 - 처리를 계속하는 점에 있어서는 위보다 나은 반면 - 나중에 추가된 파티션이 복제를 완료해 ISR로 승격될 떄까지 복제수가 2가 됨 - 복구 전에 2대가 고장난 경우는 처리 중인 메시지를 손실할 위험이 높아짐

min.insync.replicas는 프로듀서와 메시지를 보낼 때 송신처 파티션의 복제본 중 ISR에 속하는 복제본이 최소 몇 개나 필요한지를 설정하는 브로커와 토픽의 구성입니다. 브로커는 메시지를 수신한 직후에 특정 브로커에 장애가 발생했다고 해도 메시지를 분실하지 않기 위한 안정장치 역할을 합니다. 장애 허용 대수만큼의 브로커에 장애가 발생했을 때 카프카 클러스터에 영향 없이 서비스를 계속하기 위해서는 모든 파티션에서 ISR에 속하는 복제본의 수가 min.insync.replicas 수 이상이어야 합니다.


위 내용은 실전 아파치 카프카를 읽고 제 경험을 추가하여 정리한 내용으로 카프카 도입 예정이신 분들은 읽어보시면 도움 될 책입니다.
그럼 다음 편에서는 카프카 예제 코드로 찾아 뵙겠습니다.

참고하면 좋은 글