벡터 데이터베이스를 이용한 고급 페이스북 이벤트 데이터 분석 수행

오늘날 디지털 시대에서 모든 산업 분야의 전문가들은 예정된 이벤트, 컨퍼런스 및 워크숍에 대해 계속해서 업데이트해야 합니다. 그러나 광대한 온라인 정보의 바다 속에서 자신의 관심사와 일치하는 이벤트를 효과적으로 찾는 것은 큰 도전이 됩니다.

이 블로그에서는 이 문제에 대한 혁신적인 해결책을 소개합니다: Facebook에서 이벤트 데이터를 스크래핑하고 MyScale를 사용하여 스크래핑된 데이터를 분석하는 포괄적인 애플리케이션입니다. MyScale는 일반적으로 RAG 기술 스택과 연관되거나 벡터 데이터베이스로 사용되지만, 이러한 영역을 넘어서는 능력을 갖추고 있습니다. 우리는 벡터 검색 기능을 활용하여 의미적으로 유사한 이벤트를 분석하여 더 나은 결과와 통찰력을 제공하는 데 이를 활용할 것입니다.

당신은 Grok AI가 X(이전의 Twitter) 데이터에서 실시간 정보를 검색하기 위해 Qdrant 벡터 데이터베이스를 검색 엔진으로 사용했음을 알 수 있습니다. MyScale와 다른 플랫폼(예: Apify)과의 통합을 통해 개인화된 애플리케이션을 개발함으로써 일상 생활의 작업을 강화하는 데 벡터 데이터베이스의 힘을 확인할 수도 있습니다.

그러면 이 블로그에서 도시 이름만 입력으로 받아 Facebook에서 관련 이벤트를 모두 스크래핑하는 애플리케이션을 개발해 보겠습니다. 그 후 MyScale의 고급 SQL 벡터 기능을 사용하여 데이터 분석 및 의미 검색을 수행할 것입니다.

도구 및 기술

우리는 이 유용한 애플리케이션을 개발하기 위해 Apify, MyScale, OpenAI와 같은 여러 도구들을 사용할 것입니다.

  • Apify: 인기 있는 웹 스크레이핑 및 자동화 플랫폼으로, 데이터 수집 과정을 크게 간소화합니다. 이를 통해 데이터를 스크레이핑하고 이를 LLM에 제공할 수 있습니다. 이를 통해 실시간 데이터로 LLM을 훈련시키고 애플리케이션을 개발할 수 있습니다.
  • MyScale: MyScale는 SQL 벡터 데이터베이스로, 우리는 이를 사용하여 구조화된 데이터와 비구조화된 데이터를 최적화된 방식으로 저장 및 처리합니다.
  • OpenAI: 우리는 OpenAItext-embedding-3-small 모델을 사용하여 텍스트의 임베딩을 얻고, 이 임베딩을 MyScale에 저장하여 데이터 분석 및 의미 검색에 사용할 것입니다.

MyScale 및 Apify 설정 방법

MyScale와 Apify를 설정하기 시작하려면 새 디렉토리와 Python 파일을 생성해야 합니다. 이를 위해 터미널 또는 명령 행을 열고 다음 명령을 입력하십시오:

Shell

 

mkdir MyScale
cd MyScale
touch main.ipynb

패키지를 설치합시다. 아래 명령을 복사하여 터미널에 붙여넣으십시오. 이 패키지들은 우리가 애플리케이션을 개발하는 데 필요한 도구와 라이브러리를 제공합니다.

Shell

 

pip install openai apify-client clickhouse-connect pandas numpy

이렇게 하면 시스템에 모든 종속성이 설치되어야 합니다. 모든 것이 제대로 설치되었는지 확인하려면 터미널에서 다음 명령을 입력할 수 있습니다.

Shell

 

pip freeze | egrep '(openai|apify-client|clickhouse-connect|pandas|numpy)'

이 과정에서는 모든 설치된 의존성과 해당 버전을 포함해야 합니다. 누락된 의존성을 발견하면 해당 패키지의 설치 명령어를 다시 실행해야 할 수도 있습니다. 이제 설치가 완료되었으니 코드를 작성할 준비가 되었습니다.

참고: 파이썬 노트북 환경에서 작업할 예정입니다. 모든 코드 블록을 노트북 셀로 간주하시기 바랍니다.

Apify를 사용한 데이터 스크래핑 방법

이제 Apify API를 사용하여 뉴욕시의 이벤트 데이터를 스크래핑하겠습니다.페이스북 이벤트 스크래퍼를 사용합니다.

