Whew. This is a big update. I did some significant keeping work. I moved all of
[mediagoblin:mediagoblin.git] / mediagoblin / decorators.py
1 # GNU MediaGoblin -- federated, autonomous media hosting
2 # Copyright (C) 2011, 2012 MediaGoblin contributors.  See AUTHORS.
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as published by
6 # the Free Software Foundation, either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 from functools import wraps
18
19 from urlparse import urljoin
20 from werkzeug.exceptions import Forbidden, NotFound
21 from werkzeug.urls import url_quote
22
23 from mediagoblin import mg_globals as mgg
24 from mediagoblin.db.models import MediaEntry, User, MediaComment, Privilege, \
25                             UserBan
26 from mediagoblin.tools.response import redirect, render_404, render_user_banned
27
28
29 def require_active_login(controller):
30     """
31     Require an active login from the user.
32     """
33     @wraps(controller)
34     def new_controller_func(request, *args, **kwargs):
35         if request.user and \
36                 request.user.status == u'needs_email_verification':
37             return redirect(
38                 request, 'mediagoblin.user_pages.user_home',
39                 user=request.user.username)
40         elif not request.user or request.user.status != u'active':
41             next_url = urljoin(
42                     request.urlgen('mediagoblin.auth.login',
43                         qualified=True),
44                     request.url)
45
46             return redirect(request, 'mediagoblin.auth.login',
47                             next=next_url)
48
49         return controller(request, *args, **kwargs)
50
51     return new_controller_func
52
53 def active_user_from_url(controller):
54     """Retrieve User() from <user> URL pattern and pass in as url_user=...
55
56     Returns a 404 if no such active user has been found"""
57     @wraps(controller)
58     def wrapper(request, *args, **kwargs):
59         user = User.query.filter_by(username=request.matchdict['user']).first()
60         if user is None:
61             return render_404(request)
62
63         return controller(request, *args, url_user=user, **kwargs)
64
65     return wrapper
66
67 def user_has_privilege(privilege_name):
68
69     def user_has_privilege_decorator(controller):
70         @wraps(controller)
71         def wrapper(request, *args, **kwargs):
72             user_id = request.user.id
73             privileges_of_user = Privilege.query.filter(
74                 Privilege.all_users.any(
75                 User.id==user_id))
76             if UserBan.query.filter(UserBan.user_id==user_id).count():
77                 return render_user_banned(request)
78             elif not privileges_of_user.filter(
79                 Privilege.privilege_name==privilege_name).count():
80                 raise Forbidden()
81
82             return controller(request, *args, **kwargs)
83
84         return wrapper
85     return user_has_privilege_decorator
86
87
88 def user_may_delete_media(controller):
89     """
90     Require user ownership of the MediaEntry to delete.
91     """
92     @wraps(controller)
93     def wrapper(request, *args, **kwargs):
94         uploader_id = kwargs['media'].uploader
95         if not (request.user.is_admin or
96                 request.user.id == uploader_id):
97             raise Forbidden()
98
99         return controller(request, *args, **kwargs)
100
101     return wrapper
102
103
104 def user_may_alter_collection(controller):
105     """
106     Require user ownership of the Collection to modify.
107     """
108     @wraps(controller)
109     def wrapper(request, *args, **kwargs):
110         creator_id = request.db.User.find_one(
111             {'username': request.matchdict['user']}).id
112         if not (request.user.is_admin or
113                 request.user.id == creator_id):
114             raise Forbidden()
115
116         return controller(request, *args, **kwargs)
117
118     return wrapper
119
120
121 def uses_pagination(controller):
122     """
123     Check request GET 'page' key for wrong values
124     """
125     @wraps(controller)
126     def wrapper(request, *args, **kwargs):
127         try:
128             page = int(request.GET.get('page', 1))
129             if page < 0:
130                 return render_404(request)
131         except ValueError:
132             return render_404(request)
133
134         return controller(request, page=page, *args, **kwargs)
135
136     return wrapper
137
138
139 def get_user_media_entry(controller):
140     """
141     Pass in a MediaEntry based off of a url component
142     """
143     @wraps(controller)
144     def wrapper(request, *args, **kwargs):
145         user = User.query.filter_by(username=request.matchdict['user']).first()
146         if not user:
147             raise NotFound()
148
149         media = None
150
151         # might not be a slug, might be an id, but whatever
152         media_slug = request.matchdict['media']
153
154         # if it starts with id: it actually isn't a slug, it's an id.
155         if media_slug.startswith(u'id:'):
156             try:
157                 media = MediaEntry.query.filter_by(
158                     id=int(media_slug[3:]),
159                     state=u'processed',
160                     uploader=user.id).first()
161             except ValueError:
162                 raise NotFound()
163         else:
164             # no magical id: stuff?  It's a slug!
165             media = MediaEntry.query.filter_by(
166                 slug=media_slug,
167                 state=u'processed',
168                 uploader=user.id).first()
169
170         if not media:
171             # Didn't find anything?  Okay, 404.
172             raise NotFound()
173
174         return controller(request, media=media, *args, **kwargs)
175
176     return wrapper
177
178
179 def get_user_collection(controller):
180     """
181     Pass in a Collection based off of a url component
182     """
183     @wraps(controller)
184     def wrapper(request, *args, **kwargs):
185         user = request.db.User.find_one(
186             {'username': request.matchdict['user']})
187
188         if not user:
189             return render_404(request)
190
191         collection = request.db.Collection.find_one(
192             {'slug': request.matchdict['collection'],
193              'creator': user.id})
194
195         # Still no collection?  Okay, 404.
196         if not collection:
197             return render_404(request)
198
199         return controller(request, collection=collection, *args, **kwargs)
200
201     return wrapper
202
203
204 def get_user_collection_item(controller):
205     """
206     Pass in a CollectionItem based off of a url component
207     """
208     @wraps(controller)
209     def wrapper(request, *args, **kwargs):
210         user = request.db.User.find_one(
211             {'username': request.matchdict['user']})
212
213         if not user:
214             return render_404(request)
215
216         collection_item = request.db.CollectionItem.find_one(
217             {'id': request.matchdict['collection_item'] })
218
219         # Still no collection item?  Okay, 404.
220         if not collection_item:
221             return render_404(request)
222
223         return controller(request, collection_item=collection_item, *args, **kwargs)
224
225     return wrapper
226
227
228 def get_media_entry_by_id(controller):
229     """
230     Pass in a MediaEntry based off of a url component
231     """
232     @wraps(controller)
233     def wrapper(request, *args, **kwargs):
234         media = MediaEntry.query.filter_by(
235                 id=request.matchdict['media_id'],
236                 state=u'processed').first()
237         # Still no media?  Okay, 404.
238         if not media:
239             return render_404(request)
240
241         given_username = request.matchdict.get('user')
242         if given_username and (given_username != media.get_uploader.username):
243             return render_404(request)
244
245         return controller(request, media=media, *args, **kwargs)
246
247     return wrapper
248
249
250 def get_media_comment_by_id(controller):
251     """
252     Pass in a MediaComment based off of a url component
253     """
254     @wraps(controller)
255     def wrapper(request, *args, **kwargs):
256         comment = MediaComment.query.filter_by(
257                 id=request.matchdict['comment']).first()
258         # Still no media?  Okay, 404.
259         if not comment:
260             return render_404(request)
261
262         return controller(request, comment=comment, *args, **kwargs)
263
264     return wrapper
265
266
267
268 def get_workbench(func):
269     """Decorator, passing in a workbench as kwarg which is cleaned up afterwards"""
270
271     @wraps(func)
272     def new_func(*args, **kwargs):
273         with mgg.workbench_manager.create() as workbench:
274             return func(*args, workbench=workbench, **kwargs)
275
276     return new_func
277
278 def require_admin_or_moderator_login(controller):
279     """
280     Require an login from an administrator or a moderator.
281     """
282     @wraps(controller)
283     def new_controller_func(request, *args, **kwargs):
284         admin_privilege = Privilege.one({'privilege_name':u'admin'})
285         moderator_privilege = Privilege.one({'privilege_name':u'moderator'})
286         if request.user and \
287             not admin_privilege in request.user.all_privileges and \
288                  not moderator_privilege in request.user.all_privileges:
289
290             raise Forbidden()
291         elif not request.user:
292             next_url = urljoin(
293                     request.urlgen('mediagoblin.auth.login',
294                         qualified=True),
295                     request.url)
296
297             return redirect(request, 'mediagoblin.auth.login',
298                             next=next_url)
299
300         return controller(request, *args, **kwargs)
301
302     return new_controller_func
303
304 def user_not_banned(controller):
305     """
306     Requires that the user has not been banned. Otherwise redirects to the page
307     explaining why they have been banned
308     """
309     @wraps(controller)
310     def wrapper(request, *args, **kwargs):
311         if request.user:
312             user_banned = UserBan.query.get(request.user.id)
313             if user_banned:
314                 return render_user_banned(request)
315         return controller(request, *args, **kwargs)
316
317     return wrapper
318