ベクトルデータベースを用いた高度なFacebookイベントデータ分析

現代のデジタル時代において、すべての業界のプロフェッショナルは、開催予定のイベント、会議、ワークショップに関する最新情報を常に更新し続けなければなりません。しかし、膨大なオンライン情報の中から自分の興味に合致するイベントを効率的に見つけることは、大きな課題となっています。

このブログでは、この課題に対する革新的な解決策を紹介します。それは、Facebookからイベントデータをスクレイピングし、MyScaleを使用してスクレイピングしたデータを分析する包括的なアプリケーションです。MyScaleはRAGテックスタックやベクトルデータベースとして一般的に関連付けられていますが、その機能はこれらの分野を超えています。私たちはそのベクトル検索機能を活用して意味的に類似したイベントを分析し、より良い結果と洞察を提供します。

また、Grok AIがQdrantベクトルデータベースを検索エンジンとして使用してX(以前はTwitterと呼ばれていた)データからリアルタイム情報を取得することに気づくかもしれません。MyScaleを他のプラットフォーム(例えばApify)と統合することで、ベクトルデータベースの力をこのように評価し、シンプルな個人用アプリケーションの開発を通じて日常生活のタスクを強化することもできます。

そこでこのブログでは、都市名を入力として受け取り、Facebookから関連するすべてのイベントをスクレイピングするアプリケーションを開発しましょう。次に、MyScaleの高度なSQLベクトル機能を使用してデータ分析と意味的な検索を実行します。

ツールとテクノロジー

この便利なアプリケーションを開発するために、Apify、MyScale、OpenAIなどの複数のツールを使用します。

  • Apify: データ収集のプロセスを大幅に合理化する人気のウェブスクレイピングおよび自動化プラットフォームです。データをスクレイピングし、その後LLMに供給する能力を提供します。これにより、LLMをリアルタイムデータでトレーニングし、アプリケーションを開発できます。
  • MyScale: MyScaleはSQLベクトルデータベースであり、構造化および非構造化データを最適化された方法で保存および処理するために使用します。
  • OpenAI: OpenAIからtext-embedding-3-smallモデルを使用してテキストの埋め込みを取得し、それらの埋め込みをMyScaleに保存してデータ分析とセマンティック検索を行います。

MyScaleとApifyの設定方法

MyScaleとApifyの設定を開始するためには、新しいディレクトリとPythonファイルを作成する必要があります。ターミナルまたはコマンドラインを開き、以下のコマンドを入力してください。

Shell

 

mkdir MyScale
cd MyScale
touch main.ipynb

パッケージをインストールしましょう。以下のコマンドをコピーし、ターミナルに貼り付けてください。これらのパッケージは、アプリケーション開発に必要なツールやライブラリを提供します。

Shell

 

pip install openai apify-client clickhouse-connect pandas numpy

これで、システム内のすべての依存関係がインストールされたはずです。すべてが正しくインストールされていることを確認するために、ターミナルで以下のコマンドを入力できます。

Shell

 

pip freeze | egrep '(openai|apify-client|clickhouse-connect|pandas|numpy)'

これには、インストールされたすべての依存関係とそのバージョンが含まれているはずです。不足している依存関係を見つけた場合は、特定のパッケージのインストールコマンドを再実行する必要がある場合があります。これで、インストールが完了したのでコードを書く準備ができました。

注意:Pythonノートブックで作業します。各コードブロックをノートブックセルと考えてください。

Apifyを使用したデータスクレイピング

次に、Apify APIを使用して、Facebookイベントスクレイパーを使用して、ニューヨーク市のイベントデータをスクレイピングします。

Python

 

     import pandas as pd
from apify_client import ApifyClient
# APIトークンでApifyClientを初期化する
client = ApifyClient("Enter_your_apify_key_here")

# アクターの入力を準備する
run_input = {
    "searchQueries": ["Sport New York"],
    "startUrls": [],
    "maxEvents": 50,
}

