Cambiando el chip hacia event-driven

Ni falta hace que escriba sobre las ventajas que proporciona el paradigma event-driven en el que los componentes de software reaccionan a eventos a los que se han suscrito previamente. El productor está desacoplado del consumidor y se pueden añadir consumidores conforme se necesitan nuevos tratamientos del evento.

Además, el dominio de los eventos va a crecer y dotar de más significado al código: Se puede tener un evento UserToPurchaseInsurance y un evento InsurancePurchased que semánticamente es más significativo que tener un objeto Insurance con un malvado campo status.

descarga

Event-driven sin broker de mensajería parece un poco engorro. Sin embargo, Spring facilita mucho el trabajo ya que despliega una infraestructura de publicación y suscripción a eventos para los beans registrados en el contexto. Si se da el caso de trabajar en un monolito sin broker de mensajería, los eventos de aplicación de Spring pueden ser un buen inicio para refactorizar algunas partes del código a una arquitectura orientada a eventos.

Este código incluiría 4 dependencias:

  • al proveedor de seguros,
  • al servicio que envía emails,
  • al repositorio y
  • al servicio que roba el coche asegurado:
if (insuranceProvider.purchase(insurance)) {
    sendEmail(insurance);
    persist(insurance);
    stealCar(insurance);
}

Podría convertirse en este otro código que tendría como dependencias el proveedor de seguros y el publicador de eventos. Como se indicó anteriormente, desacoplando productor de los N consumidores.

if (insuranceProvider.purchase(insurance)) {
    publishEvent(InsurancePurchasedEvent.from(insurance));
}

Los puntos de entrada de los consumidores en este caso serían tres listeners:

@Component
public class InsurancePurchasedEmailEventListener {
    @EventListener
    public void sendAnEmail(InsurancePurchased insurancePurchased) {}
}
@Component
public class InsurancePurchasedPersistenceEventListener {
    @EventListener
    public void persistInsurance(InsurancePurchased insurancePurchased) {}
}
@Component
public class InsurancePurchasedThiefEventListener {
    @EventListener
    public void stealTheCar(InsurancePurchased insurancePurchased) {}
}

Esta entrada surge tras haber leído el artículo Spring Events de Baeldung en el que explican exactamente cómo publicar el evento y recuerdan que por defecto los eventos son recogidos por los listeners en el mismo hilo, por lo que la ejecución no continuará hasta que se procese el evento por todos los listeners. Este comportamiento se puede alterar definiendo un bean ApplicationEventMulticaster.

Anuncios

Introducción a Spring Batch. Toma 2

Fuente: spring.io

 

Esta es la segunda parte de Introducción a Spring Batch. Toma 1 donde se explicaron algunos conceptos básicos sobre jobs de Spring Batch y se describió lo mínimo necesario para usar Spring Batch en una aplicación. ¡Continuamos!

 

5. Transaccionalidad

Un step se puede declarar como tasklet-oriented o chunk-oriented:

  • Step tasklet-oriented: Podemos definir una tarea simple con el método tasklet() que se caracteriza porque se crea una transacción por cada elemento. En caso de fallo de escritura, la transacción se revierte.
  • Step chunk-oriented: Un chunk es un conjunto de elementos que se leen y se procesan por separado pero se persisten juntos. El número de elementos por chunk se indica en la definición del step con el método chunk(). Con este tipo de steps, se crea una transacción por cada chunk.

Nos podría interesar elegir procesamiento chunk-oriented para maximizar el uso de la RAM o beneficiarnos de la transaccionalidad (por ejemplo, haciendo rollback de la escritura de 10 elementos que deberían ir juntos). Por otra parte, también nos podría interesar escribir un conjunto de elementos a la vez (p. ej, enviar a un endpoint que recibe un array de elementos).

Como siempre, con Spring Boot no es necesario declarar el transactionManager ya que es inyectado automáticamente por el framework.

En el siguiente commit, se ha declarado un ItemReader que lee un archivo CSV y un ItemWriter que muestra cada objeto Persona. También se ha cambiado la definición del step para establecer un chunk de 2 elementos.

Las implementaciones de FlatFileItemReader, DelimitedLineTokenizer, DefaultLineMapper y FieldSetMapper ofrecidas por Spring nos ayudan a leer un CSV en unas pocas líneas.

 

