ביצוע ניתוח מתקדם של נתוני אירועי Facebook עם מסד נתונים וקטורי

בעידן הדיגיטלי של היום, מקצוענים מכל התעשיות חייבים להישאר מעודכנים עם אירועים עתידיים, כנסים וסדנאות. עם זאת, מציאת אירועים המתאימים לתחומי העניין של אדם בקרב הים העצום של מידע מקוון מהווה אתגר משמעותי.

בבלוג זה מציגים פתרון חדש לאתגר זה: יישום מקיף שמושך נתוני אירועים מפייסבוק ומנתח את הנתונים המושפעים באמצעות MyScale. בעוד ש-MyScale קשור בדרך כלל לענף RAG או משמש כמסד נתונים וקטורי, היכולות שלו מעבר לתחומים אלה. נשתמש בו לניתוח נתונים, משתמשים בפונקציות החיפוש הווקטוריות שלו לניתוח אירועים הדומים באופן סמנטי, כך מספקים תוצאות ותובנות טובות יותר.

ייתכן שתבחין ש-Grok AI השתמש במסד נתונים וקטורי Qdrant כמנוע חיפוש כדי להשיג מידע בזמן אמת מ-X (בעבר ידוע כטוויטר). אפשר גם להעריך את כוחם של מסדי נתונים וקטורי בדרך זו עם MyScale על ידי שילוב MyScale עם רשתות אחרות כמו Apify כדי לשפר משימות חיים יומיומיות באמצעות פיתוח יישומים מותאמים אישית פשוטים.

אז בבלוג זה, בואו נפתח יישום שלוקח רק את שם העיר כקלט ומושך את כל האירועים הקשורים מפייסבוק. לאחר מכן, נבצע ניתוח נתונים וחיפוש סמנטי באמצעות היכולות הSQL המתקדמות של MyScale.

כלים וטכנולוגיות

נשתמש במספר כלים, כולל Apify, MyScale ו-OpenAI, לפיתוח האפליקציה המועילה הזו.

  • Apify: פלטפורמת סקריפינג ואוטומציה פופולרית שמאפשרת לצמצם משמעותית את תהליך איסוף הנתונים. היא מספקת את היכולת לסרוק נתונים ולאחר מכן להזרים אותם ל-LLMs. זה מאפשר לנו לאמן LLMs על נתונים בזמן אמת ולפתח יישומים.
  • MyScale: MyScale היא מסד נתונים וקטורי SQL שאנו משתמשים בו לאחסון ועיבוד נתונים מובנים ולא מובנים בצורה מותאמת אישית.
  • OpenAI: נשתמש במודל text-embedding-3-small מ-OpenAI כדי לקבל את ההטמעות של הטקסט ולאחר מכן לשמור את ההטמעות הללו ב-MyScale לניתוח נתונים וחיפושים סמנטיים.

איך להגדיר את MyScale ו-Apify

כדי להתחיל להגדיר את MyScale ו-Apify, תצטרך ליצור ספרייה חדשה וקובץ Python. תוכל לעשות זאת על ידי פתיחת המסוף או שורת הפקודה שלך והזנת הפקודות הבאות:

Shell

 

mkdir MyScale
cd MyScale
touch main.ipynb

בואו נתקין את החבילות. העתק את הפקודה שלמטה והדבק אותה במסוף שלך. חבילות אלו יספקו את הכלים והספריות הנחוצים לפיתוח האפליקציה שלנו.

Shell

 

pip install openai apify-client clickhouse-connect pandas numpy

זה צריך להתקין את כל התלמים במערכת שלך. כדי לאמת שהכל מותקן כראוי, תוכל להזין את הפקודה הבאה במסוף שלך.

Shell

 

pip freeze | egrep '(openai|apify-client|clickhouse-connect|pandas|numpy)'

זה צריך לכלול את כל ההסמכות המותקנות עם גרסאותיהן. אם אתה מציין שיש הסמכות חסרות, ייתכן שתצטרך להפעיל מחדש את פקודת ההתקנה עבור החבילה הספציפית הזו. עכשיו, אנו מוכנים לכתוב את הקוד שלנו לאחר ההתקנות.

הערה: אנו נעבוד במחברת פייתון. שקול כל בלוק קוד תא מחברת.

כיצד לגשת לנתונים עם Apify

עכשיו, אנו נשתמש ב-API של Apify כדי לגשת לנתוני אירועים של ניו יורק על ידי שימוש ב-מסננת אירועי פייסבוק

Python

 

     import pandas as pd
