5. El tipo Stream<T> de Java 8
5.1. Ejemplo-01: la clase Stream
Las operaciones con los flujos Observable tienen muchos puntos en común con los flujos Stream. Una diferencia es que un elemento de un flujo Stream no puede procesarse hasta que se haya obtenido todo el flujo Stream, mientras que un elemento de un flujo Observable puede procesarse (observado) tan pronto como se obtiene, sin esperar a que se obtenga la totalidad del flujo Observable. Otra diferencia es que, una vez obtenido el Stream, se explotan sus valores extrayéndolos (pull) uno a uno del Stream. En el caso del observable, es diferente. Tan pronto como este emite un valor, este se envía (pushed) a su suscriptor.
Varias clases implementan el concepto de Stream. A continuación presentamos la clase Stream<T>:

La clase Stream cuenta con 39 métodos. Vamos a presentar algunos de ellos. Consideremos el siguiente código:
![]() |
package dvp.java8.streams;
import java.util.List;
import dvp.data.Personne;
import dvp.data.Personnes;
public class Exemple01 {
public static void main(String[] args) {
// lista de personas
List<Personne> personnes = Personnes.get();
// visualización 1
personnes.stream().forEach(p -> {
System.out.println(p);
});
System.out.println("----------------");
// visualización 2
personnes.stream().forEach(System.out::println);
}
}
- línea 11: se instancia una lista de personas;
- línea 13: a partir de esta lista, se crea un Stream. Todas las colecciones pueden transformarse así en flujos Stream. Esto permite aprovechar todos los métodos de esta clase, lo que permite tratar los elementos de la colección de forma más concisa que con bucles. También permite aprovechar el paralelismo en el tratamiento de los elementos cuando este es posible;
- línea 13: el método [Stream.forEach] tiene la siguiente firma:
![]() |
Se observa que el parámetro del método es la interfaz funcional [Consumer<T>] presentada en el apartado 4.4, una interfaz cuyo único método utiliza el tipo T y no devuelve ningún valor.
- En el código:
personnes.stream().forEach(p -> {
System.out.println(p);
});
- [personnes.stream()] genera un flujo de elementos de tipo [Personne] que alimenta el método [forEach]. El parámetro p es de tipo [Personne] y la función lambda proporcionada muestra a esta persona;
El código anterior se puede simplificar de la siguiente manera (línea 18):
personnes.stream().forEach(System.out::println);
En lugar de pasar como parámetro el valor de una función lambda, pasamos la referencia de un método existente, en este caso el método println de la clase System.out. Por supuesto, este método debe tener la firma correcta, en este caso la firma del método [Consumer.accept]: void accept(T t). Como se ha mencionado anteriormente, el parámetro del método [accept] será de tipo [Personne];
Obtenemos los siguientes resultados:
Una vez que se ha utilizado un flujo Stream, ya no se puede volver a utilizar. Es necesario reconstruirlo si se desea volver a utilizarlo. Esto se muestra en el siguiente código [Exemple01b]:
package dvp.java8.streams;
import java.util.stream.Stream;
import dvp.data.Personne;
import dvp.data.Personnes;
public class Exemple01b {
public static void main(String[] args) {
// flujo de personas
Stream<Personne> personnes = Personnes.get().stream();
// visualización 1
personnes.forEach(p -> {
System.out.println(p);
});
System.out.println("----------------");
// visualización 2
personnes.forEach(System.out::println);
}
}
- línea 11: para optimizar el código, se decide construir el Stream una sola vez. Los resultados obtenidos son entonces los siguientes:
{"nom":"jean","age":20,"poids":70.0,"sexe":"HOMME"}
{"nom":"marie","age":10,"poids":30.0,"sexe":"FEMME"}
{"nom":"camille","age":30,"poids":55.0,"sexe":"FEMME"}
----------------
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
at java.util.stream.AbstractPipeline.sourceStageSpliterator(Unknown Source)
at java.util.stream.ReferencePipeline$Head.forEach(Unknown Source)
at dvp.java8.streams.Exemple02b.main(Exemple02b.java:18)
Cada vez que se quiera utilizar un Stream, hay que generarlo aunque ya se haya generado anteriormente.
5.2. Ejemplo-02: procesamiento en paralelo de los elementos de un Stream
![]() |
Consideremos el siguiente código:
package dvp.java8.streams;
import java.util.List;
import dvp.data.Personne;
import dvp.data.Personnes;
public class Exemple02 {
public static void main(String[] args) {
// lista de personas
List<Personne> personnes = Personnes.get();
// visualización 1
personnes.stream().forEach(Exemple02::affiche);
System.out.println("-----------------");
// visualización 2
personnes.stream().parallel().forEach(Exemple02::affiche);
}
public static void affiche(Personne p) {
System.out.printf("Personne %s sur thread %s%n", p, Thread.currentThread().getName());
}
}
- líneas 19-21: el método [affiche] escribe en la consola la cadena jSON de una persona, así como el nombre del hilo de ejecución en el que se realiza la visualización;
- línea 13: muestra una lista de personas. Cabe señalar que el parámetro del método [forEach] es la referencia del método estático anterior;
- Línea 16: hacemos lo mismo, pero con el método [parallel] solicitamos que el procesamiento de los elementos del flujo se realice en paralelo en varios subprocesos. No todos los procesos pueden realizarse en paralelo. En este caso, hay que suponer que el orden de visualización no tiene importancia, ya que en un procesamiento paralelo no se garantiza el orden de ejecución de los subprocesos. Por otra parte, cabe destacar una sintaxis que se hará omnipresente tanto para los Stream como para los Observable:
- (continuación)
- flux genera elementos e1 que alimentan el método m1;
- flux.m1 es, a su vez, un flujo de elementos e2 que alimentan el método m2;
- flux.m1.m2 es un flujo de elementos e3 que alimentan el método m3;
El tipo de los elementos e1, e2, e3 puede cambiar a medida que el flujo inicial se somete a diferentes procesamientos.
La ejecución de este código da el siguiente resultado:
Se observa que la ejecución en paralelo (líneas 5-7) se ha realizado en tres subprocesos diferentes y no ha respetado el orden de los elementos, que es el de las líneas 1-3. En este documento, no insistiremos mucho en el procesamiento en paralelo de los elementos de un Stream, ya que para ello habría que abordar las condiciones que hacen posible dicho procesamiento. Descubrimos entonces que son pocos los procesamientos que pueden realizarse en paralelo. Uno de los que se presta naturalmente al paralelismo es la suma de los elementos numéricos de un flujo que presentamos a continuación.
5.3. Ejemplo-03: procesamiento en paralelo de los elementos de un Stream
![]() |
Consideremos el siguiente código (Ejemplo 03a):
package dvp.java8.streams;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
public class Exemple03a {
public static void main(String[] args) {
final long limite = 10_000_000L;
// número de procesadores
System.out.printf("La JVM a détecté [%s] coeurs sur votre machine%n", Runtime.getRuntime().availableProcessors());
// lista de nombres
long début = new Date().getTime();
List<Long> nombres = new ArrayList<>();
for (long i = 0; i < limite; i++) {
nombres.add(i);
}
System.out.printf("création de la liste des %s nombres en %s ms%n", limite, new Date().getTime() - début);
// suma de números - método secuencial
début = new Date().getTime();
long somme = nombres.stream().reduce(0L, (s, i) -> s + i);
System.out.printf("somme séquentielle : somme=%s, durée (ms)=%s%n", somme, new Date().getTime() - début);
}
}
- En la línea 22, utilizamos el método [reduce], cuya firma es la siguiente:
![]() | ![]() |
- el método [reduce] trabaja con elementos de tipo T;
- el método [reduce] aplica el mismo tratamiento a todos los elementos de un flujo: el valor inicial de un acumulador se proporciona como primer parámetro. Se proporciona como segundo parámetro un método que instancia la interfaz funcional [BinaryOperator] [2]: a partir de cada elemento y del acumulador, este método proporciona un nuevo valor del acumulador. El valor final de este es el valor devuelto por el método [reduce]. El código [3] explica este mecanismo. El método [apply] es el método de la interfaz funcional [BinaryOperator] [2];
Volvamos al código de ejemplo:
- línea 12: se muestra el número de núcleos detectados por JVM;
- líneas 15-18: se crea una lista de 10 millones de números;
- línea 22: se calcula la suma de estos números secuencialmente con un solo subproceso;
Se obtienen los siguientes resultados:
Ahora, sustituyamos la línea 22 del código por la siguiente (Ejemplo03b):
long somme = nombres.stream().parallel().reduce(0L, (s, i) -> s + i);
Se solicita que los elementos del Stream se procesen en paralelo utilizando varios subprocesos. Esto es posible porque el orden de suma de los números no tiene importancia. Por lo tanto, podemos asignar n1 números a un subproceso T1, n2 números a un subproceso T2, ... y, finalmente, sumar las sumas proporcionadas por estos diferentes subprocesos. Obtenemos entonces los siguientes resultados:
Por lo tanto, prácticamente no hay ganancia de rendimiento. En los ejemplos que siguen, este será a menudo el caso. La gestión de los subprocesos en sí misma requiere mucho tiempo. Es necesario que la operación realizada por cada núcleo sea lo suficientemente compleja como para que se aprecie la ganancia de rendimiento. Esto es lo que muestra el siguiente ejemplo (Ejemplo03c):
package dvp.java8.streams;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.function.BinaryOperator;
public class Exemple03c {
public static void main(String[] args) {
final long limite = 10_000L;
// número de procesadores
System.out.printf("La JVM a détecté [%s] coeurs sur votre machine%n", Runtime.getRuntime().availableProcessors());
// lista de números
long début = new Date().getTime();
List<Long> nombres = new ArrayList<>();
for (long i = 0; i < limite; i++) {
nombres.add(i);
}
System.out.printf("création de la liste des %s nombres en %s ms%n", limite, new Date().getTime() - début);
// suma de números - método secuencial
début = new Date().getTime();
BinaryOperator<Long> bo = (s, i) -> {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
return s + i;
};
long somme = nombres.stream().reduce(0L, bo);
System.out.printf("somme séquentielle : somme=%s, durée (ms)=%s%n", somme, new Date().getTime() - début);
}
}
- línea 30: se vuelve a utilizar el método [reduce], al que se le proporciona como parámetro la referencia del método de las líneas 23-29;
- línea 28: el método [bo] proporciona la suma de sus dos parámetros;
- líneas 24-27: de forma artificial, se hace esperar al hilo 1 milisegundo para simular un trabajo intensivo;
Se obtienen entonces los siguientes resultados:
Ahora bien, si sustituimos la línea 30 por la siguiente:
long somme = nombres.stream().parallel().reduce(0L, bo);
se obtienen los siguientes resultados:
Se aprecia claramente la mejora en el rendimiento que aporta la ejecución en paralelo del cálculo de la suma. Para el procesamiento de 8 números:
- el hilo secuencial espera 8 veces 1 milisegundo, es decir, 8 ms;
- los 8 subprocesos paralelos esperan al mismo tiempo 1 milisegundo cada uno (a modo de simplificación), es decir, un total de 1 milisegundo para los 8 números;
Por lo tanto, cabe esperar que la ejecución en paralelo sea 8 veces más rápida que la ejecución secuencial. Este es más o menos el caso aquí.
5.4. Ejemplo-04: filtrar un Stream
![]() |
Consideremos el siguiente código:
package dvp.java8.streams;
import java.util.List;
import dvp.data.Personne;
import dvp.data.Personnes;
public class Exemple04 {
public static void main(String[] args) {
// lista de personas
List<Personne> personnes = Personnes.get();
// visualizaciones
System.out.println("age < 28 ----------------------");
personnes.stream().filter(p -> p.getAge() < 28).forEach(p -> {
System.out.println(p);
});
System.out.println("poids < 50 ----------------------");
personnes.stream().filter(p -> p.getPoids() < 50).forEach(p -> {
System.out.println(p);
});
System.out.println("age < 28 ----------------------");
personnes.stream().filter(p -> p.getAge() < 28).forEach(System.out::println);
System.out.println("poids < 50 ----------------------");
personnes.stream().filter(p -> p.getPoids() < 50).forEach(System.out::println);
}
}
- línea 14: el método [Stream.filter] tiene la siguiente firma:
![]() |
- el método [filter] espera como parámetro una instancia de la interfaz funcional [Predicate] presentada en el apartado 4.2 y cuyo único método a implementar es el siguiente: boolean test(T t);
- El método [filter] devuelve los elementos del flujo que cumplen con el criterio Predicate. Por lo tanto, sirve para filtrar el Stream;
Consideremos el siguiente código:
package dvp.java8.streams;
import java.util.List;
import dvp.data.Personne;
import dvp.data.Personnes;
public class Exemple04 {
public static void main(String[] args) {
// lista de personas
List<Personne> personnes = Personnes.get();
// visualizaciones
System.out.println("age < 28 ----------------------");
personnes.stream().filter(p -> p.getAge() < 28).forEach(p -> {
System.out.println(p);
});
System.out.println("poids < 50 ----------------------");
personnes.stream().filter(p -> p.getPoids() < 50).forEach(p -> {
System.out.println(p);
});
System.out.println("age < 28 ----------------------");
personnes.stream().filter(p -> p.getAge() < 28).forEach(System.out::println);
System.out.println("poids < 50 ----------------------");
personnes.stream().filter(p -> p.getPoids() < 50).forEach(System.out::println);
}
}
- líneas 14-16: muestran a las personas con una edad <28;
- líneas 18-20: muestran a las personas con un peso <50;
- línea 22: hace lo mismo que las líneas 14-16, pero de forma más concisa;
- línea 24: hace lo mismo que las líneas 18-20, pero de forma más concisa;
Los resultados de la ejecución son los siguientes:
5.5. Ejemplo-05: crear un Stream<T2> a partir de un Stream<T1>
![]() |
Consideremos el siguiente código:
package dvp.java8.streams;
import java.util.List;
import dvp.data.Personne;
import dvp.data.Personnes;
public class Exemple05 {
public static void main(String[] args) {
// lista de personas
List<Personne> personnes = Personnes.get();
// visualizaciones
System.out.println("Personne --> String ----------------------");
personnes.stream().map(p -> p.getNom()).forEach(System.out::println);
System.out.println("Personne --> Integer ----------------------");
personnes.stream().map(p -> p.getAge()).forEach(System.out::println);
}
}
- En la línea 13, el método [Stream.map] tiene la siguiente firma:
![]() |
El parámetro del método [Stream.map] es una instancia de la interfaz funcional [Function] presentada en el apartado 4.3 y cuyo único método a implementar es: R apply(T t). Se observa que, a partir de un tipo T, la función [apply] produce un tipo R. Por lo tanto, el método [Stream.map] generará un flujo Stream de tipo R a partir de un flujo de tipo T (flujo de tipo T significa aquí, en un uso impropio del lenguaje que mantendremos, flujo de elementos de tipo T).
Analicemos ahora el código del ejemplo:
- línea 14: de una persona p, solo se conserva el nombre. Se obtiene, por tanto, un flujo de String;
- línea 14: de una persona p, solo se conserva el nombre. Se obtiene, por tanto, un flujo de Integer;
Los resultados obtenidos son los siguientes:
5.6. Ejemplo-06: otros métodos de la clase Stream<T>
![]() |
Ilustramos algunos de los 39 métodos de la clase Stream con el siguiente código:
package dvp.java8.streams;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import dvp.data.Personne;
import dvp.data.Personnes;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import java.util.stream.IntStream;
import java.util.stream.Stream;
public class Exemple06 {
// mappeador jSON
static private ObjectMapper jsonMapper = new ObjectMapper();
public static void main(String[] args) throws JsonProcessingException {
// lista de personas
List<Personne> personnes = Personnes.get();
// todas las personas
affiche("all", personnes);
// la primera persona
affiche("findFirst", personnes.stream().findFirst().get());
// cualquier persona
affiche("findAny", personnes.stream().findAny().get());
// las personas excepto la primera
affiche("skip 1", personnes.stream().skip(1L).collect(Collectors.toList()));
// las dos primeras personas
affiche("limit 2", personnes.stream().limit(2L).collect(Collectors.toList()));
// el número de personas
affiche("count", personnes.stream().count());
// la persona de más edad
affiche("age max", personnes.stream().max(Comparator.comparingInt(Personne::getAge)).get());
// la persona más delgada
affiche("poids min", personnes.stream().min(Comparator.comparingDouble(Personne::getPoids)).get());
// la última persona por orden alfabético de los nombres
affiche("nom max", personnes.stream().max((p1, p2) -> p1.getNom().compareToIgnoreCase(p2.getNom())).get());
// la edad total de todas las personas
affiche("âge total (reduce)", personnes.stream().map(p -> p.getAge()).reduce(0, (a1, a2) -> a1 + a2));
// las personas por orden de edad ascendente
affiche("personnes par âge croissant",
personnes.stream().sorted(Comparator.comparingInt(Personne::getAge)).collect(Collectors.toList()));
// ¿Hay personas de más de 100 años?
affiche("des personnes de + de 100 ans (anyMatch)", personnes.stream().anyMatch(p -> p.getAge() > 100));
// ¿Todas las personas tienen 100 años o menos?
affiche("des personnes de + de 100 ans (noneMatch)", personnes.stream().noneMatch(p -> p.getAge() > 100));
// ¿Todas las personas tienen más de 8 años?
affiche("des personnes de + de 8 ans (allMatch)", personnes.stream().allMatch(p -> p.getAge() > 8));
// ¿Se agrupan las personas por sexo?
affiche("personnes regroupées par sexe", personnes.stream().collect(Collectors.groupingBy(p -> p.getSexe())));
// Eliminación de elementos duplicados de una lista
affiche("distinct", Stream.of(1, 2, 1).distinct().collect(Collectors.toList()));
// de un Stream<Stream<T>>, se crea un Stream<T>
affiche("flatMap", Stream.of(1, 2, 3).flatMap(i -> Stream.of(i, i + 10)).collect(Collectors.toList()));
// de un Stream<Stream<Integer>>, se crea un IntStream del que se calcula la suma
affiche("flatMapToInt", Stream.of(1, 2, 3).flatMapToInt(i -> IntStream.of(i, i + 10)).sum());
// de un Stream<Stream<Integer>>, se crea un DoubleStream y luego una matriz
affiche("flatMapToDouble", Stream.of(1, 2, 3).flatMapToDouble(i -> DoubleStream.of(i, i * 1.2)).toArray());
// máximo de un flujo de enteros
affiche("reduce Integer::max", Stream.of(1, 10, 8).reduce(Integer::max).get());
// mínimo de un flujo de Double
affiche("reduce Integer::min", Stream.of(1.5, 10.4, 8.9).reduce(Double::min).get());
// media de un flujo de enteros
affiche("IntStream average", IntStream.of(1, 10, 8).average().getAsDouble());
// estadísticas de un flujo de enteros
affiche("IntStream summaryStatistics", IntStream.of(1, 10, 8).summaryStatistics());
}
public static <T> void affiche(String message, T value) throws JsonProcessingException {
System.out.println(String.format("%s ----", message));
System.out.println(jsonMapper.writeValueAsString(value));
}
}
- líneas 72, 75: muestran la cadena jSON del segundo parámetro del método;
- línea 24: muestra la cadena jSON de todas las personas. Se obtiene el siguiente resultado:
5.6.1. [findFirst]
// la primera persona
affiche("findFirst", personnes.stream().findFirst().get());
El método [findFirst] devuelve el primer elemento de un flujo, si existe. Su firma es la siguiente:
![]() |
El resultado es de tipo Optional<T>, un tipo introducido por Java 8:
![]() |
La clase Optional<T> permite gestionar de forma diferente los punteros null. Un método que deba devolver un tipo T que pueda tener el valor null puede decidir devolver un tipo Optional<T>. El método [Optional<T>.isPresent()] permite saber si el método ha devuelto un valor o no. El siguiente código [Exemple06b] ilustra parte del funcionamiento de Optional<T>:
package dvp.java8.streams;
import java.util.Optional;
import com.fasterxml.jackson.core.JsonProcessingException;
public class Exemple06b {
public static void main(String[] args) throws JsonProcessingException {
// opcional sin valor
Optional<Integer> o1 = m1();
System.out.println(o1.isPresent());
affiche(o1);
// opcional con valor
Optional<Integer> o2 = m2();
System.out.println(o2.isPresent());
affiche(o2);
}
private static void affiche(Optional<Integer> o1) {
try {
// se recupera el valor del Optional
// lanza una excepción si no hay valor
System.out.println(o1.get());
} catch (Throwable th) {
System.out.printf("%s : %s%n", th.getClass().getName(), th.getMessage());
}
}
public static Optional<Integer> m1() {
// sin valor
return Optional.empty();
}
public static Optional<Integer> m2() {
// un valor
return Optional.of(10);
}
}
Los resultados obtenidos son los siguientes:
false
java.util.NoSuchElementException : No value present
true
10
Volvamos al código de ejemplo del método [findFirst]:
// la primera persona
affiche("findFirst", personnes.stream().findFirst().get());
- línea 2: para simplificar el código, utilizamos el método [get] sobre el Optional<Personne> generado por el método [findFirst]. Un código limpio requeriría llamar al método [Optional<Personne>.isPresent()] antes de llamar al método [get];
El resultado obtenido es el siguiente:
5.6.2. [findAny]
// n'importe quelle personne
affiche("findAny", personnes.stream().findAny().get());
El método [findAny] tiene la siguiente firma:
![]() |
El método [findAny] puede devolver cualquier elemento del flujo. Durante las pruebas, se observa que una ejecución secuencial devuelve el primer elemento del flujo, mientras que una ejecución paralela puede devolver efectivamente cualquier elemento. Esto se muestra en el siguiente código [Exemple06c]:
package dvp.java8.streams;
import java.util.List;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import dvp.data.Personne;
import dvp.data.Personnes;
public class Exemple06c {
// convertidor jSON
static private ObjectMapper jsonMapper = new ObjectMapper();
public static void main(String[] args) throws JsonProcessingException {
// lista de personas
List<Personne> personnes = Personnes.get();
// todas las personas
affiche("all", personnes);
// cualquier persona
affiche("findAny parallèle", personnes.stream().parallel().findAny().get());
// cualquier persona
affiche("findAny séquentiel", personnes.stream().findAny().get());
}
public static <T> void affiche(String message, T value) throws JsonProcessingException {
System.out.println(String.format("%s ----", message));
System.out.println(jsonMapper.writeValueAsString(value));
}
}
- línea 22: findAny ejecutado en paralelo;
- línea 24: findAny ejecutado secuencialmente;
Los resultados obtenidos son los siguientes:
- línea 4: la ejecución en paralelo devolvió el elemento 2 de la lista de personas. Podría haber sido otro;
- línea 6: la ejecución secuencial devolvió el primer elemento de la lista de personas;
El uso del método [findAny] solo parece tener sentido en el procesamiento paralelo de un flujo.
5.6.3. [skip]
// las personas sin la primera
affiche("skip 1", personnes.stream().skip(1L).collect(Collectors.toList()));
El método [skip] tiene la siguiente firma:
![]() |
El método [skip] ignora los primeros n elementos de un flujo. Como indica la documentación anterior, la ejecución de este método en paralelo aporta pocas mejoras de rendimiento e incluso puede provocar una pérdida del mismo. De hecho, para ignorar los primeros n elementos, los subprocesos se ven obligados a coordinarse, lo que anula las mejoras de rendimiento debidas al paralelismo.
El método [skip] devuelve un flujo Stream<Personne> que se transforma en un tipo List<Personne> mediante el método [collect], cuya firma es la siguiente:
![]() |
El método [collect] admite como parámetro una instancia del tipo [Collector], cuya firma es compleja. Existen implementaciones predefinidas del tipo [Collector] que, en la mayoría de los casos, permiten evitar tener que implementarlo uno mismo. En este caso, la implementación utilizada es [Collectors.toList()]. [Collectors] es una clase que posee numerosos métodos estáticos que implementan el tipo [Collector<T,A,R>]. Es ahí donde hay que buscar en primer lugar cuando se quiere transformar un Stream en una colección estándar de Java:
![]() |
Utilizaremos algunos de estos métodos más adelante.
La ejecución da el siguiente resultado:
Se ha omitido el primer elemento de la lista (jean).
5.6.4. [limit]
// las dos primeras personas
affiche("limit 2", personnes.stream().limit(2L).collect(Collectors.toList()));
El método [limit] tiene la siguiente firma:
![]() |
El método [limit] permite conservar solo los primeros n elementos de un flujo. No es adecuado para el procesamiento paralelo.
La ejecución da el siguiente resultado:
5.6.5. [count]
// el número de personas
affiche("count", personnes.stream().count());
El método [count] tiene la siguiente firma:
![]() |
El método [count] devuelve el número de elementos de un Stream. La ejecución en paralelo del método no aporta ninguna mejora en el rendimiento, tal y como muestra el siguiente código (Ejemplo06d1):
package dvp.java8.streams;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.Stream;
public class Exemple06d1 {
public static void main(String[] args) {
final long limite = 10_000_000L;
// número de procesadores
System.out.printf("La JVM a détecté [%s] coeurs sur votre machine%n", Runtime.getRuntime().availableProcessors());
// lista de nombres
long début = new Date().getTime();
List<Long> nombres = new ArrayList<>();
for (long i = 0; i < limite; i++) {
nombres.add(i);
}
System.out.printf("création de la liste des %s nombres en %s ms%n", limite, new Date().getTime() - début);
// recuento de números - método secuencial
Stream<Long> sNombres = nombres.stream();
début = new Date().getTime();
long count = sNombres.count();
System.out.printf("comptage séquentiel : compteur=%s, durée (ms)=%s%n", count, new Date().getTime() - début);
}
}
- líneas 11-22: se crea un Stream con 10 millones de números;
- líneas 22-24: recuento del Stream;
La ejecución da el siguiente resultado:
Si sustituimos la línea 22 del código por la siguiente (Ejemplo06d2):
Stream<Long> sNombres = nombres.stream().parallel();
se obtienen los siguientes resultados:
5.6.6. [max, min]
// la persona de más edad
affiche("age max", personnes.stream().max(Comparator.comparingInt(Personne::getAge)).get());
El método [max] tiene la siguiente firma:
![]() |
El método [max] devuelve el valor máximo de un flujo utilizando el comparador que se le pasa como parámetro. Comparator es una interfaz funcional cuyo único método a implementar tiene la siguiente firma: int compara (T o1, T o2). Este método debe devolver -1 si o1 < o2, 0 si o1.equals(o2), +1 si o1 > o2. La interfaz funcional Comparator tiene numerosos métodos estáticos por defecto que implementan la interfaz Comparator para los casos más habituales. Así, en la instrucción:
affiche("age max", personnes.stream().max(Comparator.comparingInt(Personne::getAge)).get());
utilizamos el método estático [Comparator.comparingInt], cuya firma es la siguiente:
![]() |
El tipo ToIntFunction es una interfaz funcional:
![]() |
El método [applyAsInt] de la interfaz funcional ToIntFunction genera un tipo int a partir de un tipo T. Volvamos a nuestro código:
affiche("age max", personnes.stream().max(Comparator.comparingInt(Personne::getAge)).get());
El parámetro efectivo del método [Comparator.comparingInt] debe ser aquí un lambda Persona --> int. Pasamos la referencia del método [Personne.getAge], que tiene precisamente esta firma. Al final, obtendremos la persona de mayor edad. Se obtiene un tipo Optional<Personne> del que se extrae el valor con el método [Optional.get]. Se obtiene el siguiente resultado:
El cálculo de max en paralelo no aporta mejoras de rendimiento, como muestra el siguiente ejemplo: (Ejemplo06e1):
package dvp.java8.streams;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;
public class Exemple06e1 {
public static void main(String[] args) {
// data
// final long límite = 100L;
// final boolean verbose = true;
final long limite = 10_000_000L;
final boolean verbose = false;
// número de procesadores
System.out.printf("La JVM a détecté [%s] coeurs sur votre machine%n", Runtime.getRuntime().availableProcessors());
// lista de nombres
long début = new Date().getTime();
List<Long> nombres = new ArrayList<>();
for (long i = 0; i < limite; i++) {
nombres.add(new Random().nextLong());
}
System.out.printf("création de la liste des %s nombres en %s ms%n", limite, new Date().getTime() - début);
// máximo de nombres - método secuencial
Stream<Long> sNombres = nombres.stream();
Comparator<Long> compLong = (l1, l2) -> {
if (verbose) {
// hilo
System.out.printf("[%s]", Thread.currentThread().getName());
}
// comparación
long v1 = l1.longValue();
long v2 = l2.longValue();
if (v1 < v2) {
return -1;
} else {
if (v1 == v2) {
return 0;
} else {
return +1;
}
}
};
début = new Date().getTime();
// long máx. = sNombres.max(Comparator.naturalOrder()).get();
long max = sNombres.max(compLong).get();
System.out.printf("%nmax séquentiel : max=%s, durée (ms)=%s%n", max, new Date().getTime() - début);
}
}
- línea 29: tenemos un flujo de limite números aleatorios de tipo Long;
- líneas 30-47: la variable lambda compLong implementa la interfaz Comparator<Long>. Esta interfaz normalmente se implementa mediante el método [Comparator.naturalOrder()] de la línea 49. Pero aquí queremos mostrar el hilo de ejecución (líneas 31-33). Por lo tanto, implementamos la interfaz nosotros mismos;
- línea 50: búsqueda de max;
Se obtienen los siguientes resultados:
![]() |
Si ahora sustituimos la línea 27 por la siguiente (Ejemplo06e2):
Stream<Long> sNombres = nombres.stream().parallel();
se obtienen los siguientes resultados:
![]() |
Por lo tanto, la ejecución en paralelo ha sido más lenta. Si pasamos a 10 millones de números con verbose=false, obtenemos los siguientes resultados:
para la ejecución secuencial:
para la ejecución en paralelo, que sigue siendo más lenta.
Se utiliza el método [Stream.min] de forma análoga:
// la persona más delgada
affiche("poids min", personnes.stream().min(Comparator.comparingDouble(Personne::getPoids)).get());
5.6.7. [reduce]
// la edad total de todas las personas
affiche("âge total (reduce)", personnes.stream().map(p -> p.getAge()).reduce(0, (a1, a2) -> a1 + a2));
El método [reduce] se presentó en el apartado 5.3. La línea 2 anterior suma las edades de todas las personas. El resultado es el siguiente:
5.6.8. [sorted]
// las personas por edad ascendente
affiche("personnes par âge croissant",
personnes.stream().sorted(Comparator.comparingInt(Personne::getAge)).collect(Collectors.toList()));
// las personas por orden alfabético de los nombres
List<Personne> lPersonnes=personnes.stream().sorted((p1, p2) -> p1.getNom().compareTo(p2.getNom())).collect(Collectors.toList());
affiche("personnes par ordre alphabétique des noms", lPersonnes);
El método [sorted] (líneas 3 y 5) tiene la siguiente firma:
![]() |
El método [sorted] admite como parámetro el tipo [Comparator] descrito en el apartado 5.6.6 para los métodos min y max. Permite ordenar un Stream según el orden del comparador que se le pasa como parámetro. Hemos visto que la interfaz [Comparator] ofrecía varios métodos estáticos por defecto que implementaban los comparadores habituales, en particular de números y cadenas de caracteres. Aquí utilizamos el método [Comparator.comparingInt], que admite como parámetro un tipo ToIntFunction, que es una interfaz funcional del método [applyAsInt] con la siguiente firma: int applyAsInt(T t). Aquí, el parámetro efectivo pasado al método [Comparator.comparingInt] en la línea 3 es la referencia del método [Personne.age], que devuelve la edad de la persona.
La interfaz [Comparator] no ofrece métodos estáticos para comparar cadenas de caracteres. En la línea 5, creamos nosotros mismos una lambda que implementa el único método de esta interfaz: int compare(T t1, T t2)
(p1, p2) -> p1.getNom().compareTo(p2.getNom())
Esta lambda compara los nombres de las personas. Los resultados obtenidos son los siguientes:
La ejecución en paralelo de la ordenación no parece posible, como muestra el siguiente código (Ejemplo06f1):
package dvp.java8.streams;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class Exemple06f1 {
// mappeur jSON
static ObjectMapper jsonMapper = new ObjectMapper();
public static void main(String[] args) throws JsonProcessingException {
// data
final long limite = 100L;
final boolean verbose = true;
// final long límite = 10_000_000L;
// final boolean verbose = false;
// número de procesadores
System.out.printf("La JVM a détecté [%s] coeurs sur votre machine%n", Runtime.getRuntime().availableProcessors());
// lista de nombres
long début = new Date().getTime();
List<Integer> nombres = new ArrayList<>();
for (long i = 0; i < limite; i++) {
nombres.add(new Random().nextInt(1000));
}
System.out.printf("création de la liste des %s nombres en %s ms%n", limite, new Date().getTime() - début);
// ordenación de los números - método secuencial
Stream<Integer> sNombres = nombres.stream();
début = new Date().getTime();
Comparator<Integer> compInt = (i1, i2) -> {
if (verbose) {
// hilo
System.out.printf("[%s]", Thread.currentThread().getName());
}
// comparación
int v1 = i1.intValue();
int v2 = i2.intValue();
if (v1 < v2) {
return +1;
} else {
if (v1 == v2) {
return 0;
} else {
return -1;
}
}
};
if (verbose) {
affiche("nombres", sNombres.sorted(compInt).collect(Collectors.toList()));
}
System.out.printf("tri séquentiel : durée (ms)=%s%n", new Date().getTime() - début);
}
public static <T> void affiche(String message, T value) throws JsonProcessingException {
System.out.println(String.format("%s ----", message));
System.out.println(jsonMapper.writeValueAsString(value));
}
}
- líneas 30-36: se construye un flujo de limite nombres aleatorios;
- línea 32: se pasa la lambda compInt (líneas 38-55) al método [sorted]. Esta lambda ordena los números en orden descendente y muestra el subproceso que la ejecuta.
Los resultados obtenidos son los siguientes:
![]() |
Si en el código anterior sustituimos la línea 36 por la siguiente (Ejemplo06f2):
Stream<Integer> sNombres = nombres.stream().parallel();
se obtienen los siguientes resultados:
![]() |
Descubrimos que, sorprendentemente, la ordenación del flujo de números se ha realizado con un único hilo. No ha habido ningún paralelismo. ¿O es que hay algo que se me escapa?
5.6.9. [anyMatch, noneMatch, allMatch]
// ¿Hay personas de más de 100 años?
affiche("des personnes de + de 100 ans (anyMatch)", personnes.stream().anyMatch(p -> p.getAge() > 100));
// ¿Todas las personas tienen 100 años o menos?
affiche("des personnes de + de 100 ans (noneMatch)", personnes.stream().noneMatch(p -> p.getAge() > 100));
// ¿Todas las personas tienen más de 8 años?
affiche("des personnes de + de 8 ans (allMatch)", personnes.stream().allMatch(p -> p.getAge() > 8));
En las líneas 2, 4 y 6, los métodos [anyMatch, noneMatch, allMatch] tienen como parámetro un tipo Predicate descrito en el apartado 4.2. Por lo tanto, realizan un filtrado. Los tres devuelven un valor booleano:
- anyMatch devuelve true si existe al menos un elemento de Stream que cumpla el filtro;
- noneMatch devuelve true si no existe ningún elemento de Stream que cumpla el filtro;
- allMatch devuelve true si todos los elementos de Stream cumplen el filtro;
Los resultados obtenidos son los siguientes:
5.6.10. [collect(Collectors.groupingBy)]
// ¿Se agrupan las personas por sexo?
affiche("personnes regroupées par sexe", personnes.stream().collect(Collectors.groupingBy(p -> p.getSexe())));
El método [collect] se presentó en el apartado 5.6.3. Su parámetro es una implementación de la interfaz [Collector]. La clase [Collectors] ofrece una serie de métodos estáticos que implementan la interfaz [Collector]. Hasta ahora hemos utilizado el método [Collectors.toList()]. Aquí utilizamos el método estático [Collectors.groupingBy], que crea un diccionario a partir de Stream. Su firma es la siguiente:
![]() |
El método [groupingBy] crea, a partir de un tipo Stream<T>, un tipo Map<K,List<T>>. La clave K es proporcionada por el parámetro del método [groupingBy] de tipo Function<T,K>, cuyo único método tiene la firma: K apply(T t). Si queremos crear un diccionario indexado por el sexo de las personas, debemos proporcionar una función que genere el sexo a partir de una persona. Aquí, pasamos como parámetro efectivo del método [groupingBy] la referencia del método [Personne.getSexe]. Los resultados obtenidos son los siguientes:
En la línea 2, tenemos la cadena jSON de un diccionario indexado por dos claves: HOMME y FEMME.
El cálculo paralelo no aporta mejoras de rendimiento, como muestra el siguiente ejemplo (Ejemplo06g1):
package dvp.java8.streams;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class Exemple06g1 {
// peor jSON
static ObjectMapper jsonMapper = new ObjectMapper();
public static void main(String[] args) throws JsonProcessingException {
// data
final long limite = 100L;
final boolean verbose = true;
// final long límite = 10_000_000L;
// final boolean verbose = false;
// número de procesadores
System.out.printf("La JVM a détecté [%s] coeurs sur votre machine%n", Runtime.getRuntime().availableProcessors());
// lista de nombres
long début = new Date().getTime();
List<Integer> nombres = new ArrayList<>();
for (long i = 0; i < limite; i++) {
nombres.add(new Random().nextInt(1000));
}
System.out.printf("création de la liste des %s nombres en %s ms%n", limite, new Date().getTime() - début);
// agrupación de números por centenas - método secuencial
Stream<Integer> sNombres = nombres.stream();
Function<Integer, Integer> groupByCent = n -> {
if (verbose) {
System.out.printf("[%s]", Thread.currentThread().getName());
}
return n / 100;
};
début = new Date().getTime();
// Map<Integer, List<Integer>> lNombres = sNombres.collect(Collectors.groupingBy(número -> número / 100));
Map<Integer, List<Integer>> lNombres = sNombres.collect(Collectors.groupingBy(groupByCent));
System.out.printf("%nregroupement séquentiel : durée (ms)=%s%n", new Date().getTime() - début);
// resultados
if (verbose) {
affiche("nombres regroupés", lNombres);
}
}
public static <T> void affiche(String message, T value) throws JsonProcessingException {
System.out.println(String.format("%s ----", message));
System.out.println(jsonMapper.writeValueAsString(value));
}
}
- líneas 23-38: construcción de un flujo de limite números;
- línea 47, los números se agrupan por centenas. Se utiliza la función lambda de las líneas 39-44 para poder mostrar el hilo de ejecución;
Los resultados de la ejecución son los siguientes:
![]() |
Si en el código se sustituye la línea 38 por la siguiente (Ejemplo06g2):
Stream<Integer> sNombres = nombres.stream().parallel();
se obtienen los siguientes resultados:
![]() |
Se observa que la ejecución en paralelo de la agrupación ha reducido el rendimiento.
5.6.11. [distinct]
// eliminación de elementos duplicados de una lista
affiche("distinct", Stream.of(1, 2, 1).distinct().collect(Collectors.toList()));
El método [distinct] tiene la siguiente firma:
![]() |
Permite eliminar los duplicados de un flujo. El método [Stream.of] (línea 2) tiene la siguiente firma:
![]() |
Permite crear un Stream a partir de valores proporcionados explícitamente. Los resultados de la ejecución son los siguientes:
5.6.12. [flatMap]
// de un Stream<Stream<T>>, se crea un Stream<T>
affiche("flatMap", Stream.of(1, 2, 3).flatMap(i -> Stream.of(i, i + 10)).collect(Collectors.toList()));
El método [flatMap] tiene la siguiente firma:
![]() ![]() |
El método [flatMap] admite como parámetro una función que:
- admite como parámetro un elemento de tipo T de Stream;
- devuelve como resultado un flujo Stream<R>;
Si en lugar del método [flatMap], se hubiera utilizado el método [map] descrito en el apartado 5.5, el resultado sería un tipo Stream<Stream<R>> en el que cada elemento de tipo T del flujo inicial habría dado lugar a un elemento Stream<R>. El método [flatMap], por su parte, devuelve un tipo Stream<R>. Este método aplana (flatten) los diferentes flujos Stream<R> en un único flujo. Esto es lo que muestran los resultados de la ejecución del código anterior:
Existen variantes especializadas de [flatMap]:
// de un Stream<IntStream>, se crea un IntStream del que se calcula la suma
affiche("flatMapToInt", Stream.of(1, 2, 3).flatMapToInt(i -> IntStream.of(i, i + 10)).sum());
El método [flatMapToInt] tiene la siguiente firma:
![]() |
El método [flatMapToInt] admite como parámetro una función que devuelve como resultado un tipo IntStream como se indica a continuación:
![]() |
IntStream es un flujo de int. Este tipo es preferible al tipo Stream<Integer>, ya que su procesamiento evita el boxing/unboxing entre los tipos Integer y int. Esta interfaz retoma numerosos métodos del tipo Stream<T> y añade otros, entre ellos el método [sum] mencionado anteriormente, que suma los elementos del IntStream.
El siguiente código ilustra el uso del método análogo [flatMapToDouble]:
// de un Stream<DoubleStream>, se crea un DoubleStream y luego una matriz
affiche("flatMapToDouble", Stream.of(1, 2, 3).flatMapToDouble(i -> DoubleStream.of(i, i * 1.2)).toArray());
El método [DoubleStream.toArray] permite pasar de un tipo DoubleStream a un tipo double[].
Los resultados, para estos dos ejemplos, son los siguientes:
El siguiente ejemplo muestra las mejoras de rendimiento obtenidas al pasar de un tipo Stream<Long> a un tipo LongStream (Ejemplo06i1):
package dvp.java8.streams;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
public class Exemple06i1 {
public static void main(String[] args) {
final long limite = 10_000_000L;
// número de procesadores
System.out.printf("La JVM a détecté [%s] coeurs sur votre machine%n", Runtime.getRuntime().availableProcessors());
// lista de números
long début = new Date().getTime();
List<Long> nombres = new ArrayList<>();
for (long i = 0; i < limite; i++) {
nombres.add(i);
}
System.out.printf("création de la liste des %s nombres en %s ms%n", limite, new Date().getTime() - début);
// suma de los números - método secuencial
début = new Date().getTime();
long somme = nombres.stream().reduce(0L, (s, i) -> s + i);
System.out.printf("somme séquentielle du Stream<Integer> : somme=%s, durée (ms)=%s%n", somme, new Date().getTime() - début);
}
}
- línea 22: cálculo de la suma de un flujo de números de tipo Long;
Se obtienen los siguientes resultados:
Ahora, sustituyamos la línea 22 por la siguiente (Ejemplo06i2):
long somme = nombres.stream().mapToLong(n -> n.longValue()).sum();
El método Stream<Integer>.mapToLong nos permite obtener un flujo de tipo LongStream de elementos de tipo primitivo long, que luego sumamos con la función sum. De este modo, obtenemos los siguientes resultados:
La ganancia de rendimiento es net.
5.6.13. Métodos de flujo de números primitivos
// máximo de un flujo de int
affiche("IntStream max", IntStream.of(1, 10, 8).max());
// mínimo de un flujo de dobles
affiche("DoubleStream min", DoubleStream.of(1.5, 10.4, 8.9).min());
// media de un flujo de int
affiche("IntStream average", IntStream.of(1, 10, 8).average().getAsDouble());
// estadísticas de un flujo de int
affiche("IntStream summaryStatistics", IntStream.of(1, 10, 8).summaryStatistics());
Los flujos de valores primitivos (int, long, double) ofrecen métodos adaptados a estos tipos. El resultado de la ejecución del código anterior es el siguiente:
- el resultado de la línea 2 del código es un tipo OptionalInt análogo al tipo Optional<Integer>. El valor almacenado en este objeto se puede obtener con el método [getAsInt()]. La presencia de un valor se puede comprobar con el método [isPresent()]. La línea 2 de los resultados no significa que la clase [OptionalInt] tenga campos llamados [asInt, present]. Por defecto, la biblioteca jSON utiliza todos los métodos públicos getX y isY del objeto que se va a serializar en jSON. Y aquí hay un método [getAsInt] y otro método [isPresent] sin que existan los campos [asInt, present];
- el resultado de la línea 4 del código es un tipo OptionalDouble análogo al tipo Optional<Double>;
- el resultado de la línea 6 del código es un tipo OptionalDouble cuyo valor se puede obtener con el método [getAsDouble()]. El método [average] calcula la media del flujo de números;
- el resultado de la línea 8 del código es un tipo IntSummaryStatistics definido de la siguiente manera:
![]() |
Se observa que el objeto IntSummaryStatistics obtenido proporciona diversa información sobre el flujo de números, como el número de valores, la suma, el máximo, el mínimo y la media.




































