• Software
  • Leadership
  • Agile
  • Events
  • Other Topics
    • Finance
    • Robotics & AI
    • System Administration
    • Books
    • Life Experiences
    • Environment
  • Write and Earn
  • About Us
    • About Us
    • Our Contributors
    • Contact Us
    • Article Submission Guidelines
    • Logo demystified
  • Follow @MeJaneve
    Janeve.Me
  • Categories

    Browse through following categories for articles written by professionals.
    • Agile
      4
    • Books
      5
    • Environment
      1
    • Events and Conferences
      7
    • Featured
      9
    • Finance
      1
    • Leadership
      6
    • Life Experiences
      8
    • Robotics & AI
      1
    • Software
      30
    • System Administration
      2
  • Software
  • Leadership
  • Agile
  • Events
  • Other Topics
    • Finance
    • Robotics & AI
    • System Administration
    • Books
    • Life Experiences
    • Environment
  • Write and Earn
  • About Us
    • About Us
    • Our Contributors
    • Contact Us
    • Article Submission Guidelines
    • Logo demystified
Home » Featured Software

Java Asynchronous Programming using CompletableFuture – Part 1

Janeve George Posted On February 5, 2022
0
1.4K Views


0
Shares
  • Share On Facebook
  • Tweet It

What is Asynchronous Programming?

To put it in simple words:

“Synchronous programming is when the program is executed one statement followed by another in the sequence it appears in the code.”

vs

“Asynchronous programming is when the program is executed one statement followed by another in the code, but parts of the code exist which would branch out and execute in parallel with the main flow.”

Asynchronous programming is typically used when there are several chunks of mutually exclusive code (no shared mutable data or dependent data) in your program that take a lot of time (database queries/file operation/API calls) and prevent efficient use of your hardware resources (blocking code).

Why do we need Asynchronous Programming?

Today, we have machines that have multiple cores and greater memory capacities. Asynchronous programming helps the developer to take maximum advantage of these compute powers of the hardware.

Let us take an example scenario of an e-commerce portal. Assume, you are given the following payload about an order.

{
  "order_id": 123456,
  "customer_id": 245123,
  "invoice_id": 432121323
}

Let us assume that you need to implement a function that would fetch the following information:

  • additional order information
  • customer information
  • invoice details
    • product details for each line item in the invoice

The synchronous flow would look like this:

In synchronous programming, each fetchY function would be blocked on the previous fetchX function. For instance, fetchCustomer(customer_id) will not start until fetchOrder(order_id) is completed. The system might be idle while fetchOrder(order_id) waits for the information from the database. Here, fetchOrder(order_id) blocks fetchCustomer(customer_id).

Let us implement the synchronous function.

package me.janeve.java8.concurrent_package.completablefutures.part1.sync.executors;

import me.janeve.java8.concurrent_package.completablefutures.part1.entities.*;
import me.janeve.java8.concurrent_package.completablefutures.part1.service.CustomerService;
import me.janeve.java8.concurrent_package.completablefutures.part1.service.InvoiceService;
import me.janeve.java8.concurrent_package.completablefutures.part1.service.OrderService;
import me.janeve.java8.concurrent_package.completablefutures.part1.service.ProductInfoService;
import me.janeve.java8.helpers.Timer;

public class SynchronousExecution {
    public TransactionDetails fetchTransactionDetails(TransactionData payload){
        Timer.start("SynchronousExecution::"+payload.getOrderId());
        Order order = OrderService.fetchOrder(payload.getOrderId());
        Customer customer = CustomerService.fetchCustomer(payload.getCustomerId());
        Invoice invoice = InvoiceService.fetchInvoice(payload.getInvoiceId());

        for(LineItem item: invoice.getItems()) {
            item.setProductInfo( ProductInfoService.getProductInfo(item.getProductId()) );
        }

        TransactionDetails data = TransactionDetails.builder()
                .order(order)
                .customer(customer)
                .invoice(invoice)
                .build();
        Timer.stop("SynchronousExecution::"+payload.getOrderId());
        return data;
    }
}

When we dissect the flow, we can see fetchOrder(order_id), fetchCustomer(customer_id), and fetchInvoice(invoice_id) do not have any dependency or shared mutable data between them. But, fetchProductInfo(product_id_i) can only be executed after getInvoice(invoice_id). Hence, fetchProductInfo(product_id_i) depends on getInvoice(invoice_id). Here, each individual fetchProductInfo(product_id_i) are also blocked by the previous fetchProductInfo(product_id_i-1) but does not have any dependency or shared mutable data between them.

