티스토리 뷰

이전 글: https://gojs.tistory.com/22

 

Kafka 기본 개념

카프카를 구성하는 주체카프카 브로커는 데이터를 분산 저장하며 클라이언트와 데이터를 주고 받는 주체이다.하나의 서버 인스턴스에 하나의 카프카 브로커 프로세스를 실행하며, 여러 카프카

gojs.tistory.com

 

이번 포스팅에서는 Spring Kafka 기반의 프로젝트를 생성한 후, Producer와 Consumer의 설정을 조정하며 Kafka의 동작원리를 파악해본다.

프로젝트: https://github.com/jaeseok-go/kafka-practice

 

Spring Kafka

Spring Kafka 라이브러리를 활용하게 되면 Spring 기반의 프로젝트에서 API를 활용하여 Kafka Client를 손쉽게 구현할 수 있다.

 

Gradle 기반에서는 아래와 같이 디펜던시를 추가해준다.

implementation 'org.springframework.kafka:spring-kafka'

 

Spring Kafka Producer

Spring Kafka 기반으로 Producer를 구성하는 경우 일반적으로는 KafkaTemplate를 사용한다.

spring.kafka.producer.acks
spring.kafka.producer.batch-size
spring.kafka.producer.bootstrap-servers
spring.kafka.producer.buffer-memory
spring.kafka.producer.client-id
spring.kafka.producer.compression-type
spring.kafka.producer.key-serializer
spring.kafka.producer.properties.*
spring.kafka.producer.retries
spring.kafka.producer.trasaction-id-prefix
spring.kafka.producer.value-serializer

KafkaTemplate의 옵션의 목록은 위와 같으며, 이에 대한 설명은 아래에서 이어갈 예정이다.

 

이 설정을 세팅할 수 있는 방법은 두 가지로 나뉜다.

1. application.yml에 producer 관련 설정

2. ProducerFactory에 설정 값 세팅, KafkaTemplate 생성자에 주입 (해당 방법으로 진행)

 

우선 아래와 같이 KafkaTemplate 타입의 messageKafkaTemplate 빈을 생성하였다.

@Configuration
public class ProducerConfiguration {

    @Bean
    public KafkaTemplate<String, String> messageKafkaTemplate() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-demo-instance:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.ACKS_CONFIG, "all");

        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(properties);

        return new KafkaTemplate<>(producerFactory);
    }
}

key-serializer, value-serializer, acks 등의 설정은 일단 임의로 설정했다.

 

그리고 Producer 역할을 수행할 클래스를 아래와 같이 구현한다.

public interface KafkaProducer {
    void produce(String message);
}


@Slf4j
@RequiredArgsConstructor
@Component
public class MessageKafkaProducer implements KafkaProducer {

    private final KafkaTemplate<String, String> messageKafkaTemplate;

    private final KafkaStorage producerStorage;

    private static final String TOPIC_NAME = "message";

    @Override
    public void produce(String message) {
        messageKafkaTemplate
                .send(TOPIC_NAME, message)
                .whenComplete((result, exception) -> {
                    log.info("TOPIC={} {}: result={}",
                            TOPIC_NAME,
                            ObjectUtils.isEmpty(exception) ? "SUCCESS" : "FAILURE",
                            ObjectUtils.isEmpty(exception) ? result : exception.getMessage()
                    );

                    producerStorage.save(message);
                });
    }
}

KafkaTemplate의 send 메서드는 CompletableFuture 타입을 반환하기 때문에 whenComplete 메서드로 결과에 대한 콜백 로직을 구현할 수 있다.

produce/consume에 대한 결과를 가지고 테스트를 구현하기 위해서 KafkaStorage 인터페이스와 ProducerStorage를 구현하여 결과를 적재하였다.

public interface KafkaStorage {

    void initialize();

    void save(Object data);

    Object get(int offset);
}


@Component
public class ProducerStorage implements KafkaStorage {
    private Map<Integer, Object> storeMap = new HashMap<>();

