#!/usr/local/bin/python
import base64
import copy
import os
import re
import sys
from shutil import rmtree
from tempfile import mkdtemp

import ABAC
import Creddy
class proof:
    """One logged authorization attempt plus the ABAC context built from it.

    Attributes:
        name: label of the proof record, as supplied by the caller.
        prover: identifier of the prover, as supplied by the caller.
        role: the role that was being proven.
        principal: the principal the role was being proven for.
        ctxt: an ABAC.Context loaded with the supplied credentials.
        keyid_to_cn / cn_to_keyid: two-way mapping between identity key ids
            and readable names derived from the identity's certificate
            file name.
    """

    def __init__(self, name, prover, role, principal, creds):
        """Build the context from *creds*.

        Each credential is passed through pem_to_der() and offered to the
        context as an identity.  Credentials that load as identities get a
        readable name; every other credential is kept aside and loaded as
        an attribute once all identities have been processed.

        Args:
            name: label for this proof.
            prover: identifier of the prover.
            role: role being proven.
            principal: principal the role is being proven for.
            creds: iterable of credential chunks (PEM text or raw bytes).
        """
        self.name = name
        self.prover = prover
        self.role = role
        self.principal = principal
        self.ctxt = ABAC.Context()
        self.keyid_to_cn = {}
        self.cn_to_keyid = {}
        attrs = []

        # Create the scratch directory before entering the try block so the
        # finally clause can never run with `d` unbound.
        d = mkdtemp()
        try:
            fn = os.path.join(d, 'file.pem')
            for c in creds:
                cc = self.pem_to_der(c)
                succ = self.ctxt.load_id_chunk(cc)
                if succ == ABAC.ABAC_CERT_SUCCESS:
                    try:
                        # Creddy.ID is given a file name, so the chunk is
                        # round-tripped through a scratch file.  Binary mode
                        # because cc may be base64-decoded bytes.
                        with open(fn, 'wb') as f:
                            f.write(cc)
                        cid = Creddy.ID(fn)
                        base_cn = cn = re.sub(r'_ID\.pem$', '',
                                              cid.cert_filename())
                        # Disambiguate repeated names with a numeric suffix.
                        i = 0
                        while cn in self.cn_to_keyid:
                            cn = '%s%03d' % (base_cn, i)
                            i += 1
                        self.cn_to_keyid[cn] = cid.keyid()
                        self.keyid_to_cn[cid.keyid()] = cn
                    except EnvironmentError as e:
                        # Best effort: report and carry on without a
                        # readable name for this identity.
                        sys.stderr.write(
                            '%s: %s\n' % (e.filename, e.strerror))
                else:
                    attrs.append(cc)
            # Attributes are loaded only after every identity has been tried
            # (presumably so their issuers are already known - TODO confirm).
            for c in attrs:
                self.ctxt.load_attribute_chunk(c)
        finally:
            rmtree(d)

    @staticmethod
    def pem_to_der(c):
        """Return the base64-decoded body of a PEM certificate.

        If *c* does not start with a PEM certificate header it is returned
        unchanged.
        """
        pat = '-----BEGIN CERTIFICATE-----(.*)-----END CERTIFICATE'
        m = re.match(pat, c, re.DOTALL)
        if m:
            return base64.b64decode(m.group(1))
        return c

    def replace_keyids(self, s):
        """Return *s* with every known key id replaced by its readable name."""
        for k, v in self.keyid_to_cn.items():
            # Literal replacement: key ids are not regular expressions and
            # names must not be interpreted as replacement templates.
            s = s.replace(k, v)
        return s

    def __str__(self):
        """Render the proof header followed by one line per credential."""
        parts = [
            'Name: %s\n' % self.name,
            'Prover: %s\n' % self.prover,
            'Principal: %s\n' % self.principal,
            'Role: %s\n' % self.role,
            'Creds: \n',
        ]
        for c in self.ctxt.credentials():
            parts.append(self.replace_keyids(
                '%s <- %s\n' % (c.head().string(), c.tail().string())))
        return ''.join(parts)