6. Tolerancia a fallos

Los componentes ItemReader, ItemProcessor e ItemWriter lanzarán excepciones en caso de error. Podemos evitar que ante una excepción rompa la aplicación con el método skipLimit() indicando el número de excepciones a saltar. Además los tipos de excepciones tienen que definirse usando el método skip() (tantas veces como sea necesario).

Para trabajar con los elementos saltados (que han provocado una excepción) se puede implementar SkipListener. Un ejemplo sería imprimir la entrada errónea o guardarla en otro CSV.

En el tercer commit, se ha introducido un error de lectura “-“ en el CSV y un error de escritura (lanzando la excepción IllegalArgumentException). Se ha incluido el método faultTolerant() en el step con un límite de 2 excepciones FlatFileParseException, FlatFileFormatException e IllegalArgumentException.

Como el límite es 2, el job se ejecutará correctamente saltando los 2 errores introducidos. Como ejercicio, podríamos añadir otro error al archivo CSV y ver cómo falla el job.

 

7. Reintentos

Es posible reintentar una acción que ha fallado previamente (tanto la lectura como el procesamiento o la escritura). En el cuarto commit se ha añadido una llamada al método retryLimit(1) para reintentar las excepciones del tipo IllegalArgumentException.

Es conveniente utilizar un límite para los reintentos ya que es posible que la misma excepción continúe sucediendo cada vez que se ejecute el mismo proceso.

 

8. Logs y estadísticas

El método StepBuilderFactory#listener() recibe cualquier tipo de listener como por ejemplo ChunkListener, StepListener, RetryListener… Todos estos pueden utilizarse para registrar el progreso (escribir logs) de un step. Como apoyo para escribir estos logs, podemos utilizar el contexto del step o del job para guardar timestamps o cualquier otro tipo de información.

Centrándonos en registrar el progreso de un job, definiendo JobBuilderFactory podemos usar el método .listener() pasando como argumento una instancia de JobExecutionListener.

La clase StepExecution tiene un método getSummary() que devuelve cierta información tras la ejecución del step, como por ejemplo:

El campo status devuelve un valor del enumerado BatchStatus y puede ser mapeado al código de finalización de la aplicación.

El campo exitStatus devuelve un valor del enumerado ExitStatus y representa el estado de finalización del step. Este valor no siempre es equivalente al valor del campo status. Además es posible personalizarlo para proporcionar más información y con ello, por ejemplo, definir un flujo condicional en el que el inicio de un step dependa del valor del campo exitStatus del step anterior.

En cualquier caso, Spring Batch permite definir flujos de steps donde un step se ejecuta  de manera condicional teniendo en cuenta el estado de finalización del step anterior.

En este commit se han definido unos cuantos listeners para registrar cada acción ocurrida durante el procesamiento batch. Además se ha añadido un atributo “timing” al contexto del chunk para medir el tiempo transcurrido durante el procesamiento de cada chunk.

 

9. Parando y re-arrancando un proceso batch

Spring Batch nos proporciona el bean JobOperator con los siguientes métodos:

  • getRunningExecutions(“jobName”) que devuelve una lista de ids de los jobs que se están ejecutando.
  • stop() que para la ejecución del step en cuanto se termina de ejecutar el código escrito en el mismo.
  • restart() que continúa la ejecución a partir del siguiente step.

 

10. Administración web

El proyecto Spring Cloud Data Flow proporciona una interfaz web y una CLI para administrar los jobs y streams que proceden de los proyectos Spring Cloud Task y Spring Cloud Stream. El objetivo del proyecto Spring Cloud Task es el de integrar los jobs de Spring Batch como microservicios en la nube.

Para convertir un proyecto en Spring Batch a uno de Spring Cloud Task únicamente es necesario añadir la dependencia y la anotación @EnableTask.

 

Aquí podemos encontrar los pasos necesarios para ejecutar un job dentro del Spring Cloud Data Flow:

