1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2012 Luke Dashjr <luke-jr+eloipool@utopios.org>
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
24 from util import ScheduleDict, tryErr
# Event masks passed to epoll when registering sockets.
# EPOLL_READ combines the readable, urgent-data, error and hang-up events.
26 EPOLL_READ = select.EPOLLIN | select.EPOLLPRI | select.EPOLLERR | select.EPOLLHUP
# EPOLL_WRITE is the "writable" event only.
27 EPOLL_WRITE = select.EPOLLOUT
# Buffer sizes in bytes. ac_in_buffer_size is the amount requested from recv()
# in handle_read.
30 ac_in_buffer_size = 4096
# NOTE(review): ac_out_buffer_size is not referenced in the visible part of
# this file -- confirm whether anything still reads it.
31 ac_out_buffer_size = 4096
33 def handle_close(self):
# Error hook: write the traceback of the exception currently being handled to
# this object's logger at DEBUG level.
38 def handle_error(self):
39 self.logger.debug(traceback.format_exc())
42 def handle_read(self):
44 data = self.recv (self.ac_in_buffer_size)
45 except socket.error as why:
50 # All input is ignored from sockets we have "closed"
53 if isinstance(data, str) and self.use_encoding:
54 data = bytes(str, self.encoding)
55 self.ac_in_buffer = self.ac_in_buffer + data
57 self.server.lastReadbuf = self.ac_in_buffer
# Borrow asynchat.async_chat's data-collection and terminator accessors
# unchanged, so handle_readbuf can use them on this class.
61 collect_incoming_data = asynchat.async_chat._collect_incoming_data
62 get_terminator = asynchat.async_chat.get_terminator
63 set_terminator = asynchat.async_chat.set_terminator
# Consume ac_in_buffer for as long as it is non-empty: data is handed to
# collect_incoming_data() and found_terminator() is called when the current
# terminator (from get_terminator()) has been reached.
# NOTE(review): several branch headers of this method are not visible in this
# excerpt; the comments below describe only the statements that are shown.
65 def handle_readbuf(self):
66 while self.ac_in_buffer:
67 lb = len(self.ac_in_buffer)
68 terminator = self.get_terminator()
70 # no terminator, collect it all
71 self.collect_incoming_data (self.ac_in_buffer)
72 self.ac_in_buffer = b''
# Integer terminator: either the whole buffer is collected and the remaining
# count is reduced by the buffer length (lb), or only the first n bytes are
# collected and found_terminator() fires.
# NOTE(review): the assignment of `n` is not visible here -- presumably it is
# the numeric terminator; confirm.
73 elif isinstance(terminator, int):
77 self.collect_incoming_data (self.ac_in_buffer)
78 self.ac_in_buffer = b''
79 self.terminator = self.terminator - lb
81 self.collect_incoming_data (self.ac_in_buffer[:n])
82 self.ac_in_buffer = self.ac_in_buffer[n:]
84 self.found_terminator()
87 # 1) end of buffer matches terminator exactly:
88 # collect data, transition
89 # 2) end of buffer matches some prefix:
90 # collect data to the prefix
91 # 3) end of buffer does not match any prefix:
93 # NOTE: this supports multiple different terminators, but
94 # NOT ones that are prefixes of others...
# A single terminator of the same type as the buffer is wrapped in a 1-tuple
# so the code below can always treat `terminator` as a sequence of candidates.
95 if isinstance(self.ac_in_buffer, type(terminator)):
96 terminator = (terminator,)
# Position of each candidate terminator in the buffer (-1 when absent).
97 termidx = tuple(map(self.ac_in_buffer.find, terminator))
# Earliest match among the candidates that were actually found.
99 index = min(x for x in termidx if x >= 0)
103 # we found the terminator
105 # don't bother reporting the empty string (source of subtle bugs)
106 self.collect_incoming_data (self.ac_in_buffer[:index])
# Recover which candidate matched at `index` so that exactly its length is
# skipped when the buffer is advanced.
107 specific_terminator = terminator[termidx.index(index)]
108 terminator_len = len(specific_terminator)
109 self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
110 # This does the Right Thing if the terminator is changed here.
111 self.found_terminator()
113 # check for a prefix of the terminator
114 termidx = tuple(map(lambda a: asynchat.find_prefix_at_end (self.ac_in_buffer, a), terminator))
118 # we found a prefix, collect up to the prefix
119 self.collect_incoming_data (self.ac_in_buffer[:-index])
120 self.ac_in_buffer = self.ac_in_buffer[-index:]
123 # no prefix, collect it all
124 self.collect_incoming_data (self.ac_in_buffer)
125 self.ac_in_buffer = b''
# Queue `data` for transmission. When nothing is pending in wbuf an immediate
# socket.send() is attempted first; the fd is also (re)registered for write
# events in addition to read events.
# NOTE(review): the statements between the visible lines (error handling and
# buffering of the unsent remainder) are not shown in this excerpt -- confirm
# the exact conditions under which the registration below runs.
127 def push(self, data):
128 if not len(self.wbuf):
129 # Try to send as much as we can immediately
# bs receives the number of bytes the socket accepted.
131 bs = self.socket.send(data)
133 # Chances are we'll fail later, but anyway...
139 self.server.register_socket_m(self.fd, EPOLL_READ | EPOLL_WRITE)
141 def handle_timeout(self):
# Write hook: send pending bytes from wbuf, drop the portion the socket
# accepted, and switch the epoll registration back to read-only.
# NOTE(review): the switch back to EPOLL_READ appears to belong to the
# "buffer is now empty" branch, but the intervening lines are not visible in
# this excerpt -- confirm.
144 def handle_write(self):
# wbuf is None rather than an empty buffer once the peer has closed.
145 if self.wbuf is None:
146 # Socket was just closed by remote peer
# send() returns how many bytes were written; keep only the unsent tail.
148 bs = self.socket.send(self.wbuf)
149 self.wbuf = self.wbuf[bs:]
150 if not len(self.wbuf):
154 self.server.register_socket_m(self.fd, EPOLL_READ)
# Reuse asynchat.async_chat.recv as this class's recv implementation.
156 recv = asynchat.async_chat.recv
165 self.server.unregister_socket(self.fd)
# Replace this handler's scheduled task: the previous task (self._Task) is
# removed from the server's schedule, then callable `f` is scheduled for time
# `t` with this handler passed as the error handler.
# NOTE(review): tryErr is called with IgnoredExceptions=KeyError, presumably so
# that removing a task that is no longer scheduled is harmless -- confirm
# against util.tryErr. A line between the two statements is not visible here.
169 def changeTask(self, f, t = None):
170 tryErr(self.server.rmSchedule, self._Task, IgnoredExceptions=KeyError)
172 self._Task = self.server.schedule(f, t, errHandler=self)
# Set up per-connection state for the socket `sock` accepted by `server`.
# NOTE(review): only part of the initialiser is visible in this excerpt.
174 def __init__(self, server, sock, addr):
# Input starts as an empty bytes buffer.
175 self.ac_in_buffer = b''
# Remember the socket's file descriptor and register this handler for it.
183 self.fd = sock.fileno()
184 server.register_socket(self.fd, self)
# Arrange for handle_timeout to run at time() + 15 (presumably 15 seconds from
# now -- confirm that `time` is time.time).
185 self.changeTask(self.handle_timeout, time() + 15)
# Copy attributes from `scls` onto `cls`.
# NOTE(review): the loop that produces the attribute name `a`, and what is
# done with the wrapper `f`, are not visible in this excerpt.
188 def _register(cls, scls):
# 'final_init' is special-cased: the wrapper calls cls's existing version
# first and then scls's, instead of one replacing the other. Both functions
# are bound as lambda defaults at definition time.
190 if a == 'final_init':
191 f = lambda self, x=getattr(cls, a), y=getattr(scls, a): (x(self), y(self))
196 setattr(cls, a, getattr(scls, a))
# Listening endpoint: binds a server socket and accepts incoming connections
# for the server object it is given.
198 class NetworkListener:
199 logger = logging.getLogger('SocketListener')
# Record the address and address family (IPv6 by default) and try to set up
# the listening socket. setup_socket is invoked through tryErr with this
# class's logger and the address as the error message.
# NOTE(review): presumably tryErr logs instead of raising when binding fails --
# confirm against util.tryErr.
201 def __init__(self, server, server_address, address_family = socket.AF_INET6):
203 self.server_address = server_address
204 self.address_family = address_family
205 tryErr(self.setup_socket, server_address, Logger=self.logger, ErrorMsg=server_address)
# Pure-Python bind path: create a stream socket of the configured address
# family, enable SO_REUSEADDR, and bind it to `server_address`.
# NOTE(review): the remaining lines of this method are not visible here.
207 def _makebind_py(self, server_address):
208 sock = socket.socket(self.address_family, socket.SOCK_STREAM)
211 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
214 sock.bind(server_address)
# Alternative bind path that obtains an already-bound descriptor from the
# external `bindservice` helper. Only AF_INET6 is supported; any other family
# raises NotImplementedError.
217 def _makebind_su(self, server_address):
218 if self.address_family != socket.AF_INET6:
219 raise NotImplementedError
# Imported lazily, so the module is only required when this path is taken.
221 from bindservice import bindservice
222 (node, service) = server_address
# Falsy node/service values (e.g. None or 0) are passed on as empty strings.
223 if not node: node = ''
224 if not service: service = ''
225 fd = bindservice(str(node), str(service))
# NOTE(review): socket.fromfd() duplicates the descriptor, so `fd` itself
# stays open unless it is closed elsewhere -- confirm this is intended.
226 sock = socket.fromfd(fd, socket.AF_INET6, socket.SOCK_STREAM)
# Bind using the pure-Python path first and fall back to the bindservice path
# when that raises. All arguments are forwarded unchanged to both.
# NOTE(review): the handler catches BaseException, which also covers
# KeyboardInterrupt and SystemExit; some lines of this method are not visible
# in this excerpt.
230 def _makebind(self, *a, **ka):
232 return self._makebind_py(*a, **ka)
233 except BaseException as e:
235 return self._makebind_su(*a, **ka)
# Obtain a bound socket for `server_address` and register this listener with
# the server under the socket's file descriptor.
# NOTE(review): other lines of this method are not visible in this excerpt.
240 def setup_socket(self, server_address):
241 sock = self._makebind(server_address)
243 self.server.register_socket(sock.fileno(), self)
# Read hook for the listening socket: accept one pending connection, make it
# non-blocking, and construct a RequestHandlerClass instance for it.
# NOTE(review): `server` is used as a bare name below and its binding is not
# visible in this excerpt -- confirm it refers to self.server.
246 def handle_read(self):
248 conn, addr = self.socket.accept()
249 conn.setblocking(False)
250 h = server.RequestHandlerClass(server, conn, addr)
# Error hook for the listener; by the comment below, errors are deliberately
# ignored (the body is not visible in this excerpt).
252 def handle_error(self):
253 # Ignore errors... like socket closing on the queue
# Initialiser taking the owning server and a file descriptor. The logger name
# embeds the server's class name.
# NOTE(review): the enclosing class header and the other assignments are not
# visible in this excerpt.
257 def __init__(self, server, fd):
260 self.logger = logging.getLogger('Waker for %s' % (server.__class__.__name__,))
# Read hook for the wake-up descriptor: consume a single byte from self.fd.
# An error is logged for EOF and a debug message for a wake-up.
# NOTE(review): the condition guarding the EOF message is not visible in this
# excerpt.
262 def handle_read(self):
263 data = os.read(self.fd, 1)
265 self.logger.error('Got EOF on socket')
266 self.logger.debug('Read wakeup')
# epoll-based socket server: keeps an epoll object and a task schedule, and
# runs them from serve_forever().
268 class AsyncSocketServer:
269 logger = logging.getLogger('SocketServer')
# Initialise server state. `RequestHandlerClass` is the class instantiated for
# each accepted connection.
# NOTE(review): parts of this initialiser are not visible in this excerpt,
# including where `r` and `o` come from.
273 def __init__(self, RequestHandlerClass):
# Subclasses may predefine ServerName; otherwise default to 'Eloipool'.
274 if not hasattr(self, 'ServerName'):
275 self.ServerName = 'Eloipool'
277 self.RequestHandlerClass = RequestHandlerClass
# serve_forever() loops while this flag is true.
280 self.keepgoing = True
282 self._epoll = select.epoll()
# Scheduled tasks, stored as task -> start time (see schedule()).
285 self._sch = ScheduleDict()
288 self.TrustedForwarders = ()
293 self.register_socket(r, o)
# Register descriptor `fd` with epoll for `eventmask` (read events by
# default). `o` is the handler object associated with the descriptor.
# NOTE(review): how `o` is stored is not visible in this excerpt.
296 def register_socket(self, fd, o, eventmask = EPOLL_READ):
297 self._epoll.register(fd, eventmask)
# Change the epoll event mask of an already-registered descriptor `fd`.
# NOTE(review): other lines of this method are not visible in this excerpt.
300 def register_socket_m(self, fd, eventmask):
302 self._epoll.modify(fd, eventmask)
# Remove descriptor `fd` from the epoll set.
# NOTE(review): other lines of this method are not visible in this excerpt.
306 def unregister_socket(self, fd):
309 self._epoll.unregister(fd)
# Schedule callable `task` to run at `startTime`, optionally remembering an
# error handler for it.
# NOTE(review): the condition guarding the error-handler assignment and the
# return value are not visible in this excerpt.
313 def schedule(self, task, startTime, errHandler=None):
314 self._sch[task] = startTime
# Error handlers are keyed by id(task) rather than by the task object.
316 self._schEH[id(task)] = errHandler
319 def rmSchedule(self, task):
# Wake the event loop by writing one byte to the waker descriptor.
# NOTE(review): the condition under which NotImplementedError is raised (the
# waker not being enabled, per the message) is not visible in this excerpt.
325 def pre_schedule(self):
330 raise NotImplementedError('Class `%s\' did not enable waker' % (self.__class__.__name__))
331 os.write(self.waker, b'\1') # to break out of the epoll
333 def final_init(self):
# Main loop: runs while self.keepgoing is true. Each pass records its current
# phase in self.doing ('pre-schedule', 'schedule', 'events'), takes due tasks
# off the schedule, then polls epoll and walks the returned events.
# NOTE(review): many lines of this method (try/else headers, the task call,
# event dispatch) are not visible in this excerpt; the comments below cover
# only the statements that are shown.
336 def serve_forever(self):
339 while self.keepgoing:
340 self.doing = 'pre-schedule'
342 self.doing = 'schedule'
# When the next task is still in the future, the poll timeout becomes the time
# remaining until it is due.
346 timeNext = self._sch.nextTime()
347 if timeNow < timeNext:
348 timeout = timeNext - timeNow
# Take the next task off the schedule.
350 f = self._sch.shift()
# EH is the task's error handler, if any; its hooks are invoked via tryErr.
359 if EH: tryErr(EH.handle_error)
361 self.logger.error(traceback.format_exc())
362 if EH: tryErr(EH.handle_close)
363 if not len(self._sch):
371 events = self._epoll.poll(timeout=timeout)
372 except (IOError, select.error):
375 self.logger.error(traceback.format_exc())
376 self.doing = 'events'
# Each event is a (file descriptor, event mask) pair from epoll.
377 for (fd, e) in events:
386 tryErr(o.handle_error)
388 self.logger.error(traceback.format_exc())
389 tryErr(o.handle_error)