def read_proofs(fn):
    """Read proof records from the log file *fn*.

    The file is scanned line by line (whitespace-stripped).  Lines are
    matched against a series of patterns that collect a name, principal,
    prover, role and base64-encoded credentials; a completed record is
    turned into a proof object and kept only if its context cannot prove
    the role for the principal.

    Args:
        fn: path of the log file.

    Returns:
        A list of proof objects whose query failed, or None if an
        EnvironmentError (e.g. the file cannot be opened) occurs.
    """
    proofs = []
    # Bound up front so a record missing one of the fields cannot raise
    # NameError when the proof is constructed.
    name = prover = principal = role = None
    creds = []
    try:
        # `with` guarantees the log is closed even if parsing raises.
        with open(fn, 'r') as f:
            for line in f:
                line = line.strip()
                # NOTE(review): both branches below compare against the
                # empty string, so the `elif` can never run, and every
                # pattern further down is a bare capture group.  This looks
                # like the record delimiters were lost from the literals
                # (possibly start/end markers around each field) - restore
                # them from the log writer before relying on this parser.
                if line == '':
                    prover = None
                    principal = None
                    role = None
                    creds = []
                elif line == '':
                    p = proof(name, prover, role, principal, creds)
                    ok, _ = p.ctxt.query(role, principal)
                    if not ok:
                        proofs.append(p)

                m = re.match('(.*)', line)
                if m is not None:
                    name = m.group(1)

                m = re.match('([0-9a-f]+)', line)
                if m is not None:
                    principal = m.group(1)

                m = re.match('([0-9a-f]+)', line)
                if m is not None:
                    prover = m.group(1)

                m = re.match('(.*)', line)
                if m is not None:
                    role = m.group(1)

                m = re.match('(.*)', line)
                if m is not None:
                    creds.append(base64.b64decode(m.group(1)))
    except EnvironmentError:
        return None
    return proofs
def interpret_proof(p):
    """Suggest ways the failed proof *p* could be made to succeed.

    Every role mentioned in the context's credentials is tried as a
    stand-in for the principal; those from which p.role can be proven become
    goals.  Each goal (an intersection is split on '&') is turned into a
    human-readable instruction.

    Args:
        p: a proof-like object exposing `ctxt`, `role` and `principal`.

    Returns:
        A list of strings, one per goal (in sorted goal order); each string
        holds one instruction per line.
    """
    roles = set()
    direct_roles = {}   # role name -> principal that assigned it to p.principal
    groles = set()
    principals = set()
    goals = set()
    attrs = set()

    # NOTE(review): the result of this query, and the `principals` / `attrs`
    # sets collected below, are not used in the returned plans.  They are
    # retained so the sequence of context queries is unchanged.
    p.ctxt.query(p.role, p.principal)

    for c in p.ctxt.credentials():
        tail = c.tail()
        if tail.is_principal():
            if tail.string() != p.principal:
                principals.add(tail.string())
            else:
                # A role assigned directly to the principal in question.
                assigner, r = c.head().string().split('.')
                direct_roles[r] = assigner
        else:
            r = tail.string()
            for s in r.split('&'):
                roles.add(s.strip())
            groles.add(r)

        head = c.head()
        roles.add(head.string())
        groles.add(head.string())

    for r in groles:
        ok, _ = p.ctxt.query(p.role, r)
        if ok:
            goals.add(r)

    for r in roles:
        ok, _ = p.ctxt.query(r, p.principal)
        if ok:
            attrs.add(r)

    plans = []
    # Sorted so the output order does not depend on set iteration order.
    for goal in sorted(goals):
        # A separate name for the step list: the original reused `p` here,
        # which hid the proof object and broke the p.principal lookups.
        steps = []
        for g in (s.strip() for s in goal.split('&')):
            dots = g.count('.')
            if dots == 2:
                # linking role
                pr, rr, lr = g.split('.')
                if lr in direct_roles:
                    steps.append(
                        'add %s.%s to %s' % (pr, rr, direct_roles[lr]))
                else:
                    steps.append(
                        'someone with %s.%s must delegate %s to %s'
                        % (pr, rr, lr, p.principal))
            elif dots == 1:
                steps.append('add %s to %s' % (g, p.principal))
        plans.append('\n'.join(steps))
    return plans
# Script body: explain every failed proof recorded in ./auth.log.
proofs = read_proofs('./auth.log')
if proofs is None:
    # read_proofs returns None when the log cannot be read; report that
    # instead of failing later with a TypeError on len(None).
    sys.exit('cannot read ./auth.log')

print(len(proofs))
for p in proofs:
    plans = interpret_proof(p)
    print("%s needs to have %s" % (p.principal, p.role))
    print("accomplish this by: ")
    # Alternative plans are separated by a '---' line.
    for i, pl in enumerate(plans):
        if i:
            print('---')
        print(pl)