MCPcopy
hub / github.com/collabnix/kubetools / search_new_tools

Method search_new_tools

scripts/discover_tools.py:33–82  ·  view source on GitHub ↗

Search for new Kubernetes tools on GitHub

(self, days_back=7)

Source from the content-addressed store, hash-verified

31 }
32
def search_new_tools(self, days_back=7):
    """Search GitHub for recently created Kubernetes tools.

    Runs a fixed set of repository-search queries, each restricted to
    repositories created within the last ``days_back`` days. Every hit is
    passed through ``self.extract_tool_info`` and kept only when
    ``self.is_valid_kubernetes_tool`` accepts it. The accepted hits are then
    handed to ``self.deduplicate_tools``.

    Failures are reported on stdout and never raised: a query that errors
    out simply contributes no results.

    Args:
        days_back: Size of the look-back window, in days, used to build the
            ``created:>`` filter of each query.

    Returns:
        The result of ``self.deduplicate_tools`` over all accepted tools.
    """
    since = (datetime.now() - timedelta(days=days_back)).strftime('%Y-%m-%d')

    search_queries = [
        f"kubernetes created:>{since} stars:>5",
        f"kubectl plugin created:>{since} stars:>3",
        f"topic:kubernetes created:>{since} stars:>3",
    ]

    # Loop invariants: the endpoint never changes between queries.
    url = "https://api.github.com/search/repositories"
    # One retry after a rate-limit wait; previously the rate-limited query
    # was dropped after sleeping, so its results were silently lost.
    max_attempts = 2

    discovered_tools = []

    for query in search_queries:
        print(f"🔍 Searching: {query}")

        params = {
            'q': query,
            'sort': 'stars',
            'order': 'desc',
            'per_page': 10,
        }

        try:
            for _attempt in range(max_attempts):
                # NOTE(review): assumes self.session is a requests-style
                # session that accepts ``timeout`` — confirm. Without a
                # timeout a stalled connection blocks the script forever.
                response = self.session.get(url, params=params, timeout=30)

                if response.status_code == 200:
                    data = response.json()
                    print(f" Found {data['total_count']} results")

                    for repo in data['items']:
                        tool_info = self.extract_tool_info(repo)
                        if self.is_valid_kubernetes_tool(tool_info):
                            discovered_tools.append(tool_info)
                    break

                if response.status_code == 403:
                    # Wait even on the last attempt so the next query does
                    # not immediately hit the same limit.
                    print(" ⚠️ Rate limited, waiting...")
                    time.sleep(60)
                    continue

                print(f" ❌ Error {response.status_code}")
                break

        except Exception as e:
            # Deliberate best-effort boundary: one failing query must not
            # abort the remaining searches.
            print(f" ❌ Exception: {e}")

        # Small pause between queries to stay polite to the API.
        time.sleep(2)

    unique_tools = self.deduplicate_tools(discovered_tools)
    print(f"\n✅ Found {len(unique_tools)} unique tools")

    return unique_tools
83
84 def extract_tool_info(self, repo):
85 """Extract relevant information from GitHub repo"""

Callers 1

main (Function) · 0.95

Calls 3

extract_tool_info (Method) · 0.95
is_valid_kubernetes_tool (Method) · 0.95
deduplicate_tools (Method) · 0.95

Tested by

no test coverage detected