Reactividad en Servicios Web Java

Este artículo pretende ser un punto de partida para el desarrollo de Servicios Web Java utilizando técnicas de programación reactiva. Existen muchas publicaciones respecto a Reactividad en la Web, por lo que no entraremos en detalle sobre la teoría, aunque sí veremos un poco de contexto y resumen de las características más importantes para tenerlas en mente.

Contexto

La programación reactiva es un paradigma de programación que lleva tiempo entre nosotros. Se conceptuó en 2010 y en 2014 se publicó el “Reactive Manifesto” que sentaba las bases para el desarrollo de aplicaciones siguiendo el modelo reactivo. Surgió como consecuencia de la demanda de tiempos de respuesta más rápidos y alta disponibilidad de los sistemas, características logradas con modelos previos de Microservicios, pero dando solución a los problemas de uso excesivo de CPU, bloqueos en operaciones de entrada y salida o sobre uso de memoria (debido a grandes Threads Pools) de los que adolecían éstos modelos.

Sistemas Reactivos

Siguiendo los principios del Reactive Manifesto, los sistemas reactivos deben ser:
  • Responsivos: Deben responder en tiempo y de forma adecuada, gestionando los errores de manera apropiada. Responsabilidad es la base de la utilizada y usabilidad de sistemas.
  • Resilientes: Existen tolerancia y recuperación ante fallos. Se mantiene la responsividad incluso cuando el sistema falla.
  • Eslásticos: Existe la capacidad de expansión según la demanda. La responsividad se mantiene incluso con el aumento de carga de trabajo.
  • Orientados a Mensajes: La comunicación entre componentes se basa en el intercambio de mensajes asíncronos para evitar el acoplamiento y mejorar el aislamiento.

Programación Reactiva

La programación reactiva se centra en el trabajo de flujos asíncronos de orígenes de datos finitos o infinitos. No es algo nuevo, ya existen los Buses de Eventos, eventos generados por clicks, etc. La programación reactiva va más allá, habilitando el trabajo de “streams” o flujos de eventos. Es posible crear un flujo de cualquier tipo de eventos con los que ir alimentando un sistema. El flujo en sí, podrá ser filtrado, transformado, combinado y delimitado. El portal https://rxmarbles.com/ nos muestra diagramas de las posibles transformaciones de un flujo. Los eventos se emiten a través del flujo de forma ordenada en el tiempo y tienen como resultado un valor, un error o simplemente una señal de completado. Los servicios se pueden suscribir a dichos eventos, siempre de forma asíncrona, definiendo funciones que se ejecutarán (reaccionarán) ante los tres posibles respuestas de un evento. En RxJava, los streams se representan mediante objetos Observable. Los objetos Observer representan los suscriptores a los eventos emitidos por los Observables:
Observable.just("Reactividad", "Aplicaciones Web", "Java")
    .map(String::toUpperCase)
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            log.debug("Subscribed!");
        }

        @Override
        public void onNext(String s) {
            log.debug("Recived: {}", s);
        }

        @Override
        public void onError(Throwable e) {
            log.debug("Error: {}", e.getMessage());
        }

        @Override
        public void onComplete() {
            log.debug("Completed!");
        }
    });
RxMarble
Output
Subscribed!
Recived: REACTIVIDAD
Recived: APLICACIONES WEB
Recived: JAVA
Completed!

Frameworks Java Reactivos

Existen varios frameworks que implementan los Streams Reactivos como modelo de programación. Los más conocidos para desarrollar bajo Java, a día de publicación de este artículo, son:

Ejemplo: API REST Reactivo (SpringBoot + RxJava2)

Vamos a implementar un prototipo de API de servicios Rest para una aplicación web con los principios de un sistema reactivo utilizando Spring Boot y RxJava2.

Estructura

Partiendo desde cero, utilizamos el sistema de prototipado rápido de aplicaciones Spring Boot, Spring Initializr. Solamente vamos a necesitar las dependencias de Web Starter, para el soporte Web, y Lombok, para agilizar el código:
Spring Initializr
Spring tiene sus propias librerías para el desarrollo de aplicaciones reactivas (WebFlux y CloudStream), pero utilizamos RxJava2 por motivos académicos. Agregamos la dependencia de RxJava2 a nuestro proyecto. Si utilizamos Gradle, tendremos el fichero build.gradle similar al siguiente:
plugins {
    id 'org.springframework.boot' version '2.1.7.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
}