Lo más común es registrar la aplicación con la URI de Maven (maven://) para que Spring Cloud Data Flow se descargue el artefacto del repositorio. Otros tipos de aplicaciones que se pueden registrar son sinks y sources.

Podemos consultar las estadísticas de los jobs accediendo a: http://localhost:9393/dashboard/index.html#/jobs/executions/1

Y las estadísticas de los steps en: http://localhost:9393/dashboard/index.html#/jobs/executions/1/1

Sin duda merece la pena darle una oportunidad a Spring Cloud Data Flow, tiene cosas muy chulas como por ejemplo, una herramienta visual para pintar el flujo de datos entre los distintos microservicios.

Es posible definir el directorio en el que se almacenarán los logs de la aplicación. No obstante, aunque no especifiquemos ninguna ruta, en el log de Spring Cloud Data Flow podremos distinguir una primera línea que comienza con “Logs will be in ” indicando la carpeta temporal elegida por Spring para este propósito.

En este commit podemos ver lo fácil que es convertir un proyecto Spring Cloud Data Flow:

 

11. Particionamiento

Existen diferentes estrategias para escalar un proceso batch ya sea de manera multihilo o multiproceso.

El particionamiento de un step es una de ellas y permite ejecutar steps en máquinas remotas o en hilos locales. Consiste en definir un step maestro (master step) que delegará el trabajo particionado en steps esclavos (slave steps). Para usar el particionamiento de steps es necesario definir tanto la estrategia de particionado como los esclavos.

El step maestro utilizará una implementación de Partitioner para escribir en el contexto de ejecución toda la información que necesitará cada esclavo para procesar su partición de datos.

En este último commit se ha añadido un segundo archivo CSV y se ha definido un bean CustomMultiResourcePartitioner que utilizará un step maestro para escribir el nombre del archivo CSV en cada contexto. Cada esclavo recibe un contexto diferente por parte del maestro.

Los dos esclavos se ejecutan en hilos diferentes porque se ha declarado un taskExecutor con el DSL del StepBuilderFactory.

Además, se ha utilizado la anotación @StepScope para recuperar el nombre del archivo guardado en el contexto usando el Spring Expression Language.

 

12. Extra:

Spring Batch proporciona múltiples implementaciones para facilitar el desarrollo de procesos batch. Aquí puedes encontrar una lista de ellas:

View at Medium.com

Esta segunda toma de la introducción a Spring Batch es una traducción del artículo publicado en el blog de Fintonic Product Team.

Introducción a Spring Batch. Toma 1

1. ¿Qué es un proceso batch?

Un proceso batch es una tarea automática para procesar un gran de volumen de datos. Este proceso se puede ejecutar sin interacción humana y repetirse periódicamente.

Qué no es un proceso batch: no es una tarea programada (un cron). Es bastante común programar un proceso batch, pero no es necesario hacerlo.

2. ¿Qué debemos tener en cuenta con un proceso batch?

  • Transaccionalidad porque queremos hacer roll back cuando los datos han sido invalidados.
  • Tolerancia a fallos porque no queremos que la aplicación termine cuando ocurra una excepción.
  • Reintentos porque… sometimes shit happens.
  • Logs y estadísticas porque de vez en cundo necesitamos saber qué ocurre dentro del proceso batch.
  • Parada y arranque de los procesos batch.
  • Administración web porque mola.
  • Particionamiento porque el trabajo puede ser compartido entre diferentes máquinas.

Cada uno de estos aspectos será comentado más tarde. He preparado unos cuantos commits en un repo para repasar todos estos puntos mostrando las diferencias en el código.

3. Conceptos de Spring Batch

Spring Batch es un proyecto que nos proporciona un framework para desarrollar aplicaciones batch. Este proyecto tiene un largo recorrido y es el resultado de la contribución de varias empresas con mucha experiencia en procesamiento batch.

Jobs, Steps, ItemReader, ItemProcessor and ItemWriter:

Fuente: Spring Batch Reference Documentation https://docs.spring.io/spring-batch/trunk/reference/html/domain.html

En Spring Batch se ejecutan jobs que se dividen en steps. Cada step tiene un componente para obtener los objetos (ItemReader), otro para procesarlos (ItemProcessor) y uno más para persistirlos (ItemWriter). El componente de procesamiento de datos es opcional.

Normalmente necesitaremos una instancia de JobBuilderFactory para declarar el Job y un StepBuilderFactory  para declarar el Step. No hay problema, Spring nos proporciona ambas.

JobExecution y JobLauncher

Un job puede ejecutarse por una instancia de JobLauncher. Durante su ejecución la información es almacenada y compartida en el JobExecutionContext y el StepExecutionContext.

El JobLauncher devuelve un JobExecution que nos da información sobre la ejecución, como por ejemplo el resultado de la misma: COMPLETED, FAILED…

JobInstance, JobParameters y RunIdIncrementer

JobInstance es la combinación de un Job y sus JobParameters. Una de las reglas de Spring Batch es que no se puede volver a ejecutar un Job si su JobExecution tiene estado COMPLETED. Sin embargo, se puede utilizar un RunIdIncrementer para ejecutar el mismo job varias veces ya que éste modifica internamente sus parámetros.

JobRepository

Spring Batch gestiona por sí sólo una base de datos con información sobre la ejecución de los jobs instanciando un bean de JobRepository. Para ello sólo necesitamos declarar la dependencia de H2 en el entorno de desarrollo.

4. Requisitos mínimos para comprobar que todo esto funciona

  • Dependencia de Spring Boot Starter Batch (aquí es donde reside toda la magia).
  • Dependencia del driver de la base de datos.
  • Anotación @EnableBatchProcessing en la clase de configuración de Spring.
  • Bean que define del job.
  • Bean que define el step con una tarea que escriba un mensaje “Reading…”. Cuando esa tarea devuelva un null indicará el fin de la fuente de datos y por tanto que el job ha terminado.
@Bean
public Job job(Step step1) throws Exception {
    return jobBuilderFactory.get("job1")
        .incrementer(new RunIdIncrementer())
        .start(step1)
        .build();
}

@Bean
public Step step1() {
    return stepBuilderFactory.get("step1")
        .tasklet(new Tasklet() {
            public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
                System.out.println("Reading...");
                return null;
            }
        })
        .build();
}