from apify_client import ApifyClient
# אתחול ApifyClient עם תעודת ה-API שלך
client = ApifyClient("Enter_your_apify_key_here")

# הכנת קלט השחקן
run_input = {
    "searchQueries": ["Sport New York"],
    "startUrls": [],
    "maxEvents": 50,
}

# הרץ את השחקן וחכה עד שהוא יסתיים
run = client.actor("UZBnerCFBo5FgGouO").call(run_input=run_input)

df_columns = ['Name', 'Datetime', 'Description', 'Users_Going', 'Users_Interested', 'Users_Responded', 'City', 'Organized_By', 'Street_Address']
dataframe1 = pd.DataFrame(columns=df_columns)

for item in client.dataset(run["defaultDatasetId"]).iterate_items():
    # השתמש בהערכת מילון כדי להחליף ערכים None במחרוזת ריקה
    row = {
        'Name': item.get('name', ''),
        'Datetime': item.get('dateTimeSentence', ''),
        'Description': item.get('description', ''),
        'Users_Going': item.get('usersGoing', ''),
        'Users_Interested': item.get('usersInterested', ''),
        'Users_Responded': item.get('usersResponded', ''),
        'City': item.get('location', {}).get('city', '') if item.get('location') else '',
        'Organized_By': item.get('organizedBy', ''),
        'Street_Address': item.get('location', {}).get('streetAddress', '') if item.get('location') else ''
    }
    # דאג לכך שכל הערכים None מוחלפים במחרוזת ריקה
    row = {k: (v if v is not None else '') for k, v in row.items()}
    dataframe1 = dataframe1._append(row, ignore_index=True)

# ניקוי הנתונים
dataframe1['Description'] = dataframe1['Description'].replace('\\n', '', regex=True)

תסריט זה נותן לנו את פרטי האירועים הקרובים בצורה של pandas DataFrame.

הערה: אל תשכחו להוסיף את מפתח ה-API של Apify בתוך התסריט לעיל. אפשר למצוא את האסימון של ה-API בדף Integrations בחשבון Apify Console.

הכנת נתונים מקדימה

כשאנו אוספים נתונים גולמיים, הם מגיעים בפורמטים שונים. בתסריט זה, נביא את תאריכי האירועים לפורמט אחיד כדי שנוכל לבצע את סינון הנתונים בצורה יעילה יותר.

Python

 

     # הוספת ספריות הנחוצות לניהול נתונים ופירוק תאריכים
import pandas as pd
import numpy as np
from datetime import datetime
from dateutil import parser

# פונקציה לפירוק מחרוזות תאריך שעשויות לייצג טווח או תאריך יחיד
def parse_dates(date_str):
    # בדיקה אם מחרוזת התאריך מכילה מקף, מה שמעיד על טווח
    if '-' in date_str:
        parts = date_str.split('-')
        # אם המחרוזת מתחלקת לשני חלקים, זה טווח תקין
        if len(parts) == 2:
            try:
                # פירוק תאריכי ההתחלה והסיום, ועיצובם לפורמט קריא
                start_date = parser.parse(parts[0], fuzzy=True).strftime('%a, %b %d')
                end_date = parser.parse(parts[1], fuzzy=True).strftime('%a, %b %d')
                return start_date, end_date
            except ValueError:
                # במקרה של שגיאת פירוק, לא לעשות כלום (יתומקד מתחת)
                pass  
    # אם זה לא טווח או אם נכשל בפירוק הטווח, לנסות לפרק כתאריך יחיד
    try:
        parsed_date = parser.parse(date_str, fuzzy=True)
        # לעצב את התאריך היחיד לפורמט של start_date ולפורמט אחר לend_date
        start_date = parsed_date.strftime('%a, %b %d AT %I:%M %p EDT')
        end_date = parsed_date.strftime('%a, %b %d')  # Omitting time for end_date
        return start_date, end_date
    except ValueError:
        # להחזיר NaN לשני התאריכים אם נכשל הפירוק
        return np.nan, np.nan  

# פונקציה למיצוע תאריך מפורט, זמן ויום ממחרוזת תאריך
def extract_date_time_day(date_str):
    try:
        # פירוק המחרוזת תאריך, מאפשר גם כמה גמישות בפורמט הקלט
        parsed_date = parser.parse(date_str, fuzzy=True)
        # מיצוע ועיצוב החלקים של התאריך, הזמן והיום
        date = parsed_date.strftime('%Y-%m-%d')
        day = parsed_date.strftime('%a')
        # לקבוע אם המחרוזת המקורית כללה רכיב זמן
        time_component = parsed_date.strftime('%I:%M %p') not in date_str
        time = parsed_date.strftime('%H:%M:%S') if not time_component else np.nan
    except ValueError:
        # אם נכשל הפירוק, לקבוע תאריך, זמן ויום כNaN
        date, time, day = np.nan, np.nan, np.nan
    
    return date, time, day