group = 'com.poc.ractive'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'

configurations {
    compileOnly {
        extendsFrom annotationProcessor
    }

    // Undertow instead of Tomcat
    compile.exclude group: 'org.springframework.boot', module: 'spring-boot-starter-tomcat'
}

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation('org.springframework.boot:spring-boot-starter-undertow')
    compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.2.11'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
Con RxJava podemos utilizar servidores de aplicaciones como Tomcat, Jetty o Undertow. En este caso utilizaremos Undertow por simple preferencia del autor. Ya tenemos la base de nuestro proyecto. Vamos a exponer un servicio REST que nos devuelva información de “Cosas”, siendo nuestro modelo el siguiente:
@Getter
@Setter
@ToString
@AllArgsConstructor
@NoArgsConstructor
@Builder
@EqualsAndHashCode(of = "id")
public class Thing implements Serializable {

    private String id;
    private String value;

}
Las anotaciones de Lombok nos permiten ahorrarnos todo el código repetitivo. El siguiente paso es crear el Servicio que gestione nuestra lógica de negocio sobre el modelo. Añadiremos un Mapa de objetos “Thing” para tener instancias de pruebas:
@Service
@Slf4j
public class ThingsService implements InitializingBean {

    /**
     * Map of Thing mocks
     */
    private Map<String, Thing> thingsMap = new HashMap<>();

    @Override
    public void afterPropertiesSet() {
        // Populates the Map with mocks at bean intialization
        IntStream.range(1, 10).forEach(
                index -> thingsMap.put("thing" + index, Thing.builder()
                        .id("thing" + index)
                        .value("value" + index)
                        .build()));
    }
}
Nos falta el exponer el API Rest para que nuestro servicio pueda ser utilizado por sistemas externos. Primero definimos el tipo de respuesta que va a dar el API:
@Getter
@Setter
@ToString
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class ThingResponse implements Serializable {

    private Thing thing;
    private String message;

}
Y por último creamos el controlador que expone el Servicio Rest:
@RestController
@Slf4j
public class ThingsController {

    /**
     * Service for managing {@link Thing} logic
     */
    private ThingsService thingsService;

    /**
     * Custom constructor for dependency injection
     *
     * @param thingsService Service for managing {@link Thing}
     */
    public ThingsController(@Autowired ThingsService thingsService) {
        this.thingsService = thingsService;
    }

    /**
     * Gets the {@link Thing} related to the given Identifier
     *
     * @param id Entity identifier
     * @return {@link ThingResponse} with http 200 (Ok) if the wanted {@link Thing} instance exists or Http 400
     * (Bad Request) if not.
     */
    @GetMapping(value = "/api/{id}", produces = MediaType.APPLICATION_JSON_VALUE)
    public Single<ResponseEntity<ThingResponse>> getThingById(@PathVariable String id) {
        String transactionId = UUID.randomUUID().toString();
        log.debug("{}: Request received for id: '{}'", transactionId, id);

        // TODO
        
        return null;
    }

