Baeldung Pro – NPI EA (cat = Baeldung)
announcement - icon

Baeldung Pro comes with both absolutely No-Ads as well as finally with Dark Mode, for a clean learning experience:

>> Explore a clean Baeldung

Once the early-adopter seats are all used, the price will go up and stay at $33/year.

Partner – Microsoft – NPI EA (cat = Baeldung)
announcement - icon

Azure Container Apps is a fully managed serverless container service that enables you to build and deploy modern, cloud-native Java applications and microservices at scale. It offers a simplified developer experience while providing the flexibility and portability of containers.

Of course, Azure Container Apps has really solid support for our ecosystem, from a number of build options, managed Java components, native metrics, dynamic logger, and quite a bit more.

To learn more about Java features on Azure Container Apps, visit the documentation page.

You can also ask questions and leave feedback on the Azure Container Apps GitHub page.

Partner – Microsoft – NPI EA (cat= Spring Boot)
announcement - icon

Azure Container Apps is a fully managed serverless container service that enables you to build and deploy modern, cloud-native Java applications and microservices at scale. It offers a simplified developer experience while providing the flexibility and portability of containers.

Of course, Azure Container Apps has really solid support for our ecosystem, from a number of build options, managed Java components, native metrics, dynamic logger, and quite a bit more.

To learn more about Java features on Azure Container Apps, you can get started over on the documentation page.

And, you can also ask questions and leave feedback on the Azure Container Apps GitHub page.

Partner – Orkes – NPI EA (cat=Spring)
announcement - icon

Modern software architecture is often broken. Slow delivery leads to missed opportunities, innovation is stalled due to architectural complexities, and engineering resources are exceedingly expensive.

Orkes is the leading workflow orchestration platform built to enable teams to transform the way they develop, connect, and deploy applications, microservices, AI agents, and more.

With Orkes Conductor managed through Orkes Cloud, developers can focus on building mission critical applications without worrying about infrastructure maintenance to meet goals and, simply put, taking new products live faster and reducing total cost of ownership.

Try a 14-Day Free Trial of Orkes Conductor today.

Partner – Orkes – NPI EA (tag=Microservices)
announcement - icon

Modern software architecture is often broken. Slow delivery leads to missed opportunities, innovation is stalled due to architectural complexities, and engineering resources are exceedingly expensive.

Orkes is the leading workflow orchestration platform built to enable teams to transform the way they develop, connect, and deploy applications, microservices, AI agents, and more.

With Orkes Conductor managed through Orkes Cloud, developers can focus on building mission critical applications without worrying about infrastructure maintenance to meet goals and, simply put, taking new products live faster and reducing total cost of ownership.

Try a 14-Day Free Trial of Orkes Conductor today.

eBook – Guide Spring Cloud – NPI EA (cat=Spring Cloud)
announcement - icon

Let's get started with a Microservice Architecture with Spring Cloud:

>> Join Pro and download the eBook

eBook – Mockito – NPI EA (tag = Mockito)
announcement - icon

Mocking is an essential part of unit testing, and the Mockito library makes it easy to write clean and intuitive unit tests for your Java code.

Get started with mocking and improve your application tests using our Mockito guide:

Download the eBook

eBook – Java Concurrency – NPI EA (cat=Java Concurrency)
announcement - icon

Handling concurrency in an application can be a tricky process with many potential pitfalls. A solid grasp of the fundamentals will go a long way to help minimize these issues.

Get started with understanding multi-threaded applications with our Java Concurrency guide:

>> Download the eBook

eBook – Reactive – NPI EA (cat=Reactive)
announcement - icon

Spring 5 added support for reactive programming with the Spring WebFlux module, which has been improved upon ever since. Get started with the Reactor project basics and reactive programming in Spring Boot:

>> Join Pro and download the eBook

eBook – Java Streams – NPI EA (cat=Java Streams)
announcement - icon

Since its introduction in Java 8, the Stream API has become a staple of Java development. The basic operations like iterating, filtering, mapping sequences of elements are deceptively simple to use.

But these can also be overused and fall into some common pitfalls.

