개인 프로젝트에서 Kafka를 이용해 CQRS를 적용하던 중 아래와 같은 에러가 발생하였습니다.
Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "postId" (class com.example.core.domain.messaging.command.post.PostCreateMessage), not marked as ignorable (2 known properties: "postCreate", "userId"]) at [Source: (String)"{"postId":{"id":"fd94aee3-2ede-4c9a-a0b8-dd0262e73eb3"},"title":"성공인가?","content":"kafka test"}"; line: 1, column: 96] (through reference chain: com.example.core.domain.messaging.command.post.PostCreateMessage["postId"])
UnrecognizedPropertyException인데 왜 발생한 것인지 알아보겠습니다.
기존의 Json 전송방법
기존에는 Kafka를 전송할때 Json자체를 전송하였습니다.
ConsumerConfig를 확인해 보겠습니다.
@Configuration
public class KafkaConsumerConfig {
/**
* ConsumerFactory 설정
*/
@Bean
public ConsumerFactory<String, KafkaPostCreate> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(
config,
new StringDeserializer(),
new JsonDeserializer<>(KafkaPostCreate.class, new ObjectMapper())
);
}
/**
* topic으로부터 message를 전달받는 kafka-listener-factory 설정
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, KafkaPostCreate> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, KafkaPostCreate> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
KafkaPostCreate를 직접적으로 보내고 있습니다.
하지만 이렇게 보내면 다른 DTO를 보낼때 추가적인 설정을 해줘야할것 같았습니다.
그래서 ObjectMapper를 사용해서 String으로 전환하여 보내는 방식으로 변경하였습니다.
ObjectMapper를 사용해 객체 전달하기
Value설정을 할때 JsonDeserializer에서 StringDeserializer변경한것 밖에 없습니다.
@Configuration
public class KafkaConsumerConfig {
/**
* ConsumerFactory 설정
*/
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(
config
// new StringDeserializer(),
// new JsonDeserializer<>(PostCreateMessage.class, om)
);
}
/**
* topic으로부터 message를 전달받는 kafka-listener-factory 설정
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
그럼 문제는 어디서 발생한 것일까요?
DTO 전달문제와 DB 동기화
기존에는 바로 KafkaPostCreate를 보내었습니다.
바로 테스트를 위해서 보냈던 것인데요...
Post 저장 성공시 QueryDB에도 동기화가 필요하기에 저장할 DTO를 전송해야 했습니다.
하지만 KafkaPostCreate에서는 ID값이 없어서 새로운 DTO를 만들어야 해결이 가능합니다.
기존 KafkaPostCreate는 연습용이였기때문에 이름이 KafkaPostCreate였습니다.
용도에 맞게 Client의 요청은 PostCreateMessage
QueryDB동기화를 위해 Kafka로 보내지는 DTO는 KafkaPostCreate로 변경하였습니다.

변경 후 Kafka를 통해 Message가 전송되는것을 확인할 수 있었습니다.

'Project > 개인프로젝트' 카테고리의 다른 글
자바 스크립트 fetch, await, async (0) | 2023.12.03 |
---|---|
MongoDB Capped collection 버퍼 크기 에러 (0) | 2023.11.21 |
[Error] 멀티모듈 BeanCreationException (0) | 2023.10.20 |
[Error] Multi Module 적용 에러 (0) | 2023.10.19 |
Spring Integration 적용하기 (0) | 2023.10.07 |
개인 프로젝트에서 Kafka를 이용해 CQRS를 적용하던 중 아래와 같은 에러가 발생하였습니다.
Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "postId" (class com.example.core.domain.messaging.command.post.PostCreateMessage), not marked as ignorable (2 known properties: "postCreate", "userId"]) at [Source: (String)"{"postId":{"id":"fd94aee3-2ede-4c9a-a0b8-dd0262e73eb3"},"title":"성공인가?","content":"kafka test"}"; line: 1, column: 96] (through reference chain: com.example.core.domain.messaging.command.post.PostCreateMessage["postId"])
UnrecognizedPropertyException인데 왜 발생한 것인지 알아보겠습니다.
기존의 Json 전송방법
기존에는 Kafka를 전송할때 Json자체를 전송하였습니다.
ConsumerConfig를 확인해 보겠습니다.
@Configuration
public class KafkaConsumerConfig {
/**
* ConsumerFactory 설정
*/
@Bean
public ConsumerFactory<String, KafkaPostCreate> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(
config,
new StringDeserializer(),
new JsonDeserializer<>(KafkaPostCreate.class, new ObjectMapper())
);
}
/**
* topic으로부터 message를 전달받는 kafka-listener-factory 설정
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, KafkaPostCreate> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, KafkaPostCreate> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
KafkaPostCreate를 직접적으로 보내고 있습니다.
하지만 이렇게 보내면 다른 DTO를 보낼때 추가적인 설정을 해줘야할것 같았습니다.
그래서 ObjectMapper를 사용해서 String으로 전환하여 보내는 방식으로 변경하였습니다.
ObjectMapper를 사용해 객체 전달하기
Value설정을 할때 JsonDeserializer에서 StringDeserializer변경한것 밖에 없습니다.
@Configuration
public class KafkaConsumerConfig {
/**
* ConsumerFactory 설정
*/
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_1");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(
config
// new StringDeserializer(),
// new JsonDeserializer<>(PostCreateMessage.class, om)
);
}
/**
* topic으로부터 message를 전달받는 kafka-listener-factory 설정
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
그럼 문제는 어디서 발생한 것일까요?
DTO 전달문제와 DB 동기화
기존에는 바로 KafkaPostCreate를 보내었습니다.
바로 테스트를 위해서 보냈던 것인데요...
Post 저장 성공시 QueryDB에도 동기화가 필요하기에 저장할 DTO를 전송해야 했습니다.
하지만 KafkaPostCreate에서는 ID값이 없어서 새로운 DTO를 만들어야 해결이 가능합니다.
기존 KafkaPostCreate는 연습용이였기때문에 이름이 KafkaPostCreate였습니다.
용도에 맞게 Client의 요청은 PostCreateMessage
QueryDB동기화를 위해 Kafka로 보내지는 DTO는 KafkaPostCreate로 변경하였습니다.

변경 후 Kafka를 통해 Message가 전송되는것을 확인할 수 있었습니다.

'Project > 개인프로젝트' 카테고리의 다른 글
자바 스크립트 fetch, await, async (0) | 2023.12.03 |
---|---|
MongoDB Capped collection 버퍼 크기 에러 (0) | 2023.11.21 |
[Error] 멀티모듈 BeanCreationException (0) | 2023.10.20 |
[Error] Multi Module 적용 에러 (0) | 2023.10.19 |
Spring Integration 적용하기 (0) | 2023.10.07 |