MCPcopy
hub / github.com/mudler/LocalAI / AsyncTokenAuthInterceptor

Class AsyncTokenAuthInterceptor

backend/python/common/grpc_auth.py:49–61  ·  view source on GitHub ↗

Async gRPC server interceptor that validates a bearer token.

Source from the content-addressed store, hash-verified

47
48
49class AsyncTokenAuthInterceptor(grpc.aio.ServerInterceptor):
50 """Async gRPC server interceptor that validates a bearer token."""
51
52 def __init__(self, token: str):
53 self._token = token
54
55 async def intercept_service(self, continuation, handler_call_details):
56 metadata = dict(handler_call_details.invocation_metadata)
57 auth = metadata.get("authorization", "")
58 expected = "Bearer " + self._token
59 if not hmac.compare_digest(auth, expected):
60 return _AbortHandler()
61 return await continuation(handler_call_details)
62
63
64def get_auth_interceptors(*, aio: bool = False):

Callers 1

get_auth_interceptorsFunction · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected