1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2013 Luke Dashjr <luke-jr+eloipool@utopios.org>
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 from util import ScheduleDict, WithNoop, tryErr
27 EPOLL_READ = select.EPOLLIN | select.EPOLLPRI | select.EPOLLERR | select.EPOLLHUP
28 EPOLL_WRITE = select.EPOLLOUT
31 ac_in_buffer_size = 4096
32 ac_out_buffer_size = 4096
34 def handle_close(self):
38 def handle_error(self):
39 self.logger.debug(traceback.format_exc())
42 # NOTE: This function checks for socket-closed condition and calls handle_close
43 recv = asynchat.async_chat.recv
45 def handle_read(self):
47 data = self.recv (self.ac_in_buffer_size)
48 except socket.error as why:
53 # All input is ignored from sockets we have "closed"
56 if isinstance(data, str) and self.use_encoding:
57 data = bytes(str, self.encoding)
58 self.ac_in_buffer = self.ac_in_buffer + data
60 self.server.lastReadbuf = self.ac_in_buffer
64 collect_incoming_data = asynchat.async_chat._collect_incoming_data
65 get_terminator = asynchat.async_chat.get_terminator
66 set_terminator = asynchat.async_chat.set_terminator
68 def handle_readbuf(self):
69 while self.ac_in_buffer:
70 lb = len(self.ac_in_buffer)
71 terminator = self.get_terminator()
73 # no terminator, collect it all
74 self.collect_incoming_data (self.ac_in_buffer)
75 self.ac_in_buffer = b''
76 elif isinstance(terminator, int):
80 self.collect_incoming_data (self.ac_in_buffer)
81 self.ac_in_buffer = b''
82 self.terminator = self.terminator - lb
84 self.collect_incoming_data (self.ac_in_buffer[:n])
85 self.ac_in_buffer = self.ac_in_buffer[n:]
87 self.found_terminator()
90 # 1) end of buffer matches terminator exactly:
91 # collect data, transition
92 # 2) end of buffer matches some prefix:
93 # collect data to the prefix
94 # 3) end of buffer does not match any prefix:
96 # NOTE: this supports multiple different terminators, but
97 # NOT ones that are prefixes of others...
98 if isinstance(self.ac_in_buffer, type(terminator)):
99 terminator = (terminator,)
100 termidx = tuple(map(self.ac_in_buffer.find, terminator))
102 index = min(x for x in termidx if x >= 0)
106 # we found the terminator
108 # don't bother reporting the empty string (source of subtle bugs)
109 self.collect_incoming_data (self.ac_in_buffer[:index])
110 specific_terminator = terminator[termidx.index(index)]
111 terminator_len = len(specific_terminator)
112 self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
113 # This does the Right Thing if the terminator is changed here.
114 self.found_terminator()
116 # check for a prefix of the terminator
117 termidx = tuple(map(lambda a: asynchat.find_prefix_at_end (self.ac_in_buffer, a), terminator))
121 # we found a prefix, collect up to the prefix
122 self.collect_incoming_data (self.ac_in_buffer[:-index])
123 self.ac_in_buffer = self.ac_in_buffer[-index:]
126 # no prefix, collect it all
127 self.collect_incoming_data (self.ac_in_buffer)
128 self.ac_in_buffer = b''
130 def push(self, data):
131 if not len(self.wbuf):
132 # Try to send as much as we can immediately
134 bs = self.socket.send(data)
136 # Chances are we'll fail later, but anyway...
142 self.server.register_socket_m(self.fd, EPOLL_READ | EPOLL_WRITE)
144 def handle_timeout(self):
147 def handle_write(self):
148 if self.wbuf is None:
149 # Socket was just closed by remote peer
151 bs = self.socket.send(self.wbuf)
152 self.wbuf = self.wbuf[bs:]
153 if not len(self.wbuf):
157 self.server.register_socket_m(self.fd, EPOLL_READ)
167 del self.server.connections[id(self)]
170 self.server.unregister_socket(self.fd)
171 self.changeTask(None)
177 self.ac_in_buffer = b''
179 def changeTask(self, f, t = None):
180 tryErr(self.server.rmSchedule, self._Task, IgnoredExceptions=KeyError)
182 self._Task = self.server.schedule(f, t, errHandler=self)
186 def __init__(self, server, sock, addr):
187 self.ac_in_buffer = b''
195 self.fd = sock.fileno()
196 server.register_socket(self.fd, self)
197 server.connections[id(self)] = self
198 self.changeTask(self.handle_timeout, time() + 15)
201 def _register(cls, scls):
203 if a == 'final_init':
204 f = lambda self, x=getattr(cls, a), y=getattr(scls, a): (x(self), y(self))
209 setattr(cls, a, getattr(scls, a))
211 class NetworkListener:
212 logger = logging.getLogger('SocketListener')
214 def __init__(self, server, server_address, address_family = socket.AF_INET6):
216 self.server_address = server_address
217 self.address_family = address_family
218 tryErr(self.setup_socket, server_address, Logger=self.logger, ErrorMsg=server_address)
220 def _makebind_py(self, server_address):
221 sock = socket.socket(self.address_family, socket.SOCK_STREAM)
224 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
227 sock.bind(server_address)
230 def _makebind_su(self, server_address):
231 if self.address_family != socket.AF_INET6:
232 raise NotImplementedError
234 from bindservice import bindservice
235 (node, service) = server_address
236 if not node: node = ''
237 if not service: service = ''
238 fd = bindservice(str(node), str(service))
239 sock = socket.fromfd(fd, socket.AF_INET6, socket.SOCK_STREAM)
243 def _makebind(self, *a, **ka):
245 return self._makebind_py(*a, **ka)
246 except BaseException as e:
248 return self._makebind_su(*a, **ka)
253 def setup_socket(self, server_address):
254 sock = self._makebind(server_address)
256 self.server.register_socket(sock.fileno(), self)
259 def handle_read(self):
261 conn, addr = self.socket.accept()
265 conn.setblocking(False)
266 h = server.RequestHandlerClass(server, conn, addr)
268 def handle_error(self):
269 # Ignore errors... like socket closing on the queue
273 def __init__(self, server, fd):
276 self.logger = logging.getLogger('Waker for %s' % (server.__class__.__name__,))
278 def handle_read(self):
279 data = os.read(self.fd, 1)
281 self.logger.error('Got EOF on socket')
282 self.logger.debug('Read wakeup')
284 class AsyncSocketServer:
285 logger = logging.getLogger('SocketServer')
290 def __init__(self, RequestHandlerClass):
291 if not hasattr(self, 'ServerName'):
292 self.ServerName = 'Eloipool'
294 self.RequestHandlerClass = RequestHandlerClass
297 self.keepgoing = True
298 self.rejecting = False
301 self._epoll = select.epoll()
303 self.connections = {}
305 self._sch = ScheduleDict()
308 self._schLock = threading.Lock()
310 self._schLock = WithNoop
312 self.TrustedForwarders = ()
317 self.register_socket(r, o)
320 def register_socket(self, fd, o, eventmask = EPOLL_READ):
321 self._epoll.register(fd, eventmask)
324 def register_socket_m(self, fd, eventmask):
326 self._epoll.modify(fd, eventmask)
330 def unregister_socket(self, fd):
333 self._epoll.unregister(fd)
337 def schedule(self, task, startTime, errHandler=None):
339 self._sch[task] = startTime
341 self._schEH[id(task)] = errHandler
344 def rmSchedule(self, task):
351 def pre_schedule(self):
356 raise NotImplementedError('Class `%s\' did not enable waker' % (self.__class__.__name__))
357 os.write(self.waker, b'\1') # to break out of the epoll
359 def final_init(self):
363 conns = tuple(self.connections.values())
365 tryErr(lambda: c.boot())
367 def serve_forever(self):
370 while self.keepgoing:
371 self.doing = 'pre-schedule'
373 self.doing = 'schedule'
378 if not len(self._sch):
381 timeNext = self._sch.nextTime()
382 if timeNow < timeNext:
383 timeout = timeNext - timeNow
385 f = self._sch.shift()
394 if EH: tryErr(EH.handle_error)
396 self.logger.error(traceback.format_exc())
397 if EH: tryErr(EH.handle_close)
400 if self.lastidle < timeNow - 1:
402 elif timeout < 0 or timeout > 1:
407 events = self._epoll.poll(timeout=timeout)
408 except (IOError, select.error):
411 self.logger.error(traceback.format_exc())
413 self.doing = 'events'
415 self.lastidle = time()
416 for (fd, e) in events:
425 tryErr(o.handle_error)
427 self.logger.error(traceback.format_exc())
428 tryErr(o.handle_error)