חלק 3: המרת הודעות אירועי CDC של MongoDB

בפוסט הבלוגי האחרון שלנו, הצגנו יישום ייחוס ללכידת אירועי תיקון נתונים (CDC) ממסד נתונים של MongoDB באמצעות Debezium Server ו-Memphis.dev. בסוף הפוסט, ציינו שרשומות MongoDB מקודדות כמחרוזות בהודעות CDC של Debezium כך:

Python

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",
            ...
         }
}

אנו רוצים להשתמש בפונקציונליות Schemaverse של Memphis.dev לבדיקת הודעות כנגד תבנית צפויה. הודעות שאינן תואמות את התבנית מועברות לתחנת מוות כך שלא משפיעות על צרכנים תחתונים. אם כל זה נשמע כמו יוונית עתיקה, אל תדאגו! נסביר את הפרטים בפוסט הבלוגי הבא שלנו.

כדי להשתמש בפונקציונליות כמו Schemaverse, עלינו לפענח את רשומות MongoDB כמסמכים JSON. פוסט הבלוג זה מתאר שינוי בזרם ה-CDC של MongoDB שלנו שמוסיף שירות טרנספורמר לפיענוח רשומות MongoDB למסמכים JSON.

תיאור הפיתרון

הפיתרון הקודם כלל שש רכיבים:

1. יצרן פריטי עבודה: מכניס פריט עבודה באופן אקראי שנוצר בקולקציה MongoDB כל 0.5 שניות. כל פריט עבודה כולל תיאור, תאריך יצירה, תאריך עיכוב אופציונלי וסטטוס של סיום.

2. MongoDB: מותאם עם מסד נתונים יחידה המכילה רשימה יחידה (todo_items).

3. Debezium Server: מופע של שרת Debezium המותאם עם מקור MongoDB ומתוך תקשורת שקת HTTP Client.

4. Memphis.dev REST Gateway: משתמש בהתאמה מוכנה.

5. Memphis.dev: מותאם עם תחנה יחידה (todo-cdc-events) ומשתמש יחיד (todocdcservice)

6. צופה הדפסה: תוכנית המשתמשת ב-SDK הפייתון של Memphis.dev לצריך הודעות ולהדפיס אותן לתוך התפריט.

באיטרציה זו, אנו מוסיפים שני רכיבים נוספים:

1. שירות מרופד: שירות מרופד הצורך הודעות מתחנת todo-cdc-events, מפענח את רשומות MongoDB, ודוחף אותם לתחנת cleaned-todo-cdc-events.

2. צופה הדפסה נקי: מופע שני של צופה הדפסה המדפיס הודעות שנדחפו לתחנת cleaned-todo-cdc-events.

הארכיטקטורה המעודכנת נראית ככה:

A Deep Dive Into the Transformer Service

מערך של שירות ההמרה של הודעות

שירות ההמרה משתמש ב-SDK של Memphis.dev בשפת Python. בואו נעבור על היישום של המרה. השיטה הראשית של המרה שלנו מתחילה בחיבור למוקד של Memphis.dev. הפרטים לחיבור נלקחים ממשתני הסביבה. הכתובת, שם המשתמש, הסיסמה, שם תחנת הקלט ושם תחנת הפלט מועברים באמצעות משתני סביבה על פי ההצעות מהחמישה-גורמים של האפליקציה.

Python

async def main():
    try:
        print("Waiting on messages...")
        memphis = Memphis()
        await memphis.connect(host=os.environ[HOST_KEY],
                              username=os.environ[USERNAME_KEY],
                              password=os.environ[PASSWORD_KEY])

ברגע שנוצר חיבור, אנו יוצרים אובייקטים של צרכן ויצרן. ב-Memphis.dev, לצרכנים וליצרנים יש שמות. השמות אלו מופיעים בממשק המשתמש של Memphis.dev, מה שמספק את תחושת הפתיחות לתהליכים המערכתיים. 

Python

print("Creating consumer")
        consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
                                          consumer_name="transformer",
                                          consumer_group="")

        print("Creating producer")
        producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
                                          producer_name="transformer")

API הצרכן משתמש בתבנית עיצוב של פונקציית התגובה. כשהודעות נמשכות מהמוקד, פונקצית ההתגובה המוצגת מוקרנת עם רשימת הודעות כארגומנט שלה.

Python

print("Creating handler")
        msg_handler = create_handler(producer)

        print("Setting handler")
        consumer.consume(msg_handler)

לאחר הקבלת הקולבק, אנו מתחילים את לולאת האירועים asyncio. בשלב זה, שירות הטרנספורמר מפסיק וממתין עד שהודעות זמינות למשוך מהברוקר.

  • השאר את הנוכחי שלך חי כך שהצרכן ימשיך לקבל נתונים.
