Implementation of Outbox Pattern in Spring Boot

20 March 2025

In distributed systems, we often face the challenge of keeping our database and external systems in sync.

For instance, when we create an order, we also notify the shipping service of the incoming order(s) with a createOrderEvent, via a message broker such as Kafka. If the message broker or the order save operation fails, our systems end up in an inconsistent state.

The Outbox pattern solves this problem by treating message publication as part of your database transaction. In other words, instead of sending the createOrderEvent directly to the message broker, we first save it into an outbox table in the same transaction that saves the order; a separate process then periodically reads the records from that table and publishes them to the broker.

The Outbox pattern fixes problems of the form “save data and communicate with an external component”, i.e. the need to do two things atomically.

Let’s build a practical example of the Outbox pattern using Spring Boot. In our scenario we create an order and then notify other systems through a message broker (Kafka in this case).

If you only need to see the code, here is the GitHub link.

Create a spring boot application

Please create a Spring Boot application with the following dependencies: Web, PostgreSQL, Flyway, Kafka, Testcontainers.

I am assuming you have already started Docker containers for PostgreSQL and Kafka.

Initially, assume we have the following migration script in the db/migration folder (V1__init.sql; note that Flyway expects a double underscore between the version and the description):

CREATE TABLE orders (
    id character varying(60) PRIMARY KEY,
    product_name character varying(255) NOT NULL
);
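Since later in this post we will also save OutboxEntity rows, the outbox table needs a migration too. Here is a possible script (the file name V2__create_outbox.sql and the column names are assumptions, chosen to match the entity fields used later):

```sql
CREATE TABLE outbox (
    id character varying(60) PRIMARY KEY,
    type character varying(255) NOT NULL,
    content text NOT NULL,
    -- when the outbox row was created; used for ordered publishing
    generated_utc timestamp with time zone NOT NULL,
    -- null means "not yet published to the broker"
    consumed_utc timestamp with time zone
);
```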

With these setup:

@Entity
@Table(name = "orders")
@Getter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderEntity {
    @Id
    @UuidGenerator(style = UuidGenerator.Style.TIME)
    @Column(name = "id")
    private String id;

    @Column(name = "product_name")
    private String productName;
}

@Repository
public interface OrderRepository extends JpaRepository<OrderEntity, String> {
}

// event payload sent to external systems via Kafka
public record OrderEventCreated(
        String id
) {
}
We also have a simple endpoint to create an order (note that the value passed to @RestController is a bean name, not a request path, so we map the path with @RequestMapping):

@RestController
@RequestMapping("/api/order")
@RequiredArgsConstructor
public class OrderController {
    private final OrderService orderService;
    
    @PostMapping
    public String createOrder(@RequestBody CreateOrderRequest createOrderRequest) {
        return orderService.createOrder(createOrderRequest);
    }
}

public record CreateOrderRequest(
        String productName
) {
}

Create order without outbox pattern

Let’s first see what the code looks like without the Outbox pattern:

@Slf4j
@Service
@RequiredArgsConstructor
public class OrderService {
    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    public String createOrder(CreateOrderRequest createOrderRequest) throws JsonProcessingException {
        OrderEntity createdEntity = orderRepository.save(OrderEntity.builder().productName(createOrderRequest.productName()).build());

        String data = objectMapper.writeValueAsString(new OrderEventCreated(createdEntity.getId()));

        CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("orderCreated", data);
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                log.info("Sent to Kafka: {}", result.getProducerRecord());
            }
        });

        return "created";
    }
}

If everything goes as expected, the test below should pass:

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class OutboxPatternWithSpringBootApplicationTests {

	@Autowired
	TestRestTemplate testRestTemplate;

	@Test
	void testOrderCreation() {
		// generateUri builds the full URL for the random local port
		ResponseEntity<String> orderResponse =
				testRestTemplate.postForEntity(generateUri("/api/order"), new CreateOrderRequest("test"), String.class);

		Assertions.assertEquals("created", orderResponse.getBody());
	}
}

But what if the Kafka container goes down or is unreachable? In that case, even though we write the data to our DB, external systems won’t be aware of the created order. In the end, this creates a conflict between the two services (microservices, external systems, etc.).

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class KafkaFailureTest {
    @Autowired
    TestRestTemplate testRestTemplate;
    @MockitoBean
    KafkaTemplate<String, String> kafkaTemplate;
    @Autowired
    OrderRepository orderRepository;

    @Test
    void testKafkaFailure() {
        Mockito.when(kafkaTemplate.send(Mockito.anyString(), Mockito.anyString())).thenThrow(new RuntimeException("Failed"));
        ResponseEntity<String> orderResponse =
                testRestTemplate.postForEntity(generateUri("/api/order"), new CreateOrderRequest("test"), String.class);
        
        // kafka operation failed and we returned 500
        Assertions.assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, orderResponse.getStatusCode());
        // but we saved order in our DB !!!
        Assertions.assertEquals(1, orderRepository.count());
    }
}

We don’t want to put @Transactional on top of createOrder. If we did, the database transaction would be held open while waiting for the Kafka operation, which is inefficient in terms of database connection pooling.

If the save operation fails, that is fine: we don’t store the order in the database, and we never call kafkaTemplate to send a message to external systems.

The problem arises when the Kafka operation fails.

That is where the Outbox pattern comes in. Let’s solve this problem with it.

[Figure: Outbox pattern flow (outbox_pattern_flow.png)]

First step - Update OrderService with the Outbox pattern

Here is the updated code with the Outbox pattern:


@Slf4j
@Service
@RequiredArgsConstructor
public class OrderService {
    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ObjectMapper objectMapper;

    @Transactional
    public String createOrderWithOutbox(CreateOrderRequest createOrderRequest) throws JsonProcessingException {
        OrderEntity createdEntity = orderRepository.save(OrderEntity.builder().productName(createOrderRequest.productName()).build());
        String data = objectMapper.writeValueAsString(new OrderEventCreated(createdEntity.getId()));
        outboxRepository.save(OutboxEntity.builder()
                .type(OrderEventCreated.class.getSimpleName())
                .content(data)
                .generatedUtc(OffsetDateTime.now())
                .build());
        return "created";
    }
  
}
  • As you can see, instead of publishing the message(s) to Kafka, we save them to the DB as outbox messages. The generatedUtc field will be used when publishing outbox messages to the message queue in order.
  • From now on we only run on the happy path. If everything is OK, we know we have saved both the order and the outbox message. If either fails, the transaction is rolled back.
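OutboxEntity and OutboxRepository are referenced above but not shown. Here is a minimal sketch, assuming Lombok and Spring Data JPA; the JPQL query and the convention that a null consumedUtc means “not yet published” are assumptions of this sketch:

```java
@Entity
@Table(name = "outbox")
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OutboxEntity {
    @Id
    @UuidGenerator(style = UuidGenerator.Style.TIME)
    @Column(name = "id")
    private String id;

    // event type, e.g. "OrderEventCreated"
    @Column(name = "type")
    private String type;

    // serialized event payload (JSON)
    @Column(name = "content")
    private String content;

    @Column(name = "generated_utc")
    private OffsetDateTime generatedUtc;

    // set once the message has been published to Kafka
    @Column(name = "consumed_utc")
    private OffsetDateTime consumedUtc;
}

@Repository
public interface OutboxRepository extends JpaRepository<OutboxEntity, String> {
    // unprocessed = not yet published, i.e. consumedUtc is still null
    @Query("select o from OutboxEntity o where o.consumedUtc is null")
    Slice<OutboxEntity> getUnProcessedMessages(Pageable pageable);
}
```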

Second step - Implementing the Outbox processor

  • The Outbox processor, which could be a scheduled job, finds the unprocessed messages and publishes them to the message queue. If a message is published successfully, it is marked as processed. Publishing can be done one by one or in batches.

Create a scheduled job to process outbox messages

@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxProcessor {
    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;
    
    // ShedLock may be required if you run more than one instance
    @Scheduled(fixedRate = 5000)
    public void processOutboxMessages() {
        final int batchSize = 100;
        // always process oldest OutboxMessage to publish messages in ordered manner
        Pageable pageRequest = PageRequest.of(0, batchSize, Sort.by(Sort.Direction.ASC, "generatedUtc"));
        
        Slice<OutboxEntity> unProcessedMessages = outboxRepository.getUnProcessedMessages(pageRequest);
        
        unProcessedMessages.forEach(outboxEntity -> {
            // we may also convert the content to the given type and push it into kafka !!
            CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send("orderCreated", outboxEntity.getContent());
            future.whenComplete((result, ex) -> {
                if (ex == null) {
                    log.info("Message sent to Kafka: {}", result.getProducerRecord());
                    // call another service to mark the outbox message as processed (set consumedUtc)
                } else {
                    // note: result is null on failure, so log the exception instead
                    log.error("Something went wrong while publishing outbox message {}", outboxEntity.getId(), ex);
                }
            });
        });
    }
}
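The “mark as processed” step above is only a comment. A minimal sketch of it, assuming a hypothetical OutboxMessageService and that OutboxEntity exposes a consumedUtc setter; REQUIRES_NEW gives each update its own transaction so one failure does not affect the other messages in the batch:

```java
@Service
@RequiredArgsConstructor
public class OutboxMessageService {
    private final OutboxRepository outboxRepository;

    // runs in its own transaction, independent of the scheduler loop
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void markAsProcessed(OutboxEntity outboxEntity) {
        outboxEntity.setConsumedUtc(OffsetDateTime.now());
        outboxRepository.save(outboxEntity);
    }
}
```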

That is the Outbox pattern. However, there are some caveats you should consider:

  • It is better to design a retry mechanism for publishing the messages.
  • To avoid processing the same unprocessed message forever, add a retry counter (maxCounter) and give up on “dead outbox messages”, deleting them from the outbox table.
  • It is also better to move “dead outbox messages” somewhere (another table, etc.) for further investigation.
  • You should regularly shrink the outbox table. It is better to create another scheduled job that deletes processed or “dead” outbox messages.
  • Most importantly, pay attention to how you consume the messages. The Outbox pattern can publish a message more than once (it gives at-least-once delivery: every message is published at least once, but possibly many times). Your consumer must be aware of this, otherwise it may run its business logic more than once for the same event.
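The last caveat can be sketched in plain Java. This is a deliberately minimal, in-memory idempotent consumer, assuming each event carries a unique id (e.g. the outbox row id); a real system would persist the processed ids in a table instead of a Set:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class IdempotentConsumer {
    // ids of events we have already handled; in production this would be a DB table
    private final Map<String, Boolean> processedIds = new ConcurrentHashMap<>();
    private int ordersShipped = 0;

    // returns true only the first time an event id is seen
    public boolean handle(String eventId) {
        if (processedIds.putIfAbsent(eventId, Boolean.TRUE) != null) {
            return false; // duplicate delivery: skip the business logic
        }
        ordersShipped++; // business logic runs once per distinct event
        return true;
    }

    public int getOrdersShipped() {
        return ordersShipped;
    }

    public static void main(String[] args) {
        IdempotentConsumer consumer = new IdempotentConsumer();
        // the outbox processor may deliver the same event twice
        consumer.handle("order-1");
        consumer.handle("order-1"); // duplicate, ignored
        consumer.handle("order-2");
        System.out.println(consumer.getOrdersShipped()); // prints 2
    }
}
```

With this guard, a duplicate publish from the outbox processor is harmless: the business logic simply does not run a second time.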