Python

 

     import pandas as pd
from apify_client import ApifyClient
# API 토큰으로 ApifyClient 초기화
client = ApifyClient("Enter_your_apify_key_here")

# 액터 입력 준비
run_input = {
    "searchQueries": ["Sport New York"],
    "startUrls": [],
    "maxEvents": 50,
}

# 액터 실행 및 완료 대기
run = client.actor("UZBnerCFBo5FgGouO").call(run_input=run_input)

df_columns = ['Name', 'Datetime', 'Description', 'Users_Going', 'Users_Interested', 'Users_Responded', 'City', 'Organized_By', 'Street_Address']
dataframe1 = pd.DataFrame(columns=df_columns)

for item in client.dataset(run["defaultDatasetId"]).iterate_items():
    # None 값을 빈 문자열로 대체하는 사전 이항식 사용
    row = {
        'Name': item.get('name', ''),
        'Datetime': item.get('dateTimeSentence', ''),
        'Description': item.get('description', ''),
        'Users_Going': item.get('usersGoing', ''),
        'Users_Interested': item.get('usersInterested', ''),
        'Users_Responded': item.get('usersResponded', ''),
        'City': item.get('location', {}).get('city', '') if item.get('location') else '',
        'Organized_By': item.get('organizedBy', ''),
        'Street_Address': item.get('location', {}).get('streetAddress', '') if item.get('location') else ''
    }
    # 모든 None 값이 빈 문자열로 대체되었는지 확인
    row = {k: (v if v is not None else '') for k, v in row.items()}
    dataframe1 = dataframe1._append(row, ignore_index=True)

# 데이터 정리
dataframe1['Description'] = dataframe1['Description'].replace('\\n', '', regex=True)

이 스크립트는 다가오는 이벤트 세부 정보를 pandas DataFrame 형태로 제공합니다.

참고: 위의 스크립트에 Apify API 키를 추가하는 것을 잊지 마세요. API 토큰은 Apify 콘솔의 통합 페이지에서 찾을 수 있습니다.

데이터 전처리

원시 데이터를 수집할 때 다양한 형식으로 제공됩니다. 이 스크립트에서는 이벤트 날짜를 단일 형식으로 변환하여 데이터 필터링을 보다 효율적으로 수행할 수 있도록 합니다.

Python

 

     # 데이터 조작 및 날짜 파싱을 위해 필요한 라이브러리 가져오기
import pandas as pd
import numpy as np
from datetime import datetime
from dateutil import parser

# 범위를 나타내거나 단일 날짜를 나타낼 수 있는 날짜 문자열을 파싱하는 함수
def parse_dates(date_str):
    # 날짜 문자열에 대시가 있는지 확인하여 범위를 나타냄
    if '-' in date_str:
        parts = date_str.split('-')
        # 문자열이 두 부분으로 분할되면 유효한 범위임
        if len(parts) == 2:
            try:
                # 시작 및 종료 날짜를 파싱하고 읽기 쉬운 형식으로 포맷
                start_date = parser.parse(parts[0], fuzzy=True).strftime('%a, %b %d')
                end_date = parser.parse(parts[1], fuzzy=True).strftime('%a, %b %d')
                return start_date, end_date
            except ValueError:
                # 파싱 오류의 경우 아무것도 수행하지 않음(아래에서 처리)
                pass  
    # 범위가 아니거나 범위 파싱에 실패한 경우 단일 날짜로 파싱하려고 시도
    try:
        parsed_date = parser.parse(date_str, fuzzy=True)
        # 단일 날짜를 시작_날짜로 포맷하고 다르게 종료_날짜로 포맷
        start_date = parsed_date.strftime('%a, %b %d AT %I:%M %p EDT')
        end_date = parsed_date.strftime('%a, %b %d')  # Omitting time for end_date
        return start_date, end_date
    except ValueError:
        # 파싱에 실패하면 두 날짜 모두 NaN 반환
        return np.nan, np.nan  

# 날짜 문자열에서 상세 날짜, 시간 및 요일을 추출하는 함수
def extract_date_time_day(date_str):
    try:
        # 입력 형식에 대한 유연성을 허용하여 날짜 문자열 파싱
        parsed_date = parser.parse(date_str, fuzzy=True)
        # 날짜, 시간 및 요일 부분 추출 및 포맷
        date = parsed_date.strftime('%Y-%m-%d')
        day = parsed_date.strftime('%a')
        # 원래 문자열에 시간 구성 요소가 포함되어 있는지 확인
        time_component = parsed_date.strftime('%I:%M %p') not in date_str
        time = parsed_date.strftime('%H:%M:%S') if not time_component else np.nan
    except ValueError:
        # 파싱에 실패하면 날짜, 시간 및 요일을 NaN으로 설정
        date, time, day = np.nan, np.nan, np.nan
    
    return date, time, day