To get a better understanding on how Streams work and how to combine them with other language features, check out our guide to Java Streams:

>> Join Pro and download the eBook

eBook – Jackson – NPI EA (cat=Jackson)
announcement - icon

Do JSON right with Jackson

Download the E-book

eBook – HTTP Client – NPI EA (cat=Http Client-Side)
announcement - icon

Get the most out of the Apache HTTP Client

Download the E-book

eBook – Maven – NPI EA (cat = Maven)
announcement - icon

Get Started with Apache Maven:

Download the E-book

eBook – Persistence – NPI EA (cat=Persistence)
announcement - icon

Working on getting your persistence layer right with Spring?

Explore the eBook

eBook – RwS – NPI EA (cat=Spring MVC)
announcement - icon

Building a REST API with Spring?

Download the E-book

Course – LS – NPI EA (cat=Jackson)
announcement - icon

Get started with Spring and Spring Boot, through the Learn Spring course:

>> LEARN SPRING
Course – RWSB – NPI EA (cat=REST)
announcement - icon

Explore Spring Boot 3 and Spring 6 in-depth through building a full REST API with the framework:

>> The New “REST With Spring Boot”

Course – LSS – NPI EA (cat=Spring Security)
announcement - icon

Yes, Spring Security can be complex, from the more advanced functionality within the Core to the deep OAuth support in the framework.

I built the security material as two full courses - Core and OAuth, to get practical with these more complex scenarios. We explore when and how to use each feature and code through it on the backing project.

You can explore the course here:

>> Learn Spring Security

Course – All Access – NPI EA (cat= Spring)
announcement - icon

All Access is finally out, with all of my Spring courses. Learn JUnit is out as well, and Learn Maven is coming fast. And, of course, quite a bit more affordable. Finally.

>> GET THE COURSE
Course – LSD – NPI EA (tag=Spring Data JPA)
announcement - icon

Spring Data JPA is a great way to handle the complexity of JPA with the powerful simplicity of Spring Boot.

Get started with Spring Data JPA through the guided reference course:

>> CHECK OUT THE COURSE

Partner – LambdaTest – NPI EA (cat=Testing)
announcement - icon

End-to-end testing is a very useful method to make sure that your application works as intended. This highlights issues in the overall functionality of the software, that the unit and integration test stages may miss.

Playwright is an easy-to-use, but powerful tool that automates end-to-end testing, and supports all modern browsers and platforms.

When coupled with LambdaTest (an AI-powered cloud-based test execution platform) it can be further scaled to run the Playwright scripts in parallel across 3000+ browser and device combinations:

>> Automated End-to-End Testing With Playwright

Course – Spring Sale 2025 – NPI EA (cat= Baeldung)
announcement - icon

Yes, we're now running our Spring Sale. All Courses are 25% off until 26th May, 2025:

>> EXPLORE ACCESS NOW

Course – Spring Sale 2025 – NPI (cat=Baeldung)
announcement - icon

Yes, we're now running our Spring Sale. All Courses are 25% off until 26th May, 2025:

>> EXPLORE ACCESS NOW

eBook – Guide Spring Cloud – NPI (cat=Cloud/Spring Cloud)
announcement - icon

Let's get started with a Microservice Architecture with Spring Cloud:

>> Join Pro and download the eBook

1. Overview

Spring Cloud Stream is a framework built on top of Spring Boot and Spring Integration that helps in creating event-driven or message-driven microservices.

In this article, we’ll introduce concepts and constructs of Spring Cloud Stream with some simple examples.

2. Maven Dependencies

To get started, we’ll need to add the Spring Cloud Starter Stream with the broker RabbitMQ Maven dependency as messaging-middleware to our pom.xml:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

And we’ll add the spring-cloud-stream-test-binder dependency to enable JUnit support as well:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-binder</artifactId>
    <scope>test</scope>
</dependency>

Lastly, we will use spring-cloud-dependencies for dependency management and select the appropriate version based on the compatibility matrix:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2024.0.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

3. Main Concepts

