Sunday 7 October 2018

Data processing with Java 8 Streams



Stream operations fall into two categories: intermediate operations and terminal operations.

Intermediate operations return a Stream, so they can be chained one after another in the stream pipeline. A terminal operation closes the pipeline and produces a result or a side effect.

Some operations are also short-circuiting: they can finish without processing the whole stream. limit() is a short-circuiting intermediate operation, while anyMatch() and findFirst() are short-circuiting terminal operations.
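As a minimal sketch of short-circuiting, the class below (ShortCircuitSketch and its method names are hypothetical, chosen only for illustration) runs limit(), anyMatch() and findFirst() against an infinite stream. Each call still returns, because the operation stops as soon as it has what it needs.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ShortCircuitSketch {

    // limit() is a short-circuiting intermediate operation:
    // it truncates the infinite stream 1, 2, 3, ... to its first five elements.
    static List<Integer> firstFive() {
        return Stream.iterate(1, n -> n + 1)
                .limit(5)
                .collect(Collectors.toList());
    }

    // anyMatch() is a short-circuiting terminal operation:
    // it stops at the first element that satisfies the predicate.
    static boolean hasMultipleOfSeven() {
        return Stream.iterate(1, n -> n + 1)
                .anyMatch(n -> n % 7 == 0);
    }

    // findFirst() is also a short-circuiting terminal operation:
    // it returns as soon as one element has passed the filter.
    static Optional<Integer> firstSquareAbove(int threshold) {
        return Stream.iterate(1, n -> n + 1)
                .map(n -> n * n)
                .filter(square -> square > threshold)
                .findFirst();
    }

    public static void main(String[] args) {
        System.out.println(firstFive());           // [1, 2, 3, 4, 5]
        System.out.println(hasMultipleOfSeven());  // true
        System.out.println(firstSquareAbove(50));  // Optional[64]
    }
}
```

Without a short-circuiting operation somewhere in the pipeline, a stream built with Stream.iterate() would never terminate.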



Note that the operations performed on a stream pipeline are called aggregate operations. They play the same role as the clauses of a SQL query: they select the data and apply aggregations to it.

Like SQL, streams offer a declarative, structured way to process data, which makes the code less error-prone and easier to maintain.
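To make the declarative point concrete, here is a small sketch that computes the same value twice: once with an explicit loop and once with a stream pipeline. The class name DeclarativeSketch, its method names and the sample numbers are made up for this illustration.

```java
import java.util.Arrays;
import java.util.List;

public class DeclarativeSketch {

    // Imperative style: the loop spells out how to iterate, test and accumulate.
    static int sumOfEvenSquaresLoop(List<Integer> numbers) {
        int sum = 0;
        for (int n : numbers) {
            if (n % 2 == 0) {
                sum += n * n;
            }
        }
        return sum;
    }

    // Declarative style: the pipeline states what is wanted
    // (keep even numbers, square them, sum them), much like WHERE and SUM in SQL.
    static int sumOfEvenSquaresStream(List<Integer> numbers) {
        return numbers.stream()
                .filter(n -> n % 2 == 0)
                .mapToInt(n -> n * n)
                .sum();
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(sumOfEvenSquaresLoop(numbers));   // 56
        System.out.println(sumOfEvenSquaresStream(numbers)); // 56
    }
}
```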

Intermediate operations are added to a stream pipeline with these methods:
  • filter()
  • map()
  • flatMap()
  • distinct()
  • sorted()
  • peek()
  • limit()
  • skip()
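Several of the methods listed above can be chained in one pipeline. The following is only an illustrative sketch: the class IntermediateOpsSketch, the method process() and the nested list of category names are assumptions, not part of the original example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class IntermediateOpsSketch {

    static List<String> process(List<List<String>> nested) {
        return nested.stream()
                .flatMap(List::stream)      // flatten the inner lists into one stream
                .map(String::toUpperCase)   // transform each element
                .distinct()                 // drop duplicates
                .sorted()                   // natural (alphabetical) order
                .skip(1)                    // ignore the first element
                .limit(2)                   // keep at most two elements
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> nested = Arrays.asList(
                Arrays.asList("books", "mobile"),
                Arrays.asList("fashion", "books"),
                Arrays.asList("computer"));
        System.out.println(process(nested)); // [COMPUTER, FASHION]
    }
}
```

Every call up to collect() returns another Stream, which is what allows the calls to be chained.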
All intermediate operations are lazy: they are not executed until a terminal operation actually needs a result.

In short, an intermediate operation returns a new stream. Calling it does not process any elements; it only describes a new stream that, when traversed, yields the resulting elements. For example, filter() creates a stream that contains the elements of the initial stream that match the given predicate.
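Laziness can be observed by recording when each step actually runs. In the sketch below (LazySketch and trace() are hypothetical names), peek() writes to an event list, and nothing is recorded by peek() until the terminal collect() is called.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazySketch {

    static List<String> trace() {
        List<String> events = new ArrayList<>();

        // Building the pipeline only describes the work; no element is visited yet.
        Stream<String> pipeline = Stream.of("P1", "P2", "P3")
                .peek(name -> events.add("peek " + name))
                .filter(name -> !name.equals("P2"));
        events.add("pipeline built");

        // The terminal operation triggers the traversal.
        List<String> result = pipeline.collect(Collectors.toList());
        events.add("collected " + result);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace());
        // [pipeline built, peek P1, peek P2, peek P3, collected [P1, P3]]
    }
}
```

The "pipeline built" entry comes first, which shows that peek() and filter() did no work when they were called.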

Terminal operations are:

  • forEach()
  • forEachOrdered()
  • toArray()
  • reduce()
  • collect()
  • min()
  • max()
  • count()
  • anyMatch()
  • allMatch()
  • noneMatch()
  • findFirst()
  • findAny()
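A few of these terminal operations are sketched below on a plain list of integers. The class TerminalOpsSketch and its method names are hypothetical, and the numbers are illustrative sample prices.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class TerminalOpsSketch {

    // Sample data, assumed for this sketch only.
    static final List<Integer> PRICES = Arrays.asList(500, 200, 250, 4000, 50, 120, 80);

    // count(): number of elements that reach the end of the pipeline
    static long countAbove(int threshold) {
        return PRICES.stream().filter(p -> p > threshold).count();
    }

    // reduce(): folds all elements into one value, starting from the identity 0
    static int total() {
        return PRICES.stream().reduce(0, Integer::sum);
    }

    // min() and max() return an Optional because the stream may be empty
    static Optional<Integer> cheapest() {
        return PRICES.stream().min(Integer::compare);
    }

    static Optional<Integer> mostExpensive() {
        return PRICES.stream().max(Integer::compare);
    }

    // allMatch() and noneMatch() return a boolean
    static boolean allPositive() {
        return PRICES.stream().allMatch(p -> p > 0);
    }

    static boolean noneFree() {
        return PRICES.stream().noneMatch(p -> p == 0);
    }

    public static void main(String[] args) {
        System.out.println(countAbove(200));   // 3
        System.out.println(total());           // 5200
        System.out.println(cheapest());        // Optional[50]
        System.out.println(mostExpensive());   // Optional[4000]
        System.out.println(allPositive());     // true
        System.out.println(noneFree());        // true
    }
}
```

Unlike intermediate operations, none of these methods returns a Stream, so each one ends its pipeline.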

Consider the Product used in the previous examples, with the properties name, category, price and inventory.


import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Java 8 Streams examples: elaborates a few scenarios with a pragmatic example.
 *
 * @author Sachin.Srivastava
 * @since Java 1.8
 */
public class DataProcessWithStream {

    public static void main(String[] args) {

        // Product is a non-static inner class, so it is created through an enclosing instance.
        DataProcessWithStream streamsProduct = new DataProcessWithStream();
        List<Product> products = new ArrayList<>();
        products.add(streamsProduct.new Product("P1", "MOBILE", 500.0, 100));
        products.add(streamsProduct.new Product("P2", "FASHION", 200.0, 50));
        products.add(streamsProduct.new Product("P3", "MOBILE", 250.0, 30));
        products.add(streamsProduct.new Product("P4", "COMPUTER", 4000.0, 40));
        products.add(streamsProduct.new Product("P5", "BOOKS", 50.0, 25));
        products.add(streamsProduct.new Product("P6", "HEALTHCARE", 120.0, 10));
        products.add(streamsProduct.new Product("P7", "BOOKS", 80.0, 400));

        // Scenario 1 - get the products in the MOBILE category, sorted by price in descending order
        // filter -> sorted -> collect
        List<Product> productList = products.stream()
                .filter(product -> "MOBILE".equalsIgnoreCase(product.getCategory()))
                .sorted(Comparator.comparing(Product::getPrice).reversed())
                .collect(Collectors.toList());
        System.out.println(productList);


        /*
         * Output:
         * [Product [name=P1, category=MOBILE, price=500.0, inventory=100],
         * Product [name=P3, category=MOBILE, price=250.0, inventory=30]]
         */

        // Scenario 2 - get the list of product names
        // map -> collect
        List<String> productByNames = products.stream()
                .map(Product::getName)
                .collect(Collectors.toList());
        System.out.println(productByNames);

        /*
         * Output:
         * [P1, P2, P3, P4, P5, P6, P7]
         */

        // Scenario 3 - get the total inventory of all the books
        // filter -> mapToInt -> sum
        int inventoryCountOfBook = products.stream()
                .filter(p -> "BOOKS".equalsIgnoreCase(p.getCategory()))
                .mapToInt(Product::getInventory)
                .sum();
        System.out.println(inventoryCountOfBook);

        /*
         * Output:
         * 425
         */
    }

    class Product {

        String name;
        String category;
        Double price; // in dollars
        Integer inventory;

        public Product(String name, String category, Double price, Integer inventory) {
            this.name = name;
            this.category = category;
            this.price = price;
            this.inventory = inventory;
        }

        @Override
        public String toString() {
            return "Product [name=" + name + ", category=" + category + ", price=" + price
                    + ", inventory=" + inventory + "]";
        }

        public Integer getInventory() {
            return inventory;
        }

        public void setInventory(Integer inventory) {
            this.inventory = inventory;
        }

        public Double getPrice() {
            return price;
        }

        public void setPrice(Double price) {
            this.price = price;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getCategory() {
            return category;
        }

        public void setCategory(String category) {
            this.category = category;
        }
    }


}

In the example above, the first scenario applies filter(), which expects a Predicate, and then sorted(), which takes a Comparator. Both are intermediate operations that return a stream. The terminal operation is collect(), which gathers the elements that come out of the pipeline into the resulting list.

The following aggregate operations are the important ones applied to the stream of products:

map - transforms each element into the required form.
filter - keeps only the elements that match the given predicate.
sorted - orders the elements when the stream data needs to be sorted, either by natural order or by a supplied Comparator.

As you can see, many different aggregate operations can be combined to suit the requirement, and the result is obtained with very little code that is easy to maintain and review.

That is the biggest advantage of streams: the code is much cleaner, easier to understand and less error-prone. Streams greatly strengthen Java's data processing capabilities and are a great initiative to encourage functional programming concepts in Java.

Enjoy Java Streams!!  

1 comment:

  1. List result =
    list.stream()
    .filter(Objects::nonNull)
    .filter(predicate)
    .sorted(comp)
    .collect(Collectors.toList());
