Workaround for bug in Python's math.log function
[bitcoin:eloipool.git] / networkserver.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 import asynchat
18 import logging
19 import os
20 import select
21 import socket
22 from time import time
23 import traceback
24 from util import ScheduleDict, tryErr
25
# Event mask treated as "readable" by the server loop: besides normal and
# priority input it includes the error and hang-up bits, so those conditions
# are also delivered to a handler's handle_read().  It is the default mask
# used by AsyncSocketServer.register_socket().
EPOLL_READ = select.EPOLLIN | select.EPOLLPRI | select.EPOLLERR | select.EPOLLHUP
# Event mask treated as "writable"; SocketHandler adds it to its registration
# only while it has unsent data buffered.
EPOLL_WRITE = select.EPOLLOUT
28
class SocketHandler:
        """Per-connection handler with an input buffer and a write buffer.

        The handler registers its descriptor with the owning server on
        construction and is then driven through handle_read(), handle_write()
        and handle_error().

        This class uses `logger`, `use_encoding`, `encoding` and
        `handle_readbuf`, none of which it defines itself; they have to be
        supplied by whatever completes the handler (see _register()).
        """
        ac_in_buffer_size = 4096
        ac_out_buffer_size = 4096

        def handle_close(self):
                """Cancel the scheduled task, drop any unsent data and close."""
                self.changeTask(None)
                # Setting wbuf to None (rather than b'') lets handle_write()
                # recognise a connection that went away, and makes close()
                # proceed immediately instead of waiting for a flush.
                self.wbuf = None
                self.close()

        def handle_error(self):
                """Log the active exception at debug level and tear down."""
                self.logger.debug(traceback.format_exc())
                self.handle_close()

        def handle_read(self):
                """Read up to ac_in_buffer_size bytes and hand them to handle_readbuf().

                Input is discarded once close() has been requested (closeme).
                """
                try:
                        data = self.recv(self.ac_in_buffer_size)
                except socket.error:
                        self.handle_error()
                        return

                if self.closeme:
                        # All input is ignored from sockets we have "closed"
                        return

                if isinstance(data, str) and self.use_encoding:
                        # Encode the received text itself; the original passed
                        # the builtin `str` type here, which raises TypeError.
                        data = bytes(data, self.encoding)
                self.ac_in_buffer = self.ac_in_buffer + data

                # Kept on the server for post-mortem inspection.
                self.server.lastReadbuf = self.ac_in_buffer

                self.handle_readbuf()

        def push(self, data):
                """Send `data`, buffering whatever cannot be written right away.

                When nothing is queued, an immediate send is attempted; any
                remainder is appended to wbuf and the descriptor is registered
                for write events so handle_write() can finish the job.
                """
                if not len(self.wbuf):
                        # Try to send as much as we can immediately
                        try:
                                bs = self.socket.send(data)
                        except Exception:
                                # Chances are we'll fail later, but anyway...
                                bs = 0
                        data = data[bs:]
                        if not len(data):
                                return
                self.wbuf += data
                self.server.register_socket_m(self.fd, EPOLL_READ | EPOLL_WRITE)

        def handle_timeout(self):
                """Scheduled by __init__; closes the connection."""
                self.close()

        def handle_write(self):
                """Flush as much of wbuf as the socket accepts.

                Once the buffer is empty, either complete a deferred close()
                or stop listening for write events.
                """
                if self.wbuf is None:
                        # Socket was just closed by remote peer
                        return
                bs = self.socket.send(self.wbuf)
                self.wbuf = self.wbuf[bs:]
                if not len(self.wbuf):
                        if self.closeme:
                                self.close()
                                return
                        self.server.register_socket_m(self.fd, EPOLL_READ)

        recv = asynchat.async_chat.recv

        def close(self):
                """Close the connection, deferring while unsent data remains.

                With data still queued, only `closeme` is set; handle_write()
                calls close() again once the buffer has drained.  Calling this
                on an already closed handler (fd == -1) is a no-op.
                """
                if self.wbuf:
                        self.closeme = True
                        return
                if self.fd == -1:
                        # Already closed
                        return
                self.server.unregister_socket(self.fd)
                self.socket.close()
                self.fd = -1

        def changeTask(self, f, t = None):
                """Replace this handler's scheduled task with `f` at time `t`.

                The previous task is removed from the server's schedule first
                (a KeyError from that removal is passed to tryErr as an
                ignored exception).  A falsy `f` only removes the old task.
                """
                tryErr(self.server.rmSchedule, self._Task, IgnoredExceptions=KeyError)
                if f:
                        self._Task = self.server.schedule(f, t, errHandler=self)

        def __init__(self, server, sock, addr):
                """Attach to `server`, register `sock` and arm a 15 second timeout."""
                self.ac_in_buffer = b''
                self.wbuf = b''
                self.closeme = False
                self.server = server
                self.socket = sock
                self.addr = addr
                self._Task = None
                self.fd = sock.fileno()
                server.register_socket(self.fd, self)
                self.changeTask(self.handle_timeout, time() + 15)

        @classmethod
        def _register(cls, scls):
                """Copy the public attributes of `scls` onto this class.

                Names starting with an underscore are skipped.  'final_init'
                is special-cased: instead of being overwritten it is replaced
                by a wrapper that calls the existing cls.final_init and then
                scls.final_init.
                """
                for a in dir(scls):
                        if a == 'final_init':
                                # Defaults bind the current functions now, so
                                # the wrapper does not end up calling itself.
                                f = lambda self, x=getattr(cls, a), y=getattr(scls, a): (x(self), y(self))
                                setattr(cls, a, f)
                                continue
                        if a[0] == '_':
                                continue
                        setattr(cls, a, getattr(scls, a))