Puedes clonar el repo que he preparado para este post y ver el código añadido en el primer commit para ejecutar Spring Batch con lo mínimo necesario.

Este artículo es una traducción del que escribí hace unos meses en el blog de Fintonic Engineering.

Los enums siguen molando

Escribí hace tiempo una entrada en la que hacía público mi amor hacia los enums. A día de hoy creo que no los uso lo suficiente y voy poco a poco aprendiendo cosas nuevas. Con todo esto, no me ha quedado otra que escribir una segunda parte: Los enums siguen molando.

Una característica por la que los enums son conocidos y se suelen usar es la imposibilidad de invocar al constructor (ni con reflection) por lo que son buenos candidatos para utilizarlos como singletons.

En Effective Java, el autor define los métodos constant-specific methods como las implementaciones de un método abstracto en el enum. Esto permite asociar comportamiento a los elementos que se definen. Y obliga a definir el comportamiento por cada elemento nuevo.

public enum PiecesToClean {
    LIVING_ROOM {
        @Override public void clean() {
            System.out.println("Cleaning the living room");
        }
    },
    KITCHEN {
        @Override public void clean() {
            System.out.println("Cleaning the kitchen");
        }
    },
    BATH_ROOM {
        @Override public void clean() {
            System.out.println("Cleaning the bath room");
        }
    };

    public abstract void clean();
}

Por otra parte, Joshua Bloch propone el strategy enum pattern en el que los diferentes comportamientos de los enums son definidos en el constructor de cada elemento. De esta forma, el compilador nos obliga a definir el comportamiento para cada constante pero además contamos con la flexibilidad del patrón strategy a la hora de escribir diferentes comportamientos reutilizables para cada enum.

public enum PiecesToClean {
    LIVING_ROOM(CleaningStrategy.SHALLOW),
    KITCHEN(CleaningStrategy.DEEP),
    BATH_ROOM(CleaningStrategy.DEEP);

    private final CleaningStrategy cleaningStrategy;

    PiecesToClean(CleaningStrategy cleaningStrategy) {
        this.cleaningStrategy = cleaningStrategy;
    }

    public void clean() {
        cleaningStrategy.clean();
    }
}
public enum CleaningStrategy {
    SHALLOW() {
        @Override public void clean() {
            System.out.println("Cleaning fast");
        }
    },
    DEEP() {
        @Override public void clean() {
            System.out.println("Cleaning with love");
        }
    };