Microservices architecture follows the “smart endpoints and dumb pipes” principle. Communication between endpoints is driven by messaging-middleware parties like RabbitMQ or Apache Kafka. Services communicate by publishing domain events via these endpoints or channels.

Let’s walk through the concepts that make up the Spring Cloud Stream framework, along with the essential paradigms that we must be aware of to build message-driven services.

3.1. Constructs

By adding the spring-cloud-stream dependencies to the classpath, we automatically connect to a message broker through a Spring Cloud Stream “binder”. Additionally, we can define a Java function as a Spring bean to process incoming messages:

@SpringBootApplication
public class LogEnricherApplication {

    public static void main(String[] args) {
        SpringApplication.run(LogEnricherApplication.class, args);
    }

    @Bean
    public Function<String, String> enrichLogMessage() {
        return value -> "[%s] - %s".formatted("Baeldung", value);
    }
}

Let’s take a look at the definition of all these concepts:

  • Bindings — a collection of Java functions to process, transform, or send messages
  • Binder — messaging-middleware implementation such as Kafka or RabbitMQ
  • Channel — represents the communication pipe between messaging-middleware and the application
  • Message Schemas — used for serialization and deserialization of messages, these schemas can be statically read from a location or loaded dynamically, supporting the evolution of domain object types
  • StreamListeners — message-handling methods in beans that will be automatically invoked on a message from the channel after the MessageConverter does the serialization/deserialization between middleware-specific events and domain object types or POJOs

3.2. Communication Patterns

Messages designated to destinations are delivered by the Publish-Subscribe messaging pattern. Publishers categorize messages into topics, each identified by a name. Subscribers express interest in one or more topics. The middleware filters the messages, delivering those interesting topics to the subscribers.

Now, the subscribers could be grouped. A consumer group is a set of subscribers or consumers, identified by a group id, within which messages from a topic or topic’s partition are delivered in a load-balanced manner.

4. Programming Model

This section describes the basics of building Spring Cloud Stream applications.

4.1. Functional Testing

The test support allows us to leverage a binder implementation to interact with our Spring Cloud Stream channels:

@EnableTestBinder
@SpringBootTest
class LogEnricherApplicationUnitTest {

    @Autowired
    private InputDestination input;

    @Autowired
    private OutputDestination output;

    @Test
    void whenSendingLogMessage_thenMessageIsEnrichedWithPrefix() {
        // ...
    }
}

Let’s send a message to the above enrichLogMessage service and check whether the response contains the text “Baeldung” prefix:

@Test
void whenSendingLogMessage_thenItsEnrichedWithPrefix() {
    input.send(MessageBuilder.withPayload("hello world").build(), "enrichLogMessage-in-0");

    Message<byte[]> message = output.receive(1000L, "enrichLogMessage-out-0");

    assertThat(message.getPayload())
      .asString()
      .isEqualTo("[Baeldung] - hello world");
}

As we can see, input bindings are named using the function name followed by “-in-“ and an index. Similarly, output bindings use the “-out-“ suffix, and an index. The “in” and “out” indicate the type of binding, and the index is usually 0 for single input and output functions, relevant only for functions with multiple inputs or outputs.

4.2. Binding Destinations

We can use the Spring Cloud Stream configuration to map the bindings to custom destinations – such as topics or queue names. For instance, let’s update the application.yml and use the queues “queue.log.messages” and “queue.pretty.log.messages” as input and output for our service:

spring:
  cloud:
    stream:
      bindings:
        enrichLogMessage-in-0:
          destination: queue.log.messages
        enrichLogMessage-out-0:
          destination: queue.pretty.log.messages

Now, we can update our test and start using the destination names instead of the binding names:

@Test
void whenSendingLogMessage_thenItsEnrichedWithPrefix() {
    input.send(MessageBuilder.withPayload("hello world").build(), "queue.log.messages");

    Message<byte[]> message = output.receive(1000L, "queue.pretty.log.messages");

    assertThat(message.getPayload())
      .asString()
      .isEqualTo("[Baeldung] - hello world");
}

As a result, our application will listen to “queue.log.messages”, process the messages, and publish the results to “queue.pretty.log.messages”.

