ETL (Extract, Transform, Load) is a fundamental process in data warehousing and analytics. It involves extracting data from various sources, transforming it to fit operational needs, and finally loading it into a data storage system. Spring Cloud Data Flow (SCDF) is a microservice-based toolkit for building real-time data integration and processing pipelines. SCDF lets us orchestrate data pipelines, manage and monitor data flows, and handle data processing with ease.
In this article, we will create a simple ETL pipeline using Spring Cloud Data Flow that extracts data, transforms it, and loads it.
Prerequisites:
- Good understanding of Spring Boot and Spring Cloud.
- Basic knowledge of Apache Kafka.
- JDK and IntelliJ IDEA installed on your local system.
- Maven for build and dependency management.
What is ETL?
ETL stands for Extract, Transform, Load. It is the process used in data warehousing and data integration to move data from various sources into a centralized data warehouse or data store. The ETL process involves three main steps:
- Extract: Retrieve the data from the various source systems.
- Transform: Convert the extracted data into a format suitable for analysis and reporting.
- Load: Load the transformed data into the target database or data warehouse.
What is Spring Cloud Data Flow?
Spring Cloud Data Flow is a toolkit for building data integration and real-time processing pipelines. It allows developers to create and orchestrate complex data workflows using microservices. SCDF supports running applications as tasks or streams:
- Tasks: Short-lived microservices that perform a specific function and then terminate.
- Streams: Continuous data flows that process data in real time.
How ETL Works with Spring Cloud Data Flow?
In an ETL pipeline built with Spring Cloud Data Flow, each step is implemented as a Spring Boot application. These applications are connected through messaging middleware such as Apache Kafka: the extract step publishes to a topic, the transform step consumes from it and publishes the result to a second topic, and the load step consumes from that topic.
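For orientation only: if the three steps were packaged as separate applications and registered with a Spring Cloud Data Flow server, they could be composed with the stream DSL. The application names below are hypothetical placeholders, not part of this tutorial:
stream create --name etl-stream --definition "extract-app | transform-app | load-app" --deploy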
Implementation of ETL with Spring Cloud Data Flow
Step 1: Create a Spring Boot Project
Create a new Spring Boot project using IntelliJ IDEA. While creating the project, choose the following options:

Step 2: Add the Dependencies
Add the following dependencies to your pom.xml file: Spring Web, Spring Cloud Stream, Spring Cloud Stream Kafka Binder, Spring for Apache Kafka, Spring Boot DevTools, and Lombok (the complete pom.xml is shown later in this article).

After the project is created, the folder structure in the IDE will look like the below image.

Step 3: Configure the Application Properties
Open the application.properties file and add the ETL configuration for the Spring Boot application.
spring.application.name=etl-spring-cloud-data-flow
# Extract application properties
spring.cloud.function.definition=extractData;transformData;loadData
# Kafka binder properties
spring.cloud.stream.bindings.extractData-out-0.destination=etl-topic
spring.cloud.stream.bindings.transformData-in-0.destination=etl-topic
spring.cloud.stream.bindings.transformData-out-0.destination=etl-transformed-topic
spring.cloud.stream.bindings.loadData-in-0.destination=etl-transformed-topic
# Kafka broker properties
spring.cloud.stream.kafka.binder.brokers=localhost:9092
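Note: with the functional model, Spring Cloud Stream polls the Supplier on a default poller (roughly once per second). If you want the extract step to emit less frequently, the standard poller property below can be added; the 5-second value is an illustrative choice, not part of the original configuration.
spring.cloud.stream.poller.fixed-delay=5000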
Step 4: Create the ExtractService Class
Create the ExtractService class in the extract package. Go to src > main > java > com.gfg.etlspringclouddataflow > extract > ExtractService and add the code below.
Java
package com.gfg.etlspringclouddataflow.extract;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.function.Supplier;

/**
 * Configuration class for the extract service.
 */
@Configuration
public class ExtractService {

    /**
     * Bean definition for extracting data.
     * This bean provides a Supplier that returns a string with the extracted data
     * and the current timestamp.
     *
     * @return Supplier of a string containing the extracted data and timestamp.
     */
    @Bean
    public Supplier<String> extractData() {
        return () -> "Extracted data at " + System.currentTimeMillis();
    }
}
This defines a Spring configuration class, ExtractService, which contains a bean method, extractData. This method provides a Supplier that returns a string with "Extracted data" followed by the current timestamp. The @Bean annotation ensures that this Supplier is registered as a Spring bean.
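In a real pipeline, the extract step would usually pull data from an actual source system rather than build a literal string. The sketch below is a self-contained illustration (not part of this tutorial's bindings) that drains records from an in-memory queue standing in for a database or REST endpoint; to use it, you would reference its function name in spring.cloud.function.definition instead of extractData.
Java
package com.gfg.etlspringclouddataflow.extract;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

/**
 * Illustrative variant of the extract step: drains records from an in-memory
 * queue that stands in for a real source system such as a database or an API.
 */
@Configuration
public class QueueBackedExtractService {

    // Stands in for the source system in this sketch.
    private final Queue<String> sourceRecords =
            new ConcurrentLinkedQueue<>(List.of("order-1001", "order-1002", "order-1003"));

    @Bean
    public Supplier<String> extractFromQueue() {
        // Each poll emits the next pending record, or a heartbeat message when the source is empty.
        return () -> {
            String record = sourceRecords.poll();
            return record != null ? record : "No new data at " + System.currentTimeMillis();
        };
    }
}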
Step 5: Create the LoadService Class
Create the LoadService class in the load package. Go to src > main > java > com.gfg.etlspringclouddataflow > load > LoadService and add the code below.
Java
package com.gfg.etlspringclouddataflow.load;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.function.Consumer;

/**
 * Configuration class for the load service.
 */
@Configuration
public class LoadService {

    /**
     * Bean definition for loading data.
     * This bean provides a Consumer that prints the loaded data to the console.
     *
     * @return Consumer of a string that prints the loaded data.
     */
    @Bean
    public Consumer<String> loadData() {
        return data -> System.out.println("Loaded data: " + data);
    }
}
This defines a Spring configuration class, LoadService, with a bean method, loadData. This method provides a Consumer that prints the received data to the console, and the @Bean annotation ensures it is registered as a Spring bean.
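In practice, the load step would typically write to a persistent store rather than the console. As a minimal sketch (assuming a configured DataSource and spring-boot-starter-jdbc, plus a hypothetical etl_records table, none of which are part of this tutorial), the Consumer could use JdbcTemplate:
Java
package com.gfg.etlspringclouddataflow.load;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;

import java.util.function.Consumer;

/**
 * Illustrative variant of the load step that persists each transformed message
 * instead of printing it to the console.
 */
@Configuration
public class JdbcLoadService {

    @Bean
    public Consumer<String> loadToDatabase(JdbcTemplate jdbcTemplate) {
        // Inserts every incoming message into a hypothetical etl_records table.
        return data -> jdbcTemplate.update(
                "INSERT INTO etl_records (payload, loaded_at) VALUES (?, CURRENT_TIMESTAMP)",
                data);
    }
}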
Step 6: Create the TransformService Class
Create the TransformService class in the transform package. Go to src > main > java > com.gfg.etlspringclouddataflow > transform > TransformService and add the code below.
Java
package com.gfg.etlspringclouddataflow.transform;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.function.Function;

/**
 * Configuration class for the transform service.
 */
@Configuration
public class TransformService {

    /**
     * Bean definition for transforming data.
     * This bean provides a Function that converts the input string data to uppercase.
     *
     * @return Function that transforms the input string to uppercase.
     */
    @Bean
    public Function<String, String> transformData() {
        return data -> data.toUpperCase();
    }
}
This defines a Spring configuration class, TransformService, with a bean method, transformData. This method provides a Function that converts the input string to uppercase, and the @Bean annotation ensures it is registered as a Spring bean.
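Real transformations usually do more than uppercase a string, for example cleaning and enriching the payload. Below is a minimal sketch of a richer Function that still consumes and produces plain strings, so it could be bound in place of transformData; the whitespace normalization and timestamp tag are illustrative choices, not part of the original pipeline.
Java
package com.gfg.etlspringclouddataflow.transform;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Instant;
import java.util.function.Function;

/**
 * Illustrative richer transform: normalizes whitespace, uppercases the payload,
 * and tags it with a processing timestamp before publishing it downstream.
 */
@Configuration
public class EnrichingTransformService {

    @Bean
    public Function<String, String> enrichData() {
        return data -> {
            String normalized = (data == null) ? "" : data.trim().replaceAll("\\s+", " ");
            return "[processed at " + Instant.now() + "] " + normalized.toUpperCase();
        };
    }
}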
Step 7: Main Class
No changes are required in the main class.
Java
package com.gfg.etlspringclouddataflow;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class EtlSpringCloudDataFlowApplication {

    public static void main(String[] args) {
        SpringApplication.run(EtlSpringCloudDataFlowApplication.class, args);
    }
}
pom.xml file:
XML
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.3.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.gfg</groupId>
    <artifactId>etl-spring-cloud-data-flow</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>etl-spring-cloud-data-flow</name>
    <description>etl-spring-cloud-data-flow</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>17</java.version>
        <spring-cloud.version>2023.0.2</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-binder</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Step 8: Run the Application
Make sure Kafka is running on localhost:9092, then run the application. It will start on port 8080.
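With Kafka available, each extracted message flows through etl-topic, is uppercased by the transform step, passes through etl-transformed-topic, and is printed by the load step. The console output should look roughly like the line below (the timestamp will differ on every run):
Loaded data: EXTRACTED DATA AT 1719922334455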
Conclusion
This example demonstrates a simple ETL pipeline using Spring Cloud Data Flow with Kafka as the message broker. The ExtractService extracts the data, the TransformService transforms it, and the LoadService loads it. Each service is defined as a Spring Cloud Stream component and communicates via Kafka topics. This setup can be expanded to handle more complex ETL processes and data integration scenarios.