개발노트

25.03.11 Kafka 란? 본문

MSA

25.03.11 Kafka 란?

ddong-kka 2025. 3. 11. 00:18

개요

오늘부터 프로젝트를 시작하게되었다 아마도 RabbitMQ 보단 kafka를 쓰게될 것 같다.

redis도 kafka 도 처음 프로젝트에 적용하는거라 잘할 수 있을지 걱정이 앞선다.

확실히 이해하고 넘어가기위해서 정리하면서 복습해본다.

 

Kafka 란?

kafka는 분산 스트리밍 플랫폼이자 메시지 큐 시스템이다. 원래 LinkedIn에서 로그 데이터를 처리하기 위해 개발되었으며,

이후 Apach 프로젝트로 오픈 소스화 되었다. kafka는 주로 대규모 데이터 파이프라인 구축, 로그 수집, 실시간 데이터 스트리밍 및 메시지 큐로 사용된다.

 

kafka의 주요 특징은 내구성, 확장성, 고가용성이다. 데이터는 파티션을 통해 분산 저장되며, 여러 브로커에서 데이터가 복제되어 장애에 강하게 설계되어있다.


 

Kafka의 주요 구성 요소

  • Producer: Kafka에 데이터를 전송하는 클라이언트 애플리케이션입니다. Producer는 특정 토픽(Topic)에 메시지를 전송한다.
  • Consumer: Kafka에서 메시지를 읽어오는 클라이언트 애플리케이션입니다. Consumer는 컨슈머 그룹을 통해 메시지를 읽어온다.
  • Broker: Kafka 서버로, 데이터를 저장하고 Producer와 Consumer 간의 메시지를 전달하는 역할을 합니다.
  • Topic: 메시지가 저장되는 논리적인 채널이다. Producer는 특정 토픽에 메시지를 전송하고, Consumer는 토픽에서 메시지를 읽어온다.
  • Partition: 토픽 내의 실제 데이터 저장 단위입니다. 각 토픽은 하나 이상의 파티션을 가질 수 있습니다. 파티션은 데이터를 분산하고 병렬 처리를 가능하게 합니다.
  • Zookeeper: Kafka의 메타데이터를 관리하는 분산 코디네이터입니다. Kafka 2.x부터는 KRaft(Kafka Raft) 모드로 Zookeeper 없이 운영할 수 있는 기능도 제공됩니다.

Kafka의 메시징 모델

publish-subscribe 모델을 사용한다.. Producer는 메시지를 특정 토픽에 발행(publish)하고, Consumer는 해당 토픽에서 메시지를 구독(subscribe)하여 읽어옵니다. 다수의 Consumer는 컨슈머 그룹을 구성하여 병렬로 메시지를 처리할 수 있습니다.

 

RabbitMQ와 비슷한듯 다른 느낌?

 


 

Kafka의 메시지 전송 흐름

  1. Producer는 Kafka 클러스터의 특정 토픽에 메시지를 보낸다.
  2. 메시지는 파티션에 분산 저장된다.
  3. Consumer는 토픽에서 메시지를 읽어온다. 하나의 Consumer 그룹 안에서 메시지는 한 번만 처리된다.
  4. Kafka Broker는 메시지를 안전하게 저장하고 여러 브로커 간에 메시지를 복제하여 고가용성을 보장한다.

Spring에서 Kafka를 사용하는 예시

의존성 추가

dependencies {
	implementation 'org.springframework.kafka:spring-kafka'
}

 

 

application.yml  kafka 설정

spring.application.name=producer
server.port=8090

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

 

spring.kafka.bootstrap-servers 브로커의 주소를 지정하는 설정이다. 

Producer가 전송할 메시지의 키와 값을 직렬화 하는 방식을 설정한다. kafka는 바이너리 데이터를 처리하므로, 메시지르 전송하기 전에 직렬화해야 한다.

 

StringSerializer 로 설정해 메시지의 키와 값이 문자열일 경우에 해당한다. 전송할 문자열인 경우 직렬화 방식으로 StringSerializer를 사용한다. 

 