Python

await asyncio.Event().wait()

יצירת פונקציית יוזר הודעה

הפונקציה ליצירת יוזר הודעה לוקחת אובייקט ייצור ומחזירה פונקציית קולבק. מאחר ופונקציית הקולבק מקבלת רק טיעון אחד, אנו משתמשים בדפוס הסגירה closure pattern כדי להעביר באופן סמנטי את היצרן לפונקציית msg_handler כשאנו יוצרים אותה.

הפונקציה msg_handler מקבלת שלוש טיעונים כשנקראת: רשימת הודעות, שגיאה (אם קרתה אחת), והקשר המורכב ממילון. המנהל שלנו עובר על ההודעות, קורא לפונקציית ההפיכה על כל אחת מהן, שולח את ההודעות לתחנה השנייה באמצעות היצרן, ומאשר שהודעת העבודה התבצעה. ב-Memphis.dev, הודעות אינן מסומנות כמשוחררות עד שהצרכן מאשר אותן. זה מונע מהודעות ליפול אם מילת שגיאה קורתת במהלך עיבוד.

Python

def create_handler(producer):
    async def msg_handler(msgs, error, context):
        try:
            for msg in msgs:
                transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
                await producer.produce(message=transformed_msg)
                await msg.ack()
        except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
            print(e)
            return

    return msg_handler

פונקציית הטרנספורמציה של הודעה

עכשיו, אנו מגיעים לחלק העיקרי של השירות: פונקציית ההמרה של ההודעה. ערכי הודעה (המחזירה על ידי השיטה get_data()) מאוחסנים כאובייקטים bytearray. אנו משתמשים בספריית Python json כדי לפשט את הודעותינו להיפרכה של קולקציות Python (רשימה ומילון) וסוגים ראשוניים (int, float, str, ו-None).

Python

def deserialize_mongodb_cdc_event(input_msg):
    obj = json.loads(input_msg)

אנו מצפים שלאובייקט יהיה תכונה payload עם אובייקט כערך. אותו אובייקט יש אז שתי תכונות (“לפני” ו-“אחרי”), שהן או None או מחרוזות המכילות אובייקטים חינמטיים ב-JSON. אנו משתמשים שוב בספריית JSON כדי לפשט ולהחליף את המחרוזות באובייקטים. 

Python

 

if "payload" in obj:
        payload = obj["payload"]

        if "before" in payload:
            before_payload = payload["before"]
            if before_payload is not None:
                payload["before"] = json.loads(before_payload)

        if "after" in payload:
            after_payload = payload["after"]
            if after_payload is not None:
                payload["after"] = json.loads(after_payload)

לבסוף, אנו מחזירים להמיר שוב את כל הרשומה ה-JSON וממירים אותה חזרה ל-bytearray לשימוש בשליחה לסוכן. 

Python

 

output_s = json.dumps(obj)
    output_msg = bytearray(output_s, "utf-8")
    return output_msg

הואראי! האובייקטים שלנו נראים כך:

Python

 

{
	"schema" : ...,
        "payload" : {
        	"before" : null,
            "after" :
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},

...
}
}

ריצה של שירות ההמרה

אם עקבתם אחרי שבעת השלבים בפוסט הבלוג הקודם, אתם זקוקים רק לשלושה שלבים נוספים. כדי להפעיל את שירות ההמרה ולוודא שהוא פועל:

שלב 8: הפעל את שירות ההמרה

Python

 

$ docker compose up -d cdc-transformer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cdc-transformer                                  Started                                                            1.3s

שלב 9: הפעל את הצרכן ההדפסה השני

Python

 

$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
 ⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1  Hea...                                                             0.5s
 ⠿ Container mongodb-debezium-cdc-example-memphis-1           Healthy                                                            1.0s
 ⠿ Container cleaned-printing-consumer                        Started                                                            1.3s

שלב 10: בדוק את ממשק ממפיס יו

כשהמרכז מתחיל להפיק הודעות ל-Memphis.dev, תיצור תחנה שנייה בשם “cleaned-todo-cdc-events”. אתם אמורים לראות את התחנה החדשה בדף סקירת התחנות בממשק ממפיס יו כך:

דף הפרטים עבור הדף "cleaned-todo-cdc-events" צריך להציג את הטרנספורמר שמחובר כייצרן, הלקוח המדפיס וההודעות המעובדות: 

מזל טוב! אנו כעת מוכנים לטפל באימות הודעות באמצעות Schemaverse בפוסט הבלוג הבא שלנו. התעדכנו!

במקרה שפספסתם את החלקים 1 ו-2:

Source:
https://dzone.com/articles/part-3-transforming-mongodb-cdc-event-messages