#!/usr/local/bin/python
import base64
import copy
import os
import re
import sys
from shutil import rmtree
from tempfile import mkdtemp

import ABAC
import Creddy
class proof:
    """One recorded authorization proof and the ABAC context built from it.

    Attributes:
        name, prover, role, principal: stored exactly as passed in.
        ctxt: an ABAC.Context loaded with the supplied credentials.
        keyid_to_cn / cn_to_keyid: two-way map between an identity's key id
            and a readable name derived from its certificate file name.
    """

    def __init__(self, name, prover, role, principal, creds):
        """Build the context from *creds*, an iterable of credential chunks.

        Each chunk is converted with pem_to_der() and first offered to the
        context as an identity; chunks that are not accepted as identities
        are loaded as attributes once every chunk has been examined.
        """
        self.name = name
        self.prover = prover
        self.role = role
        self.principal = principal
        self.ctxt = ABAC.Context()
        self.keyid_to_cn = {}
        self.cn_to_keyid = {}
        attrs = []
        # Create the scratch directory before entering the try block: if
        # mkdtemp() itself fails there is nothing to remove, and the finally
        # clause must not trip over an unbound name and hide the real error.
        d = mkdtemp()
        try:
            print(d)
            for c in creds:
                cc = self.pem_to_der(c)
                if self.ctxt.load_id_chunk(cc) == ABAC.ABAC_CERT_SUCCESS:
                    self._record_identity(d, cc)
                else:
                    attrs.append(cc)
            # Attributes are deferred until all identities have been offered
            # to the context (presumably because an attribute can only be
            # loaded once its issuer is known -- confirm against libabac).
            for c in attrs:
                self.ctxt.load_attribute_chunk(c)
        finally:
            rmtree(d)

    def _record_identity(self, tmpdir, chunk):
        """Register a readable, unique name for the identity in *chunk*.

        The chunk is written to a scratch file inside *tmpdir* so Creddy can
        load it from disk. I/O failures are reported on stderr and otherwise
        ignored, leaving the identity without a readable name.
        """
        try:
            fn = os.path.join(tmpdir, 'file.pem')
            # Binary mode: the chunk is raw certificate data, and the with
            # block closes the file even when the write fails.
            with open(fn, 'wb') as f:
                f.write(chunk)
            cid = Creddy.ID(fn)
            base_cn = cn = re.sub('_ID.pem$', '', cid.cert_filename())
            # Disambiguate clashing names with a three-digit counter.
            i = 0
            while cn in self.cn_to_keyid:
                cn = '%s%03d' % (base_cn, i)
                i += 1
            self.cn_to_keyid[cn] = cid.keyid()
            self.keyid_to_cn[cid.keyid()] = cn
        except EnvironmentError as e:
            sys.stderr.write('%s: %s\n' % (e.filename, e.strerror))

    @staticmethod
    def pem_to_der(c):
        """Return the base64-decoded body of a PEM certificate.

        Input that does not start with a PEM certificate header is returned
        unchanged.
        """
        pat = '-----BEGIN CERTIFICATE-----(.*)-----END CERTIFICATE'
        m = re.match(pat, c, re.DOTALL)
        if m:
            return base64.b64decode(m.group(1))
        return c

    def replace_keyids(self, s):
        """Return *s* with every known key id replaced by its readable name."""
        for k, v in self.keyid_to_cn.items():
            # Literal replacement: neither the key id nor the name is a
            # regular expression, and a name containing a backslash must not
            # be interpreted as a substitution template.
            s = s.replace(k, v)
        return s

    def __str__(self):
        """Multi-line summary: header fields followed by one line per credential."""
        parts = [
            'Name: %s\n' % self.name,
            'Prover: %s\n' % self.prover,
            'Principal: %s\n' % self.principal,
            'Role: %s\n' % self.role,
            'Creds: \n',
        ]
        for c in self.ctxt.credentials():
            parts.append(self.replace_keyids(
                '%s <- %s\n' % (c.head().string(), c.tail().string())))
        return ''.join(parts)
