2 # vi:si:et:sw=4:sts=4:ts=4
4 # Copyright (C) 2009-2010 Fluendo, S.L. (www.fluendo.com).
5 # Copyright (C) 2009-2010 Marc-Andre Lureau <marcandre.lureau@gmail.com>
7 # This file may be distributed and/or modified under the terms of
8 # the GNU General Public License version 2 as published by
9 # the Free Software Foundation.
10 # This file is distributed without any warranty; without even the implied
11 # warranty of merchantability or fitness for a particular purpose.
12 # See "LICENSE" in the source distribution for more information.
14 from itertools import ifilter
20 from twisted.python import log
21 from twisted.web import client
22 from twisted.internet import defer, reactor
23 from twisted.internet.task import deferLater
26 from HLS.m3u8 import M3U8
28 class HLSFetcher(object):
# Set up the fetcher state for one stream.
#
#   url      -- not used on any line shown here; other methods read self.url.
#   options  -- optional object providing .path, .referer, .bitrate, .keep
#               and .buffer.
#   program  -- stored as self.program and later passed to
#               get_program_playlist().
#
# NOTE(review): this listing skips some original lines (see the gaps in the
# embedded line numbers), so the branching around `options` is not visible.
# Presumably the `options.*` reads are guarded by a check that options was
# given, and the literal defaults belong to the other branch -- confirm
# against the full file.
30 def __init__(self, url, options=None, program=1):
32 self.program = program
# Settings copied from the options object.
34 self.path = options.path
35 self.referer = options.referer
36 self.bitrate = options.bitrate
37 self.n_segments_keep = options.keep
38 self.nbuffer = options.buffer
# Default number of cached segments (presumably the no-options branch --
# TODO confirm).
43 self.n_segments_keep = 3
# Fresh temporary directory used as the segment directory (presumably only
# when no path was supplied -- TODO confirm).
46 self.path = tempfile.mkdtemp()
# Playlists are filled in by _playlist_updated().
48 self._program_playlist = None
49 self._file_playlist = None
51 self._cached_files = {} # sequence n -> path
53 self._files = None # the iter of the playlist files download
54 self._next_download = None # the delayed download defer, if any
55 self._file_playlisted = None # the defer to wait until new files are added to playlist
# Fetch `url` with twisted.web.client.getPage.
# The request carries self._cookies and a `headers` dict; the visible code
# sets headers['Referer'] from self.referer.
#
# NOTE(review): several original lines are absent from this listing (the
# embedded numbers jump 59->61, 61->66, 67->71 and stop at 74). That covers
# the body of got_page_error, the creation of `headers` and presumably a
# final `return d` -- callers chain callbacks on the result. Confirm against
# the full file.
57 def _get_page(self, url):
# Success callback: logs the current cookie jar.
58 def got_page(content):
59 logging.debug("Cookies: %r" % self._cookies)
# Error callback: receives the failure plus the url given to addErrback.
61 def got_page_error(e, url):
# getPage is handed a UTF-8 encoded URL.
66 url = url.encode("utf-8")
# Environment switch; the action taken when it is set is not shown here
# (presumably it resets self._cookies -- TODO confirm).
67 if 'HLS_RESET_COOKIES' in os.environ.keys():
71 headers['Referer'] = self.referer
72 d = client.getPage(url, cookies=self._cookies, headers=headers)
73 d.addCallback(got_page)
74 d.addErrback(got_page_error, url)
# Download `url` through self._get_page(), so the request goes through the
# same cookie-aware path as every other fetch.
#
# NOTE(review): `path` is not used on any line visible here. The line that
# introduces `x` and the tail of the method (including the return) are
# missing from this listing; _download_segment chains callbacks on the value
# returned from here. Confirm against the full file.
77 def _download_page(self, url, path):
78 # client.downloadPage does not support cookies!
# Logs the size of the received body; `x` is presumably the downloaded
# content passed to an inner callback -- TODO confirm.
80 logging.debug("Received segment of %r bytes." % len(x))
83 d = self._get_page(url)
def _download_segment(self, f):
    """Download the media segment described by playlist entry ``f``.

    ``f`` is a mapping with at least a ``'file'`` key holding the segment
    URI; ``HLS.make_url`` combines it with the file playlist URL.

    Returns the download deferred. With caching enabled
    (``n_segments_keep != 0``) the body is written to a file named after
    the last path component of the URI inside ``self.path``, and the result
    then goes to ``_got_file`` on success or ``_got_file_failed`` on
    failure. With caching disabled nothing is written and the deferred
    fires with ``(None, path, f)``.
    """
    # NOTE(review): the `else:` and the final `return d` are on lines that
    # are missing from the reviewed listing. They are inferred from the gap
    # in the original numbering and from _get_next_file, which schedules
    # this method through deferLater and chains on its result.
    url = HLS.make_url(self._file_playlist.url, f['file'])
    name = urlparse.urlparse(f['file']).path.split('/')[-1]
    path = os.path.join(self.path, name)
    d = self._download_page(url, path)
    if self.n_segments_keep != 0:
        def save_segment(content):
            # Segments are binary media: write them in binary mode, and
            # only once the body has actually arrived. The `with` block
            # closes the file even if the write fails.
            with open(path, 'wb') as out:
                out.write(content)
            return path

        d.addCallback(save_segment)
        # Exactly one of the two handlers runs. A failed download (or a
        # failed write) reaches _got_file_failed as a failure, and
        # _got_file is never invoked for a segment that was not saved.
        d.addCallbacks(self._got_file, self._got_file_failed,
                       callbackArgs=(url, f))
    else:
        d.addCallback(lambda _: (None, path, f))
    return d
def delete_cache(self, f):
    """Drop every cached segment whose sequence number satisfies ``f``.

    ``f`` is a predicate called with each key of ``self._cached_files``
    (sequence number -> path). Each matching entry is forgotten and its
    file is removed from disk.
    """
    # Collect the matching keys before touching the dict, so the mapping is
    # never mutated while it is being iterated.
    doomed = [seq for seq in self._cached_files if f(seq)]
    for seq in doomed:
        filename = self._cached_files.pop(seq)
        logging.debug("Removing %r" % filename)
        # NOTE(review): the removal call sits on a line that is missing
        # from the reviewed listing; os.remove is inferred from the log
        # message above -- confirm against the full file.
        try:
            os.remove(filename)
        except OSError as err:
            # A file that is already gone must not abort the clean-up of
            # the remaining entries (or the download chain calling us).
            logging.warning("Could not remove %r: %s" % (filename, err))
def _got_file_failed(self, e):
    """Errback for a segment that could not be fetched or saved.

    Forwards the failure ``e`` to the deferred stored in
    ``self._new_filed`` (created by ``_start_get_files``) if one is still
    pending, and clears the attribute so it is fired at most once.
    Returns None.
    """
    pending = self._new_filed
    # _new_filed is reset to None once it has been fired, so later failures
    # must not try to fire it again.
    if pending is not None:
        # Clear first so a re-entrant call cannot fire the same deferred.
        self._new_filed = None
        pending.errback(e)
def _got_file(self, path, url, f):
    """Callback run once a segment has been saved to ``path``.

    Records the file under ``f['sequence']`` in ``self._cached_files``,
    evicts old segments unless ``n_segments_keep`` is -1, and fires the
    deferred stored in ``self._new_filed`` if one is pending.

    Returns ``(path, url, f)``, which becomes the result passed on to the
    next callback in the chain.
    """
    logging.debug("Saved " + url + " in " + path)
    sequence = f['sequence']
    self._cached_files[sequence] = path
    if self.n_segments_keep != -1:
        # Evict everything at or below this sequence number, which leaves
        # the newest n_segments_keep segments in the cache.
        cutoff = sequence - self.n_segments_keep
        self.delete_cache(lambda seq: seq <= cutoff)
    result = (path, url, f)
    pending = self._new_filed
    # This callback runs once per segment, but _new_filed is reset to None
    # after its first use, so it may legitimately be None here.
    if pending is not None:
        self._new_filed = None
        pending.callback(result)
    return result
