Process files concurrently, using a semaphore to limit how many are analyzed at once
(self, files_to_analyze: list)
| 1068 | return file_summaries, all_relationships |
| 1069 | |
| 1070 | async def _process_files_concurrently(self, files_to_analyze: list) -> tuple: |
| 1071 | """Process files concurrently with semaphore limiting""" |
| 1072 | file_summaries = [] |
| 1073 | all_relationships = [] |
| 1074 | |
| 1075 | # Create semaphore to limit concurrent tasks |
| 1076 | semaphore = asyncio.Semaphore(self.max_concurrent_files) |
| 1077 | tasks = [] |
| 1078 | |
| 1079 | async def _process_with_semaphore(file_path: Path, index: int, total: int): |
| 1080 | async with semaphore: |
| 1081 | # Add a small delay to space out concurrent requests |
| 1082 | if index > 1: |
| 1083 | await asyncio.sleep( |
| 1084 | self.request_delay * 0.5 |
| 1085 | ) # Reduced delay for concurrent processing |
| 1086 | return await self._analyze_single_file_with_relationships( |
| 1087 | file_path, index, total |
| 1088 | ) |
| 1089 | |
| 1090 | try: |
| 1091 | # Create tasks for all files |
| 1092 | tasks = [ |
| 1093 | _process_with_semaphore(file_path, i, len(files_to_analyze)) |
| 1094 | for i, file_path in enumerate(files_to_analyze, 1) |
| 1095 | ] |
| 1096 | |
| 1097 | # Process tasks and collect results |
| 1098 | if self.verbose_output: |
| 1099 | self.logger.info( |
| 1100 | f"Starting concurrent analysis of {len(tasks)} files..." |
| 1101 | ) |
| 1102 | |
| 1103 | try: |
| 1104 | results = await asyncio.gather(*tasks, return_exceptions=True) |
| 1105 | |
| 1106 | for i, result in enumerate(results): |
| 1107 | if isinstance(result, Exception): |
| 1108 | self.logger.error( |
| 1109 | f"Failed to analyze file {files_to_analyze[i]}: {result}" |
| 1110 | ) |
| 1111 | # Create error summary |
| 1112 | error_summary = FileSummary( |
| 1113 | file_path=str( |
| 1114 | files_to_analyze[i].relative_to(self.code_base_path) |
| 1115 | ), |
| 1116 | file_type="error", |
| 1117 | main_functions=[], |
| 1118 | key_concepts=[], |
| 1119 | dependencies=[], |
| 1120 | summary=f"Concurrent analysis failed: {str(result)}", |
| 1121 | lines_of_code=0, |
| 1122 | last_modified="", |
| 1123 | ) |
| 1124 | file_summaries.append(error_summary) |
| 1125 | else: |
| 1126 | file_summary, relationships = result |
| 1127 | file_summaries.append(file_summary) |
No test coverage detected.