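Pick the label from the highest-priority labeler for each entity. If an entity has multiple rows from that labeler, the row with the latest timestamp wins. Labelers not in the priority list all share the lowest rank, so among them the latest timestamp wins. When no priority list is given, resolution falls back to last-write-wins.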
(
df: pd.DataFrame,
join_key_columns: List[str],
timestamp_field: str,
labeler_field: str,
labeler_priorities: Optional[List[str]] = None,
)
| 70 | |
| 71 | |
def _resolve_labeler_priority(
    df: pd.DataFrame,
    join_key_columns: List[str],
    timestamp_field: str,
    labeler_field: str,
    labeler_priorities: Optional[List[str]] = None,
) -> pd.DataFrame:
    """Pick the label from the highest-priority labeler per entity.

    Rows are ranked by the position of their labeler in
    ``labeler_priorities`` (index 0 is the highest priority). For each
    distinct combination of ``join_key_columns`` the best-ranked row is
    kept; when several rows share the best rank, the one with the latest
    ``timestamp_field`` value wins. Labelers that do not appear in the
    priority list all share a single rank below every listed labeler, so
    among them the latest timestamp wins regardless of labeler.

    Args:
        df: Input rows. It is not modified; the work is done on a copy.
        join_key_columns: Columns that together identify an entity.
        timestamp_field: Column used to break ties within a rank
            (sorted descending, so the latest value wins).
        labeler_field: Column holding the labeler name of each row.
        labeler_priorities: Labeler names ordered from highest to lowest
            priority. If a name is listed more than once, its first
            (highest-priority) position is used. When ``None`` or empty,
            resolution is delegated to ``_resolve_last_write_wins``.

    Returns:
        A new DataFrame with one row per entity, the same columns as
        ``df``, and a fresh 0..n-1 index. Rows appear in resolution order
        (best rank first, then latest timestamp).
    """
    if not labeler_priorities:
        return _resolve_last_write_wins(df, join_key_columns, timestamp_field)

    # Record only the first position of each name: a plain dict
    # comprehension would let a later duplicate overwrite the earlier,
    # higher-priority position and silently demote that labeler.
    priority_map = {}
    for position, name in enumerate(labeler_priorities):
        priority_map.setdefault(name, position)
    # Any labeler missing from the list ranks just below the last listed one.
    unlisted_rank = len(labeler_priorities)

    # Choose a scratch column name that cannot clash with a caller column;
    # otherwise the caller's data would be overwritten and then dropped.
    rank_column = "_priority_rank"
    while rank_column in df.columns:
        rank_column = f"_{rank_column}"

    ranked = df.copy()
    ranked[rank_column] = ranked[labeler_field].map(
        lambda labeler: priority_map.get(labeler, unlisted_rank)
    )
    # Best rank first; within a rank the most recent timestamp first, so
    # that keep="first" below selects the winning row for every entity.
    ranked = ranked.sort_values(
        [rank_column, timestamp_field], ascending=[True, False]
    )
    winners = ranked.drop_duplicates(subset=join_key_columns, keep="first")
    winners = winners.drop(columns=[rank_column])
    return winners.reset_index(drop=True)
| 100 | |
| 101 | |
| 102 | def _resolve_majority_vote( |
no test coverage detected