반응형
Apache Kafka 토픽을 구독하는 Java 소스 코드입니다. 이 코드는 KafkaConsumer를 사용하여 특정 토픽을 구독하고, 메시지를 지속적으로 소비하는 예입니다.
필수 라이브러리 등록
• Kafka를 사용하려면 kafka-clients 라이브러리를 pom.xml에 추가해야 합니다.
• Maven 설정 (pom.xml)
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.0</version> <!-- 최신 버전 확인 후 업데이트 -->
</dependency>
</dependencies>
Kafka Consumer 코드
• 아래 Java 코드는 Kafka 토픽에서 메시지를 소비하는 예제입니다.
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaTopicSubscriber {
public static void main(String[] args) {
// Kafka Consumer 설정
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092"); // Kafka 브로커 주소
properties.put("group.id", "my-consumer-group"); // Consumer Group ID
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("auto.offset.reset", "earliest"); // 가장 처음부터 읽기
// Kafka Consumer 생성
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
String topic = "my-topic"; // 구독할 토픽명
consumer.subscribe(Collections.singletonList(topic));
try {
while (true) {
// 메시지를 1초 동안 폴링하여 가져옴
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Consumed message: key=%s, value=%s, partition=%d, offset=%d%n",
record.key(), record.value(), record.partition(), record.offset());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.close(); // 종료 시 Consumer를 닫아줌
}
}
}
• 설명
bootstrap.servers | Kafka 브로커 주소 |
group.id | 동일한 Group ID를 가진 Consumer들은 메시지를 분산하여 처리함 |
key.deserializer, value.deserializer | 메시지를 역직렬화하는 클래스 지정 |
auto.offset.reset | 새 Consumer가 시작될 때 처음부터 메시지를 읽도록 설정 |
consumer.subscribe(Collections.singletonList(topic)) | 특정 토픽을 구독 |
consumer.poll(Duration.ofMillis(1000)) | 1초마다 새로운 메시지를 가져옴 |
반응형
'IT > Kafka' 카테고리의 다른 글
Apache Kafka 구성요소 (1) | 2025.02.26 |
---|---|
Apache Kafka - API (0) | 2025.02.26 |
Apache Kafka 유사한 오픈소스 및 상용솔루션 사례 (0) | 2025.02.26 |
Apache Kafka - 신뢰성 보증 방안 (0) | 2025.02.26 |
Apache Kafka 소개 (4) | 2025.02.25 |