    /**
     * Builds the Response based on the given {@link Thing} instance.
     *
     * @param thing {@link Thing} instance. Only instances with Id are valid.
     * @return {@link ThingResponse} with http 200 (Ok) if the wanted {@link Thing} instance exists or Http 400
     * (Bad Request) if not.
     */
    private static ResponseEntity<ThingResponse> mapThingResponse(Thing thing) {
        if (thing.getId() != null) {
            ThingResponse tResponse = ThingResponse.builder().thing(thing).message("OK").build();
            return new ResponseEntity<>(tResponse, HttpStatus.OK);
        } else {
            ThingResponse tResponse = ThingResponse.builder().message(thing.getValue()).build();
            return new ResponseEntity<>(tResponse, HttpStatus.BAD_REQUEST);
        }
    }
La estructura del controlador es la típica en aplicaciones Spring Boot. Utilizamos la anotación @RestController para indicar que vamos a exponer servicios Rest e inyectamos la instancia del Servicio que realizará la lógica de Negocio directamente en el Constructor. La anotación @Autowired en este caso es opcional, pero la utilizamos por claridad del código. Para seguir las buenas prácticas, se generará un identificador único de transacción en cada petición para poder tracear las operaciones. Hemos creado una función “getThingById” que será invocada mediante un GET a nuestro API y devolverá la respuesta en formato Json. La clase Single<T> es la versión reactiva de la llamada a un método. Equivale a un Observable que emite un solo evento o error y es idónea para éste tipo de aplicaciones. Por ahora nuestro método devuelve nada, tenemos que completar el “TODO” con la llamada al Servicio. Para ello, vamos a comentar diferentes escenarios y cómo resolverlos:

Operaciones simples

Queremos obtener la información de una Cosa sin realizar operaciones colaterales. Agregamos el método “findById” al Servicio “ThingService“:
/**
 * Gets a {@link Thing} instance by its identifier
 *
 * @param id Thing's identifier
 * @return {@link Thing} instance if exists or IllegalArgumentException if not.
 */
public Single<Thing> findById(final String transactionId, final String id) {
    return Single.create(emitter -> {
        log.debug("{}: Evaluating findById...", transactionId);
        if (thingsMap.containsKey(id)) {
            emitter.onSuccess(thingsMap.get(id));
        } else {
            emitter.onError(new IllegalArgumentException("Thing with id '" + id + "' not found"));
        }
    });
}
En nuestro método, creamos un Reactive Stream de un solo evento mediante “Single.create(…)” y nos suscribimos al flujo. El método “Single.create(…)” acepta una instancia del interfaz funcional “SingleOnSubscribe<T>” con el siguiente descriptor:
/**
 * A functional interface that has a {@code subscribe()} method that receives
 * an instance of a {@link SingleEmitter} instance that allows pushing
 * an event in a cancellation-safe manner.
 *
 * @param <T> the value type pushed
 */
public interface SingleOnSubscribe<T> {

