利用向量資料庫進行高級Facebook事件數據分析

在今日的數位時代,各行各業的專業人士都必須持續關注即將到來的活動、會議和研討會。然而,在浩瀚的線上資訊中有效地尋找到與個人興趣相符的活動,是一大挑戰。

本篇部落格介紹了一個創新的解決方案:一個全面的應用程式,專門用於從Facebook抓取活動資料,並使用MyScale分析這些抓取的數據。雖然MyScale通常與RAG技術堆疊或作為向量資料庫相關聯,但其功能遠超過這些領域。我們將利用其進行數據分析,利用其向量搜索功能來分析語義上相似的活動,從而提供更好的結果和見解。

你可能注意到Grok AI利用了Qdrant向量資料庫作為搜索引擎,從X(前稱Twitter)數據中檢索即時資訊。透過將MyScale與其他平台如Apify整合,你也可以這樣評估向量資料庫的力量,透過開發簡單的個人化應用程式來增強日常生活的任務。

因此,在本篇部落格中,讓我們開發一個應用程式,只需輸入城市名稱,就能從Facebook抓取所有相關活動。隨後,我們將使用MyScale的高級SQL向量功能進行數據分析和語義搜索。

工具與技術

我們將使用多種工具,包括Apify、MyScale和OpenAI,來開發這款實用應用程式。

  • Apify: 一個廣受歡迎的網頁抓取及自動化平台,顯著簡化了數據收集流程。它具備抓取數據並將其輸入至大型語言模型(LLMs)的能力,讓我們能夠利用即時數據訓練LLMs並開發應用程式。
  • MyScale: MyScale是一款SQL向量資料庫,我們用它來高效存儲和處理結構化與非結構化數據。
  • OpenAI: 我們將使用OpenAI的模型text-embedding-3-small來獲取文本嵌入,然後將這些嵌入保存到MyScale中,以便進行數據分析和語義搜尋。

如何設置MyScale和Apify

要開始設置MyScale和Apify,您需要創建一個新目錄和一個Python文件。您可以通過打開終端或命令行並輸入以下命令來完成此操作:

Shell

 

mkdir MyScale
cd MyScale
touch main.ipynb

讓我們安裝所需的套件。複製下面的命令,並將其粘貼到您的終端中。這些套件將提供我們開發應用程式所需的工具和庫。

Shell

 

pip install openai apify-client clickhouse-connect pandas numpy

這應該會在您的系統中安裝所有依賴項。為了確認一切安裝正確,您可以在終端中輸入以下命令。

Shell

 

pip freeze | egrep '(openai|apify-client|clickhouse-connect|pandas|numpy)'

此處應列出所有已安裝的依賴項及其版本。若發現有任何遺漏的依賴項,可能需要重新運行該特定包的安裝命令。現在,安裝完成後,我們即可開始編寫代碼。

注意:我們將在Python筆記本中工作。將每個代碼塊視為一個筆記本單元。

如何使用Apify進行數據抓取

接下來,我們將利用Apify API抓取紐約市的事件數據,使用Facebook Events 爬蟲

Python

 

     import pandas as pd
from apify_client import ApifyClient
# 使用您的API令牌初始化ApifyClient
client = ApifyClient("Enter_your_apify_key_here")

# 準備Actor輸入
run_input = {
    "searchQueries": ["Sport New York"],
    "startUrls": [],
    "maxEvents": 50,
}

# 運行Actor並等待其完成
run = client.actor("UZBnerCFBo5FgGouO").call(run_input=run_input)

df_columns = ['Name', 'Datetime', 'Description', 'Users_Going', 'Users_Interested', 'Users_Responded', 'City', 'Organized_By', 'Street_Address']
dataframe1 = pd.DataFrame(columns=df_columns)

for item in client.dataset(run["defaultDatasetId"]).iterate_items():
    # 使用字典推導式將None值替換為空字符串
    row = {
        'Name': item.get('name', ''),
        'Datetime': item.get('dateTimeSentence', ''),
        'Description': item.get('description', ''),
        'Users_Going': item.get('usersGoing', ''),
        'Users_Interested': item.get('usersInterested', ''),
        'Users_Responded': item.get('usersResponded', ''),
        'City': item.get('location', {}).get('city', '') if item.get('location') else '',
        'Organized_By': item.get('organizedBy', ''),
        'Street_Address': item.get('location', {}).get('streetAddress', '') if item.get('location') else ''
    }
    # 確保所有None值均被空字符串替換
    row = {k: (v if v is not None else '') for k, v in row.items()}
    dataframe1 = dataframe1._append(row, ignore_index=True)

