Streams: Futures en Secuencia
Hasta ahora en este capítulo, hemos estado trabajando principalmente con futuros
individuales. La única gran excepción fue el receiver de canal asíncrono que
utilizamos. Recuerda cómo usamos el receiver para nuestro canal asíncrono en el
“Paso de Mensajes” al principio del capítulo. El método
recv asíncrono produce una secuencia de elementos a lo largo del tiempo. Esta
es una instancia de un patrón mucho más general, a menudo llamado stream.
Una secuencia de elementos es algo que ya hemos visto antes, cuando miramos el
trait Iterator en el capítulo 13. Sin embargo, hay dos diferencias entre los
iteradores y el receptor de canal asíncrono. La primera es el elemento del
tiempo: los iteradores son sincrónicos, mientras que el receptor de canal es
asíncrono. La segunda es la API. Cuando trabajamos directamente con un
Iterator, llamamos a su método sincrónico next. Con el stream
trpl::Receiver, en particular, llamamos a un método asíncrono recv
en su lugar. Estas APIs, de otro modo, se sienten muy similares.
Un stream es similar a una forma asíncrona de iteración. Mientras que el
trpl::Receiver espera específicamente recibir mensajes, la API de stream
general es mucho más general: proporciona el siguiente elemento de la misma
manera que Iterator, pero de forma asíncrona. La similitud entre los
iteradores y los streams en Rust significa que en realidad podemos crear un
stream a partir de cualquier iterador. Al igual que con un iterador, podemos
trabajar con un stream llamando a su método next y luego esperando la
salida, como en el listado 17-30.
extern crate trpl; // required for mdbook test
fn main() {
trpl::run(async {
let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let iter = values.iter().map(|n| n * 2);
let mut stream = trpl::stream_from_iter(iter);
while let Some(value) = stream.next().await {
println!("The value was: {value}");
}
});
}
Comenzamos con un array de números, que convertimos en un iterador y luego
llamamos a map para duplicar todos los valores. Luego convertimos el
iterador en un stream usando la función trpl::stream_from_iter. Luego
recorremos los elementos en el stream a medida que llegan con el bucle while let.
Desafortunadamente, cuando intentamos ejecutar el código, no compila. En su
lugar, como podemos ver en la salida, informa que no hay un método next
disponible.
error[E0599]: no method named `next` found for struct `Iter` in the current scope
--> src/main.rs:10:40
|
10 | while let Some(value) = stream.next().await {
| ^^^^
|
= note: the full type name has been written to 'file:///projects/async-await/target/debug/deps/async_await-575db3dd3197d257.long-type-14490787947592691573.txt'
= note: consider using `--verbose` to print the full type name to the console
= help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
|
1 + use crate::trpl::StreamExt;
|
1 + use futures_util::stream::stream::StreamExt;
|
1 + use std::iter::Iterator;
|
1 + use std::str::pattern::Searcher;
|
help: there is a method `try_next` with a similar name
|
10 | while let Some(value) = stream.try_next().await {
| ~~~~~~~~
Como sugiere la salida, la razón del error del compilador es que necesitamos el
método correcto en el ámbito para poder usar el método next. Dada nuestra
discusión hasta ahora, podrías esperar razonablemente que sea Stream, pero el
trait que necesitamos aquí es en realidad StreamExt. El Ext allí es por
“extensión”: este es un patrón común en la comunidad de Rust para extender un
trait con otro.
¿Por qué necesitamos StreamExt en lugar de Stream, y qué hace el trait
Stream en sí? Brevemente, la respuesta es que en todo el ecosistema de Rust,
el trait Stream define una interfaz de bajo nivel que combina
efectivamente los traits Iterator y Future. El trait StreamExt
suministra un conjunto de APIs de nivel superior sobre Stream, incluyendo
el método next así como otros métodos de utilidad similares a los
proporcionados por el trait Iterator. Volveremos a los traits
Stream y StreamExt con un poco más de detalle al final del capítulo.
Por ahora, esto es suficiente para dejarnos seguir avanzando.
La solución al error del compilador es agregar una declaración use para
trpl::StreamExt, como en el listado 17-31.
extern crate trpl; // required for mdbook test use trpl::StreamExt; fn main() { trpl::run(async { let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; let iter = values.iter().map(|n| n * 2); let mut stream = trpl::stream_from_iter(iter); while let Some(value) = stream.next().await { println!("The value was: {value}"); } }); }
Con todas esas piezas juntas, este código funciona como queremos. ¡Lo que es
más, ahora que tenemos StreamExt en el ámbito, podemos usar todos sus
métodos de utilidad, al igual que con los iteradores! Por ejemplo, en el
listado 17-32, usamos el método filter para filtrar todo menos los
múltiplos de tres y cinco.
extern crate trpl; // required for mdbook test use trpl::StreamExt; fn main() { trpl::run(async { let values = 1..101; let iter = values.map(|n| n * 2); let stream = trpl::stream_from_iter(iter); let mut filtered = stream.filter(|value| value % 3 == 0 || value % 5 == 0); while let Some(value) = filtered.next().await { println!("The value was: {value}"); } }); }
Por supuesto, esto no es muy interesante. Podríamos hacer eso con iteradores normales y sin nada asíncrono. Así que veamos algunas de las otras cosas que podemos hacer que son únicas para los streams.
Componiendo Streams
Muchos conceptos se representan naturalmente como streams: elementos que se vuelven disponibles en una cola, o trabajar con más datos de los que pueden caber en la memoria de una computadora al extraer (pull) solo fragmentos (chunks) de ellos del sistema de archivos a la vez, o datos que llegan a través de la red a lo largo del tiempo. Debido a que los streams son futuros, también podemos usarlos con cualquier otro tipo de futuro, y podemos combinarlos de maneras interesantes. Por ejemplo, podemos agrupar eventos para evitar activar demasiadas llamadas de red, establecer tiempos de espera en secuencias de operaciones de larga duración, o restringir eventos de la interfaz de usuario para evitar hacer trabajo innecesario.
Empecemos creando un pequeño stream de mensajes, como un sustituto
para un stream de datos que podríamos ver desde un WebSocket u otro
protocolo de comunicación en tiempo real. En el listado 17-33, creamos
una función get_messages que devuelve impl Stream<Item = String>. Para
su implementación, creamos un canal asíncrono, recorremos las primeras
diez letras del alfabeto inglés y las enviamos a través del canal.
Nosotros también usamos un nuevo tipo: ReceiverStream, que convierte
el receptor rx del trpl::channel en un Stream con un método next.
De vuelta en main, usamos un bucle while let para imprimir todos los
mensajes del stream.
extern crate trpl; // required for mdbook test use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = get_messages(); while let Some(message) = messages.next().await { println!("{message}"); } }); } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for message in messages { tx.send(format!("Message: '{message}'")).unwrap(); } ReceiverStream::new(rx) }
Cuando ejecutamos este código, obtenemos exactamente los resultados que esperábamos:
Message: 'a'
Message: 'b'
Message: 'c'
Message: 'd'
Message: 'e'
Message: 'f'
Message: 'g'
Message: 'h'
Message: 'i'
Message: 'j'
Podríamos hacer esto con la API Receiver regular, o incluso la API
Iterator regular, sin embargo. Agreguemos algo que requiera streams:
agregando un tiempo de espera que se aplique a cada elemento en el stream, y
una demora en los elementos que emitimos.
En el listado 17-34, comenzamos agregando un tiempo de espera al stream
con el método timeout, que proviene del trait StreamExt. Luego
actualizamos el cuerpo del bucle while let, porque el stream ahora
devuelve un Result. La variante Ok indica que un mensaje llegó a
tiempo; la variante Err indica que el tiempo de espera se agotó
antes de que llegara algún mensaje. Hacemos un match en ese
resultado y ya sea imprimimos el mensaje cuando lo recibimos
exitosamente, o imprimimos un aviso sobre el tiempo de espera. Finalmente,
ten en cuenta que fijamos los mensajes después de aplicar el tiempo
de espera a ellos, porque el helper de tiempo de espera produce un
stream que necesita ser fijado para ser sondeado.
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = pin!(get_messages().timeout(Duration::from_millis(200))); while let Some(result) = messages.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for message in messages { tx.send(format!("Message: '{message}'")).unwrap(); } ReceiverStream::new(rx) }
Comenzamos agregando un tiempo de espera al stream con el método timeout, que
proviene del trait StreamExt. Luego actualizamos el cuerpo del ciclo
while let, porque ahora el stream devuelve un Result. La variante Ok
indica que llegó un mensaje a tiempo; la variante Err indica que el tiempo de
espera se agotó antes de que llegara algún mensaje. Hacemos un match sobre ese
resultado y o bien imprimimos el mensaje cuando lo recibimos con éxito o
mostramos un aviso sobre el tiempo de espera. Finalmente, observa que fijamos
(pin) los mensajes después de aplicarles el timeout, porque el helper de timeout
produce un stream que debe ser fijado para poder ser polleado.
Sin embargo, como no hay demoras entre los mensajes, este tiempo de espera
no cambia el comportamiento del programa. Agreguemos una demora variable
a los mensajes que enviamos. En get_messages, usamos el método
enumerate del iterador con el array messages para que podamos
obtener el índice de cada elemento que estamos enviando junto con el
elemento en sí. Luego aplicamos una demora de 100 milisegundos
a los elementos de índice par y una demora de 300 milisegundos a los
elementos de índice impar, para simular las diferentes demoras que
podríamos ver de un stream de mensajes en el mundo real. Debido a que
nuestro tiempo de espera es de 200 milisegundos, esto debería afectar
la mitad de los mensajes.
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = pin!(get_messages().timeout(Duration::from_millis(200))); while let Some(result) = messages.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; tx.send(format!("Message: '{message}'")).unwrap(); } }); ReceiverStream::new(rx) }
En get_messages, usamos el método iterador enumerate con el arreglo
messages para poder obtener el índice de cada elemento que enviamos junto con
el elemento en sí. Luego aplicamos un retraso de 100 milisegundos a los
elementos con índice par y un retraso de 300 milisegundos a los elementos con
índice impar para simular los diferentes retrasos que podríamos ver en una
secuencia de mensajes en el mundo real. Como nuestro tiempo de espera es de 200
milisegundos, esto debería afectar a la mitad de los mensajes.
Para usar sleep entre mensajes en la función get_messages sin bloquear,
necesitamos usar async. Sin embargo, no podemos hacer de get_messages una
función asíncrona, porque entonces devolveríamos un Future<Output = Stream<Item = String>>
en lugar de un Stream<Item = String>>. El llamador tendría que esperar a
get_messages para obtener acceso al stream. Pero recuerda:
todo en un futuro dado sucede de manera lineal; la concurrencia ocurre
entre futuros. Esperar a get_messages requeriría que enviara todos
los mensajes, incluyendo las esperas (sleep) entre el envío de
cada mensaje, antes de devolver el stream receptor. Como resultado,
el tiempo de espera terminaría siendo inútil. No habría demoras
en el stream en sí: todas las demoras ocurrirían antes de que el
stream estuviera disponible.
En su lugar, dejamos get_messages como una función regular que devuelve un
stream, y generamos una tarea para manejar las llamadas asíncronas
sleep.
Nota: llamar a
spawn_taskde esta manera funciona porque ya configuramos nuestro runtime. Llamar a esta implementación particular despawn_tasksin configurar primero un runtime causará un pánico. Otras implementaciones eligen diferentes compensaciones: pueden generar un nuevo runtime y así evitar el pánico, pero terminar con un poco de sobrecarga adicional, o simplemente no proporcionar una forma independiente de generar tareas sin referencia a un runtime. Debes asegurarte de saber qué compensación ha elegido tu runtime y escribir tu código en consecuencia.
Ahora nuestro código tiene un resultado mucho más interesante. Entre
cada par de mensajes, vemos un error reportado: Problem: Elapsed(()).
Message: 'a'
Problem: Elapsed(())
Message: 'b'
Message: 'c'
Problem: Elapsed(())
Message: 'd'
Message: 'e'
Problem: Elapsed(())
Message: 'f'
Message: 'g'
Problem: Elapsed(())
Message: 'h'
Message: 'i'
Problem: Elapsed(())
Message: 'j'
El tiempo de espera no previene que los mensajes lleguen al final: aún obtenemos todos los mensajes originales. Esto se debe a que nuestro canal es ilimitado: puede contener tantos mensajes como podamos ajustar en memoria. Si el mensaje no llega antes de que se agote el tiempo, nuestro manejador de stream lo tendrá en cuenta, pero cuando vuelva a sondear el stream, el mensaje puede haber llegado ahora.
Puedes obtener un comportamiento diferente si es necesario usando otros tipos de canales, o otros tipos de streams más generalmente. Veamos uno de esos en la práctica en nuestro ejemplo final para esta sección, combinando un stream de intervalos de tiempo con este stream de mensajes.
Combinando Streams
Primero, creemos otro stream, que emitirá un elemento cada milisegundo si
lo dejamos ejecutarse directamente. Para simplificar, podemos usar la
función sleep para enviar un mensaje con un retraso, y combinarlo
con el mismo enfoque de crear un stream a partir de un canal que
usamos en get_messages. La diferencia es que esta vez, vamos a
enviar de vuelta el conteo de intervalos que ha transcurrido, así
que el tipo de retorno será impl Stream<Item = u32>, y podemos
llamar a la función get_intervals.
En el listado 17-36, comenzamos definiendo un count en la tarea. (También
podríamos definirlo fuera de la tarea, pero es más claro limitar el
alcance de cualquier variable dada). Luego creamos un bucle
infinito. Cada iteración del bucle duerme (sleep) de forma
asíncrona durante un milisegundo, incrementa el conteo y luego lo
envía a través del canal. Debido a que todo esto está envuelto
en la tarea creada por spawn_task, todo se limpiará junto con
el runtime, incluyendo el bucle infinito.
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let mut messages = pin!(get_messages().timeout(Duration::from_millis(200))); while let Some(result) = messages.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; tx.send(format!("Message: '{message}'")).unwrap(); } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut count = 0; loop { trpl::sleep(Duration::from_millis(1)).await; count += 1; tx.send(count).unwrap(); } }); ReceiverStream::new(rx) }
Este tipo de bucle infinito, que solo termina cuando todo el runtime se desmonta, es bastante común en Rust asíncrono: muchos programas necesitan seguir ejecutándose indefinidamente. Con async, esto no bloquea nada más, siempre que haya al menos un punto de espera (await) en cad a iteración a través del bucle.
En el bloque async de la función principal, comenzamos llamando a
get_intervals. Luego combinamos los streams messages e
intervals con el método merge, que combina múltiples streams
en un solo stream que produce elementos de cualquiera de los
streams de origen tan pronto como los elementos están disponibles,
sin imponer ningún orden particular. Finalmente, recorremos
ese stream combinado en lugar de sobre messages (listado 17-37).
extern crate trpl; // required for mdbook test
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};
fn main() {
trpl::run(async {
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals();
let merged = messages.merge(intervals);
while let Some(result) = merged.next().await {
match result {
Ok(message) => println!("{message}"),
Err(reason) => eprintln!("Problem: {reason:?}"),
}
}
})
}
fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for (index, message) in messages.into_iter().enumerate() {
let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
trpl::sleep(Duration::from_millis(time_to_sleep)).await;
tx.send(format!("Message: '{message}'")).unwrap();
}
});
ReceiverStream::new(rx)
}
fn get_intervals() -> impl Stream<Item = u32> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let mut count = 0;
loop {
trpl::sleep(Duration::from_millis(1)).await;
count += 1;
tx.send(count).unwrap();
}
});
ReceiverStream::new(rx)
}
Comenzamos llamando a get_intervals. Luego combinamos los streams messages e
intervals con el método merge, que une múltiples streams en un solo stream
que produce elementos de cualquiera de los streams de origen tan pronto como los
elementos estén disponibles, sin imponer un orden particular. Finalmente,
iteramos sobre ese stream combinado en lugar de hacerlo sobre messages.
En este punto, ni messages ni intervals necesitan ser fijados o
mutables, porque ambos se combinarán en el único stream merged.
Sin embargo, esta llamada a merge no compila. (Tampoco lo hace la
llamada next en el bucle while let, pero volveremos a eso después de
arreglar esto). Los dos streams tienen diferentes tipos. El stream
messages tiene el tipo Timeout<impl Stream<Item = String>>,
donde Timeout es el tipo que implementa Stream para una
llamada a timeout. Mientras tanto, el stream intervals
tiene el tipo impl Stream<Item = u32>. Para combinar estos
dos streams, necesitamos transformar uno de ellos para que
coincida con el otro.
En el listado 17-38, volvemos a trabajar el stream intervals, porque
messages ya está en el formato básico que queremos y tiene que manejar errores
de tiempo de espera. Primero, podemos usar el método auxiliar map para
transformar los intervals en una cadena. En segundo lugar, necesitamos hacer
coincidir el Timeout de messages. Sin embargo, como en realidad no
queremos un tiempo de espera para intervals, podemos simplemente crear un
tiempo de espera que sea más largo que los otros tiempos de
espera que estamos usando. Aquí, creamos un tiempo de espera de 10 segundos con
Duration::from_secs(10). Finalmente, necesitamos hacer stream mutable, para
que las llamadas next del bucle while let puedan iterar a través del stream,
y fijarlo para que sea seguro hacerlo.
extern crate trpl; // required for mdbook test
use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};
fn main() {
trpl::run(async {
let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals()
.map(|count| format!("Interval: {count}"))
.timeout(Duration::from_secs(10));
let merged = messages.merge(intervals);
let mut stream = pin!(merged);
while let Some(result) = stream.next().await {
match result {
Ok(message) => println!("{message}"),
Err(reason) => eprintln!("Problem: {reason:?}"),
}
}
})
}
fn get_messages() -> impl Stream<Item = String> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
for (index, message) in messages.into_iter().enumerate() {
let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
trpl::sleep(Duration::from_millis(time_to_sleep)).await;
tx.send(format!("Message: '{message}'")).unwrap();
}
});
ReceiverStream::new(rx)
}
fn get_intervals() -> impl Stream<Item = u32> {
let (tx, rx) = trpl::channel();
trpl::spawn_task(async move {
let mut count = 0;
loop {
trpl::sleep(Duration::from_millis(1)).await;
count += 1;
tx.send(count).unwrap();
}
});
ReceiverStream::new(rx)
}
Esto nos lleva casi a donde necesitamos estar. Todo se verifica el tipo. Sin embargo, si ejecutas esto, habrá dos problemas. Primero, nunca se detendrá. ¡Tendrás que detenerlo con ctrl-c! En segundo lugar, los mensajes del alfabeto inglés estarán enterrados en medio de todos los mensajes del contador de intervalos:
--snip--
Interval: 38
Interval: 39
Interval: 40
Message: 'a'
Interval: 41
Interval: 42
Interval: 43
--snip--
En el listado 17-39 se muestra una forma de resolver estos últimos dos
problemas. Primero, usamos el método throttle en el stream intervals, para
que no abrume al stream messages. La limitación (throttling) es una forma de
limitar la tasa a la que una función será llamada—o, en este caso, con qué
frecuencia se sondeará el stream. Una vez cada cien milisegundos debería
bastante, porque eso está en el mismo rango de tiempo que la frecuencia con la
que llegan nuestros mensajes.
Para limitar el número de elementos que aceptaremos de un stream, podemos
usar el método take. Lo aplicamos al stream combinado, porque
queremos limitar la salida final, no solo un stream u otro.
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let messages = get_messages().timeout(Duration::from_millis(200)); let intervals = get_intervals() .map(|count| format!("Interval: {count}")) .throttle(Duration::from_millis(100)) .timeout(Duration::from_secs(10)); let merged = messages.merge(intervals).take(20); let mut stream = pin!(merged); while let Some(result) = stream.next().await { match result { Ok(message) => println!("{message}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }) } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; tx.send(format!("Message: '{message}'")).unwrap(); } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut count = 0; loop { trpl::sleep(Duration::from_millis(1)).await; count += 1; tx.send(count).unwrap(); } }); ReceiverStream::new(rx) }
Ahora, cuando ejecutamos el programa, se detiene después de extraer veinte
elementos del stream, y los intervalos no abruman a los mensajes. También
no obtenemos Interval: 100 o Interval: 200 o así, sino que
en su lugar obtenemos Interval: 1, Interval: 2, y así sucesivamente —
¡incluso cuando tenemos un stream de origen que puede producir un evento
cada milisegundo! Eso se debe a que la llamada throttle produce un nuevo
stream, envolviendo el stream original, de modo que el stream original
solo se sondea a la tasa de limitación, no a su propia tasa “nativa”.
No tenemos un montón de mensajes de intervalo no manejados que
estamos eligiendo ignorar. En su lugar, ¡nunca producimos esos mensajes de
intervalo en primer lugar! Esta es la “pereza” inherente de los futuros
de Rust en acción nuevamente, lo que nos permite elegir nuestras
características de rendimiento.
Interval: 1
Message: 'a'
Interval: 2
Interval: 3
Problem: Elapsed(())
Interval: 4
Message: 'b'
Interval: 5
Message: 'c'
Interval: 6
Interval: 7
Problem: Elapsed(())
Interval: 8
Message: 'd'
Interval: 9
Message: 'e'
Interval: 10
Interval: 11
Problem: Elapsed(())
Interval: 12
Hay una última cosa que necesitamos manejar: ¡errores! Con ambos
streams basados en canales, las llamadas send podrían fallar
cuando el otro lado del canal se cierra—y eso es solo una cuestión de
cómo el runtime ejecuta los futuros que componen el stream. Hasta
ahora hemos ignorado esto llamando a unwrap, pero en una
aplicación bien comportada, deberíamos manejar explícitamente el
error, al mínimo terminando el bucle para que no intentemos enviar
más mensajes. El listado 17-40 muestra una estrategia de error
simple: imprime el problema y luego break de los bucles. Como
siempre, la forma correcta de manejar un error de envío de
mensaje variará—¡solo asegúrate de tener una estrategia!
extern crate trpl; // required for mdbook test use std::{pin::pin, time::Duration}; use trpl::{ReceiverStream, Stream, StreamExt}; fn main() { trpl::run(async { let messages = get_messages().timeout(Duration::from_millis(200)); let intervals = get_intervals() .map(|count| format!("Interval #{count}")) .throttle(Duration::from_millis(500)) .timeout(Duration::from_secs(10)); let merged = messages.merge(intervals).take(20); let mut stream = pin!(merged); while let Some(result) = stream.next().await { match result { Ok(item) => println!("{item}"), Err(reason) => eprintln!("Problem: {reason:?}"), } } }); } fn get_messages() -> impl Stream<Item = String> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]; for (index, message) in messages.into_iter().enumerate() { let time_to_sleep = if index % 2 == 0 { 100 } else { 300 }; trpl::sleep(Duration::from_millis(time_to_sleep)).await; if let Err(send_error) = tx.send(format!("Message: '{message}'")) { eprintln!("Cannot send message '{message}': {send_error}"); break; } } }); ReceiverStream::new(rx) } fn get_intervals() -> impl Stream<Item = u32> { let (tx, rx) = trpl::channel(); trpl::spawn_task(async move { let mut count = 0; loop { trpl::sleep(Duration::from_millis(1)).await; count += 1; if let Err(send_error) = tx.send(count) { eprintln!("Could not send interval {count}: {send_error}"); break; }; } }); ReceiverStream::new(rx) }
Ahora que hemos visto un montón de async en la práctica, echemos un
paso atrás y profundicemos en algunos de los detalles de cómo
Future, Stream, y los otros traits clave que Rust usa para
hacer que async funcione.