在上一篇博客中,我们了解了如何在本地安装和设置 Neo4j,并配置 APOC 和图数据科学库(GDS)这两个插件。在这篇博客中,我将使用一个玩具数据集(电子商务网站中的产品),并将其存储到 Neo4j 中。
在开始加载数据之前,如果您的用例中有大量数据,请确保为 neo4j 分配了足够的内存。为此:




import pandas as pd
from neo4j import GraphDatabase
from openai import OpenAI

# OpenAI client used to compute embeddings; the API key is left blank here
# and has to be filled in before any embedding request can succeed.
client = OpenAI(api_key="")

# Source data for the graph; the path is relative to the working directory.
product_data_df = pd.read_csv('../data/product_data.csv')
def get_embedding(text):
    """
    Convert a piece of text into an embedding with the OpenAI embeddings API.

    Newlines are replaced by spaces before the text is sent.

    :param text: str - text that needs to be converted to embeddings
    :return: embedding of the first (and only) input returned by the API
    """
    single_line = text.replace("\n", " ")
    response = client.embeddings.create(input=[single_line], model="text-embedding-3-small")
    return response.data[0].embedding
def create_category(product_data_df):
    """
    Build one CREATE query per distinct category, each carrying the category's embedding.

    Category names are escaped before being placed inside the single-quoted
    Cypher literal, so a name containing an apostrophe or a backslash no longer
    produces a malformed query. The node label is ``Category`` so that it agrees
    with the ``(c:Category ...)`` pattern matched by the relationship query later
    in this file (Neo4j labels are case-sensitive).

    NOTE(review): the column key here is lowercase ('category') while the
    relationship listing in this file reads 'Category' from the same CSV -
    confirm against the real CSV header.

    :param product_data_df: pandas dataframe - data
    :return: query_list: list - list containing all create node queries for category
    """
    def _quote(value):
        # Backslashes first, so the ones added for quotes are not doubled again.
        return str(value).replace("\\", "\\\\").replace("'", "\\'")

    cat_query = "create (a:Category {name: '%s', embedding: %s})"
    # The embedding is computed from the raw (unescaped) category text.
    return [
        cat_query % (_quote(category), get_embedding(category))
        for category in product_data_df['category'].unique()
    ]
def create_product(product_data_df):
    """
    Build one CREATE query per row, producing a product node with its embedding.

    The embedding is computed from "<product name> - <description>". Name and
    description are escaped before being placed inside single-quoted Cypher
    literals, so apostrophes or backslashes no longer produce a malformed query.
    The node label is ``Product`` so that it agrees with the ``(p:Product ...)``
    pattern matched by the relationship query later in this file (Neo4j labels
    are case-sensitive).

    NOTE(review): every column key here is lowercase while the relationship
    listing in this file reads 'Product Name' from the same CSV - confirm
    against the real CSV headers.

    :param product_data_df: pandas dataframe - data
    :return: query_list: list - list containing all create node queries for product
    """
    def _quote(value):
        # Backslashes first, so the ones added for quotes are not doubled again.
        return str(value).replace("\\", "\\\\").replace("'", "\\'")

    product_query = (
        "create (a:Product {name: '%s', description: '%s', price: %d, warranty_period: %d,\n"
        "available_stock: %d, review_rating: %f, product_release_date: date('%s'), embedding: %s})"
    )
    query_list = []
    for _, row in product_data_df.iterrows():
        name = row['product name']
        description = row['description']
        # The embedding is computed from the raw (unescaped) text.
        embedding = get_embedding(name + " - " + description)
        query_list.append(product_query % (
            _quote(name),
            _quote(description),
            int(row['price (inr)']),
            int(row['warranty period (years)']),
            int(row['stock']),
            float(row['review rating']),
            str(row['product release date']),
            embedding,
        ))
    return query_list
def execute_bulk_query(query_list, url="bolt://localhost:7687", auth=("neo4j", "neo4j@123")):
    """
    Execute the Cypher queries in a list one by one against a Neo4j server.

    A query that fails is reported on stdout and skipped, so the remaining
    queries are still attempted.

    :param query_list: list - list of cypher queries
    :param url: str - bolt URL of the Neo4j server; defaults to the local instance
    :param auth: tuple - (user, password) pair used to authenticate
    :return: None
    """
    # The driver factory is exported as `GraphDatabase` (names are case-sensitive);
    # importing it here keeps the function usable on its own.
    from neo4j import GraphDatabase

    with GraphDatabase.driver(url, auth=auth) as driver, driver.session() as session:
        for query in query_list:
            try:
                # consume() drains the result so that an error raised by the
                # server surfaces here, inside the try, for this very query.
                session.run(query).consume()
            except Exception as error:
                print(f"error in executing query - {query}, error - {error}")
