Database and Kafka Integration with Testcontainers in Spring Boot

Database and Kafka Integration with Testcontainers in Spring Boot

Before jumping into Testcontainers, I assume we all know that today’s applications tend to have many components, frameworks, databases, and event stores with paradigms such as Microservices architecture.

As the complexity grows with more and more moving parts, Software Testing and Automation become even more critical; therefore, many software testing methodologies have been developed to test an application’s functional or non-functional requirements, such as;

  • Unit Testing
  • Integration Testing
  • System Testing
  • Acceptance Testing
  • Performance Testing
  • Security Testing

The naming and scope of each test methodology may vary based on the company, team, and scope. 

This tutorial will demonstrate an Integration test scenario with the Testcontainers library to make sure integrated components work as intended.

What is Testcontainers? 

Testcontainers is a Java library that supports tests by providing lightweight, throwaway instances of databases, tools, event stores such as Kafka, or anything else that can run in a Docker container. With TestContainers we can initiate any component with a Docker image and make the system or integration tests with the external resources.

Integration Tests Setup with Testcontainers Library in Java

I will have a PostgreSQL database, Kafka event store, and a REST API as components in this example. All the components I mentioned are integrated in the way explained below;

  • The application has a Kafka consumer that consumes events and stores them in the PostgreSQL database.
  • The application has a GET endpoint that fetches the data from the database and exposes it through the REST API call.

Based on the business structure defined above, I want to ensure that my Kafka consumer consumes events properly and saves them in the database. 

Possible integration test case for this flow;

Step 1: Create a Kafka producer and produce data on the specified topic.

Step 2: Make a REST API call to fetch the data that has been produced to Kafka previously.

Step 3: Assert produced data and REST API response. 

To achieve this test case, I need to create separate containers for PostgreSQL, Kafka, and a container for the Spring Boot application. 

To be able to use testcontainers add the following dependency in Gradle.

testImplementation 'org.testcontainers:testcontainers:1.16.3'				

Set up Postgres Database with Testcontainers

I prefer to use GenericContainer over PostgreSQLContainer for the Postgres database as it feels more accessible to me to pass any environment variable and set up host configs. 

  • Pull the official Postgres docker image and instantiate a container on top of that.
  • It will expose 5432 as this is the default Postgres port
  • Bind docker internal port 5432 to exposed port 5432
  • Setup username and password for Postgres DB
  • Use charset en_US.utf8 for Postgres DB
GenericContainer postgresDBContainer = new GenericContainer("postgres:latest")
       .withCreateContainerCmdModifier((Consumer<CreateContainerCmd>) cmd -> cmd.withHostConfig(
               new HostConfig().withPortBindings(new PortBinding(Binding.bindPort(5432), new ExposedPort(5432)))
       ))
       .withExposedPorts(5432)
       .withEnv("POSTGRES_PASSWORD", "<postgres_username>")
       .withEnv("POSTGRES_USER", "<postgres_password>")
       .withEnv("LANG", "en_US.utf8")
       .withReuse(true);

Set up Kafka Container with Testcontainers

  • Pull the official Apache Kafka docker image and Instantiate a container on top of that.
  • It will expose 9092 as this is the default Kafka port
  • Bind docker internal port 9092 to exposed port 9092
  • Setup log consumer
  • Setup KAFKA_ADVERTISED_HOSTNAME to your IP address
  • Create two topics to be used in integration tests when the container is started
GenericContainer kafkaContainer = new GenericContainer("bitnami/kafka:latest")
       .withExposedPorts(9092)
       .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("testcontainers.kafka")))
       .withCreateContainerCmdModifier((Consumer<CreateContainerCmd>) cmd -> cmd.withHostConfig(
               new HostConfig().withPortBindings(
                       new PortBinding(Binding.bindPort(9092), new ExposedPort(9092)))
       ))
       .withEnv("KAFKA_ADVERTISED_HOSTNAME", localHostAddress())
       .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
       .withEnv("KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS", "1000000")
       .withEnv("KAFKA_CREATE_TOPICS", "dev.exceptionly.topic1,dev.exceptionly.topic2")
       .withCommand("run", "-e", "{"kafka_advertised_hostname":" + localHostAddress() + "}")
       .waitingFor(Wait.forLogMessage("Created topic .*", 3)
               .withStartupTimeout(Duration.ofSeconds(180)))
       .withReuse(true);

Set up Spring Boot Application with Testcontainers from a Dockerfile

I assume that you already have a Dockerfile for your application. If you don’t have one, check out my tutorial to learn how to Dockerize a Spring Boot Application.

  • Initiate a Docker container based on the Dockerfile of the Spring Boot application
  • Expose port 8080 for the  application and 8081 for health checks (optional)
  • Setup environment properties for the Spring Boot application container
  • Setup application.yaml file for the Spring Boot application container
  • Pass values for environment variables
  • Wait for kafkaContainer and postgresDBContainer to be started first.
