-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdkim.pyx
executable file
·206 lines (152 loc) · 8.3 KB
/
dkim.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
from libc.stdlib cimport malloc, free
from libc.string cimport memset
from dkim cimport signer_eml_header, signer_sign, signer_init, signer_quit
from email.message import EmailMessage
import exceptions
class CdkimSignerContextManager:
    """Context manager that brackets a block with `signer_init()` and `signer_quit()`."""

    def __enter__(self):
        """Call `signer_init()` and hand this manager back to the `with` statement."""
        signer_init()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Call `signer_quit()`; returns None, so an in-flight exception keeps propagating."""
        signer_quit()
class Dkim:
    """Shared base of `Signer` and `Verifier`.

    Provides the status-code-to-exception table and the message helpers both
    subclasses use. The helpers read `self.message`, which subclasses set.
    """

    def __init__(self):
        # Status codes returned by the C signer/verifier calls, mapped to the
        # exception class raised for each of them.
        self.dkim_statuses_to_exceptions = {
            1: exceptions.SignatureAvailableButFailed,
            2: exceptions.NoSignatureAvailable,
            3: exceptions.PublicKeyNotFound,
            4: exceptions.CantGetDomainKeyToVerify,
            5: exceptions.MessageNotValidSyntax,
            6: exceptions.RessourceUnavailable,
            7: exceptions.InternalError,
            8: exceptions.KeyFoundButHasBeenRevoked,
            11: exceptions.KeyRetrievalFailed,
            16: exceptions.GotMultipleDNSReplies,
            17: exceptions.SignatureGenerationFailed
        }

    def get_canonicalized_body(self):
        """Return the body of `self.message` as bytes with CRLF line endings.

        The body is everything after the first empty line of `str(self.message)`.
        A message without such a separator yields `b''`.

        :return: The UTF-8 encoded body; every line break is a CRLF, as RFC6376 requires.
        """
        # Collapse CRLF to LF before expanding LF to CRLF: a message that is already
        # rendered with CRLF would otherwise become '\r\r\n', the header/body
        # separator would never match and the body would come back empty.
        text = str(self.message).replace('\r\n', '\n').replace('\n', '\r\n')
        _headers, _separator, body = text.partition('\r\n' * 2)
        return body.encode()

    def _retrieve_original_case_sensitive_string_from_message(self, key):
        """Used as we don't want to force the user providing `headers` being in a case-sensitive form.

        :param key: A header name in any letter case.
        :return: The header name spelled exactly as it appears in `self.message`.
        :raises exceptions.SpecifiedHeaderDoesNotExistsInProvidedMessage: If no header of the
            message matches `key` case-insensitively.
        """
        wanted = key.lower()
        for header_key in self.message.keys():
            if header_key.lower() == wanted:
                return header_key
        raise exceptions.SpecifiedHeaderDoesNotExistsInProvidedMessage(
            "Header key {} wasn't found in message".format(key))
class Signer(Dkim):
    """Produce the DKIM signature of an email message through the C `signer_sign` call."""

    def __init__(self, message: EmailMessage, selector: str, signing_domain: str, secret_key: bytes,
                 header_canon='relaxed', body_canon='relaxed', headers: list=None):
        """
        :param headers: A list of header keys to be included in the signature, the "From" header is absolutely mandatory
                        as defined in RFC6376 hence not required in this list.
        :param message: The email message to be signed.
        :param selector: The domain selector used for the signature. The selector is used as a key
                         for us to be able to retrieve our dkim public key for domain-registration verification.
        :param signing_domain: The domain name used for signing the email.
        :param secret_key: The raw-format RSA-private-key used to sign the email.
        :param header_canon: Canonicalization algorithm to use for the headers, either `"simple"` or `"relaxed"`
                             (see RFC6376 for more details).
        :param body_canon: Canonicalization algorithm to use for the body, either `"simple"` or `"relaxed"`.
        """
        super().__init__()
        # `headers` defaults to None rather than to a list, otherwise it would be a mutable argument.
        self.headers = set(headers) if headers is not None else set()
        self.headers.add('from')
        self.message = message
        self.selector = selector
        self.signing_domain = signing_domain
        self.secret_key = secret_key
        self.header_canon = header_canon
        self.body_canon = body_canon

    def get_signature_header(self, normalized: bool=True) -> str:
        """
        Sign `self.message` and return the resulting signature.

        :param normalized: Should the `"DKIM-Signature: "` string be included before the signature.
        :return: The signature written by `signer_sign`, prefixed with `"DKIM-Signature: "` when `normalized`.
        :raises TypeError: If one of the header keys is not a `str`.
        :raises ValueError: If `header_canon` or `body_canon` is neither `simple` nor `relaxed`.
        :raises MemoryError: If the header array cannot be allocated.
        :raises RuntimeError: If `signer_sign` returns an error code that has no dedicated exception.
        :raises: Any of `exceptions` if unable to sign the message.
        """
        cdef signer_sign_pm sign_params
        cdef unsigned char out_signature_buffer[SIGNER_MAX_OUTPUT_SIGNATURE_BUFFER_SIZE]
        cdef int error_code

        for header in self.headers:
            if not isinstance(header, str):
                raise TypeError("Argument 'headers' has incorrect type (expected List[str], got List[bytes])")
        allowed_canon = ('simple', 'relaxed')
        if self.header_canon not in allowed_canon or self.body_canon not in allowed_canon:
            raise ValueError('body_canon amd header_canon must be either `simple` or `relaxed` as per defined in RFC6376')

        ## Keep every encoded value referenced by a local name
        # The C struct only stores pointers into these bytes objects, so each of them has to stay
        # alive until `signer_sign` has returned (encode creates temporary objects).
        encoded_signing_domain = self.signing_domain.encode()
        encoded_selector = self.selector.encode()
        encoded_body = self.get_canonicalized_body()
        secret_key = self.secret_key

        # Header names are matched case-insensitively, so e.g. a user supplied 'From' and the
        # implicit 'from' designate the same header: keep a single entry per name.
        unique_headers = {header.lower(): header for header in self.headers}

        # All encoded headers are collected in a list that outlives the C call. Rebinding a single
        # variable on every iteration would release the previous bytes object while the header
        # array still points at it.
        encoded_headers = []
        for header in unique_headers.values():
            try:
                encoded_headers.append('{}: {}'.format(
                    self._retrieve_original_case_sensitive_string_from_message(header),
                    self.message[header]).encode())
            except KeyError:
                raise exceptions.SpecifiedHeaderDoesNotExistsInProvidedMessage("Header key {} wasn't found in message".format(header))

        # Start from an all-zero struct so that no field is left indeterminate.
        memset(&sign_params, 0, sizeof(sign_params))
        sign_params.signing_domain = encoded_signing_domain
        sign_params.selector = encoded_selector
        sign_params.body = encoded_body
        # Length of the bytes object, not of the C string: the latter stops at the first NUL byte.
        sign_params.body_size = len(encoded_body)
        sign_params.secret_key = secret_key
        memset(out_signature_buffer, 0, SIGNER_MAX_OUTPUT_SIGNATURE_BUFFER_SIZE)
        sign_params.out_signature_buffer = out_signature_buffer
        sign_params.out_signature_buffer_size = SIGNER_MAX_OUTPUT_SIGNATURE_BUFFER_SIZE
        sign_params.dkim_header_canon = 0 if self.header_canon == 'simple' else 1
        sign_params.dkim_body_canon = 0 if self.body_canon == 'simple' else 1
        sign_params.header_array_length = len(encoded_headers)
        sign_params.header_array = <signer_eml_header*> malloc(sizeof(signer_eml_header) * sign_params.header_array_length)
        if sign_params.header_array is NULL:
            raise MemoryError("Couldn't allocate memory for the header array")
        try:
            for i, encoded_header in enumerate(encoded_headers):
                sign_params.header_array[i].header = encoded_header
                sign_params.header_array[i].size = len(encoded_header)
            with CdkimSignerContextManager():
                error_code = signer_sign(&sign_params)
            if error_code in self.dkim_statuses_to_exceptions:
                raise self.dkim_statuses_to_exceptions[error_code]()
            elif error_code != DKIM_STAT_OK:
                raise RuntimeError(f"signer_sign returned the following error code {error_code}")
            signature = sign_params.out_signature_buffer.decode()
            if normalized:
                return "DKIM-Signature: " + signature
            return signature
        finally:
            # The header array is the only manually allocated memory; release it on every path.
            free(sign_params.header_array)

    def add_signature_to_message(self) -> None:
        """Helper that adds the result of `self.get_signature_header` to the `message`"""
        self.message['DKIM-Signature'] = self.get_signature_header(normalized=False)
class Verifier(Dkim):
    """Check the DKIM signature of an email message through the C `signer_verify` call."""

    def __init__(self, message: EmailMessage):
        """
        :param message: The email message whose signature is to be verified.
        """
        super().__init__()
        self.message = message

    def verify(self) -> None:
        """
        Verify the signature of `self.message`; returning without an exception means success.

        :raises MemoryError: If the header array cannot be allocated.
        :raises RuntimeError: If `signer_verify` returns an error code that has no dedicated exception.
        :raises: Any of `exceptions` if unable to verify signature.
        """
        cdef signer_sign_pm verify_params
        cdef int error_code

        # The list keeps every encoded header alive while the C struct points into them.
        headers = ['{field}: {value}'.format(field=field, value=value).encode() for field, value in self.message.items()]
        canonicalized_body = self.get_canonicalized_body()
        if not canonicalized_body.endswith(b'\r\n'):
            canonicalized_body += b'\r\n'

        # Only a few fields are filled in below; zero the struct first so that the remaining ones
        # are not handed to `signer_verify` with indeterminate values.
        memset(&verify_params, 0, sizeof(verify_params))
        verify_params.body = canonicalized_body
        # Length of the bytes object, not of the C string: the latter stops at the first NUL byte.
        verify_params.body_size = len(canonicalized_body)
        verify_params.header_array_length = len(headers)
        verify_params.header_array = <signer_eml_header*> malloc(sizeof(signer_eml_header) * verify_params.header_array_length)
        if verify_params.header_array is NULL:
            raise MemoryError("Couldn't allocate memory for the header array")
        try:
            for i, header in enumerate(headers):
                verify_params.header_array[i].header = header
                verify_params.header_array[i].size = len(header)
            with CdkimSignerContextManager():
                error_code = signer_verify(&verify_params)
            if error_code in self.dkim_statuses_to_exceptions:
                raise self.dkim_statuses_to_exceptions[error_code]()
            elif error_code != DKIM_STAT_OK:
                raise RuntimeError(f"signer_verify returned the following error code {error_code}")
        finally:
            # The header array is the only manually allocated memory; release it on every path.
            free(verify_params.header_array)