source: tools/suggest/suggest.py @ af36abb

abac0-leakabac0-meicompt_changesmei-idmei-rt0-nmei_rt0tvf-new-xml
Last change on this file since af36abb was af36abb, checked in by Ted Faber <faber@…>, 12 years ago

Suggest corrections from TIED partial proof

  • Property mode set to 100644
File size: 4.0 KB
Line 
1#!/usr/local/bin/python
2
3import ABAC
4import Creddy
5
6import os
7import re
8import copy
9import base64
10
11from tempfile import mkdtemp
12from shutil import rmtree
13
14class proof:
15    def __init__(self, name, prover, role, principal, creds):
16        self.name = name
17        self.prover = prover
18        self.role = role
19        self.principal = principal
20        self.ctxt = ABAC.Context()
21        self.keyid_to_cn = { }
22        self.cn_to_keyid = { }
23        attrs = []
24        try:
25            d = mkdtemp()
26            print d
27            for c in creds:
28                cc = self.pem_to_der(c)
29                succ = self.ctxt.load_id_chunk(cc) 
30                if succ == ABAC.ABAC_CERT_SUCCESS: 
31                    try:
32                        fn = os.path.join(d, 'file.pem')
33                        f = open(fn, 'w')
34                        f.write(cc)
35                        f.close()
36                        cid = Creddy.ID(fn)
37                        base_cn = cn = re.sub('_ID.pem$','',cid.cert_filename())
38                        i = 0
39                        while cn in self.cn_to_keyid:
40                            cn = '%s%03d' % (base_cn, i)
41                            i += 1
42                        self.cn_to_keyid[cn] = cid.keyid()
43                        self.keyid_to_cn[cid.keyid()] = cn
44                    except EnvironmentError, e:
45                        print >>sys.stderr, '%s: %s' % (e.filename, e.strerror)
46                else:
47                    attrs.append(cc)
48            for c in attrs:
49                self.ctxt.load_attribute_chunk(c) 
50        finally:
51            rmtree(d)
52
53
54    @staticmethod
55    def pem_to_der(c):
56        pat = '-----BEGIN CERTIFICATE-----(.*)-----END CERTIFICATE'
57        m = re.match(pat, c, re.DOTALL)
58        if m: return base64.b64decode(m.group(1))
59        else: return c
60
61    def replace_keyids(self, s):
62        for k, v in self.keyid_to_cn.items():
63            s = re.sub(k, v, s)
64        return s
65
66    def __str__(self):
67        s = 'Name: %s\n' % self.name
68        s += 'Prover: %s\n' % self.prover
69        s += 'Principal: %s\n' % self.principal
70        s += 'Role: %s\n' % self.role
71        s += 'Creds: \n'
72        for c in self.ctxt.credentials():
73            s += self.replace_keyids(
74                    '%s <- %s\n' % ( c.head().string(), c.tail().string()))
75        return s
76
77def read_proofs(fn):
78    proofs = {}
79    try:
80        f = open(fn, 'r')
81        creds = []
82        i = 0
83        for line in f:
84            line = line.strip()
85            if line == '<proof>':
86                prover = None
87                principal = None
88                role = None
89                creds = []
90            elif line == '</proof>' :
91                proofs[name] = proof(name, prover, role, principal, creds)
92                i += 1
93           
94            m = re.match('<comment>(.*)</comment>', line)
95            if m is not None:
96                name = m.group(1)
97
98            m = re.match('<principal>([0-9a-f]+)</principal>', line)
99            if m is not None:
100                principal = m.group(1)
101
102            m = re.match('<prover>([0-9a-f]+)</prover>', line)
103            if m is not None:
104                prover = m.group(1)
105
106            m = re.match('<attribute>(.*)</attribute>', line)
107            if m is not None:
108                role = m.group(1)
109
110            m = re.match('<credential>(.*)</credential>', line)
111            if m is not None:
112                creds.append(base64.b64decode(m.group(1)))
113        f.close()
114    except EnvironmentError, e:
115        return None
116    return proofs
117
118def interpret_proof(p):
119    print  p
120    roles = set()
121    principals = set()
122    goals = set()
123    attrs = set()
124    ok, proof = p.ctxt.query(p.role, p.principal)
125    for c in p.ctxt.credentials():
126        if c.head().is_principal(): principals.add(c.head().string())
127        else: roles.add(c.head().string())
128        if c.tail().is_principal(): principals.add(c.tail().string())
129        else: roles.add(c.tail().string())
130
131   
132    for r in roles:
133        ok, proof =  p.ctxt.query(p.role, r)
134        if ok :
135            goals.add(r)
136        ok, proof = p.ctxt.query(r, p.principal)
137        if ok:
138            attrs.add(r)
139
140    acting_for = []
141    for r in attrs:
142        m =re.match('([^\.]+)\\.acting_for', r)
143        if m:
144            acting_for.append(m.group(1))
145    split_goals = [ [s.strip() for s in g.split('&')] for g in goals ]
146
147    plans = []
148    for sg in split_goals:
149        p = []
150        for g in sg:
151            if g.endswith('acting_for'):
152                a = g[:-len('.acting_for')]
153                if len(acting_for) > 0:
154                    p.append('add %s to %s' % (a, acting_for[0]))
155                else:
156                    p.append('must get %s delegated to %s' % (a, principal))
157            else:
158                p.append('add %s to %s' % (g, principal))
159        plans.append('\n'.join(p))
160
161    return plans
162
163
164proofs = read_proofs('./auth.log')
165print proofs
166#interpret_proof(proofs['New (create) succeeded at 2012-07-06 15:24:21.122101'])
167plans = interpret_proof(proofs['Create failed at 2012-07-06 15:24:25.648260'])
168
169for p in plans:
170    print '---'
171    print p
Note: See TracBrowser for help on using the repository browser.