5. The Stream<T> type in Java 8
5.1. Example-01 - The Stream class
Operations on Observable streams have many similarities with operations on Streams. One difference is that a Stream is usually built from a source that is already fully available, such as a collection, whereas an element of an Observable stream can be processed (observed) as soon as it is emitted, without waiting for the rest of the Observable stream. Another difference is that the values of a Stream are consumed by pulling them one by one from the Stream, whereas an Observable pushes each value to its subscriber as soon as it emits it.
Several types implement the concept of a Stream. Here we present the Stream<T> interface of the [java.util.stream] package.

The Stream<T> interface offers 39 methods. We will present a few of them. Consider the following code:
package dvp.java8.streams;
import java.util.List;
import dvp.data.Person;
import dvp.data.People;
public class Example01 {
public static void main(String[] args) {
// list of people
List<Person> people = People.get();
// display 1
people.stream().forEach(p -> {
System.out.println(p);
});
System.out.println("----------------");
// display 2
people.stream().forEach(System.out::println);
}
}
- line 11: we instantiate a list of people;
- line 13: from this list, we create a Stream. All collections can be converted into Streams in this way. This allows us to take advantage of all the methods of this class, which enable us to process the elements of the collection more concisely than with loops. It also allows us to benefit from parallel processing of elements when possible;
- line 13: the [Stream.forEach] method has the following signature:
void forEach(Consumer<? super T> action)
We see that the method’s parameter is the functional interface [Consumer<T>] presented in Section 4.4, an interface whose sole method operates on type T and returns nothing.
- In the code:
people.stream().forEach(p -> {
System.out.println(p);
});
- [people.stream()] produces a stream of elements of type [Person] that feeds the [forEach] method. The parameter p is of type [Person], and the provided lambda function prints this person;
The previous code can be simplified as follows (line 18):
people.stream().forEach(System.out::println);
Rather than passing a lambda function as a parameter, we pass a reference to an existing method, in this case the println method of the System.out object (a PrintStream). Of course, this method must have the correct signature, in this case the signature of the [Consumer.accept] method: void accept(T t). As mentioned earlier, the parameter of the [accept] method will be of type [Person];
We obtain the following results:
Once a Stream has been consumed by a terminal operation such as [forEach], it can no longer be used. It must be rebuilt if you wish to process it again. This is demonstrated by the following code [Example01b]:
package dvp.java8.streams;
import java.util.stream.Stream;
import dvp.data.Person;
import dvp.data.People;
public class Example01b {
public static void main(String[] args) {
// stream of people
Stream<Person> people = People.get().stream();
// display 1
people.forEach(p -> {
System.out.println(p);
});
System.out.println("----------------");
// display 2
people.forEach(System.out::println);
}
}
- Line 11: To optimize the code, we decide to construct the Stream only once. The results obtained are as follows:
{"name":"jean","age":20,"weight":70.0,"gender":"MALE"}
{"name":"marie","age":10,"weight":30.0,"gender":"FEMALE"}
{"name":"camille","age":30,"weight":55.0,"gender":"FEMALE"}
----------------
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
at java.util.stream.AbstractPipeline.sourceStageSpliterator(Unknown Source)
at java.util.stream.ReferencePipeline$Head.forEach(Unknown Source)
at dvp.java8.streams.Exemple02b.main(Exemple02b.java:18)
Every time you want to use a Stream, you must construct it, even if it has been constructed previously.
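One common way to respect this rule without repeating the construction code is to hold a [Supplier<Stream<T>>] rather than the Stream itself. The sketch below is only an illustration: the class name StreamSupplierSketch, its helper methods, and the list of names standing in for People.get() are assumptions, not part of the original examples.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class StreamSupplierSketch {

    // Runs two terminal operations, asking the supplier for a fresh Stream each time.
    public static long countTwice(Supplier<Stream<String>> streams) {
        long first = streams.get().count();  // first Stream, consumed here
        long second = streams.get().count(); // second, independent Stream
        return first + second;
    }

    // Returns true when a second terminal operation on the same Stream is rejected.
    public static boolean reuseFails(Stream<String> stream) {
        stream.forEach(s -> { });            // consumes the Stream
        try {
            stream.forEach(s -> { });        // same instance used again
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // hypothetical data standing in for People.get()
        List<String> names = Arrays.asList("jean", "marie", "camille");
        System.out.println(countTwice(names::stream));
        System.out.println(reuseFails(names.stream()));
    }
}
```

The supplier [names::stream] builds a new Stream on every call to [get], so each terminal operation works on its own instance.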
5.2. Example-02 - Parallel processing of elements in a Stream
Consider the following code:
package dvp.java8.streams;
import java.util.List;
import dvp.data.Person;
import dvp.data.People;
public class Example02 {
public static void main(String[] args) {
// list of people
List<Person> people = People.get();
// display 1
people.stream().forEach(Example02::display);
System.out.println("-----------------");
// display 2
people.stream().parallel().forEach(Example02::display);
}
public static void display(Person p) {
System.out.printf("Person %s on thread %s%n", p, Thread.currentThread().getName());
}
}
- lines 19–21: the [display] method prints to the console the JSON string of a person along with the name of the execution thread in which the display is taking place;
- line 13: displays a list of people. Note that the parameter of the [forEach] method is the reference to the previous static method;
- line 16: we do the same thing, but with the [parallel] method, we request that the stream elements be processed in parallel across multiple threads. Not all processing can be done in parallel. Here, we must assume that the display order does not matter because, in parallel processing, the execution order of the threads is not guaranteed. Note also a syntax that will become ubiquitous for both Streams and Observables:
stream.m1(...).m2(...).m3(...)
- [stream] produces elements e1 that feed the m1 method;
- [stream.m1(...)] is in turn a stream of elements e2 that feed the m2 method;
- [stream.m1(...).m2(...)] is a stream of elements e3 that feed the m3 method;
The type of elements e1, e2, e3 may change as the initial stream undergoes processing.
Executing this code yields the following result:
We can see that the parallel execution (lines 5–7) took place on three different threads and did not follow the order of the elements as shown in lines 1–3. In this document, we will not focus much on the parallel processing of elements in a Stream, because that would require discussing the conditions that make such processing possible. We then discover that few operations can be performed in parallel. One that lends itself naturally to parallelism is the sum of the numerical elements of a stream, which we will now present.
5.3. Example-03 - Parallel Processing of Stream Elements
Consider the following code (Example 03a):
package dvp.java8.streams;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
public class Example03a {
public static void main(String[] args) {
final long limit = 10_000_000L;
// number of processors
System.out.printf("The JVM detected [%s] cores on your machine%n", Runtime.getRuntime().availableProcessors());
// list of numbers
long start = new Date().getTime();
List<Long> numbers = new ArrayList<>();
for (long i = 0; i < limit; i++) {
numbers.add(i);
}
System.out.printf("Creating a list of %s numbers in %s ms%n", limit, new Date().getTime() - start);
// Sum of numbers - sequential method
start = new Date().getTime();
long sum = numbers.stream().reduce(0L, (s, i) -> s + i);
System.out.printf("sequential sum: sum=%s, duration (ms)=%s%n", sum, new Date().getTime() - start);
}
}
- In line 22, we use the [reduce] method, which has the following signature:
T reduce(T identity, BinaryOperator<T> accumulator)
- The [reduce] method works with elements of type T;
- the [reduce] method applies the same processing to all elements in a stream: the initial value of an accumulator is provided as the first parameter. A method implementing the functional interface [BinaryOperator<T>] is provided as the second parameter: based on the accumulator and each element, this method returns a new value for the accumulator. The final value of the accumulator is the value returned by the [reduce] method. The method called at each step is [apply], which [BinaryOperator<T>] inherits from [BiFunction<T,T,T>]: T apply(T t, T u);
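The accumulator mechanism described above can be sketched as a plain loop. This is only an illustration of the sequential case: the class ReduceSketch and the helper reduceByLoop are hypothetical names, and the loop is not how the library implements [reduce] internally.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class ReduceSketch {

    // Hand-written equivalent of a sequential reduce(identity, accumulator).
    public static <T> T reduceByLoop(List<T> elements, T identity, BinaryOperator<T> accumulator) {
        T result = identity;                              // initial value of the accumulator
        for (T element : elements) {
            result = accumulator.apply(result, element);  // new accumulator value
        }
        return result;                                    // final accumulator value
    }

    public static void main(String[] args) {
        List<Long> numbers = Arrays.asList(1L, 2L, 3L, 4L);
        BinaryOperator<Long> add = (s, i) -> s + i;
        long byLoop = reduceByLoop(numbers, 0L, add);
        long byStream = numbers.stream().reduce(0L, add);
        System.out.printf("loop=%s, stream=%s%n", byLoop, byStream);
    }
}
```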
Let’s return to the example code:
- line 12: we display the number of cores detected by the JVM;
- lines 15–18: a list of 10 million numbers is created;
- line 22: the sum of these numbers is calculated sequentially using a single thread;
We obtain the following results:
Now, let’s replace line 22 of the code with the following (Example03b):
long sum = numbers.stream().parallel().reduce(0L, (s, i) -> s + i);
We instruct the Stream elements to be processed in parallel using multiple threads. This is possible because the order in which the numbers are summed does not matter. We can therefore assign n1 numbers to thread T1, n2 numbers to thread T2, ... and finally sum the results provided by these different threads. We then obtain the following results:
There is therefore virtually no performance gain. This will often be the case in the examples that follow. Thread management itself is time-consuming. The operation performed by each core must be sufficiently complex for the performance gain to be noticeable. This is demonstrated by the following example (Example03c):
package dvp.java8.streams;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.function.BinaryOperator;
public class Example03c {
public static void main(String[] args) {
final long limit = 10_000L;
// number of processors
System.out.printf("The JVM detected [%s] cores on your machine%n", Runtime.getRuntime().availableProcessors());
// list of numbers
long start = new Date().getTime();
List<Long> numbers = new ArrayList<>();
for (long i = 0; i < limit; i++) {
numbers.add(i);
}
System.out.printf("Creating a list of %s numbers in %s ms%n", limit, new Date().getTime() - start);
// Sum of numbers - sequential method
start = new Date().getTime();
BinaryOperator<Long> bo = (s, i) -> {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
return s + i;
};
long sum = numbers.stream().reduce(0L, bo);
System.out.printf("sequential sum: sum=%s, duration (ms)=%s%n", sum, new Date().getTime() - start);
}
}
- line 30: we use the [reduce] method again, passing it the reference to the method from lines 23–29 as a parameter;
- line 28: the [bo] method returns the sum of its two parameters;
- lines 24–27: artificially, we make the thread wait for 1 millisecond to simulate intensive work;
We then obtain the following results:
Now, if we replace line 30 with the following:
long sum = numbers.stream().parallel().reduce(0L, bo);
we get the following results:
We can clearly see the performance gain achieved by executing the sum calculation in parallel. For processing 8 numbers:
- the sequential thread waits 8 times 1 millisecond, or 8 ms;
- the 8 parallel threads each wait 1 millisecond at the same time (for simplicity’s sake), so a total of 1 millisecond for the 8 numbers;
We can therefore expect parallel execution to be 8 times faster than sequential execution. That is roughly the case here.
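The parallel sum is only valid because addition is associative and 0 is its identity, so partial sums computed by different threads can be combined in any grouping. The following sketch checks that the sequential and parallel reductions agree; it says nothing about timings. The class ParallelSumSketch and its method names are assumptions made for the illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class ParallelSumSketch {

    // Builds the list 0, 1, ..., limit-1
    public static List<Long> numbers(long limit) {
        return LongStream.range(0, limit).boxed().collect(Collectors.toList());
    }

    // Sum computed by a single thread
    public static long sequentialSum(List<Long> numbers) {
        return numbers.stream().reduce(0L, Long::sum);
    }

    // Sum computed from partial sums that may be produced by several threads
    public static long parallelSum(List<Long> numbers) {
        return numbers.stream().parallel().reduce(0L, Long::sum);
    }

    public static void main(String[] args) {
        List<Long> numbers = numbers(10_000);
        System.out.printf("sequential=%s, parallel=%s%n", sequentialSum(numbers), parallelSum(numbers));
    }
}
```

An accumulator that is not associative, such as subtraction, would not offer this guarantee in parallel mode.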
5.4. Example-04 - Filtering a Stream
Consider the following code:
package dvp.java8.streams;
import java.util.List;
import dvp.data.Person;
import dvp.data.People;
public class Example04 {
public static void main(String[] args) {
// list of people
List<Person> people = People.get();
// display
System.out.println("age < 28 ----------------------");
people.stream().filter(p -> p.getAge() < 28).forEach(p -> {
System.out.println(p);
});
System.out.println("weight < 50 ----------------------");
people.stream().filter(p -> p.getWeight() < 50).forEach(p -> {
System.out.println(p);
});
System.out.println("age < 28 ----------------------");
people.stream().filter(p -> p.getAge() < 28).forEach(System.out::println);
System.out.println("weight < 50 ----------------------");
people.stream().filter(p -> p.getWeight() < 50).forEach(System.out::println);
}
}
- line 14: the [Stream.filter] method has the following signature:
Stream<T> filter(Predicate<? super T> predicate)
- The [filter] method expects as a parameter an instance of the functional interface [Predicate] presented in Section 4.2, whose only method to be implemented is the following: boolean test(T t);
- The [filter] method returns the elements of the Stream that satisfy the Predicate. It is therefore used to filter the Stream;
- lines 14-16: display people under 28 years old;
- lines 18-20: display people with a weight <50;
- line 22: does the same thing as lines 14–16 but more concisely;
- line 24: does the same thing as lines 18–20 but more concisely;
The results of the execution are as follows:
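As a complement, here is a minimal sketch in which the two filtering conditions are named [Predicate] values and then combined with [Predicate.and]. The nested Person class is a stand-in for dvp.data.Person, and the sample data copies the three people displayed in Section 5.1; both are assumptions made so that the example is self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class FilterSketch {

    // Minimal stand-in for dvp.data.Person (assumption: only these getters are needed)
    public static class Person {
        private final String name;
        private final int age;
        private final double weight;

        public Person(String name, int age, double weight) {
            this.name = name;
            this.age = age;
            this.weight = weight;
        }

        public String getName() { return name; }
        public int getAge() { return age; }
        public double getWeight() { return weight; }
    }

    // Sample data mirroring the three people displayed in Section 5.1
    public static List<Person> sample() {
        return Arrays.asList(new Person("jean", 20, 70.0), new Person("marie", 10, 30.0),
                new Person("camille", 30, 55.0));
    }

    // Names of the people that satisfy the given predicate
    public static List<String> namesMatching(List<Person> people, Predicate<Person> condition) {
        return people.stream().filter(condition).map(Person::getName).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Predicate<Person> young = p -> p.getAge() < 28;
        Predicate<Person> light = p -> p.getWeight() < 50;
        System.out.println(namesMatching(sample(), young));            // age < 28
        System.out.println(namesMatching(sample(), light));            // weight < 50
        System.out.println(namesMatching(sample(), young.and(light))); // both conditions
    }
}
```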
5.5. Example-05 - Create a Stream<T2> from a Stream<T1>
Consider the following code:
package dvp.java8.streams;
import java.util.List;
import dvp.data.Person;
import dvp.data.People;
public class Example05 {
public static void main(String[] args) {
// list of people
List<Person> people = People.get();
// output
System.out.println("Person --> String ----------------------");
people.stream().map(p -> p.getName()).forEach(System.out::println);
System.out.println("Person --> Integer ----------------------");
people.stream().map(p -> p.getAge()).forEach(System.out::println);
}
}
- Line 13: The [Stream.map] method has the following signature:
<R> Stream<R> map(Function<? super T, ? extends R> mapper)
The parameter of the [Stream.map] method is an instance of the functional interface [Function] presented in Section 4.3, whose only method to be implemented is: R apply(T t). We see that, given a type T, the [apply] function produces a type R. The [Stream.map] method will therefore produce a Stream of type R from a stream of type T (a stream of type T means here, in a technical inaccuracy that we will retain, a stream of elements of type T).
Let’s now examine the code in the example:
- line 14: from a person p, we keep only the name. We thus obtain a stream of Strings;
- line 16: from a person p, we keep only the age. We therefore obtain a stream of Integers;
The results obtained are as follows:
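To make the change of element type along a chain of [map] calls more concrete, here is a small sketch that works on names only. The class MapSketch and its method names are hypothetical; [mapToInt] is a standard variant of [map] that produces a primitive IntStream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MapSketch {

    // Stream<String> -> Stream<Integer>: the element type changes along the chain
    public static List<Integer> nameLengths(List<String> names) {
        return names.stream()            // Stream<String>
                .map(String::trim)       // still a Stream<String>
                .map(String::length)     // now a Stream<Integer>
                .collect(Collectors.toList());
    }

    // Stream<String> -> IntStream, a primitive specialization that offers sum()
    public static int totalLength(List<String> names) {
        return names.stream().mapToInt(String::length).sum();
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("jean", "marie", "camille");
        System.out.println(nameLengths(names));
        System.out.println(totalLength(names));
    }
}
```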
5.6. Example-06 - Other methods of the Stream<T> class
We illustrate some of the 39 methods of the Stream interface with the following code:
package dvp.java8.streams;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import dvp.data.Person;
import dvp.data.People;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.Stream;
public class Example06 {
// JSON mapper
static private ObjectMapper jsonMapper = new ObjectMapper();
public static void main(String[] args) throws JsonProcessingException {
// list of people
List<Person> people = People.get();
// all people
display("all", people);
// the first person
display("findFirst", people.stream().findFirst().get());
// any person
display("findAny", people.stream().findAny().get());
// people excluding the first one
display("skip 1", people.stream().skip(1L).collect(Collectors.toList()));
// the first 2 people
display("limit 2", people.stream().limit(2L).collect(Collectors.toList()));
// the number of people
display("count", people.stream().count());
// the oldest person
display("max age", people.stream().max(Comparator.comparingInt(Person::getAge)).get());
// the lightest person
display("min weight", people.stream().min(Comparator.comparingDouble(Person::getWeight)).get());
// the last person in alphabetical order by name
display("maxName", people.stream().max((p1, p2) -> p1.getName().compareToIgnoreCase(p2.getName())).get());
// the total age of all people
display("total age (reduce)", people.stream().map(p -> p.getAge()).reduce(0, (a1, a2) -> a1 + a2));
// people sorted by age in ascending order
display("people by ascending age",
people.stream().sorted(Comparator.comparingInt(Person::getAge)).collect(Collectors.toList()));
// Are there any people over 100 years old?
display("people over 100 years old (anyMatch)", people.stream().anyMatch(p -> p.getAge() > 100));
// Are all people 100 years old or younger?
display("people over 100 years old (noneMatch)", people.stream().noneMatch(p -> p.getAge() > 100));
// Are all people over 8 years old?
display("people over 8 years old (allMatch)", people.stream().allMatch(p -> p.getAge() > 8));
// Group people by gender
display("people grouped by gender", people.stream().collect(Collectors.groupingBy(p -> p.getGender())));
// removing duplicate elements from a list
display("distinct", Stream.of(1, 2, 1).distinct().collect(Collectors.toList()));
// From a Stream<Stream<T>>, we create a Stream<T>
display("flatMap", Stream.of(1, 2, 3).flatMap(i -> Stream.of(i, i + 10)).collect(Collectors.toList()));
// From a Stream<Stream<Integer>>, we create an IntStream and calculate its sum
display("flatMapToInt", Stream.of(1, 2, 3).flatMapToInt(i -> IntStream.of(i, i + 10)).sum());
// From a Stream<Stream<Integer>>, we create a DoubleStream and then an array
display("flatMapToDouble", Stream.of(1, 2, 3).flatMapToDouble(i -> DoubleStream.of(i, i * 1.2)).toArray());
// max of a stream of integers
display("reduce Integer::max", Stream.of(1, 10, 8).reduce(Integer::max).get());
// min of a stream of Doubles
display("reduce Double::min", Stream.of(1.5, 10.4, 8.9).reduce(Double::min).get());
// average of a stream of integers
display("IntStream average", IntStream.of(1, 10, 8).average().getAsDouble());
// statistics of a stream of integers
display("IntStream summaryStatistics", IntStream.of(1, 10, 8).summaryStatistics());
}
public static <T> void display(String message, T value) throws JsonProcessingException {
System.out.println(String.format("%s ----", message));
System.out.println(jsonMapper.writeValueAsString(value));
}
}
- lines 72, 75: display the JSON string of the method's second parameter;
- line 24: displays the JSON string for all people. The result is as follows:
5.6.1. [findFirst]
// the first person
display("findFirst", people.stream().findFirst().get());
The [findFirst] method returns the first element of a stream, if it exists. Its signature is as follows:
Optional<T> findFirst()
The result is of type Optional<T>, a type introduced in Java 8:
public final class Optional<T>
The Optional<T> class allows for different handling of null pointers. A method that needs to return a type T that may have the value null can choose to return a type Optional<T>. The [Optional<T>.isPresent()] method allows you to determine whether the method returned a value or not. The following code [Example06b] illustrates part of how Optional<T> works:
package dvp.java8.streams;
import java.util.Optional;
import com.fasterxml.jackson.core.JsonProcessingException;
public class Example06b {
public static void main(String[] args) throws JsonProcessingException {
// Optional without a value
Optional<Integer> o1 = m1();
System.out.println(o1.isPresent());
display(o1);
// optional with value
Optional<Integer> o2 = m2();
System.out.println(o2.isPresent());
display(o2);
}
private static void display(Optional<Integer> o1) {
try {
// retrieve the value from the Optional
// throws a NoSuchElementException if there is no value
System.out.println(o1.get());
} catch (Throwable th) {
System.out.printf("%s: %s%n", th.getClass().getName(), th.getMessage());
}
}
public static Optional<Integer> m1() {
// no value
return Optional.empty();
}
public static Optional<Integer> m2() {
// a value
return Optional.of(10);
}
}
The results are as follows:
false
java.util.NoSuchElementException: No value present
true
10
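Besides [isPresent] and [get], the Optional<T> class offers methods that avoid the exception altogether, such as [orElse], [map] and [ifPresent]. The sketch below shows them under hypothetical names (the class OptionalSketch and its two helpers are assumptions, not part of the original examples).

```java
import java.util.Optional;

public class OptionalSketch {

    // Returns the wrapped value, or the fallback when the Optional is empty
    public static int valueOrDefault(Optional<Integer> o, int fallback) {
        return o.orElse(fallback);
    }

    // Transforms the value if there is one, otherwise returns a default text
    public static String describe(Optional<Integer> o) {
        return o.map(v -> "value " + v).orElse("no value");
    }

    public static void main(String[] args) {
        Optional<Integer> empty = Optional.empty();
        Optional<Integer> ten = Optional.of(10);
        System.out.println(valueOrDefault(empty, -1));
        System.out.println(describe(ten));
        // the lambda runs only when a value is present
        ten.ifPresent(v -> System.out.println("present: " + v));
    }
}
```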
Let's return to the code illustrating the [findFirst] method:
// the first person
display("findFirst", people.stream().findFirst().get());
- line 2: to simplify the code, we use the [get] method on the Optional<Person> produced by the [findFirst] method. Clean code would require calling the [Optional<Person>.isPresent()] method before calling the [get] method;
The result is as follows:
5.6.2. [findAny]
// any person
display("findAny", people.stream().findAny().get());
The [findAny] method has the following signature:
Optional<T> findAny()
The [findAny] method can return any element from the stream. During testing, we observe that a sequential execution returns the first element of the stream, whereas a parallel execution can indeed return any element. This is demonstrated by the following code [Example06c]:
package dvp.java8.streams;
import java.util.List;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import dvp.data.Person;
import dvp.data.People;
public class Example06c {
// JSON mapper
static private ObjectMapper jsonMapper = new ObjectMapper();
public static void main(String[] args) throws JsonProcessingException {
// list of people
List<Person> people = People.get();
// all people
display("all", people);
// any person
display("findAny parallel", people.stream().parallel().findAny().get());
// any person
display("findAny sequential", people.stream().findAny().get());
}
public static <T> void display(String message, T value) throws JsonProcessingException {
System.out.println(String.format("%s ----", message));
System.out.println(jsonMapper.writeValueAsString(value));
}
}
- line 22: findAny executed in parallel;
- line 24: findAny executed sequentially;
The results obtained are as follows:
- line 4: the parallel execution returned the second element of the list of people. It could have been another one;
- line 6: sequential execution returned the first element of the list of people;
The use of the [findAny] method seems to make sense only in the parallel processing of a stream.
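The contrast with [findFirst] can be sketched as follows: on an ordered source such as a list, [findFirst] returns the first element even in parallel mode, whereas [findAny] only promises some element of the stream. The class FindAnySketch and its method names are assumptions made for the illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class FindAnySketch {

    // Checks the only thing findAny promises: the result is an element of the source
    public static <T> boolean findAnyReturnsMember(List<T> items) {
        Optional<T> any = items.stream().parallel().findAny();
        return any.isPresent() && items.contains(any.get());
    }

    // findFirst respects the encounter order of the list, even on a parallel stream
    public static <T> T firstInParallel(List<T> items, T fallback) {
        return items.stream().parallel().findFirst().orElse(fallback);
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("jean", "marie", "camille");
        System.out.println(findAnyReturnsMember(names));
        System.out.println(firstInParallel(names, ""));
    }
}
```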
5.6.3. [skip]
// people excluding the first one
display("skip 1", people.stream().skip(1L).collect(Collectors.toList()));
The [skip] method has the following signature:
Stream<T> skip(long n)
The [skip] method skips the first n elements of a stream. As the Javadoc of this method indicates, executing it in parallel on an ordered stream yields little performance gain and may even result in a loss of performance. Indeed, to skip the first n elements, the threads are forced to coordinate, which negates the performance gains from parallelism.
The [skip] method returns a Stream<Person> that is converted to a List<Person> by the [collect] method, which has the following signature:
<R, A> R collect(Collector<? super T, A, R> collector)
The [collect] method takes as a parameter an instance of type [Collector], which has a complex signature. There are predefined implementations of type [Collector] that usually allow you to avoid implementing it yourself. Here, the implementation used is the one returned by [Collectors.toList()]. [Collectors] is a class with numerous static methods that return instances of the [Collector<T,A,R>] type. This is the first place to look when you want to convert a Stream into a standard Java collection.
We will use some of these methods later on.
Execution yields the following result:
The first element of the list (jean) has been omitted.
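To give an idea of what the [Collectors] class offers beyond [toList], here is a short sketch using three other standard collectors: [joining], [toSet] and [groupingBy] combined with [counting]. The class CollectorsSketch, its method names and the list of names are assumptions made for the illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class CollectorsSketch {

    // Stream<String> -> a single String with a separator
    public static String joined(List<String> names) {
        return names.stream().collect(Collectors.joining(", "));
    }

    // Stream<Integer> -> Set<Integer>: duplicates disappear
    public static Set<Integer> distinctLengths(List<String> names) {
        return names.stream().map(String::length).collect(Collectors.toSet());
    }

    // Stream<String> -> Map<length, number of names having that length>
    public static Map<Integer, Long> countByLength(List<String> names) {
        return names.stream().collect(Collectors.groupingBy(String::length, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("jean", "paul", "marie");
        System.out.println(joined(names));
        System.out.println(distinctLengths(names));
        System.out.println(countByLength(names));
    }
}
```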
5.6.4. [limit]
// the first 2 people
display("limit 2", people.stream().limit(2L).collect(Collectors.toList()));
The [limit] method has the following signature:
Stream<T> limit(long maxSize)
The [limit] method allows you to keep only the first n elements of a stream. It is not suitable for parallel processing.
The execution yields the following result:
5.6.5. [count]
// the number of people
display("count", people.stream().count());
The [count] method has the following signature:
long count()
The [count] method returns the number of elements in a Stream. Parallel execution of the method does not result in a performance gain, as shown in the following code (Example06d1):
package dvp.java8.streams;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.Stream;
public class Example06d1 {
public static void main(String[] args) {
final long limit = 10_000_000L;
// number of processors
System.out.printf("The JVM detected [%s] cores on your machine%n", Runtime.getRuntime().availableProcessors());
// list of numbers
long start = new Date().getTime();
List<Long> numbers = new ArrayList<>();
for (long i = 0; i < limit; i++) {
numbers.add(i);
}
System.out.printf("Creating the list of %s numbers in %s ms%n", limit, new Date().getTime() - start);
// Counting numbers - sequential method
Stream<Long> sNumbers = numbers.stream();
start = new Date().getTime();
long count = sNumbers.count();
System.out.printf("sequential counting: count=%s, duration (ms)=%s%n", count, new Date().getTime() - start);
}
}
- lines 11–22: create a Stream of 10 million numbers;
- lines 22–24: counting the Stream;
The execution yields the following result:
If we replace line 22 of the code with the following (Example06d2):
Stream<Long> sNumbers = numbers.stream().parallel();
we get the following results:
The JVM detected [8] cores on your machine
Creation of the list of 100,000,000 numbers in 4,341 ms
parallel counting: counter=10000000, duration (ms)=100
5.6.6. [max, min]
// the oldest person
display("max age", people.stream().max(Comparator.comparingInt(Person::getAge)).get());
The [max] method has the following signature:
Optional<T> max(Comparator<? super T> comparator)
The [max] method returns the maximum value of a stream using the comparator passed to it as a parameter. Comparator is a functional interface whose only method to implement has the signature: int compare(T o1, T o2). This method must return a negative integer if o1 < o2, 0 if o1 and o2 are equal in the ordering, and a positive integer if o1 > o2. The Comparator functional interface has many static and default methods that build comparators for the most common cases. Thus, in the statement:
display("max age", people.stream().max(Comparator.comparingInt(Person::getAge)).get());
We use the static method [Comparator.comparingInt], whose signature is as follows:
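static <T> Comparator<T> comparingInt(ToIntFunction<? super T> keyExtractor)
The ToIntFunction type is a functional interface:
public interface ToIntFunction<T> { int applyAsInt(T value); }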
The [applyAsInt] method of the ToIntFunction functional interface produces an int type from a type T. Let’s return to our code:
display("max age", people.stream().max(Comparator.comparingInt(Person::getAge)).get());
The actual parameter of the [Comparator.comparingInt] method must be a Person --> int lambda here. We pass the reference to the [Person.getAge] method, which has this signature. Ultimately, we will get the oldest person. We obtain an Optional<Person> type, from which we extract the value using the [Optional.get] method. We get the following result:
Calculating the maximum in parallel does not result in any performance gains, as shown in the following example: (Example06e1):
package dvp.java8.streams;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;
public class Example06e1 {
public static void main(String[] args) {
// data
// final long limit = 100L;
// final boolean verbose = true;
final long limit = 10_000_000L;
final boolean verbose = false;
// number of processors
System.out.printf("The JVM detected [%s] cores on your machine%n", Runtime.getRuntime().availableProcessors());
// list of numbers
long start = new Date().getTime();
List<Long> numbers = new ArrayList<>();
for (long i = 0; i < limit; i++) {
numbers.add(new Random().nextLong());
}
System.out.printf("Creating a list of %s numbers in %s ms%n", limit, new Date().getTime() - start);
// maximum of the numbers - sequential method
Stream<Long> sNumbers = numbers.stream();
Comparator<Long> compLong = (l1, l2) -> {
if (verbose) {
// thread
System.out.printf("[%s]", Thread.currentThread().getName());
}
// comparison
long v1 = l1.longValue();
long v2 = l2.longValue();
if (v1 < v2) {
return -1;
} else {
if (v1 == v2) {
return 0;
} else {
return +1;
}
}
};
start = new Date().getTime();
// long max = sNumbers.max(Comparator.naturalOrder()).get();
long max = sNumbers.max(compLong).get();
System.out.printf("%nSequential max: max=%s, duration (ms)=%s%n", max, new Date().getTime() - start);
}
}
- line 29: we have a stream of random integers of type Long;
- lines 30–47: the lambda variable compLong implements the Comparator<Long> interface. This interface is normally implemented by the [Comparator.naturalOrder()] method on line 49. But here, we want to display the execution thread (lines 31–33). So we implement the interface ourselves;
- line 50: finding the maximum;
We obtain the following results:
Now, if we replace line 27 with the following (Example06e2):
Stream<Long> sNumbers = numbers.stream().parallel();
we get the following results:
The parallel execution was therefore slower. If we increase the count to 10 million numbers with verbose=false, we get the following results.
For sequential execution:
For parallel execution, which remains slower:
We use the [Stream.min] method in a similar way:
// the lightest person
display("min weight", people.stream().min(Comparator.comparingDouble(Person::getWeight)).get());
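The following sketch gathers the comparator variants discussed in this section: a hand-written lambda, [Comparator.comparingInt] and [Comparator.naturalOrder]. It works on plain strings rather than on Person objects, and the class ComparatorSketch with its method names is an assumption made for the illustration. It uses [orElse] instead of [get] so that an empty list does not throw an exception.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class ComparatorSketch {

    // Hand-written comparator: only the sign of the result matters, not its exact value
    public static final Comparator<String> BY_LENGTH = (a, b) -> a.length() - b.length();

    // max with a hand-written comparator
    public static String longest(List<String> words) {
        return words.stream().max(BY_LENGTH).orElse("");
    }

    // min with a comparator built by Comparator.comparingInt
    public static String shortest(List<String> words) {
        return words.stream().min(Comparator.comparingInt(String::length)).orElse("");
    }

    // max with the natural (alphabetical) order of String
    public static String lastAlphabetically(List<String> words) {
        return words.stream().max(Comparator.naturalOrder()).orElse("");
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("jean", "marie", "camille");
        System.out.println(longest(names));
        System.out.println(shortest(names));
        System.out.println(lastAlphabetically(names));
    }
}
```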
5.6.7. [reduce]
// the total age of all people
display("total age (reduce)", people.stream().map(p -> p.getAge()).reduce(0, (a1, a2) -> a1 + a2));
The [reduce] method was introduced in Section 5.3. Line 2 above sums the ages of all people. The result is as follows:
5.6.8. [sorted]
// people by ascending age
display("people by ascending age",
people.stream().sorted(Comparator.comparingInt(Person::getAge)).collect(Collectors.toList()));
// people sorted by alphabetical order of names
List<Person> lPeople = people.stream().sorted((p1, p2) -> p1.getName().compareTo(p2.getName())).collect(Collectors.toList());
display("people sorted by alphabetical order of names", lPeople);
The [sorted] method (lines 3 and 5) has the following signature:
Stream<T> sorted(Comparator<? super T> comparator)
The [sorted] method takes as a parameter the [Comparator] type described in Section 5.6.6 for the min and max methods. It allows a Stream to be sorted in the order of the comparator passed to it as a parameter. We have seen that the [Comparator] interface provides several static methods that build common comparators, notably for numbers. Here, we use the [Comparator.comparingInt] method, which takes as a parameter a type ToIntFunction, which is a functional interface for the [applyAsInt] method with the following signature: int applyAsInt(T t). Here, the actual parameter passed to the [Comparator.comparingInt] method on line 3 is the reference to the [Person.getAge] method, which returns the person’s age.
The [Comparator] interface has no string-specific counterpart of [comparingInt], although the generic [Comparator.comparing(Person::getName)] would also work here. On line 5, we construct a lambda ourselves that implements the only method of this interface, int compare(T t1, T t2):
(p1, p2) -> p1.getName().compareTo(p2.getName())
This lambda compares the names of the people. The results obtained are as follows:
Parallel execution of the sort does not seem possible, as shown in the following code (Example06f1):
package dvp.java8.streams;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class Example06f1 {
// JSON mapper
static ObjectMapper jsonMapper = new ObjectMapper();
public static void main(String[] args) throws JsonProcessingException {
// data
final long limit = 100L;
final boolean verbose = true;
// final long limit = 10_000_000L;
// final boolean verbose = false;
// number of processors
System.out.printf("The JVM detected [%s] cores on your machine%n", Runtime.getRuntime().availableProcessors());
// list of numbers
long start = new Date().getTime();
List<Integer> numbers = new ArrayList<>();
for (long i = 0; i < limit; i++) {
numbers.add(new Random().nextInt(1000));
}
System.out.printf("Creating a list of %s numbers in %s ms%n", limit, new Date().getTime() - start);
// Sorting the numbers - sequential method
Stream<Integer> sNumbers = numbers.stream();
start = new Date().getTime();
Comparator<Integer> compInt = (i1, i2) -> {
if (verbose) {
// thread
System.out.printf("[%s]", Thread.currentThread().getName());
}
// comparison
int v1 = i1.intValue();
int v2 = i2.intValue();
if (v1 < v2) {
return +1;
} else {
if (v1 == v2) {
return 0;
} else {
return -1;
}
}
};
if (verbose) {
display("numbers", sNumbers.sorted(compInt).collect(Collectors.toList()));
}
System.out.printf("sequential sort: duration (ms)=%s%n", new Date().getTime() - start);
}
public static <T> void display(String message, T value) throws JsonProcessingException {
System.out.println(String.format("%s ----", message));
System.out.println(jsonMapper.writeValueAsString(value));
}
}
- the [numbers] list is first filled with random numbers, then a stream [sNumbers] is obtained from it;
- the [compInt] lambda is passed to the [sorted] method. This lambda sorts the numbers in descending order and, in verbose mode, displays the thread executing it.
The results obtained are as follows:
If, in the previous code, we replace the declaration of [sNumbers] with the following (Example06f2):
Stream<Integer> sNumbers = numbers.stream().parallel();
we get the following results:
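Surprisingly, we find that the stream of numbers was again sorted by a single thread: there was no parallelism. A plausible explanation, which this example does not verify, is the size of the list. To our knowledge, the OpenJDK 8 implementation sorts a parallel stream with [Arrays.parallelSort], which only splits the work across threads when the array holds more than 8192 elements; with 100 numbers, the sort would therefore stay sequential.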
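To check whether the size of the list matters, the experiment can be repeated with a much larger list. The sketch below is only an illustration: the class and method names are ours, and the number of threads actually observed depends on the JVM and on the number of available cores.

```java
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class ParallelSortSketch {
    // Sorts 'size' random numbers in descending order on a parallel stream and
    // records in 'threads' the name of every thread that executed the comparator
    static List<Integer> sortDescending(int size, Set<String> threads) {
        List<Integer> numbers = new Random(42).ints(size, 0, 1000).boxed().collect(Collectors.toList());
        return numbers.parallelStream()
                .sorted((i1, i2) -> {
                    threads.add(Thread.currentThread().getName());
                    return Integer.compare(i2, i1);
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        for (int size : new int[] { 100, 1_000_000 }) {
            // thread-safe set, since the comparator may run on several threads
            Set<String> threads = ConcurrentHashMap.newKeySet();
            sortDescending(size, threads);
            System.out.printf("%d elements: comparator ran on %d thread(s)%n", size, threads.size());
        }
    }
}
```

Collecting the thread names in a set, rather than printing them on every comparison as Example06f1 does, keeps the output readable for a million elements.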
5.6.9. [anyMatch, noneMatch, allMatch]
// Are there people over 100 years old?
display("people over 100 years old (anyMatch)", people.stream().anyMatch(p -> p.getAge() > 100));
// Are all people 100 years old or younger?
display("people over 100 years old (noneMatch)", people.stream().noneMatch(p -> p.getAge() > 100));
// Are all people over 8 years old?
display("people over 8 years old (allMatch)", people.stream().allMatch(p -> p.getAge() > 8));
Lines 2, 4, and 6: the methods [anyMatch, noneMatch, allMatch] take a Predicate type as a parameter, as described in Section 4.2. They test the elements of the Stream against this predicate. All three return a boolean:
- anyMatch returns true if at least one element of the Stream satisfies the predicate;
- noneMatch returns true if no element of the Stream satisfies the predicate;
- allMatch returns true if all elements of the Stream satisfy the predicate;
The results obtained are as follows:
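The behavior of the three methods, including two cases not covered above (an empty stream and an infinite stream), can be checked with the following sketch. The ages and the helper names are hypothetical. The three methods are short-circuiting: they stop as soon as the answer is known.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

public class MatchSketch {
    static boolean anyOver(List<Integer> ages, int limit) {
        return ages.stream().anyMatch(a -> a > limit);
    }

    static boolean noneOver(List<Integer> ages, int limit) {
        return ages.stream().noneMatch(a -> a > limit);
    }

    static boolean allOver(List<Integer> ages, int limit) {
        return ages.stream().allMatch(a -> a > limit);
    }

    // anyMatch stops at the first match, so it terminates even on an infinite stream
    static boolean anyOverInInfiniteStream(int limit) {
        return Stream.iterate(1, i -> i + 1).anyMatch(i -> i > limit);
    }

    public static void main(String[] args) {
        List<Integer> ages = Arrays.asList(12, 35, 70); // hypothetical ages
        System.out.println(anyOver(ages, 100));  // false
        System.out.println(noneOver(ages, 100)); // true
        System.out.println(allOver(ages, 8));    // true

        // on an empty stream, anyMatch is false while noneMatch and allMatch are true
        List<Integer> empty = Collections.emptyList();
        System.out.println(anyOver(empty, 100));  // false
        System.out.println(noneOver(empty, 100)); // true
        System.out.println(allOver(empty, 100));  // true

        System.out.println(anyOverInInfiniteStream(5)); // true
    }
}
```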
5.6.10. [collect(Collectors.groupingBy)]
// We group people by gender
display("people grouped by gender", people.stream().collect(Collectors.groupingBy(p -> p.getGender())));
The [collect] method was introduced in Section 5.6.3. Its parameter is an implementation of the [Collector] interface. The [Collectors] class provides a number of static methods that implement the [Collector] interface. So far, we have used the [Collectors.toList()] method. Here, we use the static [Collectors.groupingBy] method, which creates a dictionary from the Stream. Its signature is as follows:
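public static <T, K> Collector<T, ?, Map<K, List<T>>> groupingBy(Function<? super T, ? extends K> classifier)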
The [groupingBy] method creates a Map<K,List<T>> from a Stream<T>. The key K is computed by the [groupingBy] method’s parameter of type Function<T,K>, whose single abstract method has the signature: K apply(T t). If we want to create a map indexed by a person’s gender, we must provide a function that returns the gender of a person. Here, we pass a lambda that calls the [Person.getGender] method as the actual parameter of the [groupingBy] method; the method reference Person::getGender would be equivalent. The results obtained are as follows:
Line 2 contains the JSON string of a dictionary indexed by two keys: MAN and WOMAN.
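The grouping can be reproduced in isolation with the sketch below. The Gender enum, the nested Person class and the sample data are hypothetical stand-ins for the tutorial's classes; only a [getGender] accessor is assumed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupingBySketch {
    enum Gender { MAN, WOMAN }

    // Hypothetical stand-in for the tutorial's Person class
    static class Person {
        private final String name;
        private final Gender gender;

        Person(String name, Gender gender) {
            this.name = name;
            this.gender = gender;
        }

        Gender getGender() { return gender; }

        @Override
        public String toString() { return name; }
    }

    // Hypothetical sample data
    static final List<Person> PEOPLE = Arrays.asList(
            new Person("Paul", Gender.MAN), new Person("Anna", Gender.WOMAN), new Person("Marc", Gender.MAN));

    // One map entry per gender; each value lists the people of that gender
    static Map<Gender, List<Person>> byGender() {
        return PEOPLE.stream().collect(Collectors.groupingBy(Person::getGender));
    }

    public static void main(String[] args) {
        Map<Gender, List<Person>> groups = byGender();
        System.out.println("MAN   -> " + groups.get(Gender.MAN));   // [Paul, Marc]
        System.out.println("WOMAN -> " + groups.get(Gender.WOMAN)); // [Anna]
    }
}
```

The keys are read one by one because the iteration order of the returned map is not specified.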
Parallel computation does not result in performance gains, as shown in the following example (Example06g1):
package dvp.java8.streams;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class Example06g1 {
// JSON mapper
static ObjectMapper jsonMapper = new ObjectMapper();
public static void main(String[] args) throws JsonProcessingException {
// data
final long limit = 100L;
final boolean verbose = true;
// final long limit = 10_000_000L;
// final boolean verbose = false;
// number of processors
System.out.printf("The JVM detected [%s] cores on your machine%n", Runtime.getRuntime().availableProcessors());
// list of numbers
long start = new Date().getTime();
List<Integer> numbers = new ArrayList<>();
for (long i = 0; i < limit; i++) {
numbers.add(new Random().nextInt(1000));
}
System.out.printf("Creating a list of %s numbers in %s ms%n", limit, new Date().getTime() - start);
// Grouping numbers by hundreds - sequential method
Stream<Integer> sNumbers = numbers.stream();
Function<Integer, Integer> groupByHundreds = n -> {
if (verbose) {
System.out.printf("[%s]", Thread.currentThread().getName());
}
return n / 100;
};
start = new Date().getTime();
// Map<Integer, List<Integer>> lNumbers = sNumbers.collect(Collectors.groupingBy(number -> number / 100));
Map<Integer, List<Integer>> lNumbers = sNumbers.collect(Collectors.groupingBy(groupByHundreds));
System.out.printf("%nSequential grouping: duration (ms)=%s%n", new Date().getTime() - start);
// results
if (verbose) {
display("grouped numbers", lNumbers);
}
}
public static <T> void display(String message, T value) throws JsonProcessingException {
System.out.println(String.format("%s ----", message));
System.out.println(jsonMapper.writeValueAsString(value));
}
}
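- the [numbers] list is filled with random numbers, then a stream [sNumbers] is obtained from it;
- the numbers are grouped by hundreds in the [collect] call. The [groupByHundreds] function is written as a block lambda so that, in verbose mode, it can display the execution thread;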
The execution results are as follows:
If, in the code, we replace the declaration of [sNumbers] with the following line (Example06g2):
Stream<Integer> sNumbers = numbers.stream().parallel();
we obtain the following results:
We can see that parallel execution of the grouping has degraded performance.
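One possible cause is documented in the Javadoc of [Collectors.groupingBy]: in a parallel pipeline, the partial maps built by each thread must be merged key by key, which can be expensive. The same Javadoc suggests [Collectors.groupingByConcurrent] when the order of the elements inside each group does not matter. The sketch below, whose class and method names are ours, runs both variants on the same data. It makes no claim about which one is faster on a given machine, and a single timed run without JVM warm-up is only indicative.

```java
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

public class GroupingConcurrentSketch {
    // Each thread fills its own map; the partial maps are then merged
    static Map<Integer, List<Integer>> groupByMerging(List<Integer> numbers) {
        return numbers.parallelStream().collect(Collectors.groupingBy(n -> n / 100));
    }

    // All threads fill one shared concurrent map; order inside each group is not guaranteed
    static ConcurrentMap<Integer, List<Integer>> groupConcurrently(List<Integer> numbers) {
        return numbers.parallelStream().collect(Collectors.groupingByConcurrent(n -> n / 100));
    }

    public static void main(String[] args) {
        List<Integer> numbers = new Random(1).ints(1_000_000, 0, 1000).boxed().collect(Collectors.toList());

        long start = System.nanoTime();
        Map<Integer, List<Integer>> merged = groupByMerging(numbers);
        System.out.printf("groupingBy: %d groups in %d ms%n", merged.size(), (System.nanoTime() - start) / 1_000_000);

        start = System.nanoTime();
        Map<Integer, List<Integer>> shared = groupConcurrently(numbers);
        System.out.printf("groupingByConcurrent: %d groups in %d ms%n", shared.size(), (System.nanoTime() - start) / 1_000_000);
    }
}
```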
5.6.11. [distinct]
// Remove duplicate elements from a list
display("distinct", Stream.of(1, 2, 1).distinct().collect(Collectors.toList()));
The [distinct] method has the following signature:
Stream<T> distinct()
It removes duplicates from a stream. The [Stream.of] method (line 2) has the following signature:
static <T> Stream<T> of(T... values)
It allows you to create a Stream from explicitly provided values. The results of the execution are as follows:
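As a complement, the short sketch below (class and method names are ours) illustrates two points stated in the Javadoc: [distinct] compares elements with [Object.equals] and, on an ordered stream, keeps the first occurrence of each value; [Stream.of] also accepts an array, since its parameter is a varargs.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DistinctSketch {
    // The first occurrence of each letter is kept, in encounter order
    static List<String> distinctLetters() {
        return Stream.of("b", "a", "b", "c", "a").distinct().collect(Collectors.toList());
    }

    // Stream.of applied to an array streams the elements of the array
    static long distinctCount(Integer[] values) {
        return Stream.of(values).distinct().count();
    }

    public static void main(String[] args) {
        System.out.println(distinctLetters());                         // [b, a, c]
        System.out.println(distinctCount(new Integer[] { 1, 2, 1 })); // 2
    }
}
```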
5.6.12. [flatMap]
// From a Stream<Stream<T>>, we create a Stream<T>
display("flatMap", Stream.of(1, 2, 3).flatMap(i -> Stream.of(i, i + 10)).collect(Collectors.toList()));
The [flatMap] method has the following signature:
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper)
The [flatMap] method takes as a parameter a function that:
- takes an element of type T from the Stream as a parameter;
- returns a Stream<R>;
If, instead of the [flatMap] method, we had used the [map] method described in Section 5.5, the result would be a Stream<Stream<R>> type, where each element of type T in the initial stream would have produced a Stream<R> element. The [flatMap] method, on the other hand, returns a Stream<R> type. It flattens the various Stream<R> streams into a single stream. This is what the results of executing the previous code show:
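The difference between [map] and [flatMap] can be made concrete with the following sketch (class and method names are ours). It counts the elements produced by [map], flattens the same streams with [flatMap], and applies [flatMap] to a list of lists, which is a common use of this method.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMapSketch {
    // map keeps one inner stream per source element: the result has 3 elements of type Stream<Integer>
    static long countWithMap() {
        return Stream.of(1, 2, 3).map(i -> Stream.of(i, i + 10)).count();
    }

    // flatMap concatenates the inner streams into a single Stream<Integer> of 6 elements
    static List<Integer> flatten() {
        return Stream.of(1, 2, 3).flatMap(i -> Stream.of(i, i + 10)).collect(Collectors.toList());
    }

    // Flattening a list of lists
    static List<Integer> flattenLists(List<List<Integer>> lists) {
        return lists.stream().flatMap(List::stream).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(countWithMap()); // 3
        System.out.println(flatten());      // [1, 11, 2, 12, 3, 13]
        System.out.println(flattenLists(Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3), Arrays.asList(4, 5)))); // [1, 2, 3, 4, 5]
    }
}
```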
There are specialized variants of [flatMap]:
// From a Stream<IntStream>, we create an IntStream and calculate its sum
display("flatMapToInt", Stream.of(1, 2, 3).flatMapToInt(i -> IntStream.of(i, i + 10)).sum());
The [flatMapToInt] method has the following signature:
IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper)
The [flatMapToInt] method takes as a parameter a function that returns an IntStream of the following type:
public interface IntStream extends BaseStream<Integer, IntStream>
IntStream is a stream of int. This type is preferable to the Stream<Integer> type because its processing avoids boxing/unboxing between the Integer and int types. This interface does not extend Stream<T>, but it offers counterparts to many of its methods and adds others, including the [sum] method above, which sums the elements of the IntStream.
The following code illustrates the use of the analogous [flatMapToDouble] method:
// From a Stream<DoubleStream>, we create a DoubleStream and then an array
display("flatMapToDouble", Stream.of(1, 2, 3).flatMapToDouble(i -> DoubleStream.of(i, i * 1.2)).toArray());
The [DoubleStream.toArray] method allows you to convert from a DoubleStream type to a double[] type.
The results for these two examples are as follows:
The following example demonstrates the performance gains achieved by switching from a Stream<Long> type to a LongStream type (Example06i1):
package dvp.java8.streams;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
public class Example06i1 {
public static void main(String[] args) {
final long limit = 10_000_000L;
// number of processors
System.out.printf("The JVM detected [%s] cores on your machine%n", Runtime.getRuntime().availableProcessors());
// list of numbers
long start = new Date().getTime();
List<Long> numbers = new ArrayList<>();
for (long i = 0; i < limit; i++) {
numbers.add(i);
}
System.out.printf("Creating a list of %s numbers in %s ms%n", limit, new Date().getTime() - start);
// Sum of numbers - sequential method
start = new Date().getTime();
long sum = numbers.stream().reduce(0L, (s, i) -> s + i);
System.out.printf("Sequential sum of the Stream<Long>: sum=%s, duration (ms)=%s%n", sum, new Date().getTime() - start);
}
}
- the sum of a stream of Long numbers is computed with the [reduce] method;
The following results are obtained:
Now, let’s replace the [reduce] line with the following (Example06i2):
long sum = numbers.stream().mapToLong(n -> n.longValue()).sum();
The Stream<Long>.mapToLong method produces a LongStream of primitive long elements, which we then sum using the [sum] method. We then obtain the following results:
The performance gain is clear.
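The two variants can be placed side by side in a single program, as in the sketch below (class and method names are ours). Both must return the same sum. The measured durations depend on the machine and on JVM warm-up, so a single run gives only an indication.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class LongSumSketch {
    // Sum on a Stream<Long>: each addition unboxes two Long values and boxes the result
    static long sumBoxed(List<Long> numbers) {
        return numbers.stream().reduce(0L, (s, i) -> s + i);
    }

    // Sum on a LongStream: values are unboxed once, then added as primitive long
    static long sumPrimitive(List<Long> numbers) {
        return numbers.stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        List<Long> numbers = LongStream.range(0, 5_000_000L).boxed().collect(Collectors.toList());

        long start = System.nanoTime();
        long boxed = sumBoxed(numbers);
        long boxedMs = (System.nanoTime() - start) / 1_000_000;

        start = System.nanoTime();
        long primitive = sumPrimitive(numbers);
        long primitiveMs = (System.nanoTime() - start) / 1_000_000;

        System.out.printf("reduce on Stream<Long>: sum=%d, duration (ms)=%d%n", boxed, boxedMs);
        System.out.printf("mapToLong + sum: sum=%d, duration (ms)=%d%n", primitive, primitiveMs);
    }
}
```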
5.6.13. Methods of primitive number streams
// max of an int stream
display("IntStream max", IntStream.of(1, 10, 8).max());
// min of a double stream
display("DoubleStream min", DoubleStream.of(1.5, 10.4, 8.9).min());
// average of an int stream
display("IntStream average", IntStream.of(1, 10, 8).average().getAsDouble());
// statistics of an int stream
display("IntStream summaryStatistics", IntStream.of(1, 10, 8).summaryStatistics());
Streams of primitive values (int, long, double) provide methods tailored to these types. The result of executing the previous code is as follows:
- The result of line 2 of the code is an OptionalInt type analogous to the Optional<Integer> type. The value stored in this object can be retrieved using the [getAsInt()] method. The presence of a value can be checked using the [isPresent()] method. Line 2 of the results does not mean that the [OptionalInt] class has fields named [asInt] and [present]. By default, the JSON library uses all public getX and isY methods of the object to be serialized to JSON. And here, there is indeed a [getAsInt] method and another [isPresent] method, even though the [asInt, present] fields themselves do not exist;
- the result of line 4 of the code is an OptionalDouble type analogous to the Optional<Double> type;
- the result of line 6 of the code is an OptionalDouble type whose value can be obtained using the [getAsDouble()] method. The [average] method calculates the average of the stream of numbers;
- The result of line 8 of the code is an IntSummaryStatistics type defined as follows:
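public class IntSummaryStatistics implements IntConsumer
with, among others, the accessors: long getCount(), long getSum(), int getMin(), int getMax(), double getAverage()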
We can see that the resulting IntSummaryStatistics object provides various statistics about the stream of numbers, such as the number of values, sum, max, min, and average.
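The types returned by these methods can also be handled directly, without JSON serialization. The sketch below (class and method names are ours) reads an OptionalInt safely, including on an empty stream, and queries the accessors of IntSummaryStatistics.

```java
import java.util.IntSummaryStatistics;
import java.util.OptionalInt;
import java.util.stream.IntStream;

public class PrimitiveStatsSketch {
    // max of the values; the OptionalInt is empty when no value is given
    static OptionalInt maxOf(int... values) {
        return IntStream.of(values).max();
    }

    // count, sum, min, max and average computed in a single pass
    static IntSummaryStatistics statsOf(int... values) {
        return IntStream.of(values).summaryStatistics();
    }

    public static void main(String[] args) {
        OptionalInt max = maxOf(1, 10, 8);
        if (max.isPresent()) {
            System.out.println("max = " + max.getAsInt()); // max = 10
        }
        // on an empty stream there is no max: test isPresent() or supply a default value
        System.out.println(maxOf().isPresent()); // false
        System.out.println(maxOf().orElse(-1));  // -1

        IntSummaryStatistics stats = statsOf(1, 10, 8);
        System.out.printf("count=%d sum=%d min=%d max=%d average=%.2f%n",
                stats.getCount(), stats.getSum(), stats.getMin(), stats.getMax(), stats.getAverage());
    }
}
```

Calling [getAsInt()] on an empty OptionalInt throws a NoSuchElementException, which is why the sketch tests [isPresent()] first or uses [orElse].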