# החילוץ פונקצית parse_dates על פני הטבלה, יצירת עמודות חדשות לתאריכי ההתחלה והסיום
dataframe1[['Start_Date', 'End_Date']] = dataframe1.apply(lambda row: pd.Series(parse_dates(row['Datetime'])), axis=1)

# מחיקת שורות שבהן Start_Date הוא NaN, מה שמעיד על כישלון בפירוק
dataframe = dataframe1.dropna(subset=['Start_Date'])

# החילוץ extract_date_time_day לחלוקת התאריכים ההתחלה והסיום לעמודות נפרדות לתאריך, זמן ויום
dataframe['Start_Date'], dataframe['Start_Time'], dataframe['Start_Day'] = zip(*dataframe['Start_Date'].apply(extract_date_time_day))
dataframe['End_Date'], _, dataframe['End_Day'] = zip(*dataframe['End_Date'].apply(extract_date_time_day))

# הסרת העמודה 'Datetime' המקורית מאחר שהיא כבר לא נחוצה
dataframe=dataframe.drop(['Datetime'], axis=1)

# המרת 'Start_Date' ו'End_Date' לפורמט datetime, והוצאת רק החלק התאריך
dataframe['Start_Date'] = pd.to_datetime(dataframe['Start_Date']).dt.date
dataframe['End_Date'] = pd.to_datetime(dataframe['End_Date']).dt.date

# המרת 'Start_Time' לפורמט datetime, שמרת על מידע הזמן
dataframe['Start_Time'] = pd.to_datetime(dataframe['Start_Time'])

קטע הקוד הזה משתמש בpandas עם חבילות datetime ו-dateutil של Python לעיצוב הנתונים.

יצירת ממשקים מוטבעים

כדי להבין ולחפש אירועים באופן עמוק, ניצור ממשקים מוטבעים מתיאוריהם באמצעות text-embedding-3-small. ממשקים אלו לוקחים את המהות התחבירית של כל אירוע, ועוזרים ליישום להחזיר תוצאות טובות יותר.

Python

 

     # הכנסת ספריית OpenAI לגישה ל-API.
from openai import OpenAI 
# התחלת �לקוח OpenAI עם מפתח API.
openai_client = OpenAI(api_key="your_openai_api_key_here")
# פונקציה לקבלת ממשקים טקסט 
def get_embedding(text, model="text-embedding-3-small"):
    return openai_client.embeddings.create(input=text, model=model).data
embeddings = get_embedding(dataframe["Description"].tolist())
# חילוץ וקטורי הממשקים מאובייקט הממשקים
vectors = [embedding.embedding for embedding in embeddings]
array = np.array(vectors)
embeddings_series = pd.Series(list(array))
# הוספת ממשקים כעמודה חדשה ב-DataFrame.
dataframe['Description_Embeddings'] = embeddings_series

עכשיו, נכניס את ה-DataFrame החדש עם הממשקים ל-MyScale.

חיבור עם MyScale

כפי שנדון בהתחלה, נשתמש ב-MyScale כמסד נתונים וקטורי לאחסון וניהול נתונים. כאן, נתחבר ל-MyScale לשם הכנה לאחסון נתונים.

Python

 

     import clickhouse_connect
client = clickhouse_connect.get_client(
    host='host_name_here',
    port=443,
    username='username_here',
    password='passwd_here'
)

הגדרת החיבור הזו מבטיחה שהיישום שלנו יוכל לתקשר עם MyScale, ולהשתמש בכוחם של SQL לשימוש בניהול נתונים וניתוח.

הערה: ראה פרטי החיבור למידע נוסף איך להתחבר לקלאסטר MyScale.

יצירת טבלאות ואינדקסים באמצעות MyScale

כעת נוצר טבלה על פי DataFrame שלנו. כל הנתונים יאוחסנו בטבלה זו, כולל הממשקים.

Python

 

     client.command("""
    CREATE TABLE default.Events (
    Name String,
    Description String,
    Users_Going Int64,
    Users_Interested Int64,
    Users_Responded Int64,
    City String,
    Organized_By String,
    Street_Address String,
    Start_Date Date32,
    End_Date Nullable(Date32),
    Start_Time Nullable(DateTime64),
    Start_Day String,
    End_Day String,
    Description_Embeddings Array(Float32),
    CONSTRAINT check_data_length CHECK length(Description_Embeddings) = 1536
    ) ENGINE = MergeTree()
    ORDER BY (Name);
    """)

