-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_auth.py
More file actions
222 lines (156 loc) · 6.79 KB
/
test_auth.py
File metadata and controls
222 lines (156 loc) · 6.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
from uuid import uuid4
import django
# django.setup() is called below to auto-populate Django's base app models for testing purposes.
from rest_framework_simplejwt.utils import aware_utcnow, datetime_to_epoch
# Initialise Django at import time. Any exception raised by django.setup()
# propagates unchanged; a try/except that only re-raises adds nothing.
django.setup()
import pytest
from django.http.request import HttpRequest
from rest_framework import exceptions
from rest_framework_simplejwt.tokens import UntypedToken
from src.shipchain_common.authentication import EngineRequest, passive_credentials_auth, PermissionedTokenUser, \
TransmissionRequest, LambdaRequest
from src.shipchain_common.test_utils import get_jwt
from src.shipchain_common.utils import random_id
# Module-level sample identity values.
# NOTE(review): neither constant is referenced by the fixtures or tests visible
# in this file (the `username` fixture repeats the same address as a literal) —
# confirm whether they are still needed.
USERNAME = 'fake@shipchain.io'
ORGANIZATION_ID = '00000000-0000-0000-0000-000000000001'
@pytest.fixture()
def username():
    """Provide the fixed e-mail address used as the JWT username in these tests."""
    fake_address = 'fake@shipchain.io'
    return fake_address
@pytest.fixture()
def organization_id():
    """Provide an organization identifier generated by ``random_id()``."""
    generated_id = random_id()
    return generated_id
@pytest.fixture()
def engine_request():
    """Provide a newly constructed ``EngineRequest`` instance."""
    permission = EngineRequest()
    return permission
@pytest.fixture()
def transmission_request():
    """Provide a newly constructed ``TransmissionRequest`` instance."""
    permission = TransmissionRequest()
    return permission
@pytest.fixture()
def lambda_request():
    """Provide a newly constructed ``LambdaRequest`` instance."""
    permission = LambdaRequest()
    return permission
def test_passive_jwt_auth(username):
    """Check ``passive_credentials_auth`` with empty and with valid credentials.

    An empty credential string must raise ``AuthenticationFailed``. A JWT
    issued for ``username`` only must yield an authenticated, non-staff,
    non-superuser user whose token carries no ``organization_id``.
    """
    # Empty credentials are rejected outright.
    with pytest.raises(exceptions.AuthenticationFailed):
        passive_credentials_auth('')

    encoded_jwt = get_jwt(username=username)
    user = passive_credentials_auth(encoded_jwt)

    assert user.is_authenticated
    assert not user.is_staff
    assert not user.is_superuser
    assert user.username == 'fake@shipchain.io'

    # The JWT was built without an organization, so the claim must be absent.
    organization_claim = user.token.get('organization_id', None)
    assert organization_claim is None
def test_organization_jwt_auth(username, organization_id):
    """A JWT issued with an organization exposes that id on the user's token."""
    encoded_jwt = get_jwt(username=username, organization_id=organization_id)
    user = passive_credentials_auth(encoded_jwt)

    organization_claim = user.token.get('organization_id', None)
    assert organization_claim == organization_id
def test_engine_auth_requires_header(engine_request):
    """Step ``EngineRequest.has_permission`` through the request headers one by one.

    Entries are added to ``request.META`` incrementally. Permission is only
    expected once the source is ``internal``, client verification is
    ``SUCCESS`` and the client DN is ``/CN=engine.test-internal``.
    """
    request = HttpRequest()
    meta = request.META  # alias for the request's META dict

    def permitted():
        # A fresh empty dict is passed as the second argument on every call.
        return engine_request.has_permission(request, {})

    # No headers at all.
    assert not permitted()

    # A non-internal source is refused.
    meta['X_NGINX_SOURCE'] = 'alb'
    assert not permitted()

    # Internal source, but the verification header is missing entirely.
    meta['X_NGINX_SOURCE'] = 'internal'
    with pytest.raises(KeyError):
        permitted()

    # Verification header present but not successful.
    meta['X_SSL_CLIENT_VERIFY'] = 'NONE'
    assert not permitted()

    # Verification succeeded, but the client DN header is missing entirely.
    meta['X_SSL_CLIENT_VERIFY'] = 'SUCCESS'
    with pytest.raises(KeyError):
        permitted()

    # DN with the right service name but an unexpected suffix.
    meta['X_SSL_CLIENT_DN'] = '/CN=engine.h4ck3d'
    assert not permitted()

    # DN belonging to a different service.
    meta['X_SSL_CLIENT_DN'] = '/CN=profiles.test-internal'
    assert not permitted()

    # Only the expected engine DN is accepted.
    meta['X_SSL_CLIENT_DN'] = '/CN=engine.test-internal'
    assert permitted()
def test_transmission_auth_requires_header(transmission_request):
    """Step ``TransmissionRequest.has_permission`` through the request headers one by one.

    Entries are added to ``request.META`` incrementally. Permission is only
    expected once the source is ``internal``, client verification is
    ``SUCCESS`` and the client DN is ``/CN=transmission.test-internal``.
    """
    request = HttpRequest()
    meta = request.META  # alias for the request's META dict

    def permitted():
        # A fresh empty dict is passed as the second argument on every call.
        return transmission_request.has_permission(request, {})

    # No headers at all.
    assert not permitted()

    # A non-internal source is refused.
    meta['X_NGINX_SOURCE'] = 'alb'
    assert not permitted()

    # Internal source, but the verification header is missing entirely.
    meta['X_NGINX_SOURCE'] = 'internal'
    with pytest.raises(KeyError):
        permitted()

    # Verification header present but not successful.
    meta['X_SSL_CLIENT_VERIFY'] = 'NONE'
    assert not permitted()

    # Verification succeeded, but the client DN header is missing entirely.
    meta['X_SSL_CLIENT_VERIFY'] = 'SUCCESS'
    with pytest.raises(KeyError):
        permitted()

    # DN with the right service name but an unexpected suffix.
    meta['X_SSL_CLIENT_DN'] = '/CN=transmission.h4ck3d'
    assert not permitted()

    # DN belonging to a different service.
    meta['X_SSL_CLIENT_DN'] = '/CN=profiles.test-internal'
    assert not permitted()

    # Only the expected transmission DN is accepted.
    meta['X_SSL_CLIENT_DN'] = '/CN=transmission.test-internal'
    assert permitted()
