Sunday, 7 October 2018

Data processing with Java 8 Streams



Stream operations can be divided into two categories: intermediate and terminal operations.

Operations that are chained together in the stream pipeline and return a Stream are called intermediate operations; the operation that closes the stream pipeline is called the terminal operation.

There is also a special kind called short-circuit operations, which can break out of the stream pipeline early or return a portion of the stream rather than processing the whole pipeline, e.g. limit(), anyMatch(), findFirst().
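
For illustration, here is a minimal sketch of short-circuit behaviour (the class name and sample numbers are made up for this example):

import java.util.Arrays;
import java.util.List;

public class ShortCircuitDemo {

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(3, 8, 12, 5, 40, 7);

        // anyMatch() can stop as soon as one element satisfies the predicate
        boolean hasLarge = numbers.stream().anyMatch(n -> n > 10);

        // findFirst() returns the first matching element without traversing the rest
        Integer first = numbers.stream().filter(n -> n > 5).findFirst().orElse(-1);

        // limit() truncates the pipeline to at most two matching elements
        long limited = numbers.stream().filter(n -> n > 5).limit(2).count();

        System.out.println(hasLarge + " " + first + " " + limited); // true 8 2
    }
}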



Note that the operations performed on a stream pipeline are called aggregate operations, because they aggregate the data much like the parts of an SQL query that fetch data and then apply aggregation to it.

This is a more declarative and structured way of processing data, just like SQL, so it is less error-prone from a code-maintainability perspective.

Intermediate operations can be added to a stream pipeline with the following methods:
  • filter()
  • map()
  • flatMap()
  • distinct()
  • sorted()
  • peek()
  • limit()
  • skip()
All intermediate operations are lazy, so they are not executed until a result of the processing is actually needed.

Basically, intermediate operations return a new stream. Executing an intermediate operation does not actually perform any processing; instead it creates a new stream that, when traversed, contains the elements of the initial stream transformed or filtered by the given operation.
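
A minimal sketch of this laziness (the sample list and the peek() logging are only for illustration): nothing is printed for the elements until the terminal count() operation runs.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class LazyStreamsDemo {

    public static void main(String[] args) {
        List<String> names = Arrays.asList("ant", "bee", "cat");

        // Nothing is printed here: peek() and filter() are lazy intermediate operations
        Stream<String> pipeline = names.stream()
                .peek(n -> System.out.println("visiting " + n))
                .filter(n -> n.startsWith("b"));

        System.out.println("pipeline built, nothing traversed yet");

        // Only the terminal operation below actually pulls elements through the pipeline
        long matches = pipeline.count();
        System.out.println("matches = " + matches); // matches = 1
    }
}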

Terminal operations are (a short example follows the list):

  • forEach()
  • forEachOrdered()
  • toArray()
  • reduce()
  • collect()
  • min()
  • max()
  • count()
  • anyMatch()
  • allMatch()
  • noneMatch()
  • findFirst()
  • findAny()
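
Before moving on to the Product example, here is a tiny hedged sketch of a few of these terminal operations on a plain list of numbers (the sample data is made up):

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class TerminalOpsDemo {

    public static void main(String[] args) {
        List<Integer> nums = Arrays.asList(4, 11, 7, 2);

        // reduce() folds the elements into a single value (here, their sum)
        int sum = nums.stream().reduce(0, Integer::sum);                  // 24

        // min() returns an Optional, which is empty when the stream has no elements
        Optional<Integer> smallest = nums.stream().min(Integer::compare); // Optional[2]

        // noneMatch() checks that no element satisfies the predicate
        boolean noneNegative = nums.stream().noneMatch(n -> n < 0);       // true

        System.out.println(sum + " " + smallest.get() + " " + noneNegative);
    }
}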

Consider the previous example of a Product having properties like name, price, category and inventory.


import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/**
 * @author Sachin.Srivastava
 * @category Streams Java 8 examples 
 * @Desc elaborate scenarios with a pragmatic example
 * @since Java 1.8
 *
 */
public class DataProcessWithStream {

public static void main(String[] args) {

DataProcessWithStream streamsProduct = new DataProcessWithStream();
List<Product> products = new ArrayList<>();
products.add(streamsProduct.new Product("P1", "MOBILE", 500.0, 100));
products.add(streamsProduct.new Product("P2", "FASHION", 200.0, 50));
products.add(streamsProduct.new Product("P3", "MOBILE", 250.0, 30));
products.add(streamsProduct.new Product("P4", "COMPUTER", 4000.0, 40));
products.add(streamsProduct.new Product("P5", "BOOKS", 50.0, 25));
products.add(streamsProduct.new Product("P6", "HEALTHCARE", 120.0, 10));
products.add(streamsProduct.new Product("P7", "BOOKS", 80.0, 400));

// Scenario 1 - get the list of products in the MOBILE category, sorted by price in descending order
//filter -> sorting -> collect 
List<Product> productList = products.stream()
.filter(product -> "MOBILE".equalsIgnoreCase(product.getCategory()))
.sorted(Comparator.comparing(Product::getPrice).reversed())
.collect(Collectors.toList());
System.out.println(productList);


/**
 * Output:
 * [Product [name=P1, category=MOBILE, price=500.0, inventory=100],
 *  Product [name=P3, category=MOBILE, price=250.0, inventory=30]]
 */

//Scenario 2 - get the list of product names

List<String> productByNames = 
    products.stream()
         .map(Product::getName)
         .collect(Collectors.toList());
System.out.println(productByNames);
/**
 * Output:
 * [P1, P2, P3, P4, P5, P6, P7]
 */
//Scenario 3- get the sum of the inventories of all the books
 
Integer inventoryCountOfBook = 
    products.stream()
    .filter(p -> "BOOKS".equalsIgnoreCase(p.getCategory()))
    .mapToInt(Product::getInventory)
    .sum();
System.out.println(inventoryCountOfBook);
 
/**
 * Output:
 * 425
 */
}

class Product {

public Product(String name, String category, Double price, Integer inventory) {
super();
this.name = name;
this.category = category;
this.price = price;//in dollars
this.inventory = inventory;
}
@Override
public String toString() {
return "Product [name=" + name + ", category=" + category + ", price=" + price + ", inventory=" + inventory
+ "]";
}
String name;
String category;
Double price;
Integer inventory;

public Integer getInventory() {
return inventory;
}
public void setInventory(Integer inventory) {
this.inventory = inventory;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getCategory() {
return category;
}
public void setCategory(String category) {
this.category = category;
}
}


}

In the above example, a filter operation, which expects a Predicate, is applied on the stream of the product list; a Comparator is then passed to sorted() to order the data. Both of these return a stream, and the terminal operation is collect(), which produces the aggregated data from the stream pipeline.

Following are the important aggregate operations performed on the product list stream:

map - transforms each element into the required form.
filter - filters the stream pipeline based on the predicate passed.
sorted - sorts the stream elements, using their natural order or a supplied Comparator.

As you can see, many different aggregate operations are possible depending on the requirement, and the result is produced extremely quickly without writing much code, which makes it easy to maintain and review.
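
As one more illustration (a hedged sketch assuming the same products list and Product class as above, with java.util.Map and java.util.stream.Collectors imported), grouping and averaging collectors read much like an SQL GROUP BY:

// group the products by category and compute the average price per category,
// roughly: SELECT category, AVG(price) FROM Product GROUP BY category
Map<String, Double> avgPriceByCategory = products.stream()
        .collect(Collectors.groupingBy(Product::getCategory,
                Collectors.averagingDouble(Product::getPrice)));
System.out.println(avgPriceByCategory);
// e.g. {FASHION=200.0, BOOKS=65.0, MOBILE=375.0, HEALTHCARE=120.0, COMPUTER=4000.0}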

That's the biggest advantage of Streams: your code is much cleaner, easier to understand and less error-prone, and the results are phenomenal. Streams greatly empower the data-processing capabilities of Java and are definitely a great initiative encouraging functional programming concepts in Java.

Enjoy Java Streams!!  

Friday, 5 October 2018

Java 8 Streams - empowering data processing queries






In the Java SE 8 release a lot of emphasis is given to data-processing queries, similar in structure to SQL, where finding the data and then aggregating it gives you the required result. Consider the example of an e-commerce site where a product has many attributes like price, inventory, category, etc., and a lot of aggregated data is required for various use cases.

E.g., the SQL structure of a query for the table Product:

SELECT * FROM Product WHERE <?> GROUP BY <??>, possibly including aggregations like SUM, MAX, AVG, MIN

This is how the database works: first it finds the data set, then it applies the aggregation rules as per the query and gives you the required aggregated data.

Quite similar to SQL-style structured queries, the same capability has been added in Java 8 in the form of Streams, under the package java.util.stream, which gives you the ability to write code in a more declarative way.

A new interface Stream has been added, which extends the BaseStream interface. BaseStream represents sequences of elements supporting sequential and parallel aggregate operations, and it in turn extends the AutoCloseable interface introduced in Java 1.7.

The Java API designers added this fantastic abstraction layer for processing data in a declarative way, while also leveraging multi-core architectures without you writing a single line of multithreaded code.




Stream ----> BaseStream ----> AutoCloseable

Following is the structure of the BaseStream interface:

public interface BaseStream<T, S extends BaseStream<T, S>> extends AutoCloseable {
Iterator<T> iterator();

Spliterator<T> spliterator();

boolean isParallel();

S sequential();

S parallel();

S unordered();

S onClose(Runnable arg0);

void close();

}



    Let's get into more detail about the Stream API and its capability of empowering data processing using a simple declarative approach.

    Streams work on collections of elements, turning the data into sequential or parallel streams that can be processed without writing many lines of code.

    Most importantly, changes have been added to the Collection and Iterable interfaces in Java 8 to make them compatible with the Stream capability.

    In the Collection interface the following default methods have been added:

    default boolean removeIf(Predicate<? super E> arg0) {
    Objects.requireNonNull(arg0);
    boolean arg1 = false;
    Iterator arg2 = this.iterator();

    while (arg2.hasNext()) {
    if (arg0.test(arg2.next())) {
    arg2.remove();
    arg1 = true;
    }
    }

    return arg1;

    }

    default Spliterator<E> spliterator() {
    return Spliterators.spliterator(this, 0);
    }

    default Stream<E> stream() {
    return StreamSupport.stream(this.spliterator(), false);
    }

    default Stream<E> parallelStream() {
    return StreamSupport.stream(this.spliterator(), true);

    }
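
    As a quick, hedged sketch of how the stream() and parallelStream() methods above are typically used (the word list is made up for illustration):

    import java.util.Arrays;
    import java.util.List;

    public class StreamVsParallelStream {

        public static void main(String[] args) {
            List<String> words = Arrays.asList("alpha", "beta", "gamma", "delta");

            // sequential stream: elements are processed one after another
            long longWords = words.stream().filter(w -> w.length() > 4).count();

            // parallel stream: the same pipeline, but the work may be split across cores
            long longWordsParallel = words.parallelStream().filter(w -> w.length() > 4).count();

            System.out.println(longWords + " " + longWordsParallel); // 3 3
        }
    }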

    And in the Iterable interface the following methods have been introduced as part of the interface enhancements.

    default void forEach(Consumer<? super T> arg0) {
    Objects.requireNonNull(arg0);
    Iterator arg1 = this.iterator();

    while (arg1.hasNext()) {
    Object arg2 = arg1.next();
    arg0.accept(arg2);
    }

    }

    default Spliterator<T> spliterator() {
    return Spliterators.spliteratorUnknownSize(this.iterator(), 0);
    }

    These two changes in both interfaces are crucial for implementing the Stream capability.

    Stream supports many operations such as filter, map, reduce, iterate and collect that can be combined to write concise and expressive data-processing queries.





    Let me show an example with Java code written for a Product having properties like name, price, category and inventory.



    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.NoSuchElementException;
    import java.util.stream.Collectors;

    /**
     * @author Sachin.Srivastava
     * @category Streams Java 8 examples 
     * @Desc elaborate scenarios with a pragmatic example
     * @since Java 1.8
     *
     */
    public class StreamsInJava {

    public static void main(String[] args) {

    StreamsInJava streamsProduct = new StreamsInJava();
    List<Product> products = new ArrayList<>();
    products.add(streamsProduct.new Product("P1", "MOBILE", 500.0, 100));
    products.add(streamsProduct.new Product("P2", "FASHION", 200.0, 50));
    products.add(streamsProduct.new Product("P3", "MOBILE", 250.0, 30));
    products.add(streamsProduct.new Product("P4", "COMPUTER", 4000.0, 40));
    products.add(streamsProduct.new Product("P5", "BOOKS", 50.0, 25));
    products.add(streamsProduct.new Product("P6", "HEALTHCARE", 120.0, 10));
    products.add(streamsProduct.new Product("P7", "BOOKS", 80.0, 400));

    //Scenario 1 - get the product details which has highest inventory
    //Scenario 2 - get the list of products in mobile category
    //Scenario 3 - get the highest price product
    //Scenario 4 - get the average price in the books category

    //Scenario 1 - get the product details which has highest inventory
    Integer highestInventory = 
    products
    .stream()
    .mapToInt(Product::getInventory)
    .max()
    .orElseThrow(NoSuchElementException::new);
    //1.1
    System.out.println(highestInventory);//printing the highest inventory

    //1.2 get the Object detail
    Product p = products.stream()
    .max(Comparator.comparing(Product::getInventory))
    .orElseThrow(NoSuchElementException::new);

    System.out.println(p);//printing the product detail
    /**
     * Output of scenario 1:
     * 1.1 - 400
     * 1.2 - Product [name=P7, category=BOOKS, price=80.0, inventory=400]
     */

    //Scenario 2 - get the list of products in mobile category
    List<Product> pList = products.stream()
    .filter(product->"Mobile".equalsIgnoreCase(product.getCategory()))
    .collect(Collectors.toList());
    System.out.println(pList);

    /**
     * Output of scenario 2:
     * [Product [name=P1, category=MOBILE, price=500.0, inventory=100], Product [name=P3, category=MOBILE, price=250.0, inventory=30]]
     */


    /**
     * With similar logic we can have any number of aggregations applied on the collection.
     */
    }

    class Product {

    public Product(String name, String category, Double price, Integer inventory) {
    super();
    this.name = name;
    this.category = category;
    this.price = price;//in dollars
    this.inventory = inventory;
    }
    @Override
    public String toString() {
    return "Product [name=" + name + ", category=" + category + ", price=" + price + ", inventory=" + inventory
    + "]";
    }
    String name;
    String category;
    Double price;
    Integer inventory;

    public Integer getInventory() {
    return inventory;
    }
    public void setInventory(Integer inventory) {
    this.inventory = inventory;
    }
    public Double getPrice() {
    return price;
    }
    public void setPrice(Double price) {
    this.price = price;
    }
    public String getName() {
    return name;
    }
    public void setName(String name) {
    this.name = name;
    }
    public String getCategory() {
    return category;
    }
    public void setCategory(String category) {
    this.category = category;
    }
    }


    }
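
    For completeness, here is a hedged sketch of scenarios 3 and 4 listed in the comments above (the highest priced product and the average price in the BOOKS category), assuming the same products list and Product class as in the example:

    // Scenario 3 - get the highest priced product
    Product costliest = products.stream()
            .max(Comparator.comparing(Product::getPrice))
            .orElseThrow(NoSuchElementException::new);
    System.out.println(costliest); // Product [name=P4, category=COMPUTER, price=4000.0, inventory=40]

    // Scenario 4 - get the average price in the BOOKS category
    double avgBookPrice = products.stream()
            .filter(p -> "BOOKS".equalsIgnoreCase(p.getCategory()))
            .mapToDouble(Product::getPrice)
            .average()
            .orElse(0.0);
    System.out.println(avgBookPrice); // (50.0 + 80.0) / 2 = 65.0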

    As you can see with the above examples, you can apply as many aggregate operations as required on the same collection of data by writing such simple declarative code that is easy to write and maintain, an excellent outcome for your project. :)

    Having said that, Streams should prove revolutionary for project maintainability and for performance in multi-threaded environments.


    Hope you enjoy the journey with Java 8 !!!

    Sunday, 24 September 2017

    DeadLock in Multithreading in Java


    Deadlock is a situation where one thread (t1) is waiting for the release of a lock (say on str1) held by another thread (t2), while t2 is waiting for the release of a lock (say on str2) held by t1.



    This condition creates a never-ending wait: neither thread can ever acquire the lock it needs, and the program hangs in this state.

    Deadlock is a tricky situation and is often hard to reproduce, but it should be handled with full care.

    I will demonstrate a couple of scenarios: one where the code looks likely to create a deadlock but doesn't, and another where deadlock occurs for sure and the application remains in a hung state.

    Demo 1: a situation that looks likely to deadlock but works well and doesn't hang the application.

    public class MultiThreading {
    String s1 = "John";
    String s2 = "Cena";
    Thread t1 = new Thread(() -> {
    synchronized (s1) {
    synchronized (s2) {
    System.out.println(s1 + " " + s2);
    }
    }
    });
    Thread t2 = new Thread(() -> {
    synchronized (s2) {
    synchronized (s1) {
    System.out.println(s2 + " " + s1);
    }
    }
    });

    public static void main(String[] args) {
    MultiThreading mt = new MultiThreading();
    mt.t1.start();
    mt.t2.start();
    }

    }

    Output : 

    John Cena
    Cena John


    Demo 2: deadlock occurs and the application hangs, with the addition of a while loop to the above code.

    public class MultiThreading {
    String s1 = "John";
    String s2 = "Cena";
    Thread t1 = new Thread(() -> {
    while (true) {  /// note that addition of this loop
    synchronized (s1) {
    synchronized (s2) {
    System.out.println(s1 + " " + s2);
    }
    }
    }
    });
    Thread t2 = new Thread(() -> {
    while (true) {  /// note that addition of this loop
    synchronized (s2) {
    synchronized (s1) {
    System.out.println(s2 + " " + s1);
    }
    }
    }
    });

    public static void main(String[] args) {
    MultiThreading mt = new MultiThreading();
    mt.t1.start();
    mt.t2.start();
    }
    }

    Output: [prints the following and then the application hangs]

    John Cena
    John Cena
    John Cena
    John Cena
    John Cena
    John Cena


    Deadlock is most likely to occur where, inside some loop, multiple threads repeatedly take locks on resources that are shared among them.

    How to avoid deadlock ?

    Answer: A mutex is the saviour in this kind of situation. A mutex is a semaphore with a permit count of 1. It allows one thread at a time to consume the common resource; any other thread needs a permit from the semaphore before using the resource. This mutual exclusion keeps updates to the shared resource safe, prevents data corruption and makes the code thread safe.

    If we run the same deadlock-prone code above using a mutex, it runs fine without hanging the application.

    import java.util.concurrent.Semaphore;

    public class MultiThreading {
    String s1 = "John";
    String s2 = "Cena";
    Semaphore semaphore = new Semaphore(1);
    Thread t1 = new Thread(() -> {
    while (true) {
    try{
    semaphore.acquire();
    synchronized (s1) {
    synchronized (s2) {
    System.out.println(s1 + " " + s2);
    }
    }
    }catch(Exception e){
    } finally {
    semaphore.release();
    }
    }
    });
    Thread t2 = new Thread(() -> {
    while (true) {
    try{
    semaphore.acquire();
    synchronized (s2) {
    synchronized (s1) {
    System.out.println(s2 + " " + s1);
    }
    }
    }catch(Exception e){
    } finally {
    semaphore.release();
    }
    }
    });

    public static void main(String[] args) {
    MultiThreading mt = new MultiThreading();
    mt.t1.start();
    mt.t2.start();
    }
    }

    Output : [without hanging :) and without deadlock :) ]

    Cena John
    Cena John
    Cena John
    Cena John
    Cena John
    Cena John
    Cena John
    Cena John
    Cena John 
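
    As an aside that the original answer does not cover: another common way to remove this particular deadlock is to make both threads acquire the two locks in the same order, so a circular wait can never form. A minimal sketch of the two threads rewritten that way (same s1/s2 fields as above):

    Thread t1 = new Thread(() -> {
    while (true) {
    synchronized (s1) {      // both threads lock s1 first ...
    synchronized (s2) {      // ... and s2 second, so no circular wait is possible
    System.out.println(s1 + " " + s2);
    }
    }
    }
    });
    Thread t2 = new Thread(() -> {
    while (true) {
    synchronized (s1) {      // same order as t1
    synchronized (s2) {
    System.out.println(s2 + " " + s1);
    }
    }
    }
    });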

    Enjoy the coding and multithreading!!!!

    Wednesday, 2 August 2017

    LRU caching implementation in Java


    Designing a Least Recently Used (LRU) cache is an important piece of software development: the least recently used element is removed from the cache once the cache size reaches its capacity.

    Caching is always helpful for faster retrieval and gives O(1), or constant time, retrieval complexity.

    A Map data structure helps achieve the O(1) time complexity; other data structures such as a queue, a stack or a doubly linked list, where pop or remove methods make room for a new element in constant time, could also be part of a solution for this caching.

    To provide a solution, I have used an implementation based on LinkedHashMap, whose iterator returns entries in insertion order; as long as every access re-inserts its key (as the code below does), the first element returned by the iterator is always the least recently used element in the map.

    I will demonstrate the same using the following code snippet :

    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Map.Entry;

    /**
     * @author sachin.srivastava
     * tek9g.blogspot.com
     *
     */
    public class LRUCaching<T> {
    private int capacity;
    private LinkedHashMap<String, T> map;

    public LRUCaching(int capacity) {
    this.capacity = capacity;
    map = new LinkedHashMap<>();
    }
    public T get(String key){
    T t = this.map.get(key);
    if(t == null){
    return null;
    } else {
    this.set(key, t);
    return t;
    }
    }
    public void set(String key,T value){
    if(this.map.containsKey(key)){
    this.map.remove(key);
    } else if(this.map.size() == this.capacity){
    Iterator<String> it =  this.map.keySet().iterator();
    it.next();
    it.remove();
    }
    this.map.put(key, value);
    }
    public static <T> void printMap(Map<String, T> map){
    System.out.println("\n--printing the map elements--\n");
    for (Entry<String, T> entry : map.entrySet()) {
    System.out.println("key=" + entry.getKey() + " and value=" + entry.getValue());
    }
    }
    public static void main(String[] args) {
    LRUCaching<Integer> lru = new LRUCaching<>(5);//capacity of 5 elements.
    lru.set("a", 1);
    lru.set("b", 2);
    lru.set("c", 3);
    lru.set("d", 4);
    lru.set("e", 5);
    printMap(lru.map);
    System.out.println("\nget key=a and value=" + lru.get("a"));// it will move key a to front and b would be at the rear.
    printMap(lru.map);
    //setting the element beyond the capacity
    lru.set("f", 6);//it will remove the least used key b
    printMap(lru.map);
    }

    }


    Output : 


    --printing the map elements--

    key=a and value=1
    key=b and value=2
    key=c and value=3
    key=d and value=4
    key=e and value=5

    get key=a and value=1

    --printing the map elements--

    key=b and value=2
    key=c and value=3
    key=d and value=4
    key=e and value=5
    key=a and value=1

    --printing the map elements--

    key=c and value=3
    key=d and value=4
    key=e and value=5
    key=a and value=1
    key=f and value=6

    --------------------------------------

    In the above example the map never holds more elements than the configured capacity; once the limit is reached, the least recently used element is removed from the map. Whenever an element is read with get(), it is re-inserted and therefore ranks as most recently used. So when key 'a' is read it moves behind key 'e' and key 'b' becomes the LRU element; when the new key 'f' is added the capacity is already reached, so the LRU element 'b' is removed from the map, which allows the new key to be inserted.
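
    As a side note (not part of the snippet above), LinkedHashMap can also do most of this bookkeeping itself when it is constructed with accessOrder = true and removeEldestEntry is overridden; a hedged sketch:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class AccessOrderLRU<T> extends LinkedHashMap<String, T> {
        private final int capacity;

        public AccessOrderLRU(int capacity) {
            // accessOrder = true: get() and put() move the entry to the most recently used position
            super(16, 0.75f, true);
            this.capacity = capacity;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<String, T> eldest) {
            // evict the least recently used entry once the capacity is exceeded
            return size() > capacity;
        }
    }

    With this variant, get("a") automatically moves the entry to the most recently used position, so the manual remove-and-put done in the set() method above is not needed.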

    Enjoy the coding!!!!