在当今的数字时代,各行各业的专业人士必须紧跟即将到来的活动、会议和研讨会。然而,在浩瀚的网络信息海洋中,高效找到与个人兴趣相符的活动,是一项重大挑战。
本博客介绍了一种创新解决方案:一个综合应用,旨在从Facebook抓取活动数据,并使用MyScale分析抓取的数据。虽然MyScale常与RAG技术栈或作为向量数据库关联,其能力远超这些领域。我们将利用其向量搜索功能进行数据分析,通过分析语义上相似的活动,提供更佳的结果和见解。
你可能会注意到,Grok AI利用Qdrant向量数据库作为搜索引擎,从X(原名Twitter)数据中检索实时信息。通过将MyScale与其他平台如Apify集成,你也可以这样评估向量数据库的力量,通过开发简单的个性化应用来增强日常任务。
因此,在本博客中,我们将开发一个应用,只需输入城市名称,即可从Facebook抓取所有相关活动。随后,我们将使用MyScale的高级SQL向量功能进行数据分析和语义搜索。
工具与技术
我们将运用多种工具,包括Apify、MyScale和OpenAI,来开发这款实用的应用程序。
- Apify: 这是一个广受欢迎的网页抓取及自动化平台,极大地简化了数据收集流程。它具备抓取数据并将其输入大型语言模型(LLMs)的能力,使我们能够利用实时数据训练LLMs,进而开发应用程序。
- MyScale: MyScale是一款SQL向量数据库,我们用它来高效存储和处理结构化与非结构化数据。
- OpenAI: 我们将采用OpenAI的
text-embedding-3-small
模型来获取文本嵌入,然后将这些嵌入保存到MyScale中,以便进行数据分析和语义搜索。
如何搭建MyScale与Apify
开始设置MyScale和Apify前,你需要创建一个新目录和一个Python文件。这可以通过打开终端或命令行,并输入以下命令来完成:
mkdir MyScale
cd MyScale
touch main.ipynb
接下来安装所需包。复制下面的命令,粘贴到终端中。这些包将提供我们开发应用所需的工具和库。
pip install openai apify-client clickhouse-connect pandas numpy
这应该会在你的系统中安装所有依赖项。为确保一切安装正确,你可以在终端中输入以下命令进行验证。
pip freeze | egrep '(openai|apify-client|clickhouse-connect|pandas|numpy)'
这应该包括所有已安装依赖项及其版本。如果发现任何缺失的依赖项,您可能需要重新运行该特定包的安装命令。现在,安装完毕后,我们就可以开始编写代码了。
注意:我们将在Python笔记本中工作。请将每个代码块视为笔记本的一个单元格。
如何使用Apify抓取数据
接下来,我们将使用Apify API来抓取纽约市的事件数据,采用Facebook Events爬虫。
import pandas as pd
from apify_client import ApifyClient
# 使用您的API令牌初始化ApifyClient
client = ApifyClient("Enter_your_apify_key_here")
# 准备Actor输入
run_input = {
"searchQueries": ["Sport New York"],
"startUrls": [],
"maxEvents": 50,
}
# 运行Actor并等待其完成
run = client.actor("UZBnerCFBo5FgGouO").call(run_input=run_input)
df_columns = ['Name', 'Datetime', 'Description', 'Users_Going', 'Users_Interested', 'Users_Responded', 'City', 'Organized_By', 'Street_Address']
dataframe1 = pd.DataFrame(columns=df_columns)
for item in client.dataset(run["defaultDatasetId"]).iterate_items():
# 使用字典推导式将None值替换为空字符串
row = {
'Name': item.get('name', ''),
'Datetime': item.get('dateTimeSentence', ''),
'Description': item.get('description', ''),
'Users_Going': item.get('usersGoing', ''),
'Users_Interested': item.get('usersInterested', ''),
'Users_Responded': item.get('usersResponded', ''),
'City': item.get('location', {}).get('city', '') if item.get('location') else '',
'Organized_By': item.get('organizedBy', ''),
'Street_Address': item.get('location', {}).get('streetAddress', '') if item.get('location') else ''
}
# 确保所有None值都被空字符串替换
row = {k: (v if v is not None else '') for k, v in row.items()}
dataframe1 = dataframe1._append(row, ignore_index=True)
# 清理数据
dataframe1['Description'] = dataframe1['Description'].replace('\\n', '', regex=True)
此脚本以pandas
DataFrame的形式提供即将到来的事件详情。
注意:别忘了在上面的脚本中添加您的Apify API密钥。您可以在Apify控制台的集成页面找到您的API令牌。
数据预处理
当我们收集原始数据时,它们以各种格式出现。在本脚本中,我们将事件日期统一为一个格式,以便更高效地进行数据筛选。
# 导入数据处理和日期解析所需库
import pandas as pd
import numpy as np
from datetime import datetime
from dateutil import parser
# 解析可能表示范围或单一日期的字符串函数
def parse_dates(date_str):
# 检查日期字符串是否包含破折号,指示范围
if '-' in date_str:
parts = date_str.split('-')
# 如果字符串分割成两部分,则为有效范围
if len(parts) == 2:
try:
# 解析起始和结束日期,格式化为可读格式
start_date = parser.parse(parts[0], fuzzy=True).strftime('%a, %b %d')
end_date = parser.parse(parts[1], fuzzy=True).strftime('%a, %b %d')
return start_date, end_date
except ValueError:
# 解析错误时,不作处理(将在下方处理)
pass
# 如果不是范围或解析范围失败,尝试解析为单一日期
try:
parsed_date = parser.parse(date_str, fuzzy=True)
# 格式化单一日期作为起始日期,并以不同方式格式化结束日期
start_date = parsed_date.strftime('%a, %b %d AT %I:%M %p EDT')
end_date = parsed_date.strftime('%a, %b %d') # Omitting time for end_date
return start_date, end_date
except ValueError:
# 解析失败时,返回两个日期的NaN
return np.nan, np.nan
# 从日期字符串中提取详细日期、时间和星期函数
def extract_date_time_day(date_str):
try:
# 解析日期字符串,允许输入格式有一定灵活性
parsed_date = parser.parse(date_str, fuzzy=True)
# 提取并格式化日期、时间和星期部分
date = parsed_date.strftime('%Y-%m-%d')
day = parsed_date.strftime('%a')
# 确定原始字符串是否包含时间组件
time_component = parsed_date.strftime('%I:%M %p') not in date_str
time = parsed_date.strftime('%H:%M:%S') if not time_component else np.nan
except ValueError:
# 解析失败时,设置日期、时间和星期为NaN
date, time, day = np.nan, np.nan, np.nan
return date, time, day
# 应用parse_dates函数遍历数据框,为起始和结束日期创建新列
dataframe1[['Start_Date', 'End_Date']] = dataframe1.apply(lambda row: pd.Series(parse_dates(row['Datetime'])), axis=1)
# 删除Start_Date为NaN的行,指示解析不成功
dataframe = dataframe1.dropna(subset=['Start_Date'])
# 应用extract_date_time_day函数将起始和结束日期分割为单独的日期、时间和星期列
dataframe['Start_Date'], dataframe['Start_Time'], dataframe['Start_Day'] = zip(*dataframe['Start_Date'].apply(extract_date_time_day))
dataframe['End_Date'], _, dataframe['End_Day'] = zip(*dataframe['End_Date'].apply(extract_date_time_day))
# 移除不再需要的原始'Datetime'列
dataframe=dataframe.drop(['Datetime'], axis=1)
# 将'Start_Date'和'End_Date'转换为datetime格式,提取仅日期部分
dataframe['Start_Date'] = pd.to_datetime(dataframe['Start_Date']).dt.date
dataframe['End_Date'] = pd.to_datetime(dataframe['End_Date']).dt.date
# 将'Start_Time'转换为datetime格式,保留时间信息
dataframe['Start_Time'] = pd.to_datetime(dataframe['Start_Time'])
此代码片段结合Python中的pandas
、datetime
和dateutil
包来格式化数据。
生成嵌入
为了深入理解和搜索事件,我们将使用text-embedding-3-small
从事件描述中生成嵌入。这些嵌入捕捉了每个事件的语义本质,有助于应用程序返回更优质的结果。
# 导入OpenAI库以访问API。
from openai import OpenAI
# 使用API密钥初始化OpenAI客户端。
openai_client = OpenAI(api_key="your_openai_api_key_here")
# 获取文本嵌入的函数
def get_embedding(text, model="text-embedding-3-small"):
return openai_client.embeddings.create(input=text, model=model).data
embeddings = get_embedding(dataframe["Description"].tolist())
# 从嵌入对象中提取嵌入向量
vectors = [embedding.embedding for embedding in embeddings]
array = np.array(vectors)
embeddings_series = pd.Series(list(array))
# 将嵌入作为新列添加到DataFrame中。
dataframe['Description_Embeddings'] = embeddings_series
现在,我们将带有嵌入的新DataFrame
插入MyScale。
连接MyScale
如开头所述,我们将使用MyScale作为向量数据库来存储和管理数据。在此,我们将连接到MyScale,为数据存储做准备。
import clickhouse_connect
client = clickhouse_connect.get_client(
host='host_name_here',
port=443,
username='username_here',
password='passwd_here'
)
此连接设置确保我们的应用程序能够与MyScale通信,并利用SQL的力量进行数据操作和分析。
注意:有关如何连接到MyScale集群的更多信息,请参阅连接详情。
使用MyScale创建表和索引
我们现在根据我们的DataFrame创建一个表。所有数据,包括嵌入,都将存储在此表中。
client.command("""
CREATE TABLE default.Events (
Name String,
Description String,
Users_Going Int64,
Users_Interested Int64,
Users_Responded Int64,
City String,
Organized_By String,
Street_Address String,
Start_Date Date32,
End_Date Nullable(Date32),
Start_Time Nullable(DateTime64),
Start_Day String,
End_Day String,
Description_Embeddings Array(Float32),
CONSTRAINT check_data_length CHECK length(Description_Embeddings) = 1536
) ENGINE = MergeTree()
ORDER BY (Name);
""")
上述SQL语句在集群上创建了一个名为Events
的表。通过CONSTRAINT
确保所有向量嵌入的长度均为1536
。
在MyScale中存储数据并创建索引
此步骤中,我们将处理后的数据插入MyScale。这涉及批量插入数据以确保高效的存储和检索。
batch_size = 10 # Adjust based on your needs
num_batches = len(dataframe) // batch_size
for i in range(num_batches):
start_idx = i * batch_size
end_idx = start_idx + batch_size
batch_data = dataframe[start_idx:end_idx]
# print(batch_data["Description_Embeddings"])
client.insert("default.Events", batch_data.to_records(index=False).tolist(), column_names=batch_data.columns.tolist())
print(f"Batch {i+1}/{num_batches} inserted.")
client.command("""
ALTER TABLE default.Events
ADD VECTOR INDEX vector_index Description_Embeddings
TYPE MSTG
""")
利用pandas
,上述代码高效地将我们准备好的数据集传输到MyScale数据库中。
使用MyScale进行数据分析
最后,我们利用MyScale的分析能力执行分析并启用语义搜索。通过执行SQL查询,我们可以基于主题、位置和日期分析事件。那么,让我们尝试编写一些查询。
简单的SQL查询
首先,让我们尝试从表中获取前10条结果。
results=client.query("""
SELECT Name,Description FROM default.Events LIMIT 10
""")
for row in results.named_results():
print(row["Name"])
print(row['Description'])
此查询将简单地从events
表返回前10条结果。
通过语义相关性发现事件
让我们尝试找到与参考事件相似的前10个即将到来的事件,例如:“国内历史最悠久的表演之一 – 自1974年运营至今…现在迎来我们的50周年纪念!我们的Schenectady”。这是通过比较事件描述的语义嵌入来实现的,确保主题和情感的匹配。
embeddings=get_embedding(["One of the Longest Running Shows in the Country - Operating since 1974 ...NOW our 50th YEAR !!!Our Schenectady"])
embedding=embeddings[0].embedding
results = client.query(f"""
SELECT Name, Description,
distance(Description_Embeddings, {embedding}) as dist FROM default.Events ORDER BY dist LIMIT 10
""")
for row in results.named_results():
print("Title of the event ", row["Name"])
print("Description of the event ", row['Description'])
print("Distance : ", row["dist"])
按人气趋势展示的热门事件
此查询根据参与人数和感兴趣用户数量,列出了排名前10的活动,从大城市节庆到主要会议,突显了热门活动。适合寻求加入大型、充满活力聚会的人群。
results = client.query(f"""
SELECT Name, City, Users_Going, Users_Interested, Users_Responded
FROM default.Events
ORDER BY Users_Going DESC, Users_Interested DESC
LIMIT 10
""")
for row in results.named_results():
print("Event Name ", row["Name"])
print("City ", row["City"])
print("Users Going ", row["Users_Going"])
print("Interested Users ", row["Users_Interested"])
纽约热门本地活动
结合相关性和受欢迎程度,此查询识别出纽约市与特定事件相关的类似活动,并按参与度进行排名,提供了一份反映城市活力文化并吸引当地兴趣的精选活动列表。
embeddings=get_embedding(["One of the Longest Running Shows in the Country - Operating since 1974 ...NOW our 50th YEAR !!!Our Schenectady"])
embeddi=embeddings[0].embedding
results = client.query(f"""
SELECT Name,City, Description, Users_Going,distance(Description_Embeddings, {embeddi}) as dist
FROM default.Events
WHERE City LIKE '%New York%' and dist < 1.5
ORDER BY Users_Going DESC,dist
LIMIT 10
""")
for row in results.named_results():
print("Event Name ", row["Name"])
print("Description ", row["Description"])
print("Users Going ", row["Users_Going"])
领先活动组织者
此查询根据总参与人数和感兴趣用户数量,列出了排名前10的活动组织者,突显那些在创造引人入胜活动和吸引大量观众方面表现卓越的组织者。为活动策划者和对顶级活动感兴趣的参与者提供了洞察。
#哪个客户吸引了最多用户
results = client.query(f"""
SELECT Organized_By, SUM(Users_Going + Users_Interested) AS Total_Users
FROM default.Events
GROUP BY Organized_By
ORDER BY Total_Users DESC
Limit 10
""")
for row in results.named_results():
print("Event Name ", row["Organized_By"])
print("Total_Users ", row["Total_Users"])
实施RAG
之前,我们探讨了MyScale在数据分析中的应用,突出了它增强数据工作流的强大功能。接下来,我们将更进一步,采用检索增强生成(RAG)这一创新框架,它将外部知识库与大型语言模型(LLMs)相结合。这一步骤将帮助您更深入地理解数据,发现更详尽的洞察。随后,您将了解如何将RAG与MyScale结合使用,这将使数据工作变得更加有趣和高效。
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
query="Can you please suggest me some events related to basketball"
# 使用上述定义的get_embedding方法,它接受一个句子列表
embeddings=get_embedding([query])
embeddings=embeddings[0].embedding
results = client.query(f"""
SELECT Name, City, Users_Going, Description, distance(Description_Embeddings, {embeddings}) as dist
FROM default.Events
ORDER BY Users_Going DESC,dist
LIMIT 5
""")
PROMPT_TEMPLATE = """
Your objective is to formulate a response to a question using only the information provided below:
{context}
---
Your task is to carefully analyze the provided context and provide an answer to the following question based solely on the information given:
{question}
"""
# 合并顶级结果的描述。
descriptions = [row["Description"] for row in results.named_results()]
context_text = "\n\n---\n\n".join(descriptions)
prompt_template = ChatPromptTemplate.from_template(PROMPT_TEMPLATE)
prompt = prompt_template.format(context=context_text, question=query)
model = ChatOpenAI(openai_api_key="your_api_key_here")
response_text = model.predict(prompt)
formatted_response = f"Response: {response_text}\n"
print(formatted_response)
在整个博客中,我们观察到MyScale远不止是一个可以用来开发各种应用的向量数据库。我们可以将其作为简单的SQL数据库使用,或者用于高级AI应用,覆盖了大部分开发领域。我们鼓励您尝试一下,通过注册免费层级并获得500万免费向量存储来探索其高级功能。
结论
我们通过开发事件分析应用程序的过程,探索了MyScale与Apify Scraper的能力和功能。MyScale在保持SQL数据库所有功能的同时,展示了其在高性能向量搜索方面的卓越能力,帮助开发者在熟悉的SQL语法基础上进行语义搜索,速度和准确性都有了显著提升。
MyScale的能力并不仅限于此应用,您可以采用它利用RAG方法开发任何AI应用。
如有任何反馈或建议,请随时与我们联系。
Source:
https://dzone.com/articles/performing-advanced-facebook-event-data-analysis-w