בפוסט הבלוגי האחרון שלנו, הצגנו יישום ייחוס ללכידת אירועי תיקון נתונים (CDC) ממסד נתונים של MongoDB באמצעות Debezium Server ו-Memphis.dev. בסוף הפוסט, ציינו שרשומות MongoDB מקודדות כמחרוזות בהודעות CDC של Debezium כך:
{
"schema" : ...,
"payload" : {
"before" : null,
"after" : "{\\"_id\\": {\\"$oid\\": \\"645fe9eaf4790c34c8fcc2ed\\"},\\"creation_timestamp\\": {\\"$date\\": 1684007402978},\\"due_date\\": {\\"$date\\": 1684266602978},\\"description\\": \\"buy milk\\",\\"completed\\": false}",
...
}
}
אנו רוצים להשתמש בפונקציונליות Schemaverse של Memphis.dev לבדיקת הודעות כנגד תבנית צפויה. הודעות שאינן תואמות את התבנית מועברות לתחנת מוות כך שלא משפיעות על צרכנים תחתונים. אם כל זה נשמע כמו יוונית עתיקה, אל תדאגו! נסביר את הפרטים בפוסט הבלוגי הבא שלנו.
כדי להשתמש בפונקציונליות כמו Schemaverse, עלינו לפענח את רשומות MongoDB כמסמכים JSON. פוסט הבלוג זה מתאר שינוי בזרם ה-CDC של MongoDB שלנו שמוסיף שירות טרנספורמר לפיענוח רשומות MongoDB למסמכים JSON.
תיאור הפיתרון
הפיתרון הקודם כלל שש רכיבים:
1. יצרן פריטי עבודה: מכניס פריט עבודה באופן אקראי שנוצר בקולקציה MongoDB כל 0.5 שניות. כל פריט עבודה כולל תיאור, תאריך יצירה, תאריך עיכוב אופציונלי וסטטוס של סיום.
2. MongoDB: מותאם עם מסד נתונים יחידה המכילה רשימה יחידה (todo_items).
3. Debezium Server: מופע של שרת Debezium המותאם עם מקור MongoDB ומתוך תקשורת שקת HTTP Client.
4. Memphis.dev REST Gateway: משתמש בהתאמה מוכנה.
5. Memphis.dev: מותאם עם תחנה יחידה (todo-cdc-events) ומשתמש יחיד (todocdcservice)
6. צופה הדפסה: תוכנית המשתמשת ב-SDK הפייתון של Memphis.dev לצריך הודעות ולהדפיס אותן לתוך התפריט.
באיטרציה זו, אנו מוסיפים שני רכיבים נוספים:
1. שירות מרופד: שירות מרופד הצורך הודעות מתחנת todo-cdc-events, מפענח את רשומות MongoDB, ודוחף אותם לתחנת cleaned-todo-cdc-events.
2. צופה הדפסה נקי: מופע שני של צופה הדפסה המדפיס הודעות שנדחפו לתחנת cleaned-todo-cdc-events.
הארכיטקטורה המעודכנת נראית ככה:
A Deep Dive Into the Transformer Service
מערך של שירות ההמרה של הודעות
שירות ההמרה משתמש ב-SDK של Memphis.dev בשפת Python. בואו נעבור על היישום של המרה. השיטה הראשית של המרה שלנו מתחילה בחיבור למוקד של Memphis.dev. הפרטים לחיבור נלקחים ממשתני הסביבה. הכתובת, שם המשתמש, הסיסמה, שם תחנת הקלט ושם תחנת הפלט מועברים באמצעות משתני סביבה על פי ההצעות מהחמישה-גורמים של האפליקציה.
async def main():
try:
print("Waiting on messages...")
memphis = Memphis()
await memphis.connect(host=os.environ[HOST_KEY],
username=os.environ[USERNAME_KEY],
password=os.environ[PASSWORD_KEY])
ברגע שנוצר חיבור, אנו יוצרים אובייקטים של צרכן ויצרן. ב-Memphis.dev, לצרכנים וליצרנים יש שמות. השמות אלו מופיעים בממשק המשתמש של Memphis.dev, מה שמספק את תחושת הפתיחות לתהליכים המערכתיים.
print("Creating consumer")
consumer = await memphis.consumer(station_name=os.environ[INPUT_STATION_KEY],
consumer_name="transformer",
consumer_group="")
print("Creating producer")
producer = await memphis.producer(station_name=os.environ[OUTPUT_STATION_KEY],
producer_name="transformer")
API הצרכן משתמש בתבנית עיצוב של פונקציית התגובה. כשהודעות נמשכות מהמוקד, פונקצית ההתגובה המוצגת מוקרנת עם רשימת הודעות כארגומנט שלה.
print("Creating handler")
msg_handler = create_handler(producer)
print("Setting handler")
consumer.consume(msg_handler)
לאחר הקבלת הקולבק, אנו מתחילים את לולאת האירועים asyncio. בשלב זה, שירות הטרנספורמר מפסיק וממתין עד שהודעות זמינות למשוך מהברוקר.
- השאר את הנוכחי שלך חי כך שהצרכן ימשיך לקבל נתונים.
await asyncio.Event().wait()
יצירת פונקציית יוזר הודעה
הפונקציה ליצירת יוזר הודעה לוקחת אובייקט ייצור ומחזירה פונקציית קולבק. מאחר ופונקציית הקולבק מקבלת רק טיעון אחד, אנו משתמשים בדפוס הסגירה closure pattern כדי להעביר באופן סמנטי את היצרן לפונקציית msg_handler כשאנו יוצרים אותה.
הפונקציה msg_handler מקבלת שלוש טיעונים כשנקראת: רשימת הודעות, שגיאה (אם קרתה אחת), והקשר המורכב ממילון. המנהל שלנו עובר על ההודעות, קורא לפונקציית ההפיכה על כל אחת מהן, שולח את ההודעות לתחנה השנייה באמצעות היצרן, ומאשר שהודעת העבודה התבצעה. ב-Memphis.dev, הודעות אינן מסומנות כמשוחררות עד שהצרכן מאשר אותן. זה מונע מהודעות ליפול אם מילת שגיאה קורתת במהלך עיבוד.
def create_handler(producer):
async def msg_handler(msgs, error, context):
try:
for msg in msgs:
transformed_msg = deserialize_mongodb_cdc_event(msg.get_data())
await producer.produce(message=transformed_msg)
await msg.ack()
except (MemphisError, MemphisConnectError, MemphisHeaderError) as e:
print(e)
return
return msg_handler
פונקציית הטרנספורמציה של הודעה
עכשיו, אנו מגיעים לחלק העיקרי של השירות: פונקציית ההמרה של ההודעה. ערכי הודעה (המחזירה על ידי השיטה get_data()) מאוחסנים כאובייקטים bytearray. אנו משתמשים בספריית Python json כדי לפשט את הודעותינו להיפרכה של קולקציות Python (רשימה ומילון) וסוגים ראשוניים (int, float, str, ו-None).
def deserialize_mongodb_cdc_event(input_msg):
obj = json.loads(input_msg)
אנו מצפים שלאובייקט יהיה תכונה payload עם אובייקט כערך. אותו אובייקט יש אז שתי תכונות (“לפני” ו-“אחרי”), שהן או None או מחרוזות המכילות אובייקטים חינמטיים ב-JSON. אנו משתמשים שוב בספריית JSON כדי לפשט ולהחליף את המחרוזות באובייקטים.
if "payload" in obj:
payload = obj["payload"]
if "before" in payload:
before_payload = payload["before"]
if before_payload is not None:
payload["before"] = json.loads(before_payload)
if "after" in payload:
after_payload = payload["after"]
if after_payload is not None:
payload["after"] = json.loads(after_payload)
לבסוף, אנו מחזירים להמיר שוב את כל הרשומה ה-JSON וממירים אותה חזרה ל-bytearray לשימוש בשליחה לסוכן.
output_s = json.dumps(obj)
output_msg = bytearray(output_s, "utf-8")
return output_msg
הואראי! האובייקטים שלנו נראים כך:
{
"schema" : ...,
"payload" : {
"before" : null,
"after" :
"_id": { "$oid": "645fe9eaf4790c34c8fcc2ed" },
"creation_timestamp": { "$date": 1684007402978 },
"due_date": { "$date" : 1684266602978 },
"description": "buy milk",
"completed": false
},
...
}
}
ריצה של שירות ההמרה
אם עקבתם אחרי שבעת השלבים בפוסט הבלוג הקודם, אתם זקוקים רק לשלושה שלבים נוספים. כדי להפעיל את שירות ההמרה ולוודא שהוא פועל:
שלב 8: הפעל את שירות ההמרה
$ docker compose up -d cdc-transformer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container cdc-transformer Started 1.3s
שלב 9: הפעל את הצרכן ההדפסה השני
$ docker compose up -d cleaned-printing-consumer
[+] Running 3/3
⠿ Container mongodb-debezium-cdc-example-memphis-metadata-1 Hea... 0.5s
⠿ Container mongodb-debezium-cdc-example-memphis-1 Healthy 1.0s
⠿ Container cleaned-printing-consumer Started 1.3s
שלב 10: בדוק את ממשק ממפיס יו
כשהמרכז מתחיל להפיק הודעות ל-Memphis.dev, תיצור תחנה שנייה בשם “cleaned-todo-cdc-events”. אתם אמורים לראות את התחנה החדשה בדף סקירת התחנות בממשק ממפיס יו כך:
דף הפרטים עבור הדף "cleaned-todo-cdc-events" צריך להציג את הטרנספורמר שמחובר כייצרן, הלקוח המדפיס וההודעות המעובדות:
מזל טוב! אנו כעת מוכנים לטפל באימות הודעות באמצעות Schemaverse בפוסט הבלוג הבא שלנו. התעדכנו!
במקרה שפספסתם את החלקים 1 ו-2:
Source:
https://dzone.com/articles/part-3-transforming-mongodb-cdc-event-messages