Args: df (DataFlow): the DataFlow to serialize. path (str): output path, either a directory or an LMDB file. write_frequency (int): the number of datapoints written per transaction before it is committed to disk. A smaller value reduces memory usage.
(df, path, write_frequency=5000)
| 44 | """ |
| 45 | @staticmethod |
| 46 | def save(df, path, write_frequency=5000): |
| 47 | """ |
| 48 | Args: |
| 49 | df (DataFlow): the DataFlow to serialize. |
| 50 | path (str): output path. Either a directory or an lmdb file. |
| 51 | write_frequency (int): the frequency to write back data to disk. |
| 52 | A smaller value reduces memory usage. |
| 53 | """ |
| 54 | assert isinstance(df, DataFlow), type(df) |
| 55 | isdir = os.path.isdir(path) |
| 56 | if isdir: |
| 57 | assert not os.path.isfile(os.path.join(path, 'data.mdb')), "LMDB file exists!" |
| 58 | else: |
| 59 | assert not os.path.isfile(path), "LMDB file {} exists!".format(path) |
| 60 | # It's OK to use super large map_size on Linux, but not on other platforms |
| 61 | # See: https://github.com/NVIDIA/DIGITS/issues/206 |
| 62 | map_size = 1099511627776 * 2 if platform.system() == 'Linux' else 128 * 10**6 |
| 63 | db = lmdb.open(path, subdir=isdir, |
| 64 | map_size=map_size, readonly=False, |
| 65 | meminit=False, map_async=True) # need sync() at the end |
| 66 | size = _reset_df_and_get_size(df) |
| 67 | |
| 68 | # put data into lmdb, and doubling the size if full. |
| 69 | # Ref: https://github.com/NVIDIA/DIGITS/pull/209/files |
| 70 | def put_or_grow(txn, key, value):  # put (key, value) into txn; on lmdb.MapFullError, double db's map_size and retry in a new write txn. Returns the txn the caller must keep using. |
| 71 | try: |
| 72 | txn.put(key, value) |
| 73 | return txn |
| 74 | except lmdb.MapFullError: |
| 75 | pass  # fall through to grow-and-retry below |
| 76 | txn.abort()  # NOTE(review): aborting presumably also discards every earlier put in txn that was not yet committed (the caller batches write_frequency puts per txn), not just the failed one; confirm those keys are not silently lost |
| 77 | curr_size = db.info()['map_size']  # db is the environment opened by the enclosing save() |
| 78 | new_size = curr_size * 2 |
| 79 | logger.info("Doubling LMDB map_size to {:.2f}GB".format(new_size / 10**9)) |
| 80 | db.set_mapsize(new_size) |
| 81 | txn = db.begin(write=True)  # fresh write transaction after the abort |
| 82 | txn = put_or_grow(txn, key, value)  # retry; recurses (doubling again) until the put fits |
| 83 | return txn  # may differ from the txn passed in; caller must rebind to it |
| 84 | |
| 85 | with get_tqdm(total=size) as pbar: |
| 86 | idx = -1 |
| 87 | |
| 88 | # LMDB transaction is not exception-safe! |
| 89 | # although it has a context manager interface |
| 90 | txn = db.begin(write=True) |
| 91 | for idx, dp in enumerate(df): |
| 92 | txn = put_or_grow(txn, u'{:08}'.format(idx).encode('ascii'), dumps(dp)) |
| 93 | pbar.update() |
| 94 | if (idx + 1) % write_frequency == 0: |
| 95 | txn.commit() |
| 96 | txn = db.begin(write=True) |
| 97 | txn.commit() |
| 98 | |
| 99 | keys = [u'{:08}'.format(k).encode('ascii') for k in range(idx + 1)] |
| 100 | with db.begin(write=True) as txn: |
| 101 | txn = put_or_grow(txn, b'__keys__', dumps(keys)) |
| 102 | |
| 103 | logger.info("Flushing database ...") |