בעידן הדיגיטלי של היום, מקצוענים מכל התעשיות חייבים להישאר מעודכנים עם אירועים עתידיים, כנסים וסדנאות. עם זאת, מציאת אירועים המתאימים לתחומי העניין של אדם בקרב הים העצום של מידע מקוון מהווה אתגר משמעותי.
בבלוג זה מציגים פתרון חדש לאתגר זה: יישום מקיף שמושך נתוני אירועים מפייסבוק ומנתח את הנתונים המושפעים באמצעות MyScale. בעוד ש-MyScale קשור בדרך כלל לענף RAG או משמש כמסד נתונים וקטורי, היכולות שלו מעבר לתחומים אלה. נשתמש בו לניתוח נתונים, משתמשים בפונקציות החיפוש הווקטוריות שלו לניתוח אירועים הדומים באופן סמנטי, כך מספקים תוצאות ותובנות טובות יותר.
ייתכן שתבחין ש-Grok AI השתמש במסד נתונים וקטורי Qdrant כמנוע חיפוש כדי להשיג מידע בזמן אמת מ-X (בעבר ידוע כטוויטר). אפשר גם להעריך את כוחם של מסדי נתונים וקטורי בדרך זו עם MyScale על ידי שילוב MyScale עם רשתות אחרות כמו Apify כדי לשפר משימות חיים יומיומיות באמצעות פיתוח יישומים מותאמים אישית פשוטים.
אז בבלוג זה, בואו נפתח יישום שלוקח רק את שם העיר כקלט ומושך את כל האירועים הקשורים מפייסבוק. לאחר מכן, נבצע ניתוח נתונים וחיפוש סמנטי באמצעות היכולות הSQL המתקדמות של MyScale.
כלים וטכנולוגיות
נשתמש במספר כלים, כולל Apify, MyScale ו-OpenAI, לפיתוח האפליקציה המועילה הזו.
- Apify: פלטפורמת סקריפינג ואוטומציה פופולרית שמאפשרת לצמצם משמעותית את תהליך איסוף הנתונים. היא מספקת את היכולת לסרוק נתונים ולאחר מכן להזרים אותם ל-LLMs. זה מאפשר לנו לאמן LLMs על נתונים בזמן אמת ולפתח יישומים.
- MyScale: MyScale היא מסד נתונים וקטורי SQL שאנו משתמשים בו לאחסון ועיבוד נתונים מובנים ולא מובנים בצורה מותאמת אישית.
- OpenAI: נשתמש במודל
text-embedding-3-small
מ-OpenAI כדי לקבל את ההטמעות של הטקסט ולאחר מכן לשמור את ההטמעות הללו ב-MyScale לניתוח נתונים וחיפושים סמנטיים.
איך להגדיר את MyScale ו-Apify
כדי להתחיל להגדיר את MyScale ו-Apify, תצטרך ליצור ספרייה חדשה וקובץ Python. תוכל לעשות זאת על ידי פתיחת המסוף או שורת הפקודה שלך והזנת הפקודות הבאות:
mkdir MyScale
cd MyScale
touch main.ipynb
בואו נתקין את החבילות. העתק את הפקודה שלמטה והדבק אותה במסוף שלך. חבילות אלו יספקו את הכלים והספריות הנחוצים לפיתוח האפליקציה שלנו.
pip install openai apify-client clickhouse-connect pandas numpy
זה צריך להתקין את כל התלמים במערכת שלך. כדי לאמת שהכל מותקן כראוי, תוכל להזין את הפקודה הבאה במסוף שלך.
pip freeze | egrep '(openai|apify-client|clickhouse-connect|pandas|numpy)'
זה צריך לכלול את כל ההסמכות המותקנות עם גרסאותיהן. אם אתה מציין שיש הסמכות חסרות, ייתכן שתצטרך להפעיל מחדש את פקודת ההתקנה עבור החבילה הספציפית הזו. עכשיו, אנו מוכנים לכתוב את הקוד שלנו לאחר ההתקנות.
הערה: אנו נעבוד במחברת פייתון. שקול כל בלוק קוד תא מחברת.
כיצד לגשת לנתונים עם Apify
עכשיו, אנו נשתמש ב-API של Apify כדי לגשת לנתוני אירועים של ניו יורק על ידי שימוש ב-מסננת אירועי פייסבוק.
import pandas as pd
from apify_client import ApifyClient
# אתחול ApifyClient עם תעודת ה-API שלך
client = ApifyClient("Enter_your_apify_key_here")
# הכנת קלט השחקן
run_input = {
"searchQueries": ["Sport New York"],
"startUrls": [],
"maxEvents": 50,
}
# הרץ את השחקן וחכה עד שהוא יסתיים
run = client.actor("UZBnerCFBo5FgGouO").call(run_input=run_input)
df_columns = ['Name', 'Datetime', 'Description', 'Users_Going', 'Users_Interested', 'Users_Responded', 'City', 'Organized_By', 'Street_Address']
dataframe1 = pd.DataFrame(columns=df_columns)
for item in client.dataset(run["defaultDatasetId"]).iterate_items():
# השתמש בהערכת מילון כדי להחליף ערכים None במחרוזת ריקה
row = {
'Name': item.get('name', ''),
'Datetime': item.get('dateTimeSentence', ''),
'Description': item.get('description', ''),
'Users_Going': item.get('usersGoing', ''),
'Users_Interested': item.get('usersInterested', ''),
'Users_Responded': item.get('usersResponded', ''),
'City': item.get('location', {}).get('city', '') if item.get('location') else '',
'Organized_By': item.get('organizedBy', ''),
'Street_Address': item.get('location', {}).get('streetAddress', '') if item.get('location') else ''
}
# דאג לכך שכל הערכים None מוחלפים במחרוזת ריקה
row = {k: (v if v is not None else '') for k, v in row.items()}
dataframe1 = dataframe1._append(row, ignore_index=True)
# ניקוי הנתונים
dataframe1['Description'] = dataframe1['Description'].replace('\\n', '', regex=True)
תסריט זה נותן לנו את פרטי האירועים הקרובים בצורה של pandas
DataFrame.
הערה: אל תשכחו להוסיף את מפתח ה-API של Apify בתוך התסריט לעיל. אפשר למצוא את האסימון של ה-API בדף Integrations בחשבון Apify Console.
הכנת נתונים מקדימה
כשאנו אוספים נתונים גולמיים, הם מגיעים בפורמטים שונים. בתסריט זה, נביא את תאריכי האירועים לפורמט אחיד כדי שנוכל לבצע את סינון הנתונים בצורה יעילה יותר.
# הוספת ספריות הנחוצות לניהול נתונים ופירוק תאריכים
import pandas as pd
import numpy as np
from datetime import datetime
from dateutil import parser
# פונקציה לפירוק מחרוזות תאריך שעשויות לייצג טווח או תאריך יחיד
def parse_dates(date_str):
# בדיקה אם מחרוזת התאריך מכילה מקף, מה שמעיד על טווח
if '-' in date_str:
parts = date_str.split('-')
# אם המחרוזת מתחלקת לשני חלקים, זה טווח תקין
if len(parts) == 2:
try:
# פירוק תאריכי ההתחלה והסיום, ועיצובם לפורמט קריא
start_date = parser.parse(parts[0], fuzzy=True).strftime('%a, %b %d')
end_date = parser.parse(parts[1], fuzzy=True).strftime('%a, %b %d')
return start_date, end_date
except ValueError:
# במקרה של שגיאת פירוק, לא לעשות כלום (יתומקד מתחת)
pass
# אם זה לא טווח או אם נכשל בפירוק הטווח, לנסות לפרק כתאריך יחיד
try:
parsed_date = parser.parse(date_str, fuzzy=True)
# לעצב את התאריך היחיד לפורמט של start_date ולפורמט אחר לend_date
start_date = parsed_date.strftime('%a, %b %d AT %I:%M %p EDT')
end_date = parsed_date.strftime('%a, %b %d') # Omitting time for end_date
return start_date, end_date
except ValueError:
# להחזיר NaN לשני התאריכים אם נכשל הפירוק
return np.nan, np.nan
# פונקציה למיצוע תאריך מפורט, זמן ויום ממחרוזת תאריך
def extract_date_time_day(date_str):
try:
# פירוק המחרוזת תאריך, מאפשר גם כמה גמישות בפורמט הקלט
parsed_date = parser.parse(date_str, fuzzy=True)
# מיצוע ועיצוב החלקים של התאריך, הזמן והיום
date = parsed_date.strftime('%Y-%m-%d')
day = parsed_date.strftime('%a')
# לקבוע אם המחרוזת המקורית כללה רכיב זמן
time_component = parsed_date.strftime('%I:%M %p') not in date_str
time = parsed_date.strftime('%H:%M:%S') if not time_component else np.nan
except ValueError:
# אם נכשל הפירוק, לקבוע תאריך, זמן ויום כNaN
date, time, day = np.nan, np.nan, np.nan
return date, time, day
# החילוץ פונקצית parse_dates על פני הטבלה, יצירת עמודות חדשות לתאריכי ההתחלה והסיום
dataframe1[['Start_Date', 'End_Date']] = dataframe1.apply(lambda row: pd.Series(parse_dates(row['Datetime'])), axis=1)
# מחיקת שורות שבהן Start_Date הוא NaN, מה שמעיד על כישלון בפירוק
dataframe = dataframe1.dropna(subset=['Start_Date'])
# החילוץ extract_date_time_day לחלוקת התאריכים ההתחלה והסיום לעמודות נפרדות לתאריך, זמן ויום
dataframe['Start_Date'], dataframe['Start_Time'], dataframe['Start_Day'] = zip(*dataframe['Start_Date'].apply(extract_date_time_day))
dataframe['End_Date'], _, dataframe['End_Day'] = zip(*dataframe['End_Date'].apply(extract_date_time_day))
# הסרת העמודה 'Datetime' המקורית מאחר שהיא כבר לא נחוצה
dataframe=dataframe.drop(['Datetime'], axis=1)
# המרת 'Start_Date' ו'End_Date' לפורמט datetime, והוצאת רק החלק התאריך
dataframe['Start_Date'] = pd.to_datetime(dataframe['Start_Date']).dt.date
dataframe['End_Date'] = pd.to_datetime(dataframe['End_Date']).dt.date
# המרת 'Start_Time' לפורמט datetime, שמרת על מידע הזמן
dataframe['Start_Time'] = pd.to_datetime(dataframe['Start_Time'])
קטע הקוד הזה משתמש בpandas
עם חבילות datetime
ו-dateutil
של Python לעיצוב הנתונים.
יצירת ממשקים מוטבעים
כדי להבין ולחפש אירועים באופן עמוק, ניצור ממשקים מוטבעים מתיאוריהם באמצעות text-embedding-3-small
. ממשקים אלו לוקחים את המהות התחבירית של כל אירוע, ועוזרים ליישום להחזיר תוצאות טובות יותר.
# הכנסת ספריית OpenAI לגישה ל-API.
from openai import OpenAI
# התחלת �לקוח OpenAI עם מפתח API.
openai_client = OpenAI(api_key="your_openai_api_key_here")
# פונקציה לקבלת ממשקים טקסט
def get_embedding(text, model="text-embedding-3-small"):
return openai_client.embeddings.create(input=text, model=model).data
embeddings = get_embedding(dataframe["Description"].tolist())
# חילוץ וקטורי הממשקים מאובייקט הממשקים
vectors = [embedding.embedding for embedding in embeddings]
array = np.array(vectors)
embeddings_series = pd.Series(list(array))
# הוספת ממשקים כעמודה חדשה ב-DataFrame.
dataframe['Description_Embeddings'] = embeddings_series
עכשיו, נכניס את ה-DataFrame
החדש עם הממשקים ל-MyScale.
חיבור עם MyScale
כפי שנדון בהתחלה, נשתמש ב-MyScale כמסד נתונים וקטורי לאחסון וניהול נתונים. כאן, נתחבר ל-MyScale לשם הכנה לאחסון נתונים.
import clickhouse_connect
client = clickhouse_connect.get_client(
host='host_name_here',
port=443,
username='username_here',
password='passwd_here'
)
הגדרת החיבור הזו מבטיחה שהיישום שלנו יוכל לתקשר עם MyScale, ולהשתמש בכוחם של SQL לשימוש בניהול נתונים וניתוח.
הערה: ראה פרטי החיבור למידע נוסף איך להתחבר לקלאסטר MyScale.
יצירת טבלאות ואינדקסים באמצעות MyScale
כעת נוצר טבלה על פי DataFrame שלנו. כל הנתונים יאוחסנו בטבלה זו, כולל הממשקים.
client.command("""
CREATE TABLE default.Events (
Name String,
Description String,
Users_Going Int64,
Users_Interested Int64,
Users_Responded Int64,
City String,
Organized_By String,
Street_Address String,
Start_Date Date32,
End_Date Nullable(Date32),
Start_Time Nullable(DateTime64),
Start_Day String,
End_Day String,
Description_Embeddings Array(Float32),
CONSTRAINT check_data_length CHECK length(Description_Embeddings) = 1536
) ENGINE = MergeTree()
ORDER BY (Name);
""")
הביטויים SQL לעיל יוצרים טבלה בשם Events
בצבר. ה-CONSTRAINT
מוודא שכל הווקטורים המרומזים הם באורך זהה 1536
.
אחסון הנתונים ויצירת אינדקס ב-MyScale
בשלב זה, אנו מכניסים את הנתונים המעובדים ל-MyScale. זה כרוך בהכנסת הנתונים בחציון כדי להבטיח אחסון ושליפה יעילים.
batch_size = 10 # Adjust based on your needs
num_batches = len(dataframe) // batch_size
for i in range(num_batches):
start_idx = i * batch_size
end_idx = start_idx + batch_size
batch_data = dataframe[start_idx:end_idx]
# print(batch_data["Description_Embeddings"])
client.insert("default.Events", batch_data.to_records(index=False).tolist(), column_names=batch_data.columns.tolist())
print(f"Batch {i+1}/{num_batches} inserted.")
client.command("""
ALTER TABLE default.Events
ADD VECTOR INDEX vector_index Description_Embeddings
TYPE MSTG
""")
באמצעות pandas
, הקוד לעיל מעביר ביעילות את המאגר המוכנים שלנו למסד הנתונים של MyScale.
ניתוח נתונים באמצעות MyScale
לבסוף, אנו משתמשים ביכולות הניתוח של MyScale כדי לבצע ניתוח ולאפשר חיפוש סמנטי. על ידי ביצוע שאילתות SQL, אנו יכולים לנתח אירועים על פי נושאים, מיקומים ותאריכים. אז בואו ננסה לכתוב כמה שאילתות.
שאילתת SQL פשוטה
בואו ננסה קודם כל לקבל את 10 התוצאות הראשונות מהטבלה.
results=client.query("""
SELECT Name,Description FROM default.Events LIMIT 10
""")
for row in results.named_results():
print(row["Name"])
print(row['Description'])
שאילתה זו תחזיר פשוט את 10 התוצאות הראשונות מהטבלה events
.
גילוי אירועים על פי רלוונטיות סמנטית
בואו ננסה למצוא את 10 האירועים הקרובים ביותר עם תחושה דומה לאירוע מופנה, כמו זה: "אחד המופעים הממוקדים בארץ – פועל מאז 1974 …עכשיו בשנת ה-50 שלנו !!! השנקנטדי שלנו". זה מתבצע על ידי השוואת הטקסטים הסמנטיים של תיאורי האירועים, מה שמבטיח התאמה בנושאים וברגשות.
embeddings=get_embedding(["One of the Longest Running Shows in the Country - Operating since 1974 ...NOW our 50th YEAR !!!Our Schenectady"])
embedding=embeddings[0].embedding
results = client.query(f"""
SELECT Name, Description,
distance(Description_Embeddings, {embedding}) as dist FROM default.Events ORDER BY dist LIMIT 10
""")
for row in results.named_results():
print("Title of the event ", row["Name"])
print("Description of the event ", row['Description'])
print("Distance : ", row["dist"])
אירועים מפופולריים על פי פופולריות
שאילתה זו מדרגת את 10 האירועים העיקריים לפי מספר המשתתפים והמשתמשים המעוניינים, ומדגישה אירועים פופולאריים מפסטיבלים בעירונים גדולים עד לועידות גדולות. היא מתאימה למי שמחפש להצטרף לאירועים גדולים ואנרגטיים.
results = client.query(f"""
SELECT Name, City, Users_Going, Users_Interested, Users_Responded
FROM default.Events
ORDER BY Users_Going DESC, Users_Interested DESC
LIMIT 10
""")
for row in results.named_results():
print("Event Name ", row["Name"])
print("City ", row["City"])
print("Users Going ", row["Users_Going"])
print("Interested Users ", row["Users_Interested"])
אירועים מקומיים פופולאריים בניו יורק
על ידי שילוב רלוונטיות ופופולאריות, שאילתה זו מזהה אירועים דומים בעיר ניו יורק הקשורים לאירוע ספציפי ומדרגת אותם לפי נוכחות, מה שמספק רשימה מעודדת שמשקפת את התרבות החייה של העיר ומושכת עניין מקומי.
embeddings=get_embedding(["One of the Longest Running Shows in the Country - Operating since 1974 ...NOW our 50th YEAR !!!Our Schenectady"])
embeddi=embeddings[0].embedding
results = client.query(f"""
SELECT Name,City, Description, Users_Going,distance(Description_Embeddings, {embeddi}) as dist
FROM default.Events
WHERE City LIKE '%New York%' and dist < 1.5
ORDER BY Users_Going DESC,dist
LIMIT 10
""")
for row in results.named_results():
print("Event Name ", row["Name"])
print("Description ", row["Description"])
print("Users Going ", row["Users_Going"])
מארגני אירועים מובילים
שאילתה זו מדרגת את 10 המארגנים האירועים המובילים לפי סך המשתתפים והמשתמשים המעוניינים, ומדגישה את אלו שמצטיינים ביצירת אירועים מרתקים ומשכים קהלים גדולים. היא מספקת תובנות למתכנני אירועים ולמשתתפים שמעוניינים באירועים ברמה העליונה.
# איזה לקוח משכה את המספר הגדול ביותר של משתמשים
results = client.query(f"""
SELECT Organized_By, SUM(Users_Going + Users_Interested) AS Total_Users
FROM default.Events
GROUP BY Organized_By
ORDER BY Total_Users DESC
Limit 10
""")
for row in results.named_results():
print("Event Name ", row["Organized_By"])
print("Total_Users ", row["Total_Users"])
יישום RAG
קודם כבר חקרנו את MyScale לניתוח נתונים, והדגשנו את יכולותיו בשיפור זרימת העבודה של הנתונים שלנו. מעתה ואילך, נלך צעד נוסף קדימה על ידי יישום Retrieval-Augmented Generation (RAG), מסגרת חדשנית המשלבת מאגר ידע חיצוני עם LLMs. צעד זה יעזור לכם להבין את הנתונים שלכם טוב יותר ולגלות תובנות מפורטות יותר. לאחר מכן, תראו כיצד להשתמש ב-RAG עם MyScale, מה שיהפוך את העבודה עם נתונים למעניינת ופרודוקטיבית יותר.
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
query="Can you please suggest me some events related to basketball"
# השתמש בשיטת get_embedding המוגדרת לעיל, היא מקבלת רשימה של משפטים
embeddings=get_embedding([query])
embeddings=embeddings[0].embedding
results = client.query(f"""
SELECT Name, City, Users_Going, Description, distance(Description_Embeddings, {embeddings}) as dist
FROM default.Events
ORDER BY Users_Going DESC,dist
LIMIT 5
""")
PROMPT_TEMPLATE = """
Your objective is to formulate a response to a question using only the information provided below:
{context}
---
Your task is to carefully analyze the provided context and provide an answer to the following question based solely on the information given:
{question}
"""
# שלב את התיאורים של התוצאות העליונות.
descriptions = [row["Description"] for row in results.named_results()]
context_text = "\n\n---\n\n".join(descriptions)
prompt_template = ChatPromptTemplate.from_template(PROMPT_TEMPLATE)
prompt = prompt_template.format(context=context_text, question=query)
model = ChatOpenAI(openai_api_key="your_api_key_here")
response_text = model.predict(prompt)
formatted_response = f"Response: {response_text}\n"
print(formatted_response)
לאורך כל הבלוג הזה, צפינו ש-MyScale הוא הרבה יותר מרשת וקטורים שניתן להשתמש בה לפיתוח של כל מיני יישומים. ניתן להשתמש בו כמסד נתונים SQL פשוט או ליישומים AI מתקדמים, הכוללים את רוב תחום הפיתוח. אנו מעודדים אתכם לנסות ולחקור את התכונות המתקדמות על ידי הרשמה לחבילת החינם וקבלת 5 מיליון מאגר וקטורים בחינם.
מסקנה
חקרנו את היכולות והפונקציות של MyScale עם Apify Scraper במהלך תהליך פיתוח יישום לניתוח אירועים. MyScale הדגים את יכולותיו המעולות בחיפוש וקטורי בכיף בעודו שומר על כל הפונקציות של מסדי נתונים SQL, מה שמאפשר למפתחים לבצע חיפושים סמנטיים בשפת SQL המוכרת עם מהירות ודיוק רבים יותר.
יכולות MyScale אינן מוגבלות רק ליישום זה: תוכלו לאמץ אותו כדי לפתח כל יישומי AI באמצעות השיטה RAG.
אם יש לכם משוב או הצעות, אנא צורו קשר עםנו.
Source:
https://dzone.com/articles/performing-advanced-facebook-event-data-analysis-w