GenericContainer applicationContainer = new GenericContainer(new ImageFromDockerfile()
       .withDockerfile(Paths.get("../Dockerfile").toAbsolutePath()))
       .withExposedPorts(8080, 8081)
       .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger("dev.exceptionly")))
       .waitingFor(Wait.forHttp("/healthcheck").forPort(8081))
       .withFileSystemBind(Paths.get("../dependencies/environment.properties").toAbsolutePath().toString(),
               "/opt/app/config/application-integration-test.properties", BindMode.READ_ONLY)
       .withFileSystemBind(Paths.get("src/integrationTest/resources/application.yml").toAbsolutePath().toString(),
               "/opt/app/config/application.yml", BindMode.READ_ONLY)
       .withEnv("database", localHostAddress())
       .withEnv("kafka", localHostAddress())
       .withEnv("SERVER_HOST", localHostAddress())
       .dependsOn(kafkaContainer)
       .dependsOn(postgresDBContainer)
       .withReuse(true);				

Host address to be used in containers;

private static String localHostAddress() {
   try {
       return InetAddress.getLocalHost().getHostAddress();
   } catch (UnknownHostException e) {
           // do something here
   }
}

environment.properties file configuration for Spring Boot application container;

database=exceptionly-db
kafka=exceptionly-kafka

Integration Tests application.yaml configuration for the application docker container

Remember the  ${database} and ${kafka} variables are passed when application container is started.

spring:
     profiles:
       active: integration-tests
     datasource:
       url: jdbc:postgresql://${database}:5432/postgres
       username: <postgres_username>
       password: <postgres_password>
     kafka:
        bootstrap-servers:
          - ${kafka:localhost}:9092

Integration Tests Implementation 

Finally, we have all containers created and ready to be used for integration tests, but we are still missing some configurations, such as Kafka producer. To test the application’s Kafka consumption, we need to create a Kafka producer and publish events on the specified topic. This behavior will imitate the external producer.

public class KafkaEventProducer {
   private final static String TOPIC_1 = "dev.exceptionly.topic1";
   private final KafkaTemplate<String, ReviewResultEvent> producer;
   public KafkaEventProducer(String bootstrapServers) {
       final Map<String, Object> props = new HashMap<>();
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
       props.put(ProducerConfig.ACKS_CONFIG, "all");
       props.put(ProducerConfig.RETRIES_CONFIG, 0);
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
       producer = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
   }
   public void send(ReviewResultEvent event) {
       LOGGER.info("Publishing event: {}", event);
       producer.send(new ProducerRecord<>(TOPIC_1, event));
   }
}

Remember the initial test case that drove me to create all these containers and setup;

Step 1: Produce data on the specified topic.

Step 2: Make a REST API call to fetch the data that has been produced to Kafka previously that will hit the database.

Step 3: Assert produced data and REST API response.

   KafkaEventProducer producer;
  @BeforeAll
  void setup() {
    String bootstrapServers = kafkaContainer.getHost() + ":" + kafkaContainer.getMappedPort(9092);
    producer = new KafkaEventProducer(bootstrapServers);
  }
  @Test
  void shouldFetchProducedDataThroughRESTApi() {
    //given
    producer.send( < event1 >);
    producer.send( < event2 >);
    producer.send( < event3 >);
    //when
    var response = given().port(serviceContainer.getMappedPort(8080))
        .auth().basic(USERNAME, PASSWORD)
        .accept(ContentType.JSON)
        .when()
        .get(String.format( < base_url >, < path_param1 >))
        .then()
        .assertThat()
        .statusCode(200)
        .and()
        .extract()
        .response();
    //then
    List<Object> entities = mapper.readValue(response.asString(), Object.class);
    assertEquals(3, entities.size());
    entities.forEach(entity -> assertEntity(entity));
  }

Conclusion

TestContainers’s official website has excellent documentation and many modules for different test purposes. 

I highly encourage you to go through the official documents to understand them in detail and challenge yourself after this tutorial.

Feel free to check out my other related posts;

Leave a Reply

Your email address will not be published.

#

“Exceptionly was such a fast and efficient process. Complete game changer for hiring remote.”

Liam Martin
Co-Founder at TimeDoctor

#

“Great service, outstanding speed, stunning quality.”

Andy Tryba
CEO at Gigster

Is Exceptionly an outsourcing business?

No. We encourage our clients to do direct hires. We provide payroll consultancy and services in a 100% transparent way if our clients are new to remote hiring.

Is Exceptionly a database of candidates?

Not really, data is only a small part of our business. While we own over 2 million publicly available software engineer data evaluated, our power lies in hands-on testing and engineer-to-engineer technical interviews.

Is Exceptionly expensive?

Exceptionly is more cost-efficient than any other serious competitor who owns a decent technical testing funnel. If you compare Exceptionly with HR agencies who only do resume screening before endorsement, yes, it is expensive.

Who is the founder of Exceptionly?

Sinan Ata is the founder of Exceptionly, who previously built two global technical talent acquisition enginess and tested over 2.5 million software engineers in his career.

Still have questions? Contact us.

Ready to meet top-notch engineers?

Let's get your talent pipelines up and running!

HIRE WITH EXCEPTIONLY

No hidden fees.