기본 설정해보기
의존성 설정 파일
build.gradle
// kafka
implementation 'org.springframework.kafka:spring-kafka'
application.yml
spring
kafka:
bootstrap-servers: localhost:9092
Producer 및 Consumer 설정
프로젝트 코드에서 설정
Producer 파트 설정
/**
* ProducerFactory 설정
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
HashMap<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 직렬화 방식
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 직렬화 방식
config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class); // 파티션으로 전달하는 방법을 정해준다.
return new DefaultKafkaProducerFactory<>(config);
}
/**
* Kafka Template
*/
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> stringKafkaTemplate = new KafkaTemplate<>(
producerFactory());
// 위에서 설정한 Producer를 링크해준다.
stringKafkaTemplate.setProducerListener(kafkaProducerListener);
return stringKafkaTemplate;
}
Comsumer 파트 설정
/**
* ComsumerFactory 설정
*/
@Bean
public ConsumerFactory<String, String> consumerFactory() {
HashMap<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
/**
* 카프카 리스너 설정
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// custom한 에러핸들러를 설정해서 지정해 줄 수 있다.
factory.setCommonErrorHandler(kafkaErrorHandler);
return factory;
}
직렬화 처리
카프카 데이터 직렬화 변환
public class KafkaEventSerializer {
private static final ObjectMapper objectMapper = new ObjectMapper();
private KafkaEventSerializer() {
throw new UnsupportedOperationException("이 클래스는 인스턴스 생성을 지원하지 않습니다.");
}
// 직렬화 (객체 -> JSON 문자열)
public static <T> String serialize(T object) {
try {
return objectMapper.writeValueAsString(object);
} catch (JsonProcessingException e) {
log.error("Failed to serialize object: {}", object, e);
throw new RuntimeException("Serialization error", e);
}
}
// 역직렬화 (JSON 문자열 -> 객체)
public static <T> T deserialize(String json, Class<T> clazz) {
try {
return objectMapper.readValue(json, clazz);
} catch (JsonProcessingException e) {
log.error("Failed to deserialize JSON: {}", json, e);
throw new RuntimeException("Deserialization error", e);
}
}
}
KafkaListener 설정
예시
- 아래의 예시에선 동일한 토픽을 다른 Consumer 그룹에서 독립적으면 처리한다.
// 주문 처리 서비스
@KafkaListener(
groupId = "order-processing-service",
topics = "order-events"
)
public void processOrder(String orderData) {
// 주문 처리 로직
}
// 알림 서비스 (같은 메시지를 독립적으로 처리)
@KafkaListener(
groupId = "notification-service",
topics = "order-events"
)
public void sendNotification(String orderData) {
// 알림 발송 로직
}
Topcis 파라미터의 역할
- Topics는 Kafka 메시지를 저장하는 논리적인 채널입니다.
특징
- 메시지 분류
- 특정 유형의 데이터나 이벤트를 분류하여 저장하는 카테고리 역할
- 데이터 스트림 구분
- 구독 대상 지정
- Consumer가 어떤 토픽의 메시지를 소비할지 결정
GroupId 파라미터의 역할
- Kafka Consumer의 그룹 식별자로, 여러 Consumer 인스턴스들을 하나의 논리적 그룹으로 묶는 역할을 합니다.
특징
- 메시지 분산 처리
- 같은 groupId를 가진 Consumer들은 토픽의 파티션을 나누어서 처리
- 각 파티션은 그룹 내에서 오작 하나의 Consumer만 담당
- 장애 복구
- 한 Consumer가 다운되면 같은 그룹의 다른 Consumer가 해당 파티션을 인계받음
- 자동 리밸런싱을 통해서 고가용성 보장
- 오프셋 관리
- 그룹 단위로 메시지 Offset 추적
- Consumer 재시작 시 마지막 처리 시점 부터 재개 가능
- 독립적인 소비
- 서로 다른 groupId를 가진 Consumer들은 동일한 메시지를 독립적으로 소비
- 하나의 토픽을 여러 서비스에서 가각ㄱ
추후 해볼 것
- 직접 작성한 직렬화를 사용하지 않고 카프카 자체 직렬화 기능을 사용해 보자