MCPcopy Index your code
hub / github.com/pre-commit/pre-commit / _new_repo

Method _new_repo

pre_commit/store.py:132–175  ·  view source on GitHub ↗
(
            self,
            repo: str,
            ref: str,
            deps: Sequence[str],
            make_strategy: Callable[[str], None],
    )

Source from the content-addressed store, hash-verified

130 return repo
131
def _new_repo(
        self,
        repo: str,
        ref: str,
        deps: Sequence[str],
        make_strategy: Callable[[str], None],
) -> str:
    """Return the path recorded for ``repo`` at ``ref``, creating it if absent.

    ``repo`` is first mapped through ``self.db_repo_name(repo, deps)``; that
    name and ``ref`` form the key looked up in the ``repos`` table.  When no
    row is found, a new ``repo``-prefixed directory is made under
    ``self.directory``, handed to ``make_strategy`` to populate, and then
    inserted into the table before its path is returned.
    """
    # Keep the caller-supplied name; `repo` is rebound to the db name below
    requested_repo = repo
    repo = self.db_repo_name(repo, deps)

    def _stored_path() -> str | None:
        # Look for a row already recorded for this repo name and ref
        with self.connect() as db:
            cursor = db.execute(
                'SELECT path FROM repos WHERE repo = ? AND ref = ?',
                (repo, ref),
            )
            row = cursor.fetchone()
        if not row:
            return None
        return row[0]

    known_path = _stored_path()
    if known_path:
        return known_path

    with self.exclusive_lock():
        # Re-check while holding the lock: another process may have
        # already completed this work
        known_path = _stored_path()
        if known_path:  # pragma: no cover (race)
            return known_path

        logger.info(f'Initializing environment for {repo}.')

        new_dir = tempfile.mkdtemp(prefix='repo', dir=self.directory)
        with clean_path_on_failure(new_dir):
            make_strategy(new_dir)

        # Record the newly created repo in the db
        with self.connect() as db:
            db.execute(
                'INSERT INTO repos (repo, ref, path) VALUES (?, ?, ?)',
                [repo, ref, new_dir],
            )

        clientlib.warn_for_stages_on_repo_init(requested_repo, new_dir)

        return new_dir
176
177 def _complete_clone(self, ref: str, git_cmd: Callable[..., None]) -> None:
178 """Perform a complete clone of a repository and its submodules """

Callers 2

clone — Method · 0.95
make_local — Method · 0.95

Calls 4

db_repo_name — Method · 0.95
exclusive_lock — Method · 0.95
connect — Method · 0.95
clean_path_on_failure — Function · 0.90

Tested by

no test coverage detected