2 # vi:si:et:sw=4:sts=4:ts=4
4 # Copyright (C) 2009-2010 Fluendo, S.L. (www.fluendo.com).
5 # Copyright (C) 2009-2010 Marc-Andre Lureau <marcandre.lureau@gmail.com>
7 # This file may be distributed and/or modified under the terms of
8 # the GNU General Public License version 2 as published by
9 # the Free Software Foundation.
10 # This file is distributed without any warranty; without even the implied
11 # warranty of merchantability or fitness for a particular purpose.
12 # See "LICENSE" in the source distribution for more information.
14 from itertools import ifilter
20 from twisted.web import client
21 from twisted.internet import defer, reactor
22 from twisted.internet.task import deferLater
25 from HLS.m3u8 import M3U8
27 class HLSFetcher(object):
    def __init__(self, url, options=None, program=1):
        """Segment fetcher for an HLS stream.

        Downloads playlists and media segments via Twisted, keeping a
        rolling on-disk cache of the most recent segments.

        NOTE(review): several constructor lines are elided from this view
        (including the assignment of self.url and the branch choosing
        between *options* values and built-in defaults); the flat sequence
        below is not the complete original control flow.
        """
        self.program = program  # program id to pick from a variant playlist
        self.path = options.path  # directory where segments are written
        self.referer = options.referer  # optional Referer header for requests
        self.bitrate = options.bitrate  # preferred bitrate among variants
        self.n_segments_keep = options.keep  # retention window (-1 keeps all; see _got_file)
        self.n_segments_keep = 3  # presumably the no-options default branch -- confirm
        self.path = tempfile.mkdtemp()  # presumably the fallback when no path was given -- confirm
        self._program_playlist = None  # top-level (variant/program) playlist, if any
        self._file_playlist = None  # media playlist currently being fetched
        self._cached_files = {}  # segment sequence number -> local file path
        self._files = None # the iter of the playlist files download
        self._next_download = None # the delayed download defer, if any
        self._file_playlisted = None # the defer to wait until new files are added to playlist
    def _get_page(self, url):
        """Fetch *url* with the shared cookie jar; the returned deferred
        (its `return` line is elided here) fires with the response body.

        NOTE(review): the cookie-reset branch body, the `headers`
        initialisation, and the guard around the Referer header are elided
        from this view.
        """
        def got_page(content):
            # Log cookies accumulated by getPage; presumably also passes
            # `content` down the callback chain (line elided).
            logging.debug("Cookies: %r" % self._cookies)
        # twisted.web.client.getPage (Python 2) wants a byte-string URL.
        url = url.encode("utf-8")
        if 'HLS_RESET_COOKIES' in os.environ.keys():
            # (branch body elided -- presumably clears self._cookies)
        # Forward the Referer so origin-protected streams keep working;
        # the guard testing self.referer is elided.
        headers['Referer'] = self.referer
        d = client.getPage(url, cookies=self._cookies, headers=headers)
        d.addCallback(got_page)
    def _download_page(self, url, path):
        """Download *url* into the local file *path*; the returned deferred
        (its `return` line is elided) fires with *path* once the file is
        closed.

        NOTE(review): the opening of the target file object `f` and the
        debug-logging callback wrapper around `x` are elided from this view.
        """
        # client.downloadPage does not support cookies!
        # (this debug line belongs to an elided callback that receives the
        # downloaded body as `x`)
        logging.debug("Received segment of %r bytes." % len(x))
        d = self._get_page(url)
        # Write the whole body, close the file on success OR failure, and
        # finally surface the local path to downstream callbacks.
        d.addCallback(lambda x: f.write(x))
        d.addBoth(lambda _: f.close())
        d.addCallback(lambda _: path)
