2 # vi:si:et:sw=4:sts=4:ts=4
4 # Copyright (C) 2009-2010 Fluendo, S.L. (www.fluendo.com).
5 # Copyright (C) 2009-2010 Marc-Andre Lureau <marcandre.lureau@gmail.com>
7 # This file may be distributed and/or modified under the terms of
8 # the GNU General Public License version 2 as published by
9 # the Free Software Foundation.
10 # This file is distributed without any warranty; without even the implied
11 # warranty of merchantability or fitness for a particular purpose.
12 # See "LICENSE" in the source distribution for more information.
14 from itertools import ifilter
20 from twisted.python import log
21 from twisted.web import client
22 from twisted.internet import defer, reactor, task
23 from twisted.internet.task import deferLater
26 from HLS.m3u8 import M3U8
28 class HLSFetcher(object):
# Initialise a fetcher for the playlist at *url*.
# NOTE(review): several original lines are elided from this view (the
# url/options assignments and the options-is-None defaults), so the
# comments below cover only what is visible.
30 def __init__(self, url, options=None, program=1):
32 self.program = program
# Download/working settings copied from the caller-supplied options.
34 self.path = options.path
35 self.referer = options.referer
36 self.bitrate = options.bitrate
# How many downloaded segments to keep on disk: 0 = do not save,
# -1 = keep everything (see _download_segment / _got_file).
37 self.n_segments_keep = options.keep
38 self.nbuffer = options.buffer
# Fallback defaults — presumably the branch taken when no options
# object is given (the surrounding condition is elided here).
43 self.n_segments_keep = 3
46 self.path = tempfile.mkdtemp()
# Playlist state: the variant/program playlist and the media
# (sequence) playlist currently being followed.
48 self._program_playlist = None
49 self._file_playlist = None
51 self._cached_files = {} # sequence n -> path
53 self._files = None # the iter of the playlist files download
54 self._next_download = None # the delayed download defer, if any
55 self._file_playlisted = None # the defer to wait until new files are added to playlist
# Fetch *url* with twisted.web's client.getPage, sending the session
# cookies and an optional Referer header; returns a Deferred that
# fires with the response body.
60 def _get_page(self, url):
61 def got_page(content):
62 logging.debug("Cookies: %r" % self._cookies)
# NOTE(review): the return statement of got_page and the body of
# got_page_error are elided from this view.
64 def got_page_error(e, url):
# Python 2 Twisted getPage requires a byte-string URL.
69 url = url.encode("utf-8")
# Escape hatch: drop accumulated cookies when this env var is set
# (the branch body is elided here).
70 if 'HLS_RESET_COOKIES' in os.environ.keys():
# Referer header — header construction lines above this are elided.
74 headers['Referer'] = self.referer
75 d = client.getPage(url, cookies=self._cookies, headers=headers)
76 d.addCallback(got_page)
# Pass the url along so the error handler can report which fetch failed.
77 d.addErrback(got_page_error, url)
# Download a segment body via _get_page so the session cookies are
# sent; returns a Deferred firing with the raw bytes.
# NOTE(review): the callback wiring between these lines is elided.
80 def _download_page(self, url, path):
81 # client.downloadPage does not support cookies!
83 logging.debug("Received segment of %r bytes." % len(x))
86 d = self._get_page(url)
# Download the media segment described by playlist entry *f* and,
# unless n_segments_keep == 0, persist it to self.path and register
# it in the cache via _got_file.
92 def _download_segment(self, f):
# Resolve the (possibly relative) segment URI against the playlist URL.
93 url = HLS.make_url(self._file_playlist.url, f['file'])
# Local file name = last path component of the segment URI.
94 name = urlparse.urlparse(f['file']).path.split('/')[-1]
95 path = os.path.join(self.path, name)
96 d = self._download_page(url, path)
97 if self.n_segments_keep != 0:
# NOTE(review): opened in text mode 'w', but MPEG-TS segments are
# binary — 'wb' looks intended (matters on Windows); confirm. The
# name 'file' also shadows the builtin.
98 file = open(path, 'w')
99 d.addCallback(lambda x: file.write(x))
# addBoth so the handle is closed on failure as well as success.
100 d.addBoth(lambda _: file.close())
101 d.addCallback(lambda _: path)
102 d.addErrback(self._got_file_failed)
103 d.addCallback(self._got_file, url, f)
# else-branch (the 'else:' line is elided here): segment is not kept,
# only the metadata tuple is propagated down the chain.
105 d.addCallback(lambda _: (None, path, f))
# Drop every cached segment whose sequence number satisfies the
# predicate *f*, removing its bookkeeping entry.
108 def delete_cache(self, f):
109 keys = self._cached_files.keys()
110 for i in ifilter(f, keys):
111 filename = self._cached_files[i]
112 logging.debug("Removing %r" % filename)
# NOTE(review): the actual file-deletion call (os.remove/unlink)
# appears to be elided from this view.
114 del self._cached_files[i]
# Errback: forward a segment-download failure to whoever is waiting
# on _new_filed (armed by _start_get_files), then disarm it.
117 def _got_file_failed(self, e):
# NOTE(review): an 'if self._new_filed:' guard line appears to be
# elided above this — confirm before assuming _new_filed is set.
119 self._new_filed.errback(e)
120 self._new_filed = None
# Callback once a segment has been saved at *path*: cache it by
# sequence number, prune old segments, wake any waiter, and pass the
# (path, url, f) tuple down the Deferred chain.
122 def _got_file(self, path, url, f):
123 logging.debug("Saved " + url + " in " + path)
124 self._cached_files[f['sequence']] = path
# -1 means keep everything; otherwise keep only the most recent
# n_segments_keep sequence numbers.
125 if self.n_segments_keep != -1:
126 self.delete_cache(lambda x: x <= f['sequence'] - self.n_segments_keep)
# NOTE(review): an 'if self._new_filed:' guard line appears to be
# elided above this.
128 self._new_filed.callback((path, url, f))
129 self._new_filed = None
130 return (path, url, f)
# Take the next entry from the playlist iterator and download it.
# When the iterator is exhausted on a live (no ENDLIST) playlist,
# pause the segment LoopingCall until _playlist_updated fires
# _file_playlisted with freshly-added entries.
132 def _get_next_file(self):
133 next = self._files.next()
# NOTE(review): the 'if next:' line and the return of the download
# Deferred are elided from this view.
135 d = self._download_segment(next)
137 elif not self._file_playlist.endlist():
# Stop the segment loop; it is restarted with the recomputed delay
# once the playlist refresh delivers new files.
138 self._seg_task.stop()
139 self._file_playlisted = defer.Deferred()
140 self._file_playlisted.addCallback(lambda x: self._get_next_file())
141 self._file_playlisted.addCallback(self._next_file_delay)
142 self._file_playlisted.addCallback(self._seg_task.start)
143 return self._file_playlisted
# Errback for the segment loop: StopIteration from the playlist
# iterator means an ENDLIST playlist has been fully downloaded.
# NOTE(review): the remainder of the handler is elided from this view.
145 def _handle_end(self, failure):
146 failure.trap(StopIteration)
# Compute the delay before fetching the next segment: normally the
# duration of the segment just fetched, shortened while the initial
# nbuffer segments are still being prefetched.
150 def _next_file_delay(self, f):
# f is the (path, url, entry) tuple produced by _got_file.
151 delay = f[2]["duration"]
152 # FIXME not only the last nbuffer, but the nbuffer -1 ...
# Still buffering: the segment nbuffer-1 places back is not cached yet.
153 if self.nbuffer > 0 and not self._cached_files.has_key(f[2]['sequence'] - (self.nbuffer - 1)):
# NOTE(review): the branch bodies and the return of *delay* are
# elided from this view.
155 elif self._file_playlist.endlist():
# Start (or restart) the LoopingCall that downloads segments one
# after another, spacing iterations by _next_file_delay.
159 def _get_files_loop(self):
160 if not self._seg_task:
161 self._seg_task = task.LoopingCall(self._get_next_file)
162 d = self._get_next_file()
163 d.addCallback(self._next_file_delay)
# LoopingCall.start(interval) runs immediately, then every *interval*
# seconds. NOTE(review): an errback (likely _handle_end) appears to
# be elided after this line.
164 d.addCallback(self._seg_task.start)
# Act on a freshly parsed M3U8. A variant (program) playlist is
# resolved to the selected program/bitrate media playlist and
# re-fetched; a media playlist arms the segment iterator, the
# periodic playlist reload, and wakes any _get_next_file waiter.
167 def _playlist_updated(self, pl):
168 if pl.has_programs():
169 # if we got a program playlist, save it and start a program
170 self._program_playlist = pl
171 (program_url, _) = pl.get_program_playlist(self.program, self.bitrate)
# Resolve the program URL against the master playlist URL and recurse.
172 l = HLS.make_url(self.url, program_url)
173 return self._reload_playlist(M3U8(l))
# NOTE(review): the 'elif pl.has_files():' style branch line appears
# to be elided above this comment.
175 # we got sequence playlist, start reloading it regularly, and get files
176 self._file_playlist = pl
178 self._files = pl.iter_files()
# Live playlists are re-polled; the 10s interval is replaced by
# reload_delay() in _got_playlist_content on parse failure.
180 if not self._pl_task:
181 self._pl_task = task.LoopingCall(self._reload_playlist, pl)
182 self._pl_task.start(10, False)
# Wake a _get_next_file that was waiting for new playlist entries.
183 if self._file_playlisted:
184 self._file_playlisted.callback(pl)
185 self._file_playlisted = None
# Feed the downloaded playlist text into *pl*; if parsing fails,
# re-arm the reload task and schedule a retry after reload_delay().
190 def _got_playlist_content(self, content, pl):
191 if not pl.update(content):
192 # if the playlist cannot be loaded, start a reload timer
# NOTE(review): a preceding _pl_task.stop() line appears to be
# elided here — start() on a running LoopingCall would raise.
194 self._pl_task.start(pl.reload_delay(), False)
195 d = deferLater(reactor, pl.reload_delay(), self._fetch_playlist, pl)
196 d.addCallback(self._got_playlist_content, pl)
# NOTE(review): the returns (retry Deferred / parsed pl) are elided
# from this view.
# Fetch the playlist document for *pl*; returns the _get_page
# Deferred (the 'return d' line is elided from this view).
200 def _fetch_playlist(self, pl):
201 logging.debug('fetching %r' % pl.url)
202 d = self._get_page(pl.url)
# Fetch, parse and act on playlist *pl*: chains _fetch_playlist ->
# _got_playlist_content -> _playlist_updated. NOTE(review): the
# 'return d' line appears to be elided from this view.
205 def _reload_playlist(self, pl):
206 d = self._fetch_playlist(pl)
207 d.addCallback(self._got_playlist_content, pl)
208 d.addCallback(self._playlist_updated)
# Return a Deferred firing with the local path of the first cached
# segment whose sequence number is >= *sequence*; if none is cached
# yet, wait for the next downloaded file and retry.
# NOTE(review): the Deferred creation and the try/except control
# flow are elided from this view.
211 def get_file(self, sequence):
213 keys = self._cached_files.keys()
# First cached sequence number at or after the requested one;
# raises StopIteration (caught by the elided except) when absent.
215 sequence = ifilter(lambda x: x >= sequence, keys).next()
216 filename = self._cached_files[sequence]
# Not available yet: retry once the next file lands.
219 d.addCallback(lambda x: self.get_file(sequence))
222 logging.debug('waiting for %r (available: %r)' % (sequence, keys))
def _start_get_files(self, x):
    # Arm the "next file downloaded" deferred before starting the
    # segment loop, so the very first _got_file has a waiter to wake.
    # The incoming value *x* (from the playlist chain) is ignored.
    waiter = defer.Deferred()
    self._new_filed = waiter
    self._get_files_loop()
    return waiter
232 d = self._reload_playlist(M3U8(self.url))
233 d.addCallback(self._start_get_files)