import pandas as pd
from neo4j import GraphDatabase
from openai import OpenAI

# OpenAI client used to compute embeddings; the API key is left blank here
# and has to be filled in before any embedding request can succeed.
client = OpenAI(api_key="")

# Source data for the graph; the path is relative to the working directory.
product_data_df = pd.read_csv('../data/product_data.csv')
def preprocessing(df, columns_to_replace):
    """
    Remove every apostrophe from the given text columns.

    The values are later embedded in single-quoted Cypher literals, where a
    stray apostrophe would end the literal early. Removing all apostrophes
    already turns "'s" into "s", so a single pass is enough. The dataframe is
    modified in place and also returned.

    :param df: pandas dataframe - data
    :param columns_to_replace: list - column name list
    :return: df: pandas dataframe - processed data
    """
    for column in columns_to_replace:
        # regex=False pins literal matching regardless of the pandas default.
        df[column] = df[column].str.replace("'", "", regex=False)
    return df
def get_embedding(text):
    """
    Convert a piece of text into an embedding with the OpenAI embeddings API.

    Newlines are replaced by spaces before the text is sent.

    :param text: str - text that needs to be converted to embeddings
    :return: embedding of the first (and only) input returned by the API
    """
    single_line = text.replace("\n", " ")
    response = client.embeddings.create(input=[single_line], model="text-embedding-3-small")
    return response.data[0].embedding
def create_category(product_data_df):
    """
    Build one CREATE query per distinct category, each carrying the category's embedding.

    Category names are escaped before being placed inside the single-quoted
    Cypher literal, so a name containing an apostrophe or a backslash no longer
    produces a malformed query. The node label is ``Category`` so that it agrees
    with the ``(c:Category ...)`` pattern matched by the relationship query later
    in this file (Neo4j labels are case-sensitive).

    NOTE(review): the column key here is lowercase ('category') while the
    relationship listing in this file reads 'Category' from the same CSV -
    confirm against the real CSV header.

    :param product_data_df: pandas dataframe - data
    :return: query_list: list - list containing all create node queries for category
    """
    def _quote(value):
        # Backslashes first, so the ones added for quotes are not doubled again.
        return str(value).replace("\\", "\\\\").replace("'", "\\'")

    cat_query = "create (a:Category {name: '%s', embedding: %s})"
    # The embedding is computed from the raw (unescaped) category text.
    return [
        cat_query % (_quote(category), get_embedding(category))
        for category in product_data_df['category'].unique()
    ]
def create_product(product_data_df):
    """
    Build one CREATE query per row, producing a product node with its embedding.

    The embedding is computed from "<product name> - <description>". Name and
    description are escaped before being placed inside single-quoted Cypher
    literals, so apostrophes or backslashes no longer produce a malformed query.
    The node label is ``Product`` so that it agrees with the ``(p:Product ...)``
    pattern matched by the relationship query later in this file (Neo4j labels
    are case-sensitive).

    NOTE(review): every column key here is lowercase while the relationship
    listing in this file reads 'Product Name' from the same CSV - confirm
    against the real CSV headers.

    :param product_data_df: pandas dataframe - data
    :return: query_list: list - list containing all create node queries for product
    """
    def _quote(value):
        # Backslashes first, so the ones added for quotes are not doubled again.
        return str(value).replace("\\", "\\\\").replace("'", "\\'")

    product_query = (
        "create (a:Product {name: '%s', description: '%s', price: %d, warranty_period: %d,\n"
        "available_stock: %d, review_rating: %f, product_release_date: date('%s'), embedding: %s})"
    )
    query_list = []
    for _, row in product_data_df.iterrows():
        name = row['product name']
        description = row['description']
        # The embedding is computed from the raw (unescaped) text.
        embedding = get_embedding(name + " - " + description)
        query_list.append(product_query % (
            _quote(name),
            _quote(description),
            int(row['price (inr)']),
            int(row['warranty period (years)']),
            int(row['stock']),
            float(row['review rating']),
            str(row['product release date']),
            embedding,
        ))
    return query_list