# 清理數據
dataframe1['Description'] = dataframe1['Description'].replace('\\n', '', regex=True)

此腳本以pandas DataFrame的形式提供我們即將到來的事件詳情。

注意:請勿忘記在上面的腳本中添加您的Apify API金鑰。您可以在Apify控制台的整合頁面找到您的API令牌。

數據預處理

當我們收集原始數據時,它們以各種格式出現。在本腳本中,我們將把事件日期統一成單一格式,以便更有效地進行數據篩選。

Python

 

     # 導入必要庫進行資料處理及日期解析
import pandas as pd
import numpy as np
from datetime import datetime
from dateutil import parser

# 函數用於解析可能代表範圍或單一日期的日期字串
def parse_dates(date_str):
    # 檢查日期字串是否包含連字號,指示範圍
    if '-' in date_str:
        parts = date_str.split('-')
        # 如果字串分割成兩部分,則為有效範圍
        if len(parts) == 2:
            try:
                # 解析開始和結束日期,將其格式化為易讀格式
                start_date = parser.parse(parts[0], fuzzy=True).strftime('%a, %b %d')
                end_date = parser.parse(parts[1], fuzzy=True).strftime('%a, %b %d')
                return start_date, end_date
            except ValueError:
                # 解析錯誤時不採取行動(將於下方處理)
                pass  
    # 如果不是範圍或解析範圍失敗,嘗試解析為單一日期
    try:
        parsed_date = parser.parse(date_str, fuzzy=True)
        # 對開始日期格式化單一日期,並對結束日期採用不同格式
        start_date = parsed_date.strftime('%a, %b %d AT %I:%M %p EDT')
        end_date = parsed_date.strftime('%a, %b %d')  # Omitting time for end_date
        return start_date, end_date
    except ValueError:
        # 解析失敗時,對兩個日期均返回NaN
        return np.nan, np.nan  

# 函數用於從日期字串中提取詳細的日期、時間和星期
def extract_date_time_day(date_str):
    try:
        # 解析日期字串,允許輸入格式具有一定靈活性
        parsed_date = parser.parse(date_str, fuzzy=True)
        # 提取並格式化日期、時間和星期部分
        date = parsed_date.strftime('%Y-%m-%d')
        day = parsed_date.strftime('%a')
        # 確定原始字串是否包含時間組件
        time_component = parsed_date.strftime('%I:%M %p') not in date_str
        time = parsed_date.strftime('%H:%M:%S') if not time_component else np.nan
    except ValueError:
        # 解析失敗時,將日期、時間和星期設為NaN
        date, time, day = np.nan, np.nan, np.nan
    
    return date, time, day

# 對數據框應用parse_dates函數,創建開始和結束日期的新列
dataframe1[['Start_Date', 'End_Date']] = dataframe1.apply(lambda row: pd.Series(parse_dates(row['Datetime'])), axis=1)

# 刪除Start_Date為NaN的行,表示解析不成功
dataframe = dataframe1.dropna(subset=['Start_Date'])

# 應用extract_date_time_day將開始和結束日期分割成單獨的日期、時間和星期列
dataframe['Start_Date'], dataframe['Start_Time'], dataframe['Start_Day'] = zip(*dataframe['Start_Date'].apply(extract_date_time_day))
dataframe['End_Date'], _, dataframe['End_Day'] = zip(*dataframe['End_Date'].apply(extract_date_time_day))

# 移除不再需要的原始'Datetime'列
dataframe=dataframe.drop(['Datetime'], axis=1)

# 將'Start_Date'和'End_Date'轉換為日期時間格式,僅提取日期部分
dataframe['Start_Date'] = pd.to_datetime(dataframe['Start_Date']).dt.date
dataframe['End_Date'] = pd.to_datetime(dataframe['End_Date']).dt.date

# 將'Start_Time'轉換為日期時間格式,保留時間信息
dataframe['Start_Time'] = pd.to_datetime(dataframe['Start_Time'])

此程式碼片段使用Python的pandasdatetimedateutil套件來格式化資料。

生成嵌入向量

為了深入理解和搜尋事件,我們將使用text-embedding-3-small從事件描述中生成嵌入向量。這些嵌入向量捕捉了每個事件的語義本質,有助於應用程式返回更佳結果。

Python

 

     # 導入OpenAI庫以存取API。
from openai import OpenAI 
# 使用API金鑰初始化OpenAI客戶端。
openai_client = OpenAI(api_key="your_openai_api_key_here")
# 獲取文字嵌入的函數 
def get_embedding(text, model="text-embedding-3-small"):
    return openai_client.embeddings.create(input=text, model=model).data