    public abstract void clean();
}

En el mismo capítulo también se propone lanzar una excepción para el caso por defecto en los bloques switch sobre el enum ya que el autor pone en evidencia el peligro de los enums en bloques switch teniendo en cuenta que se puede añadir un elemento nuevo pero olvidar añadir en los bloques switch (diseminados por todo el código) el caso para esa constante, por lo que aplicará el caso por defecto.

switch (piece) {
   case LIVING_ROOM:
      piece.clean(); break;
   case KITCHEN:
      piece.clean(); break;
   case BATH_ROOM:
      piece.clean(); break;
   default:
      throw new IllegalArgumentException("You should have defined this use case");
}

Mi compi Jorge me enseñó el otro día cómo utilizaba un enum para implementar un Predicate de Java 8. Cada elemento del enum es una regla de negocio y cada uno implementa test() para determinar si se cumple la regla.

public enum BusinessRule implements Predicate {
    RULE_1() {
        @Override public boolean test(DomainObjectUnderRules domainObjectUnderRules) {
            // logic to test rule 1
        }
    },
    RULE_2 {
        @Override public boolean test(DomainObjectUnderRules domainObjectUnderRules) {
            // logic to test rule 2
        }
    };
}

A parte de impresionarme esta forma de usar enums entendí que Java 8 está facilitando mucho el uso de los enumerados, sobre todo, en los casos en los que el enum recibe una lambda en el constructor.

Para terminar, en el libro se recuerda que en Java 7 se introdujo EnumMap que es una implementación de mapas especialmente desarrollada para enums, muy compacta y muy eficiente y cuyo uso no es muy extendido.

 

Speedment, un ORM para Java

Speedment es un ORM para Java que no es muy conocido pero debería serlo ya que es un firme candidato para plantar cara a los más conocidos ORMs como son Hibernate y jOOQ.

El objetivo principal de esta herramienta es aprovechar al máximo los streams de Java 8 para escribir código que extraiga información de la base de datos. Esto trae consigo muchas ventajas, la principal es que no hay necesidad de conocer una nueva API como la de JPA.

El uso es terriblemente sencillo: Lanzando un goal de maven ejecuta un entorno gráfico desde el que se configura el acceso a la base de datos y Speedment genera todas las clases Java a partir del esquema. A partir de este punto, sólo es necesario manipular los streams para obtener los datos dentro de los objetos que ha generado automáticamente Speedment.

mainscreen

La puesta en marcha es rapidísima y el uso de Java para escribir consultas a la base de datos ayuda a no perder horas depurando SQL erróneo y permite aprovechar la asistencia sobre el código Java del propio IDE.

En cuanto a buenas prácticas, algo que valoro mucho es evitar diseminar la lógica de negocio en diferentes lugares: El modelo de dominio, la base de datos y la capa de acceso a la base de datos y lo que va de la mano: Diseminar la lógica de negocio entre diferentes tecnologías (lo que generalmente requiere más de una persona para resolver un bug). Speedment como ORM ayuda a evitar estos males, permitiendo que la lógica de negocio esté en un sólo lugar y se manejen sólo con streams.

¿Y si no queremos tener el modelo de negocio inundado de dependencias en forma de anotaciones? ¿Y si no queremos tener un ORM que haga magia y nos cueste arreglarla 2 días cada vez que se rompe? ¿Y si jugamos a tener un modelo de acceso a base de datos bien diferenciado del modelo de negocio?

Yo también quiero opinar sobre si Java es lento

Mucha gente tiene la idea de que Java es lento, cosa que es lógico pensar teniendo en cuenta que la máquina virtual es una máquina encima de otra máquina (la física) y una capa intermedia seguro que supone mayor tiempo de ejecución.

También hay gente que se queja de que las aplicaciones en Java son más lentas y, sin embargo, no tienen el cuidado o el conocimiento necesario para desarrollar código Java que no sea redundante o que haga un uso óptimo de la memoria.

En la Java Magazine de este mes hay un artículo sobre cómo funciona el intérprete y el compilador JIT que compila en tiempo de ejecución Java bytecode a instrucciones del procesador en el que se están ejecutando. Ya sólo el hecho de usar un compilador JIT hace que la ejeucución de código Java sea más rápida que cualquier código ejecutado en un intérprete (como el que tiene Java).