130
class NetworkListener:
        """Listening socket that creates a request handler per accepted connection.

        The listener registers itself with `server` under the socket's
        descriptor, so the server loop calls handle_read() whenever a
        connection is waiting to be accepted.
        """
        logger = logging.getLogger('SocketListener')

        def __init__(self, server, server_address, address_family = socket.AF_INET6):
                """Remember the parameters and try to bind and listen.

                Setup runs through tryErr with this class's logger, using
                `server_address` as the error message.
                """
                self.server = server
                self.server_address = server_address
                self.address_family = address_family
                tryErr(self.setup_socket, server_address, Logger=self.logger, ErrorMsg=server_address)

        def _makebind_py(self, server_address):
                """Create a non-blocking stream socket bound to `server_address`.

                The socket is closed again if configuring or binding it fails,
                so a failed attempt does not leak a descriptor.
                """
                sock = socket.socket(self.address_family, socket.SOCK_STREAM)
                try:
                        sock.setblocking(0)
                        try:
                                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                        except socket.error:
                                # Address reuse is best-effort
                                pass
                        sock.bind(server_address)
                except BaseException:
                        sock.close()
                        raise
                return sock

        def _makebind_su(self, server_address):
                """Obtain a bound socket through the `bindservice` helper module.

                Only AF_INET6 is supported; anything else raises
                NotImplementedError.
                """
                if self.address_family != socket.AF_INET6:
                        raise NotImplementedError

                from bindservice import bindservice
                (node, service) = server_address
                if not node: node = ''
                if not service: service = ''
                fd = bindservice(str(node), str(service))
                # NOTE(review): socket.fromfd() duplicates the descriptor, so
                # `fd` itself stays open here.  If bindservice hands over
                # ownership of a plain descriptor it should probably be closed
                # after this call - confirm against bindservice.
                sock = socket.fromfd(fd, socket.AF_INET6, socket.SOCK_STREAM)
                sock.setblocking(0)
                return sock

        def _makebind(self, *a, **ka):
                """Bind with _makebind_py, falling back to _makebind_su.

                If the fallback fails as well, the error from the first
                attempt is the one that propagates.
                """
                try:
                        return self._makebind_py(*a, **ka)
                except Exception:
                        try:
                                return self._makebind_su(*a, **ka)
                        except Exception:
                                pass
                        # Re-raises the _makebind_py failure
                        raise

        def setup_socket(self, server_address):
                """Bind, start listening (backlog 100) and register with the server."""
                sock = self._makebind(server_address)
                try:
                        sock.listen(100)
                        self.server.register_socket(sock.fileno(), self)
                except BaseException:
                        # Do not leave a half-initialised listening socket behind
                        sock.close()
                        raise
                self.socket = sock

        def handle_read(self):
                """Accept one pending connection and wrap it in a request handler."""
                server = self.server
                conn, addr = self.socket.accept()
                conn.setblocking(False)
                # The result is deliberately not kept here; SocketHandler, for
                # one, registers itself with the server in its constructor.
                server.RequestHandlerClass(server, conn, addr)

        def handle_error(self):
                # Ignore errors... like socket closing on the queue
                pass