82 def delete_cache(self, f):
83 keys = self._cached_files.keys()
84 for i in ifilter(f, keys):
85 filename = self._cached_files[i]
86 logging.debug("Removing %r" % filename)
88 del self._cached_files[i]
    def _got_file_failed(self, e):
        """Errback for a failed segment download: forward the failure to
        whoever awaits the next-file deferred, then clear the slot.

        NOTE(review): a guard line (presumably `if self._new_filed:`) is
        elided from this view.
        """
        self._new_filed.errback(e)
        self._new_filed = None
    def _got_file(self, path, l, f):
        """Callback after a segment was saved to disk.

        path -- local file the segment was written to
        l    -- absolute URL it was fetched from
        f    -- playlist entry dict (read here: f['sequence'])

        NOTE(review): a guard line (presumably `if self._new_filed:`) and
        the trailing return are elided from this view.
        """
        logging.debug("Saved " + l + " in " + path)
        self._cached_files[f['sequence']] = path
        if self.n_segments_keep != -1:
            # Evict everything older than the retention window; -1 keeps all.
            self.delete_cache(lambda x: x <= f['sequence'] - self.n_segments_keep)
        # Wake the waiter registered in _start_get_files / get_file.
        self._new_filed.callback((path, l, f))
        self._new_filed = None
    def _download_file(self, f):
        """Start downloading the segment described by playlist entry *f*.

        The segment URL is resolved against the media playlist's URL, and
        the local file is named after the last path component of the
        segment URL. (The trailing `return d` is elided from this view.)
        """
        l = HLS.make_url(self._file_playlist.url, f['file'])
        name = urlparse.urlparse(f['file']).path.split('/')[-1]
        path = os.path.join(self.path, name)
        d = self._download_page(l, path)
        d.addErrback(self._got_file_failed)
        d.addCallback(self._got_file, l, f)
    def _get_next_file(self, last_file=None):
        """Return a deferred that fires when the next playlist segment has
        been downloaded (or, for a live playlist with no new entries yet,
        when the playlist reload adds files).

        NOTE(review): several lines are elided from this view, including
        the bodies of the first two branches, so the original control flow
        around `next` is incomplete here.
        """
        # Python 2 iterator protocol; raises StopIteration when the
        # playlist iterator is exhausted (trapped in _handle_end).
        next = self._files.next()
        # If either of the two previous segments is missing from the cache
        # the player risks starvation -- presumably download immediately.
        if not self._cached_files.has_key(last_file['sequence'] - 1) or \
            not self._cached_files.has_key(last_file['sequence'] - 2):
            # (branch body elided)
        elif self._file_playlist.endlist():
            # (lines elided before the delayed download below)
            delay = 1 # last_file['duration'] doesn't work
                      # when duration is not in sync with
                      # player, which can happen easily...
            return deferLater(reactor, delay, self._download_file, next)
        elif not self._file_playlist.endlist():
            # Live stream, no new entries yet: park a deferred that
            # _playlist_updated fires after the next reload, then retry.
            self._file_playlisted = defer.Deferred()
            self._file_playlisted.addCallback(lambda x: self._get_next_file(last_file))
            return self._file_playlisted
    def _handle_end(self, failure):
        """Errback ending the download loop: swallow the StopIteration that
        marks playlist exhaustion; any other failure propagates.
        (Remaining lines of this method are elided from this view.)
        """
        failure.trap(StopIteration)
    def _get_files_loop(self, last_file=None):
        """Download-loop driver: fetch the next segment, then reschedule
        itself until the playlist iterator is exhausted.

        NOTE(review): guard lines are elided; *last_file*, when given, is
        presumably the (path, url, entry) tuple produced by _got_file.
        """
        (path, l, f) = last_file
        d = self._get_next_file(f)
        # Loop by chaining ourselves as the callback; StopIteration from an
        # exhausted playlist ends the chain via _handle_end.
        d.addCallback(self._get_files_loop)
        d.addErrback(self._handle_end)
    def _playlist_updated(self, pl):
        """Dispatch on the playlist type after a (re)load.

        A program (variant) playlist selects one media playlist matching
        self.program/self.bitrate and recurses into _reload_playlist; a
        media (sequence) playlist schedules its own periodic reload and
        wakes any waiter parked in _get_next_file.

        NOTE(review): some lines are elided from this view (presumably an
        `elif pl.has_files():` structure and a trailing return).
        """
        if pl.has_programs():
            # if we got a program playlist, save it and start a program
            self._program_playlist = pl
            (program_url, _) = pl.get_program_playlist(self.program, self.bitrate)
            l = HLS.make_url(self.url, program_url)
            return self._reload_playlist(M3U8(l))
        # we got sequence playlist, start reloading it regularly, and get files
        self._file_playlist = pl
        self._files = pl.iter_files()
        # Live playlists must be re-polled; reload_delay() comes from the
        # playlist object.
        reactor.callLater(pl.reload_delay(), self._reload_playlist, pl)
        if self._file_playlisted:
            # Someone is waiting for new entries -- hand them the playlist.
            self._file_playlisted.callback(pl)
            self._file_playlisted = None
    def _got_playlist_content(self, content, pl):
        """Feed freshly fetched playlist text into *pl*; if parsing/update
        fails, retry after the reload delay.
        (The success-path return of *pl* is elided from this view.)
        """
        if not pl.update(content):
            # if the playlist cannot be loaded, start a reload timer
            d = deferLater(reactor, pl.reload_delay(), self._fetch_playlist, pl)
            d.addCallback(self._got_playlist_content, pl)
    def _fetch_playlist(self, pl):
        """Fetch the playlist body from pl.url.
        (The trailing `return d` is elided from this view.)
        """
        logging.debug('fetching %r' % pl.url)
        d = self._get_page(pl.url)
    def _reload_playlist(self, pl):
        """Refresh *pl* from the network, update it with the fetched
        content, then act on the parsed result via _playlist_updated.
        (The trailing `return d` is elided from this view.)
        """
        d = self._fetch_playlist(pl)
        d.addCallback(self._got_playlist_content, pl)
        d.addCallback(self._playlist_updated)
    def get_file(self, sequence):
        """Resolve the local path of the first cached segment whose
        sequence number is >= *sequence*; if none is cached yet, wait for
        the next downloaded file and retry.

        NOTE(review): this method is heavily elided in this view -- the
        deferred construction, the try/except around the cache lookup, and
        the returns are all missing; the lines below are two alternative
        paths (hit vs. wait), not straight-line code.
        """
        keys = self._cached_files.keys()
        # First cached sequence at or after the requested one; .next()
        # raises StopIteration when nothing qualifies (presumably handled
        # in elided lines).
        sequence = ifilter(lambda x: x >= sequence, keys).next()
        filename = self._cached_files[sequence]
        # Miss path: chain a retry onto the "new file" deferred `d`
        # (its construction is elided).
        d.addCallback(lambda x: self.get_file(sequence))
        logging.debug('waiting for %r (available: %r)' % (sequence, keys))
    def _start_get_files(self, x):
        """Begin the segment-download loop.

        *x* is the (unused) result of the playlist chain feeding this
        callback. Returns a deferred that fires with (path, url, entry)
        for the first downloaded segment (completed in _got_file).
        """
        self._new_filed = defer.Deferred()
        self._get_files_loop()
        return self._new_filed
        # NOTE(review): the enclosing method's `def` line is elided from
        # this view (presumably the public start() entry point).
        # Load the top-level playlist, then kick off segment downloads.
        d = self._reload_playlist(M3U8(self.url))
        d.addCallback(self._start_get_files)