1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2013 Luke Dashjr <luke-jr+eloipool@utopios.org>
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 from util import ScheduleDict, WithNoop, tryErr
27 EPOLL_READ = select.EPOLLIN | select.EPOLLPRI | select.EPOLLERR | select.EPOLLHUP
28 EPOLL_WRITE = select.EPOLLOUT
31 ac_in_buffer_size = 4096
32 ac_out_buffer_size = 4096
34 def handle_close(self):
38 def handle_error(self):
39 self.logger.debug(traceback.format_exc())
42 def handle_read(self):
44 data = self.recv (self.ac_in_buffer_size)
45 except socket.error as why:
50 # All input is ignored from sockets we have "closed"
53 if isinstance(data, str) and self.use_encoding:
54 data = bytes(str, self.encoding)
55 self.ac_in_buffer = self.ac_in_buffer + data
57 self.server.lastReadbuf = self.ac_in_buffer
61 collect_incoming_data = asynchat.async_chat._collect_incoming_data
62 get_terminator = asynchat.async_chat.get_terminator
63 set_terminator = asynchat.async_chat.set_terminator
65 def handle_readbuf(self):
66 while self.ac_in_buffer:
67 lb = len(self.ac_in_buffer)
68 terminator = self.get_terminator()
70 # no terminator, collect it all
71 self.collect_incoming_data (self.ac_in_buffer)
72 self.ac_in_buffer = b''
73 elif isinstance(terminator, int):
77 self.collect_incoming_data (self.ac_in_buffer)
78 self.ac_in_buffer = b''
79 self.terminator = self.terminator - lb
81 self.collect_incoming_data (self.ac_in_buffer[:n])
82 self.ac_in_buffer = self.ac_in_buffer[n:]
84 self.found_terminator()
87 # 1) end of buffer matches terminator exactly:
88 # collect data, transition
89 # 2) end of buffer matches some prefix:
90 # collect data to the prefix
91 # 3) end of buffer does not match any prefix:
93 # NOTE: this supports multiple different terminators, but
94 # NOT ones that are prefixes of others...
95 if isinstance(self.ac_in_buffer, type(terminator)):
96 terminator = (terminator,)
97 termidx = tuple(map(self.ac_in_buffer.find, terminator))
99 index = min(x for x in termidx if x >= 0)
103 # we found the terminator
105 # don't bother reporting the empty string (source of subtle bugs)
106 self.collect_incoming_data (self.ac_in_buffer[:index])
107 specific_terminator = terminator[termidx.index(index)]
108 terminator_len = len(specific_terminator)
109 self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
110 # This does the Right Thing if the terminator is changed here.
111 self.found_terminator()
113 # check for a prefix of the terminator
114 termidx = tuple(map(lambda a: asynchat.find_prefix_at_end (self.ac_in_buffer, a), terminator))
118 # we found a prefix, collect up to the prefix
119 self.collect_incoming_data (self.ac_in_buffer[:-index])
120 self.ac_in_buffer = self.ac_in_buffer[-index:]
123 # no prefix, collect it all
124 self.collect_incoming_data (self.ac_in_buffer)
125 self.ac_in_buffer = b''
127 def push(self, data):
128 if not len(self.wbuf):
129 # Try to send as much as we can immediately
131 bs = self.socket.send(data)
133 # Chances are we'll fail later, but anyway...
139 self.server.register_socket_m(self.fd, EPOLL_READ | EPOLL_WRITE)
141 def handle_timeout(self):
144 def handle_write(self):
145 if self.wbuf is None:
146 # Socket was just closed by remote peer
148 bs = self.socket.send(self.wbuf)
149 self.wbuf = self.wbuf[bs:]
150 if not len(self.wbuf):
154 self.server.register_socket_m(self.fd, EPOLL_READ)
156 recv = asynchat.async_chat.recv
166 del self.server.connections[id(self)]
169 self.server.unregister_socket(self.fd)
170 self.changeTask(None)
176 self.ac_in_buffer = b''
178 def changeTask(self, f, t = None):
179 tryErr(self.server.rmSchedule, self._Task, IgnoredExceptions=KeyError)
181 self._Task = self.server.schedule(f, t, errHandler=self)
185 def __init__(self, server, sock, addr):
186 self.ac_in_buffer = b''
194 self.fd = sock.fileno()
195 server.register_socket(self.fd, self)
196 server.connections[id(self)] = self
197 self.changeTask(self.handle_timeout, time() + 15)
200 def _register(cls, scls):
202 if a == 'final_init':
203 f = lambda self, x=getattr(cls, a), y=getattr(scls, a): (x(self), y(self))
208 setattr(cls, a, getattr(scls, a))
210 class NetworkListener:
211 logger = logging.getLogger('SocketListener')
213 def __init__(self, server, server_address, address_family = socket.AF_INET6):
215 self.server_address = server_address
216 self.address_family = address_family
217 tryErr(self.setup_socket, server_address, Logger=self.logger, ErrorMsg=server_address)
219 def _makebind_py(self, server_address):
220 sock = socket.socket(self.address_family, socket.SOCK_STREAM)
223 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
226 sock.bind(server_address)
229 def _makebind_su(self, server_address):
230 if self.address_family != socket.AF_INET6:
231 raise NotImplementedError
233 from bindservice import bindservice
234 (node, service) = server_address
235 if not node: node = ''
236 if not service: service = ''
237 fd = bindservice(str(node), str(service))
238 sock = socket.fromfd(fd, socket.AF_INET6, socket.SOCK_STREAM)
242 def _makebind(self, *a, **ka):
244 return self._makebind_py(*a, **ka)
245 except BaseException as e:
247 return self._makebind_su(*a, **ka)
252 def setup_socket(self, server_address):
253 sock = self._makebind(server_address)
255 self.server.register_socket(sock.fileno(), self)
258 def handle_read(self):
260 conn, addr = self.socket.accept()
264 conn.setblocking(False)
265 h = server.RequestHandlerClass(server, conn, addr)
267 def handle_error(self):
268 # Ignore errors... like socket closing on the queue
272 def __init__(self, server, fd):
275 self.logger = logging.getLogger('Waker for %s' % (server.__class__.__name__,))
277 def handle_read(self):
278 data = os.read(self.fd, 1)
280 self.logger.error('Got EOF on socket')
281 self.logger.debug('Read wakeup')
283 class AsyncSocketServer:
284 logger = logging.getLogger('SocketServer')
289 def __init__(self, RequestHandlerClass):
290 if not hasattr(self, 'ServerName'):
291 self.ServerName = 'Eloipool'
293 self.RequestHandlerClass = RequestHandlerClass
296 self.keepgoing = True
297 self.rejecting = False
300 self._epoll = select.epoll()
302 self.connections = {}
304 self._sch = ScheduleDict()
307 self._schLock = threading.Lock()
309 self._schLock = WithNoop
311 self.TrustedForwarders = ()
316 self.register_socket(r, o)
319 def register_socket(self, fd, o, eventmask = EPOLL_READ):
320 self._epoll.register(fd, eventmask)
323 def register_socket_m(self, fd, eventmask):
325 self._epoll.modify(fd, eventmask)
329 def unregister_socket(self, fd):
332 self._epoll.unregister(fd)
336 def schedule(self, task, startTime, errHandler=None):
338 self._sch[task] = startTime
340 self._schEH[id(task)] = errHandler
343 def rmSchedule(self, task):
350 def pre_schedule(self):
355 raise NotImplementedError('Class `%s\' did not enable waker' % (self.__class__.__name__))
356 os.write(self.waker, b'\1') # to break out of the epoll
358 def final_init(self):
362 conns = tuple(self.connections.values())
364 tryErr(lambda: c.boot())
366 def serve_forever(self):
369 while self.keepgoing:
370 self.doing = 'pre-schedule'
372 self.doing = 'schedule'
377 if not len(self._sch):
380 timeNext = self._sch.nextTime()
381 if timeNow < timeNext:
382 timeout = timeNext - timeNow
384 f = self._sch.shift()
393 if EH: tryErr(EH.handle_error)
395 self.logger.error(traceback.format_exc())
396 if EH: tryErr(EH.handle_close)
399 if self.lastidle < timeNow - 1:
401 elif timeout < 0 or timeout > 1:
406 events = self._epoll.poll(timeout=timeout)
407 except (IOError, select.error):
410 self.logger.error(traceback.format_exc())
412 self.doing = 'events'
414 self.lastidle = time()
415 for (fd, e) in events:
424 tryErr(o.handle_error)
426 self.logger.error(traceback.format_exc())
427 tryErr(o.handle_error)