Parte 2: Captura de Cambios de Datos (CDC) para MongoDB con Debezium y Memphis.dev

Esta es la segunda parte de una serie de entradas de blog sobre la construcción de un sistema moderno orientado a eventos utilizando Memphis.dev.

Nuestro último blog post presentó una implementación de referencia para capturar eventos de captura de datos de cambio (CDC) desde una base de datos PostgreSQL utilizando Debezium Server y Memphis.dev. Al reemplazar Apache Kafka con Memphis.dev, la solución redujo sustancialmente los recursos operativos y la sobrecarga, ahorrando dinero y liberando a los desarrolladores para que se centren en construir nuevas funcionalidades.

Sin embargo, PostgreSQL no es la única base de datos comúnmente utilizada. Debezium proporciona conectores para varias bases de datos, incluida la base de datos documental no relacional MongoDB. MongoDB es popular entre los desarrolladores, especialmente aquellos que trabajan con lenguajes de programación dinámicos, ya que evita la desadaptación de impedancia objeto-relacional. Los desarrolladores pueden almacenar, consultar y actualizar objetos directamente en la base de datos.

En esta entrada de blog, demostramos cómo adaptar la solución CDC a MongoDB.

Resumen de la Solución

Aquí describimos la arquitectura de la solución de referencia para entregar eventos de captura de datos de cambio con Memphis.dev. La arquitectura no ha cambiado desde nuestra entrada de blog anterior excepto por el reemplazo de PostgreSQL con MongoDB.

A Todo Item generator script writes randomly generated records to MongoDB. Debezium Server receives CDC events from MongoDB and forwards them to the Memphis REST gateway through the HTTP client sink. The Memphis REST gateway adds the messages to a station in Memphis.dev. Lastly, a consumer script polls Memphis. dev for new messages and prints them to the console.

  • Generador de Todos: Inserta un elemento de tarea generado aleatoriamente en la colección de MongoDB cada 0.5 segundos. Cada elemento de tarea contiene una descripción, marca de tiempo de creación, fecha límite opcional y estado de finalización.
  • MongoDB: Configurado con una base de datos única que contiene una sola colección (todo_items).
  • Servidor Debezium: Instancia de Debezium Server configurada con conectores de origen MongoDB y de receptor HTTP Client.
  • Gateway REST de Memphis.dev: Utiliza la configuración out-of-the-box.
  • Memphis.dev: Configurado con una sola estación (todo-cdc-events) y un solo usuario (todocdcservice).
  • Consumidor de Impresión: Un script que utiliza el SDK de Python de Memphis.dev para consumir mensajes y imprimirlos en la consola.

Empezando

El tutorial de implementación está disponible en el directorio mongodb-debezium-cdc-example del repositorio de Soluciones Ejemplo de Memphis. Docker Compose será necesario para ejecutarlo.

Ejecutando la Implementación

Construir las imágenes de Docker para el Debezium Server, el consumidor de impresión y la configuración de la base de datos (creación de tablas y usuarios).

Actualmente, la implementación depende de una versión preliminar del Debezium Server para el soporte de autenticación JWT. Se construirá una imagen de Docker directamente desde la rama principal de los repositorios Debezium y Debezium Server. Tenga en cuenta que este paso puede tardar bastante (~20 minutos) en ejecutarse. Cuando se lance Debezium Server 2.3.0, cambiaremos al uso de la imagen de Docker de upstream.

Paso 1: Construir las Imágenes

Shell

 

$ docker compose build --pull --no-cache

Paso 2: Iniciar el Broker y la Puerta de Enlace REST de Memphis.dev

Iniciar el broker de Memphis.dev y el gateway REST. Tenga en cuenta que el servicio de Memphis-rest-gateway depende del servicio de broker de Memphis, por lo que también se iniciará el servicio de broker.

Shell

 

$ docker compose up -d memphis-rest-gateway
Shell

 

 
[+] Running 4/4
 ⠿ Network mongodb-debezium-cdc-example_default                   Created                                                        0.0s
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1      Healthy                                                        6.0s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1               Healthy                                                       16.8s
 ⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1  Started

Paso 3: Crear una Estación y Usuario Correspondiente en Memphis.dev

Los mensajes se entregan a “estaciones” en Memphis.dev; son equivalentes a los “temas” utilizados por los brokers de mensajes. Dirija su navegador a localhost:9000. Haga clic en el enlace “iniciar sesión con root” en la parte inferior de la página.

Inicie sesión con root (nombre de usuario) y memphis (contraseña). 

Siga el asistente para crear una estación llamada todo-cdc-events.

Crear un usuario llamado todocdcservice con el mismo valor para la contraseña.

Haga clic en “Siguiente” hasta que el asistente esté terminado:

Haga clic en “Ir al resumen de la estación” para ir a la página de resumen de la estación.

Paso 4: Iniciar el Consumidor de Impresión

Utilizamos el SDK de Python de Memphis.dev para crear un script de consumidor que verifica la estación de eventos todo-cdc e imprime los mensajes en la consola.

Shell

 

$ docker compose up -d printing-consumer
Shell

 

 
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container printing-consumer                                Started     

Paso 5: Inicio y Configuración de MongoDB

Para capturar cambios, debe habilitarse la funcionalidad de replicación de MongoDB. Hay varios pasos:

  • El nombre del conjunto de réplicas debe establecerse. Esto se puede hacer pasando el nombre de un conjunto de réplicas en la línea de comandos o en el archivo de configuración. En el archivo Docker Compose, ejecutamos MongoDB con el argumento de línea de comandos –replSet rs0 para establecer el nombre del conjunto de réplicas.
  • Cuando se utiliza la replicación y se habilita la autorización, se debe proporcionar un archivo de clave común a cada instancia de réplica. Generamos un archivo de clave siguiendo las instrucciones en la documentación de MongoDB. Luego construimos una imagen que extiende la imagen oficial de MongoDB incluyendo el archivo de clave.
  • El conjunto de réplicas necesita ser inicializado una vez que MongoDB está en funcionamiento. Utilizamos un script que configura la instancia en el arranque. El script llama al comando replSetInitiate con una lista de las direcciones IP y puertos de cada instancia de MongoDB en el conjunto de réplicas. Este comando hace que las instancias de MongoDB se comuniquen entre sí y seleccionen a un líder.

En general, las réplicas se utilizan para aumentar la fiabilidad (alta disponibilidad). La mayoría de la documentación que encontrarás describe cómo configurar una réplica con múltiples instancias de MongoDB. En nuestro caso, el conector de MongoDB de Debezium aprovecha la funcionalidad de replicación para capturar eventos de cambios de datos. Aunque seguimos los pasos para configurar un conjunto de réplicas, solo utilizamos una instancia de MongoDB.

El script generador de elementos de tareas pendientes crea un nuevo elemento de tarea pendiente cada medio segundo. Los valores de los campos se generan de forma aleatoria. Los elementos se agregan a una colección de MongoDB llamada “todo_items”.

En el archivo Docker Compose, el script generador de elementos de tareas pendientes está configurado para depender de la instancia de Mongodb en un estado saludable y del éxito en la ejecución del script de configuración de la base de datos. Al iniciar el script generador de elementos de tareas pendientes, Docker Compose también iniciará MongoDB y ejecutará el script de configuración de la base de datos.

Shell

 

$ docker compose up -d todo-generator
Shell

 

[+] Running 3/3
 ⠿ Container mongodb                 Healthy                                                                                     8.4s
 ⠿ Container mongodb-database-setup  Exited                                                                                      8.8s
 ⠿ Container mongodb-todo-generator  Started   

Paso 6: Iniciar el Servidor Debezium

El último servicio que necesita ser iniciado es el Servidor Debezium. El servidor está configurado con un conector de origen para MongoDB y el conector de receptor HTTP Client a través de un archivo de propiedades Java:

Shell

 

 
debezium.sink.type=http
debezium.sink.http.url=http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single
debezium.sink.http.time-out.ms=500
debezium.sink.http.retries=3
debezium.sink.http.authentication.type=jwt
debezium.sink.http.authentication.jwt.username=todocdcservice
debezium.sink.http.authentication.jwt.password=todocdcservice
debezium.sink.http.authentication.jwt.url=http://memphis-rest-gateway:4444/
debezium.source.connector.class=io.debezium.connector.mongodb.MongoDbConnector
debezium.source.mongodb.connection.string=mongodb://db
debezium.source.mongodb.user=root
debezium.source.mongodb.password=mongodb
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.topic.prefix=tutorial
debezium.format.key=json
debezium.format.value=json
quarkus.log.console.json=false

La mayoría de las opciones son autoexplicativas. La URL del receptor HTTP es digna de una explicación detallada. La puerta de enlace REST de Memphis.dev espera recibir solicitudes POST con un path en el siguiente formato:

/stations/{station}/produce/{quantity}

El marcador de posición {station} se reemplaza con el nombre de la estación a la que enviar el mensaje. El marcador de posición {quantity} se reemplaza con el valor single (para un solo mensaje) o batch (para varios mensajes).

El mensaje(s) se transmite(n) como carga útil de la solicitud POST. La puerta de enlace REST admite tres formatos de mensaje (texto plano, JSON o protocol buffer). El valor (text/, application/json, application/x-protobuf) del campo de encabezado content-type determina cómo se interpreta la carga útil.