4.3. Event Routing

In Spring Cloud Stream, Event Routing involves managing the flow of events between sources and destinations. Its key purpose is to route events to a specific subscriber or a designated destination based on the event producer.

For instance, suppose we want to enrich only log messages longer than ten characters, leaving shorter ones unchanged. To achieve this, let’s define another function that returns a Message<String>, allowing us to customize the message metadata:

@Bean
public Function<String, Message<String>> processLogs() {
    return log -> {
        boolean shouldBeEnriched = log.length() > 10;
        // ...
    };
}

After that, we’ll specify the new destination in the message header using the spring.cloud.stream.sendto.destination key. In our case, if the log needs enrichment, we will route it to the enrichLogMessage-in-0 binding. Otherwise, we’ll send the message directly to the output queue:

@Bean
public Function<String, Message<String>> processLogs() {
    return log -> {
        boolean shouldBeEnriched = log.length() > 10;
        String destination = shouldBeEnriched ? "enrichLogMessage-in-0" : "queue.pretty.log.messages";

        return MessageBuilder.withPayload(log)
          .setHeader("spring.cloud.stream.sendto.destination", destination)
          .build();
    };
}

Now, we need to update the configuration to enable event routing. Since we have more than one function declared as a bean, let’s also include both via the spring.cloud.function.definition property:

spring:
  cloud:
    function:
     definition: enrichLogMessage;processLogs
    stream:
      function.routing.enabled: true
      bindings: 
        # ...

That’s it! Let’s test our code – if we send the “hello world” string to the processLog binding, we’ll expect it to be updated and published to “queue.pretty.log.message”:

@Test
void whenProcessingLongLogMessage_thenItsEnrichedWithPrefix() {
    input.send(MessageBuilder.withPayload("hello world").build(), "processLogs-in-0");

    Message<byte[]> message = output.receive(1000L, "queue.pretty.log.messages");

    assertThat(message.getPayload())
      .asString()
      .isEqualTo("[Baeldung] - hello world");
}

On the other hand, if we send a string that is less than ten characters log, we’ll expect it to be published to “queue.pretty.log.message” without any modification:

@Test
void whenProcessingShortLogMessage_thenItsNotEnrichedWithPrefix() {
    input.send(MessageBuilder.withPayload("hello").build(), "processLogs-in-0");

    Message<byte[]> messgae= output.receive(1000L, "queue.pretty.log.messages");

    assertThat(messgae.getPayload())
      .asString()
      .isEqualTo("hello");
}

5. Setup

Let’s set up the application that will process the message from the RabbitMQ broker.

5.1. Binder Configuration

As previously discussed, we can add the binder library for RabbitMQ to the classpath by including this dependency. However, if no binder implementation is provided, Spring will use direct message communication between the channels.

5.2. RabbitMQ Configuration

To configure the example in section 3.1 to use the RabbitMQ binder, we need to update the application.yml located at src/main/resources:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: queue.log.messages
          binder: local_rabbit
        output:
          destination: queue.pretty.log.messages
          binder: local_rabbit
      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: <host>
                port: 5672
                username: <username>
                password: <password>
                virtual-host: /

The input binding will use the exchange called queue.log.messages, and the output binding will use the exchange “queue.pretty.log.messages”. Both bindings will use the binder called local_rabbit.

Note that we don’t need to create the RabbitMQ exchanges or queues in advance. When running the application, both exchanges are automatically created.

To test the application, we can use the RabbitMQ management site to publish a message. In the Publish Message panel of the exchange “queue.log.messages”, we need to enter the request in JSON format.

5.3. Customizing Message Conversion

For this code example, let’s add another binding – we can call it highlightLogs:

@Bean
Function<LogMessage, String> highlightLogs() {
    return logMsg -> logMsg.message().toUpperCase();
}

This time, our function takes a Java object called LogMessage as input:

record LogMessage(String message) {
}

For such use cases, Spring Cloud Stream allows us to apply custom message conversion for specific content types. Let’s define a custom message converter to be used to deserialize LogMessage objects when the contentType header is set to text/plain:

