1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors. See AUTHORS.
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
23 from six.moves.urllib.parse import parse_qs, urlparse
25 from mediagoblin import mg_globals
26 from mediagoblin.tools import template, pluginapi
27 from mediagoblin.tests.tools import fixture_add_user
30 _log = logging.getLogger(__name__)
33 class TestOAuth(object):
34 @pytest.fixture(autouse=True)
35 def setup(self, test_app):
36 self.test_app = test_app
38 self.db = mg_globals.database
40 self.pman = pluginapi.PluginManager()
42 self.user_password = u'4cc355_70k3N'
43 self.user = fixture_add_user(u'joauth', self.user_password,
44 privileges=[u'active'])
51 'username': self.user.username,
52 'password': self.user_password})
54 def register_client(self, name, client_type, description=None,
56 return self.test_app.post(
57 '/oauth-2/client/register', {
59 'description': description,
61 'redirect_uri': redirect_uri})
63 def get_context(self, template_name):
64 return template.TEMPLATE_TEST_CONTEXT[template_name]
66 def test_1_public_client_registration_without_redirect_uri(self):
67 ''' Test 'public' OAuth client registration without any redirect uri '''
68 response = self.register_client(
69 u'OMGOMGOMG', 'public', 'OMGOMG Apache License v2')
71 ctx = self.get_context('oauth/client/register.html')
73 client = self.db.OAuthClient.query.filter(
74 self.db.OAuthClient.name == u'OMGOMGOMG').first()
76 assert response.status_int == 200
78 # Should display an error
79 assert len(ctx['form'].redirect_uri.errors)
81 # Should not pass through
84 def test_2_successful_public_client_registration(self):
85 ''' Successfully register a public client '''
86 uri = 'http://foo.example'
88 u'OMGOMG', 'public', 'OMG!', uri)
90 client = self.db.OAuthClient.query.filter(
91 self.db.OAuthClient.name == u'OMGOMG').first()
93 # redirect_uri should be set
94 assert client.redirect_uri == uri
96 # Client should have been registered
99 def test_3_successful_confidential_client_reg(self):
100 ''' Register a confidential OAuth client '''
101 response = self.register_client(
102 u'GMOGMO', 'confidential', 'NO GMO!')
104 assert response.status_int == 302
106 client = self.db.OAuthClient.query.filter(
107 self.db.OAuthClient.name == u'GMOGMO').first()
109 # Client should have been registered
114 def test_4_authorize_confidential_client(self):
115 ''' Authorize a confidential client as a logged in user '''
116 client = self.test_3_successful_confidential_client_reg()
118 client_identifier = client.identifier
120 redirect_uri = 'https://foo.example'
121 response = self.test_app.get('/oauth-2/authorize', {
122 'client_id': client.identifier,
124 'redirect_uri': redirect_uri})
126 # User-agent should NOT be redirected
127 assert response.status_int == 200
129 ctx = self.get_context('oauth/authorize.html')
133 # Short for client authorization post reponse
134 capr = self.test_app.post(
135 '/oauth-2/client/authorize', {
136 'client_id': form.client_id.data,
138 'next': form.next.data})
140 assert capr.status_int == 302
142 authorization_response = capr.follow()
144 assert authorization_response.location.startswith(redirect_uri)
146 return authorization_response, client_identifier
148 def get_code_from_redirect_uri(self, uri):
149 ''' Get the value of ?code= from an URI '''
150 return parse_qs(urlparse(uri).query)['code'][0]
152 def test_token_endpoint_successful_confidential_request(self):
153 ''' Successful request against token endpoint '''
154 code_redirect, client_id = self.test_4_authorize_confidential_client()
156 code = self.get_code_from_redirect_uri(code_redirect.location)
158 client = self.db.OAuthClient.query.filter(
159 self.db.OAuthClient.identifier == six.text_type(client_id)).first()
161 token_res = self.test_app.get('/oauth-2/access_token?client_id={0}&\
162 code={1}&client_secret={2}'.format(client_id, code, client.secret))
164 assert token_res.status_int == 200
166 token_data = json.loads(token_res.body.decode())
168 assert not 'error' in token_data
169 assert 'access_token' in token_data
170 assert 'token_type' in token_data
171 assert 'expires_in' in token_data
172 assert type(token_data['expires_in']) == int
173 assert token_data['expires_in'] > 0
175 # There should be a refresh token provided in the token data
176 assert len(token_data['refresh_token'])
178 return client_id, token_data
180 def test_token_endpont_missing_id_confidential_request(self):
181 ''' Unsuccessful request against token endpoint, missing client_id '''
182 code_redirect, client_id = self.test_4_authorize_confidential_client()
184 code = self.get_code_from_redirect_uri(code_redirect.location)
186 client = self.db.OAuthClient.query.filter(
187 self.db.OAuthClient.identifier == six.text_type(client_id)).first()
189 token_res = self.test_app.get('/oauth-2/access_token?\
190 code={0}&client_secret={1}'.format(code, client.secret))
192 assert token_res.status_int == 200
194 token_data = json.loads(token_res.body.decode())
196 assert 'error' in token_data
197 assert not 'access_token' in token_data
198 assert token_data['error'] == 'invalid_request'
199 assert len(token_data['error_description'])
201 def test_refresh_token(self):
202 ''' Try to get a new access token using the refresh token '''
203 # Get an access token and a refresh token
204 client_id, token_data =\
205 self.test_token_endpoint_successful_confidential_request()
207 client = self.db.OAuthClient.query.filter(
208 self.db.OAuthClient.identifier == client_id).first()
210 token_res = self.test_app.get('/oauth-2/access_token',
211 {'refresh_token': token_data['refresh_token'],
212 'client_id': client_id,
213 'client_secret': client.secret
216 assert token_res.status_int == 200
218 new_token_data = json.loads(token_res.body.decode())
220 assert not 'error' in new_token_data
221 assert 'access_token' in new_token_data
222 assert 'token_type' in new_token_data
223 assert 'expires_in' in new_token_data
224 assert type(new_token_data['expires_in']) == int
225 assert new_token_data['expires_in'] > 0