    private int offset = 0;

    @Override
    public void initialize() {
        storeMap = new HashMap<>();
        offset = 0;
    }

    @Override
    public synchronized void save(Object data) {
        storeMap.put(offset++, data);
    }

    @Override
    public Object get(int offset) {
        return storeMap.get(offset);
    }
}

멀티 스레드 환경에서 save 메서드에 접근하는 경우 offset이 중복되는 케이스가 발생하기 때문에, 각 스레드가 lock을 획득하여 save를 수행하도록 synchronized 메서드로 구현하였다.

 

@SpringBootTest
class MessageKafkaProducerTest {

    @Autowired
    private KafkaProducer messageKafkaProducer;

    @Autowired
    private KafkaStorage producerStorage;

    @BeforeEach
    void initializeStorage() {
        producerStorage.initialize();;
    }

    @Test
    void 카프카에_메세지를_전송한다() throws InterruptedException {
        // given
        String testMessage = "test message";

        // when
        messageKafkaProducer.produce(testMessage);
        Thread.sleep(1000L);

        // then
        Object data = producerStorage.get(0);
        Assertions.assertEquals(testMessage, (String) data);
    }

    @Test
    void 멀티스레드_환경에서_메세지를_전송한다() throws InterruptedException {
        // given
        String testMessagePrefix = "test message";

        int requestCount = 100;
        int activeThreadCount = 30;
        ExecutorService executorService = Executors.newFixedThreadPool(activeThreadCount);

        // when
        for (int i = 0; i < requestCount; i++) {
            String message = testMessagePrefix + i;
            executorService.submit(() -> {
                messageKafkaProducer.produce(message);
            });
        }
        Thread.sleep(1000);

        // then
        for (int offset = 0; offset < requestCount; offset++) {
            Object data = producerStorage.get(offset);
            Assertions.assertTrue(((String) data).startsWith(testMessagePrefix));
        }
    }
}

그리고 위와 같은 통합 테스트 코드를 작성하여 Producer가 Kafka에 메세지를 발행하고, 이에 대한 결과를 ProducerStorage에 정상적으로 적재하였는지 검증해보았다.

통합 테스트 결과
Kafka에 발행된 레코드 확인

 

Spring Kafka Consumer

Spring Kafka를 기반의 Consumer는 크게 두 가지 타입으로 구분된다.

1. 레코드 리스너: 한 번에 한 개의 레코드를 처리

2. 배치 리스너: 한 번에 여러 개의 레코드를 처리

 

그리고 7개의 AckMode 타입을 가진다.

1. RECORD: 레코드 단위로 프로세싱 후 커밋

2. BATCH: poll된 레코드 모두 처리 후 커밋 (기본 값)

3. TIME: 특정 시간 이후에 커밋 (AckTime 설정 필수)

4. COUNT: 특정 개수의 레코드만큼 처리된 후 커밋

5. COUNT_TIME: TIME, COUNT 중 하나라도 맞는 조건이 있으면 커밋

6. MANUAL: Acknowledgement.acknowledge()를 호출한 후, 다음번 poll()할 때 커밋

7. MANUAL_IMMEDIATE: Acknowledgement.acknowledge()를 호출하면 바로 커밋

 

스프링 카프카에서는 위 타입별 리스너를 구현해두었으며, 그대로 사용하거나 별도로 구성해서 사용해도 무방하다.

 