embeddings = get_embedding(dataframe["Description"].tolist())
# 從嵌入物件中提取嵌入向量
vectors = [embedding.embedding for embedding in embeddings]
array = np.array(vectors)
embeddings_series = pd.Series(list(array))
# 將嵌入向量新增為DataFrame中的新欄位。
dataframe['Description_Embeddings'] = embeddings_series

現在,我們將把包含嵌入向量的新DataFrame插入MyScale。

連接MyScale

如開始所述,我們將使用MyScale作為向量資料庫來儲存和管理資料。在此,我們將連接到MyScale,為資料儲存做準備。

Python

 

     import clickhouse_connect
client = clickhouse_connect.get_client(
    host='host_name_here',
    port=443,
    username='username_here',
    password='passwd_here'
)

此連接設定確保我們的應用程式能與MyScale通訊,並利用SQL的力量進行資料操作和分析。

注意:關於如何連接到MyScale叢集的更多資訊,請參閱連線詳情

使用MyScale建立表格和索引

我們現在根據我們的DataFrame建立一個表格。所有資料,包括嵌入向量,都將儲存在此表格中。

Python

 

     client.command("""
    CREATE TABLE default.Events (
    Name String,
    Description String,
    Users_Going Int64,
    Users_Interested Int64,
    Users_Responded Int64,
    City String,
    Organized_By String,
    Street_Address String,
    Start_Date Date32,
    End_Date Nullable(Date32),
    Start_Time Nullable(DateTime64),
    Start_Day String,
    End_Day String,
    Description_Embeddings Array(Float32),
    CONSTRAINT check_data_length CHECK length(Description_Embeddings) = 1536
    ) ENGINE = MergeTree()
    ORDER BY (Name);
    """)

上述SQL語句在集群上創建了一個名為Events的表。CONSTRAINT確保所有向量嵌入的長度均為1536

在MyScale中存儲數據並創建索引

在此步驟中,我們將處理後的數據插入MyScale。這涉及批量插入數據以確保高效的存儲和檢索。

Python

 

     batch_size = 10  # Adjust based on your needs

num_batches = len(dataframe) // batch_size

for i in range(num_batches):
    start_idx = i * batch_size
    end_idx = start_idx + batch_size
    batch_data = dataframe[start_idx:end_idx]
    # print(batch_data["Description_Embeddings"])
    client.insert("default.Events", batch_data.to_records(index=False).tolist(), column_names=batch_data.columns.tolist())
    print(f"Batch {i+1}/{num_batches} inserted.")

client.command("""
ALTER TABLE default.Events
    ADD VECTOR INDEX vector_index Description_Embeddings
    TYPE MSTG
""")

使用pandas,上述代碼高效地將我們準備好的數據集傳輸到MyScale數據庫中。

使用MyScale進行數據分析

最後,我們利用MyScale的分析能力進行分析並啟用語義搜索。通過執行SQL查詢,我們可以根據主題、地點和日期分析事件。那麼,讓我們嘗試編寫一些查詢。

簡單的SQL查詢

首先,讓我們嘗試從表中獲取前10個結果

Python

 

     results=client.query("""
        SELECT Name,Description FROM default.Events LIMIT 10
    """)
for row in results.named_results():
        print(row["Name"])
        print(row['Description'])

此查詢將簡單地返回events表中的前10個結果。

通過語義相關性發現事件

讓我們嘗試找到與參考事件類似的即將到來的前10個事件,例如:“國內運行時間最長的節目之一 – 自1974年以來一直運營…現在是我們的第50年!!!我們的Schenectady”。這是通過比較事件描述的語義嵌入來實現的,確保在主題和情感上匹配。

Python

 

embeddings=get_embedding(["One of the Longest Running Shows in the Country - Operating since 1974 ...NOW our 50th YEAR !!!Our Schenectady"])
embedding=embeddings[0].embedding
results = client.query(f"""
        SELECT Name, Description,
        distance(Description_Embeddings, {embedding}) as dist FROM default.Events ORDER BY dist LIMIT 10
    """)
for row in results.named_results():
        print("Title of the event  ", row["Name"])
        print("Description of the event  ", row['Description'])
        print("Distance : ", row["dist"])

按人氣趨勢的事件

此查詢根據參與人數和感興趣用戶數量,排名前10大熱門活動,從大型城市節慶到主要會議,突顯受歡迎的活動。適合尋求參與大型、活力充沛聚會的人士。

