Joins collection with another collection. Other collection must be one of the following: 1. An iterable. We recommend tuples over lists for internal performance reasons. 2. A delayed object, pointing to a tuple. This is recommended if the other c
(self, other, on_self, on_other=None)
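The matching rule described above can be sketched in plain Python. This is only an illustration of the semantics, not Dask's implementation: `sketch_join` is a hypothetical helper, it works on in-memory sequences, and it ignores partitions, delayed objects, and the task graph. It assumes the other collection is small enough to index in memory, which fits the single-partition restriction.

```python
from collections import defaultdict


def sketch_join(self_items, other, on_self, on_other=None):
    """Hypothetical pure-Python sketch of the join semantics (not Dask code)."""
    if on_other is None:
        # Mirrors the documented default: on_other falls back to on_self.
        on_other = on_self

    # Index the other collection by its key so each lookup is a dict access.
    index = defaultdict(list)
    for item in tuple(other):
        index[on_other(item)].append(item)

    # Stream over this collection and emit (other_element, self_element) pairs.
    for item in self_items:
        for match in index.get(on_self(item), ()):
            yield (match, item)


people = ['Alice', 'Bob', 'Charlie']
fruit = ('Apple', 'Apricot', 'Banana')
pairs = list(sketch_join(people, fruit, lambda x: x[0]))
print(pairs)
```

Each pair puts the element from the other collection first, matching the order shown in the docstring example, and elements with no match (here `'Charlie'`) produce no output.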
| 1149 | return self.var(ddof=ddof).apply(math.sqrt) |
| 1150 | |
| 1151 | def join(self, other, on_self, on_other=None): |
| 1152 | """Joins collection with another collection. |
| 1153 | |
| 1154 | Other collection must be one of the following: |
| 1155 | |
| 1156 | 1. An iterable. We recommend tuples over lists for internal |
| 1157 | performance reasons. |
| 1158 | 2. A delayed object, pointing to a tuple. This is recommended if the |
| 1159 | other collection is sizable and you're using the distributed |
| 1160 | scheduler. Dask is able to pass around data wrapped in delayed |
| 1161 | objects with greater sophistication. |
| 1162 | 3. A Bag with a single partition |
| 1163 | |
| 1164 | You might also consider Dask Dataframe, whose join operations are much |
| 1165 | more heavily optimized. |
| 1166 | |
| 1167 | Parameters |
| 1168 | ---------- |
| 1169 | other: Iterable, Delayed, Bag |
| 1170 | Other collection on which to join |
| 1171 | on_self: callable |
| 1172 | Function to call on elements in this collection to determine a |
| 1173 | match |
| 1174 | on_other: callable (defaults to on_self) |
| 1175 | Function to call on elements in the other collection to determine a |
| 1176 | match |
| 1177 | |
| 1178 | Examples |
| 1179 | -------- |
| 1180 | >>> import dask.bag as db |
| 1181 | >>> people = db.from_sequence(['Alice', 'Bob', 'Charlie']) |
| 1182 | >>> fruit = ['Apple', 'Apricot', 'Banana'] |
| 1183 | >>> list(people.join(fruit, lambda x: x[0])) |
| 1184 | [('Apple', 'Alice'), ('Apricot', 'Alice'), ('Banana', 'Bob')] |
| 1185 | """ |
| 1186 | token = tokenize(self, other, on_self, on_other) |
| 1187 | name = f"join-{token}" |
| 1188 | dsk = {} |
| 1189 | if isinstance(other, Bag): |
| 1190 | if other.npartitions == 1: |
| 1191 | dsk.update(other.dask) |
| 1192 | other = other.__dask_keys__()[0] |
| 1193 | dsk[f"join-{name}-other"] = (list, other) |
| 1194 | else: |
| 1195 | msg = ( |
| 1196 | "Multi-bag joins are not implemented. " |
| 1197 | "We recommend Dask dataframe if appropriate" |
| 1198 | ) |
| 1199 | raise NotImplementedError(msg) |
| 1200 | elif isinstance(other, Delayed): |
| 1201 | dsk.update(other.dask) |
| 1202 | other = other._key |
| 1203 | elif not isinstance(other, Iterable): |
| 1204 | msg = ( |
| 1205 | "Joined argument must be single-partition Bag, " |
| 1206 | f"delayed object, or Iterable, got {type(other).__name__}" |
| 1207 | ) |
| 1208 | raise TypeError(msg) |