crappy
[hls-player:hls-player.git] / HLS / fetcher.py
1 # -*- Mode: Python -*-
2 # vi:si:et:sw=4:sts=4:ts=4
3 #
4 # Copyright (C) 2009-2010 Fluendo, S.L. (www.fluendo.com).
5 # Copyright (C) 2009-2010 Marc-Andre Lureau <marcandre.lureau@gmail.com>
6
7 # This file may be distributed and/or modified under the terms of
8 # the GNU General Public License version 2 as published by
9 # the Free Software Foundation.
10 # This file is distributed without any warranty; without even the implied
11 # warranty of merchantability or fitness for a particular purpose.
12 # See "LICENSE" in the source distribution for more information.
13
14 from itertools import ifilter
15 import logging
16 import os, os.path
17 import tempfile
18 import urlparse
19
20 from twisted.python import log
21 from twisted.web import client
22 from twisted.internet import defer, reactor
23 #from twisted.internet.task import deferLater
24
25 import HLS
26 from HLS.m3u8 import M3U8
27
def deferLater(clock, delay, callable, *args, **kw):
    """Return a Deferred that fires with ``callable(*args, **kw)`` later.

    The call is scheduled on ``clock`` (anything with a ``callLater`` method,
    e.g. the reactor) ``delay`` seconds from now. This is a local stand-in for
    ``twisted.internet.task.deferLater``; the returned Deferred has no
    canceller.
    """
    def _invoke(_ignored):
        return callable(*args, **kw)

    result = defer.Deferred()
    result.addCallback(_invoke)
    clock.callLater(delay, result.callback, None)
    return result
