ETL with Spring Cloud Data Flow

ETL (Extract, Transform, Load) is a fundamental process in data warehousing and analytics. It involves extracting data from various sources, transforming it to fit operational needs, and finally loading it into a data storage system. Spring Cloud Data Flow (SCDF) is a microservice-based toolkit for building real-time data integration and processing pipelines. SCDF lets us orchestrate data pipelines, manage and monitor data flows, and handle data processing with ease.

In this article, we will create a simple ETL pipeline using Spring Cloud Data Flow that extracts data, transforms it, and loads it.

Prerequisites:

  • Good understanding of Spring Boot and Spring Cloud.
  • Basic knowledge of Apache Kafka.
  • JDK and IntelliJ IDEA installed on your local system.
  • Maven for build and dependency management.

What is ETL?

ETL stands for Extract, Transform, Load. It is the process used in data warehousing and data integration to move data from various sources into a centralized data warehouse or data store. The ETL process involves three main steps (a minimal Java sketch of the three stages follows the list):

  1. Extract: Retrieve the data from the various source systems.
  2. Transform: Convert the extracted data into a format suitable for analysis and reporting.
  3. Load: Load the transformed data into the target database or data warehouse.
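
Conceptually, the three stages map onto Java's standard functional interfaces: Supplier (extract), Function (transform), and Consumer (load). This is exactly the model used later in this article; here is a minimal, framework-free sketch:

Java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class EtlSketch {
    public static void main(String[] args) {
        Supplier<String> extract = () -> "raw data";               // Extract: produce the data
        Function<String, String> transform = String::toUpperCase;  // Transform: reshape it
        Consumer<String> load = System.out::println;               // Load: persist or output it

        // Run the three stages in order; prints "RAW DATA"
        load.accept(transform.apply(extract.get()));
    }
}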

What is Spring Cloud Data Flow?

Spring Cloud Data Flow is a toolkit for building data integration and real-time processing pipelines. It allows developers to create and orchestrate complex data workflows using microservices. SCDF supports running applications as tasks or streams:

  • Tasks: Short-lived microservices that perform a specific function and then terminate.
  • Streams: Continuous data flows that process data in real time.

How Does ETL Work with Spring Cloud Data Flow?

In an ETL pipeline built with Spring Cloud Data Flow, each step is implemented as a Spring Boot application: the extract step as a Supplier, the transform step as a Function, and the load step as a Consumer. These applications are connected through messaging middleware such as Apache Kafka.
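
For reference, when the pipeline is deployed through a Data Flow server, the same flow can be expressed as a stream definition in the SCDF shell. The application names extract, transform, and load below are hypothetical and assume the corresponding source, processor, and sink apps have already been registered with the server:

stream create --name etl-stream --definition "extract | transform | load" --deploy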

Implementation of ETL with Spring Cloud Data Flow

Step 1: Create a Spring Boot Project

Create a new Spring Boot project using IntelliJ IDEA. While creating the project, choose Maven as the build tool and Java 17 as the language version (matching the pom.xml shown later in this article), as in the image below:

Spring Boot Project Creation


Step 2: Add the Dependencies

Add the following dependencies to your pom.xml file (the complete pom.xml is shown later in this article):

  • Spring Web
  • Spring Cloud Stream
  • Spring Cloud Stream Kafka Binder
  • Spring for Apache Kafka
  • Spring Boot DevTools
  • Lombok
  • Test dependencies: Spring Boot Test, Spring Cloud Stream Test Binder, and Spring Kafka Test


Once the project is created, the folder structure in the IDE will look like the image below.

Folder Structure


Step 3: Configure the Application Properties

Open the application.properties file and add the ETL configuration for the Spring Boot application. Spring Cloud Stream derives binding names from the function names: extractData-out-0 is the output of the extractData supplier, transformData-in-0 and transformData-out-0 are the input and output of the transformData function, and loadData-in-0 is the input of the loadData consumer.

spring.application.name=etl-spring-cloud-data-flow

# Function definitions for the three ETL stages
spring.cloud.function.definition=extractData;transformData;loadData

# Kafka binder properties
spring.cloud.stream.bindings.extractData-out-0.destination=etl-topic
spring.cloud.stream.bindings.transformData-in-0.destination=etl-topic
spring.cloud.stream.bindings.transformData-out-0.destination=etl-transformed-topic
spring.cloud.stream.bindings.loadData-in-0.destination=etl-transformed-topic

# Kafka broker properties
spring.cloud.stream.kafka.binder.brokers=localhost:9092


Step 4: Create the ExtractService Class

Create the ExtractService class in the extract package:

Go to src > main > java > com.gfg.etlspringclouddataflow > extract > ExtractService and add the following code.

Java
package com.gfg.etlspringclouddataflow.extract;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.function.Supplier;

/**
 * Configuration class for the extract service.
 */
@Configuration
public class ExtractService {

    /**
     * Bean definition for extracting data.
     * This bean provides a Supplier that returns a string with the extracted data and the current timestamp.
     *
     * @return Supplier of a string containing the extracted data and timestamp.
     */
    @Bean
    public Supplier<String> extractData() {
        return () -> "Extracted data at " + System.currentTimeMillis();
    }
}

This defines a Spring configuration class, ExtractService, containing a bean method, extractData. The method provides a Supplier that returns the string "Extracted data at" followed by the current timestamp, and the @Bean annotation registers the Supplier as a Spring bean. By default, Spring Cloud Stream polls a Supplier bean once per second, so a new message is published to the etl-topic roughly every second.

Step 5: Create the LoadService Class

Create the LoadService class in the load package:

Go to src > main > java > com.gfg.etlspringclouddataflow > load > LoadService and add the following code.

Java
package com.gfg.etlspringclouddataflow.load;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.function.Consumer;

/**
 * Configuration class for the load service.
 */
@Configuration
public class LoadService {

    /**
     * Bean definition for loading data.
     * This bean provides a Consumer that prints the loaded data to the console.
     *
     * @return Consumer of a string that prints the loaded data.
     */
    @Bean
    public Consumer<String> loadData() {
        return data -> System.out.println("Loaded data: " + data);
    }
}

This defines a Spring configuration class, LoadService, with a bean method, loadData. The method provides a Consumer that prints the received data to the console, and the @Bean annotation registers it as a Spring bean. A hypothetical database-backed variant is sketched below.
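
In this example the Consumer simply prints to the console. In a real pipeline the load step would write to a target data store. As an illustrative sketch only (not part of this project; it assumes spring-boot-starter-jdbc on the classpath, a configured DataSource, and a hypothetical etl_records table), the same bean shape could delegate to a JdbcTemplate:

Java
package com.gfg.etlspringclouddataflow.load;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;

import java.util.function.Consumer;

/**
 * Hypothetical variant of the load service that inserts each record
 * into a database table instead of printing it.
 */
@Configuration
public class JdbcLoadService {

    @Bean
    public Consumer<String> loadDataToDb(JdbcTemplate jdbcTemplate) {
        // Table and column names are illustrative, not part of this article's project
        return data -> jdbcTemplate.update(
                "INSERT INTO etl_records (payload) VALUES (?)", data);
    }
}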

Step 6: Create the TransformService Class

Create the TransformService class in the transform package:

Go to src > main > java > com.gfg.etlspringclouddataflow > transform > TransformService and add the following code.

Java
package com.gfg.etlspringclouddataflow.transform;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.function.Function;

/**
 * Configuration class for the transform service.
 */
@Configuration
public class TransformService {

    /**
     * Bean definition for transforming data.
     * This bean provides a Function that converts the input string data to uppercase.
     *
     * @return Function that transforms the input string to uppercase.
     */
    @Bean
    public Function<String, String> transformData() {
        return data -> data.toUpperCase();
    }
}

This defines a Spring configuration class, TransformService, with a bean method, transformData. The method provides a Function that converts the input string to uppercase, and the @Bean annotation registers it as a Spring bean. Since it is a plain function, it is also easy to unit-test, as shown below.
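
Because each stage is a plain java.util.function bean, it can be unit-tested without starting Spring or Kafka. Here is a minimal JUnit 5 test for the transform step (JUnit 5 is already available through the spring-boot-starter-test dependency in the pom.xml):

Java
package com.gfg.etlspringclouddataflow.transform;

import org.junit.jupiter.api.Test;

import java.util.function.Function;

import static org.junit.jupiter.api.Assertions.assertEquals;

class TransformServiceTest {

    @Test
    void transformDataConvertsInputToUpperCase() {
        // Instantiate the configuration class directly; no Spring context is needed
        Function<String, String> transformData = new TransformService().transformData();

        assertEquals("EXTRACTED DATA", transformData.apply("extracted data"));
    }
}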

Step 7: Main Class

No changes are required in the main class.

Java
package com.gfg.etlspringclouddataflow;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class EtlSpringCloudDataFlowApplication {

    public static void main(String[] args) {
        SpringApplication.run(EtlSpringCloudDataFlowApplication.class, args);
    }

}


pom.xml file:

XML
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.3.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.gfg</groupId>
    <artifactId>etl-spring-cloud-data-flow</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>etl-spring-cloud-data-flow</name>
    <description>etl-spring-cloud-data-flow</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>17</java.version>
        <spring-cloud.version>2023.0.2</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-binder</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>


Step 8: Run the Application

Now run the application. Before starting it, make sure a Kafka broker is running at localhost:9092, since the stream bindings connect to it on startup. The application will start on port 8080.

Application Started
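
With the broker reachable, the pipeline runs end to end: the Supplier is polled periodically, each message travels through the etl-topic and etl-transformed-topic, and the Consumer prints the transformed records. The console output should look roughly like the following (timestamps will differ):

Loaded data: EXTRACTED DATA AT 1721125800000
Loaded data: EXTRACTED DATA AT 1721125801000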

Conclusion

This example demonstrates a simple ETL pipeline using Spring Cloud Data Flow with Kafka as the message broker. The ExtractService extracts the data, the TransformService transforms it, and the LoadService loads it. Each service is defined as a Spring Cloud Stream functional component and communicates via Kafka topics. This setup can be expanded to handle more complex ETL processes and data integration scenarios.