# アクターを実行し、完了するまで待つ
run = client.actor("UZBnerCFBo5FgGouO").call(run_input=run_input)

df_columns = ['Name', 'Datetime', 'Description', 'Users_Going', 'Users_Interested', 'Users_Responded', 'City', 'Organized_By', 'Street_Address']
dataframe1 = pd.DataFrame(columns=df_columns)

for item in client.dataset(run["defaultDatasetId"]).iterate_items():
    # 辞書の理解を使用してNone値を空の文字列で置き換える
    row = {
        'Name': item.get('name', ''),
        'Datetime': item.get('dateTimeSentence', ''),
        'Description': item.get('description', ''),
        'Users_Going': item.get('usersGoing', ''),
        'Users_Interested': item.get('usersInterested', ''),
        'Users_Responded': item.get('usersResponded', ''),
        'City': item.get('location', {}).get('city', '') if item.get('location') else '',
        'Organized_By': item.get('organizedBy', ''),
        'Street_Address': item.get('location', {}).get('streetAddress', '') if item.get('location') else ''
    }
    # すべてのNone値が空の文字列で置き換えられていることを確認する
    row = {k: (v if v is not None else '') for k, v in row.items()}
    dataframe1 = dataframe1._append(row, ignore_index=True)

# データのクリーニング
dataframe1['Description'] = dataframe1['Description'].replace('\\n', '', regex=True)

このスクリプトは、pandas DataFrameの形式で今後のイベントの詳細を提供します。

注意: 上記スクリプトにApify APIキーを追加することを忘れないでください。APIトークンはApifyコンソールの統合ページで確認できます。

データ前処理

生データを収集する際、それは様々なフォーマットで来ます。このスクリプトでは、イベント日付を統一フォーマットに変換して、データフィルタリングをより効率的に行えるようにします。

Python

 

     # データ操作と日付解析に必要なライブラリをインポート
import pandas as pd
import numpy as np
from datetime import datetime
from dateutil import parser

# 範囲または単一の日付を表す可能性のある日付文字列を解析する関数
def parse_dates(date_str):
    # 日付文字列にダッシュが含まれているかどうかをチェックし、範囲を示します
    if '-' in date_str:
        parts = date_str.split('-')
        # 文字列が2つの部分に分割された場合、それは有効な範囲です
        if len(parts) == 2:
            try:
                # 開始日と終了日を解析し、読みやすい形式にフォーマットします
                start_date = parser.parse(parts[0], fuzzy=True).strftime('%a, %b %d')
                end_date = parser.parse(parts[1], fuzzy=True).strftime('%a, %b %d')
                return start_date, end_date
            except ValueError:
                # 解析エラーの場合は何もしない(以下で処理します)
                pass  
    # 範囲でない場合や範囲の解析が失敗した場合は、単一の日付として解析を試みます
    try:
        parsed_date = parser.parse(date_str, fuzzy=True)
        # 開始日に対して単一の日付をフォーマットし、終了日に対しては異なるフォーマットを使用します
        start_date = parsed_date.strftime('%a, %b %d AT %I:%M %p EDT')
        end_date = parsed_date.strftime('%a, %b %d')  # Omitting time for end_date
        return start_date, end_date
    except ValueError:
        # 解析が失敗した場合は、両方の日付にNaNを返します
        return np.nan, np.nan  

# 日付文字列から詳細な日付、時刻、曜日を抽出する関数
def extract_date_time_day(date_str):
    try:
        # 入力形式に多少の柔軟性を許容して日付文字列を解析します
        parsed_date = parser.parse(date_str, fuzzy=True)
        # 日付、時刻、および曜日の部分を抽出してフォーマットします
        date = parsed_date.strftime('%Y-%m-%d')
        day = parsed_date.strftime('%a')
        # 元の文字列に時間コンポーネントが含まれていたかどうかを判断します
        time_component = parsed_date.strftime('%I:%M %p') not in date_str
        time = parsed_date.strftime('%H:%M:%S') if not time_component else np.nan
    except ValueError:
        # 解析に失敗した場合は、日付、時刻、および曜日をNaNに設定します
        date, time, day = np.nan, np.nan, np.nan
    
    return date, time, day

