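Load node data to the graph with automatic indexing. This is the main entry point for intel modules to write node data to the graph. Before loading, it ensures that the indexes required by the node schema exist, then generates the ingestion query from the schema and writes the data in batches of batch_size items per transaction. A batch_size of zero or less raises ValueError, and an empty dict_list returns immediately without touching the graph.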
load(
    neo4j_session: neo4j.Session,
    node_schema: CartographyNodeSchema,
    dict_list: List[Dict[str, Any]],
    batch_size: int = 10000,
    **kwargs,
) -> None
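The guard order and batching described above can be illustrated with a minimal sketch. It is not Cartography's implementation: `load_sketch`, `iter_batches`, and `write_batch` are hypothetical names introduced here, no Neo4j connection is involved, and the slice-based splitting is an assumption, since the way `load_graph_data` divides the data is not shown in this section.

```python
from typing import Any, Callable, Dict, Iterator, List


def iter_batches(
    items: List[Dict[str, Any]], batch_size: int
) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of ``items`` with at most ``batch_size`` entries."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def load_sketch(
    write_batch: Callable[..., None],
    dict_list: List[Dict[str, Any]],
    batch_size: int = 10000,
    **kwargs: Any,
) -> int:
    """Mirror the guard order of ``load`` and return the number of batches written.

    ``write_batch`` is a hypothetical stand-in for the per-transaction write;
    the real function delegates that work to ``load_graph_data``.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be greater than 0, got {batch_size}")
    if len(dict_list) == 0:
        # Nothing to load, so nothing is written.
        return 0
    batches = 0
    for batch in iter_batches(dict_list, batch_size):
        # Every batch receives the same extra keyword arguments.
        write_batch(batch, **kwargs)
        batches += 1
    return batches


# Record each write instead of talking to a database.
calls = []
users = [{"UserId": f"user{i}"} for i in range(5)]
n_batches = load_sketch(
    lambda batch, **kw: calls.append((len(batch), kw)),
    users,
    batch_size=2,
    lastupdated=1700000000,
)
```

With five records and `batch_size=2`, the sketch issues three writes of sizes 2, 2, and 1, and each write sees the same `lastupdated` value. Validating `batch_size` before the empty-list check means an invalid batch size is reported even when there is nothing to load.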
def load(
    neo4j_session: neo4j.Session,
    node_schema: CartographyNodeSchema,
    dict_list: List[Dict[str, Any]],
    batch_size: int = 10000,
    **kwargs,
) -> None:
    """
    Load node data to the graph with automatic indexing.

    This is the main entry point for intel modules to write node data to the graph.
    It automatically ensures that required indexes exist before performing the load
    operation, optimizing performance and maintaining data integrity.

    Args:
        neo4j_session (neo4j.Session): The Neo4j session for database operations.
        node_schema (CartographyNodeSchema): The node schema object that defines
            the structure of the data being loaded and generates the ingestion query.
        dict_list (List[Dict[str, Any]]): The data to load to the graph, represented
            as a list of dictionaries. Each dictionary represents one node to create
            or update.
        batch_size (int): The number of items to process per transaction. Defaults to 10000.
        **kwargs: Additional keyword arguments passed to the Neo4j query, such as
            timestamps, update tags, or other metadata.

    Examples:
        >>> node_schema = CartographyNodeSchema(
        ...     label='AWSUser',
        ...     properties={
        ...         'id': PropertyRef('UserId'),
        ...         'name': PropertyRef('UserName'),
        ...         'email': PropertyRef('Email')
        ...     }
        ... )
        >>> users_data = [
        ...     {'UserId': 'user1', 'UserName': 'Alice', 'Email': 'alice@example.com'},
        ...     {'UserId': 'user2', 'UserName': 'Bob', 'Email': 'bob@example.com'}
        ... ]
        >>> load(session, node_schema, users_data, lastupdated=current_time)

    Note:
        - If ``dict_list`` is empty, the function returns early to save processing time.
        - The function automatically creates necessary indexes before loading data.
        - The ingestion query is generated automatically from the node schema.
        - Data is processed in batches for optimal performance.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be greater than 0, got {batch_size}")
    if len(dict_list) == 0:
        # If there is no data to load, save some time.
        return
    ensure_indexes(neo4j_session, node_schema)
    ingestion_query = build_ingestion_query(node_schema)
    load_graph_data(
        neo4j_session, ingestion_query, dict_list, batch_size=batch_size, **kwargs
    )

    # Apply conditional labels if any are defined