33
class HLSFetcher(object):
    """Fetch an HLS stream: its playlists and its media segments.

    ``start()`` loads the playlist at ``url``. If it is a program playlist,
    the program ``program`` is selected with ``self.bitrate`` and that
    program's playlist is loaded instead. Entries of the resulting file
    playlist are then downloaded one after the other into ``self.path`` and
    indexed in ``self._cached_files`` (sequence number -> local path), from
    which ``get_file()`` serves them. Playlists without an end-list marker are
    reloaded periodically so that new entries keep being picked up.
    """

    def __init__(self, url, options=None, program=1):
        """
        :param url: URL of the top-level playlist.
        :param options: object providing ``path``, ``referer``, ``bitrate``,
            ``keep`` and ``buffer`` attributes; when None, built-in defaults
            are used.
        :param program: program number to select in a program playlist.
        """
        self.url = url
        self.program = program
        if options:
            self.path = options.path
            self.referer = options.referer
            self.bitrate = options.bitrate
            self.n_segments_keep = options.keep
            self.nbuffer = options.buffer
        else:
            self.path = None
            self.referer = None
            self.bitrate = 200000
            self.n_segments_keep = 3
            self.nbuffer = 3
        if not self.path:
            # no download directory configured: use a fresh temporary one
            self.path = tempfile.mkdtemp()

        self._program_playlist = None
        self._file_playlist = None
        self._cookies = {}
        self._cached_files = {}  # sequence n -> path

        self._files = None  # the iter of the playlist files download
        self._next_download = None  # the delayed download defer, if any
        self._file_playlisted = None  # the defer to wait until new files are added to playlist
        self._new_filed = None  # the defer waiting for the next saved segment, if any

    def _get_page(self, url):
        """Fetch ``url`` and return a Deferred firing with the response body.

        Cookies are shared between requests through ``self._cookies`` (reset
        first when the HLS_RESET_COOKIES environment variable is set), and
        ``self.referer`` is sent as the Referer header when configured.
        Failures are logged and passed on down the errback chain.
        """
        def got_page(content):
            logging.debug("Cookies: %r" % self._cookies)
            return content

        def got_page_error(failure, url):
            logging.error("Failed to fetch %s: %s" % (url, failure.getErrorMessage()))
            # propagate the failure; returning None would hand None to callbacks
            return failure

        url = url.encode("utf-8")
        if 'HLS_RESET_COOKIES' in os.environ:
            self._cookies = {}
        headers = {}
        if self.referer:
            headers['Referer'] = self.referer
        d = client.getPage(url, cookies=self._cookies, headers=headers)
        d.addCallback(got_page)
        d.addErrback(got_page_error, url)
        return d

    def _download_segment(self, f):
        """Download the playlist entry ``f`` into ``self.path``.

        ``f`` is a playlist entry; its 'file' key (resolved against the file
        playlist URL) and its 'sequence' key are used. The segment is saved
        under the last path component of its URL.

        :returns: a Deferred firing with ``(path, url, f)`` once the segment is
            saved, or with ``(None, url, f)`` when ``n_segments_keep`` is 0
            (nothing is written to disk). If the download fails, the pending
            ``_new_filed`` waiter (if any) receives the failure and the
            Deferred fires with None so the download loop skips this entry.
        """
        def _check(data):
            logging.debug("Received segment of %r bytes." % len(data))
            return data

        def _save(data):
            # binary mode: segments are media data, text mode could mangle them
            out = open(path, 'wb')
            try:
                out.write(data)
            finally:
                out.close()
            return path

        def _got_file(saved_path):
            logging.debug("Saved %s in %s" % (url, saved_path))
            self._cached_files[f['sequence']] = saved_path
            if self.n_segments_keep != -1:
                # -1 keeps everything; otherwise only the last n_segments_keep stay
                self.delete_cache(lambda x: x <= f['sequence'] - self.n_segments_keep)
            # clear before firing: the waiter's callback may register a new
            # waiter through get_file(), which must not be overwritten
            waiter, self._new_filed = self._new_filed, None
            if waiter:
                waiter.callback((saved_path, url, f))
            return (saved_path, url, f)

        def _got_file_failed(failure):
            waiter, self._new_filed = self._new_filed, None
            if waiter:
                waiter.errback(failure)
            # skip this entry; the download loop continues with the next one
            return None

        logging.debug('%r %r' % (self._file_playlist.url, f['file']))
        url = HLS.make_url(self._file_playlist.url, f['file'])
        name = urlparse.urlparse(f['file']).path.split('/')[-1]
        path = os.path.join(self.path, name)
        d = self._get_page(url)
        d.addCallback(_check)
        if self.n_segments_keep != 0:
            d.addCallback(_save)
            d.addCallbacks(_got_file, _got_file_failed)
        else:
            d.addCallbacks(lambda _: (None, url, f), _got_file_failed)
        return d

    def delete_cache(self, f):
        """Delete every cached segment whose sequence number satisfies ``f``.

        :param f: predicate called with a sequence number.

        Matching files are removed from disk and from ``_cached_files``. A file
        that cannot be removed (e.g. already gone) is logged and its entry is
        dropped anyway, so the index cannot get stuck on it.
        """
        # snapshot the matching keys: the dict is modified inside the loop
        for seq in [k for k in self._cached_files if f(k)]:
            filename = self._cached_files.pop(seq)
            logging.debug("Removing %r" % filename)
            try:
                os.remove(filename)
            except OSError:
                logging.warning("Could not remove %r" % filename)

    def _get_next_file(self, last_file=None):
        """Schedule the download of the next playlist entry.

        :param last_file: the entry downloaded last, or None; it decides how
            long to wait before the next download.
        :returns: a Deferred firing with the result of ``_download_segment``.
        :raises StopIteration: when no entry is left and the playlist has
            ended (also raised by the entry iterator when it is exhausted).
        """
        next_file = self._files.next()
        if next_file:
            delay = 0
            if last_file:
                # FIXME not only the last nbuffer, but the nbuffer -1 ...
                buffered_seq = last_file['sequence'] - (self.nbuffer - 1)
                if self.nbuffer > 0 and buffered_seq not in self._cached_files:
                    delay = 0
                elif self._file_playlist.endlist():
                    delay = 1
                else:
                    delay = last_file['duration'] * 0.5  # doesn't work
                    # when duration is not in sync with
                    # player, which can happen easily...
            return deferLater(reactor, delay, self._download_segment, next_file)
        elif not self._file_playlist.endlist():
            # when new uri are added to the playlist
            self._file_playlisted = defer.Deferred()
            self._file_playlisted.addCallback(lambda x: self._get_next_file(last_file))
            return self._file_playlisted
        # nothing left and the playlist is complete: signal the end the same
        # way an exhausted entry iterator does (handled by _handle_end)
        raise StopIteration()

    def _handle_end(self, failure):
        """Errback: stop the reactor once all entries have been fetched."""
        failure.trap(StopIteration)
        print("End of media")
        reactor.stop()

    def _log_loop_error(self, failure):
        """Last errback of the download loop: log the error that stopped it."""
        failure.trap(Exception)
        logging.error("Segment download loop stopped: %s" % failure.getErrorMessage())

    def _get_files_loop(self, last_file=None):
        """Download playlist entries one after the other until the end.

        :param last_file: the ``(path, url, entry)`` result of the previous
            download, or None on the first iteration or after a failed one.
        """
        entry = last_file[2] if last_file else None
        d = self._get_next_file(entry)
        # and loop
        d.addCallback(self._get_files_loop)
        d.addErrback(self._handle_end)
        d.addErrback(self._log_loop_error)

    def _playlist_updated(self, pl):
        """Act on a freshly loaded playlist ``pl``.

        A program playlist is stored and the selected program's playlist is
        loaded. A file playlist becomes the source of entries, is rescheduled
        for reload unless it has an end-list marker, and wakes up a download
        waiting for new entries.

        :raises ValueError: if ``pl`` has neither programs nor files.
        """
        if pl.has_programs():
            # if we got a program playlist, save it and start a program
            self._program_playlist = pl
            (program_url, _) = pl.get_program_playlist(self.program, self.bitrate)
            l = HLS.make_url(self.url, program_url)
            return self._reload_playlist(M3U8(l))
        elif pl.has_files():
            # we got sequence playlist, start reloading it regularly, and get files
            self._file_playlist = pl
            if not self._files:
                self._files = pl.iter_files()
            if not pl.endlist():
                # FIXME: reload delay - previous request time
                reactor.callLater(pl.reload_delay(), self._reload_playlist, pl)
            if self._file_playlisted:
                self._file_playlisted.callback(pl)
                self._file_playlisted = None
        else:
            raise ValueError("playlist %r has neither programs nor files" % pl.url)
        return pl

    def _got_playlist_content(self, content, pl):
        """Feed ``content`` to ``pl``; if it did not change, retry later.

        :returns: ``pl`` when updated, otherwise a Deferred that refetches the
            playlist after ``pl.reload_delay()`` seconds.
        """
        if not pl.update(content):
            # if the playlist is not updated, start a reload timer
            d = deferLater(reactor, pl.reload_delay(), self._fetch_playlist, pl)
            d.addCallback(self._got_playlist_content, pl)
            return d
        return pl

    def _fetch_playlist(self, pl):
        """Return a Deferred firing with the raw content at ``pl.url``."""
        logging.debug('fetching %r' % pl.url)
        d = self._get_page(pl.url)
        return d

    def _reload_playlist(self, pl):
        """Fetch ``pl`` until its content changes, then process it."""
        d = self._fetch_playlist(pl)
        d.addCallback(self._got_playlist_content, pl)
        d.addCallback(self._playlist_updated)
        return d

    def get_file(self, sequence):
        """Return a Deferred firing with the local path of segment ``sequence``.

        If that segment is not cached, the cached segment with the smallest
        sequence number above it is used. If none is available yet, the
        Deferred is stored in ``self._new_filed`` and the lookup is retried
        when the next segment has been saved.
        """
        d = defer.Deferred()
        available = sorted(self._cached_files)
        candidates = [k for k in available if k >= sequence]
        if candidates:
            d.callback(self._cached_files[candidates[0]])
        else:
            d.addCallback(lambda x: self.get_file(sequence))
            self._new_filed = d
            logging.debug('waiting for %r (available: %r)' % (sequence, available))
        return d

    def _start_get_files(self, x):
        """Start the download loop; the result fires when a segment is saved."""
        self._new_filed = defer.Deferred()
        self._get_files_loop()
        return self._new_filed

    def start(self):
        """Load the playlist and start downloading segments.

        :returns: a Deferred firing with ``(path, url, entry)`` once the first
            segment has been saved.
        """
        self._files = None
        d = self._reload_playlist(M3U8(self.url))
        d.addCallback(self._start_get_files)
        return d

    def stop(self):
        """Stop fetching (currently does nothing)."""
        pass
233