Asynchronous Programming of the function

We can rethink this problem as to how data flows through these functions. You have input data that you process and fetch (execution of functions) more data downstream. Some of these data can be fetched in parallel to others. Let us take an example scenario of what the synchronous program execution could look like.

Now in this scenario, the total execution of the main thread would take at least 885 milliseconds. Now let us implement the same as an asynchronous program. Let us rearrange the execution of the functions in separate threads. It could look something like this:

If there is sufficient and idle hardware resource available, the above main thread would take around 400 milliseconds. That is more than a 50% gain in performance. Apart from that, we can have a higher efficient usage of idle hardware resources. Asynchronous programming could help in attaining a competitive advantage for massive systems with a high demand for availability and performance.

CompletableFuture in Java

Java’s java.util.concurrent package has evolved over its lifetime. Before understanding CompletableFuture, we must have a high-level understanding of the following Java features:

  1. Future API (since Java 1.5)
  2. ForkJoinPool (since Java 1.7)
  3. ExecutorService (since Java 1.5)

The interface java.util.concurrent.CompletionStage and the class java.util.concurrent.CompletableFuture were introduced in Java 8 as a part of Concurrency API improvements. The CompletableFuture classes are an extension to the Future API.

Let us implement the same logic with the CompletableFuture functionality of Java.

package me.janeve.java8.concurrent_package.completablefutures.part1.async.executors;

import me.janeve.java8.concurrent_package.completablefutures.part1.entities.TransactionData;
import me.janeve.java8.concurrent_package.completablefutures.part1.entities.TransactionDetails;
import me.janeve.java8.concurrent_package.completablefutures.part1.service.CustomerService;
import me.janeve.java8.concurrent_package.completablefutures.part1.service.InvoiceService;
import me.janeve.java8.concurrent_package.completablefutures.part1.service.OrderService;
import me.janeve.java8.concurrent_package.completablefutures.part1.service.ProductInfoService;
import me.janeve.java8.helpers.Timer;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ASynchronousExecution {
    public TransactionDetails fetchTransactionDetails(final TransactionData payload) {
        Timer.start("ASynchronousExecution::"+payload.getOrderId());

        final TransactionDetails.TransactionDetailsBuilder builder = TransactionDetails.builder();

        CompletableFuture.allOf(
                    fetchInvoice(payload, builder),
                    fetchOrder(payload, builder),
                    fetchCustomer(payload, builder)
                ).join();

        TransactionDetails data = builder.build();
        Timer.stop("ASynchronousExecution::"+payload.getOrderId());
        return data;
    }

    private CompletableFuture<Void> fetchCustomer(TransactionData payload, TransactionDetails.TransactionDetailsBuilder builder) {
        return CompletableFuture.supplyAsync(() -> CustomerService.fetchCustomer(payload.getCustomerId())).thenAccept(builder::customer);
    }

    private CompletableFuture<Void> fetchOrder(TransactionData payload, TransactionDetails.TransactionDetailsBuilder builder) {
        return CompletableFuture.supplyAsync(() -> OrderService.fetchOrder(payload.getOrderId())).thenAccept(builder::order);
    }

    private CompletableFuture<Void> fetchInvoice(TransactionData payload, TransactionDetails.TransactionDetailsBuilder builder) {
        return CompletableFuture.supplyAsync(() -> InvoiceService.fetchInvoice(payload.getInvoiceId()))
                .thenApply(invoice -> {
                    List<CompletableFuture<Void>> futures = invoice.getItems().stream()
                            .map(lineItem -> CompletableFuture.supplyAsync(
                                    () -> ProductInfoService.getProductInfo(lineItem.getProductId())
                                    ).thenAccept(lineItem::setProductInfo))
                            .collect(Collectors.toList());
                    CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
                    return invoice;
                })
                .thenAccept(builder::invoice);
    }
}

The complete code is available in my git repo.

Now let us understand some of the important implementations. First, let us look at the code in fetchCustomer(TransactionData payload, TransactionDetails.TransactionDetailsBuilder builder).

return CompletableFuture.supplyAsync(() -> CustomerService.fetchCustomer(payload.getCustomerId())).thenAccept(builder::customer);

Both fetchOrder(order_id) and fetchCustomer(customer_id) have the same logic. Let us look at fetchCustomer(customer_id). There are two important functions being invoked here:

  1. supplyAsync(Supplier supplier)
  2. thenAccept(Consume action)

