Release 8.
[jlew:xo-file-distro.git] / FileShare.activity / FileShareActivity.py
1 # Copyright (C) 2009, Justin Lewis  (jtl1728@rit.edu)
2 #
3 # This program is free software; you can redistribute it and/or modify
4 # it under the terms of the GNU General Public License as published by
5 # the Free Software Foundation; either version 2 of the License, or
6 # (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful,
9 # but WITHOUT ANY WARRANTY; without even the implied warranty of
10 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11 # GNU General Public License for more details.
12 #
13 # You should have received a copy of the GNU General Public License
14 # along with this program; if not, write to the Free Software
15 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
16
17 import gtk
18 import telepathy
19 import simplejson
20 import tempfile
21 import os
22 import time
23 import journalentrybundle
24 import dbus
25 import gobject
26 import zipfile
27 from hashlib import sha1
28 from gettext import gettext as _
29
30 from sugar.activity.activity import Activity
31
32 from sugar.presence.tubeconn import TubeConnection
33 from sugar import network
34 from sugar import profile
35
36 from GuiView import GuiView
37 from MyExceptions import InShareException, FileUploadFailure, ServerRequestFailure, NoFreeTubes, TimeOut
38
39 from TubeSpeak import TubeSpeak
40 import FileInfo
41 from hashlib import sha1
42
43 import urllib, urllib2, MultipartPostHandler, httplib
44 import threading
45
46 import logging
47 _logger = logging.getLogger('fileshare-activity')
48
49 SERVICE = "org.laptop.FileShare"
50 IFACE = SERVICE
51 PATH = "/org/laptop/FileShare"
52 DIST_STREAM_SERVICE = 'fileshare-activity-http'
53
54 class MyHTTPRequestHandler(network.ChunkedGlibHTTPRequestHandler):
55     def translate_path(self, path):
56         return self.server._pathBuilder( path )
57
58 class MyHTTPServer(network.GlibTCPServer):
59     def __init__(self, server_address, pathBuilder):
60         self._pathBuilder = pathBuilder
61         network.GlibTCPServer.__init__(self, server_address, MyHTTPRequestHandler)
62
63 class FileShareActivity(Activity):
64     def __init__(self, handle):
65         Activity.__init__(self, handle)
66         #wait a moment so that our debug console capture mistakes
67         gobject.threads_init()
68         gobject.idle_add( self._doInit, None )
69
70     def _doInit(self, handle):
71         _logger.info("activity running")
72
73         # Make a temp directory to hold all files
74         temp_path = os.path.join(self.get_activity_root(), 'instance')
75         self._filepath = tempfile.mkdtemp(dir=temp_path)
76
77         # Set if they started the activity
78         self.isServer = not self._shared_activity
79
80         # Port the file server will do http transfers
81         self.port = 1024 + (hash(self._activity_id) % 64511)
82
83         # Data structures for holding file list
84         self.sharedFiles = {}
85
86         # Holds the controll tube
87         self.controlTube = None
88
89         # Holds tubes for transfers
90         self.unused_download_tubes = set()
91         self.addr=None
92
93         # Are we the ones that created the control tube
94         self.initiating = False
95
96         # Set to true when closing for keep cleanup
97         self._close_requested = False
98
99         # Set up internals for server mode if later requested
100         self._mode = "P2P"
101         prof = profile.get_profile()
102         self._user_key_hash = sha1(prof.pubkey).hexdigest()
103         self._user_nick = profile.get_nick_name()
104         self._user_permissions = 0
105         self.server_ip = None
106
107         jabber_serv = None
108         prof = profile.get_profile()
109         #Need to check if on 82 or higher
110         if hasattr(prof, 'jabber_server'):
111             jabber_serv = prof.jabber_server
112         else:
113             #Higher, everything was moved to gconf
114             import gconf
115             client = gconf.client_get_default()
116             jabber_serv = client.get_string("/desktop/sugar/collaboration/jabber_server")
117
118         if jabber_serv:
119             self.server_ip = jabber_serv
120             self.server_port= 14623
121             self.s_version = 0
122
123
124         # INITIALIZE GUI
125         ################
126         self.set_title('File Share')
127
128         # Set gui display object
129         self.disp = GuiView(self)
130
131         # Set Toolbars
132         self.disp.build_toolbars()
133
134         # Build table and display the gui
135         self.disp.build_table()
136
137         # Connect to shared and join calls
138         self._sh_hnd = self.connect('shared', self._shared_cb)
139         self._jo_hnd = self.connect('joined', self._joined_cb)
140
141         self.set_canvas(self.disp)
142         self.show_all()
143
144     def switch_to_server(self):
145         if self.server_ip and self.isServer:
146             self._mode = "SERVER"
147             self.isServer = False
148
149             # Remove shared mode
150             # Disable handlers incase not shared yet
151             self.disconnect( self._sh_hnd )
152             self.disconnect( self._jo_hnd )
153
154             # Disable notify the tube of changes
155             self.initiating = False
156
157             # Disable greeting people joining tube
158             if self.controlTube:
159                 self.controlTube.switch_to_server_mode()
160
161             # Set activity to private mode if shared
162             if self._shared_activity:
163                 ##TODO:
164                 pass
165
166             # Clear file List (can't go to server mode after sharing, clear list)
167             # Will not delete files so connected people can still download files.
168             self.disp.clear_files(False)
169
170             # Rebuild gui, now we are in server mode
171             self.disp.build_toolbars()
172
173             #self.set_canvas(self.disp)
174             #self.show_all()
175
176             #IN SERVER MODE, GET SERVER FILE LIST
177             def call():
178                 try:
179                     conn = httplib.HTTPConnection( self.server_ip, self.server_port)
180                     conn.request("GET", "/filelist")
181                     r1 = conn.getresponse()
182                     if r1.status == 200:
183                         data = r1.read()
184                         conn.close()
185                         self.incomingRequest('filelist',data)
186                     else:
187                         self.disp.guiHandler._alert(str(r1.status), _("Error getting file list") )
188                 except:
189                     self.disp.guiHandler._alert(_("Error getting file list"))
190                 self.disp.guiHandler.show_throbber(False)
191
192             self.disp.guiHandler.show_throbber(True, _("Requesting file list from server"))
193             threading.Thread(target=call).start()
194
195     def check_for_server(self):
196         s_version = None
197         try:
198             conn = httplib.HTTPConnection( self.server_ip, self.server_port)
199             conn.request("GET", "/version")
200             r1 = conn.getresponse()
201             if r1.status == 200:
202                 s_version= r1.read()
203                 conn.close()
204
205                 if int(s_version) >= 2:
206                     # Version 2 supports permissions, announce user so server
207                     # can cache user info and be added to the access list if allowed
208
209                     params =  { 'id': self._user_key_hash,
210                                 'nick': self._user_nick
211                               }
212                     try:
213                         opener = urllib2.build_opener( MultipartPostHandler.MultipartPostHandler)
214                         f = opener.open("http://%s:%d/announce_user"%(self.server_ip, self.server_port), params)
215                         self._user_permissions = int(f.read())
216                     except:
217                         raise ServerRequestFailure
218
219                 else:
220                     # Older version didn't have permissions, set 1 as default (upload/remove)
221                     self._user_permissions = 1
222                 self.s_version = s_version
223                 return True
224             else:
225                 return False
226         except:
227             return False
228
229     def get_server_user_list(self):
230         params =  { 'id': self._user_key_hash }
231         try:
232             opener = urllib2.build_opener( MultipartPostHandler.MultipartPostHandler)
233             f = opener.open("http://%s:%d/user_list"%(self.server_ip, self.server_port), params)
234             response = f.read()
235             return simplejson.loads(response)
236         except Exception:
237             raise ServerRequestFailure
238
239     def change_server_user(self, userId, level):
240         params = { 'id': self._user_key_hash,
241                    'userid': userId,
242                    'level': level
243                  }
244         try:
245             opener = urllib2.build_opener( MultipartPostHandler.MultipartPostHandler)
246             f = opener.open("http://%s:%d/user_mod"%(self.server_ip, self.server_port), params)
247         except:
248             raise ServerRequestFailure
249
250     def build_file(self, jobject):
251         #If object has activity id and it is filled in, use that as hash
252         if jobject.metadata.has_key("activity_id") and str(jobject.metadata['activity_id']):
253             objectHash = str(jobject.metadata['activity_id'])
254             bundle_path = os.path.join(self._filepath, '%s.xoj' % objectHash)
255
256             # If file in share, return don't build file
257             if os.path.exists(bundle_path):
258                 raise InShareException()
259
260         else:
261             # Unknown activity id, must be a file
262             if jobject.get_file_path():
263                 # FIXME: This just checks the file hash should check for
264                 # identity by compairing metadata, but this will work for now
265                 # Problems are that if you have one file multiple times it will
266                 # only allow one copy of that file regardless of the metadata
267                 objectHash = sha1(open(jobject.get_file_path() ,'rb').read()).hexdigest()
268                 bundle_path = os.path.join(self._filepath, '%s.xoj' % objectHash)
269
270                 if os.path.exists(bundle_path):
271                     raise InShareException()
272
273             else:
274                 # UNKOWN ACTIVTIY, No activity id, no file hash, just add it
275                 # FIXME
276                 _logger.warn("Unknown File Data. Can't check if file is already shared.")
277                 objectHash = sha1(time.time()).hexdigest()
278                 bundle_path = os.path.join(self._filepath, '%s.xoj' % objectHash)
279
280         journalentrybundle.from_jobject(jobject, bundle_path )
281
282         # Build file information
283         desc =  "" if not jobject.metadata.has_key('description') else str( jobject.metadata['description'] )
284         title = _("Untitled") if str(jobject.metadata['title']) == "" else str(jobject.metadata['title'])
285         tags = "" if not jobject.metadata.has_key('tags') else str( jobject.metadata['tags'] )
286         size = os.path.getsize( bundle_path )
287
288         #File Info Block
289         return FileInfo.FileInfo(objectHash, title, desc, tags, size, True)
290
291     def send_file_to_server(self, id, file_info):
292         bundle_path = os.path.join(self._filepath, '%s.xoj' % id)
293         params = { 'jdata': simplejson.dumps(file_info.share_dump()),
294                     'file':  open(bundle_path, 'rb')
295                 }
296
297         if self.s_version >= 2:
298             params['id'] = self._user_key_hash
299
300         try:
301             opener = urllib2.build_opener( MultipartPostHandler.MultipartPostHandler)
302             opener.open("http://%s:%d/upload"%(self.server_ip, self.server_port), params)
303         except:
304             raise FileUploadFailure()
305
306     def remove_file_from_server( self, file_id ):
307         params =  { 'fid': file_id }
308
309         if self.s_version >= 2:
310             params['id'] = self._user_key_hash
311
312         try:
313             opener = urllib2.build_opener( MultipartPostHandler.MultipartPostHandler)
314             opener.open("http://%s:%d/remove"%(self.server_ip, self.server_port), params)
315         except:
316             raise ServerRequestFailure
317
318
319     def updateFileObj( self, key, file_obj ):
320         if self.sharedFiles.has_key( key ):
321             self.sharedFiles[key] = file_obj
322
323     def _registerShareFile( self, key, file_obj ):
324         self.sharedFiles[key] = file_obj
325
326         # Notify connected users
327         if self.initiating:
328                 self.controlTube.FileAdd( simplejson.dumps(fileinfo.share_dump()) )
329
330     def _unregisterShareFile( self, key ):
331         del self.sharedFiles[key]
332
333         # Notify connected users
334         if self.initiating:
335             self.controlTube.FileRem( simplejson.dumps(id) )
336
337
338
339     def delete_file( self, id ):
340         bundle_path = os.path.join(self._filepath, '%s.xoj' % id)
341         try:
342             os.remove( bundle_path )
343         except:
344             _logger.warn("Could not remove file from system: %s",bundle_path)
345
346     def server_ui_del_overide(self):
347         return self.isServer or self._mode=="SERVER"
348
349     def getFileList(self):
350         ret = {}
351         for key in self.sharedFiles:
352             ret[key] = self.sharedFiles[key].share_dump()
353         return simplejson.dumps(ret)
354
355     def filePathBuilder(self, path):
356         if self.sharedFiles.has_key( path[1:] ):
357             return os.path.join(self._filepath, '%s.xoj' % path[1:])
358         else:
359             _logger.debug("INVALID PATH",path[1:])
360
361     def _shared_cb(self, activity):
362         _logger.debug('Activity is now shared')
363         self.initiating = True
364
365         # Add hooks for new tubes.
366         self.watch_for_tubes()
367
368         #Create Shared tube
369         _logger.debug('This is my activity: making a tube...')
370
371         # Offor control tube (callback will put it into crontrol tube var)
372         self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].OfferDBusTube( SERVICE, {})
373
374         #Get ready to share files
375         self._share_document()
376
377     def _joined_cb(self, activity):
378
379         _logger.debug('Joined an existing shared activity')
380         self.initiating = False
381
382         # Add hooks for new tubes.
383         self.watch_for_tubes()
384
385         # Normally, we would just ask for the document.
386         # This activity allows the user to request files.
387         # The server will send us the file list and then we
388         # can use any new tubes to download the file
389
390
391
392     def watch_for_tubes(self):
393         """This method sets up the listeners for new tube connections"""
394         self.conn = self._shared_activity.telepathy_conn
395         self.tubes_chan = self._shared_activity.telepathy_tubes_chan
396
397         self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].connect_to_signal('NewTube',
398             self._new_tube_cb)
399
400         self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].ListTubes(
401             reply_handler=self._list_tubes_reply_cb,
402             error_handler=self._list_tubes_error_cb)
403
404     def _share_document(self):
405         _logger.info("Ready to share document, starting file server")
406         # FIXME: should ideally have the fileserver listen on a Unix socket
407         # instead of IPv4 (might be more compatible with Rainbow)
408
409         # Create a fileserver to serve files
410         self._fileserver = MyHTTPServer(("", self.port), self.filePathBuilder)
411
412         # Make a tube for it
413         chan = self._shared_activity.telepathy_tubes_chan
414         iface = chan[telepathy.CHANNEL_TYPE_TUBES]
415         self._fileserver_tube_id = iface.OfferStreamTube(DIST_STREAM_SERVICE,
416                 {},
417                 telepathy.SOCKET_ADDRESS_TYPE_IPV4,
418                 ('127.0.0.1', dbus.UInt16(self.port)),
419                 telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, 0)
420
421     def _server_download_document( self, fileId ):
422         addr = [self.server_ip, self.server_port]
423         self._download_document(addr, fileId)
424         # Download the file at next avaialbe time.
425         #gobject.idle_add(self._download_document, addr, fileId)
426         #return False
427
428
429     def _get_document(self,fileId):
430         if not self.addr:
431             try:
432                 tube_id = self.unused_download_tubes.pop()
433             except (ValueError, KeyError), e:
434                 _logger.debug('No tubes to get the document from right now: %s', e)
435                 raise NoFreeTubes()
436
437             # FIXME: should ideally have the CM listen on a Unix socket
438             # instead of IPv4 (might be more compatible with Rainbow)
439             chan = self._shared_activity.telepathy_tubes_chan
440             iface = chan[telepathy.CHANNEL_TYPE_TUBES]
441             self.addr = iface.AcceptStreamTube(tube_id,
442                     telepathy.SOCKET_ADDRESS_TYPE_IPV4,
443                     telepathy.SOCKET_ACCESS_CONTROL_LOCALHOST, 0,
444                     utf8_strings=True)
445
446             _logger.debug('Accepted stream tube: listening address is %r', self.addr)
447             # SOCKET_ADDRESS_TYPE_IPV4 is defined to have addresses of type '(sq)'
448             assert isinstance(self.addr, dbus.Struct)
449             assert len(self.addr) == 2
450             assert isinstance(self.addr[0], str)
451             assert isinstance(self.addr[1], (int, long))
452             assert self.addr[1] > 0 and self.addr[1] < 65536
453
454         # Download the file at next avaialbe time.
455         self._download_document(self.addr, fileId)
456         #gobject.idle_add(self._download_document, self.addr, fileId)
457         #return False
458
459     def _list_tubes_reply_cb(self, tubes):
460         for tube_info in tubes:
461             self._new_tube_cb(*tube_info)
462
463     def _list_tubes_error_cb(self, e):
464         _loggerg.error('ListTubes() failed: %s', e)
465
466     def _new_tube_cb(self, id, initiator, type, service, params, state):
467         _logger.debug('New tube: ID=%d initator=%d type=%d service=%s '
468                      'params=%r state=%d', id, initiator, type, service, params, state)
469         if (type == telepathy.TUBE_TYPE_DBUS and service == SERVICE):
470             if state == telepathy.TUBE_STATE_LOCAL_PENDING:
471                 self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES].AcceptDBusTube(id)
472             # Control tube
473             _logger.debug("Connecting to Control Tube")
474             tube_conn = TubeConnection(self.conn,
475                 self.tubes_chan[telepathy.CHANNEL_TYPE_TUBES], id,
476                 group_iface=self.tubes_chan[telepathy.CHANNEL_INTERFACE_GROUP])
477
478             self.controlTube = TubeSpeak(tube_conn, self.initiating,
479                                          self.incomingRequest, self.getFileList)
480         elif (type == telepathy.TUBE_TYPE_STREAM and service == DIST_STREAM_SERVICE):
481                 # Data tube, store for later
482                 _logger.debug("New data tube added")
483                 self.unused_download_tubes.add(id)
484
485
486     def incomingRequest(self,action,request):
487         if action == "filelist":
488             filelist = simplejson.loads( request )
489             for key in filelist:
490                 if not self.sharedFiles.has_key(key):
491                     fi = FileInfo.share_load(filelist[key])
492                     self.disp.guiHandler._addFileToUIList(fi.id, fi)
493                     # Register File with activity share list
494                     self._registerShareFile( fi.id, fi )
495         elif action == "fileadd":
496             addList = simplejson.loads( request )
497             fi = FileInfo.share_load( addList )
498             self.disp.guiHandler._addFileToUIList( fi.id, fi )
499             self._registerShareFile( fi.id, fi )
500         elif action == "filerem":
501             id =  simplejson.loads( request )
502             # DO NOT DELETE IF TRANSFER IN PROGRESS/COMPLETE
503             if self.fileShare[id].aquired == 0:
504                 self.disp.guiHandler._remFileFromUIList( id )
505                 # UnRegister File with activity share list
506                 self._unregisterShareFile( key )
507
508         else:
509             _logger.debug("Incoming tube Request: %s. Data: %s" % (action, request) )
510
511     def _download_document(self, addr, documentId):
512         _logger.debug('Requesting to download document')
513         bundle_path = os.path.join(self._filepath, '%s.xoj' % documentId)
514         port = int(addr[1])
515
516         getter = network.GlibURLDownloader("http://%s:%d/%s" % (addr[0], port,documentId))
517         getter.connect("finished", self._download_result_cb, documentId)
518         getter.connect("progress", self._download_progress_cb, documentId)
519         getter.connect("error", self._download_error_cb, documentId)
520         _logger.debug("Starting download to %s...", bundle_path)
521         getter.start(bundle_path)
522         return False
523
524     def _download_result_cb(self, getter, tmp_file, suggested_name, fileId):
525         _logger.debug("Got document %s (%s)", tmp_file, suggested_name)
526
527         try:
528             metadata = self._installBundle( tmp_file )
529             self.disp.guiHandler._alert( _("File Downloaded"), metadata['title'])
530             self.disp.set_installed( fileId )
531         except:
532             self.disp.guiHandler._alert( _("File Download Failed") )
533             self.disp.set_installed( fileId, False )
534
535     def _download_progress_cb(self, getter, bytes_downloaded, fileId):
536         self.disp.update_progress( fileId, bytes_downloaded )
537
538         # Force gui to update if there are actions pending
539         # Fixes bug where system appears to hang on FAST connections
540         while gtk.events_pending():
541             gtk.main_iteration()
542
543     def _download_error_cb(self, getter, err, fileId):
544         _logger.debug("Error getting document from tube. %s",  err )
545         self.disp.guiHandler._alert(_("Error getting document"), err)
546         #gobject.idle_add(self._get_document)
547
548
549     def _installBundle(self, tmp_file):
550         """Installs a file to the journal"""
551         _logger.debug("Saving %s to datastore...", tmp_file)
552         bundle = journalentrybundle.JournalEntryBundle(tmp_file)
553         bundle.install()
554         return bundle.get_metadata()
555
556
557     def can_close( self ):
558         #TODO: HAVE SERVER CHECK IF IT CAN CLOSE
559         self._close_requested = True
560         return True
561
562     def write_file(self, file_path):
563         _logger.debug('Writing activity file')
564
565         file = zipfile.ZipFile(file_path, "w")
566
567         # If no files to save save empty list
568         if len(self.sharedFiles) == 0:
569             #hack to empty file if existed before
570             file.writestr("_filelist.json", simplejson.dumps({}))
571             file.close()
572             return
573
574         if self._close_requested:
575             dialog = gtk.MessageDialog(self, gtk.DIALOG_MODAL,
576                     gtk.MESSAGE_INFO, gtk.BUTTONS_YES_NO,
577                     _("Saving files in activity allows the activity to resume with the current file list but takes up more space.") )
578             dialog.set_title("Do you wish to save files within activity?")
579
580             response = dialog.run()
581             dialog.destroy()
582
583             # Return not allowing files to be saved
584             if response == gtk.RESPONSE_NO:
585                 #hack to empty file if existed before
586                 file.writestr("_filelist.json", simplejson.dumps({}))
587                 file.close()
588                 return
589
590         # Save, requested, write files into zip and save file list
591         try:
592             for name in os.listdir(self._filepath):
593                 file.write(os.path.join( self._filepath, name), name, zipfile.ZIP_DEFLATED)
594
595             file.writestr("_filelist.json", self.getFileList())
596         finally:
597             file.close()
598
599     def read_file(self, file_path):
600         logging.debug('RELOADING ACTIVITY DATA...')
601
602         # Read file list from zip
603         zip_file = zipfile.ZipFile(file_path,'r')
604         filelist = simplejson.loads(zip_file.read("_filelist.json"))
605         namelist = zip_file.namelist()
606         for key in filelist:
607             fileName = '%s.xoj' % key
608             # Only extract and add files that we have (needed if client when saved)
609             if fileName in namelist:
610                 bundle_path = os.path.join(self._filepath, fileName)
611                 open(bundle_path, "wb").write(zip_file.read(fileName))
612
613                 fi = FileInfo.share_load(filelist[key], True)
614                 self._addFileToUIList(fi.id, fi)
615
616         zip_file.close()