# parse_dates 함수를 데이터프레임 전체에 적용하여 시작 및 종료 날짜에 대한 새 열 생성
dataframe1[['Start_Date', 'End_Date']] = dataframe1.apply(lambda row: pd.Series(parse_dates(row['Datetime'])), axis=1)

# Start_Date가 NaN인 행 삭제, 파싱이 실패한 것으로 나타냄
dataframe = dataframe1.dropna(subset=['Start_Date'])

# extract_date_time_day를 적용하여 시작 및 종료 날짜를 별도의 날짜, 시간 및 요일 열로 분할
dataframe['Start_Date'], dataframe['Start_Time'], dataframe['Start_Day'] = zip(*dataframe['Start_Date'].apply(extract_date_time_day))
dataframe['End_Date'], _, dataframe['End_Day'] = zip(*dataframe['End_Date'].apply(extract_date_time_day))

# 더 이상 필요하지 않으므로 원래 'Datetime' 열 제거
dataframe=dataframe.drop(['Datetime'], axis=1)

# 'Start_Date' 및 'End_Date'를 datetime 형식으로 변환하여 날짜 부분만 추출
dataframe['Start_Date'] = pd.to_datetime(dataframe['Start_Date']).dt.date
dataframe['End_Date'] = pd.to_datetime(dataframe['End_Date']).dt.date

# 'Start_Time'을 datetime 형식으로 변환하여 시간 정보 유지
dataframe['Start_Time'] = pd.to_datetime(dataframe['Start_Time'])

이 코드 스니펫은 pandasdatetimedateutil 패키지를 사용하여 데이터를 형식화합니다.

임베딩 생성

이벤트를 깊이 이해하고 검색하기 위해 text-embedding-3-small을 사용하여 설명에서 임베딩을 생성합니다. 이 임베딩은 각 이벤트의 의미적 본질을 포착하여 애플리케이션이 더 나은 결과를 반환하는 데 도움이 됩니다.

Python

 

     # OpenAI 라이브러리를 통한 API 접근 가져오기.
from openai import OpenAI 
# API 키로 OpenAI 클라이언트 초기화.
openai_client = OpenAI(api_key="your_openai_api_key_here")
# 텍스트 임베딩을 가져오는 함수 
def get_embedding(text, model="text-embedding-3-small"):
    return openai_client.embeddings.create(input=text, model=model).data
embeddings = get_embedding(dataframe["Description"].tolist())
# 임베딩 객체에서 임베딩 벡터 추출
vectors = [embedding.embedding for embedding in embeddings]
array = np.array(vectors)
embeddings_series = pd.Series(list(array))
# 임베딩을 DataFrame에 새 열로 추가.
dataframe['Description_Embeddings'] = embeddings_series

이제 임베딩이 포함된 새 DataFrame을 MyScale에 삽입합니다.

MyScale와 연결

처음에 논의된 대로 MyScale를 벡터 데이터베이스로 사용하여 데이터를 저장하고 관리합니다. 여기서 MyScale에 연결하여 데이터 저장을 준비합니다.

Python

 

     import clickhouse_connect
client = clickhouse_connect.get_client(
    host='host_name_here',
    port=443,
    username='username_here',
    password='passwd_here'
)

이 연결 설정은 애플리케이션이 MyScale와 통신하고 SQL의 힘을 사용하여 데이터 조작 및 분석을 수행할 수 있도록 합니다.

참고: MyScale 클러스터에 연결하는 방법에 대한 자세한 내용은 연결 세부 정보를 참조하세요.

MyScale를 사용하여 테이블 및 인덱스 생성

이제 DataFrame에 따라 테이블을 생성합니다. 모든 데이터는 이 테이블에 저장되며, 임베딩을 포함합니다.

Python

 

     client.command("""
    CREATE TABLE default.Events (
    Name String,
    Description String,
    Users_Going Int64,
    Users_Interested Int64,
    Users_Responded Int64,
    City String,
    Organized_By String,
    Street_Address String,
    Start_Date Date32,
    End_Date Nullable(Date32),
    Start_Time Nullable(DateTime64),
    Start_Day String,
    End_Day String,
    Description_Embeddings Array(Float32),
    CONSTRAINT check_data_length CHECK length(Description_Embeddings) = 1536
    ) ENGINE = MergeTree()
    ORDER BY (Name);
    """)