spring.kafka.consumer.auto-commit-interval
spring.kafka.consumer.auto-offset-reset
spring.kafka.consumer.bootstrap-server
spring.kafka.consumer.client-id
spring.kafka.consumer.enable-auto-commit
spring.kafka.consumer.fetch-max-wait
spring.kafka.consumer.fetch-min-size
spring.kafka.consumer.group-id
spring.kafka.consumer.heartbeat-interval
spring.kafka.consumer.key-deserializer
spring.kafka.consumer.max-poll-records
spring.kafka.consumer.properties.*
spring.kafka.consumer.value-deserializer
spring.kafka.listener.ack-count
spring.kafka.listener.ack-mode
spring.kafka.listener.ack-time
spring.kafka.listener.client-id
spring.kafka.listener.concurrency
spring.kafka.listener.idle-event-interval
spring.kafka.listener.log-container-config
spring.kafka.listener.monitor-interval
spring.kafka.listener.no-poll-threshold
spring.kafka.listener.poll-timeout
spring.kafka.listener.type

스프링 카프카 컨슈머 관련 설정은 위와 같다.

컨슈머는 application.yml에 spring.kafka.listener.type을 설정한 후, AckMode 설정없이 스프링 카프카에서 오버로딩하여 구현해놓을 리스너를 바로 사용할 수 있다.

(파라미터에 따라 AckMode가 다르게 적용된다, type에 더불어 ack-mode 설정까지 해야하는 경우가 있다.)

 

그러나 각 리스너에 대한 부분을 모두 정리하는 것은 단순 나열이기에.. 직업 커스텀 리스너를 구현하는 방법에 대해서 바로 알아보자.

@Configuration
public class ConsumerConfiguration {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> recordStringContainerFactory() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-demo-instance:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(properties);

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setBatchListener(false);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        factory.setConsumerFactory(consumerFactory);

        return factory;
    }
}

위 코드와 같이 KafkaListenerContainerFactory 타입의 빈 객체를 선언한다.

 

@RequiredArgsConstructor
@Component
public class MessageKafkaListener {

    private final KafkaStorage consumerStorage;

    private final static String TOPIC_NAME = "message";

    private final static String GROUP_NAME = "kafka-consumer-group-00";

    @KafkaListener(
            topics = TOPIC_NAME,
            groupId = GROUP_NAME,
            containerFactory = "recordStringContainerFactory"
    )
    public void recordListener(String record) {
        log.warn("{}: {}", TOPIC_NAME, record);

        consumerStorage.save(record);
    }
}

그리고 KafkaListener 어노테이션의 containerFactory 옵션에 KafkaListenerContainerFactory 타입의 빈 객체 이름을 선언하여 컨슈머 관련 정보를 적용한다.

 

@Component
public class ConsumerStorage implements KafkaStorage {
    private Map<Integer, Object> storeMap = new HashMap<>();

    private int offset = 0;

    @Override
    public void initialize() {
        storeMap = new HashMap<>();
        offset = 0;
    }

    @Override
    public synchronized void save(Object data) {
        storeMap.put(offset++, data);
    }

    @Override
    public Object get(int offset) {
        return storeMap.get(offset);
    }
}

컨슈머의 경우에도 토픽의 데이터를 정상적으로 처리하였는지 기록하여 테스트하기 위해 ConsumerStorage 클래스를 구현했다.

 

@SpringBootTest
class KafkaConsumerTest {

    @Autowired
    private KafkaProducer messageKafkaProducer;

    @Autowired
    private ConsumerStorage consumerStorage;

    @BeforeEach
    void initialize() {
        consumerStorage.initialize();
    }

    @Test
    void 메세지를_발행하면_구독해서_처리() throws InterruptedException {
        // given
        String message = "test message for pub/sub";

        // when
        messageKafkaProducer.produce(message);
        Thread.sleep(1000L);

        // then
        assertEquals(consumerStorage.get(0).toString(), message);
    }
}

프로듀서가 Kafka로 메세지를 제공하면, 컨슈머가 받아서 처리하였는지 확인하는 테스트 코드를 작성하였다.

 

통합 테스트 결과

'공부 > Apache Kafka' 카테고리의 다른 글

Kafka 기본 구조 알아보기  (0) 2024.04.17
간단한 실습으로 Kafka 알아보기  (0) 2024.04.14
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
TAG
more
«   2026/05   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
31
글 보관함