supplyAsync(Supplier supplier)

The supplyAsync(Supplier supplier) function is called to fork a new thread. In our example, the main thread is asking CompletableFuture to perform the fetchCustomer(customer_id) function in a separate thread. The data that is returned by the fetchCustomer(customer_id) method is then sent downstream to the thenAccept(Consume action) function.

thenAccept(Consume action)

The thenAccept(Consume action) function gets the information from the upstream and just updates the data in the response builder class.

The fetchInvoice(invoice_id) must get the invoice details first. The invoice details would contain several line items for which we need to make requests to fetch the product info.

return CompletableFuture.supplyAsync(() -> InvoiceService.fetchInvoice(payload.getInvoiceId()))
    .thenApply(invoice -> {
        List<CompletableFuture<Void>> futures = invoice.getItems().stream()
            .map(lineItem -> CompletableFuture.supplyAsync(() -> ProductInfoService.getProductInfo(lineItem.getProductId()))
                    .thenAccept(lineItem::setProductInfo))
            .collect(Collectors.toList());
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).join();
        return invoice;
    })
    .thenAccept(builder::invoice);

The CompletableFuture.supplyAsync() (Line No 1) function is to build an async call to fetch the invoice details. When the InvoiceService returns with the invoice details, it is then sent to the downstream function thenApply(Function fn).

thenApply(Function fn)

The thenApply(Function fn) function gets the invoice details and it then internally iterates through each line item (.getItems().stream().map()) and makes CompletableFuture.supplyAsync() call to fetch each of the product info in parallel. The thenAccept(Consume action) function is then called to update the product info for the line item it is processing.

The collect() function accumulates all the futures that is created for each line item we are processing. It is then used to make sure that all the line items has completed processing in line number 7. Since we need to now update the invoice details in the response builder, we return the invoice to the downstream in line number 8.

The thenAccept(builder::invoice) then updates the response builder with the value that was returned from the upstream.

Now let us look at the main thread that had spawned all these new threads.

final TransactionDetails.TransactionDetailsBuilder builder = TransactionDetails.builder();

CompletableFuture.allOf(
        fetchInvoice(payload, builder),
        fetchOrder(payload, builder),
        fetchCustomer(payload, builder)
    ).join();

TransactionDetails data = builder.build();

Line no 1, we first want to initialize a response builder that would help in async call. The methods can then take their time to update the builder class and once all the various threads have done their job, the builder is used to build the response.

Lines 3 to 7, we call the various calls and wait for those threads to finish their work. Once all the tasks are complete, the response is built and returned.

Summary

We have only scratched the surface of the capabilities of the CompletableFuture class. We have a plethora of functions and features to explore. Make sure you like/subscribe to our social media channels for new content updates.

  • Facebook
  • Twitter
  • LinkedIn
  • Link

If you have any specific scenarios you would like us to cover, please comment below and we will try to cover those scenarios in the coming parts on CompletableFuture and Java concurrency.

Post Views: 1,443
0
Shares
  • Share On Facebook
  • Tweet It




Author

Janeve George

A Technology Leader, Software Engineer, and Agile Methodologies enthusiast. Currently, working as Lead Software Development with Zeta Suite. He has more than 1.8 decades of experience spanning different verticals predominated by hosting, cloud, and media delivery technologies.

3Es to Supercharge your Career Journey and Performance Review Ratings
Read Next

3Es to Supercharge your Career Journey and Performance Review Ratings

  • Follow @MeJaneve
    Janeve.Me
  • Categories

    Browse through following categories for articles written by professionals.
    • Agile
      4
    • Books
      5
    • Environment
      1
    • Events and Conferences
      7
    • Featured
      9
    • Finance
      1
    • Leadership
      6
    • Life Experiences
      8
    • Robotics & AI
      1
    • Software
      30
    • System Administration
      2

  • Popular Posts

  • Recent Posts

    • 3Es to Supercharge your Career Journey and Performance Review Ratings
    • Java Asynchronous Programming using CompletableFuture - Part 1
    • The Java Stream API
    • Functional Interfaces in Java
  • Keep In Touch

    Follow us on social media to get latest articles on Programming, System Architecture, Agile Development Methodologies, Product and Project Management, Personal Development, BigData, Robotics, Upcoming Events and more...


Copyright © 2020 | Janeve.Me. All rights Reserved.
Press enter/return to begin your search