Sunday 7 October 2018

Data processing with Java 8 Streams



Stream operations fall into two categories: intermediate operations and terminal operations.

Operations that return a Stream, and can therefore be chained together in the stream pipeline, are called intermediate operations. The operation that closes the pipeline and produces a result (or a side effect) is called the terminal operation.

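Some operations are also short-circuiting: they can finish without processing the whole stream, which means they even work on infinite streams. Short-circuiting is a property rather than a third category: limit() is a short-circuiting intermediate operation, while anyMatch(), allMatch(), noneMatch(), findFirst() and findAny() are short-circuiting terminal operations.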
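To make short-circuiting concrete, here is a minimal sketch in which each pipeline starts from an infinite stream built with Stream.iterate() and still finishes, because limit(), findFirst() or anyMatch() stops pulling elements early. The class ShortCircuitDemo and its method names are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class: every pipeline below starts from the infinite
// stream 1, 2, 3, ... and terminates only because of a short-circuiting operation.
class ShortCircuitDemo {

    // limit() is a short-circuiting intermediate operation:
    // it stops pulling from the source after n elements.
    static List<Integer> firstN(int n) {
        return Stream.iterate(1, i -> i + 1)
                .limit(n)
                .collect(Collectors.toList());
    }

    // findFirst() is a short-circuiting terminal operation:
    // it returns as soon as one element passes the filter.
    static int firstSquareAbove(int threshold) {
        return Stream.iterate(1, i -> i + 1)
                .map(i -> i * i)
                .filter(sq -> sq > threshold)
                .findFirst()
                .get(); // fine for small thresholds: some square always exceeds them
    }

    // anyMatch() also short-circuits: it stops at the first matching element.
    static boolean anyMultipleOf(int divisor) {
        return Stream.iterate(1, i -> i + 1)
                .anyMatch(i -> i % divisor == 0);
    }

    public static void main(String[] args) {
        System.out.println(firstN(5));            // [1, 2, 3, 4, 5]
        System.out.println(firstSquareAbove(50)); // 64
        System.out.println(anyMultipleOf(7));     // true
    }
}
```

Without the short-circuiting step, for example with collect() directly on Stream.iterate(), the program would never finish.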



Note that the operations performed in a stream pipeline are called aggregate operations, because they aggregate data much like the clauses of a SQL query first fetch the data and then apply aggregation to it.

This gives a more declarative and structured way of processing data, just like SQL, which makes the code less error prone and easier to maintain.
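To illustrate the declarative style, here is a small sketch of the same query written twice, once with a loop and once with a stream. It roughly corresponds to `SELECT name FROM item WHERE price > minPrice ORDER BY name`; the class DeclarativeVsImperative and the simplified Item type are hypothetical stand-ins, not part of the original example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical comparison of an imperative loop and a declarative stream pipeline.
class DeclarativeVsImperative {

    // Simplified stand-in for the Product class used later in the post.
    static final class Item {
        final String name;
        final double price;
        Item(String name, double price) { this.name = name; this.price = price; }
        String getName() { return name; }
        double getPrice() { return price; }
    }

    // Imperative: we spell out HOW to iterate, test, accumulate and sort.
    static List<String> namesAboveLoop(List<Item> items, double minPrice) {
        List<String> result = new ArrayList<>();
        for (Item item : items) {
            if (item.getPrice() > minPrice) {
                result.add(item.getName());
            }
        }
        Collections.sort(result);
        return result;
    }

    // Declarative: we state WHAT we want; the pipeline handles the iteration.
    static List<String> namesAboveStream(List<Item> items, double minPrice) {
        return items.stream()
                .filter(item -> item.getPrice() > minPrice) // WHERE
                .map(Item::getName)                         // SELECT name
                .sorted()                                   // ORDER BY name
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Item> items = Arrays.asList(
                new Item("P3", 250.0), new Item("P5", 50.0), new Item("P1", 500.0));
        System.out.println(namesAboveLoop(items, 100.0));   // [P1, P3]
        System.out.println(namesAboveStream(items, 100.0)); // [P1, P3]
    }
}
```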

Intermediate operations are added to a Stream pipeline with methods such as:
  • filter()
  • map()
  • flatMap()
  • distinct()
  • sorted()
  • peek()
  • limit()
  • skip()
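The following sketch chains most of the intermediate operations listed above. Each call returns a new Stream, and only the final collect() produces a result. The class IntermediateOpsDemo and the sample data are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical example chaining several intermediate operations.
class IntermediateOpsDemo {

    static List<String> pipeline(List<List<String>> baskets) {
        return baskets.stream()
                .flatMap(List::stream)       // flatten List<List<String>> into Stream<String>
                .map(String::toUpperCase)    // transform each element
                .filter(s -> !s.isEmpty())   // keep only non-empty names
                .distinct()                  // drop duplicates (uses equals())
                .sorted()                    // natural (alphabetical) order
                .skip(1)                     // discard the first element
                .limit(2)                    // keep at most two elements
                .collect(Collectors.toList()); // terminal operation
    }

    public static void main(String[] args) {
        List<List<String>> baskets = Arrays.asList(
                Arrays.asList("mobile", "books"),
                Arrays.asList("books", "fashion", ""),
                Arrays.asList("computer"));
        // After distinct + sorted: BOOKS, COMPUTER, FASHION, MOBILE
        System.out.println(pipeline(baskets)); // [COMPUTER, FASHION]
    }
}
```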
All intermediate operations are lazy: they are not executed until the result of the processing is actually needed.

In other words, an intermediate operation returns a new stream. Executing an intermediate operation does not process any elements; instead, it creates a new stream that produces its elements only when traversed. For example, filter() returns a new stream that, when traversed, contains the elements of the initial stream that match the given predicate.
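Laziness can be observed with peek(), which the JDK documents as mainly a debugging aid. In this sketch the hypothetical helper buildPipeline records every element the pipeline touches: nothing is recorded until the terminal operation runs, and findFirst() stops after the first match.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo of lazy evaluation in a stream pipeline.
class LazinessDemo {

    // Builds (but does not run) a pipeline; peek() records each element it sees.
    static Stream<String> buildPipeline(List<String> names, List<String> visited) {
        return names.stream()
                .peek(visited::add)                  // side effect for illustration only
                .filter(name -> name.endsWith("2"));
    }

    public static void main(String[] args) {
        List<String> visited = new ArrayList<>();
        Stream<String> pipeline = buildPipeline(Arrays.asList("P1", "P2", "P3", "P4"), visited);
        System.out.println(visited);                 // [] : nothing has run yet

        String first = pipeline.findFirst().orElse("none"); // terminal operation
        System.out.println(first);                   // P2
        System.out.println(visited);                 // [P1, P2] : P3 and P4 never processed
    }
}
```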

Terminal operations are:

  • forEach()
  • forEachOrdered()
  • toArray()
  • reduce()
  • collect()
  • min()
  • max()
  • count()
  • anyMatch()
  • allMatch()
  • noneMatch()
  • findFirst()
  • findAny()
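Here is a quick sketch of several terminal operations from the list above, applied to the inventory counts of the sample products P1 to P7 used in this post. The class TerminalOpsDemo and its method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical demo: each method ends its pipeline with a different terminal operation.
class TerminalOpsDemo {

    // Inventory counts of the sample products P1..P7.
    static final List<Integer> INVENTORY = Arrays.asList(100, 50, 30, 40, 25, 10, 400);

    static long count()                 { return INVENTORY.stream().count(); }
    static int total()                  { return INVENTORY.stream().reduce(0, Integer::sum); }
    static Optional<Integer> smallest() { return INVENTORY.stream().min(Integer::compare); }
    static boolean anyAbove(int n)      { return INVENTORY.stream().anyMatch(i -> i > n); }
    static boolean allPositive()        { return INVENTORY.stream().allMatch(i -> i > 0); }

    public static void main(String[] args) {
        System.out.println(count());       // 7
        System.out.println(total());       // 655
        System.out.println(smallest());    // Optional[10]
        System.out.println(anyAbove(300)); // true
        System.out.println(allPositive()); // true
        // forEachOrdered() is terminal too: it returns nothing and only performs side effects.
        INVENTORY.stream().filter(i -> i >= 100).forEachOrdered(System.out::println); // 100, then 400
    }
}
```

Note that min() returns an Optional, because the stream could be empty, whereas reduce() with an identity value always has a result.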

Consider the Product example from the previous post, with properties such as name, price, category and inventory.


import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/**
 * @author Sachin.Srivastava
 * @category Streams Java 8 examples 
 * @Desc elaborate scenarios with a pragmatic example
 * @since Java 1.8
 *
 */
public class DataProcessWithStream {

public static void main(String[] args) {

DataProcessWithStream streamsProduct = new DataProcessWithStream();
List<Product> products = new ArrayList<>();
products.add(streamsProduct.new Product("P1", "MOBILE", 500.0, 100));
products.add(streamsProduct.new Product("P2", "FASHION", 200.0, 50));
products.add(streamsProduct.new Product("P3", "MOBILE", 250.0, 30));
products.add(streamsProduct.new Product("P4", "COMPUTER", 4000.0, 40));
products.add(streamsProduct.new Product("P5", "BOOKS", 50.0, 25));
products.add(streamsProduct.new Product("P6", "HEALTHCARE", 120.0, 10));
products.add(streamsProduct.new Product("P7", "BOOKS", 80.0, 400));

// Scenario 1 - get the list of products in the MOBILE category, sorted by price in descending order
// filter -> sorted -> collect
List<Product> productList = products.stream()
.filter(product -> "MOBILE".equalsIgnoreCase(product.getCategory()))
.sorted(Comparator.comparing(Product::getPrice).reversed())
.collect(Collectors.toList());
System.out.println(productList);


/**
 * Output
 * [Product [name=P1, category=MOBILE, price=500.0, inventory=100],
 *  Product [name=P3, category=MOBILE, price=250.0, inventory=30]]
 */

// Scenario 2 - get the list of product names
// map -> collect
List<String> productByNames =
    products.stream()
        .map(Product::getName)
        .collect(Collectors.toList());
System.out.println(productByNames);
/**
 * Output
 * [P1, P2, P3, P4, P5, P6, P7]
 */
// Scenario 3 - get the total inventory of all the books
// filter -> mapToInt -> sum
int inventoryCountOfBook =
    products.stream()
        .filter(p -> "BOOKS".equalsIgnoreCase(p.getCategory()))
        .mapToInt(Product::getInventory)
        .sum();
System.out.println(inventoryCountOfBook);

/**
 * Output
 * 425
 */
}

class Product {

public Product(String name, String category, Double price, Integer inventory) {
super();
this.name = name;
this.category = category;
this.price = price;//in dollars
this.inventory = inventory;
}
@Override
public String toString() {
return "Product [name=" + name + ", category=" + category + ", price=" + price + ", inventory=" + inventory
+ "]";
}
String name;
String category;
Double price;
Integer inventory;

public Integer getInventory() {
return inventory;
}
public void setInventory(Integer inventory) {
this.inventory = inventory;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getCategory() {
return category;
}
public void setCategory(String category) {
this.category = category;
}
}


}

In the example above, the stream of products first goes through filter(), which expects a Predicate, and then through sorted(), which is given a Comparator. Both are intermediate operations that return a new Stream. The terminal operation is collect(), which gathers the elements remaining at the end of the pipeline into a List.
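To make the Predicate and Comparator visible, here is a sketch of Scenario 1 with both pulled out into named variables. It returns product names rather than whole objects to keep the output short; the class ExplicitFunctionalTypes and the trimmed-down Item type are hypothetical stand-ins for the post's Product.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical sketch: Scenario 1 with the functional interfaces named explicitly.
class ExplicitFunctionalTypes {

    // Simplified stand-in for Product.
    static final class Item {
        final String name; final String category; final double price;
        Item(String name, String category, double price) {
            this.name = name; this.category = category; this.price = price;
        }
        String getName() { return name; }
        String getCategory() { return category; }
        double getPrice() { return price; }
    }

    // filter() takes a Predicate<T>: a boolean test applied to each element.
    static final Predicate<Item> IS_MOBILE =
            item -> "MOBILE".equalsIgnoreCase(item.getCategory());

    // sorted() takes a Comparator<T>; reversed() turns ascending price into descending.
    static final Comparator<Item> BY_PRICE_DESC =
            Comparator.comparingDouble(Item::getPrice).reversed();

    static List<String> mobileNamesByPriceDesc(List<Item> items) {
        return items.stream()
                .filter(IS_MOBILE)              // intermediate: Stream<Item>
                .sorted(BY_PRICE_DESC)          // intermediate: Stream<Item>
                .map(Item::getName)             // intermediate: Stream<String>
                .collect(Collectors.toList());  // terminal: produces the List
    }

    public static void main(String[] args) {
        List<Item> items = Arrays.asList(
                new Item("P3", "MOBILE", 250.0),
                new Item("P2", "FASHION", 200.0),
                new Item("P1", "MOBILE", 500.0));
        System.out.println(mobileNamesByPriceDesc(items)); // [P1, P3]
    }
}
```

Naming the predicate and comparator also makes them reusable across several pipelines.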

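The following are the important aggregate operations applied to the product stream:

map() - transforms each element by applying the given function (for example, from a Product to its name).
filter() - keeps only the elements that match the given Predicate.
sorted() - sorts the elements, either in natural order or by the given Comparator.
mapToInt() and sum() - convert each element to an int and add the values up.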

As you can see, many combinations of aggregate operations are possible depending on the requirement, and the result is obtained with little code that is easy to maintain and review.

That is the biggest advantage of Streams: the code is cleaner, easier to understand and less error prone. Streams greatly extend Java's data processing capabilities and are a great step toward encouraging functional programming concepts in Java.

Enjoy Java Streams!!  

Friday 5 October 2018

Java 8 Streams - empowering data processing queries






The Java SE 8 release puts a lot of emphasis on data processing queries similar in structure to SQL, where you first find the data and then aggregate it to get what you need. Consider an e-commerce site where a product has many attributes, such as price, inventory and category, and many use cases require aggregated data.

For example, a SQL query against the Product table has the structure:

Select * from Product where <?> group by <??>

and may include aggregate functions such as SUM, MAX, AVG and MIN.

The database first finds the matching data set, then applies the aggregation rules from the query, and returns the aggregated result.

Java 8 adds a very similar capability in the form of Streams, in the package java.util.stream, which lets you write data processing code in a more declarative way.
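As a sketch of the SQL analogy, the query `SELECT category, SUM(inventory) FROM Product WHERE price < 1000 GROUP BY category` can be expressed with filter() and Collectors.groupingBy(). The class SqlLikeGrouping and the simplified Item type are hypothetical; the data mirrors the sample products used in this post.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical sketch of a WHERE + GROUP BY + SUM query with the Stream API.
class SqlLikeGrouping {

    // Simplified stand-in for Product.
    static final class Item {
        final String category; final double price; final int inventory;
        Item(String category, double price, int inventory) {
            this.category = category; this.price = price; this.inventory = inventory;
        }
        String getCategory() { return category; }
        double getPrice() { return price; }
        int getInventory() { return inventory; }
    }

    static Map<String, Integer> inventoryByCategory(List<Item> items, double maxPrice) {
        return items.stream()
                .filter(i -> i.getPrice() < maxPrice)                // WHERE price < maxPrice
                .collect(Collectors.groupingBy(
                        Item::getCategory,                           // GROUP BY category
                        TreeMap::new,                                // sorted keys for stable output
                        Collectors.summingInt(Item::getInventory))); // SUM(inventory)
    }

    public static void main(String[] args) {
        List<Item> items = Arrays.asList(
                new Item("MOBILE", 500.0, 100), new Item("FASHION", 200.0, 50),
                new Item("MOBILE", 250.0, 30), new Item("COMPUTER", 4000.0, 40),
                new Item("BOOKS", 50.0, 25), new Item("HEALTHCARE", 120.0, 10),
                new Item("BOOKS", 80.0, 400));
        System.out.println(inventoryByCategory(items, 1000.0));
        // {BOOKS=425, FASHION=50, HEALTHCARE=10, MOBILE=130}
    }
}
```

Other SQL aggregates have similar collectors, for example Collectors.averagingDouble() for AVG and Collectors.maxBy() for MAX.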

A new interface, Stream, extends the BaseStream interface. BaseStream is the base interface for streams, which are sequences of elements supporting sequential and parallel aggregate operations, and BaseStream in turn extends the AutoCloseable interface introduced in Java 1.7.

The Java API designers added this abstraction layer for processing data declaratively. Through parallel streams it can also take advantage of multi-core processors without you writing any explicit multi-threaded code.




Stream ----> BaseStream ----> AutoCloseable

The BaseStream interface is declared as follows:

public interface BaseStream<T, S extends BaseStream<T, S>> extends AutoCloseable {
    Iterator<T> iterator();

    Spliterator<T> spliterator();

    boolean isParallel();

    S sequential();

    S parallel();

    S unordered();

    S onClose(Runnable closeHandler);

    void close();
}
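Here is a small sketch exercising a few of the BaseStream methods above: isParallel() and parallel() switch and report the execution mode, and because BaseStream extends AutoCloseable a stream can be used in try-with-resources, where close() runs handlers registered with onClose(). The class BaseStreamDemo and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical demo of some BaseStream methods.
class BaseStreamDemo {

    // close() (called automatically by try-with-resources) runs the onClose() handlers.
    static List<String> closeLog() {
        List<String> log = new ArrayList<>();
        try (Stream<String> s = Stream.of("a", "b").onClose(() -> log.add("closed"))) {
            s.forEachOrdered(log::add);
        }
        return log;
    }

    // parallel() lets the runtime split the work across threads (by default the
    // common ForkJoinPool) with no explicit thread code; the sum equals the sequential one.
    static long parallelSum(int n) {
        return IntStream.rangeClosed(1, n).parallel().asLongStream().sum();
    }

    public static void main(String[] args) {
        Stream<String> s = Stream.of("x");
        System.out.println(s.isParallel());            // false: streams start sequential
        System.out.println(s.parallel().isParallel()); // true
        System.out.println(closeLog());                // [a, b, closed]
        System.out.println(parallelSum(1000));         // 500500
    }
}
```

Explicit closing mainly matters for streams backed by I/O resources; streams over in-memory collections usually don't need it.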



    Let's look in more detail at the Stream API and how it makes data processing possible with a simple declarative approach.

    Streams operate on collections of elements, turning the data into sequential or parallel streams that can be processed without writing lengthy code.

    Most importantly, Java 8 adds new methods to the Collection and Iterable interfaces so that existing collections work with the Stream API.

    The following default methods are added to the Collection interface:

    default boolean removeIf(Predicate<? super E> filter) {
        Objects.requireNonNull(filter);
        boolean removed = false;
        final Iterator<E> each = iterator();
        while (each.hasNext()) {
            if (filter.test(each.next())) {
                each.remove();
                removed = true;
            }
        }
        return removed;
    }

    default Spliterator<E> spliterator() {
        return Spliterators.spliterator(this, 0);
    }

    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

    default Stream<E> parallelStream() {
        return StreamSupport.stream(spliterator(), true);
    }
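A short usage sketch of these Collection default methods follows. removeIf() mutates the collection in place, and stream() and parallelStream() differ only in the parallel flag passed to StreamSupport.stream() in the defaults shown above. The class CollectionDefaultsDemo and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical usage of Collection.removeIf(), stream() and parallelStream().
class CollectionDefaultsDemo {

    // removeIf() needs a modifiable list: Arrays.asList() is fixed-size,
    // so we copy it into an ArrayList first.
    static List<Integer> withoutLowStock(List<Integer> inventory, int threshold) {
        List<Integer> copy = new ArrayList<>(inventory);
        copy.removeIf(n -> n < threshold);
        return copy;
    }

    static int totalSequential(List<Integer> inventory) {
        return inventory.stream().mapToInt(Integer::intValue).sum();
    }

    static int totalParallel(List<Integer> inventory) {
        return inventory.parallelStream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> inventory = Arrays.asList(100, 50, 30, 40, 25, 10, 400);
        System.out.println(withoutLowStock(inventory, 40)); // [100, 50, 40, 400]
        System.out.println(totalSequential(inventory));     // 655
        System.out.println(totalParallel(inventory));       // 655
    }
}
```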

    And the following default methods are added to the Iterable interface:

    default void forEach(Consumer<? super T> action) {
        Objects.requireNonNull(action);
        for (T t : this) {
            action.accept(t);
        }
    }

    default Spliterator<T> spliterator() {
        return Spliterators.spliteratorUnknownSize(iterator(), 0);
    }

    These additions to both interfaces are what make the Stream capability available on existing collections.
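To see why Iterable.spliterator() matters, here is a sketch of a hypothetical IntRange class that implements Iterable but not Collection, so it has no stream() method. Its inherited default spliterator() is still enough to build a stream with StreamSupport.stream(), the same way Collection.stream() does, and it also gets the default forEach().

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

// Hypothetical Iterable (not a Collection) over the integers from..to inclusive.
class IntRange implements Iterable<Integer> {
    private final int from;
    private final int to;

    IntRange(int from, int to) { this.from = from; this.to = to; }

    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
            private int current = from;
            @Override public boolean hasNext() { return current <= to; }
            @Override public Integer next() {
                if (!hasNext()) throw new NoSuchElementException();
                return current++;
            }
        };
    }

    // Uses the default Iterable.spliterator() to create a sequential stream.
    List<Integer> evens() {
        return StreamSupport.stream(spliterator(), false)
                .filter(n -> n % 2 == 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        IntRange range = new IntRange(1, 6);
        range.forEach(System.out::print);  // default Iterable.forEach: prints 123456
        System.out.println();
        System.out.println(range.evens()); // [2, 4, 6]
    }
}
```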

    Streams support many operations, such as filter, map, reduce, iterate and collect, which can be combined to write concise and expressive data processing queries.





    Let me show an example in Java using a Product with the properties name, price, category and inventory.



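    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.NoSuchElementException;
    import java.util.stream.Collectors;

    /**
     * @author Sachin.Srivastava
     * @category Streams Java 8 examples
     * @Desc elaborate scenarios with a pragmatic example
     * @since Java 1.8
     *
     */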
    public class StreamsInJava {

    public static void main(String[] args) {

    StreamsInJava streamsProduct = new StreamsInJava();
    List<Product> products = new ArrayList<>();
    products.add(streamsProduct.new Product("P1", "MOBILE", 500.0, 100));
    products.add(streamsProduct.new Product("P2", "FASHION", 200.0, 50));
    products.add(streamsProduct.new Product("P3", "MOBILE", 250.0, 30));
    products.add(streamsProduct.new Product("P4", "COMPUTER", 4000.0, 40));
    products.add(streamsProduct.new Product("P5", "BOOKS", 50.0, 25));
    products.add(streamsProduct.new Product("P6", "HEALTHCARE", 120.0, 10));
    products.add(streamsProduct.new Product("P7", "BOOKS", 80.0, 400));

    //Scenario 1 - get the product details which has highest inventory
    //Scenario 2 - get the list of products in mobile category
    //Scenario 3 - get the highest price product
    //Scenario 4 - get the average price in the books category

    //Scenario 1 - get the product details which has highest inventory
    Integer highestInventory = 
    products
    .stream()
    .mapToInt(Product::getInventory)
    .max()
    .orElseThrow(NoSuchElementException::new);
    //1.1
    System.out.println(highestInventory);//printing the highest inventory

    //1.2 get the Object detail
    Product p = products.stream()
    .max(Comparator.comparing(Product::getInventory))
    .orElseThrow(NoSuchElementException::new);

    System.out.println(p);//printing the product detail
    /***
    * Output of scenario 1 -
    * 1.1 - 400
    * 1.2 - Product [name=P7, category=BOOKS, price=80.0, inventory=400]
    * */

    //Scenario 2 - get the list of products in mobile category
    List<Product> pList = products.stream()
    .filter(product->"Mobile".equalsIgnoreCase(product.getCategory()))
    .collect(Collectors.toList());
    System.out.println(pList);

    /***
    * Output of scenario 2 -
    * [Product [name=P1, category=MOBILE, price=500.0, inventory=100], Product [name=P3, category=MOBILE, price=250.0, inventory=30]]

    * */


    /**
    * With similar logic, any number of aggregations can be applied to the collection.
    * **/
    }

    class Product {

    public Product(String name, String category, Double price, Integer inventory) {
    super();
    this.name = name;
    this.category = category;
    this.price = price;//in dollars
    this.inventory = inventory;
    }
    @Override
    public String toString() {
    return "Product [name=" + name + ", category=" + category + ", price=" + price + ", inventory=" + inventory
    + "]";
    }
    String name;
    String category;
    Double price;
    Integer inventory;

    public Integer getInventory() {
    return inventory;
    }
    public void setInventory(Integer inventory) {
    this.inventory = inventory;
    }
    public Double getPrice() {
    return price;
    }
    public void setPrice(Double price) {
    this.price = price;
    }
    public String getName() {
    return name;
    }
    public void setName(String name) {
    this.name = name;
    }
    public String getCategory() {
    return category;
    }
    public void setCategory(String category) {
    this.category = category;
    }
    }


    }
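Scenarios 3 and 4 are listed in the example above but not implemented there. Here is one possible sketch of them, assuming max() with a price comparator for the highest-priced product and mapToDouble().average() for the average book price. The class RemainingScenarios and the trimmed-down Item type are hypothetical stand-ins for the post's classes.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.OptionalDouble;

// Hypothetical sketch of Scenarios 3 and 4.
class RemainingScenarios {

    // Simplified stand-in for Product.
    static final class Item {
        final String name; final String category; final double price;
        Item(String name, String category, double price) {
            this.name = name; this.category = category; this.price = price;
        }
        String getName() { return name; }
        String getCategory() { return category; }
        double getPrice() { return price; }
    }

    // Scenario 3 - get the highest price product
    static Item highestPriced(List<Item> items) {
        return items.stream()
                .max(Comparator.comparingDouble(Item::getPrice))
                .orElseThrow(NoSuchElementException::new);
    }

    // Scenario 4 - get the average price in the books category.
    // average() returns an OptionalDouble because the filtered stream may be empty.
    static OptionalDouble averageBookPrice(List<Item> items) {
        return items.stream()
                .filter(i -> "BOOKS".equalsIgnoreCase(i.getCategory()))
                .mapToDouble(Item::getPrice)
                .average();
    }

    public static void main(String[] args) {
        List<Item> items = Arrays.asList(
                new Item("P1", "MOBILE", 500.0), new Item("P2", "FASHION", 200.0),
                new Item("P3", "MOBILE", 250.0), new Item("P4", "COMPUTER", 4000.0),
                new Item("P5", "BOOKS", 50.0), new Item("P6", "HEALTHCARE", 120.0),
                new Item("P7", "BOOKS", 80.0));
        System.out.println(highestPriced(items).getName());        // P4
        System.out.println(averageBookPrice(items).getAsDouble()); // 65.0
    }
}
```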

    As the examples above show, you can apply as many aggregations as required to the same collection of data with simple declarative code that is easy to write and maintain, an excellent outcome for your project. :)

    In short, Streams can make a real difference to project maintainability, and parallel streams can help performance on multi-core machines.


    Hope you enjoy the journey with Java 8 !!!