def test_lambda_auth_requires_header(lambda_request):
    """Step ``LambdaRequest.has_permission`` through the request headers one by one.

    Entries are added to ``request.META`` incrementally. Permission is only
    expected once the source is ``internal``, client verification is
    ``SUCCESS`` and the client DN is ``/CN=lambda.test-internal``.
    """
    request = HttpRequest()
    meta = request.META  # alias for the request's META dict

    def permitted():
        # A fresh empty dict is passed as the second argument on every call.
        return lambda_request.has_permission(request, {})

    # No headers at all.
    assert not permitted()

    # A non-internal source is refused.
    meta['X_NGINX_SOURCE'] = 'alb'
    assert not permitted()

    # Internal source, but the verification header is missing entirely.
    meta['X_NGINX_SOURCE'] = 'internal'
    with pytest.raises(KeyError):
        permitted()

    # Verification header present but not successful.
    meta['X_SSL_CLIENT_VERIFY'] = 'NONE'
    assert not permitted()

    # Verification succeeded, but the client DN header is missing entirely.
    meta['X_SSL_CLIENT_VERIFY'] = 'SUCCESS'
    with pytest.raises(KeyError):
        permitted()

    # DN with the right service name but an unexpected suffix.
    meta['X_SSL_CLIENT_DN'] = '/CN=lambda.h4ck3d'
    assert not permitted()

    # DN belonging to a different service.
    meta['X_SSL_CLIENT_DN'] = '/CN=profiles.test-internal'
    assert not permitted()

    # Only the expected lambda DN is accepted.
    meta['X_SSL_CLIENT_DN'] = '/CN=lambda.test-internal'
    assert permitted()
@pytest.fixture
def one_feature():
    """Provide a ``(features, permissions)`` pair for a single feature.

    ``features`` is the feature mapping placed in the token; ``permissions``
    is the matching list of ``feature.permission`` strings.
    """
    features = {'feature': ['permission']}
    permissions = ['feature.permission']
    return features, permissions
@pytest.fixture
def many_feature():
    """Provide a ``(features, permissions)`` pair covering two features.

    ``features`` is the feature mapping placed in the token; ``permissions``
    is the matching list of ``feature.permission`` strings, in feature order.
    """
    features = {
        'feature': ['permission', 'permission2'],
        'feature2': ['permission', 'permission2'],
    }
    permissions = [
        'feature.permission',
        'feature.permission2',
        'feature2.permission',
        'feature2.permission2',
    ]
    return features, permissions
def test_token_user_get_no_permissions():
    """A token built from a default JWT yields an empty permission list."""
    token_user = PermissionedTokenUser(UntypedToken(get_jwt()))

    permissions = token_user.get_all_permissions()
    assert permissions == []
def test_token_user_get_one_permission(one_feature):
    """A single-feature token yields exactly the one expected permission."""
    features, expected_permissions = one_feature[0], one_feature[1]

    encoded_jwt = get_jwt(features=features)
    token_user = PermissionedTokenUser(UntypedToken(encoded_jwt))

    assert token_user.get_all_permissions() == expected_permissions
def test_token_user_get_many_permission(many_feature):
    """A multi-feature token yields every expected permission, in order."""
    features, expected_permissions = many_feature[0], many_feature[1]

    encoded_jwt = get_jwt(features=features)
    token_user = PermissionedTokenUser(UntypedToken(encoded_jwt))

    assert token_user.get_all_permissions() == expected_permissions
def test_token_user_has_perm(many_feature):
    """``has_perm`` accepts a permission that is present in the token."""
    features, permissions = many_feature[0], many_feature[1]

    encoded_jwt = get_jwt(features=features)
    token_user = PermissionedTokenUser(UntypedToken(encoded_jwt))

    # Third entry of the fixture's permission list.
    granted_permission = permissions[2]
    assert token_user.has_perm(granted_permission)
def test_token_user_has_perms(many_feature):
    """``has_perms`` accepts the full list of permissions present in the token."""
    features, permissions = many_feature[0], many_feature[1]

    encoded_jwt = get_jwt(features=features)
    token_user = PermissionedTokenUser(UntypedToken(encoded_jwt))

    assert token_user.has_perms(permissions)
def test_token_user_does_not_has_perm(many_feature):
    """``has_perm`` rejects unknown names and partial ``feature.permission`` strings."""
    features, permissions = many_feature[0], many_feature[1]

    encoded_jwt = get_jwt(features=features)
    token_user = PermissionedTokenUser(UntypedToken(encoded_jwt))

    assert not token_user.has_perm('not_a_permission')

    # Split the first "feature.permission" string into its two halves.
    parts = permissions[0].split('.')
    assert not token_user.has_perm(parts[0])  # doesn't match on just feature
    assert not token_user.has_perm(parts[1])  # doesn't match on just permission