# Schedule the download of the next playlist entry, or wait for the
# playlist to grow.
#
#   last_file -- the previously downloaded playlist entry (a mapping read
#                here for its 'sequence' and 'duration' keys), or None.
#
# Returns a deferred: either deferLater(...), which fires with the result of
# _download_segment, or self._file_playlisted, which _playlist_updated fires
# after a playlist reload.
#
# NOTE(review): lines are missing from this listing (the embedded numbers
# jump 130->134, 135->137, 137->140). The conditions guarding the visible
# branches and the delay values assigned in the first two branches cannot be
# seen -- confirm against the full file. `.next()` and `dict.has_key` are
# Python 2-only spellings.
129 def _get_next_file(self, last_file=None):
# Pull the next entry from the playlist file iterator (the local name
# shadows the builtin `next`).
130 next = self._files.next()
134 # FIXME not only the last nbuffer, but the nbuffer -1 ...
# True when buffering is enabled and the segment nbuffer-1 positions before
# last_file is not in the cache.
135 if self.nbuffer > 0 and not self._cached_files.has_key(last_file['sequence'] - (self.nbuffer - 1)):
# pl.endlist() is true: presumably the playlist carries its end marker and
# will not grow any more -- TODO confirm.
137 elif self._file_playlist.endlist():
140 delay = last_file['duration'] * 0.5 # doesn't work
141 # when duration is not in sync with
142 # player, which can happen easily...
# Download `next` after `delay` seconds.
143 return deferLater(reactor, delay, self._download_segment, next)
# Playlist not ended: park on a new deferred and retry with the same
# last_file once it is fired.
144 elif not self._file_playlist.endlist():
145 self._file_playlisted = defer.Deferred()
146 self._file_playlisted.addCallback(lambda x: self._get_next_file(last_file))
147 return self._file_playlisted
# Errback installed by _get_files_loop.
# failure.trap() lets only StopIteration through; any other failure is
# re-raised and continues down the errback chain. The StopIteration
# presumably comes from the exhausted playlist file iterator -- TODO confirm.
# NOTE(review): whatever follows the trap is on lines missing from this
# listing.
149 def _handle_end(self, failure):
150 failure.trap(StopIteration)
# One step of the download loop: request the next file and re-arm itself on
# the resulting deferred.
#
#   last_file -- result of the previous step, a 3-tuple whose third item is
#                the playlist entry; _start_get_files calls this with no
#                argument.
#
# NOTE(review): lines are missing from this listing (the embedded numbers
# jump 154->156, 156->159, 159->161). Presumably the unpacking is guarded by
# a check on last_file and `f` falls back to None otherwise -- confirm
# against the full file.
154 def _get_files_loop(self, last_file=None):
# Only the playlist entry `f` is used below.
156 (path, l, f) = last_file
159 d = self._get_next_file(f)
# Feed the outcome of this download back into the loop; failures go to
# _handle_end.
161 d.addCallback(self._get_files_loop)
162 d.addErrback(self._handle_end)
# Callback run with a freshly updated playlist `pl`.
#
# A playlist that has programs is stored as the program playlist; the entry
# selected for (self.program, self.bitrate) is turned into a URL relative to
# self.url and loaded through _reload_playlist. Otherwise `pl` is stored as
# the file playlist, its file iterator is taken, a reload is scheduled after
# pl.reload_delay(), and a deferred parked in self._file_playlisted (see
# _get_next_file) is fired with `pl`.
#
# NOTE(review): lines are missing from this listing (the embedded numbers
# jump 170->172, 173->175, 175->177 and stop at 181), so the exact
# conditions around the second half and the value returned there are not
# visible -- confirm against the full file.
164 def _playlist_updated(self, pl):
165 if pl.has_programs():
166 # if we got a program playlist, save it and start a program
167 self._program_playlist = pl
168 (program_url, _) = pl.get_program_playlist(self.program, self.bitrate)
169 l = HLS.make_url(self.url, program_url)
# Returning the deferred makes the callback chain wait for that reload.
170 return self._reload_playlist(M3U8(l))
172 # we got sequence playlist, start reloading it regularly, and get files
173 self._file_playlist = pl
175 self._files = pl.iter_files()
177 # FIXME: reload delay - previous request time
178 reactor.callLater(pl.reload_delay(), self._reload_playlist, pl)
# Wake up a _get_next_file call that is waiting for new playlist entries.
179 if self._file_playlisted:
180 self._file_playlisted.callback(pl)
181 self._file_playlisted = None
# Callback that feeds downloaded `content` into the playlist object `pl`.
# When pl.update() reports failure, the playlist is fetched again after
# pl.reload_delay() and this callback is re-run on the new content.
#
# NOTE(review): the return statements are on lines missing from this
# listing. _reload_playlist passes this callback's result on to
# _playlist_updated, so presumably it returns `pl` (or the retry deferred)
# -- confirm against the full file.
186 def _got_playlist_content(self, content, pl):
187 if not pl.update(content):
188 # if the playlist cannot be loaded, start a reload timer
189 d = deferLater(reactor, pl.reload_delay(), self._fetch_playlist, pl)
190 d.addCallback(self._got_playlist_content, pl)
def _fetch_playlist(self, pl):
    """Start downloading the playlist at ``pl.url``.

    Returns whatever ``self._get_page`` returns for that URL; callers chain
    callbacks on it.
    """
    # NOTE(review): the return is on a line missing from the reviewed
    # listing; it is inferred from _reload_playlist, which chains on the
    # result of this method.
    playlist_url = pl.url
    logging.debug('fetching %r' % playlist_url)
    return self._get_page(playlist_url)
