Implementing the Outbox Pattern with Spring Boot
In distributed systems, we generally face the challenge of keeping our databases and external systems in sync.
For instance, when we create an order we also notify the shipping service about the incoming order(s) by publishing a createOrderEvent through a message broker such as Kafka. If the broker publish or the order save operation fails, our systems end up in an inconsistent state.
The Outbox pattern tries to solve this problem by treating message publication as part of your database transaction. In other words, instead of sending the createOrderEvent directly to the message broker, we first save it into an outbox table in the same transaction that saves the order, and a separate process then periodically reads the records from that table and sends them to the broker.
The Outbox pattern addresses problems of the form "save data and communicate with an external component", i.e. situations where we need to do two things atomically.
Let's build a practical example of the Outbox pattern using Spring Boot. In our scenario we create an order and then notify other systems through a message broker (Kafka in this case).
If you just want to see the code, here is the GitHub link.
Create a Spring Boot application with the following dependencies: Web, PostgreSQL, Flyway, Kafka, Testcontainers.
I am assuming you have already started Docker containers for PostgreSQL and Kafka.
Initially, assume we have the following migration script in the db/migration folder (V1__init.sql):
CREATE TABLE orders (
id character varying(60) PRIMARY KEY,
product_name character varying(255) NOT NULL
);
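The OutboxEntity used later in the article needs its own table, which the script above does not create. A second migration along these lines would cover it; the column names here are assumptions that match the entity sketch shown further below, e.g. V2__create_outbox.sql:
CREATE TABLE outbox (
id character varying(60) PRIMARY KEY,
type character varying(255) NOT NULL,
content text NOT NULL,
generated_utc timestamp with time zone NOT NULL,
consumed_utc timestamp with time zone
);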
With the following setup:
@Entity
@Table(name = "orders")
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderEntity {

    @Id
    @UuidGenerator(style = UuidGenerator.Style.TIME)
    @Column(name = "id")
    private String id;

    @Column(name = "product_name")
    private String productName;
}
@Repository
public interface OrderRepository extends JpaRepository<OrderEntity, String> {
}
// the event payload that will be published to external systems via Kafka
public record OrderEventCreated(
String id
) {
}
@RestController("api/order")
@RequiredArgsConstructor
public class OrderController {
private final OrderService orderService;
@PostMapping
public String createOrder(@RequestBody CreateOrderRequest createOrderRequest) {
return orderService.createOrder(createOrderRequest);
}
}
public record CreateOrderRequest(
String productName
) {
}
Let's first see what the code looks like without applying the Outbox pattern:
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    public String createOrder(CreateOrderRequest createOrderRequest) throws JsonProcessingException {
        OrderEntity createdEntity = orderRepository.save(OrderEntity.builder().productName(createOrderRequest.productName()).build());
        String data = objectMapper.writeValueAsString(new OrderEventCreated(createdEntity.getId()));
        CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("orderCreated", data);
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                log.info("Sent to Kafka: {}", result.getProducerRecord());
            }
        });
        return "created";
    }
}
If everything goes as expected, the test case below should pass:
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class OutboxPatternWithSpringBootApplicationTests {

    @Autowired
    TestRestTemplate testRestTemplate;

    @Test
    void testOrderCreation() {
        ResponseEntity<String> orderResponse =
                testRestTemplate.postForEntity(generateUri("/api/order"), new CreateOrderRequest("test"), String.class);
        Assertions.assertEquals("created", orderResponse.getBody());
    }
}
But what if the Kafka container goes down or is unreachable? In that case, even though we write the data to our DB, external systems won't be aware of the created order. In the end, this creates a conflict between the two services (microservices, an external system, or whatever else consumes the event).
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class KafkaFailureTest {

    @Autowired
    TestRestTemplate testRestTemplate;

    @MockitoBean
    KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    OrderRepository orderRepository;

    @Test
    void testKafkaFailure() {
        Mockito.when(kafkaTemplate.send(Mockito.anyString(), Mockito.anyString())).thenThrow(new RuntimeException("Failed"));

        ResponseEntity<String> orderResponse =
                testRestTemplate.postForEntity(generateUri("/api/order"), new CreateOrderRequest("test"), String.class);

        // kafka operation failed and we returned 500
        Assertions.assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, orderResponse.getStatusCode());
        // but we saved the order in our DB !!!
        Assertions.assertEquals(1, orderRepository.count());
    }
}
We don't want to simply put @Transactional on top of createOrder. If we do, the transaction stays open while it waits for the Kafka operation, which is not efficient in terms of database connection pooling (see the sketch right after these notes).
If the save operation fails, that is fine: we don't store the order in the database, and we never call kafkaTemplate to send a message to external systems.
The problem arises when the Kafka operation fails.
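For illustration, the naive alternative warned against above would look roughly like this; the method name is made up for this sketch and it is not something to copy. The Kafka send happens inside the database transaction, so a pooled connection is held until the broker responds, and the commit can still fail after the message has already been published:
@Transactional
public String createOrderNaive(CreateOrderRequest createOrderRequest) throws JsonProcessingException {
    OrderEntity createdEntity = orderRepository.save(OrderEntity.builder().productName(createOrderRequest.productName()).build());
    String data = objectMapper.writeValueAsString(new OrderEventCreated(createdEntity.getId()));
    // blocks inside the transaction: the DB connection stays checked out while we wait for Kafka
    kafkaTemplate.send("orderCreated", data).join();
    return "created";
}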
That is where the Outbox pattern comes in. Let's solve the problem using it.
Here is the updated code with the Outbox pattern:
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderService {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    // rollbackFor is needed because JsonProcessingException is checked; without it a serialization
    // failure would commit the order without its outbox record
    @Transactional(rollbackFor = JsonProcessingException.class)
    public String createOrderWithOutbox(CreateOrderRequest createOrderRequest) throws JsonProcessingException {
        OrderEntity createdEntity = orderRepository.save(OrderEntity.builder().productName(createOrderRequest.productName()).build());
        String data = objectMapper.writeValueAsString(new OrderEventCreated(createdEntity.getId()));
        outboxRepository.save(OutboxEntity.builder()
                .type(OrderEventCreated.class.getSimpleName())
                .content(data)
                .generatedUtc(OffsetDateTime.now())
                .build());
        return "created";
    }
}
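OutboxEntity and OutboxRepository are not shown here; a minimal sketch consistent with how they are used in OrderService and OutboxProcessor could look like this (the exact fields, annotations, and query are assumptions):
@Entity
@Table(name = "outbox")
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OutboxEntity {

    @Id
    @UuidGenerator(style = UuidGenerator.Style.TIME)
    @Column(name = "id")
    private String id;

    // simple name of the event class, e.g. "OrderEventCreated"
    @Column(name = "type")
    private String type;

    // serialized event payload
    @Column(name = "content")
    private String content;

    @Column(name = "generated_utc")
    private OffsetDateTime generatedUtc;

    // set once the message has been handed over to Kafka
    @Column(name = "consumed_utc")
    private OffsetDateTime consumedUtc;
}
@Repository
public interface OutboxRepository extends JpaRepository<OutboxEntity, String> {

    // "unprocessed" = not yet handed over to the broker; the Pageable supplies the ordering
    @Query("select o from OutboxEntity o where o.consumedUtc is null")
    Slice<OutboxEntity> getUnProcessedMessages(Pageable pageable);
}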
@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxProcessor {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    // ShedLock may be required if you run more than one instance
    // (@Scheduled also needs @EnableScheduling on a configuration class)
    @Scheduled(fixedRate = 5000)
    public void processOutboxMessages() {
        final int batchSize = 100;
        // always process the oldest outbox messages first to publish them in order
        Pageable pageRequest = PageRequest.of(0, batchSize, Sort.by(Sort.Direction.ASC, "generatedUtc"));
        Slice<OutboxEntity> unProcessedMessages = outboxRepository.getUnProcessedMessages(pageRequest);
        unProcessedMessages.forEach(outboxEntity -> {
            // we may also convert the content to the given type before pushing it to Kafka
            CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("orderCreated", outboxEntity.getContent());
            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    log.info("Message sent to Kafka: {}", result.getProducerRecord());
                    // call another service to mark the outbox message as processed, i.e. set consumedUtc
                } else {
                    // result is null here, so log the exception instead of touching it
                    log.error("Something went wrong while publishing the outbox message", ex);
                }
            });
        });
    }
}
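The comment in the success branch hints at marking the row as processed. A minimal sketch of that step, assuming a small helper service (the name and the transaction boundary are assumptions), could look like this:
@Service
@RequiredArgsConstructor
public class OutboxMarkerService {

    private final OutboxRepository outboxRepository;

    // runs in its own transaction, so a failed update does not affect the other messages in the batch
    @Transactional
    public void markAsProcessed(String outboxId) {
        outboxRepository.findById(outboxId).ifPresent(outboxEntity -> {
            outboxEntity.setConsumedUtc(OffsetDateTime.now());
            outboxRepository.save(outboxEntity);
        });
    }
}
Since the Kafka send and the consumedUtc update are not atomic, a message can be published and the update can still fail; the scheduler will then publish it again, which is why consumers have to be idempotent.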
That is the Outbox pattern. However, there are some caveats you should consider: delivery is at least once (the relay can publish a message and then fail to mark it as processed), so consumers must be idempotent; processed rows should eventually be cleaned up so the outbox table does not grow without bound; and, as noted in OutboxProcessor, the scheduled publisher needs a lock such as ShedLock when it runs on more than one instance.
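Finally, to mirror the earlier KafkaFailureTest, here is a sketch verifying that with the outbox version both the order and its outbox row are written even when Kafka is down (test class and method names are made up, and a clean database is assumed):
@SpringBootTest
class OutboxFlowTest {

    @Autowired
    OrderService orderService;

    @Autowired
    OrderRepository orderRepository;

    @Autowired
    OutboxRepository outboxRepository;

    @MockitoBean
    KafkaTemplate<String, String> kafkaTemplate;

    @Test
    void savesOrderAndOutboxRecordEvenWhenKafkaIsDown() throws JsonProcessingException {
        Mockito.when(kafkaTemplate.send(Mockito.anyString(), Mockito.anyString())).thenThrow(new RuntimeException("Failed"));

        orderService.createOrderWithOutbox(new CreateOrderRequest("test"));

        // both writes happened in one local transaction; the scheduler will retry publishing later
        Assertions.assertEquals(1, orderRepository.count());
        Assertions.assertEquals(1, outboxRepository.count());
    }
}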