El cliente HTTP de Debezium Server produce solicitudes REST que son consistentes con estos patrones. Las solicitudes utilizan el verbo POST; cada solicitud contiene un solo mensaje codificado en JSON como carga útil, y el encabezado content-type se establece en application/JSON. Utilizamos todo-CDC-eventos como nombre de estación y el valor de cantidad único en la URL del endpoint para enrutar mensajes e indicar cómo la puerta de enlace REST debe interpretar las solicitudes:

http://memphis-rest-gateway:4444/stations/todo-cdc-events/produce/single

La propiedad debezium.sink.http.authentication.type=jwt indica que el cliente HTTP debe utilizar la autenticación JWT. Las propiedades de nombre de usuario y contraseña son evidentes, pero la propiedad debezium.sink.http.authentication.jwt.The URL merece una explicación. Se adquiere un token inicial utilizando el endpoint /auth/authenticate, mientras que la autenticación se refresca utilizando el endpoint separado /auth/refreshToken. La autenticación JWT en el cliente HTTP agrega el endpoint apropiado a la URL base dada.

Debezium Server puede iniciarse con el siguiente comando:

Shell

 

$ docker compose up -d debezium-server
Shell

 

[+] Running 5/5
 ⠿ Container mongodb                                              Healthy                                                        1.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1      Healthy                                                        0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1               Healthy                                                        1.0s
 ⠿ Container mongodb-debezium-cdc-example-memphis-rest-gateway-1  Running                                                        0.0s
 ⠿ Container debezium-server                                      Started

Paso 7: Confirmar que el Sistema Funciona

Revise la pantalla de resumen de la estación todo-cdc-events en la interfaz web de Memphis.dev para confirmar que el productor y el consumidor están conectados y que los mensajes se están entregando.

Y, imprima los registros del contenedor de impresión-consumidor:

Shell

 

$ docker logs --tail 2 printing-consumer

mensaje:

Shell

 

bytearray(b'{"schema":{"type":"struct","fields":[{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"before"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"after"},{"type":"struct","fields":[{"type":"array","items":{"type":"string","optional":false},"optional":true,"field":"removedFields"},{"type":"string","optional":true,"name":"io.debezium.data.Json","version":1,"field":"updatedFields"},{"type":"array","items":{"type":"struct","fields":[{"type":"string","optional":false,"field":"field"},{"type":"int32","optional":false,"field":"size"}],"optional":false,"name":"io.debezium.connector.mongodb.changestream.truncatedarray","version":1},"optional":true,"field":"truncatedArrays"}],"optional":true,"name":"io.debezium.connector.mongodb.changestream.updatedescription","version":1,"field":"updateDescription"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"rs"},{"type":"string","optional":false,"field":"collection"},{"type":"int32","optional":false,"field":"ord"},{"type":"string","optional":true,"field":"lsid"},{"type":"int64","optional":true,"field":"txnNumber"},{"type":"int64","optional":true,"field":"wallTime"}],"optional":false,"name":"io.debezium.connector.mongo.Source","field":"source"},{"type":"string","optional":true,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"tutorial.todo_application.todo_items.Envelope"},"payload":{"before":null,"after":"{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ec\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402475},\\"due_date\\": {\\"$date\\": 1684266602475},\\"description\\": \\"GMZVMKXVKOWIOEAVRYWR\\",\\"completed\\": false}","updateDescription":null,"source":{"version":"2.3.0-SNAPSHOT","connector":"mongodb","name":"tutorial","ts_ms":1684007402000,"snapshot":"false","db":"todo_application","sequence":null,"rs":"rs0","collection":"todo_items","ord":1,"lsid":null,"txnNumber":null,"wallTime":1684007402476},"op":"c","ts_ms":1684007402478,"transaction":null}}')

Formato de los mensajes de CDC

Los mensajes entrantes están formados como JSON. Los mensajes tienen dos campos de nivel superior (esquema y carga útil). El esquema describe el esquema del registro (nombres y tipos de campo), mientras que la carga útil describe el cambio en el registro. El objeto de carga útil en sí contiene dos campos (antes y después) que indican el valor del registro antes y después del cambio.

Para MongoDB, el Debezium Server codifica el registro como una cadena de JSON serializado:

Shell

 

 
{
"before" : null,

"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}"
}

Esto tendrá implicaciones para el procesamiento posterior de mensajes, que describiremos en una futura entrada de blog de esta serie.

¡Felicidades! Ahora tienes un ejemplo funcional de cómo capturar eventos de cambios de datos de una base de datos MongoDB utilizando el Debezium Server y transferir los eventos a Memphis.dev para el procesamiento posterior.

¡Parte 3 saldrá pronto!

Source:
https://dzone.com/articles/part-2-change-data-capture-cdc-for-mongodb-with-de