הביטויים SQL לעיל יוצרים טבלה בשם Events בצבר. ה-CONSTRAINT מוודא שכל הווקטורים המרומזים הם באורך זהה 1536.

אחסון הנתונים ויצירת אינדקס ב-MyScale

בשלב זה, אנו מכניסים את הנתונים המעובדים ל-MyScale. זה כרוך בהכנסת הנתונים בחציון כדי להבטיח אחסון ושליפה יעילים.

Python

 

     batch_size = 10  # Adjust based on your needs

num_batches = len(dataframe) // batch_size

for i in range(num_batches):
    start_idx = i * batch_size
    end_idx = start_idx + batch_size
    batch_data = dataframe[start_idx:end_idx]
    # print(batch_data["Description_Embeddings"])
    client.insert("default.Events", batch_data.to_records(index=False).tolist(), column_names=batch_data.columns.tolist())
    print(f"Batch {i+1}/{num_batches} inserted.")

client.command("""
ALTER TABLE default.Events
    ADD VECTOR INDEX vector_index Description_Embeddings
    TYPE MSTG
""")

באמצעות pandas, הקוד לעיל מעביר ביעילות את המאגר המוכנים שלנו למסד הנתונים של MyScale.

ניתוח נתונים באמצעות MyScale

לבסוף, אנו משתמשים ביכולות הניתוח של MyScale כדי לבצע ניתוח ולאפשר חיפוש סמנטי. על ידי ביצוע שאילתות SQL, אנו יכולים לנתח אירועים על פי נושאים, מיקומים ותאריכים. אז בואו ננסה לכתוב כמה שאילתות.

שאילתת SQL פשוטה

בואו ננסה קודם כל לקבל את 10 התוצאות הראשונות מהטבלה.

Python

 

     results=client.query("""
        SELECT Name,Description FROM default.Events LIMIT 10
    """)
for row in results.named_results():
        print(row["Name"])
        print(row['Description'])

שאילתה זו תחזיר פשוט את 10 התוצאות הראשונות מהטבלה events.

גילוי אירועים על פי רלוונטיות סמנטית

בואו ננסה למצוא את 10 האירועים הקרובים ביותר עם תחושה דומה לאירוע מופנה, כמו זה: "אחד המופעים הממוקדים בארץ – פועל מאז 1974 …עכשיו בשנת ה-50 שלנו !!! השנקנטדי שלנו". זה מתבצע על ידי השוואת הטקסטים הסמנטיים של תיאורי האירועים, מה שמבטיח התאמה בנושאים וברגשות.

Python

 

embeddings=get_embedding(["One of the Longest Running Shows in the Country - Operating since 1974 ...NOW our 50th YEAR !!!Our Schenectady"])
embedding=embeddings[0].embedding
results = client.query(f"""
        SELECT Name, Description,
        distance(Description_Embeddings, {embedding}) as dist FROM default.Events ORDER BY dist LIMIT 10
    """)
for row in results.named_results():
        print("Title of the event  ", row["Name"])
        print("Description of the event  ", row['Description'])
        print("Distance : ", row["dist"])

אירועים מפופולריים על פי פופולריות

שאילתה זו מדרגת את 10 האירועים העיקריים לפי מספר המשתתפים והמשתמשים המעוניינים, ומדגישה אירועים פופולאריים מפסטיבלים בעירונים גדולים עד לועידות גדולות. היא מתאימה למי שמחפש להצטרף לאירועים גדולים ואנרגטיים.

Python

 

     results = client.query(f"""
        SELECT Name, City, Users_Going, Users_Interested, Users_Responded
        FROM default.Events
        ORDER BY Users_Going DESC, Users_Interested DESC
        LIMIT 10
    """)
for row in results.named_results():
        print("Event Name  ", row["Name"])
        print("City ", row["City"])
        print("Users Going ", row["Users_Going"])
        print("Interested Users ", row["Users_Interested"])

אירועים מקומיים פופולאריים בניו יורק

על ידי שילוב רלוונטיות ופופולאריות, שאילתה זו מזהה אירועים דומים בעיר ניו יורק הקשורים לאירוע ספציפי ומדרגת אותם לפי נוכחות, מה שמספק רשימה מעודדת שמשקפת את התרבות החייה של העיר ומושכת עניין מקומי.

Python

 

     embeddings=get_embedding(["One of the Longest Running Shows in the Country - Operating since 1974 ...NOW our 50th YEAR !!!Our Schenectady"])