188
189 class _Waker:
190         def __init__(self, server, fd):
191                 self.server = server
192                 self.fd = fd
193                 self.logger = logging.getLogger('Waker for %s' % (server.__class__.__name__,))
194         
195         def handle_read(self):
196                 data = os.read(self.fd, 1)
197                 if not data:
198                         self.logger.error('Got EOF on socket')
199                 self.logger.debug('Read wakeup')
200
class AsyncSocketServer:
        """epoll-based event loop combined with a timed task schedule.

        Handlers are registered per file descriptor and receive handle_read()
        / handle_write() calls from serve_forever(); failures are routed to
        the handler's handle_error().
        """
        logger = logging.getLogger('SocketServer')

        # A subclass sets this to a true value to request a wakeup pipe;
        # __init__ then stores the pipe's write descriptor here instead.
        waker = False

        def __init__(self, RequestHandlerClass):
                """Create the epoll object, the schedule and, if enabled, the waker."""
                if not hasattr(self, 'ServerName'):
                        self.ServerName = 'Eloipool'

                self.RequestHandlerClass = RequestHandlerClass

                self.running = False
                self.keepgoing = True

                self._epoll = select.epoll()
                self._fd = {}  # fd -> handler object

                self._sch = ScheduleDict()
                self._schEH = {}  # id(task) -> error handler

                self.TrustedForwarders = ()

                if self.waker:
                        (r, w) = os.pipe()
                        o = _Waker(self, r)
                        self.register_socket(r, o)
                        self.waker = w

        def register_socket(self, fd, o, eventmask = EPOLL_READ):
                """Start polling `fd` with `eventmask` and dispatch its events to `o`."""
                self._epoll.register(fd, eventmask)
                self._fd[fd] = o

        def register_socket_m(self, fd, eventmask):
                """Change the event mask of an already registered `fd`.

                An IOError from epoll is re-raised as socket.error, the
                exception type the event loop routes to handle_error().
                """
                try:
                        self._epoll.modify(fd, eventmask)
                except IOError:
                        raise socket.error

        def unregister_socket(self, fd):
                """Forget the handler for `fd` and stop polling it.

                Raises KeyError if `fd` is not registered, and socket.error
                if epoll refuses to unregister it.
                """
                del self._fd[fd]
                try:
                        self._epoll.unregister(fd)
                except IOError:
                        raise socket.error

        def schedule(self, task, startTime, errHandler=None):
                """Queue callable `task` for `startTime` and return it.

                If given, `errHandler` is remembered under id(task) and gets
                handle_error() / handle_close() called when the task raises.
                """
                self._sch[task] = startTime
                if errHandler:
                        self._schEH[id(task)] = errHandler
                return task

        def rmSchedule(self, task):
                """Remove `task` from the schedule together with its error handler."""
                del self._sch[task]
                self._schEH.pop(id(task), None)

        def pre_schedule(self):
                """Hook run at the top of every loop iteration; no-op by default."""
                pass

        def wakeup(self):
                """Interrupt a blocking poll by writing one byte to the waker pipe."""
                if not self.waker:
                        raise NotImplementedError('Class `%s\' did not enable waker' % (self.__class__.__name__))
                os.write(self.waker, b'\1')  # to break out of the epoll

        def final_init(self):
                """Hook run once when serve_forever() starts; no-op by default."""
                pass

        def serve_forever(self):
                """Run due tasks and dispatch socket events until keepgoing is false.

                `self.doing` records the current phase ('pre-schedule',
                'schedule', 'poll', 'events') and `self.lastHandler` the
                handler most recently dispatched to.
                """
                self.running = True
                self.final_init()
                while self.keepgoing:
                        self.doing = 'pre-schedule'
                        self.pre_schedule()
                        self.doing = 'schedule'
                        if len(self._sch):
                                timeNow = time()
                                while True:
                                        timeNext = self._sch.nextTime()
                                        if timeNow < timeNext:
                                                # Nothing else is due; sleep until the next task
                                                timeout = timeNext - timeNow
                                                break
                                        f = self._sch.shift()
                                        EH = self._schEH.pop(id(f), None)
                                        try:
                                                f()
                                        except socket.error:
                                                if EH: tryErr(EH.handle_error)
                                        except:
                                                self.logger.error(traceback.format_exc())
                                                if EH: tryErr(EH.handle_close)
                                        if not len(self._sch):
                                                timeout = -1
                                                break
                        else:
                                # No tasks pending: block until there is socket activity
                                timeout = -1

                        self.doing = 'poll'
                        try:
                                events = self._epoll.poll(timeout=timeout)
                        except (IOError, select.error):
                                continue
                        except:
                                self.logger.error(traceback.format_exc())
                                # `events` is unset or stale after a failed
                                # poll, so it must not be processed.
                                continue
                        self.doing = 'events'
                        for (fd, e) in events:
                                o = self._fd.get(fd)
                                if o is None:
                                        # Unregistered by a handler that ran
                                        # earlier in this same batch.
                                        continue
                                self.lastHandler = o
                                try:
                                        if e & EPOLL_READ:
                                                o.handle_read()
                                        if e & EPOLL_WRITE:
                                                o.handle_write()
                                except socket.error:
                                        tryErr(o.handle_error)
                                except:
                                        self.logger.error(traceback.format_exc())
                                        tryErr(o.handle_error)
                self.doing = None
                self.running = False