Friday 5 October 2018

Java 8 Streams - empowering data processing queries






In the Java SE 8 release, a lot of emphasis is given to data processing queries similar in structure to SQL, where finding the data and then aggregating it gives you the result you need. Consider an e-commerce site where a product has many attributes, such as price, inventory and category, and a lot of aggregated data is required for various use cases.

For example, a SQL query on the Product table has a structure like:

Select * from Product where <?> group by <??>

and can include aggregate functions such as SUM, MAX, AVG and MIN.

So how does the database work? It first finds the matching data set, then applies the aggregation rules in the query and returns the required aggregated data.

Java 8 adds a similar capability in the form of Streams, in the package java.util.stream, which lets you write such queries in a more declarative way.
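To make the SQL comparison concrete, here is a minimal sketch of a stream query roughly equivalent to "SELECT category, SUM(price) FROM Product WHERE inventory >= 30 GROUP BY category". The class name SqlLikeQuery, the method totalPriceByCategory and the threshold of 30 are hypothetical, chosen only for illustration; the trimmed Product class mirrors the one used later in this post. filter plays the role of WHERE, Collectors.groupingBy the role of GROUP BY, and Collectors.summingDouble the role of SUM.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class SqlLikeQuery {

    // Hypothetical minimal product type, modelled on the Product class used later in this post.
    static class Product {
        final String name;
        final String category;
        final double price;
        final int inventory;

        Product(String name, String category, double price, int inventory) {
            this.name = name;
            this.category = category;
            this.price = price;
            this.inventory = inventory;
        }

        String getCategory() { return category; }
        double getPrice() { return price; }
        int getInventory() { return inventory; }
    }

    // Roughly: SELECT category, SUM(price) FROM Product WHERE inventory >= 30 GROUP BY category
    static Map<String, Double> totalPriceByCategory(List<Product> products) {
        return products.stream()
                .filter(p -> p.getInventory() >= 30)                    // WHERE (hypothetical threshold)
                .collect(Collectors.groupingBy(Product::getCategory,    // GROUP BY category
                        TreeMap::new,                                    // sorted keys for stable output
                        Collectors.summingDouble(Product::getPrice)));  // SUM(price)
    }

    public static void main(String[] args) {
        List<Product> products = Arrays.asList(
                new Product("P1", "MOBILE", 500.0, 100),
                new Product("P2", "FASHION", 200.0, 50),
                new Product("P3", "MOBILE", 250.0, 30),
                new Product("P4", "COMPUTER", 4000.0, 40),
                new Product("P5", "BOOKS", 50.0, 25),
                new Product("P6", "HEALTHCARE", 120.0, 10),
                new Product("P7", "BOOKS", 80.0, 400));

        // P5 and P6 are filtered out because their inventory is below 30
        System.out.println(totalPriceByCategory(products));
        // {BOOKS=80.0, COMPUTER=4000.0, FASHION=200.0, MOBILE=750.0}
    }
}
```

A TreeMap is passed as the map factory only so that the printed result has a predictable key order; the default groupingBy map makes no ordering promise.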

A new interface, Stream, extends the BaseStream interface, which represents sequences of elements supporting sequential and parallel aggregate operations. BaseStream in turn extends the AutoCloseable interface introduced in Java 1.7.

The Java API designers added this abstraction layer for processing data declaratively, and parallel streams can also take advantage of multi-core processors without you writing any explicit multi-threaded code.
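As a small illustration of the multi-core point, the sketch below runs the same summing pipeline sequentially and in parallel. The class and method names are hypothetical; the only difference between the two methods is the call to parallel(), after which the runtime splits the work across the common ForkJoinPool, with no Thread or ExecutorService code written by hand.

```java
import java.util.stream.LongStream;

public class ParallelSum {

    // Sequential sum of 1..n, evaluated on the calling thread.
    static long sequentialSum(long n) {
        return LongStream.rangeClosed(1, n).sum();
    }

    // Same pipeline; parallel() lets the stream framework split the range
    // into chunks and sum them on several worker threads.
    static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        long n = 1_000_000L;
        System.out.println(sequentialSum(n)); // 500000500000
        System.out.println(parallelSum(n));   // 500000500000
    }
}
```

Both calls return the same result because summing is associative. A parallel stream is not automatically faster: for small inputs the cost of splitting and joining can outweigh the benefit, so it is worth measuring before switching.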




Stream ----> BaseStream ----> AutoCloseable

Following is the structure of the BaseStream interface:

public interface BaseStream<T, S extends BaseStream<T, S>> extends AutoCloseable {

    Iterator<T> iterator();

    Spliterator<T> spliterator();

    boolean isParallel();

    S sequential();

    S parallel();

    S unordered();

    S onClose(Runnable closeHandler);

    void close();
}
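Because BaseStream extends AutoCloseable, a stream can be used in a try-with-resources block, and handlers registered with onClose run when close() is called. The minimal sketch below (class and method names are hypothetical) records the order of events so the effect of isParallel, forEach and close is visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class CloseableStreamDemo {

    // Runs a stream inside try-with-resources and records what happens, in order.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        try (Stream<String> s = Stream.of("a", "b")
                .onClose(() -> events.add("closed"))) {   // BaseStream.onClose registers a handler
            events.add("parallel? " + s.isParallel());     // BaseStream.isParallel: false for Stream.of
            s.forEach(events::add);                         // terminal operation
        }                                                  // close() runs the onClose handler here
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [parallel? false, a, b, closed]
    }
}
```

Most streams built from collections hold no resources, so closing them is optional; the AutoCloseable contract matters mainly for streams backed by I/O resources.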



Let's get into more details of the Stream API and how it supports data processing with a simple declarative approach.

Streams work on collections of elements: a collection can be turned into a stream, or a parallel stream, and processed without writing many lines of code.

Most importantly, Java 8 added changes to the Collection and Iterable interfaces to make them work with the Stream capability.

The following default methods were added to the Collection interface:

default boolean removeIf(Predicate<? super E> filter) {
    Objects.requireNonNull(filter);
    boolean removed = false;
    final Iterator<E> each = iterator();
    while (each.hasNext()) {
        if (filter.test(each.next())) {
            each.remove();
            removed = true;
        }
    }
    return removed;
}

default Spliterator<E> spliterator() {
    return Spliterators.spliterator(this, 0);
}

default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}

default Stream<E> parallelStream() {
    return StreamSupport.stream(spliterator(), true);
}
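Here is a short sketch of how the Collection default methods listed above are used from ordinary code. The class and helper method names are hypothetical; removeIf, stream and parallelStream are the standard Java 8 methods.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class CollectionDefaultsDemo {

    // Collection.removeIf: drops every element matching the predicate,
    // returns true if at least one element was removed.
    static boolean removeEvens(Collection<Integer> numbers) {
        return numbers.removeIf(n -> n % 2 == 0);
    }

    // Collection.stream: sequential stream over the elements.
    static int sumOfSquares(Collection<Integer> numbers) {
        return numbers.stream().mapToInt(n -> n * n).sum();
    }

    // Collection.parallelStream: same kind of pipeline, possibly run on several threads.
    static long countGreaterThan(Collection<Integer> numbers, int limit) {
        return numbers.parallelStream().filter(n -> n > limit).count();
    }

    public static void main(String[] args) {
        // Copy into an ArrayList: the fixed-size list returned by Arrays.asList
        // throws UnsupportedOperationException when removeIf tries to remove.
        List<Integer> numbers = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6));

        boolean changed = removeEvens(numbers);
        System.out.println(changed + " " + numbers);      // true [1, 3, 5]
        System.out.println(sumOfSquares(numbers));        // 35
        System.out.println(countGreaterThan(numbers, 1)); // 2
    }
}
```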

And the following default methods were introduced in the Iterable interface:

default void forEach(Consumer<? super T> action) {
    Objects.requireNonNull(action);
    for (T t : this) {
        action.accept(t);
    }
}

default Spliterator<T> spliterator() {
    return Spliterators.spliteratorUnknownSize(iterator(), 0);
}
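These changes in both interfaces are crucial for implementing the Stream capability: every Collection can return a stream directly through stream() or parallelStream(), and any Iterable can supply a Spliterator, which is what StreamSupport.stream uses to build a stream.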
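To see why the Iterable default methods matter, the sketch below defines a hypothetical Range class that implements only iterator(). It still gets forEach and spliterator for free from Iterable, and StreamSupport.stream turns that spliterator into a full Stream. Range and evenNumbers are illustration-only names.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class IterableToStream {

    // Hypothetical Iterable yielding the integers from..to (inclusive).
    // Only iterator() is written; forEach() and spliterator() are Iterable's defaults.
    static class Range implements Iterable<Integer> {
        private final int from;
        private final int to;

        Range(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public Iterator<Integer> iterator() {
            return new Iterator<Integer>() {
                private int next = from;

                @Override
                public boolean hasNext() {
                    return next <= to;
                }

                @Override
                public Integer next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    return next++;
                }
            };
        }
    }

    // Default Iterable.spliterator() plus StreamSupport.stream() turns any Iterable into a Stream.
    static List<Integer> evenNumbers(Iterable<Integer> source) {
        return StreamSupport.stream(source.spliterator(), false)
                .filter(n -> n % 2 == 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Range range = new Range(1, 6);
        range.forEach(System.out::print);       // default Iterable.forEach, prints 123456
        System.out.println();
        System.out.println(evenNumbers(range)); // [2, 4, 6]
    }
}
```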

Streams support many operations, such as filter, map, reduce and collect, along with factory methods such as iterate, and these can be combined to write concise and expressive data processing queries.
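The sketch below chains the operations just mentioned into two small pipelines: iterate, limit, filter, map and reduce in the first, and filter, map and collect in the second. The class name, method names and sample words are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PipelineDemo {

    // iterate -> limit -> filter -> map -> reduce
    static int sumOfEvenSquares(int count) {
        return Stream.iterate(1, n -> n + 1)   // 1, 2, 3, ... (infinite, lazily generated)
                .limit(count)                  // keep only the first `count` numbers
                .filter(n -> n % 2 == 0)       // keep the even ones
                .map(n -> n * n)               // square them
                .reduce(0, Integer::sum);      // add them up, starting from 0
    }

    // filter -> map -> collect
    static List<String> upperCaseLongWords(List<String> words) {
        return words.stream()
                .filter(w -> w.length() > 4)
                .map(w -> w.toUpperCase(Locale.ROOT)) // locale-independent upper-casing
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(sumOfEvenSquares(10)); // 220 (4 + 16 + 36 + 64 + 100)
        System.out.println(upperCaseLongWords(Arrays.asList("stream", "map", "filter", "sum"))); // [STREAM, FILTER]
    }
}
```

Note that limit is what makes the infinite stream from iterate safe to use here; intermediate operations are lazy, so only ten numbers are ever generated.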





Let me show an example with Java code for a Product that has properties such as name, price, category and inventory.



import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;

/**
 * @author Sachin.Srivastava
 * @category Streams Java 8 examples
 * @Desc elaborate scenarios with a pragmatic example
 * @since Java 1.8
 */
public class StreamsInJava {

    public static void main(String[] args) {

        List<Product> products = new ArrayList<>();
        products.add(new Product("P1", "MOBILE", 500.0, 100));
        products.add(new Product("P2", "FASHION", 200.0, 50));
        products.add(new Product("P3", "MOBILE", 250.0, 30));
        products.add(new Product("P4", "COMPUTER", 4000.0, 40));
        products.add(new Product("P5", "BOOKS", 50.0, 25));
        products.add(new Product("P6", "HEALTHCARE", 120.0, 10));
        products.add(new Product("P7", "BOOKS", 80.0, 400));

        //Scenario 1 - get the product details which has highest inventory
        //Scenario 2 - get the list of products in mobile category
        //Scenario 3 - get the highest price product
        //Scenario 4 - get the average price in the books category

        //Scenario 1 - get the product details which has highest inventory
        //1.1 highest inventory value: mapToInt gives an IntStream, max() an OptionalInt
        int highestInventory = products
                .stream()
                .mapToInt(Product::getInventory)
                .max()
                .orElseThrow(NoSuchElementException::new);
        System.out.println(highestInventory); //printing the highest inventory

        //1.2 get the Object detail
        Product p = products.stream()
                .max(Comparator.comparing(Product::getInventory))
                .orElseThrow(NoSuchElementException::new);
        System.out.println(p); //printing the product detail

        /*
         * Output of scenario 1 -
         * 1.1 - 400
         * 1.2 - Product [name=P7, category=BOOKS, price=80.0, inventory=400]
         */

        //Scenario 2 - get the list of products in mobile category
        List<Product> pList = products.stream()
                .filter(product -> "Mobile".equalsIgnoreCase(product.getCategory()))
                .collect(Collectors.toList());
        System.out.println(pList);

        /*
         * Output of scenario 2 -
         * [Product [name=P1, category=MOBILE, price=500.0, inventory=100], Product [name=P3, category=MOBILE, price=250.0, inventory=30]]
         */

        /*
         * With similar logic, any number of aggregations can be applied to the same collection.
         */
    }

    // static nested class, so Product can be created without an enclosing StreamsInJava instance
    static class Product {

        String name;
        String category;
        Double price; //in dollars
        Integer inventory;

        public Product(String name, String category, Double price, Integer inventory) {
            this.name = name;
            this.category = category;
            this.price = price;
            this.inventory = inventory;
        }

        @Override
        public String toString() {
            return "Product [name=" + name + ", category=" + category + ", price=" + price
                    + ", inventory=" + inventory + "]";
        }

        public Integer getInventory() {
            return inventory;
        }

        public void setInventory(Integer inventory) {
            this.inventory = inventory;
        }

        public Double getPrice() {
            return price;
        }

        public void setPrice(Double price) {
            this.price = price;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getCategory() {
            return category;
        }

        public void setCategory(String category) {
            this.category = category;
        }
    }
}
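The scenario list in the example also names two queries the code does not implement: the highest-priced product and the average price in the books category. Below is a minimal sketch of both, written as a stand-alone class so it compiles on its own; the class name RemainingScenarios, the helper methods and the trimmed copy of Product are hypothetical. Comparator.comparingDouble picks the maximum by price, and mapToDouble(...).average() returns an OptionalDouble that is empty when no product matches.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;

public class RemainingScenarios {

    // Stand-alone copy of the post's Product (getters only), so this block compiles on its own.
    static class Product {
        private final String name;
        private final String category;
        private final double price;
        private final int inventory;

        Product(String name, String category, double price, int inventory) {
            this.name = name;
            this.category = category;
            this.price = price;
            this.inventory = inventory;
        }

        String getName() { return name; }
        String getCategory() { return category; }
        double getPrice() { return price; }
        int getInventory() { return inventory; }

        @Override
        public String toString() {
            return "Product [name=" + name + ", category=" + category
                    + ", price=" + price + ", inventory=" + inventory + "]";
        }
    }

    // Highest price product
    static Product highestPriced(List<Product> products) {
        return products.stream()
                .max(Comparator.comparingDouble(Product::getPrice))
                .orElseThrow(NoSuchElementException::new);
    }

    // Average price in a category (case-insensitive match, as in scenario 2)
    static double averagePrice(List<Product> products, String category) {
        return products.stream()
                .filter(p -> category.equalsIgnoreCase(p.getCategory()))
                .mapToDouble(Product::getPrice)
                .average()                                  // OptionalDouble, empty if nothing matched
                .orElseThrow(NoSuchElementException::new);
    }

    public static void main(String[] args) {
        List<Product> products = Arrays.asList(
                new Product("P1", "MOBILE", 500.0, 100),
                new Product("P2", "FASHION", 200.0, 50),
                new Product("P3", "MOBILE", 250.0, 30),
                new Product("P4", "COMPUTER", 4000.0, 40),
                new Product("P5", "BOOKS", 50.0, 25),
                new Product("P6", "HEALTHCARE", 120.0, 10),
                new Product("P7", "BOOKS", 80.0, 400));

        System.out.println(highestPriced(products));
        // Product [name=P4, category=COMPUTER, price=4000.0, inventory=40]
        System.out.println(averagePrice(products, "BOOKS")); // 65.0
    }
}
```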

As the above examples show, you can apply as many aggregations as required to the same collection of data with simple declarative code that is easy to write and maintain, an excellent outcome for your project. :)

Having said that, Streams can prove revolutionary for project maintainability, and parallel streams can improve performance on multi-core machines, although the actual gain depends on the data size and the operations involved.


Hope you enjoy the journey with Java 8!!!