# parse_dates関数をデータフレーム全体に適用し、開始日と終了日の新しい列を作成します
dataframe1[['Start_Date', 'End_Date']] = dataframe1.apply(lambda row: pd.Series(parse_dates(row['Datetime'])), axis=1)

# Start_DateがNaNの行を削除し、解析が失敗したことを示します
dataframe = dataframe1.dropna(subset=['Start_Date'])

# extract_date_time_dayを適用して、開始日と終了日を個別の日付、時刻、および曜日の列に分割します
dataframe['Start_Date'], dataframe['Start_Time'], dataframe['Start_Day'] = zip(*dataframe['Start_Date'].apply(extract_date_time_day))
dataframe['End_Date'], _, dataframe['End_Day'] = zip(*dataframe['End_Date'].apply(extract_date_time_day))

# もはや必要でないため、元の'Datetime'列を削除します
dataframe=dataframe.drop(['Datetime'], axis=1)

# 'Start_Date'と'End_Date'をdatetime形式に変換し、日付のみを抽出します
dataframe['Start_Date'] = pd.to_datetime(dataframe['Start_Date']).dt.date
dataframe['End_Date'] = pd.to_datetime(dataframe['End_Date']).dt.date

# 'Start_Time'をdatetime形式に変換し、時刻情報を保持します
dataframe['Start_Time'] = pd.to_datetime(dataframe['Start_Time'])

このコードスニペットは、pandasとPythonのdatetimeおよびdateutilパッケージを使用してデータをフォーマットします。

埋め込み生成

イベントを深く理解して検索するために、text-embedding-3-smallを使用して説明文から埋め込みを生成します。これらの埋め込みは、各イベントの意味的な本質を捉え、アプリケーションがより良い結果を返すのに役立ちます。

Python

 

     # OpenAIライブラリをインポートしてAPIアクセスを行います。
from openai import OpenAI 
# APIキーを使用してOpenAIクライアントを初期化します。
openai_client = OpenAI(api_key="your_openai_api_key_here")
# テキスト埋め込みを取得する関数 
def get_embedding(text, model="text-embedding-3-small"):
    return openai_client.embeddings.create(input=text, model=model).data
embeddings = get_embedding(dataframe["Description"].tolist())
# 埋め込みオブジェクトから埋め込みベクトルを抽出します
vectors = [embedding.embedding for embedding in embeddings]
array = np.array(vectors)
embeddings_series = pd.Series(list(array))
# 埋め込みをDataFrameに新しい列として追加します。
dataframe['Description_Embeddings'] = embeddings_series

次に、埋め込みを含む新しいDataFrameをMyScaleに挿入します。

MyScaleとの接続

冒頭で述べたように、MyScaleをベクトルデータベースとしてデータの保存と管理に使用します。ここでは、データ保存の準備としてMyScaleに接続します。

Python

 

     import clickhouse_connect
client = clickhouse_connect.get_client(
    host='host_name_here',
    port=443,
    username='username_here',
    password='passwd_here'
)

この接続設定により、アプリケーションはMyScaleと通信し、SQLの力を使ってデータ操作と分析を行うことができます。

: MyScaleクラスターに接続する方法の詳細については、接続詳細を参照してください。

MyScaleを使用してテーブルとインデックスを作成

これからDataFrameに従ってテーブルを作成します。すべてのデータ、埋め込みを含めて、このテーブルに保存されます。

Python

 

     client.command("""
    CREATE TABLE default.Events (
    Name String,
    Description String,
    Users_Going Int64,
    Users_Interested Int64,
    Users_Responded Int64,
    City String,
    Organized_By String,
    Street_Address String,
    Start_Date Date32,
    End_Date Nullable(Date32),
    Start_Time Nullable(DateTime64),
    Start_Day String,
    End_Day String,
    Description_Embeddings Array(Float32),
    CONSTRAINT check_data_length CHECK length(Description_Embeddings) = 1536
    ) ENGINE = MergeTree()
    ORDER BY (Name);
    """)