위의 SQL 문은 클러스터에 Events라는 이름의 테이블을 생성합니다. CONSTRAINT는 모든 벡터 임베딩이 1536의 길이로 동일하게 유지되도록 합니다.

MyScale에서 데이터 저장 및 인덱스 생성

이 단계에서는 처리된 데이터를 MyScale에 삽입합니다. 이는 데이터의 효율적인 저장 및 검색을 보장하기 위해 데이터를 일괄 삽입하는 것을 포함합니다.

Python

 

     batch_size = 10  # Adjust based on your needs

num_batches = len(dataframe) // batch_size

for i in range(num_batches):
    start_idx = i * batch_size
    end_idx = start_idx + batch_size
    batch_data = dataframe[start_idx:end_idx]
    # print(batch_data["Description_Embeddings"])
    client.insert("default.Events", batch_data.to_records(index=False).tolist(), column_names=batch_data.columns.tolist())
    print(f"Batch {i+1}/{num_batches} inserted.")

client.command("""
ALTER TABLE default.Events
    ADD VECTOR INDEX vector_index Description_Embeddings
    TYPE MSTG
""")

pandas를 사용하여 위의 코드는 준비된 데이터 세트를 MyScale 데이터베이스로 효율적으로 전송합니다.

MyScale를 사용한 데이터 분석

마지막으로 MyScale의 분석 기능을 사용하여 분석을 수행하고 의미 검색을 활성화합니다. SQL 쿼리를 실행하여 주제, 위치 및 날짜별로 이벤트를 분석할 수 있습니다. 이제 몇 가지 쿼리를 작성해 보겠습니다.

간단한 SQL 쿼리

먼저 테이블에서 상위 10개 결과를 가져오도록 시도해 보겠습니다.

Python

 

     results=client.query("""
        SELECT Name,Description FROM default.Events LIMIT 10
    """)
for row in results.named_results():
        print(row["Name"])
        print(row['Description'])

이 쿼리는 events 테이블에서 상위 10개 결과를 단순히 반환합니다.

의미적 유사성에 따라 이벤트 발견

참조 이벤트와 유사한 상위 10개의 예정된 이벤트를 찾아 보겠습니다. 예를 들어 “1974년부터 운영되어 온 국내에서 가장 오래 진행되는 공연 중 하나 – 50번째 해입니다!!! 우리의 Schenectady”. 이는 이벤트 설명의 의미 임베딩을 비교하여 테마와 감정의 일치를 보장하여 달성됩니다.

Python

 

embeddings=get_embedding(["One of the Longest Running Shows in the Country - Operating since 1974 ...NOW our 50th YEAR !!!Our Schenectady"])
embedding=embeddings[0].embedding
results = client.query(f"""
        SELECT Name, Description,
        distance(Description_Embeddings, {embedding}) as dist FROM default.Events ORDER BY dist LIMIT 10
    """)
for row in results.named_results():
        print("Title of the event  ", row["Name"])
        print("Description of the event  ", row['Description'])
        print("Distance : ", row["dist"])

인기에 따른 트렌딩 이벤트

이 쿼리는 참여자 수와 관심을 받은 사용자 수로 상위 10개 이벤트를 순위 매깁니다, 대도시 축제부터 주요 컨퍼런스까지 인기 있는 이벤트를 강조합니다. 크고 에너지 넘치는 모임에 참여하고자 하는 사람들에게 이상적입니다.

Python

 

     results = client.query(f"""
        SELECT Name, City, Users_Going, Users_Interested, Users_Responded
        FROM default.Events
        ORDER BY Users_Going DESC, Users_Interested DESC
        LIMIT 10
    """)
for row in results.named_results():
        print("Event Name  ", row["Name"])
        print("City ", row["City"])
        print("Users Going ", row["Users_Going"])
        print("Interested Users ", row["Users_Interested"])

뉴욕의 인기 지역 이벤트

관련성과 인기를 결합하여, 이 쿼리는 뉴욕시의 유사한 이벤트를 식별합니다 특정 이벤트와 관련된 것들을 참석률로 순위를 매기며, 도시의 활기찬 문화를 반영하고 지역 관심을 끌 이벤트의 선별된 목록을 제공합니다.

Python

 

     embeddings=get_embedding(["One of the Longest Running Shows in the Country - Operating since 1974 ...NOW our 50th YEAR !!!Our Schenectady"])
