import sys
import json
import requests
""" TODO: Insert pydocString """
[docs]class Auth():
def __init__(self, certificate, key, auth_server="auth.iudx.org.in", version=1):
self.url = "https://" + auth_server + "/auth/v" + str(version)
self.credentials = (certificate, key)
[docs] def call(self, api, body=None):
ret = True # success
body = json.dumps(body)
response = requests.post(
url=self.url + "/" + api,
verify=True,
cert=self.credentials,
data=body,
headers={"content-type": "application/json"}
)
if response.status_code != 200:
sys.stderr.write(
"WARNING: auth API failure | " +
self.url + "/" + api + " | " +
response.reason + " | " +
response.text
)
ret = False # failed
if response.headers['content-type'] == 'application/json':
return {'success':ret, 'response':json.loads(response.text)}
else:
sys.stderr.write(
"WARNING: auth did not send 'application/json'"
)
return {'success':False, 'response':None}
[docs] def get_token(self, request, token_time=None, existing_token=None):
body = {'request': request}
if token_time:
body['token-time'] = token_time
if existing_token:
body['existing-token'] = existing_token
return self.call("token", body)
[docs] def get_certificate_info(self):
return self.call("certificate-info")
[docs] def get_policy(self):
return self.call("acl")
[docs] def set_policy(self, policy):
body = {'policy': policy}
return self.call("acl/set", body)
[docs] def append_policy(self, policy):
body = {'policy': policy}
return self.call("acl/append", body)
[docs] def introspect_token(self, token, server_token=None):
body = {'token': token}
if server_token:
if type(server_token) == type({}):
sys.stderr.write(
"ERROR: server-token cannot be a dictonary"
)
return {'success':False, 'response':None}
body['server-token'] = server_token
return self.call("token/introspect", body)
[docs] def revoke_tokens(self, tokens):
if type(tokens) is type([]):
body = {'tokens': tokens}
else:
body = {'tokens': [tokens]}
return self.call("token/revoke", body)
[docs] def revoke_token_hashes(self, token_hashes):
if type(token_hashes) is type([]):
body = {'token-hashes': token_hashes}
else:
body = {'token-hashes': [token_hashes]}
return self.call("token/revoke", body)
[docs] def revoke_all(self, serial, fingerprint):
body = {'serial':serial, 'fingerprint': fingerprint}
return self.call("token/revoke-all", body)
[docs] def audit_tokens(self, hours):
body = {'hours': hours}
return self.call("audit/tokens", body)
[docs] def add_consumer_to_group(self, consumer, group, valid_till):
body = {'consumer': consumer, 'group': group, 'valid-till': valid_till}
return self.call("group/add", body)
[docs] def delete_consumer_from_group(self, consumer, group):
body = {'consumer': consumer, 'group': group}
return self.call("group/delete", body)
[docs] def list_group(self, consumer, group=None):
body = {'consumer': consumer}
if group:
body['group'] = group
return self.call("group/list", body)