上記のSQLステートメントは、クラスタ上にEventsという名前のテーブルを作成します。CONSTRAINTは、すべてのベクトル埋め込みが同じ長さ1536であることを保証します。

MyScaleでのデータの保存とインデックスの作成

このステップでは、処理されたデータをMyScaleに挿入します。これには、データの効率的な保存と取得を確保するためにバッチ挿入を行います。

Python

 

     batch_size = 10  # Adjust based on your needs

num_batches = len(dataframe) // batch_size

for i in range(num_batches):
    start_idx = i * batch_size
    end_idx = start_idx + batch_size
    batch_data = dataframe[start_idx:end_idx]
    # print(batch_data["Description_Embeddings"])
    client.insert("default.Events", batch_data.to_records(index=False).tolist(), column_names=batch_data.columns.tolist())
    print(f"Batch {i+1}/{num_batches} inserted.")

client.command("""
ALTER TABLE default.Events
    ADD VECTOR INDEX vector_index Description_Embeddings
    TYPE MSTG
""")

pandasを使用して、上記のコードは事前に準備されたデータセットをMyScaleデータベースに効率的に転送します。

MyScaleを使用したデータ分析

最後に、MyScaleの分析機能を使用して分析を実行し、セマンティック検索を有効にします。SQLクエリを実行することで、トピック、場所、日付に基づいてイベントを分析できます。それでは、いくつかのクエリを書いてみましょう。

シンプルなSQLクエリ

まずは上位10件の結果をテーブルから取得してみましょう。

Python

 

     results=client.query("""
        SELECT Name,Description FROM default.Events LIMIT 10
    """)
for row in results.named_results():
        print(row["Name"])
        print(row['Description'])

このクエリは、eventsテーブルから上位10件の結果を単純に返します。

セマンティック的な関連性によるイベントの発見

次に、「国で最も長く続くショーの一つ – 1974年以来運営中…今すぐ50年目!!! 私たちのシェネクタディ」のような参照イベントに類似した上位10件の関連性の高い今後のイベントを見つけてみましょう。これは、イベントの説明のセマンティック埋め込みを比較することで達成され、テーマや感情の一致を確保します。

Python

 

embeddings=get_embedding(["One of the Longest Running Shows in the Country - Operating since 1974 ...NOW our 50th YEAR !!!Our Schenectady"])
embedding=embeddings[0].embedding
results = client.query(f"""
        SELECT Name, Description,
        distance(Description_Embeddings, {embedding}) as dist FROM default.Events ORDER BY dist LIMIT 10
    """)
for row in results.named_results():
        print("Title of the event  ", row["Name"])
        print("Description of the event  ", row['Description'])
        print("Distance : ", row["dist"])

人気のイベントトレンド

このクエリは、参加者数と興味を持つユーザー数に基づいてトップ10のイベントをランキングし、大都市の祭典から主要な会議までの人気イベントを強調します。大規模で活気ある集会に参加したい人に最適です。

Python

 

     results = client.query(f"""
        SELECT Name, City, Users_Going, Users_Interested, Users_Responded
        FROM default.Events
        ORDER BY Users_Going DESC, Users_Interested DESC
        LIMIT 10
    """)
for row in results.named_results():
        print("Event Name  ", row["Name"])
        print("City ", row["City"])
        print("Users Going ", row["Users_Going"])
        print("Interested Users ", row["Users_Interested"])

ニューヨークの人気地元イベント

関連性と人気を組み合わせ、このクエリはニューヨーク市内の特定のイベントに関連する類似イベントを特定し、参加者数に基づいてそれらをランク付けして、都市の活気ある文化を反映し、地元の関心を引くカリキュラムリストを提供します。

Python

 

     embeddings=get_embedding(["One of the Longest Running Shows in the Country - Operating since 1974 ...NOW our 50th YEAR !!!Our Schenectady"])