embeddi=embeddings[0].embedding
results = client.query(f"""
        SELECT Name,City, Description, Users_Going,distance(Description_Embeddings, {embeddi}) as dist
        FROM default.Events
        WHERE City LIKE '%New York%' and dist < 1.5
        ORDER BY Users_Going DESC,dist
        LIMIT 10
    """)
for row in results.named_results():
        print("Event Name  ", row["Name"])
        print("Description ", row["Description"])
        print("Users Going ", row["Users_Going"])

מארגני אירועים מובילים

שאילתה זו מדרגת את 10 המארגנים האירועים המובילים לפי סך המשתתפים והמשתמשים המעוניינים, ומדגישה את אלו שמצטיינים ביצירת אירועים מרתקים ומשכים קהלים גדולים. היא מספקת תובנות למתכנני אירועים ולמשתתפים שמעוניינים באירועים ברמה העליונה.

Python

 

     # איזה לקוח משכה את המספר הגדול ביותר של משתמשים 
results = client.query(f"""
       SELECT Organized_By, SUM(Users_Going + Users_Interested) AS Total_Users
        FROM default.Events
        GROUP BY Organized_By
        ORDER BY Total_Users DESC
        Limit 10
    """)
for row in results.named_results():
        print("Event Name  ", row["Organized_By"])
        print("Total_Users ", row["Total_Users"])

יישום RAG

קודם כבר חקרנו את MyScale לניתוח נתונים, והדגשנו את יכולותיו בשיפור זרימת העבודה של הנתונים שלנו. מעתה ואילך, נלך צעד נוסף קדימה על ידי יישום Retrieval-Augmented Generation (RAG), מסגרת חדשנית המשלבת מאגר ידע חיצוני עם LLMs. צעד זה יעזור לכם להבין את הנתונים שלכם טוב יותר ולגלות תובנות מפורטות יותר. לאחר מכן, תראו כיצד להשתמש ב-RAG עם MyScale, מה שיהפוך את העבודה עם נתונים למעניינת ופרודוקטיבית יותר.

Python

 

     from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate

query="Can you please suggest me some events related to basketball"
# השתמש בשיטת get_embedding המוגדרת לעיל, היא מקבלת רשימה של משפטים
embeddings=get_embedding([query])
embeddings=embeddings[0].embedding
results = client.query(f"""
        SELECT Name, City, Users_Going, Description, distance(Description_Embeddings, {embeddings}) as dist
        FROM default.Events
        ORDER BY Users_Going DESC,dist
        LIMIT 5
    """)
PROMPT_TEMPLATE = """
Your objective is to formulate a response to a question using only the information provided below:
{context}
---
Your task is to carefully analyze the provided context and provide an answer to the following question based solely on the information given:
{question}
"""
# שלב את התיאורים של התוצאות העליונות. 
descriptions = [row["Description"] for row in results.named_results()]
context_text = "\n\n---\n\n".join(descriptions)
prompt_template = ChatPromptTemplate.from_template(PROMPT_TEMPLATE)
prompt = prompt_template.format(context=context_text, question=query)
model = ChatOpenAI(openai_api_key="your_api_key_here")
response_text = model.predict(prompt)
formatted_response = f"Response: {response_text}\n"
print(formatted_response)

לאורך כל הבלוג הזה, צפינו ש-MyScale הוא הרבה יותר מרשת וקטורים שניתן להשתמש בה לפיתוח של כל מיני יישומים. ניתן להשתמש בו כמסד נתונים SQL פשוט או ליישומים AI מתקדמים, הכוללים את רוב תחום הפיתוח. אנו מעודדים אתכם לנסות ולחקור את התכונות המתקדמות על ידי הרשמה לחבילת החינם וקבלת 5 מיליון מאגר וקטורים בחינם.

מסקנה

חקרנו את היכולות והפונקציות של MyScale עם Apify Scraper במהלך תהליך פיתוח יישום לניתוח אירועים. MyScale הדגים את יכולותיו המעולות בחיפוש וקטורי בכיף בעודו שומר על כל הפונקציות של מסדי נתונים SQL, מה שמאפשר למפתחים לבצע חיפושים סמנטיים בשפת SQL המוכרת עם מהירות ודיוק רבים יותר.

יכולות MyScale אינן מוגבלות רק ליישום זה: תוכלו לאמץ אותו כדי לפתח כל יישומי AI באמצעות השיטה RAG

אם יש לכם משוב או הצעות, אנא צורו קשר עםנו.

Source:
https://dzone.com/articles/performing-advanced-facebook-event-data-analysis-w