Given a list of file paths, read all flows and return them as a list. Streaming would be preferable for performance; however, reading eagerly ensures that an error in any of the files is raised immediately. Raises: FlowReadException, if any error occurs.
def read_flows_from_paths(paths) -> list[flow.Flow]:
    """
    Given a list of filepaths, read all flows and return a list of them.
    From a performance perspective, streaming would be advisable -
    however, if there's an error with one of the files, we want it to be raised immediately.

    Raises:
        FlowReadException, if any error occurs.
    """
    try:
        flows: list[flow.Flow] = []
        for path in paths:
            path = os.path.expanduser(path)
            with open(path, "rb") as f:
                flows.extend(FlowReader(f).stream())
    except OSError as e:
        raise exceptions.FlowReadException(e.strerror)
    return flows
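
The trade-off described in the docstring (streaming versus failing immediately) can be illustrated with a minimal, self-contained sketch. It does not use the real `FlowReader` or `FlowReadException`: `RecordReadError`, `stream_records`, and `read_records_eagerly` are hypothetical stand-ins, and plain text lines stand in for flows. It only assumes that a lazy reader defers I/O errors until iteration, while an eager wrapper surfaces them before returning.

```python
import os
from collections.abc import Iterable, Iterator


class RecordReadError(Exception):
    """Hypothetical stand-in for FlowReadException."""


def stream_records(path: str) -> Iterator[str]:
    # Lazy: as a generator, this does not open the file until the first
    # item is requested, so a bad path goes unnoticed at call time.
    with open(os.path.expanduser(path), encoding="utf-8") as f:
        for line in f:
            yield line.rstrip("\n")


def read_records_eagerly(paths: Iterable[str]) -> list[str]:
    # Eager: every file is fully consumed before returning, so an
    # unreadable file raises here rather than later during iteration.
    records: list[str] = []
    try:
        for path in paths:
            records.extend(stream_records(path))
    except OSError as e:
        # Convert low-level I/O errors into one domain-specific exception.
        raise RecordReadError(e.strerror) from e
    return records
```

Calling `stream_records` on a missing file returns a generator without raising; the error only appears once iteration starts. `read_records_eagerly` raises `RecordReadError` before returning anything, which is the behavior the docstring prefers, at the cost of holding all records in memory.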