def _reload_playlist(self, pl):
    """Fetch ``pl`` again and run the response through the update chain.

    Returns the deferred of the fetch with both processing steps attached.
    """
    # NOTE(review): the return is on a line missing from the reviewed
    # listing; it is inferred from the callers, which chain on or return
    # the result of this method.
    chain = self._fetch_playlist(pl)
    # First load the downloaded body into pl, then react to the refreshed
    # playlist.
    chain.addCallback(self._got_playlist_content, pl)
    chain.addCallback(self._playlist_updated)
    return chain
# Look up the cached file for `sequence`, or for a later cached sequence.
#
# NOTE(review): most of the control flow is missing from this listing (the
# embedded numbers jump 205->207, 207->209, 210->213, 213->216): the
# creation of `d`, the handling of the case where no cached sequence
# qualifies, and the return value are not visible. Presumably a deferred
# that fires with the filename is returned -- confirm against the full file.
205 def get_file(self, sequence):
207 keys = self._cached_files.keys()
# First key >= the requested sequence in the order keys() yields them (not
# necessarily the smallest such key); .next() raises StopIteration when no
# key qualifies.
209 sequence = ifilter(lambda x: x >= sequence, keys).next()
210 filename = self._cached_files[sequence]
# Retry the lookup with the same sequence once `d` fires.
213 d.addCallback(lambda x: self.get_file(sequence))
216 logging.debug('waiting for %r (available: %r)' % (sequence, keys))
def _start_get_files(self, x):
    """Kick off the download loop.

    ``x`` is the result of the previous callback and is ignored. Returns
    the value of ``self._new_filed`` after the loop has been started; that
    deferred is fired by ``_got_file`` or ``_got_file_failed``.
    """
    self._new_filed = defer.Deferred()
    self._get_files_loop()
    # Read the attribute back instead of keeping a local: the handlers that
    # fire this deferred also reset the attribute.
    first_file = self._new_filed
    return first_file
226 d = self._reload_playlist(M3U8(self.url))
227 d.addCallback(self._start_get_files)