@Component
class TextPlainMessageConverter extends AbstractMessageConverter {

    public TextPlainMessageConverter() {
        super(new MimeType("text", "plain"));
    }

    @Override
    protected boolean supports(Class<?> clazz) {
        return (LogMessage.class == clazz);
    }

    @Override
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
        Object payload = message.getPayload();
        String text = payload instanceof String ? (String) payload : new String((byte[]) payload);
        return new LogMessage(text);
    }
}

Finally, we can test our converter by sending a message with the text/plain content type. When we run the test, we expect the message to be processed and the uppercase string to be returned as the binding output:

@Test
void whenHighlightingLogMessage_thenItsTransformedToUppercase() {
    Message<String> msgIn = MessageBuilder.withPayload("hello")
            .setHeader("contentType", "text/plain")
            .build();
    input.send(msgIn, "highlightLogs-in-0");

    Message<byte[]> msgOut = output.receive(1000L, "highlightLogs-out-0");
    assertThat(msgOut.getPayload())
            .asString()
            .isEqualTo("HELLO");
}

5.4. Consumer Groups

When running multiple instances of our application, every time there is a new message in an input channel, all subscribers will be notified.

Most of the time, we need the message to be processed only once. Spring Cloud Stream implements this behavior via consumer groups.

To enable this behavior, each consumer binding can use the spring.cloud.stream.bindings.<CHANNEL>.group property to specify a group name:

spring:
  cloud:
    stream:
      bindings:
        enrichLogMessage-in-0:
          destination: queue.log.messages
          group: test-group
        # ...

6. Message-Driven Microservices

In this section, we introduce all the required features for running our Spring Cloud Stream applications in a microservices context.

6.1. Scaling Up

When multiple applications are running, it’s important to ensure the data is split properly across consumers. To do so, Spring Cloud Stream provides two properties:

  • spring.cloud.stream.instanceCount — number of running applications
  • spring.cloud.stream.instanceIndex — index of the current application

For example, if we’ve deployed two instances of the above MyLoggerServiceApplication application, the property spring.cloud.stream.instanceCount should be 2 for both applications, and the property spring.cloud.stream.instanceIndex should be 0 and 1 respectively.

These properties are automatically set if we deploy the Spring Cloud Stream applications using Spring Data Flow as described in this article.

6.2. Partitioning

The domain events could be Partitioned messages. This helps when we are scaling up the storage and improving application performance.

The domain event usually has a partition key so that it ends up in the same partition with related messages.

Let’s say that we want the log messages to be partitioned by the first letter in the message, which would be the partition key, and grouped into two partitions.

There would be one partition for the log messages that start with A-M and another partition for N-Z. This can be configured using two properties:

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression — the expression to partition the payloads
  • spring.cloud.stream.bindings.output.producer.partitionCount — the number of groups

Sometimes the expression to partition is too complex to write it in only one line. For these cases, we can write our custom partition strategy using the property spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass.

6.3. Health Indicator

In a microservices context, we also need to detect when a service is down or starts failing. Spring Cloud Stream provides the property management.health.binders.enabled to enable the health indicators for binders.

When running the application, we can query the health status at http://<host>:<port>/health.

7. Conclusion

In this tutorial, we presented the main concepts of Spring Cloud Stream and showed how to use it through some simple examples over RabbitMQ. More info about Spring Cloud Stream can be found here.

The code backing this article is available on GitHub. Once you're logged in as a Baeldung Pro Member, start learning and coding on the project.
Baeldung Pro – NPI EA (cat = Baeldung)
announcement - icon

Baeldung Pro comes with both absolutely No-Ads as well as finally with Dark Mode, for a clean learning experience:

>> Explore a clean Baeldung

Once the early-adopter seats are all used, the price will go up and stay at $33/year.

Partner – Microsoft – NPI EA (cat = Spring Boot)
announcement - icon

Azure Container Apps is a fully managed serverless container service that enables you to build and deploy modern, cloud-native Java applications and microservices at scale. It offers a simplified developer experience while providing the flexibility and portability of containers.