Pero en el artículo remarcan que no sólo JIT compila a código máquina si no que se aplican muchas optimizaciones de bajo nivel y es esto lo que hace, en realidad, que el tiempo de ejecución de un programa Java pueda en ocasiones superar al tiempo de ejecución de un programa con compilación AOT. De hecho, comentan en el artículo que algunos métodos se pueden volver a compilar en diferentes fases de la aplicación para aplicar diferentes optimizaciones que dependen de la fase de ejecución.

Ahora me imagino la máquina virtual como un servicio de optimización gratuito. Compilando una aplicación C++, consigo código máquina pero no código máquina optimizado para la arquitectura en la que se ejecuta. Yo no sabría optimizarlo -desarrollo aplicaciones de propósito general-, sin embargo, el equipo de desarrollo del compilador JIT sí conoce las diferentes arquitecturas en donde se ejecuta la JVM y la hacen inteligente para aprovechar cada arquitectura al máximo.

Java Mission Control

A partir del JDK 7 update 40 está incluido Java Mission Control una aplicación para analizar métricas de rendimiento de aplicaciones que se ejecutan en la máquina virtual que es terriblemente fácil de usar.

Con esta aplicación probé lo que escribí en mi anterior artículo para ver hasta que punto me equivoqué. El objetivo es comprobar cuantos GCs ejecutan y la duración del conjunto en una ejecución con los parámetros -Xms1G -Xmx1G y en otra sin -Xms1G para comprobar si penaliza el redimensionamiento del heap.

El programa ejecutado consume de media por iteración (ejecuta 10 iteraciones) unos 640 MB. Es el tamaño que le paso por parámetro para que cree un array de 640 M de bytes ya que con un máximo de 1 GB para el heap un tamaño mayor que 640 MB provoca una OutOfMemoryException. El resto ocupado será memoria reservada (o virtual).

Para crear un archivo con las métricas y sólo el parámetro -Xmx1G ejecuto:

java -Xmx1G
-XX:+UnlockCommercialFeatures
-XX:+FlightRecorder
-XX:StartFlightRecording=duration=10s,
filename=record-gcing1G_NoXms1G.jfr,
settings=settings.jfc
GCing 640

Xmx1g

Flight Recording es el módulo encargado de grabar las métricas que posteriormente se analizan con Java Mission Control. Se activa con los flags –XX:+UnlockCommercialFeatures -XX:+FlightRecorder y -XX:StartFlightRecording al que se incluyen parámetros como la duración de la grabación, el nombre del archivo que creará y el nombre de la plantilla de métricas que tiene que tomar.

Esta plantilla se crea y exporta con Java Mission Control en el menú Window y la opción Flight Recording Template Manager. Este módulo por defecto no recoge las métricas específicas del GC, para que lo haga hay que indicarle una plantilla creada con JMC en la que se haya seleccionado all para Event Options GC.

Para crear el segundo archivo de métricas con parámetros -Xms1G y -Xmx1G ejecuto:

java -Xms1G -Xmx1G
-XX:+UnlockCommercialFeatures
-XX:+FlightRecorder
-XX:StartFlightRecording=duration=10s,
filename=record-gcing1G_Xms1G.jfr,
settings=settings.jfc
GCing 640

Xmxs1GXmx1G

Sin establecer un tamaño de heap inicial se ejecutan 20 procesos GC ya sean DefNew o SerialOld durando poco más de 52 milisegundos mientras que estableciendo 1G de heap inicial (porque se ha medido que la aplicación consume de media 640 MB) se ejecutan 18 procesos de GC que en total duran un poco más de 46 milisegundos. Estos datos se pueden ver en la pestaña izquierda Memory y pestañas inferiores Garbage Collections, GC Times y Allocations.

Como conclusión no se puede decir que la mejora en reducción del tiempo de ejecución sea sustancial porque casi es despreciable usando -Xms1G, pero en el JDK ahora tenemos una gran herramienta muy fácil de usar que pone en bandeja un montón de métricas que no costará nada analizar cuando se perciba un rendimiento pobre.

Enlace a zip con grabaciones, plantilla y código fuente.