hub / github.com/cartography-cncf/cartography / load

Function load

cartography/client/core/tx.py:784–850

Load node data to the graph with automatic indexing. This is the main entry point for intel modules to write node data to the graph. It automatically ensures that required indexes exist before performing the load operation, optimizing performance and maintaining data integrity.

(
    neo4j_session: neo4j.Session,
    node_schema: CartographyNodeSchema,
    dict_list: List[Dict[str, Any]],
    batch_size: int = 10000,
    **kwargs,
)
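
The `batch_size` and `**kwargs` parameters can be illustrated with a minimal sketch. It assumes the list is cut into consecutive slices of at most `batch_size` items and that the same keyword arguments accompany every slice as query parameters. `iter_batches` and `plan_writes` are hypothetical names for illustration only, not part of cartography, and the real batching lives in `load_graph_data`, whose internals are not shown on this page.

```python
from typing import Any, Dict, Iterator, List, Tuple


def iter_batches(
    dict_list: List[Dict[str, Any]], batch_size: int
) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of at most batch_size items."""
    for start in range(0, len(dict_list), batch_size):
        yield dict_list[start:start + batch_size]


def plan_writes(
    dict_list: List[Dict[str, Any]], batch_size: int = 10000, **kwargs: Any
) -> List[Tuple[int, Dict[str, Any]]]:
    """Hypothetical helper: pair each batch's size with the shared parameters."""
    return [
        (len(batch), dict(kwargs))
        for batch in iter_batches(dict_list, batch_size)
    ]


users = [{"UserId": f"user{i}"} for i in range(5)]
plan = plan_writes(users, batch_size=2, lastupdated=1700000000)
print([size for size, _ in plan])  # [2, 2, 1]
```

With five items and `batch_size=2`, the sketch produces three batches, and each one carries the same `lastupdated` value.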

Source from the content-addressed store, hash-verified

def load(
    neo4j_session: neo4j.Session,
    node_schema: CartographyNodeSchema,
    dict_list: List[Dict[str, Any]],
    batch_size: int = 10000,
    **kwargs,
) -> None:
    """
    Load node data to the graph with automatic indexing.

    This is the main entry point for intel modules to write node data to the graph.
    It automatically ensures that required indexes exist before performing the load
    operation, optimizing performance and maintaining data integrity.

    Args:
        neo4j_session (neo4j.Session): The Neo4j session for database operations.
        node_schema (CartographyNodeSchema): The node schema object that defines
            the structure of the data being loaded and generates the ingestion query.
        dict_list (List[Dict[str, Any]]): The data to load to the graph, represented
            as a list of dictionaries. Each dictionary represents one node to create
            or update.
        batch_size (int): The number of items to process per transaction. Defaults to 10000.
        **kwargs: Additional keyword arguments passed to the Neo4j query, such as
            timestamps, update tags, or other metadata.

    Examples:
        >>> node_schema = CartographyNodeSchema(
        ...     label='AWSUser',
        ...     properties={
        ...         'id': PropertyRef('UserId'),
        ...         'name': PropertyRef('UserName'),
        ...         'email': PropertyRef('Email')
        ...     }
        ... )
        >>> users_data = [
        ...     {'UserId': 'user1', 'UserName': 'Alice', 'Email': 'alice@example.com'},
        ...     {'UserId': 'user2', 'UserName': 'Bob', 'Email': 'bob@example.com'}
        ... ]
        >>> load(session, node_schema, users_data, lastupdated=current_time)

    Note:
        - If ``dict_list`` is empty, the function returns early to save processing time.
        - The function automatically creates necessary indexes before loading data.
        - The ingestion query is generated automatically from the node schema.
        - Data is processed in batches for optimal performance.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be greater than 0, got {batch_size}")
    if len(dict_list) == 0:
        # If there is no data to load, save some time.
        return
    ensure_indexes(neo4j_session, node_schema)
    ingestion_query = build_ingestion_query(node_schema)
    load_graph_data(
        neo4j_session, ingestion_query, dict_list, batch_size=batch_size, **kwargs
    )

    # Apply conditional labels if any are defined
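
The listing checks `batch_size` before it checks for an empty `dict_list`, so an invalid batch size raises even when there is nothing to load. The sketch below replays only those two guards in the same order. `would_load` is a hypothetical stand-in that touches no database and is not a cartography function.

```python
from typing import Any, Dict, List, Optional


def would_load(dict_list: List[Dict[str, Any]], batch_size: int = 10000) -> bool:
    """Hypothetical helper: replay the two guards in the order the listing applies them."""
    if batch_size <= 0:
        raise ValueError(f"batch_size must be greater than 0, got {batch_size}")
    if len(dict_list) == 0:
        # The real function returns early here, before any index work.
        return False
    return True


proceeds = would_load([{"UserId": "user1"}])
skipped = would_load([])

error_message: Optional[str] = None
try:
    would_load([], batch_size=0)
except ValueError as exc:
    error_message = str(exc)

print(proceeds, skipped, error_message)
```

An empty list with a valid batch size is a silent no-op, while an empty list with `batch_size=0` still raises `ValueError`.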

Callers 15

sync · Function · 0.90
load_notices · Function · 0.90
load_cves · Function · 0.90
load_roles · Function · 0.90
load_organizations · Function · 0.90
sync · Function · 0.90
load_directory_users · Function · 0.90
load_secrets · Function · 0.90
load_applications · Function · 0.90
load_api_keys · Function · 0.90

Calls 6

build_ingestion_query · Function · 0.90
ensure_indexes · Function · 0.85
load_graph_data · Function · 0.85
run_write_query · Function · 0.85
incr · Method · 0.80