Of course, Azure Container Apps has really solid support for our ecosystem, from a number of build options, managed Java components, native metrics, dynamic logger, and quite a bit more.

To learn more about Java features on Azure Container Apps, visit the documentation page.

You can also ask questions and leave feedback on the Azure Container Apps GitHub page.

Partner – Orkes – NPI EA (cat = Spring)
announcement - icon

Modern software architecture is often broken. Slow delivery leads to missed opportunities, innovation is stalled due to architectural complexities, and engineering resources are exceedingly expensive.

Orkes is the leading workflow orchestration platform built to enable teams to transform the way they develop, connect, and deploy applications, microservices, AI agents, and more.

With Orkes Conductor managed through Orkes Cloud, developers can focus on building mission critical applications without worrying about infrastructure maintenance to meet goals and, simply put, taking new products live faster and reducing total cost of ownership.

Try a 14-Day Free Trial of Orkes Conductor today.

Partner – Orkes – NPI EA (tag = Microservices)
announcement - icon

Modern software architecture is often broken. Slow delivery leads to missed opportunities, innovation is stalled due to architectural complexities, and engineering resources are exceedingly expensive.

Orkes is the leading workflow orchestration platform built to enable teams to transform the way they develop, connect, and deploy applications, microservices, AI agents, and more.

With Orkes Conductor managed through Orkes Cloud, developers can focus on building mission critical applications without worrying about infrastructure maintenance to meet goals and, simply put, taking new products live faster and reducing total cost of ownership.

Try a 14-Day Free Trial of Orkes Conductor today.

eBook – HTTP Client – NPI EA (cat=HTTP Client-Side)
announcement - icon

The Apache HTTP Client is a very robust library, suitable for both simple and advanced use cases when testing HTTP endpoints. Check out our guide covering basic request and response handling, as well as security, cookies, timeouts, and more:

>> Download the eBook

eBook – Java Concurrency – NPI EA (cat=Java Concurrency)
announcement - icon

Handling concurrency in an application can be a tricky process with many potential pitfalls. A solid grasp of the fundamentals will go a long way to help minimize these issues.

Get started with understanding multi-threaded applications with our Java Concurrency guide:

>> Download the eBook

eBook – Java Streams – NPI EA (cat=Java Streams)
announcement - icon

Since its introduction in Java 8, the Stream API has become a staple of Java development. The basic operations like iterating, filtering, mapping sequences of elements are deceptively simple to use.

But these can also be overused and fall into some common pitfalls.

To get a better understanding on how Streams work and how to combine them with other language features, check out our guide to Java Streams:

>> Join Pro and download the eBook

eBook – Persistence – NPI EA (cat=Persistence)
announcement - icon

Working on getting your persistence layer right with Spring?

Explore the eBook

Course – LS – NPI EA (cat=REST)

announcement - icon

Get started with Spring Boot and with core Spring, through the Learn Spring course:

>> CHECK OUT THE COURSE

Course – Spring Sale 2025 – NPI EA (cat= Baeldung)
announcement - icon

Yes, we're now running our Spring Sale. All Courses are 25% off until 26th May, 2025:

>> EXPLORE ACCESS NOW

Course – Spring Sale 2025 – NPI (All)
announcement - icon

Yes, we're now running our Spring Sale. All Courses are 25% off until 26th May, 2025:

>> EXPLORE ACCESS NOW

Partner – Microsoft – NPI (cat=Spring)
announcement - icon

Azure Container Apps is a fully managed serverless container service that enables you to build and deploy modern, cloud-native Java applications and microservices at scale. It offers a simplified developer experience while providing the flexibility and portability of containers.

Of course, Azure Container Apps has really solid support for our ecosystem, from a number of build options, managed Java components, native metrics, dynamic logger, and quite a bit more.

To learn more about Java features on Azure Container Apps, visit the documentation page.

You can also ask questions and leave feedback on the Azure Container Apps GitHub page.

eBook Jackson – NPI EA – 3 (cat = Jackson)
eBook – eBook Guide Spring Cloud – NPI (cat=Cloud/Spring Cloud)