
5. The Stream<T> type in Java 8

5.1. Example-01 - The Stream class

Operations on Observable streams have many similarities with those on Streams. One difference lies in where the elements come from: in this document, a Stream is built from a collection that is already fully available in memory, whereas an element of an Observable stream can be processed (observed) as soon as it is emitted, without waiting for the rest of the Observable stream. Another difference is that a Stream is pull-based: its terminal operation pulls the values one by one from the source. An Observable is push-based: as soon as it emits a value, that value is pushed to its subscribers.

Several types implement the concept of a Stream (Stream<T>, IntStream, LongStream, DoubleStream). Here we present the Stream<T> interface of the [java.util.stream] package.

The Stream class offers 39 methods. We will present a few of them. Consider the following code:

  

package dvp.java8.streams;
 
import java.util.List;

import dvp.data.Personne;
import dvp.data.Personnes;
 
public class Exemple01 {
    public static void main(String[] args) {
        // list of persons
        List<Personne> personnes = Personnes.get();
        // display 1
        personnes.stream().forEach(p -> {
            System.out.println(p);
        });
        System.out.println("----------------");
        // display 2
        personnes.stream().forEach(System.out::println);
    }
}
  • line 11: we instantiate a list of people;
  • line 13: from this list, we create a Stream. All collections can be converted into Streams in this way. This allows us to take advantage of all the methods of this class, which enable us to process the elements of the collection more concisely than with loops. It also allows us to benefit from parallel processing of elements when possible;
  • line 13: the [Stream.forEach] method has the following signature:
        void forEach(Consumer<? super T> action)

We see that the method’s parameter is the functional interface [Consumer<T>] presented in Section 4.4, an interface whose sole method operates on type T and returns nothing.

  • In the code:

        personnes.stream().forEach(p -> {
            System.out.println(p);
        });
  • [personnes.stream()] produces a stream of elements of type [Personne] that feeds the [forEach] method. The parameter p is of type [Personne], and the provided lambda function prints this person;

The previous code can be simplified as follows (line 18):


personnes.stream().forEach(System.out::println);

Rather than passing a lambda function as a parameter, we pass a reference to an existing method, in this case the println method of the System.out object (a PrintStream). Of course, this method must have a compatible signature, here that of the [Consumer.accept] method: void accept(T t). As mentioned earlier, the parameter of the [accept] method will be of type [Personne].

We obtain the following results:

{"nom":"jean","age":20,"poids":70.0,"sexe":"HOMME"}
{"nom":"marie","age":10,"poids":30.0,"sexe":"FEMME"}
{"nom":"camille","age":30,"poids":55.0,"sexe":"FEMME"}
----------------
{"nom":"jean","age":20,"poids":70.0,"sexe":"HOMME"}
{"nom":"marie","age":10,"poids":30.0,"sexe":"FEMME"}
{"nom":"camille","age":30,"poids":55.0,"sexe":"FEMME"}

Once a terminal operation such as [forEach] has been applied to a Stream, that Stream can no longer be used. It must be rebuilt if you wish to process the elements again. This is demonstrated by the following code [Exemple01b]:


package dvp.java8.streams;
 
import java.util.stream.Stream;
 
import dvp.data.Personne;
import dvp.data.Personnes;
 
public class Exemple01b {
    public static void main(String[] args) {
        // stream of persons
        Stream<Personne> personnes = Personnes.get().stream();
        // display 1
        personnes.forEach(p -> {
            System.out.println(p);
        });
        System.out.println("----------------");
        // display 2
        personnes.forEach(System.out::println);
    }
}
  • Line 11: To optimize the code, we decide to construct the Stream only once. The results obtained are as follows:

{"nom":"jean","age":20,"poids":70.0,"sexe":"HOMME"}
{"nom":"marie","age":10,"poids":30.0,"sexe":"FEMME"}
{"nom":"camille","age":30,"poids":55.0,"sexe":"FEMME"}
----------------
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.util.stream.AbstractPipeline.sourceStageSpliterator(Unknown Source)
    at java.util.stream.ReferencePipeline$Head.forEach(Unknown Source)
    at dvp.java8.streams.Exemple02b.main(Exemple02b.java:18)

Every time you want to use a Stream, you must construct it, even if it has been constructed previously.
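One way to respect this rule without repeating the construction code is to keep a factory for the Stream rather than the Stream itself. The following is only a sketch under simplifying assumptions: it uses a plain list of names instead of [Personne] objects, and the names StreamReuseSketch, secondUseFails and countTwice are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

class StreamReuseSketch {

    // returns true if a second terminal operation on the same Stream is rejected
    static boolean secondUseFails(List<String> noms) {
        Stream<String> flux = noms.stream();
        flux.forEach(n -> { });
        try {
            flux.forEach(n -> { });
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // the Supplier builds a fresh Stream each time get() is called
    static long countTwice(List<String> noms) {
        Supplier<Stream<String>> fournisseur = noms::stream;
        return fournisseur.get().count() + fournisseur.get().count();
    }

    public static void main(String[] args) {
        List<String> noms = Arrays.asList("jean", "marie", "camille");
        System.out.println(secondUseFails(noms)); // prints "true"
        System.out.println(countTwice(noms));     // prints "6"
    }
}
```

The Supplier does not make a Stream reusable: it only hides the fact that a new one is constructed for each use.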

5.2. Example-02 - Parallel processing of elements in a Stream

  

Consider the following code:


package dvp.java8.streams;
 
import java.util.List;
 
import dvp.data.Personne;
import dvp.data.Personnes;
 
public class Exemple02 {
    public static void main(String[] args) {
        // list of persons
        List<Personne> personnes = Personnes.get();
        // display 1
        personnes.stream().forEach(Exemple02::affiche);
        System.out.println("-----------------");
        // display 2
        personnes.stream().parallel().forEach(Exemple02::affiche);
    }
 
    public static void affiche(Personne p) {
        System.out.printf("Personne %s sur thread %s%n", p, Thread.currentThread().getName());
    }
}
  • lines 19–21: the [affiche] method prints to the console the JSON string of a person along with the name of the thread in which the display takes place;
  • line 13: displays the list of people. Note that the parameter of the [forEach] method is a reference to the static method above;
  • line 16: we do the same thing, but with the [parallel] method we request that the stream elements be processed in parallel across multiple threads. Not all processing can be done in parallel. Here, we must accept that the display order does not matter because, in parallel processing, the execution order of the threads is not guaranteed. Note also a syntax that will become ubiquitous for both Streams and Observables:
        flux.m1(e1->...).m2(e2->...).m3(e3->...)...
    • flux produces elements e1 that feed the m1 method;
    • flux.m1(...) is in turn a stream of elements e2 that feed the m2 method;
    • flux.m1(...).m2(...) is a stream of elements e3 that feed the m3 method;

The type of elements e1, e2, e3 may change as the initial stream undergoes processing.

Executing this code yields the following result:

Personne {"nom":"jean","age":20,"poids":70.0,"sexe":"HOMME"} sur thread main
Personne {"nom":"marie","age":10,"poids":30.0,"sexe":"FEMME"} sur thread main
Personne {"nom":"camille","age":30,"poids":55.0,"sexe":"FEMME"} sur thread main
-----------------
Personne {"nom":"marie","age":10,"poids":30.0,"sexe":"FEMME"} sur thread main
Personne {"nom":"jean","age":20,"poids":70.0,"sexe":"HOMME"} sur thread ForkJoinPool.commonPool-worker-1
Personne {"nom":"camille","age":30,"poids":55.0,"sexe":"FEMME"} sur thread ForkJoinPool.commonPool-worker-2

We can see that the parallel execution (lines 5–7) took place on three different threads and did not follow the order of the elements as shown in lines 1–3. In this document, we will not focus much on the parallel processing of elements in a Stream, because that would require discussing the conditions that make such processing possible. We then discover that few operations can be performed in parallel. One that lends itself naturally to parallelism is the sum of the numerical elements of a stream, which we will now present.
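When the order does matter, the Stream interface also offers the [forEachOrdered] method, which hands the elements to the action in the order of the source even if the stream is parallel. The sketch below is only an illustration: it works on a list of names rather than on [Personne] objects, and the names OrderedParallelSketch and visitInOrder are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class OrderedParallelSketch {

    // intermediate operations may run on several threads,
    // but forEachOrdered calls the action in the order of the source list
    static List<String> visitInOrder(List<String> noms) {
        List<String> vus = Collections.synchronizedList(new ArrayList<>());
        noms.stream().parallel()
                .map(String::toUpperCase)
                .forEachOrdered(vus::add);
        return vus;
    }

    public static void main(String[] args) {
        List<String> noms = Arrays.asList("jean", "marie", "camille");
        System.out.println(visitInOrder(noms)); // prints "[JEAN, MARIE, CAMILLE]"
    }
}
```

Imposing the order has a cost: part of the benefit of parallelism is given up in exchange for a predictable sequence.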

5.3. Example-03 - Parallel Processing of Stream Elements

  

Consider the following code (Example 03a):


package dvp.java8.streams;
 
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
 
public class Exemple03a {
    public static void main(String[] args) {
 
        final long limite = 10_000_000L;
        // number of processors
        System.out.printf("La JVM a détecté [%s] coeurs sur votre machine%n", Runtime.getRuntime().availableProcessors());
        // list of numbers
        long début = new Date().getTime();
        List<Long> nombres = new ArrayList<>();
        for (long i = 0; i < limite; i++) {
            nombres.add(i);
        }
        System.out.printf("création de la liste des %s nombres en %s ms%n", limite, new Date().getTime() - début);
        // sum of numbers - sequential method
        début = new Date().getTime();
        long somme = nombres.stream().reduce(0L, (s, i) -> s + i);
        System.out.printf("somme séquentielle : somme=%s, durée (ms)=%s%n", somme, new Date().getTime() - début);
    }
}
  • In line 22, we use the [reduce] method, which has the following signature:
        T reduce(T identity, BinaryOperator<T> accumulator)
  • the [reduce] method works with elements of type T;
  • the [reduce] method combines all the elements of a stream into a single value: the initial value of an accumulator is provided as the first parameter. A function implementing the functional interface [BinaryOperator<T>] is provided as the second parameter: from the current accumulator and each element, its [apply] method, T apply(T t, T u), returns the new value of the accumulator. The final value of the accumulator is the value returned by the [reduce] method;
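For a sequential stream, the accumulator mechanism described above behaves like an ordinary loop. The sketch below writes that loop by hand and compares it with [Stream.reduce]; the names ReduceByHandSketch and reduceByHand are hypothetical and used only for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

class ReduceByHandSketch {

    // hand-written equivalent of a sequential reduce(identity, accumulator)
    static <T> T reduceByHand(List<T> elements, T identity, BinaryOperator<T> accumulator) {
        T result = identity;
        for (T element : elements) {
            result = accumulator.apply(result, element);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> nombres = Arrays.asList(1L, 2L, 3L, 4L);
        long parBoucle = reduceByHand(nombres, 0L, (s, i) -> s + i);
        long parStream = nombres.stream().reduce(0L, (s, i) -> s + i);
        System.out.println(parBoucle + " " + parStream); // prints "10 10"
    }
}
```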

Let’s return to the example code:

  • line 12: we display the number of cores detected by the JVM;
  • lines 15–18: a list of 10 million numbers is created;
  • line 22: the sum of these numbers is calculated sequentially using a single thread;

We obtain the following results:

La JVM a détecté [8] coeurs sur votre machine
création de la liste des 10000000 nombres en 4336 ms
somme séquentielle : somme=49999995000000, durée (ms)=225

Now, let’s replace line 22 of the code with the following (Example03b):


long somme = nombres.stream().parallel().reduce(0L, (s, i) -> s + i);

We request that the Stream elements be processed in parallel using multiple threads. This is possible because addition is associative: the way the numbers are grouped does not change the sum. We can therefore assign n1 numbers to thread T1, n2 numbers to thread T2, ... and finally sum the partial results provided by these different threads. We then obtain the following results:

La JVM a détecté [8] coeurs sur votre machine
création de la liste des 10000000 nombres en 4332 ms
somme parallèle : somme=49999995000000, durée (ms)=184

There is therefore virtually no performance gain. This will often be the case in the examples that follow. Thread management itself is time-consuming. The operation performed by each core must be sufficiently complex for the performance gain to be noticeable. This is demonstrated by the following example (Example03c):


package dvp.java8.streams;
 
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.function.BinaryOperator;
 
public class Exemple03c {
    public static void main(String[] args) {
 
        final long limite = 10_000L;
        // number of processors
        System.out.printf("La JVM a détecté [%s] coeurs sur votre machine%n", Runtime.getRuntime().availableProcessors());
        // list of numbers
        long début = new Date().getTime();
        List<Long> nombres = new ArrayList<>();
        for (long i = 0; i < limite; i++) {
            nombres.add(i);
        }
        System.out.printf("création de la liste des %s nombres en %s ms%n", limite, new Date().getTime() - début);
        // sum of numbers - sequential method
        début = new Date().getTime();
        BinaryOperator<Long> bo = (s, i) -> {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
            }
            return s + i;
        };
        long somme = nombres.stream().reduce(0L, bo);
        System.out.printf("somme séquentielle : somme=%s, durée (ms)=%s%n", somme, new Date().getTime() - début);
    }
}
  • line 30: we use the [reduce] method again, passing it as a parameter the [bo] lambda defined in lines 23–29;
  • line 28: the [bo] lambda returns the sum of its two parameters;
  • lines 24–27: we artificially make the thread wait for 1 millisecond to simulate intensive work;

We then obtain the following results:

La JVM a détecté [8] coeurs sur votre machine
création de la liste des 10000 nombres en 2 ms
somme séquentielle : somme=49995000, durée (ms)=13617

Now, if we replace line 30 with the following:


long somme = nombres.stream().parallel().reduce(0L, bo);

we get the following results:

La JVM a détecté [8] coeurs sur votre machine
création de la liste des 10000 nombres en 2 ms
somme séquentielle : somme=49995000, durée (ms)=1598

We can clearly see the performance gain achieved by executing the sum calculation in parallel. For processing 8 numbers:

  • the sequential thread waits 8 times 1 millisecond, or 8 ms;
  • the 8 parallel threads each wait 1 millisecond at the same time (for simplicity’s sake), so a total of 1 millisecond for the 8 numbers;

We can therefore expect parallel execution on 8 cores to be about 8 times faster than sequential execution. That is roughly the case here: 13617 ms / 1598 ms ≈ 8.5.
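The idea used throughout this section (n1 numbers for thread T1, n2 numbers for thread T2, then a sum of the partial results) can be sketched by hand with two halves of the list. This is only an illustration of why the grouping does not affect the result, not a description of how the JVM actually splits the work; the names SplitSumSketch, sum, splitSum and premiers are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class SplitSumSketch {

    // sequential sum of one part of the list
    static long sum(List<Long> part) {
        return part.stream().reduce(0L, (s, i) -> s + i);
    }

    // sums each half separately, then combines the two partial results
    static long splitSum(List<Long> nombres) {
        int milieu = nombres.size() / 2;
        long s1 = sum(nombres.subList(0, milieu));
        long s2 = sum(nombres.subList(milieu, nombres.size()));
        return s1 + s2;
    }

    // list of the numbers 0, 1, ..., limite - 1
    static List<Long> premiers(long limite) {
        List<Long> nombres = new ArrayList<>();
        for (long i = 0; i < limite; i++) {
            nombres.add(i);
        }
        return nombres;
    }

    public static void main(String[] args) {
        List<Long> nombres = premiers(10_000L);
        System.out.println(splitSum(nombres));                                       // prints "49995000"
        System.out.println(nombres.stream().parallel().reduce(0L, (s, i) -> s + i)); // prints "49995000"
    }
}
```

With an operation that is not associative, such as subtraction, the two halves could not be combined this way and a parallel [reduce] would give unpredictable results.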

5.4. Example-04 - Filtering a Stream

  

Consider the following code:


package dvp.java8.streams;
 
import java.util.List;
 
import dvp.data.Personne;
import dvp.data.Personnes;
 
public class Exemple04 {
    public static void main(String[] args) {
        // list of persons
        List<Personne> personnes = Personnes.get();
        // displays
        System.out.println("age < 28 ----------------------");
        personnes.stream().filter(p -> p.getAge() < 28).forEach(p -> {
            System.out.println(p);
        });
        System.out.println("poids < 50 ----------------------");
        personnes.stream().filter(p -> p.getPoids() < 50).forEach(p -> {
            System.out.println(p);
        });
        System.out.println("age < 28 ----------------------");
        personnes.stream().filter(p -> p.getAge() < 28).forEach(System.out::println);
        System.out.println("poids < 50 ----------------------");
        personnes.stream().filter(p -> p.getPoids() < 50).forEach(System.out::println);
    }
}
  • line 14: the [Stream.filter] method has the following signature:
        Stream<T> filter(Predicate<? super T> predicate)
  • the [filter] method expects as a parameter an instance of the functional interface [Predicate<T>] presented in Section 4.2, whose only method to implement is: boolean test(T t);
  • the [filter] method returns a Stream of the elements that satisfy the Predicate. It is therefore used to filter the Stream;

  • lines 14-16: display people under 28 years old;
  • lines 18-20: display people with a weight <50;
  • line 22: does the same thing as lines 14–16 but more concisely;
  • line 24: does the same thing as lines 18–20 but more concisely;

The results of the execution are as follows:

age < 28 ----------------------
{"nom":"jean","age":20,"poids":70.0,"sexe":"HOMME"}
{"nom":"marie","age":10,"poids":30.0,"sexe":"FEMME"}
poids < 50 ----------------------
{"nom":"marie","age":10,"poids":30.0,"sexe":"FEMME"}
age < 28 ----------------------
{"nom":"jean","age":20,"poids":70.0,"sexe":"HOMME"}
{"nom":"marie","age":10,"poids":30.0,"sexe":"FEMME"}
poids < 50 ----------------------
{"nom":"marie","age":10,"poids":30.0,"sexe":"FEMME"}
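Because [Predicate] is a functional interface with default methods such as [and] and [negate], filters can also be combined before being passed to [filter]. The sketch below is self-contained for illustration: its nested Personne class is a hypothetical, minimal stand-in for the article's [dvp.data.Personne], and the names FilterCompositionSketch, echantillon, jeunesEtLegers and nonJeunes are assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class FilterCompositionSketch {

    // hypothetical, minimal stand-in for the article's Personne class
    static final class Personne {
        private final String nom;
        private final int age;
        private final double poids;

        Personne(String nom, int age, double poids) {
            this.nom = nom;
            this.age = age;
            this.poids = poids;
        }

        String getNom() { return nom; }
        int getAge() { return age; }
        double getPoids() { return poids; }
    }

    // same three persons as in the article's results
    static List<Personne> echantillon() {
        return Arrays.asList(new Personne("jean", 20, 70.0),
                new Personne("marie", 10, 30.0),
                new Personne("camille", 30, 55.0));
    }

    // names of the persons with age < 28 AND poids < 50
    static List<String> jeunesEtLegers(List<Personne> personnes) {
        Predicate<Personne> jeune = p -> p.getAge() < 28;
        Predicate<Personne> leger = p -> p.getPoids() < 50;
        return personnes.stream().filter(jeune.and(leger))
                .map(Personne::getNom).collect(Collectors.toList());
    }

    // names of the persons that do NOT satisfy age < 28
    static List<String> nonJeunes(List<Personne> personnes) {
        Predicate<Personne> jeune = p -> p.getAge() < 28;
        return personnes.stream().filter(jeune.negate())
                .map(Personne::getNom).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(jeunesEtLegers(echantillon())); // prints "[marie]"
        System.out.println(nonJeunes(echantillon()));      // prints "[camille]"
    }
}
```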

5.5. Example-05 - Create a Stream<T2> from a Stream<T1>

  

Consider the following code:


package dvp.java8.streams;

import java.util.List;
 
import dvp.data.Personne;
import dvp.data.Personnes;
 
public class Exemple05 {
  public static void main(String[] args) {
    // list of persons
    List<Personne> personnes = Personnes.get();
    // displays
    System.out.println("Personne --> String ----------------------");
    personnes.stream().map(p -> p.getNom()).forEach(System.out::println);
    System.out.println("Personne --> Integer ----------------------");
    personnes.stream().map(p -> p.getAge()).forEach(System.out::println);
  }
}
  • line 14: the [Stream.map] method has the following signature:
        <R> Stream<R> map(Function<? super T, ? extends R> mapper)

The parameter of the [Stream.map] method is an instance of the functional interface [Function<T, R>] presented in Section 4.3, whose only method to implement is: R apply(T t). Given a value of type T, the [apply] function produces a value of type R. The [Stream.map] method therefore produces a Stream of type R from a stream of type T (here, a stream of type T is shorthand, slightly imprecise but convenient, for a stream of elements of type T).

Let’s now examine the code in the example:

  • line 14: from a person p, we keep only the name. We thus obtain a stream of String;
  • line 16: from a person p, we keep only the age. We thus obtain a stream of Integer;

The results obtained are as follows:

Personne --> String ----------------------
jean
marie
camille
Personne --> Integer ----------------------
20
10
30
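The lambdas passed to [map] can often be replaced by method references, and when the target type is a primitive, [mapToInt] produces an IntStream directly. The sketch below illustrates both on a plain list of names rather than on [Personne] objects; the names MapSketch, longueurs and longueurTotale are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class MapSketch {

    // Stream<String> --> Stream<Integer> using a method reference
    static List<Integer> longueurs(List<String> noms) {
        return noms.stream().map(String::length).collect(Collectors.toList());
    }

    // Stream<String> --> IntStream, which offers sum() directly
    static int longueurTotale(List<String> noms) {
        return noms.stream().mapToInt(String::length).sum();
    }

    public static void main(String[] args) {
        List<String> noms = Arrays.asList("jean", "marie", "camille");
        System.out.println(longueurs(noms));      // prints "[4, 5, 7]"
        System.out.println(longueurTotale(noms)); // prints "16"
    }
}
```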

5.6. Example-06 - Other methods of the Stream<T> class

  

We illustrate some of the 39 methods of the Stream class with the following code:


package dvp.java8.streams;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import dvp.data.Personne;
import dvp.data.Personnes;
 
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.Stream;
 
public class Exemple06 {
 
    // JSON mapper
    static private ObjectMapper jsonMapper = new ObjectMapper();
 
    public static void main(String[] args) throws JsonProcessingException {
        // list of persons
        List<Personne> personnes = Personnes.get();
        // all people
        affiche("all", personnes);
        // the 1st person
        affiche("findFirst", personnes.stream().findFirst().get());
        // any person
        affiche("findAny", personnes.stream().findAny().get());
        // people without the 1st
        affiche("skip 1", personnes.stream().skip(1L).collect(Collectors.toList()));
        // the first 2 people
        affiche("limit 2", personnes.stream().limit(2L).collect(Collectors.toList()));
        // the number of people
        affiche("count", personnes.stream().count());
        // the oldest person
        affiche("age max", personnes.stream().max(Comparator.comparingInt(Personne::getAge)).get());
        // the lightest person
        affiche("poids min", personnes.stream().min(Comparator.comparingDouble(Personne::getPoids)).get());
        // last person in alphabetical order of name
        affiche("nom max", personnes.stream().max((p1, p2) -> p1.getNom().compareToIgnoreCase(p2.getNom())).get());
        // total age of all persons
        affiche("âge total (reduce)", personnes.stream().map(p -> p.getAge()).reduce(0, (a1, a2) -> a1 + a2));
        // people by ascending age
        affiche("personnes par âge croissant",
                personnes.stream().sorted(Comparator.comparingInt(Personne::getAge)).collect(Collectors.toList()));
        // are there any people over 100?
        affiche("des personnes de + de 100 ans (anyMatch)", personnes.stream().anyMatch(p -> p.getAge() > 100));
        // are all people at most 100 years old?
        affiche("des personnes de + de 100 ans (noneMatch)", personnes.stream().noneMatch(p -> p.getAge() > 100));
        // are all people over 8 years old?
        affiche("des personnes de + de 8 ans (allMatch)", personnes.stream().allMatch(p -> p.getAge() > 8));
        // group people by gender
        affiche("personnes regroupées par sexe", personnes.stream().collect(Collectors.groupingBy(p -> p.getSexe())));
        // remove duplicate elements from a list
        affiche("distinct", Stream.of(1, 2, 1).distinct().collect(Collectors.toList()));
        // each element yields a Stream<T>; flatMap flattens them into a single Stream<T>
        affiche("flatMap", Stream.of(1, 2, 3).flatMap(i -> Stream.of(i, i + 10)).collect(Collectors.toList()));
        // each element yields an IntStream; we flatten them into a single IntStream and calculate its sum
        affiche("flatMapToInt", Stream.of(1, 2, 3).flatMapToInt(i -> IntStream.of(i, i + 10)).sum());
        // each element yields a DoubleStream; we flatten them into a single DoubleStream, then into an array
        affiche("flatMapToDouble", Stream.of(1, 2, 3).flatMapToDouble(i -> DoubleStream.of(i, i * 1.2)).toArray());
        // max of a stream of integers
        affiche("reduce Integer::max", Stream.of(1, 10, 8).reduce(Integer::max).get());
        // min of a stream of doubles
        affiche("reduce Integer::min", Stream.of(1.5, 10.4, 8.9).reduce(Double::min).get());
        // average of a stream of integers
        affiche("IntStream average", IntStream.of(1, 10, 8).average().getAsDouble());
        // statistics for a stream of integers
        affiche("IntStream summaryStatistics", IntStream.of(1, 10, 8).summaryStatistics());
    }
 
    public static <T> void affiche(String message, T value) throws JsonProcessingException {
        System.out.println(String.format("%s ----", message));
        System.out.println(jsonMapper.writeValueAsString(value));
    }
}
  • lines 72–75: the [affiche] method displays the JSON string of its second parameter;
  • line 24: displays the JSON string for all people. The result is as follows:
all ----
[{"nom":"jean","age":20,"poids":70.0,"sexe":"HOMME"},{"nom":"marie","age":10,"poids":30.0,"sexe":"FEMME"},{"nom":"camille","age":30,"poids":55.0,"sexe":"FEMME"}]

5.6.1. [findFirst]


// the 1st person
affiche("findFirst", personnes.stream().findFirst().get());

The [findFirst] method returns the first element of a stream, if it exists. Its signature is as follows:
        Optional<T> findFirst()

The result is of type Optional<T>, a type introduced in Java 8.

The Optional<T> class offers an alternative to returning null pointers. A method that must return a value of type T that may be absent can choose to return an Optional<T> instead of null. The [Optional<T>.isPresent()] method tells you whether a value was returned or not. The following code [Exemple06b] illustrates part of how Optional<T> works:


package dvp.java8.streams;
 
import java.util.Optional;
 
import com.fasterxml.jackson.core.JsonProcessingException;
 
public class Exemple06b {
 
    public static void main(String[] args) throws JsonProcessingException {
        // optional without value
        Optional<Integer> o1 = m1();
        System.out.println(o1.isPresent());
        affiche(o1);
        // optional with value
        Optional<Integer> o2 = m2();
        System.out.println(o2.isPresent());
        affiche(o2);
    }
 
    private static void affiche(Optional<Integer> o1) {
        try {
            // retrieve the value of the Optional
            // throws an exception if there is no value
            System.out.println(o1.get());
        } catch (Throwable th) {
            System.out.printf("%s : %s%n", th.getClass().getName(), th.getMessage());
        }
 
    }
 
    public static Optional<Integer> m1() {
        // no value
        return Optional.empty();
    }
 
    public static Optional<Integer> m2() {
        // a value
        return Optional.of(10);
    }
}

The results are as follows:


false
java.util.NoSuchElementException : No value present
true
10
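Besides [isPresent] and [get], Optional<T> offers methods that handle the missing value without an exception, such as [orElse], [map] and [ifPresent]. The following sketch illustrates them; the names OptionalSketch, valueOrDefault and describe are hypothetical.

```java
import java.util.Optional;

class OptionalSketch {

    // returns the wrapped value, or the given default if the Optional is empty
    static int valueOrDefault(Optional<Integer> o, int parDefaut) {
        return o.orElse(parDefaut);
    }

    // transforms the value if present, otherwise falls back to a fixed text
    static String describe(Optional<Integer> o) {
        return o.map(v -> "value " + v).orElse("no value");
    }

    public static void main(String[] args) {
        Optional<Integer> vide = Optional.empty();
        Optional<Integer> dix = Optional.of(10);
        System.out.println(describe(vide));            // prints "no value"
        System.out.println(describe(dix));             // prints "value 10"
        System.out.println(valueOrDefault(vide, -1));  // prints "-1"
        // the action runs only when a value is present
        dix.ifPresent(System.out::println);            // prints "10"
    }
}
```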

Let's return to the code illustrating the [findFirst] method:


// the 1st person
affiche("findFirst", personnes.stream().findFirst().get());
  • line 2: to simplify the code, we call the [get] method directly on the Optional<Personne> produced by the [findFirst] method. Robust code would call the [Optional<Personne>.isPresent()] method before calling the [get] method;

The result is as follows:

findFirst ----
{"nom":"jean","age":20,"poids":70.0,"sexe":"HOMME"}

5.6.2. [findAny]


// any person
affiche("findAny", personnes.stream().findAny().get());

The [findAny] method has the following signature:
        Optional<T> findAny()

The [findAny] method may return any element of the stream. In our tests, a sequential execution returns the first element of the stream, whereas a parallel execution can indeed return any element. This is demonstrated by the following code [Exemple06c]:


package dvp.java8.streams;
 
import java.util.List;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
 
import dvp.data.Personne;
import dvp.data.Personnes;
 
public class Exemple06c {
 
    // JSON mapper
    static private ObjectMapper jsonMapper = new ObjectMapper();
 
    public static void main(String[] args) throws JsonProcessingException {
        // list of persons
        List<Personne> personnes = Personnes.get();
        // everyone
        affiche("all", personnes);
        // any person
        affiche("findAny parallèle", personnes.stream().parallel().findAny().get());
        // any person
        affiche("findAny séquentiel", personnes.stream().findAny().get());
    }

    public static <T> void affiche(String message, T value) throws JsonProcessingException {
        System.out.println(String.format("%s ----", message));
        System.out.println(jsonMapper.writeValueAsString(value));
    }
}
  • line 22: findAny executed in parallel;
  • line 24: findAny executed sequentially;

The results obtained are as follows:

all ----
[{"nom":"jean","age":20,"poids":70.0,"sexe":"HOMME"},{"nom":"marie","age":10,"poids":30.0,"sexe":"FEMME"},{"nom":"camille","age":30,"poids":55.0,"sexe":"FEMME"}]
findAny parallèle ----
{"nom":"camille","age":30,"poids":55.0,"sexe":"FEMME"}
findAny séquentiel ----
{"nom":"jean","age":20,"poids":70.0,"sexe":"HOMME"}
  • line 4: the parallel execution returned the third element of the list of people (camille). It could have been another one;
  • line 6: sequential execution returned the first element of the list of people;

The use of the [findAny] method seems to make sense only in the parallel processing of a stream.
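A typical parallel use is to look for any element that satisfies a condition, when it does not matter which one is found. The sketch below searches a range of integers rather than the list of people; the names FindAnySketch and anyMultipleOf7 are hypothetical, and the value actually returned may differ from one run to the next.

```java
import java.util.stream.IntStream;

class FindAnySketch {

    // returns some multiple of 7 in [1, limite), or -1 if there is none
    static int anyMultipleOf7(int limite) {
        return IntStream.range(1, limite)
                .parallel()
                .filter(i -> i % 7 == 0)
                .findAny()
                .orElse(-1);
    }

    public static void main(String[] args) {
        // the result is a multiple of 7, but which one is not specified
        System.out.println(anyMultipleOf7(1000));
        // no multiple of 7 between 1 and 4, so the default value is returned
        System.out.println(anyMultipleOf7(5)); // prints "-1"
    }
}
```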

5.6.3. [skip]


// the people without the 1st
affiche("skip 1", personnes.stream().skip(1L).collect(Collectors.toList()));

The [skip] method has the following signature:
        Stream<T> skip(long n)

The [skip] method skips the first n elements of a stream. As its Javadoc notes, executing this method in parallel on an ordered stream yields little performance gain and may even degrade performance. Indeed, to skip the first n elements, the threads are forced to coordinate, which negates the gains from parallelism.

The [skip] method returns a Stream<Personne> that is converted to a List<Personne> by the [collect] method, which has the following signature:
        <R, A> R collect(Collector<? super T, A, R> collector)

The [collect] method takes as a parameter an instance of type [Collector], which has a complex signature. Predefined implementations of [Collector] usually spare you from implementing it yourself. Here, the implementation used is [Collectors.toList()]. [Collectors] is a class with numerous static methods that return implementations of the [Collector<T,A,R>] type. This is the first place to look when you want to convert a Stream into a standard Java collection.

We will use some of these methods later on.

Execution yields the following result:

skip 1 ----
[{"nom":"marie","age":10,"poids":30.0,"sexe":"FEMME"},{"nom":"camille","age":30,"poids":55.0,"sexe":"FEMME"}]

The first element of the list (jean) has been omitted.
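As an illustration of other predefined collectors, the sketch below uses [Collectors.joining] and [Collectors.toSet] on a plain list of names rather than on [Personne] objects. The names CollectorsSketch, nomsSansLePremier and longueursDistinctes are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class CollectorsSketch {

    // skips the first name and joins the others into a single String
    static String nomsSansLePremier(List<String> noms) {
        return noms.stream().skip(1L).collect(Collectors.joining(", "));
    }

    // collects the distinct name lengths into a Set
    static Set<Integer> longueursDistinctes(List<String> noms) {
        return noms.stream().map(String::length).collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        List<String> noms = Arrays.asList("jean", "marie", "camille");
        System.out.println(nomsSansLePremier(noms));   // prints "marie, camille"
        // a Set has no guaranteed display order
        System.out.println(longueursDistinctes(noms));
    }
}
```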

5.6.4. [limit]


// the first 2 people
affiche("limit 2", personnes.stream().limit(2L).collect(Collectors.toList()));

The [limit] method has the following signature:
        Stream<T> limit(long maxSize)

The [limit] method keeps only the first n elements of a stream. Like [skip], it lends itself poorly to parallel processing on ordered streams.

The execution yields the following result:

limit 2 ----
[{"nom":"jean","age":20,"poids":70.0,"sexe":"HOMME"},{"nom":"marie","age":10,"poids":30.0,"sexe":"FEMME"}]
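Combined, [skip] and [limit] give a simple way to extract one page of results from a sequential stream. The sketch below is only an illustration on a list of letters; the names PageSketch and page, as well as the zero-based page numbering, are assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class PageSketch {

    // returns the page number numeroPage (starting at 0) of size taillePage
    static <T> List<T> page(List<T> elements, int numeroPage, int taillePage) {
        return elements.stream()
                .skip((long) numeroPage * taillePage) // elements of the previous pages
                .limit(taillePage)                    // at most one page of elements
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lettres = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(page(lettres, 1, 2)); // prints "[c, d]"
        System.out.println(page(lettres, 2, 2)); // prints "[e]"
    }
}
```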

5.6.5. [count]


// the number of people
affiche("count", personnes.stream().count());

The [count] method has the following signature:
        long count()

The [count] method returns the number of elements in a Stream. Parallel execution of the method does not result in a performance gain here, as shown in the following code (Exemple06d1):


package dvp.java8.streams;
 
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.Stream;
 
public class Exemple06d1 {
    public static void main(String[] args) {
 
        final long limite = 10_000_000L;
        // number of processors
        System.out.printf("La JVM a détecté [%s] coeurs sur votre machine%n", Runtime.getRuntime().availableProcessors());
        // list of numbers
        long début = new Date().getTime();
        List<Long> nombres = new ArrayList<>();
        for (long i = 0; i < limite; i++) {
            nombres.add(i);
        }
        System.out.printf("création de la liste des %s nombres en %s ms%n", limite, new Date().getTime() - début);
        // counting numbers - sequential method
        Stream<Long> sNombres = nombres.stream();
        début = new Date().getTime();
        long count = sNombres.count();
        System.out.printf("comptage séquentiel : compteur=%s, durée (ms)=%s%n", count, new Date().getTime() - début);
    }
}
  • lines 11–22: create a Stream of 10 million numbers;
  • lines 22–24: counting the Stream;

The execution yields the following result:

La JVM a détecté [8] coeurs sur votre machine
création de la liste des 10000000 nombres en 4407 ms
comptage séquentiel : compteur=10000000, durée (ms)=67

If we replace line 22 of the code with the following (Example06d2):


Stream<Long> sNombres = nombres.stream().parallel();

we get the following results:

La JVM a détecté [8] coeurs sur votre machine
création de la liste des 10000000 nombres en 4341 ms
comptage parallèle : compteur=10000000, durée (ms)=100

5.6.6. [max, min]


// the oldest person
affiche("age max", personnes.stream().max(Comparator.comparingInt(Personne::getAge)).get());

The [max] method has the following signature:
        Optional<T> max(Comparator<? super T> comparator)

The [max] method returns the maximum element of a stream according to the comparator passed to it as a parameter. Comparator is a functional interface whose only method to implement has the signature: int compare(T o1, T o2). This method must return a negative integer if o1 < o2, zero if o1 and o2 are considered equal, and a positive integer if o1 > o2. The Comparator interface also has many static and default methods that build comparators for the most common cases. Thus, in the statement:


affiche("age max", personnes.stream().max(Comparator.comparingInt(Personne::getAge)).get());

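We use the static method [Comparator.comparingInt], whose signature is as follows:
        static <T> Comparator<T> comparingInt(ToIntFunction<? super T> keyExtractor)

The ToIntFunction<T> type is a functional interface whose only method to implement is:
        int applyAsInt(T value)

The [applyAsInt] method of the ToIntFunction functional interface produces an int from a value of type T. Let’s return to our code: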


affiche("age max", personnes.stream().max(Comparator.comparingInt(Personne::getAge)).get());

The actual parameter of the [Comparator.comparingInt] method must here be a function Personne --> int. We pass a reference to the [Personne.getAge] method, which has this signature. Ultimately, we get the oldest person. The result is an Optional<Personne>, from which we extract the value using the [Optional.get] method. We get the following result:

age max ----
{"nom":"camille","age":30,"poids":55.0,"sexe":"FEMME"}
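Calling [Optional.get] on the result of [max] throws a NoSuchElementException when the stream is empty. The following is a minimal sketch of one way to handle both cases. The Person class is a hypothetical stand-in for the tutorial's Personne class, and the names MaxSketch and oldestName are assumptions made for the illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class MaxSketch {
    // hypothetical stand-in for the tutorial's Personne class
    static class Person {
        private final String name;
        private final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }

        String getName() {
            return name;
        }

        int getAge() {
            return age;
        }
    }

    // name of the oldest person, or "nobody" when the list is empty
    static String oldestName(List<Person> people) {
        Optional<Person> oldest = people.stream().max(Comparator.comparingInt(Person::getAge));
        // map + orElse never throws, unlike get() on an empty Optional
        return oldest.map(Person::getName).orElse("nobody");
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(
                new Person("jean", 20), new Person("marie", 10), new Person("camille", 30));
        System.out.println(oldestName(people));                          // camille
        System.out.println(oldestName(Collections.<Person>emptyList())); // nobody
    }
}
```

With [Optional.get], the second call would have thrown an exception instead of printing a default value.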

Calculating the maximum in parallel does not result in any performance gain, as the following example shows (Example06e1):


package dvp.java8.streams;
 
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;
 
public class Exemple06e1 {
    public static void main(String[] args) {
 
        // data
        // final long limite = 100L;
        // final boolean verbose = true;
        final long limite = 10_000_000L;
        final boolean verbose = false;
 
        // number of processors
        System.out.printf("La JVM a détecté [%s] coeurs sur votre machine%n", Runtime.getRuntime().availableProcessors());
        // list of numbers
        long début = new Date().getTime();
        List<Long> nombres = new ArrayList<>();
        for (long i = 0; i < limite; i++) {
            nombres.add(new Random().nextLong());
        }
        System.out.printf("création de la liste des %s nombres en %s ms%n", limite, new Date().getTime() - début);
        // max numbers - sequential method
        Stream<Long> sNombres = nombres.stream();
        Comparator<Long> compLong = (l1, l2) -> {
            if (verbose) {
                // thread
                System.out.printf("[%s]", Thread.currentThread().getName());
            }
            // comparison
            long v1 = l1.longValue();
            long v2 = l2.longValue();
            if (v1 < v2) {
                return -1;
            } else {
                if (v1 == v2) {
                    return 0;
                } else {
                    return +1;
                }
            }
        };
        début = new Date().getTime();
        // long max = sNombres.max(Comparator.naturalOrder()).get();
        long max = sNombres.max(compLong).get();
        System.out.printf("%nmax séquentiel : max=%s, durée (ms)=%s%n", max, new Date().getTime() - début);
    }
}
  • line 29: we have a stream of random integers of type Long;
  • lines 30–47: the lambda variable compLong implements the Comparator<Long> interface. This interface is normally implemented by the [Comparator.naturalOrder()] method on line 49. But here, we want to display the execution thread (lines 31–33). So we implement the interface ourselves;
  • line 50: finding the maximum;

We obtain the following results:

 

Now, if we replace line 29 with the following (Example06e2):


Stream<Long> sNombres = nombres.stream().parallel();

we get the following results:

 

The parallel execution was therefore slower. If we increase the number of values to 10 million with verbose=false, we get the following results for sequential execution:

La JVM a détecté [8] coeurs sur votre machine
création de la liste des 10000000 nombres en 3764 ms

max séquentiel : max=9223370471463514417, durée (ms)=53

and the following for parallel execution, which remains slower:

La JVM a détecté [8] coeurs sur votre machine
création de la liste des 10000000 nombres en 3760 ms

max parallèle : max=9223365260999360873, durée (ms)=77

We use the [Stream.min] method in a similar way:


// the lightest person
affiche("poids min", personnes.stream().min(Comparator.comparingDouble(Personne::getPoids)).get());

5.6.7. [reduce]


// the total age of all persons
affiche("âge total (reduce)", personnes.stream().map(p -> p.getAge()).reduce(0, (a1, a2) -> a1 + a2));

The [reduce] method was introduced in Section 5.3. Line 2 above sums the ages of all people. The result is as follows:

âge total (reduce) ----
60
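As a complement, here is a minimal sketch of three ways to write this sum. It works on a plain list of ages rather than on the tutorial's Personne objects, and the class and method names are assumptions made for the illustration. The form without an identity value returns an Optional because an empty stream has no sum to return.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

class ReduceSketch {
    // sum with an identity value: an empty stream yields the identity (0)
    static int sumWithIdentity(List<Integer> ages) {
        return ages.stream().reduce(0, (a1, a2) -> a1 + a2);
    }

    // sum without an identity: the result is an Optional, empty for an empty stream
    static Optional<Integer> sumWithoutIdentity(List<Integer> ages) {
        return ages.stream().reduce(Integer::sum);
    }

    // same sum on a stream of primitive int values, which avoids boxing the partial sums
    static int sumAsIntStream(List<Integer> ages) {
        return ages.stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(20, 10, 30);
        System.out.println(sumWithIdentity(ages));                                      // 60
        System.out.println(sumWithoutIdentity(ages).get());                             // 60
        System.out.println(sumAsIntStream(ages));                                       // 60
        System.out.println(sumWithoutIdentity(Collections.<Integer>emptyList()).isPresent()); // false
    }
}
```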

5.6.8. [sorted]


// the persons by ascending age
affiche("personnes par âge croissant",
                personnes.stream().sorted(Comparator.comparingInt(Personne::getAge)).collect(Collectors.toList()));
// the persons in alphabetical order of their names
List<Personne> lPersonnes=personnes.stream().sorted((p1, p2) -> p1.getNom().compareTo(p2.getNom())).collect(Collectors.toList());
affiche("personnes par ordre alphabétique des noms", lPersonnes);

The [sorted] method (lines 3 and 5) has the following signature:

Stream<T> sorted(Comparator<? super T> comparator)

The [sorted] method takes as a parameter the [Comparator] type described in Section 5.6.6 for the min and max methods. It sorts a Stream in the order defined by the comparator passed to it. We have seen that the [Comparator] interface provides several static methods that build common comparators, notably for numeric keys. Here, we use the [Comparator.comparingInt] method, which takes as a parameter a ToIntFunction, a functional interface whose [applyAsInt] method has the signature: int applyAsInt(T t). The actual parameter passed to [Comparator.comparingInt] on line 3 is the reference to the [Personne.getAge] method, which returns the person’s age.

The [Comparator] interface has no static method dedicated to strings in the way comparingInt is dedicated to int keys (the generic [Comparator.comparing] method, given Personne::getNom as its key extractor, would build an equivalent comparator). On line 5, we write a lambda ourselves that implements the only abstract method of this interface, int compare(T t1, T t2):


(p1, p2) -> p1.getNom().compareTo(p2.getNom())

This lambda compares the names of the people. The results obtained are as follows:

personnes par âge croissant ----
[{"nom":"marie","age":10,"poids":30.0,"sexe":"FEMME"},{"nom":"jean","age":20,"poids":70.0,"sexe":"HOMME"},{"nom":"camille","age":30,"poids":55.0,"sexe":"FEMME"}]
personnes par ordre alphabétique des noms ----
[{"nom":"camille","age":30,"poids":55.0,"sexe":"FEMME"},{"nom":"jean","age":20,"poids":70.0,"sexe":"HOMME"},{"nom":"marie","age":10,"poids":30.0,"sexe":"FEMME"}]
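The two sorts above can also be written with the comparator-building methods of [Comparator] alone. The following is a minimal sketch under stated assumptions: Person is a hypothetical stand-in for the tutorial's Personne class, the sample data adds a fourth person to create a tie on age, and the method names are illustrative.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class SortedSketch {
    // hypothetical stand-in for the tutorial's Personne class
    static class Person {
        private final String name;
        private final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }

        String getName() {
            return name;
        }

        int getAge() {
            return age;
        }
    }

    // illustrative data: two persons share the same age
    static List<Person> sample() {
        return Arrays.asList(new Person("jean", 20), new Person("marie", 10),
                new Person("camille", 30), new Person("alice", 20));
    }

    // alphabetical order, built with Comparator.comparing instead of a hand-written lambda
    static List<String> namesAlphabetical(List<Person> people) {
        return people.stream()
                .sorted(Comparator.comparing(Person::getName))
                .map(Person::getName)
                .collect(Collectors.toList());
    }

    // ascending age, ties broken by name
    static List<String> namesByAgeThenName(List<Person> people) {
        return people.stream()
                .sorted(Comparator.comparingInt(Person::getAge).thenComparing(Person::getName))
                .map(Person::getName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(namesAlphabetical(sample()));  // [alice, camille, jean, marie]
        System.out.println(namesByAgeThenName(sample())); // [marie, alice, jean, camille]
    }
}
```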

Parallel execution of the sort was not observed in the following test (Example06f1):


package dvp.java8.streams;
 
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
 
public class Exemple06f1 {
    // JSON mapper
    static ObjectMapper jsonMapper = new ObjectMapper();
 
    public static void main(String[] args) throws JsonProcessingException {
 
        // data
        final long limite = 100L;
        final boolean verbose = true;
//         final long limite = 10_000_000L;
//         final boolean verbose = false;
 
        // number of processors
        System.out.printf("La JVM a détecté [%s] coeurs sur votre machine%n", Runtime.getRuntime().availableProcessors());
        // list of numbers
        long début = new Date().getTime();
        List<Integer> nombres = new ArrayList<>();
        for (long i = 0; i < limite; i++) {
            nombres.add(new Random().nextInt(1000));
        }
        System.out.printf("création de la liste des %s nombres en %s ms%n", limite, new Date().getTime() - début);
        // number sorting - sequential method
        Stream<Integer> sNombres = nombres.stream();
        début = new Date().getTime();
        Comparator<Integer> compInt = (i1, i2) -> {
            if (verbose) {
                // thread
                System.out.printf("[%s]", Thread.currentThread().getName());
            }
            // comparison
            int v1 = i1.intValue();
            int v2 = i2.intValue();
            if (v1 < v2) {
                return +1;
            } else {
                if (v1 == v2) {
                    return 0;
                } else {
                    return -1;
                }
            }
        };
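        // the sort always runs; its result is displayed only in verbose mode
        List<Integer> nombresTriés = sNombres.sorted(compInt).collect(Collectors.toList());
        if (verbose) { affiche("nombres", nombresTriés); }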
        System.out.printf("tri séquentiel : durée (ms)=%s%n", new Date().getTime() - début);
    }
 
    public static <T> void affiche(String message, T value) throws JsonProcessingException {
        System.out.println(String.format("%s ----", message));
        System.out.println(jsonMapper.writeValueAsString(value));
    }
 
}
  • lines 30–36: we construct a stream of random numbers;
  • line 57: we pass the compInt lambda (lines 38-55) to the [sorted] method. This lambda sorts the numbers in descending order and displays the thread executing it.

The results obtained are as follows:

 

If, in the previous code, we replace line 36 with the following (Example06f2):


        Stream<Integer> sNombres = nombres.stream().parallel();        

we get the following results:

 

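Surprisingly, we find that the stream of numbers was sorted using a single thread: there was no parallelism. A plausible explanation, not verified here, is that the list holds only 100 elements and that the JDK's parallel sort falls back to a sequential sort below a certain array size; rerunning the test with a much larger list would settle the question.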
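The observation can be explored with a small experiment. The sketch below is not one of the tutorial's examples: it sorts a random list on a parallel stream and records the names of the threads that execute the comparator. The sizes 100 and 200000, the random seed, and the class and method names are arbitrary assumptions. The number of threads reported depends on the JDK and on the machine, so no expected output is given.

```java
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

class ParallelSortSketch {
    // sorts 'size' random integers in descending order on a parallel stream
    // and returns the names of the threads that executed the comparator
    static Set<String> threadsUsedBySort(int size) {
        // thread-safe set, since several threads may add their name concurrently
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        List<Integer> numbers = new Random(42).ints(size, 0, 1000).boxed().collect(Collectors.toList());
        numbers.parallelStream()
                .sorted((i1, i2) -> {
                    threadNames.add(Thread.currentThread().getName());
                    return Integer.compare(i2, i1);
                })
                .collect(Collectors.toList());
        return threadNames;
    }

    public static void main(String[] args) {
        System.out.printf("100 elements: %s%n", threadsUsedBySort(100));
        System.out.printf("200000 elements: %s thread(s)%n", threadsUsedBySort(200_000).size());
    }
}
```

Comparing the two lines of output shows whether the size of the list changes the number of threads involved in the sort.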

5.6.9. [anyMatch, noneMatch, allMatch]


// are there any persons over 100 years old?
affiche("des personnes de + de 100 ans (anyMatch)", personnes.stream().anyMatch(p -> p.getAge() > 100));
// are all persons at most 100 years old?
affiche("des personnes de + de 100 ans (noneMatch)", personnes.stream().noneMatch(p -> p.getAge() > 100));
// are all persons over 8 years old?
affiche("des personnes de + de 8 ans (allMatch)", personnes.stream().allMatch(p -> p.getAge() > 8));

Lines 2, 4, and 6: the methods [anyMatch, noneMatch, allMatch] take as a parameter a Predicate, described in Section 4.2, and test the elements of the Stream against it. All three return a boolean:

  • anyMatch returns true if there is at least one element in the Stream that satisfies the filter;
  • noneMatch returns true if there are no elements in the Stream that satisfy the filter;
  • allMatch returns true if all elements of the Stream satisfy the filter;

The results obtained are as follows:

des personnes de + de 100 ans (anyMatch) ----
false
des personnes de + de 100 ans (noneMatch) ----
true
des personnes de + de 8 ans (allMatch) ----
true
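Two properties of these methods are worth illustrating: they stop as soon as the answer is known, and they have fixed results on an empty stream. The following is a minimal sketch on a plain list of ages; the class and method names are assumptions made for the illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class MatchSketch {
    // number of elements a sequential anyMatch tests before it can answer
    static int testedByAnyMatch(List<Integer> ages, int threshold) {
        AtomicInteger tested = new AtomicInteger();
        ages.stream().anyMatch(age -> {
            tested.incrementAndGet();
            return age > threshold;
        });
        return tested.get();
    }

    // results of {anyMatch, noneMatch, allMatch} on an empty stream
    static boolean[] onEmptyStream() {
        return new boolean[] {
                Stream.<Integer>empty().anyMatch(age -> age > 100),
                Stream.<Integer>empty().noneMatch(age -> age > 100),
                Stream.<Integer>empty().allMatch(age -> age > 100)
        };
    }

    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(20, 10, 30);
        System.out.println(testedByAnyMatch(ages, 15));       // 1: the first element already matches
        System.out.println(testedByAnyMatch(ages, 100));      // 3: every element has to be tested
        System.out.println(Arrays.toString(onEmptyStream())); // [false, true, true]
    }
}
```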

5.6.10. [collect(Collectors.groupingBy)]


// we group the persons by sex
affiche("personnes regroupées par sexe", personnes.stream().collect(Collectors.groupingBy(p -> p.getSexe())));

The [collect] method was introduced in Section 5.6.3. Its parameter is an implementation of the [Collector] interface. The [Collectors] class provides a number of static methods that return implementations of the [Collector] interface. So far, we have used the [Collectors.toList()] method. Here, we use the static [Collectors.groupingBy] method, which creates a dictionary from the Stream. Its signature is as follows:

static <T, K> Collector<T, ?, Map<K, List<T>>> groupingBy(Function<? super T, ? extends K> classifier)

The [groupingBy] method creates a Map<K,List<T>> from a Stream<T>. The key K is computed by the [groupingBy] method’s parameter of type Function<T,K>, whose sole abstract method has the signature: K apply(T t). To create a map indexed by a person’s sex, we must provide a function that returns the sex of a person. Here, we pass the lambda p -> p.getSexe() as the actual parameter of the [groupingBy] method; the method reference Personne::getSexe would be equivalent. The results obtained are as follows:

personnes regroupées par sexe ----
{"HOMME":[{"nom":"jean","age":20,"poids":70.0,"sexe":"HOMME"}],"FEMME":[{"nom":"marie","age":10,"poids":30.0,"sexe":"FEMME"},{"nom":"camille","age":30,"poids":55.0,"sexe":"FEMME"}]}

Line 2 contains the JSON string of a dictionary indexed by two keys: HOMME and FEMME.
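The [Collectors.groupingBy] method also has a two-parameter form that accepts a second collector applied to each group. The following is a minimal sketch that groups names by their length, first into lists and then into counts. The data and the class and method names are assumptions made for the illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class GroupingSketch {
    // one list of words per length
    static Map<Integer, List<String>> byLength(List<String> words) {
        return words.stream().collect(Collectors.groupingBy(String::length));
    }

    // number of words per length, using counting() as the downstream collector
    static Map<Integer, Long> countByLength(List<String> words) {
        return words.stream().collect(Collectors.groupingBy(String::length, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("jean", "marie", "camille", "paul");
        System.out.println(byLength(names).get(4));      // [jean, paul]
        System.out.println(countByLength(names).get(4)); // 2
    }
}
```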

Parallel computation does not result in performance gains, as shown in the following example (Example06g1):


package dvp.java8.streams;
 
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
 
public class Exemple06g1 {
 
    // JSON mapper
    static ObjectMapper jsonMapper = new ObjectMapper();
 
    public static void main(String[] args) throws JsonProcessingException {
 
        // data
        final long limite = 100L;
        final boolean verbose = true;
//         final long limite = 10_000_000L;
//         final boolean verbose = false;
 
        // number of processors
        System.out.printf("La JVM a détecté [%s] coeurs sur votre machine%n", Runtime.getRuntime().availableProcessors());
        // list of numbers
        long début = new Date().getTime();
        List<Integer> nombres = new ArrayList<>();
        for (long i = 0; i < limite; i++) {
            nombres.add(new Random().nextInt(1000));
        }
        System.out.printf("création de la liste des %s nombres en %s ms%n", limite, new Date().getTime() - début);
        // grouping numbers by hundred - sequential method
        Stream<Integer> sNombres = nombres.stream();
        Function<Integer, Integer> groupByCent = n -> {
            if (verbose) {
                System.out.printf("[%s]", Thread.currentThread().getName());
            }
            return n / 100;
        };
        début = new Date().getTime();
        // Map<Integer, List<Integer>> lNombres = sNombres.collect(Collectors.groupingBy(number -> number / 100));
        Map<Integer, List<Integer>> lNombres = sNombres.collect(Collectors.groupingBy(groupByCent));
        System.out.printf("%nregroupement séquentiel : durée (ms)=%s%n", new Date().getTime() - début);
        // results
        if (verbose) {
            affiche("nombres regroupés", lNombres);
        }
    }
 
    public static <T> void affiche(String message, T value) throws JsonProcessingException {
        System.out.println(String.format("%s ----", message));
        System.out.println(jsonMapper.writeValueAsString(value));
    }
 
}
  • lines 23–38: constructing a stream of numbers;
  • line 47: the numbers are grouped by hundreds. The lambda function in lines 39–44 is used to display the execution thread;

The execution results are as follows:

 

If, in the code, we replace line 38 with the following line (Example06g2):


Stream<Integer> sNombres = nombres.stream().parallel();            

we obtain the following results:

 

We can see that parallel execution of the grouping has degraded performance.

5.6.11. [distinct]


// removal of the duplicate elements of a list
affiche("distinct", Stream.of(1, 2, 1).distinct().collect(Collectors.toList()));

The [distinct] method has the following signature:

Stream<T> distinct()

It removes duplicates from a stream. The [Stream.of] method (line 2) has the following signature:

static <T> Stream<T> of(T... values)

It allows you to create a Stream from explicitly provided values. The results of the execution are as follows:

distinct ----
[1,2]
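The [distinct] method compares elements with their equals method and, on an ordered stream, keeps the first occurrence of each value. The following is a minimal sketch on strings; the class and method names are assumptions made for the illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class DistinctSketch {
    // removes duplicates; the first occurrence of each value keeps its position
    static List<String> withoutDuplicates(List<String> values) {
        return values.stream().distinct().collect(Collectors.toList());
    }

    // "Jean" and "jean" are different strings for equals, so they are normalized first
    static List<String> withoutDuplicatesIgnoringCase(List<String> values) {
        return values.stream().map(String::toLowerCase).distinct().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(withoutDuplicates(Arrays.asList("b", "a", "b", "c", "a")));           // [b, a, c]
        System.out.println(withoutDuplicates(Arrays.asList("Jean", "jean", "Marie")));           // [Jean, jean, Marie]
        System.out.println(withoutDuplicatesIgnoringCase(Arrays.asList("Jean", "jean", "Marie"))); // [jean, marie]
    }
}
```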

5.6.12. [flatMap]


// from a Stream<Stream<T>>, we make a Stream<T>
affiche("flatMap", Stream.of(1, 2, 3).flatMap(i -> Stream.of(i, i + 10)).collect(Collectors.toList()));

The [flatMap] method has the following signature:

<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper)

The [flatMap] method takes as a parameter a function that:

  • takes an element of type T from the Stream as a parameter;
  • returns a Stream<R>;

If, instead of the [flatMap] method, we had used the [map] method described in Section 5.5, the result would be a Stream<Stream<R>> type, where each element of type T in the initial stream would have produced a Stream<R> element. The [flatMap] method, on the other hand, returns a Stream<R> type. It flattens the various Stream<R> streams into a single stream. This is what the results of executing the previous code show:

flatMap ----
[1,11,2,12,3,13]
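A frequent use of [flatMap] is to flatten nested collections. The following is a minimal sketch with a list of lists and with lines of text split into words; the class and method names are assumptions made for the illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

class FlatMapSketch {
    // each inner list becomes a Stream<Integer>; flatMap concatenates these streams
    static List<Integer> flatten(List<List<Integer>> nested) {
        return nested.stream().flatMap(List::stream).collect(Collectors.toList());
    }

    // each line becomes a stream of its words
    static List<String> words(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<Integer>> nested = Arrays.asList(
                Arrays.asList(1, 2), Arrays.asList(3), Collections.<Integer>emptyList());
        System.out.println(flatten(nested));                           // [1, 2, 3]
        System.out.println(words(Arrays.asList("a b", "c")));          // [a, b, c]
    }
}
```

An empty inner list simply contributes no element to the flattened stream.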

There are specialized variants of [flatMap]:


// from a Stream<IntStream>, we make an IntStream whose sum we compute
affiche("flatMapToInt", Stream.of(1, 2, 3).flatMapToInt(i -> IntStream.of(i, i + 10)).sum());

The [flatMapToInt] method has the following signature:

IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper)

The [flatMapToInt] method takes as a parameter a function that returns an IntStream, an interface declared as follows:

public interface IntStream extends BaseStream<Integer, IntStream>

IntStream is a stream of int values. This type is preferable to the Stream<Integer> type because its processing avoids boxing/unboxing between the Integer and int types. This interface offers counterparts to many methods of the Stream<T> type and adds others, including the [sum] method above, which sums the elements of the IntStream.

The following code illustrates the use of the analogous [flatMapToDouble] method:


// from a Stream<DoubleStream>, we make a DoubleStream and then an array
affiche("flatMapToDouble", Stream.of(1, 2, 3).flatMapToDouble(i -> DoubleStream.of(i, i * 1.2)).toArray());

The [DoubleStream.toArray] method allows you to convert from a DoubleStream type to a double[] type.

The results for these two examples are as follows:

flatMapToInt ----
42
flatMapToDouble ----
[1.0,1.2,2.0,2.4,3.0,3.5999999999999996]
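The flatMapToInt computation can also stay in the primitive world from start to finish, since IntStream has its own [flatMap] method, and be converted to a Stream<Integer> only when objects are needed. The following is a minimal sketch; the class and method names are assumptions made for the illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class IntStreamFlatMapSketch {
    // same computation as the flatMapToInt example, starting from an IntStream
    static int sumOfPairs(int n) {
        return IntStream.rangeClosed(1, n).flatMap(i -> IntStream.of(i, i + 10)).sum();
    }

    // boxed() converts the IntStream into a Stream<Integer> so that it can be collected into a List
    static List<Integer> pairsAsList(int n) {
        return IntStream.rangeClosed(1, n)
                .flatMap(i -> IntStream.of(i, i + 10))
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(sumOfPairs(3));  // 42
        System.out.println(pairsAsList(3)); // [1, 11, 2, 12, 3, 13]
    }
}
```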

The following example demonstrates the performance gains achieved by switching from a Stream<Long> type to a LongStream type (Example06i1):


package dvp.java8.streams;
 
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
 
public class Exemple06i1 {
    public static void main(String[] args) {
 
        final long limite = 10_000_000L;
        // number of processors
        System.out.printf("La JVM a détecté [%s] coeurs sur votre machine%n", Runtime.getRuntime().availableProcessors());
        // list of numbers
        long début = new Date().getTime();
        List<Long> nombres = new ArrayList<>();
        for (long i = 0; i < limite; i++) {
            nombres.add(i);
        }
        System.out.printf("création de la liste des %s nombres en %s ms%n", limite, new Date().getTime() - début);
        // sum of numbers - sequential method
        début = new Date().getTime();
        long somme = nombres.stream().reduce(0L, (s, i) -> s + i);
        System.out.printf("somme séquentielle du Stream<Integer> : somme=%s, durée (ms)=%s%n", somme, new Date().getTime() - début);
    }
}
  • line 22: calculation of the sum of a stream of Long numbers;

The following results are obtained:

La JVM a détecté [8] coeurs sur votre machine
création de la liste des 10000000 nombres en 4537 ms
somme séquentielle du Stream<Integer> : somme=49999995000000, durée (ms)=226

Now, let’s replace line 22 with the following (Example06i2):


long somme = nombres.stream().mapToLong(n -> n.longValue()).sum();

The Stream<Long>.mapToLong method gives us a LongStream of primitive long elements, which we then sum using the [sum] method. We then obtain the following results:

La JVM a détecté [8] coeurs sur votre machine
création de la liste des 10000000 nombres en 4511 ms
somme séquentielle du LongStream : somme=49999995000000, durée (ms)=99

The performance gain is clear: 99 ms instead of 226 ms in these runs.
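When the numbers do not need to be stored in a list at all, they can be generated directly as primitives. The following is a minimal sketch, with assumed class and method names, that places the mapToLong form next to a LongStream.range form. No timings are given because they were not measured.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.LongStream;

class LongStreamSketch {
    // sum computed from a list of boxed Long values, each unboxed once by mapToLong
    static long sumOfList(List<Long> numbers) {
        return numbers.stream().mapToLong(Long::longValue).sum();
    }

    // same sum without any list: the values 0 .. limit-1 are generated as primitive longs
    static long sumOfRange(long limit) {
        return LongStream.range(0, limit).sum();
    }

    public static void main(String[] args) {
        final long limit = 10_000_000L;
        List<Long> numbers = new ArrayList<>();
        for (long i = 0; i < limit; i++) {
            numbers.add(i);
        }
        System.out.println(sumOfList(numbers)); // 49999995000000
        System.out.println(sumOfRange(limit));  // 49999995000000
    }
}
```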

5.6.13. primitive number stream methods


// max of a stream of int
affiche("IntStream max", IntStream.of(1, 10, 8).max());
// min of a stream of double
affiche("DoubleStream min", DoubleStream.of(1.5, 10.4, 8.9).min());
// average of a stream of int
affiche("IntStream average", IntStream.of(1, 10, 8).average().getAsDouble());
// statistics of a stream of int
affiche("IntStream summaryStatistics", IntStream.of(1, 10, 8).summaryStatistics());

Streams of primitive values (int, long, double) provide methods tailored to these types. The result of executing the previous code is as follows:

IntStream max ----
{"asInt":10,"present":true}
DoubleStream min ----
{"asDouble":1.5,"present":true}
IntStream average ----
6.333333333333333
IntStream summaryStatistics ----
{"count":3,"sum":19,"min":1,"max":10,"average":6.333333333333333}
  • The result of line 2 of the code is an OptionalInt type analogous to the Optional<Integer> type. The value stored in this object can be retrieved using the [getAsInt()] method. The presence of a value can be checked using the [isPresent()] method. Line 2 of the results does not mean that the [OptionalInt] class has fields named [asInt] and [present]. By default, the JSON library uses all public getX and isY methods of the object to be serialized to JSON. And here, there is indeed a [getAsInt] method and another [isPresent] method, even though the [asInt, present] fields themselves do not exist;
  • the result of line 4 of the code is an OptionalDouble type analogous to the Optional<Double> type;
  • the result of line 6 of the code is an OptionalDouble type whose value can be obtained using the [getAsDouble()] method. The [average] method calculates the average of the stream of numbers;
  • the result of line 8 of the code is an IntSummaryStatistics type, whose getters are long getCount(), long getSum(), int getMin(), int getMax() and double getAverage();

We can see that the resulting IntSummaryStatistics object provides various statistics about the stream of numbers, such as the number of values, sum, max, min, and average.
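The following is a minimal sketch of how these values can be read in code rather than through JSON serialization, and of how an OptionalInt can be consumed without risking an exception on an empty stream. The class and method names are assumptions made for the illustration.

```java
import java.util.IntSummaryStatistics;
import java.util.stream.IntStream;

class StatsSketch {
    // statistics of the given values, computed in a single pass
    static IntSummaryStatistics stats(int... values) {
        return IntStream.of(values).summaryStatistics();
    }

    // maximum of the values, or defaultValue when there are none
    static int maxOrDefault(int defaultValue, int... values) {
        return IntStream.of(values).max().orElse(defaultValue);
    }

    public static void main(String[] args) {
        IntSummaryStatistics s = stats(1, 10, 8);
        System.out.printf("count=%d, sum=%d, min=%d, max=%d%n",
                s.getCount(), s.getSum(), s.getMin(), s.getMax()); // count=3, sum=19, min=1, max=10
        System.out.println(maxOrDefault(-1, 1, 10, 8));            // 10
        System.out.println(maxOrDefault(-1));                      // -1: the stream is empty
    }
}
```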