키 직렬화는 메시지의 키를 어떻게 전송할지 결정하고, 값 직렬화는 메시지 본문을 어떻게 전송할지 결정한다.

키와 값에 대한 직렬화 방식은 데이터를 전송할 때와 받았을 때의 타입을 맞추기위해 일관되게 설정해야한다.

 

spring.application.name=consumer
server.port=8091

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

 

Consumer가 kafka 에서 수신한 메시지의 키와 값을 역직렬화 하는 방식을 정의한다.

kafka에서 수신한 메시지는 기본적으로 바이트 배열로 저장되므로, 해당 데이터를 애플리케이션에서 사용 가능한 객체로 변환하기 위해 역직렬화가 필요하다.

 

Producer KafkaConfig 설정

package com.kafka_sample.producer;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class ProducerApplicationKafkaConfig {
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

 

ProducerFactory<String, String> 는 kafka Producer 의 설정을 관리하고, 메시지를 kafka 토픽에 전송할 수 있는 

Producer를 생성하는 데 사용된다. String 타입의 키와 값으로 메시지를 보낼것이므로 String 타입으로 지정했다.

 

ProducerConfig 클래스를 사용하여 Kafka Producer 설정을 정의한다.

ProducerConfig.BOOTSTRAP_SERVERS_CONFIG 은 kafka 브로커의 주소를 지정한다.

9092 포트는 로컬에서 실행중인 kafka 서버를 의미한다.

 

ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG와 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG 설정은 메시지의 키와 값이 String 타입임을 지정하고, 이를 kafka로 전송하기 전에 직렬화할 방법을 설정한다. 

StringSerializer.class를 사용하여 문자열 데이터를 바이트 배열로 변환한다.

 

DefaultKafkaProducerFactory 클래스는 ProducerFactory의 기본 구현체로 위의 설정을 사용하여 Producer를 생성한다.

 

 

KafkaTemplate<String, String>  KafkaTemplate은 Kafka와 상호작용하는 데 사용되는 주요 클래스이다.
Producer가 메시지를 Kafka로 전송할 때 이 템플릿을 사용하여 메시지를 전송한다. 여기서 String 타입의 키와 값으로 메시지를 송신한다.

KafkaTemplate은 기본적으로 ProducerFactory를 사용하여 Kafka Producer를 생성하고, 이를 이용해 메시지를 송신한다.

producerFactory()는 위에서 설명한 ProducerFactory를 반환하며, 이 KafkaTemplate은 실제로 Kafka에 메시지를 전송하는 기능을 한다.

 

ProducerController

@RestController
@RequiredArgsConstructor
public class ProducerController {

    private final ProducerService producerService;

    @GetMapping("/send")
    public String sendMessage(@RequestParam("topic") String topic,
                              @RequestParam("key") String key,
                              @RequestParam("message") String message){

        producerService.sendMessage(topic,key,message);
        return "Message send to kafka topic";
    }
}

 

ProducerSerivce

@Service
@RequiredArgsConstructor
public class ProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;
    
    public void sendMessage(String topic , String key, String message) {
        for (int i = 0; i < 10; i++) {

            kafkaTemplate.send(topic, key, message + " " + i);
        }

    }
}

 

send 메서드는 kafka에서 가장 기본적인 방법중 하나 이다.

KafkaProducer를 통해 Kafka 토픽으로 데이터를 비동기적으로 전송하는 역할을 한다.

파마미터 순으로 topicName, key, message 이다.

 

Consumer KafkaConfig

package com.kafka_sample.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

// 이 클래스는 Kafka 컨슈머 설정을 위한 Spring 설정 클래스입니다.
@EnableKafka // Kafka 리스너를 활성화하는 어노테이션입니다.
@Configuration // Spring 설정 클래스로 선언하는 어노테이션입니다.
public class ConsumerApplicationKafkaConfig {

