| 729 | |
@derived_from(pd.DataFrame)
def isin(self, values):
    # Element-wise membership test of this collection against ``values``.
    #
    # - A 1-D dask collection with a single partition is passed straight to
    #   ``expr.Isin`` (it can be broadcast to every partition).
    # - Any other dask collection -- and, when ``self`` is a DataFrame, also
    #   pandas Series/DataFrame values -- raises NotImplementedError.
    # - A plain list is de-duplicated (when hashable) and converted to a numpy
    #   array; whatever ``values`` ends up being is then wrapped in a delayed
    #   object named from its deterministic token.
    #
    # NOTE(review): documentation is kept in comments rather than a docstring
    # because the public docs are produced by @derived_from(pd.DataFrame);
    # confirm how derived_from treats an existing docstring before adding one.
    if isinstance(self, DataFrame):
        # DataFrame.isin does weird alignment stuff
        bad_types = (FrameBase, pd.Series, pd.DataFrame)
    else:
        bad_types = (FrameBase,)
    if isinstance(values, bad_types):
        if (
            isinstance(values, FrameBase)
            and values.ndim == 1
            and values.npartitions == 1
        ):
            # Can broadcast
            return new_collection(expr.Isin(self, values=values))
        raise NotImplementedError(f"Passing a {typename(type(values))!r} to `isin`")

    # We wrap values in a delayed for two reasons:
    # - avoid serializing data in every task
    # - avoid cost of traversal of large list in optimizations
    if isinstance(values, list):
        # Motivated by https://github.com/dask/dask/issues/9411. This appears to be
        # caused by https://github.com/dask/distributed/issues/6368, and further
        # exacerbated by the fact that the list contains duplicates. This is a patch until
        # we can create a better fix for Serialization.
        #
        # De-duplicate with ``dict.fromkeys`` instead of ``set``: both use the
        # same hash/equality rules, but a dict keeps first-seen order. A set's
        # iteration order for str elements depends on the per-process hash
        # seed, which would make the array built below -- and therefore the
        # "deterministic" token used for the delayed name -- differ between
        # interpreter runs for identical input.
        try:
            values = list(dict.fromkeys(values))
        except TypeError:
            # Unhashable elements: keep the original list (duplicates included).
            pass
        if not any(is_dask_collection(v) for v in values):
            # Avoid always passing a numpy array of object dtype
            inferred_type = pd.api.types.infer_dtype(values, skipna=False)
            object_like = {
                "mixed-integer",
                "decimal",
                "categorical",
                "time",
                "period",
                "mixed",
                "unknown-array",
            }
            if inferred_type in object_like:
                # Build the object array element by element so numpy does not
                # try to coerce/nest the Python objects.
                values = np.fromiter(values, dtype=object, count=len(values))
            else:
                values = np.asarray(values)

    return new_collection(
        expr.Isin(
            self,
            values=expr._DelayedExpr(
                delayed(values, name=f"delayed-{_tokenize_deterministic(values)}")
            ),
        )
    )
| 784 | |
| 785 | def _partitions(self, index): |
| 786 | # Used by `partitions` for partition-wise slicing |