Python

 

     results = client.query(f"""
        SELECT Name, City, Users_Going, Users_Interested, Users_Responded
        FROM default.Events
        ORDER BY Users_Going DESC, Users_Interested DESC
        LIMIT 10
    """)
for row in results.named_results():
        print("Event Name  ", row["Name"])
        print("City ", row["City"])
        print("Users Going ", row["Users_Going"])
        print("Interested Users ", row["Users_Interested"])

紐約熱門本地活動

此查詢結合相關性和受歡迎程度,識別紐約市內與特定活動相關的類似活動,並按參與人數排序,提供一個反映城市活躍文化和吸引當地興趣的精選活動列表。

Python

 

     embeddings=get_embedding(["One of the Longest Running Shows in the Country - Operating since 1974 ...NOW our 50th YEAR !!!Our Schenectady"])
embeddi=embeddings[0].embedding
results = client.query(f"""
        SELECT Name,City, Description, Users_Going,distance(Description_Embeddings, {embeddi}) as dist
        FROM default.Events
        WHERE City LIKE '%New York%' and dist < 1.5
        ORDER BY Users_Going DESC,dist
        LIMIT 10
    """)
for row in results.named_results():
        print("Event Name  ", row["Name"])
        print("Description ", row["Description"])
        print("Users Going ", row["Users_Going"])

領先的活動主辦方

此查詢根據總參與人數和感興趣用戶數量,排名前10大活動主辦方,突顯那些在創造吸引人的活動和吸引大量觀眾方面表現出色的主辦方。為活動策劃者和對頂級活動感興趣的參與者提供洞察。

Python

 

     # 哪位客戶吸引了最多用戶 
results = client.query(f"""
       SELECT Organized_By, SUM(Users_Going + Users_Interested) AS Total_Users
        FROM default.Events
        GROUP BY Organized_By
        ORDER BY Total_Users DESC
        Limit 10
    """)
for row in results.named_results():
        print("Event Name  ", row["Organized_By"])
        print("Total_Users ", row["Total_Users"])

實施RAG

先前,我們已探討了MyScale在數據分析中的應用,強調其在提升數據工作流程方面的能力。接下來,我們將進一步採用Retrieval-Augmented Generation(RAG)這一創新框架,它結合了外部知識庫與大型語言模型(LLMs)。此舉將助您更深入地理解數據,並發掘更詳盡的洞察。隨後,您將了解如何將RAG與MyScale結合使用,使數據工作更加有趣且高效。

Python

 

     from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate

query="Can you please suggest me some events related to basketball"
# 使用上述定義的get_embedding方法,它接受一個句子列表
embeddings=get_embedding([query])
embeddings=embeddings[0].embedding
results = client.query(f"""
        SELECT Name, City, Users_Going, Description, distance(Description_Embeddings, {embeddings}) as dist
        FROM default.Events
        ORDER BY Users_Going DESC,dist
        LIMIT 5
    """)
PROMPT_TEMPLATE = """
Your objective is to formulate a response to a question using only the information provided below:
{context}
---
Your task is to carefully analyze the provided context and provide an answer to the following question based solely on the information given:
{question}
"""
# 合併頂級結果的描述。 
descriptions = [row["Description"] for row in results.named_results()]
context_text = "\n\n---\n\n".join(descriptions)
prompt_template = ChatPromptTemplate.from_template(PROMPT_TEMPLATE)
prompt = prompt_template.format(context=context_text, question=query)
model = ChatOpenAI(openai_api_key="your_api_key_here")
response_text = model.predict(prompt)
formatted_response = f"Response: {response_text}\n"
print(formatted_response)

在本篇博客中,我們觀察到MyScale不僅僅是一個向量數據庫,它可用於開發各種應用。我們可以將其作為簡單的SQL數據庫使用,或是用於高級AI應用,涵蓋了大部分開發領域。我們鼓勵您親自嘗試,通過註冊免費層級並獲得500萬免費向量存儲,來探索其先進功能。

結論

我們已通過開發事件分析應用程序的過程,探索了MyScale與Apify Scraper的能力和功能。MyScale展示了其在高性能向量搜索方面的卓越能力,同時保留了SQL數據庫的所有功能,幫助開發者使用熟悉的SQL語法進行語義搜索,速度和準確性都有顯著提升。

MyScale 的功能不僅限於此應用:您可以採用它來開發任何使用 RAG 方法的人工智慧應用。 

如有任何反饋或建議,請隨時聯繫我們。

Source:
https://dzone.com/articles/performing-advanced-facebook-event-data-analysis-w