    // Kafka 컨슈머 팩토리를 생성하는 빈을 정의합니다.
    // ConsumerFactory는 Kafka 컨슈머 인스턴스를 생성하는 데 사용됩니다.
    // 각 컨슈머는 이 팩토리를 통해 생성된 설정을 기반으로 작동합니다.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        // 컨슈머 팩토리 설정을 위한 맵을 생성합니다.
        Map<String, Object> configProps = new HashMap<>();
        // Kafka 브로커의 주소를 설정합니다.
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // 메시지 키의 디시리얼라이저 클래스를 설정합니다.
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 메시지 값의 디시리얼라이저 클래스를 설정합니다.
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 설정된 프로퍼티로 DefaultKafkaConsumerFactory를 생성하여 반환합니다.
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

    // Kafka 리스너 컨테이너 팩토리를 생성하는 빈을 정의합니다.
    // ConcurrentKafkaListenerContainerFactory는 Kafka 메시지를 비동기적으로 수신하는 리스너 컨테이너를 생성하는 데 사용됩니다.
    // 이 팩토리는 @KafkaListener 어노테이션이 붙은 메서드들을 실행할 컨테이너를 제공합니다.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        // ConcurrentKafkaListenerContainerFactory를 생성합니다.
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        // 컨슈머 팩토리를 리스너 컨테이너 팩토리에 설정합니다.
        factory.setConsumerFactory(consumerFactory());
        // 설정된 리스너 컨테이너 팩토리를 반환합니다.
        return factory;
    }
}

 

ConsumerEndpoint

package com.kafka_sample.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;


@Slf4j
@Service
public class ConsumerEndpoint {

    /**
     * topic이 같고 group이 다르면 각 group 마다 같은 message를 받는다
     *
     * topic으로 설정된 group에만 message가 전송된다.
     *
     */
    
    // 이 메서드는 Kafka에서 메시지를 소비하는 리스너 메서드입니다.
    // @KafkaListener 어노테이션은 이 메서드를 Kafka 리스너로 설정합니다.
    @KafkaListener(groupId = "group_a", topics = "topic1")
    // Kafka 토픽 "test-topic"에서 메시지를 수신하면 이 메서드가 호출됩니다.
    // groupId는 컨슈머 그룹을 지정하여 동일한 그룹에 속한 다른 컨슈머와 메시지를 분배받습니다.
    public void consumeFromGroupA(String message) {
        log.info("Group A consumed message from topic1: " + message);
    }

    // 동일한 토픽을 다른 그룹 ID로 소비하는 또 다른 리스너 메서드입니다.
    @KafkaListener(groupId = "group_b", topics = "topic1")
    public void consumeFromGroupB(String message) {
        log.info("Group B consumed message from topic1: " + message);
    }

    // 다른 토픽을 다른 그룹 ID로 소비하는 리스너 메서드입니다.
    @KafkaListener(groupId = "group_c", topics = "topic2")
    public void consumeFromTopicC(String message) {
        log.info("Group C consumed message from topic2: " + message);
    }

    // 다른 토픽을 다른 그룹 ID로 소비하는 리스너 메서드입니다.
    @KafkaListener(groupId = "group_c", topics = "topic3")
    public void consumeFromTopicD(String message) {
        log.info("Group C consumed message from topic3: " + message);
    }

    @KafkaListener(groupId = "group_d", topics = "topic4")
    public void consumeFromPartition0(String message) {
        log.info("Group D consumed message from topic4: " + message);
    }
}

 


요약

  • kafka는 고성능, 확정성, 내구성을 갖춘 분산 스트리밍 플랫폼이다.
  • Producer와 Consumer를 통해 메시지를 처리하고, Topic을 사용해 메시즈를 구분한다.
  • kafka는 대규모 실시간 데이터 스트리밍과 로그처리에 적합하며, 스프링에서 쉽게 연동할 수 있다.
  • KafkaTemplate과 @kafkaListener를 사용해 Producer와 Consumer를 구현할 수 있다.