()
| 190 | |
@contextmanager
def lock_pex_project_dir():
    # type: () -> Iterator[None]
    """Hold an exclusive file lock for the duration of the ``with`` body.

    The lock file is ``__file__ + ".lck"`` and is acquired exclusively with the BSD lock
    style. While the lock is held, the ``_PEX_LOCKED_PROJECT_DIR`` environment variable is
    set to this process's pid. If that variable is already present on entry, no lock is
    taken and the body simply runs.

    On exit the lock is released and the environment variable is removed, even if the body
    or the lock release raises.
    """

    if "_PEX_LOCKED_PROJECT_DIR" in os.environ:
        # N.B.: We're in a subprocess of a root Pex invocation that holds the lock; so it's
        # ok to proceed.
        yield
        return

    # Imported lazily, as in the original code, so the already-locked fast path above
    # never needs these modules.
    from pex.fs import lock
    from pex.fs.lock import FileLockStyle

    lck = lock.acquire(__file__ + ".lck", exclusive=True, style=FileLockStyle.BSD)
    os.environ["_PEX_LOCKED_PROJECT_DIR"] = str(os.getpid())
    try:
        yield
    finally:
        try:
            lck.release()
        finally:
            # Always clear the marker, even if the release failed: a stale marker would
            # make every later call in this process (and its children) skip locking.
            # `pop` with a default tolerates the body having already removed the variable,
            # so cleanup never masks the body's own exception with a KeyError.
            os.environ.pop("_PEX_LOCKED_PROJECT_DIR", None)
| 211 | |
| 212 | |
| 213 | @contextmanager |
no test coverage detected