def read_proofs(fn):
    """Parse the proof records stored in the log file *fn*.

    Returns a dict mapping each record's name to a proof object, or None
    when the file cannot be read (any EnvironmentError raised while opening
    or processing it).
    """
    proofs = {}
    # Pre-bind the per-record fields so that storing a record before any
    # field line has been seen cannot fail with an unbound-name error.
    name = prover = principal = role = None
    creds = []
    try:
        # The with block closes the file even if parsing raises part-way.
        with open(fn, 'r') as f:
            for line in f:
                line = line.strip()
                if line == '':
                    # Start of a record: forget the previous record's fields.
                    prover = None
                    principal = None
                    role = None
                    creds = []
                # NOTE(review): this test is textually identical to the one
                # above, so as written the branch can never run. The
                # start/end-of-record markers -- and the field delimiters in
                # the patterns below, several of which are identical
                # catch-alls -- look like they were lost from this copy of
                # the file. Restore them from the log format before relying
                # on this parser.
                elif line == '':
                    proofs[name] = proof(name, prover, role, principal, creds)
                m = re.match('(.*)', line)
                if m is not None:
                    name = m.group(1)
                m = re.match('([0-9a-f]+)', line)
                if m is not None:
                    principal = m.group(1)
                m = re.match('([0-9a-f]+)', line)
                if m is not None:
                    prover = m.group(1)
                m = re.match('(.*)', line)
                if m is not None:
                    role = m.group(1)
                m = re.match('(.*)', line)
                if m is not None:
                    # Credentials are stored base64-encoded in the log.
                    creds.append(base64.b64decode(m.group(1)))
    except EnvironmentError:
        return None
    return proofs
def interpret_proof(p):
    """Suggest ways to make the proof *p* succeed.

    *p* must provide `role`, `principal` and a `ctxt` offering
    `credentials()` and `query(role, principal)`; the proof is printed
    before it is analysed.

    Every role named in the context's credentials is tested twice: roles
    from which p.role can be reached become goals, and roles that
    p.principal already holds become attributes. Each goal is split on '&'
    and turned into one plan, a newline-joined list of steps.

    Returns a list of plan strings, one per goal.
    """
    print(p)
    suffix = '.acting_for'

    # Collect every non-principal name on either side of a credential.
    roles = set()
    for c in p.ctxt.credentials():
        for side in (c.head(), c.tail()):
            if not side.is_principal():
                roles.add(side.string())

    goals = set()
    attrs = set()
    for r in roles:
        ok, _ = p.ctxt.query(p.role, r)
        if ok:
            goals.add(r)
        ok, _ = p.ctxt.query(r, p.principal)
        if ok:
            attrs.add(r)

    # Entities whose acting_for role the principal already holds.
    acting_for = []
    for r in attrs:
        m = re.match(r'([^.]+)\.acting_for', r)
        if m:
            acting_for.append(m.group(1))

    plans = []
    for goal in goals:
        # A separate list for the steps: reusing the name `p` here would
        # hide the proof and make p.principal unreachable below.
        steps = []
        for term in (t.strip() for t in goal.split('&')):
            if term.endswith(suffix):
                # Test for and strip the same suffix, dot included.
                a = term[:-len(suffix)]
                if acting_for:
                    steps.append('add %s to %s' % (a, acting_for[0]))
                else:
                    steps.append(
                        'must get %s delegated to %s' % (a, p.principal))
            else:
                steps.append('add %s to %s' % (term, p.principal))
        plans.append('\n'.join(steps))
    return plans
# Script body: load the proofs recorded in ./auth.log, dump them, then print
# the suggested plans for one specific failed request.
proofs = read_proofs('./auth.log')
if proofs is None:
    # read_proofs returns None when the log cannot be read; stop with a
    # clear message instead of failing later on subscripting None.
    raise SystemExit('cannot read ./auth.log')
print(proofs)
# Alternative record to inspect:
#interpret_proof(proofs['New (create) succeeded at 2012-07-06 15:24:21.122101'])
plans = interpret_proof(proofs['Create failed at 2012-07-06 15:24:25.648260'])
for p in plans:
    print('---')
    print(p)