    /**
     * Called for each SingleObserver that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull SingleEmitter<T> emitter) throws Exception;
}
Los métodos “onSucces” y “onError” nos permiten indicar si la operación se ha realizado correctamente. Actualizamos nuestro Controlador para invocar el Servicio y mostrar algunas trazas que nos ayuden a entender el funcionamiento interno:
@GetMapping(value = "/api/{id}", produces = MediaType.APPLICATION_JSON_VALUE)
public Single<ResponseEntity<ThingResponse>> getThingById(@PathVariable String id) {
    String transactionId = UUID.randomUUID().toString();
    log.debug("{}: Request received for id: '{}'", transactionId, id);
    Single<Thing> response = thingsService.findById(transactionId, id);
    log.debug("{}: Request for id: '{}' processed.", transactionId, id);

    return response
            .doOnDispose(() -> log.debug("{}: Disposing", transactionId))
            .doOnSubscribe(disposable -> log.debug("{}: Subscribed", transactionId))
            .doOnSuccess(thing -> log.debug("{}: Success", transactionId))
            .doOnTerminate(() -> log.debug("{}: Terminate", transactionId))
            .subscribeOn(Schedulers.io())
            .onErrorReturn(throwable -> Thing.builder().value(throwable.getMessage()).build())
            .map(ThingsController::mapThingResponse);
}
Utilizamos el método “subscribeOn” para indicar el Scheduler (pool de Threads) donde se van a suscribir nuestras operaciones. Existen varios Schedudlers facilitados por el framework de RxJava2. Ojeando la documentación vemos que el más indicado para nuestro tipo de aplicación es “Schedulers.io” ya que provee un pool de threads reutilizable para propósitos de IO. Iniciamos el servidor y lanzamos la petición. Solicitamos la información del objeto que responde al identificador “thing5“, que existe en el mapa de objetos de prueba:
$> curl -X GET http://localhost:8080/api/thing5
{"thing":{"id":"thing5","value":"value5"},"message":"OK"}
Hasta aquí todo correcto, veamos que muestran las trazas:
DEBUG [  XNIO-1 task-3] ThingsController: 2afb5b25776e: Request received for id: 'thing5'
DEBUG [  XNIO-1 task-3] ThingsController: 2afb5b25776e: Request for id: 'thing5' processed.
DEBUG [readScheduler-1] ThingsController: 2afb5b25776e: Subscribed
DEBUG [readScheduler-1] ThingsService   : 2afb5b25776e: Evaluating findById...
DEBUG [readScheduler-1] ThingsController: 2afb5b25776e: Success
DEBUG [readScheduler-1] ThingsController: 2afb5b25776e: Terminate 
Los servidores de aplicaciones tienen su propio thread pool para las peticiones que llegan, XNIO-1 para el Undertow que utilizamos. Según llega la petición, Undertow crea un nuevo hilo “XNIO-1 task-3” en el que delega la petición. Vemos que se entra en el método Rest, se crea el Single, y ahí termina la responsabilidad del hilo de Undertow, que vuelve a estar disponible en el pool para nuevas peticiones. Paralelamente, Schedulers.io crea un nuevo hilo, “readScheduler-1”, que ejecuta toda la lógica de negocio. Vemos como el suscriptor secuencialmente se suscribe al flujo de eventos, ejecuta la lógica y devuelve el resultado. El siguiente diagrama de secuencia muestra todo el proceso:
Lo que muestra el diagrama es que el Controlador no queda bloqueado en ningún momento, permitiendo aceptar peticiones que se van ejecutando en paralelo. Igualmente el hilo generado por la petición al servidor queda liberado y disponible para nuevas solicitudes en el pool del Servidor Web. Estas características aprovechan toda la capacidad de procesamiento y memoria de la máquina optimizando recursos y permitiendo una escalabilidad horizontal (responsibidad y elasticidad de sistemas). La siguiente petición muestra el comportamiento del suscriptor al flujo de eventos cuando la petición devuelve un error:
$> curl -X GET http://localhost:8080/api/unknownThing
{"thing":null,"message":"Thing with id 'unknownThing' not found"}
Output
DEBUG [  XNIO-1 task-5] ThingsController: 786622a7da22: Request received for id: 'unknownThing'
DEBUG [  XNIO-1 task-5] ThingsController: 786622a7da22: Request for id: 'unknownThing' processed.
DEBUG [readScheduler-1] ThingsController: 786622a7da22: Subscribed
DEBUG [readScheduler-1] ThingsService   : 786622a7da22: Evaluating findById...
DEBUG [readScheduler-1] ThingsController: 786622a7da22: Terminate

Operaciones combinadas I

Cuando las peticiones a nuestro API requieren operaciones un poco más complejas donde se invocan varios métodos o incluso varios servicios, podemos combinar las operaciones. Por ejemplo, siguiendo el ejemplo, cada vez que se invoque el API, además de devolver el objeto “Cosa” requerido, podría ser necesario realizar operaciones “doCoolOperationsA”, “doCoolOperationsB” y, además, loguear en un histórico la petición:
/**
 * Performs iterative operations where each operation doesn't depends on the previous one.
 *
 * @param transactionId Current Transaction Identifier
 * @param id            Thing's identifier
 * @return {@link Thing} instance if exists or IllegalArgumentException if not.
 */
public Single<Thing> operateById(final String transactionId, final String id) {
    return this.findById(transactionId, id)
            .flatMap(thing -> doCoolOperationsA(transactionId, thing))
            .flatMap(thing -> doCoolOperationsB(transactionId, thing))
            .flatMap(thing -> logById(transactionId, thing));
}
/**
 * Performs a logging of the given Thing's identifier
 *
 * @param transactionId Current Transaction Identifier
 * @param thing         Current {@link Thing} instance
 * @return {@link Thing} instance
 */
private Single<Thing> logById(final String transactionId, Thing thing) {
    return Single.create(emitter -> {
        log.debug("{}: Evaluating logById...", transactionId);
        log.debug("{}: Logging Id -> {}", transactionId, thing.getId());
        emitter.onSuccess(thing);
    });
}
/**
 * Performs cool operations with the given {@link Thing} instance
 *
 * @param transactionId Current Transaction Identifier
 * @param thing         Current {@link Thing} instance
 * @return {@link Thing} instance
 */
private Single<Thing> doCoolOperationsA(final String transactionId, Thing thing) {
    return Single.create(emitter -> {
        log.debug("{}: Evaluating doCoolOperationsA...", transactionId);
        emitter.onSuccess(thing);
    });
}
/**
 * Performs cool operations with the given {@link Thing} instance
 *
 * @param transactionId Current Transaction Identifier
 * @param thing         Current {@link Thing} instance
 * @return {@link Thing} instance
 */
private Single<Thing> doCoolOperationsB(final String transactionId, Thing thing) {
    return Single.create(emitter -> {
        log.debug("{}: Evaluating doCoolOperationsB...", transactionId);
        emitter.onSuccess(thing);
    });
}
Modificamos el controlador para invocar el método “operateById” del Servicio “ThingService” y lanzamos la petición REST:
$> curl -X GET http://localhost:8080/api/thing5
{"thing":{"id":"thing5","value":"value5"},"message":"OK"}
Output
DEBUG [  XNIO-1 task-1] ThingsController: 575923ae3691: Request received for id: 'thing5'
DEBUG [  XNIO-1 task-1] ThingsController: 575923ae3691: Request for id: 'thing5' processed.
DEBUG [readScheduler-1] ThingsController: 575923ae3691: Subscribed
DEBUG [readScheduler-1] ThingsService   : 575923ae3691: Evaluating findById...
DEBUG [readScheduler-1] ThingsService   : 575923ae3691: Evaluating doCoolOperationsA...
DEBUG [readScheduler-1] ThingsService   : 575923ae3691: Evaluating doCoolOperationsB...
DEBUG [readScheduler-1] ThingsService   : 575923ae3691: Evaluating logById...
DEBUG [readScheduler-1] ThingsService   : 575923ae3691: Logging Id -> thing5
DEBUG [readScheduler-1] ThingsController: 575923ae3691: Success
DEBUG [readScheduler-1] ThingsController: 575923ae3691: Terminate
Utilizando ‘flatMap’, al igual que ocurre con los Streams de Java8, combinamos las operaciones en un solo hilo. Cada operación se irá invocando de manera secuencial hasta completar la lógica. RxJava provee operadores para poder componer nuestros Flujos de tal manera que solo necesitemos suscribirnos a un Thread para ejecutar la lógica correctamente. En el código anterior, podríamos generar una suscripción por cada llamada a un método diferente, obligando a ejecutar cada bloque de código en un hilo diferente, pero no se considera una buena práctica ya que podríamos caer en el llamado “Callback Hell”.

Operaciones combinadas II

El ejemplo anterior muestra como combinar operaciones de tal manera que sean independientes unas de otras, cada una ejecuta su lógica sin tener en cuenta el resultado de la anterior. Si necesitamos que las acciones ejecutadas utilicen el resultado de pasos anteriores, podemos combinar los métodos de la siguiente manera:
/**
 * Performs iterative operations where each operation depends on the result of previous one.
 *
 * @param transactionId Current Transaction Identifier
 * @param id            Thing's identifier
 * @return {@link Thing} instance if exists or IllegalArgumentException if not.
 */
public Single<Thing> operateByIdDependent(final String transactionId, final String id) {
    return this.findById(transactionId, id)
            .flatMap(thing -> doCoolOperationsA(transactionId, thing)
                    .flatMap(next -> doCoolOperationsB(transactionId, next)
                            .flatMap(moreNext -> logById(transactionId, moreNext))));
}

Operaciones en paralelo

Si las operaciones a realizar ante la respuesta a un servicio pueden dividirse en tareas independientes, podemos suscribirnos a varios threads para que se ejecuten en paralelo (nested subscriptions). Como se ha comentado antes, no es una práctica muy recomendada, principalmente en éste tipo de aplicaciones web, ya que complica la trazabilidad y debugeo, y puede derivar en el “Callback Hell”. Sin embargo, si nuestras operaciones son costosas, puede estar justificado. Modificamos nuestro código para utilizar varios hilos en la ejecución de la tarea principal:
/**
 * Performs concurrent operations using different threads and merging the result in a single response
 *
 * @param transactionId Current Transaction Identifier
 * @param id            Thing's identifier
 * @return {@link Thing} instance if exists or IllegalArgumentException if not.
 */
public Single<Thing> operateByIdMergingOps(final String transactionId, final String id) {
    Thing thing = this.findById(transactionId, id).blockingGet();
    Single<Thing> operationA = doCoolOperationsA(transactionId, thing).subscribeOn(Schedulers.io());
    Single<Thing> operationB = doCoolOperationsB(transactionId, thing).subscribeOn(Schedulers.io());
    Single<Thing> operationC = logById(transactionId, thing).subscribeOn(Schedulers.io());

    return operationA.mergeWith(operationB).mergeWith(operationC).lastOrError();
}
Modificamos el controlador para invocar el método ‘operateByIdMergingOps del Servicio “ThingService” y lanzamos la petición REST:
$> curl -X GET http://localhost:8080/api/thing5
{"thing":{"id":"thing5","value":"value5"},"message":"OK"} 
Output
DEBUG [  XNIO-1 task-1] ThingsController: 04eed3276fbe: Request received for id: 'thing5'
DEBUG [  XNIO-1 task-1] ThingsService   : 04eed3276fbe: Evaluating findById...
DEBUG [  XNIO-1 task-1] ThingsController: 04eed3276fbe: Request for id: 'thing5' processed.
DEBUG [readScheduler-1] ThingsController: 04eed3276fbe: Subscribed
DEBUG [readScheduler-2] ThingsService   : 04eed3276fbe: Evaluating doCoolOperationsA...
DEBUG [readScheduler-3] ThingsService   : 04eed3276fbe: Evaluating doCoolOperationsB...
DEBUG [readScheduler-4] ThingsService   : 04eed3276fbe: Evaluating logById...
DEBUG [readScheduler-4] ThingsService   : 04eed3276fbe: Logging Id -> thing5
DEBUG [readScheduler-3] ThingsController: 04eed3276fbe: Terminate
Comprobamos que cada operación se ha suscrito a un hilo diferente (readScheduler-1, readScheduler-2 y readScheduler-3) y se ha ejecutado en paralelo, siendo el último hilo en ejecutarse el que nos da devuelto la respuesta a la invocación del servicio.

Operaciones en paralelo II

En ocasiones, es posible que se necesite combinar los diferentes resultados de operaciones ejecutadas en paralelo. Para ello, disponemos del operador Zip, que utiliza una una función que recibe los valores como resultado de mezclar dos reactive streams para generar un valor nuevo. Es importante tener en cuenta que al ser operaciones asíncronas no es posible predecir el orden con el que se combinan los resultados:
/**
 * Performs concurrent operations using different threads and merging the result in a single response. The
 * result is the combination of all the results of the executed threads.
 *
 * @param transactionId Current Transaction Identifier
 * @param id            Thing's identifier
 * @return {@link Thing} instance if exists or IllegalArgumentException if not.
 */
public Single<Thing> operateByIdMergingZip(final String transactionId, final String id) {
    Thing thing = this.findById(transactionId, id).blockingGet();
    Single<Thing> operationA = doCoolOperationsA(transactionId, thing).subscribeOn(Schedulers.io());
    Single<Thing> operationB = doCoolOperationsB(transactionId, thing).subscribeOn(Schedulers.io());
    Single<Thing> operationC = logById(transactionId, thing).subscribeOn(Schedulers.io());

    return operationA
            .zipWith(operationB, ThingsService::mergeThings)
            .zipWith(operationC, ThingsService::mergeThings);
}

/**
 * Merges two {@link Thing} instances in a new one
 *
 * @param thing1 {@link Thing} in a 'A' state
 * @param thing2 {@link Thing} in a 'B' state
 * @return {@link Thing} in a 'A+B' state
 */
private static Thing mergeThings(Thing thing1, Thing thing2) {
    return new Thing(thing1.getId(), thing1.getValue() + ", " + thing2.getValue());
}

Backpressure

La posibilidad de trabajar con eventos infinitos en nuestros streams reactivos da lugar a escenarios donde los Observables (emisores de eventos) generan eventos mucho más rápido de lo que pueden ser consumidos por nuestros Observers (consumidores). Esta circunstancia es llamada Backpressure y es un escenario a tener en cuenta a la hora de implementar nuestro API reactivo. Existen varios métodos para gestionar backpressure. Por ejemplo, existen estrategias conservadoras que implementan buffers donde los elementos emitidos se van acumulando hasta que un consumidor los procesa y estrategias más agresivas donde los elementos no consumidos a tiempo se descartan o se sobrescriben por los más recientes. La elección de la estrategia correcta dependerá de nuestro sistema.

Sumario

En éste artículo hemos visto los principios de la programación reactiva y los motivos por los que es interesante tenerla en cuenta a la hora de diseñar o mejorar nuestras arquitecturas. Se ha mostrado un ejemplo de API reactiva con Spring Boot y RxJava2, exponiendo varias situaciones más o menos generales y como afrontarlas en código y, por último, hemos hablado de una característica inherente al uso de streams, el backpressure, a tener muy en cuenta.
Avatar

Ingeniero en Informática de Sistemas por la Universidad Politécnica de Madrid, vive en el mundo del desarrollo de aplicaciones desde que cayó en sus manos un Spectrum. Especializado en Backend Java, sus días transcurren entre frameworks, arquitecturas y microservicios. Desconecta practicando fotografía y deportes de montaña (rutas, Snowboarding... lo que proceda según la época). Actualmente, trabaja como Designer en Future Space.