hub / github.com/mudler/LocalAI / TokenAuthInterceptor

Class TokenAuthInterceptor

backend/python/common/grpc_auth.py:33–46

Sync gRPC server interceptor that validates a bearer token.

Source from the content-addressed store, hash-verified

class TokenAuthInterceptor(grpc.ServerInterceptor):
    """Sync gRPC server interceptor that validates a bearer token."""

    def __init__(self, token: str):
        self._token = token
        self._abort_handler = _AbortHandler()

    def intercept_service(self, continuation, handler_call_details):
        metadata = dict(handler_call_details.invocation_metadata)
        auth = metadata.get("authorization", "")
        expected = "Bearer " + self._token
        if not hmac.compare_digest(auth, expected):
            return self._abort_handler
        return continuation(handler_call_details)


class AsyncTokenAuthInterceptor(grpc.aio.ServerInterceptor):
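
The token check inside intercept_service can be tried out without a running gRPC server. The following is a minimal sketch, not code from grpc_auth.py: the helper name is_authorized and the sample metadata tuples are hypothetical, and the logic simply mirrors the lookup and comparison shown in the listing.

```python
import hmac


def is_authorized(metadata, token):
    """Hypothetical helper mirroring the check in intercept_service."""
    # Invocation metadata arrives as (key, value) pairs; dict() keeps the
    # last value when a key is repeated.
    auth = dict(metadata).get("authorization", "")
    expected = "Bearer " + token
    # compare_digest does not short-circuit on the first differing
    # character, which makes timing attacks on the token harder.
    return hmac.compare_digest(auth, expected)


# Hypothetical metadata, shaped like handler_call_details.invocation_metadata.
good = (("authorization", "Bearer s3cret"),)
wrong = (("authorization", "Bearer nope"),)
missing = (("user-agent", "grpc-python"),)

print(is_authorized(good, "s3cret"))     # True
print(is_authorized(wrong, "s3cret"))    # False
print(is_authorized(missing, "s3cret"))  # False
```

Because the expected value is built as "Bearer " + token and compared as a whole string, the scheme prefix is matched exactly: a header such as "bearer s3cret" is rejected. Note also that hmac.compare_digest accepts str arguments only when they are ASCII, so a token containing non-ASCII characters would raise TypeError rather than return False.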

Callers 1

get_auth_interceptors · Function · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected