본문 바로가기

IT/Kafka

Kafka 메시지를 구독하는 자바 소스 예시

반응형

Apache Kafka  토픽을 구독하는 Java 소스 코드입니다. 이 코드는 KafkaConsumer를 사용하여 특정 토픽을 구독하고, 메시지를 지속적으로 소비하는 예입니다.

Apache Kafka

 

필수 라이브러리 등록

  • Kafka를 사용하려면 kafka-clients 라이브러리를 pom.xml에 추가해야 합니다.
  • Maven 설정 (pom.xml)

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.6.0</version>  <!-- 최신 버전 확인 후 업데이트 -->
    </dependency>
</dependencies>

Kafka Consumer 코드

  • 아래 Java 코드는 Kafka 토픽에서 메시지를 소비하는 예제입니다.

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaTopicSubscriber {
    public static void main(String[] args) {
        // Kafka Consumer 설정
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092"); // Kafka 브로커 주소
        properties.put("group.id", "my-consumer-group"); // Consumer Group ID
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("auto.offset.reset", "earliest"); // 가장 처음부터 읽기

        // Kafka Consumer 생성
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        String topic = "my-topic"; // 구독할 토픽명
        consumer.subscribe(Collections.singletonList(topic));

        try {
            while (true) {
                // 메시지를 1초 동안 폴링하여 가져옴
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed message: key=%s, value=%s, partition=%d, offset=%d%n",
                            record.key(), record.value(), record.partition(), record.offset());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close(); // 종료 시 Consumer를 닫아줌
        }
    }
}


  • 설명

   
bootstrap.servers Kafka 브로커 주소
group.id 동일한 Group ID를 가진 Consumer들은 메시지를 분산하여 처리함
key.deserializer, value.deserializer 메시지를 역직렬화하는 클래스 지정
auto.offset.reset 새 Consumer가 시작될 때 처음부터 메시지를 읽도록 설정
consumer.subscribe(Collections.singletonList(topic)) 특정 토픽을 구독
consumer.poll(Duration.ofMillis(1000)) 1초마다 새로운 메시지를 가져옴

 

반응형

'IT > Kafka' 카테고리의 다른 글

Apache Kafka 구성요소  (1) 2025.02.26
Apache Kafka - API  (0) 2025.02.26
Apache Kafka 유사한 오픈소스 및 상용솔루션 사례  (0) 2025.02.26
Apache Kafka - 신뢰성 보증 방안  (0) 2025.02.26
Apache Kafka 소개  (4) 2025.02.25