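Resolve label conflicts by applying the configured policy. Args: df: Full history DataFrame (all rows, not deduplicated). join_key_columns: Entity key column names. feature_name_columns: Label/feature column names. timestamp_field: Event timestamp column name. labeler_field: Column identifying who wrote the label. conflict_policy: The resolution strategy to apply. labeler_priorities: Ordered list of labelers from highest to lowest priority; required for the LABELER_PRIORITY policy. Returns: DataFrame with one resolved row per unique entity key combination.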
(
df: pd.DataFrame,
join_key_columns: List[str],
feature_name_columns: List[str],
timestamp_field: str,
labeler_field: str,
conflict_policy: ConflictPolicy,
labeler_priorities: Optional[List[str]] = None,
)
| 17 | |
| 18 | |
def resolve_conflicts(
    df: pd.DataFrame,
    join_key_columns: List[str],
    feature_name_columns: List[str],
    timestamp_field: str,
    labeler_field: str,
    conflict_policy: ConflictPolicy,
    labeler_priorities: Optional[List[str]] = None,
) -> pd.DataFrame:
    """Dispatch label-conflict resolution to the helper for ``conflict_policy``.

    An empty ``df`` is returned unchanged (the same object, not a copy)
    without consulting the policy. Any policy other than LABELER_PRIORITY
    or MAJORITY_VOTE, including values this function does not recognise,
    is resolved with the last-write-wins helper.

    Args:
        df: Full history DataFrame (all rows, not deduplicated).
        join_key_columns: Entity key column names.
        feature_name_columns: Label/feature column names. Only forwarded to
            the majority-vote helper.
        timestamp_field: Event timestamp column name.
        labeler_field: Column identifying who wrote the label. Only
            forwarded to the labeler-priority helper.
        conflict_policy: The resolution strategy to apply.
        labeler_priorities: Ordered list of labelers from highest to lowest
            priority. Required for LABELER_PRIORITY policy; it is forwarded
            as-is and not validated here.

    Returns:
        DataFrame with one resolved row per unique entity key combination.
    """
    # Nothing to resolve: hand the empty frame straight back.
    if df.empty:
        return df

    # Compare against LAST_WRITE_WINS first so the policy checks run in a
    # fixed order before the shared fall-through below.
    wants_last_write = conflict_policy == ConflictPolicy.LAST_WRITE_WINS

    if not wants_last_write:
        if conflict_policy == ConflictPolicy.LABELER_PRIORITY:
            resolved = _resolve_labeler_priority(
                df,
                join_key_columns,
                timestamp_field,
                labeler_field,
                labeler_priorities,
            )
            return resolved

        if conflict_policy == ConflictPolicy.MAJORITY_VOTE:
            resolved = _resolve_majority_vote(
                df,
                join_key_columns,
                feature_name_columns,
                timestamp_field,
            )
            return resolved

    # Explicit LAST_WRITE_WINS and every unrecognised policy end up here.
    return _resolve_last_write_wins(df, join_key_columns, timestamp_field)
| 58 | |
| 59 | |
| 60 | def _resolve_last_write_wins( |
no test coverage detected