embeddi=embeddings[0].embedding
results = client.query(f"""
        SELECT Name,City, Description, Users_Going,distance(Description_Embeddings, {embeddi}) as dist
        FROM default.Events
        WHERE City LIKE '%New York%' and dist < 1.5
        ORDER BY Users_Going DESC,dist
        LIMIT 10
    """)
for row in results.named_results():
        print("Event Name  ", row["Name"])
        print("Description ", row["Description"])
        print("Users Going ", row["Users_Going"])

선도적 이벤트 조직자

이 쿼리는 상위 10명의 이벤트 조직자를 순위 매깁니다 총 참석자 수와 관심을 받은 사용자 수로, 매력적인 이벤트를 만들고 많은 관객을 유치하는 데 뛰어난 사람들을 강조합니다. 이는 최고 수준의 이벤트에 관심이 있는 이벤트 기획자와 참석자에게 통찰력을 제공합니다.

Python

 

     # 어떤 고객이 가장 많은 수의 사용자를 유치했나요 
results = client.query(f"""
       SELECT Organized_By, SUM(Users_Going + Users_Interested) AS Total_Users
        FROM default.Events
        GROUP BY Organized_By
        ORDER BY Total_Users DESC
        Limit 10
    """)
for row in results.named_results():
        print("Event Name  ", row["Organized_By"])
        print("Total_Users ", row["Total_Users"])

RAG 구현

이전에는 데이터 분석을 위한 MyScale을 살펴보았고, 이를 통해 데이터 워크플로우를 강화하는 기능에 대해 강조했습니다. 앞으로 나아가 Retrieval-Augmented Generation (RAG)을 구현해보겠습니다. 이는 외부 지식 기반과 LLM을 결합한 혁신적인 프레임워크입니다. 이 단계를 통해 데이터를 보다 잘 이해하고 자세한 인사이트를 찾을 수 있습니다. 다음으로 MyScale과 RAG를 사용하는 방법을 살펴보겠습니다. 이를 통해 데이터 작업이 보다 흥미롭고 생산적이 될 것입니다.

Python

 

     from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate

query="Can you please suggest me some events related to basketball"
# 위에서 정의한 get_embedding 메서드를 사용하세요. 이 메서드는 문장 목록을 입력으로 받습니다.
embeddings=get_embedding([query])
embeddings=embeddings[0].embedding
results = client.query(f"""
        SELECT Name, City, Users_Going, Description, distance(Description_Embeddings, {embeddings}) as dist
        FROM default.Events
        ORDER BY Users_Going DESC,dist
        LIMIT 5
    """)
PROMPT_TEMPLATE = """
Your objective is to formulate a response to a question using only the information provided below:
{context}
---
Your task is to carefully analyze the provided context and provide an answer to the following question based solely on the information given:
{question}
"""
# 상위 결과의 설명을 결합합니다. 
descriptions = [row["Description"] for row in results.named_results()]
context_text = "\n\n---\n\n".join(descriptions)
prompt_template = ChatPromptTemplate.from_template(PROMPT_TEMPLATE)
prompt = prompt_template.format(context=context_text, question=query)
model = ChatOpenAI(openai_api_key="your_api_key_here")
response_text = model.predict(prompt)
formatted_response = f"Response: {response_text}\n"
print(formatted_response)

이 블로그를 통해 MyScale이 다양한 애플리케이션을 개발하는 데 사용할 수 있는 단순한 벡터 데이터베이스 이상의 것임을 관찰했습니다. 간단한 SQL 데이터베이스로 사용하거나 고급 AI 애플리케이션에 사용할 수 있으며, 대부분의 개발 영역을 커버합니다. 무료 티어에 가입하여 5백만 개의 무료 벡터 스토리지를 받고 고급 기능을 탐색해보시기 바랍니다.

결론

MyScale과 Apify Scraper를 통해 이벤트 분석 애플리케이션을 개발하는 과정을 통해 MyScale의 능력과 기능을 탐색했습니다. MyScale은 고성능 벡터 검색 기능을 갖추면서 SQL 데이터베이스의 모든 기능을 유지하는 데 탁원한 역할을 했으며, 개발자들이 익숙한 SQL 구문을 사용하여 훨씬 빠르고 정확한 의미 검색을 할 수 있게 도왔습니다.

MyScale의 기능은 이 애플리케이션에만 국한되지 않습니다: RAG 방법을 사용하여 어떤 AI 애플리케이션을 개발하는 데에도 적용할 수 있습니다. 

피드백이나 제안이 있으시면 언제든지 연락주세요.

Source:
https://dzone.com/articles/performing-advanced-facebook-event-data-analysis-w