fetcher: use task loop for regular fetches
[hls-player:hls-player.git] / HLS / fetcher.py
1 # -*- Mode: Python -*-
2 # vi:si:et:sw=4:sts=4:ts=4
3 #
4 # Copyright (C) 2009-2010 Fluendo, S.L. (www.fluendo.com).
5 # Copyright (C) 2009-2010 Marc-Andre Lureau <marcandre.lureau@gmail.com>
6
7 # This file may be distributed and/or modified under the terms of
8 # the GNU General Public License version 2 as published by
9 # the Free Software Foundation.
10 # This file is distributed without any warranty; without even the implied
11 # warranty of merchantability or fitness for a particular purpose.
12 # See "LICENSE" in the source distribution for more information.
13
14 from itertools import ifilter
15 import logging
16 import os, os.path
17 import tempfile
18 import urlparse
19
20 from twisted.python import log
21 from twisted.web import client
22 from twisted.internet import defer, reactor, task
23 from twisted.internet.task import deferLater
24
25 import HLS
26 from HLS.m3u8 import M3U8
27
class HLSFetcher(object):
    """Fetch an HLS playlist over HTTP and download its media segments.

    Segments are written into ``self.path`` (a fresh temporary directory
    when no path is configured) and indexed in ``self._cached_files`` by
    their sequence number.  Consumers call ``start()`` and then
    ``get_file(sequence)`` to obtain local segment paths.
    """

    def __init__(self, url, options=None, program=1):
        self.url = url
        self.program = program
        if options:
            self.path = options.path
            self.referer = options.referer
            self.bitrate = options.bitrate
            self.n_segments_keep = options.keep
            self.nbuffer = options.buffer
        else:
            self.path = None
            self.referer = None
            self.bitrate = 200000
            # 0: do not write segments to disk; -1: never delete saved
            # segments; N: keep only the last N saved segments.
            self.n_segments_keep = 3
            self.nbuffer = 3
        if not self.path:
            self.path = tempfile.mkdtemp()

        self._program_playlist = None
        self._file_playlist = None
        self._cookies = {}
        self._cached_files = {} # sequence n -> path

        self._files = None # the iter of the playlist files download
        self._next_download = None # the delayed download defer, if any
        self._file_playlisted = None # the defer to wait until new files are added to playlist
        self._new_filed = None # the defer fired with the next saved segment, if any

        self._pl_task = None # LoopingCall reloading a live playlist
        self._seg_task = None # LoopingCall downloading segments

    def _get_page(self, url):
        """Fetch *url* with the shared cookie jar and optional Referer.

        Returns a Deferred firing with the page content; errors are logged
        and propagated as the Deferred's failure.
        """
        def got_page(content):
            logging.debug("Cookies: %r" % self._cookies)
            return content
        def got_page_error(e, url):
            logging.error(url)
            log.err(e)
            return e

        url = url.encode("utf-8")
        if 'HLS_RESET_COOKIES' in os.environ:
            self._cookies = {}
        headers = {}
        if self.referer:
            headers['Referer'] = self.referer
        d = client.getPage(url, cookies=self._cookies, headers=headers)
        d.addCallback(got_page)
        d.addErrback(got_page_error, url)
        return d

    def _download_page(self, url, path):
        """Download *url* into memory; return a Deferred firing with the bytes.

        *path* is currently unused: the caller writes the data to disk,
        because client.downloadPage does not support cookies.
        """
        def _check(x):
            logging.debug("Received segment of %r bytes." % len(x))
            return x

        d = self._get_page(url)
        d.addCallback(_check)
        return d

    def _download_segment(self, f):
        """Download playlist entry *f* and, unless n_segments_keep is 0, save it.

        When saving, the Deferred fires with ``(path, url, f)`` (see
        _got_file); otherwise with ``(None, path, f)``.  Download or write
        failures are forwarded to any get_file() waiter and propagated.
        """
        url = HLS.make_url(self._file_playlist.url, f['file'])
        name = urlparse.urlparse(f['file']).path.split('/')[-1]
        path = os.path.join(self.path, name)
        d = self._download_page(url, path)
        if self.n_segments_keep != 0:
            def _save(data):
                # Only touch the file once the download succeeded, so a
                # failed download never leaves an empty segment behind.
                # Binary mode: segments are raw media data.
                out = open(path, 'wb')
                try:
                    out.write(data)
                finally:
                    out.close()
                return path
            d.addCallback(_save)
            # Exactly one of the two runs: success registers the segment,
            # failure notifies the waiter and keeps the error propagating.
            d.addCallbacks(self._got_file, self._got_file_failed,
                           callbackArgs=(url, f))
        else:
            d.addCallback(lambda _: (None, path, f))
        return d

    def delete_cache(self, f):
        """Delete every cached segment whose sequence number satisfies *f*."""
        # Snapshot the matching keys first: entries are removed while looping.
        for seq in [k for k in self._cached_files if f(k)]:
            filename = self._cached_files[seq]
            logging.debug("Removing %r" % filename)
            os.remove(filename)
            del self._cached_files[seq]

    def _got_file_failed(self, e):
        """Fail the pending get_file()/start() waiter with *e* and re-raise it."""
        # Detach the waiter before firing it: its callbacks may install a
        # new one, which must not be clobbered afterwards.
        waiter, self._new_filed = self._new_filed, None
        if waiter:
            waiter.errback(e)
        return e

    def _got_file(self, path, url, f):
        """Register saved segment *f*, trim the cache and wake the waiter."""
        logging.debug("Saved " + url + " in " + path)
        self._cached_files[f['sequence']] = path
        if self.n_segments_keep != -1:
            self.delete_cache(lambda x: x <= f['sequence'] - self.n_segments_keep)
        # Detach before firing: get_file() re-arms _new_filed from inside
        # this callback when the requested sequence is not cached yet.
        waiter, self._new_filed = self._new_filed, None
        if waiter:
            waiter.callback((path, url, f))
        return (path, url, f)

    def _get_next_file(self):
        """Download the next playlist entry.

        Returns the download Deferred, or None when nothing was fetched.
        NOTE(review): assumes iter_files() yields a falsy entry while a live
        playlist has no new segment yet and raises StopIteration once it is
        exhausted -- confirm against HLS.m3u8.
        """
        entry = self._files.next()
        if entry:
            return self._download_segment(entry)
        if not self._file_playlist.endlist():
            # Live playlist with nothing new yet: pause the segment loop and
            # resume from _playlist_updated once the playlist is refreshed.
            # The loop is only stopped here -- never restarted from inside
            # its own call -- so it cannot end up scheduled twice.
            if self._seg_task is not None and self._seg_task.running:
                self._seg_task.stop()
            self._file_playlisted = defer.Deferred()
            self._file_playlisted.addCallback(lambda _: self._get_files_loop())
        return None

    def _handle_end(self, failure):
        failure.trap(StopIteration)
        print("End of media")
        reactor.stop()

    def _next_file_delay(self, f):
        """Return the delay in seconds before the segment after *f* is fetched.

        *f* is a ``(path, url, entry)`` tuple; 0 while the buffer of
        ``nbuffer`` segments is not full, 1 on an ended playlist, otherwise
        the entry's duration.
        """
        delay = f[2]["duration"]
        # FIXME not only the last nbuffer, but the nbuffer -1 ...
        if self.nbuffer > 0 and (f[2]['sequence'] - (self.nbuffer - 1)) not in self._cached_files:
            delay = 0
        elif self._file_playlist.endlist():
            delay = 1
        return delay

    def _get_files_loop(self):
        """Fetch one segment, then (re)start the periodic segment loop."""
        if not self._seg_task:
            self._seg_task = task.LoopingCall(self._get_next_file)
        d = self._get_next_file()
        if d is None:
            # Nothing was downloaded: either a resume is now pending on the
            # next playlist refresh, or the playlist produced no entry.
            return defer.succeed(None)
        d.addCallback(self._next_file_delay)
        # now=False: the segment just fetched used this slot, so the next
        # fetch waits for the computed delay instead of running at once.
        d.addCallback(self._seg_task.start, False)
        return d

    def _playlist_updated(self, pl):
        """Act on freshly loaded playlist *pl*.

        A program (variant) playlist selects a stream and loads it; a media
        playlist becomes the file playlist, gets a 10 s reload loop when it
        is live, and resumes a segment loop waiting for new entries.

        Raises ValueError when *pl* has neither programs nor files.
        """
        if pl.has_programs():
            # if we got a program playlist, save it and start a program
            self._program_playlist = pl
            (program_url, _) = pl.get_program_playlist(self.program, self.bitrate)
            playlist_url = HLS.make_url(self.url, program_url)
            return self._reload_playlist(M3U8(playlist_url))
        elif pl.has_files():
            # we got sequence playlist, start reloading it regularly, and get files
            self._file_playlist = pl
            if not self._files:
                self._files = pl.iter_files()
            if not pl.endlist():
                if not self._pl_task:
                    self._pl_task = task.LoopingCall(self._reload_playlist, pl)
                    self._pl_task.start(10, False)
            # Detach before firing: the resume may install a new waiter.
            waiter, self._file_playlisted = self._file_playlisted, None
            if waiter:
                waiter.callback(pl)
        else:
            raise ValueError("playlist %r has neither programs nor files" % pl.url)
        return pl

    def _got_playlist_content(self, content, pl):
        """Load *content* into *pl*; on failure retry after pl.reload_delay()."""
        if pl.update(content):
            return pl
        # Retry with a one-shot timer instead of restarting the reload loop
        # from inside its own call: that scheduled the loop twice and broke
        # the initial load, where no reload loop exists yet.  Returning the
        # Deferred keeps the caller waiting until a retry succeeds.
        d = deferLater(reactor, pl.reload_delay(), self._fetch_playlist, pl)
        d.addCallback(self._got_playlist_content, pl)
        return d

    def _fetch_playlist(self, pl):
        logging.debug('fetching %r' % pl.url)
        d = self._get_page(pl.url)
        return d

    def _reload_playlist(self, pl):
        d = self._fetch_playlist(pl)
        d.addCallback(self._got_playlist_content, pl)
        d.addCallback(self._playlist_updated)
        return d

    def get_file(self, sequence):
        """Return a Deferred firing with the path of a cached segment.

        Picks the lowest cached sequence number >= *sequence*; when none is
        cached yet, waits for the next saved segment and retries.
        """
        candidates = [n for n in self._cached_files if n >= sequence]
        if candidates:
            return defer.succeed(self._cached_files[min(candidates)])
        d = defer.Deferred()
        d.addCallback(lambda _: self.get_file(sequence))
        # NOTE(review): only one waiter is kept; a previously pending
        # get_file()/start() Deferred is replaced here.
        self._new_filed = d
        logging.debug('waiting for %r (available: %r)' % (sequence, sorted(self._cached_files)))
        return d

    def _start_get_files(self, x):
        self._new_filed = defer.Deferred()
        self._get_files_loop()
        return self._new_filed

    def start(self):
        """Load the playlist and start fetching.

        Returns a Deferred firing with the first saved segment.
        """
        self._files = None
        d = self._reload_playlist(M3U8(self.url))
        d.addCallback(self._start_get_files)
        return d

    def stop(self):
        """Currently a no-op: the reload and segment loops keep running."""
        pass
238