Horje
Spring Cloud Stream - Functional and Reactive

Spring Cloud Stream is a Spring framework that simplifies creating event-driven microservices. It uses functional programming constructs for message processing logic, often using annotated methods within a class and reactive programming tools like Reactor for asynchronous and reactive processing.

Maven Dependencies

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>3.1.3</version>
</dependency>

Supplier, Function, and Consumer

SCSt treats any bean of type Supplier, Function, or Consumer as a message handler (Function or Consumer) or message source (Supplier), or any bean that may be mapped to Supplier, Function, or Consumer (e.g., a POJO function, Kotlin lambdas, and so forth). Input and output bindings are automatically produced using the <function-name>-<in/out>-<index> naming standard, depending on the type of functional strategy that is being utilized.

Functional:

Java

@SpringBootApplication
public class GFG  {
    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }
}

Reactive:

Java

@SpringBootApplication
public class GFG  {
    @Bean
    public Function<Flux<String>, Flux<String>> uppercase() {
        return flux -> flux.map(value -> value.toUpperCase());
    }
}

Construction of Spring Cloud Stream

The program is set up to bind the channels INPUT and OUTPUT specified within the interface Processor by the annotation @EnableBinding. It is possible to set up both channels to utilize a certain message middleware or binder.

Java

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyLoggerServiceApplication.class, args);
    }
  
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public LogMessage enrichLogMessage(LogMessage log) {
        return new LogMessage(String.format("[1]: %s", log.getMessage()));
    }
}

Functional Testing

A binder implementation serving as the test support enables channel interaction and message inspection. Send a message to the mentioned enrichLogMessage service and see if the answer includes the string “[1]:” at the start of the message.

Java

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MyLoggerServiceApplication.class)
@DirtiesContext
public class MyLoggerApplicationTests {
  
    @Autowired
    private Processor pipe;
  
    @Autowired
    private MessageCollector messageCollector;
  
    @Test
    public void whenSendMessage_thenResponseShouldUpdateText() {
        pipe.input()
          .send(MessageBuilder.withPayload(new LogMessage("This is the message"))
          .build());
  
        Object payload = messageCollector.forChannel(pipe.output())
          .poll()
          .getPayload();
  
        assertEquals("[1]: This is the message", payload.toString());
    }
}

Reactive Functions support

You may use the reactive programming approach in the implementation of Supplier, Function, or Consumer with little effort because Spring Cloud Function is built on top of Project Reactor.

Java

@EnableAutoConfiguration
@EnableBinding(Processor.class)
public static class SinkFromConsumer {
    public static void main(String[] args) {
        SpringApplication.run(SinkFromConsumer.class
                              "--spring.cloud.stream.function.definition=reactiveUpperCase");
    }
    @Bean
    public Function<Flux<String>, Flux<String>> reactiveUpperCase() {
        return flux -> flux.map(val -> val.toUpperCase());
    }
}

Conclusion

So, this is how Spring Cloud Stream – functional and reactive. Spring framework that simplifies creating event-driven microservices. It uses functional programming constructs for message processing logic, often using annotated methods within a class and reactive programming tools like Reactor for asynchronous and reactive processing.




Reffered: https://www.geeksforgeeks.org


Advance Java

Related
Spring Data JPA vs Spring JDBC Template Spring Data JPA vs Spring JDBC Template
Struts 2 Custom Validation - Workflow Interceptor Struts 2 Custom Validation - Workflow Interceptor
Spring Boot - Handling Url Encoded Form Spring Boot - Handling Url Encoded Form
Hibernate Mapping Set using XML Hibernate Mapping Set using XML
Spring MVC - ResourceBundleViewResolver Configuration Spring MVC - ResourceBundleViewResolver Configuration

Type:
Geek
Category:
Coding
Sub Category:
Tutorial
Uploaded by:
Admin
Views:
11