1 | #!/usr/bin/python |
---|
2 | |
---|
3 | from M2Crypto import SSL |
---|
4 | from M2Crypto.SSL import SSLError |
---|
5 | |
---|
6 | # If this is an old enough version of M2Crypto.SSL that has an |
---|
7 | # ssl_verify_callback that doesn't allow 0-length signed certs, create a |
---|
8 | # version of that callback that does. This is edited from the original in |
---|
9 | # M2Crypto.SSL.cb. This version also elides the printing to stderr. |
---|
if getattr(SSL.cb, 'ssl_verify_callback_allow_unknown_ca', None):
    # This M2Crypto already provides a callback that tolerates unknown CAs.
    # The name is still defined so it can serve as a getattr() default
    # elsewhere in this file, but it is not meant to be invoked.
    def ssl_verify_callback(ssl_ctx_ptr, x509_ptr, errnum, errdepth, ok):
        raise ValueError("This should never be called")
else:
    from M2Crypto.SSL.Context import map
    from M2Crypto import m2

    def ssl_verify_callback(ssl_ctx_ptr, x509_ptr, errnum, errdepth, ok):
        """
        Certificate verification callback that honors allow_unknown_ca.

        @param ssl_ctx_ptr: key used to find the SSL context in M2Crypto's
                            context map
        @param x509_ptr: not used by this callback
        @param errnum: verification error code reported for the certificate
        @param errdepth: depth in the chain at which errnum was reported
        @param ok: verification status computed so far
        @return: 1 if the certificate is accepted and errdepth is within the
                 context's verify depth, 0 if it is accepted but too deep,
                 otherwise the incoming (false) value of ok
        """
        # Error codes that all amount to "the issuer is not known to us".
        issuer_unknown_errors = (
            m2.X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY,
            m2.X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE,
            m2.X509_V_ERR_CERT_UNTRUSTED,
            m2.X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT,
        )
        context = map()[ssl_ctx_ptr]

        # An unknown issuer is forgiven only when the context says so.
        if errnum in issuer_unknown_errors and context.get_allow_unknown_ca():
            ok = 1

        # CRL checking goes here...

        # A failure is passed back untouched.
        if not ok:
            return ok

        # Accepted so far: still enforce the configured chain depth.
        if context.get_verify_depth() >= errdepth:
            return 1
        return 0
---|
36 | |
---|
class fedd_ssl_context(SSL.Context):
    """
    Simple wrapper around an M2Crypto.SSL.Context to initialize it for fedd.
    """
    def __init__(self, my_cert, trusted_certs=None, password=None):
        """
        construct a fedd_ssl_context

        Loads my_cert into the context and then configures peer
        verification: against trusted_certs when they are given, otherwise
        with unknown CAs allowed.

        @param my_cert: PEM file with my certificate in it
        @param trusted_certs: PEM file with trusted certs in it (optional)
        @param password: password for my_cert, or a callable that is handed
                         to load_cert as its password callback (optional)
        """
        SSL.Context.__init__(self)

        # load_cert takes a callback to get a password, not a password, so if
        # the caller provided a password, this creates a nonce callback using a
        # lambda form.
        if password is not None and not callable(password):
            # Writing password = lambda *args: password would yield a
            # function that returns itself, because by the time it runs the
            # name password refers to the lambda.  Capture the value in a
            # separate name first.
            pwd = str(password)
            password = lambda *args: pwd

        # The calls to str below (and above) are because the underlying SSL
        # stuff is intolerant of unicode.
        if password is not None:
            self.load_cert(str(my_cert), callback=password)
        else:
            self.load_cert(str(my_cert))

        # If no trusted certificates are specified, allow unknown CAs.
        if trusted_certs:
            self.load_verify_locations(str(trusted_certs))
            self.set_verify(SSL.verify_peer, 10)
        else:
            # More legacy code.  Recent versions of M2Crypto express the
            # allow_unknown_ca option through a callback tuned to allow it.
            # Older versions use a standard callback that respects the
            # attribute.  This should work under both regimes.
            callb = getattr(SSL.cb, 'ssl_verify_callback_allow_unknown_ca',
                            ssl_verify_callback)
            self.set_allow_unknown_ca(True)
            self.set_verify(SSL.verify_peer, 10, callback=callb)
---|