Search for new Kubernetes tools on GitHub
(self, days_back=7)
| 31 | } |
| 32 | |
| 33 | def search_new_tools(self, days_back=7): |
| 34 | """Search for new Kubernetes tools on GitHub""" |
| 35 | since = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d') |
| 36 | |
| 37 | search_queries = [ |
| 38 | f"kubernetes created:>{since} stars:>5", |
| 39 | f"kubectl plugin created:>{since} stars:>3", |
| 40 | f"topic:kubernetes created:>{since} stars:>3", |
| 41 | ] |
| 42 | |
| 43 | discovered_tools = [] |
| 44 | |
| 45 | for query in search_queries: |
| 46 | print(f"š Searching: {query}") |
| 47 | |
| 48 | try: |
| 49 | url = f"https://api.github.com/search/repositories" |
| 50 | params = { |
| 51 | 'q': query, |
| 52 | 'sort': 'stars', |
| 53 | 'order': 'desc', |
| 54 | 'per_page': 10 |
| 55 | } |
| 56 | |
| 57 | response = self.session.get(url, params=params) |
| 58 | |
| 59 | if response.status_code == 200: |
| 60 | data = response.json() |
| 61 | print(f" Found {data['total_count']} results") |
| 62 | |
| 63 | for repo in data['items']: |
| 64 | tool_info = self.extract_tool_info(repo) |
| 65 | if self.is_valid_kubernetes_tool(tool_info): |
| 66 | discovered_tools.append(tool_info) |
| 67 | |
| 68 | elif response.status_code == 403: |
| 69 | print(" ā ļø Rate limited, waiting...") |
| 70 | time.sleep(60) |
| 71 | else: |
| 72 | print(f" ā Error {response.status_code}") |
| 73 | |
| 74 | except Exception as e: |
| 75 | print(f" ā Exception: {e}") |
| 76 | |
| 77 | time.sleep(2) |
| 78 | |
| 79 | unique_tools = self.deduplicate_tools(discovered_tools) |
| 80 | print(f"\nā Found {len(unique_tools)} unique tools") |
| 81 | |
| 82 | return unique_tools |
| 83 | |
| 84 | def extract_tool_info(self, repo): |
| 85 | """Extract relevant information from GitHub repo""" |
no test coverage detected