반응형
카프카는 대규모 데이터 처리를 위한 분산 메시징 시스템으로, 스프링 프레임워크와 함께 사용하면 더욱 편리한 애플리케이션 개발이 가능합니다.
이 글은 스프링 부트와 스프링 카프카를 사용하여 카프카와 연결하는 예제 코드를 포함하고 있습니다.
1. Maven 또는 Gradle을 사용하여 스프링 카프카 및 카프카 클라이언트를 프로젝트에 추가합니다.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.6</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
2. 카프카 프로듀서와 컨슈머를 생성할 때 사용할 설정을 정의합니다.
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${kafka.consumer.group-id}")
private String consumerGroupId;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
return props;
}
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
3. 카프카 프로듀서를 생성하고 메시지를 전송합니다.
@RestController
public class KafkaProducerController {
private final KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
public KafkaProducerController(KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@PostMapping("/messages")
public ResponseEntity<String> sendMessage(@RequestBody Message message) {
kafkaTemplate.send("my-topic", message);
return ResponseEntity.ok("Message sent successfully");
}
4. 카프카 컨슈머를 생성하고 메시지를 수신합니다.
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "my-topic")
public void receiveMessage(Message message) {
// Process the message
System.out.println("Received message: " + message);
}
}
예제에서는 KafkaConfig 클래스에서 카프카 프로듀서와 컨슈머를 위한 설정을 정의하고,
KafkaProducerController 클래스에서 KafkaTemplate을 사용하여 메시지를 전송하고,
KafkaConsumerService 클래스에서 @KafkaListener 어노테이션을 사용하여 메시지를 수신하고 처리합니다.
이와 같은 방식으로 스프링 프레임워크와 카프카를 연동하여 데이터를 신속하게 처리할 수 있습니다.
반응형
댓글