embeddi=embeddings[0].embedding
results = client.query(f"""
        SELECT Name,City, Description, Users_Going,distance(Description_Embeddings, {embeddi}) as dist
        FROM default.Events
        WHERE City LIKE '%New York%' and dist < 1.5
        ORDER BY Users_Going DESC,dist
        LIMIT 10
    """)
for row in results.named_results():
        print("Event Name  ", row["Name"])
        print("Description ", row["Description"])
        print("Users Going ", row["Users_Going"])

主要イベント主催者

このクエリは、参加者数と興味を持つユーザー数に基づいてトップ10のイベント主催者をランキングし、魅力的なイベントを作成し、大規模な観客を引き付ける優れた能力を持つ者を強調します。トップレベルのイベントに関心のあるイベントプランナーや参加者に有益な洞察を提供します。

Python

 

     # どのクライアントが最も多くのユーザーを引き付けているか 
results = client.query(f"""
       SELECT Organized_By, SUM(Users_Going + Users_Interested) AS Total_Users
        FROM default.Events
        GROUP BY Organized_By
        ORDER BY Total_Users DESC
        Limit 10
    """)
for row in results.named_results():
        print("Event Name  ", row["Organized_By"])
        print("Total_Users ", row["Total_Users"])

RAGを実装

以前、データ分析のためのMyScaleを探求し、そのデータワークフローを強化する能力を強調しました。次に進むにあたり、外部知識ベースとLLMを組み合わせた革新的なフレームワークであるRetrieval-Augmented Generation(RAG)を実装することにより、一歩先を行きます。このステップにより、データをより深く理解し、詳細な洞察を見つけることができるようになります。次に、RAGをMyScaleとともに使用する方法を見ていきます。これにより、データとのやり取りがより興味深く、生産的になります。

Python

 

     from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate

query="Can you please suggest me some events related to basketball"
# 上記で定義されたget_embeddingメソッドを使用し、それは文のリストを受け入れます
embeddings=get_embedding([query])
embeddings=embeddings[0].embedding
results = client.query(f"""
        SELECT Name, City, Users_Going, Description, distance(Description_Embeddings, {embeddings}) as dist
        FROM default.Events
        ORDER BY Users_Going DESC,dist
        LIMIT 5
    """)
PROMPT_TEMPLATE = """
Your objective is to formulate a response to a question using only the information provided below:
{context}
---
Your task is to carefully analyze the provided context and provide an answer to the following question based solely on the information given:
{question}
"""
# 上位の結果の説明を組み合わせます。 
descriptions = [row["Description"] for row in results.named_results()]
context_text = "\n\n---\n\n".join(descriptions)
prompt_template = ChatPromptTemplate.from_template(PROMPT_TEMPLATE)
prompt = prompt_template.format(context=context_text, question=query)
model = ChatOpenAI(openai_api_key="your_api_key_here")
response_text = model.predict(prompt)
formatted_response = f"Response: {response_text}\n"
print(formatted_response)

このブログを通じて、MyScaleはさまざまなアプリケーションを開発するために使用できるベクトルデータベース以上のものであることを観察しました。単純なSQLデータベースとして、または高度なAIアプリケーションのために使用でき、開発ドメインの大部分をカバーします。無料サービスにサインアップして、500万件の無料ベクトルストレージを入手し、高度な機能を試してみることをお勧めします。

結論

イベント分析アプリケーションの開発プロセスを通じて、Apify ScraperとともにMyScaleの能力と機能を探求しました。MyScaleは、SQLデータベースのすべての機能を維持しながら、高性能なベクトル検索の優れた能力を示しました。これにより、開発者はより速く正確なセマンティック検索を行うために、馴染み深いSQL構文を使用できます。

MyScaleの機能は、このアプリケーションに限定されていません。RAG法を使った任意のAIアプリケーションの開発に適用することができます。

フィードバックや提案があれば、お気軽にお問い合わせください。

Source:
https://dzone.com/articles/performing-advanced-facebook-event-data-analysis-w