def execute_bulk_query(query_list, url="bolt://localhost:7687", auth=("neo4j", "neo4j@123")):
    """
    Execute the Cypher queries in a list one by one against a Neo4j server.

    A query that fails is reported on stdout and skipped, so the remaining
    queries are still attempted.

    :param query_list: list - list of cypher queries
    :param url: str - bolt URL of the Neo4j server; defaults to the local instance
    :param auth: tuple - (user, password) pair used to authenticate
    :return: None
    """
    # The driver factory is exported as `GraphDatabase` (names are case-sensitive);
    # importing it here keeps the function usable on its own.
    from neo4j import GraphDatabase

    with GraphDatabase.driver(url, auth=auth) as driver, driver.session() as session:
        for query in query_list:
            try:
                # consume() drains the result so that an error raised by the
                # server surfaces here, inside the try, for this very query.
                session.run(query).consume()
            except Exception as error:
                print(f"error in executing query - {query}, error - {error}")
# Preprocessing: strip apostrophes from the text columns that create_product
# places inside single-quoted Cypher literals.
product_data_df = preprocessing(product_data_df, ['product name', 'description'])
# Create category nodes: build the queries first, then run them.
query_list = create_category(product_data_df)
execute_bulk_query(query_list)
# Create product nodes: query_list is rebound to the product queries.
query_list = create_product(product_data_df)
execute_bulk_query(query_list)
from neo4j import GraphDatabase
import pandas as pd
# Load the product data; the path is relative to the working directory.
product_data_df = pd.read_csv('../data/product_data.csv')
def preprocessing(df, columns_to_replace):
    """
    Remove every apostrophe from the given text columns.

    The values are later embedded in single-quoted Cypher literals, where a
    stray apostrophe would end the literal early. Removing all apostrophes
    already turns "'s" into "s", so a single pass is enough. The dataframe is
    modified in place and also returned.

    :param df: pandas dataframe - data
    :param columns_to_replace: list - column name list
    :return: df: pandas dataframe - processed data
    """
    for column in columns_to_replace:
        # regex=False pins literal matching regardless of the pandas default.
        df[column] = df[column].str.replace("'", "", regex=False)
    return df
def create_category_food_relationship_query(product_data_df):
    """
    Build one query per row linking the row's category node to its product node.

    Each query matches a Category and a Product by name and creates a
    CATEGORY_CONTAINS_PRODUCT relationship from the former to the latter.
    Both names are escaped before being placed inside single-quoted Cypher
    literals, so an apostrophe or backslash no longer produces a malformed query.

    :param product_data_df: dataframe - data with 'Category' and 'Product Name' columns
    :return: query_list: list - cypher queries
    """
    def _quote(value):
        # Backslashes first, so the ones added for quotes are not doubled again.
        return str(value).replace("\\", "\\\\").replace("'", "\\'")

    query = (
        "MATCH (c:Category {name: '%s'}), (p:Product {name: '%s'}) "
        "CREATE (c)-[:CATEGORY_CONTAINS_PRODUCT]->(p)"
    )
    # Only two columns are needed, so pair them directly instead of building
    # a full row object for every record.
    return [
        query % (_quote(category), _quote(product_name))
        for category, product_name in zip(product_data_df['Category'], product_data_df['Product Name'])
    ]
def execute_bulk_query(query_list, url="bolt://localhost:7687", auth=("neo4j", "neo4j@123")):
    """
    Execute the Cypher queries in a list one by one against a Neo4j server.

    A query that fails is reported on stdout and skipped, so the remaining
    queries are still attempted.

    :param query_list: list - list of cypher queries
    :param url: str - bolt URL of the Neo4j server; defaults to the local instance
    :param auth: tuple - (user, password) pair used to authenticate
    :return: None
    """
    with GraphDatabase.driver(url, auth=auth) as driver, driver.session() as session:
        for query in query_list:
            try:
                # consume() drains the result so that an error raised by the
                # server surfaces here, inside the try, for this very query.
                session.run(query).consume()
            except Exception as error:
                print(f"Error in executing query - {query}, Error - {error}")
# Preprocessing: strip apostrophes from the text columns before the product
# name is placed inside a single-quoted Cypher literal.
product_data_df = preprocessing(product_data_df, ['Product Name', 'Description'])
# Category -> product relationships: build the queries first, then run them.
query_list = create_category_food_relationship_query(product_data_df)
execute_bulk_query(query_list)
将鼠标悬停在 Open 图标上,然后单击 Neo4j Browser,即可可视化我们创建的节点。



在接下来的博客中,我们将看到如何使用 Python 构建图查询引擎,并利用检索到的数据进行增强生成(即检索增强生成,RAG)。
linkedin - https://www.linkedin.com/in/praveenr2998/
github - https://github.com/praveenr2998/creating-lightweight-rag-systems-with-graphs/tree/main/push_data_to_db
以上就是将数据加载到 Neo4j 中的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号