Workaround for bug in Python's math.log function
[bitcoin:eloipool.git] / jsonrpcserver.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 from binascii import b2a_hex
18 import httpserver
19 import json
20 import logging
21 import networkserver
22 import socket
23 from time import time
24 import traceback
25
# Control-flow exception raised by JSONRPCHandler.doLongpoll once a request
# has been parked as a longpoll.  JSONRPCHandler._doJSON_i and
# JSONRPCHandler.handle_request both re-raise it instead of treating it as
# an error.
WithinLongpoll = httpserver.AsyncRequest
27
28 class _SentJSONError(BaseException):
29         def __init__(self, rv):
30                 self.rv = rv
31
class JSONRPCHandler(httpserver.HTTPHandler):
        """HTTP request handler that speaks JSON-RPC and supports longpolling.

        A JSON-RPC call named ``foo`` is dispatched to the attribute
        ``doJSON_foo`` of the handler.  No ``doJSON_*`` method is defined in this
        class itself, so they have to be supplied from elsewhere (subclass or
        attribute assignment).

        The attributes ``path``, ``method``, ``incoming``, ``quirks``,
        ``reqinfo``, ``Username``, ``remoteHost`` and ``server`` as well as the
        methods ``push``, ``changeTask``, ``doAuthenticate`` and
        ``handle_src_request`` are used here but not defined here; they
        presumably come from httpserver.HTTPHandler.
        """

        # NOTE(review): presumably the base class seeds self.quirks from this
        # dict for every connection -- nothing in this class reads it directly.
        default_quirks = {
                'NELH': None,  # FIXME: identify which clients have a problem with this
        }

        # Headers handed to sendReply for longpoll replies.  Because the key is
        # already present, sendReply's setdefault() does not replace the None
        # with '/LP'.  How the base class emits a None header value is not
        # visible here.
        LPHeaders = {
                'X-Long-Polling': None,
        }

        # Request paths that are served as JSON-RPC endpoints.  handle_request
        # treats any of them that starts with b'/LP' as a longpoll request.
        JSONRPCURIs = (b'/', b'/LP', b'/LP/')

        logger = logging.getLogger('JSONRPCHandler')

        # (remote host, username) pair counted by LPTrack() and not yet released
        # by LPUntrack(); None while this connection holds no tracking count.
        _LPTracked = None

        def final_init(server):
                """Hook invoked as ``JSONRPCHandler.final_init(server)`` by JSONRPCServer.final_init.

                It takes the server (not a handler instance) as its only argument and
                does nothing in this class.
                """
                pass

        def sendReply(self, status=200, body=b'', headers=None):
                """Send an HTTP reply after filling in JSON-RPC specific default headers.

                status:  HTTP status code.
                body:    reply body (bytes) or None.
                headers: optional mapping of extra headers; it is copied, never
                         modified.  Entries given here win over the defaults added
                         below.

                Returns whatever the base class sendReply returns.
                """
                headers = dict(headers) if headers else {}
                if body and body[:1] == b'{':
                        headers.setdefault('Content-Type', 'application/json')
                if status == 200 and self.path in self.JSONRPCURIs:
                        if not body:
                                headers.setdefault('Content-Type', 'application/json')
                        # Tell the client where to longpoll
                        headers.setdefault('X-Long-Polling', '/LP')
                        if self.JSONRPCMethod == 'getwork':
                                # FIXME: Move this to jsonrpc_getwork somehow
                                headers.setdefault('X-Roll-NTime', 'expire=120')
                return super().sendReply(status, body, headers)

        def fmtError(self, reason = '', code = 100):
                """Return a UTF-8 encoded JSON-RPC error object for *reason* and *code*.

                The "id" member of the produced object is always null.
                """
                reason = json.dumps(reason)
                reason = r'{"result":null,"id":null,"error":{"name":"JSONRPCError","code":%d,"message":%s}}' % (code, reason)
                return reason.encode('utf8')

        def doError(self, reason = '', code = 100):
                """Send a JSON-RPC error (see fmtError) as an HTTP 500 reply."""
                return self.sendReply(500, self.fmtError(reason, code))

        def checkAuthentication(self, un, pw):
                """Accept any non-empty username; the password is ignored."""
                return bool(un)

        # User agents that get the 'midstate' quirk based on their name alone
        _MidstateNotAdv = (b'phoenix', b'poclbm', b'gminor')

        @staticmethod
        def _parseUAVersion(raw):
                """Parse a User-Agent version such as b'1.50' or b'v1.7.4'.

                Returns a tuple of ints padded with three trailing zeros (so that at
                least three components can always be indexed), or None when *raw* is
                not a dotted list of integers.
                """
                # Indexing bytes yields an int in Python 3, so compare a slice
                if raw[:1] == b'v':
                        raw = raw[1:]
                try:
                        return tuple(map(int, raw.split(b'.'))) + (0, 0, 0)
                except ValueError:
                        return None

        def doHeader_user_agent(self, value):
                """Record the User-Agent header and enable quirks for known clients.

                value: raw header value (bytes), expected to look like b'name/version'.

                A missing or unparsable version is tolerated: such a phoenix client
                gets the 'midstate' quirk and no version-dependent 'NELH' quirk.
                """
                self.reqinfo['UA'] = value
                quirks = self.quirks
                (UA, _, rawVersion) = value.partition(b'/')

                # Temporary HACK to keep working with older gmp-proxy
                # NOTE: This will go away someday.
                if UA == b'AuthServiceProxy':
                        # SubmitBlock Boolean
                        quirks['SBB'] = None

                if UA not in self._MidstateNotAdv:
                        return
                if UA != b'phoenix':
                        quirks['midstate'] = None
                        return

                version = self._parseUAVersion(rawVersion)
                # Only the first three components are compared: the parsed tuple is
                # zero-padded and therefore never equal to a 3-tuple as a whole.
                if version is None or version[:3] != (1, 50, 0):
                        quirks['midstate'] = None
                # NOTE(review): this is a per-component test, not "older than
                # 2.8.1" (1.9.0 does not match) -- confirm which one is intended.
                if version is not None and version[0] < 2 and version[1] < 8 and version[2] < 1:
                        quirks['NELH'] = None

        def doHeader_x_minimum_wait(self, value):
                """Store the client's X-Minimum-Wait header as an int in reqinfo['MinWait'].

                Raises ValueError if the header value is not an integer.
                """
                self.reqinfo['MinWait'] = int(value)

        def doHeader_x_mining_extensions(self, value):
                """Store the lower-cased, space separated X-Mining-Extensions tokens in self.extensions.

                Raises UnicodeDecodeError if the header value is not ASCII.
                """
                self.extensions = value.decode('ascii').lower().split(' ')

        def processLP(self, lpid):
                """Turn the current call into a longpoll unless the client's id is stale.

                If *lpid* is a str that differs from the server's current LPId, return
                None so the caller can answer right away.  Otherwise doLongpoll() is
                entered, which raises WithinLongpoll.  When this is reached through
                _doJSON_i, that method records the call to replay on wake-up.
                """
                if isinstance(lpid, str) and self.server.LPId != lpid:
                        return
                self.doLongpoll()

        def doLongpoll(self, *a):
                """Park this request as a longpoll; always raises WithinLongpoll.

                *a* is the (reqid, method, params) triple that wakeLongpoll later
                replays through _doJSON_i.
                """
                timeNow = time()

                self._LP = True
                self._LPCall = a
                if 'NELH' not in self.quirks:
                        # [NOT No] Early Longpoll Headers
                        self.sendReply(200, body=None, headers=self.LPHeaders)
                        # Start a chunked body with the opening brace of the JSON
                        # reply; wakeLongpoll sends the remainder.
                        self.push(b"1\r\n{\r\n")
                        self.changeTask(self._chunkedKA, timeNow + 45)
                else:
                        self.changeTask(None)

                waitTime = self.reqinfo.get('MinWait', 15)  # TODO: make default configurable
                # Earliest time at which wakeLongpoll will actually answer
                self.waitTime = waitTime + timeNow

                totfromme = self.LPTrack()
                self.server._LPClients[id(self)] = self
                self.logger.debug("New LP client; %d total; %d from %s", len(self.server._LPClients), totfromme, self.remoteHost)

                raise WithinLongpoll

        def _chunkedKA(self):
                """Push a one-byte chunk (a space) and reschedule itself 45 seconds later."""
                # Keepalive via chunked transfer encoding
                self.push(b"1\r\n \r\n")
                self.changeTask(self._chunkedKA, time() + 45)

        def LPTrack(self):
                """Count this connection in the server's per-host and per-user longpoll tallies.

                Returns the number of tracked longpolls for this connection's remote
                host, including this one.
                """
                if self._LPTracked is not None:
                        # Never hold more than one count per connection
                        self.LPUntrack()

                server = self.server
                myip = self.remoteHost
                myuser = self.Username
                server.LPTracking[myip] = server.LPTracking.get(myip, 0) + 1
                server.LPTrackingByUser[myuser] = server.LPTrackingByUser.get(myuser, 0) + 1
                # Remember the exact keys so that LPUntrack releases the same ones
                self._LPTracked = (myip, myuser)

                return server.LPTracking[myip]

        def LPUntrack(self):
                """Release the count taken by LPTrack; does nothing if none is held.

                Being idempotent matters because both wakeLongpoll and cleanupLP call
                this, and a failed wake-up is followed by cleanupLP on close.
                Entries that drop to zero are removed so the tallies do not grow with
                every host or user ever seen.
                """
                tracked = self._LPTracked
                if tracked is None:
                        return
                self._LPTracked = None

                (myip, myuser) = tracked
                server = self.server
                for (counts, key) in ((server.LPTracking, myip), (server.LPTrackingByUser, myuser)):
                        remaining = counts.get(key, 0) - 1
                        if remaining > 0:
                                counts[key] = remaining
                        else:
                                counts.pop(key, None)

        def cleanupLP(self):
                """Undo longpoll bookkeeping for a connection that is going away."""
                # Called when the connection is closed
                if not self._LP:
                        return
                self.changeTask(None)
                self.server._LPClients.pop(id(self), None)
                self.LPUntrack()

        def wakeLongpoll(self):
                """Answer the parked longpoll, or reschedule if the minimum wait has not elapsed.

                Called by JSONRPCServer._actualLP, and by the task scheduled below.
                """
                now = time()
                if now < self.waitTime:
                        # Too early for this client: come back at its earliest time
                        self.changeTask(self.wakeLongpoll, self.waitTime)
                        return
                else:
                        self.changeTask(None)

                self.LPUntrack()

                rv = self._doJSON_i(*self._LPCall, longpoll=True)
                if 'NELH' not in self.quirks:
                        rv = rv[1:]  # strip the '{' we already sent
                        # One chunk with the rest of the reply, then the terminating chunk
                        self.push(('%x' % len(rv)).encode('utf8') + b"\r\n" + rv + b"\r\n0\r\n\r\n")
                        self.reset_request()
                        return

                try:
                        self.sendReply(200, body=rv, headers=self.LPHeaders)
                        # sendReply is expected to raise RequestHandled; returning
                        # normally is reported as a failure.
                        raise httpserver.RequestNotHandled
                except httpserver.RequestHandled:
                        # Expected
                        pass
                finally:
                        self.reset_request()

        def _doJSON_i(self, reqid, method, params, longpoll = False):
                """Invoke one JSON-RPC method and produce its reply.

                reqid:    value for the "id" member of the reply.
                method:   attribute name to call on self (already 'doJSON_' prefixed).
                params:   iterable of positional arguments for the method.
                longpoll: when True the encoded reply (bytes) is returned for the
                          caller to send; when False it is sent via sendReply (or
                          doError) and that call's result is returned.

                WithinLongpoll raised by the method is recorded in self._LPCall and
                re-raised.
                """
                efun = self.fmtError if longpoll else self.doError
                try:
                        rv = getattr(self, method)(*params)
                except WithinLongpoll:
                        # Remember what to replay when the longpoll is woken
                        self._LPCall = (reqid, method, params)
                        raise
                except Exception as e:
                        self.logger.error(("Error during JSON-RPC call: %s%s\n" % (method, params)) + traceback.format_exc())
                        return efun(r'Service error: %s' % (e,))
                # Only JSON objects can carry the extra member
                if isinstance(rv, dict):
                        rv.setdefault('submitold', True)
                rv = {'id': reqid, 'error': None, 'result': rv}
                try:
                        rv = json.dumps(rv)
                except Exception:
                        self.logger.error(("Error encoding JSON-RPC reply for %s\n" % (method,)) + traceback.format_exc())
                        return efun(r'Error encoding reply in JSON')
                rv = rv.encode('utf8')
                return rv if longpoll else self.sendReply(200, rv, headers=self._JSONHeaders)

        def doJSON(self, data, longpoll = False):
                """Parse a JSON-RPC request body and dispatch it.

                data:     raw request body (bytes).
                longpoll: True when the request arrived on a longpoll URI.

                An empty body on a longpoll URI is treated as a getwork longpoll.
                A longpoll request without params is parked via doLongpoll (which
                raises WithinLongpoll); everything else runs through _doJSON_i.
                Malformed requests are answered through doError.
                """
                if longpoll and not data:
                        self.JSONRPCId = jsonid = 1
                        self.JSONRPCMethod = 'getwork'
                        self._JSONHeaders = {}
                        return self.doLongpoll(1, 'doJSON_getwork', ())
                try:
                        # UnicodeDecodeError is a ValueError, so undecodable bodies
                        # are reported as parse errors as well
                        data = json.loads(data.decode('utf8'))
                        method = str(data['method']).lower()
                        self.JSONRPCId = jsonid = data['id']
                        self.JSONRPCMethod = method
                        method = 'doJSON_' + method
                except ValueError:
                        return self.doError(r'Parse error')
                except (TypeError, KeyError):
                        # Not a JSON object, or "method"/"id" missing
                        return self.doError(r'Bad call')
                if not hasattr(self, method):
                        return self.doError(r'Procedure not found')
                # TODO: handle errors as JSON-RPC
                self._JSONHeaders = {}
                params = data.get('params')
                if params is None:
                        # Absent and null params both mean "no arguments"
                        params = ()
                procfun = self._doJSON_i
                if longpoll and not params:
                        procfun = self.doLongpoll
                return procfun(jsonid, method, params)

        def handle_close(self):
                """Drop longpoll bookkeeping, then let the base class close the connection."""
                self.cleanupLP()
                super().handle_close()

        def handle_request(self):
                """Route one HTTP request.

                Replies 405 for methods other than GET/POST and 404 for unknown paths
                (paths under /src/ go to handle_src_request), asks for authentication
                when there is no username, and otherwise hands the body to doJSON.
                """
                if self.method not in (b'GET', b'POST'):
                        return self.sendReply(405)
                if self.path not in self.JSONRPCURIs:
                        if isinstance(self.path, bytes) and self.path[:5] == b'/src/':
                                return self.handle_src_request()
                        return self.sendReply(404)
                if not self.Username:
                        return self.doAuthenticate()
                try:
                        data = b''.join(self.incoming)
                        return self.doJSON(data, self.path[:3] == b'/LP')
                except (socket.error, WithinLongpoll, httpserver.RequestHandled):
                        # Not errors of this request; the caller deals with them
                        raise
                except BaseException:
                        # NOTE(review): deliberately as broad as the bare except it
                        # replaces; confirm whether KeyboardInterrupt/SystemExit
                        # should really be answered with an error reply.
                        self.logger.error(traceback.format_exc())
                        return self.doError('uncaught error')

        def reset_request(self):
                """Clear per-request longpoll/JSON-RPC state, then reset the base class state."""
                self._LP = False
                self.JSONRPCMethod = None
                super().reset_request()
275         
# HTTP header names contain '-', which cannot appear in a method name, so the
# handlers are defined with '_' in the class body and additionally registered
# here under their hyphenated names.
# NOTE(review): presumably httpserver looks handlers up as
# 'doHeader_<lower-cased header name>' -- confirm against httpserver.
setattr(JSONRPCHandler, 'doHeader_user-agent', JSONRPCHandler.doHeader_user_agent)
setattr(JSONRPCHandler, 'doHeader_x-minimum-wait', JSONRPCHandler.doHeader_x_minimum_wait)
setattr(JSONRPCHandler, 'doHeader_x-mining-extensions', JSONRPCHandler.doHeader_x_mining_extensions)
279
# Plain alias of networkserver.NetworkListener; nothing is added or overridden.
JSONRPCListener = networkserver.NetworkListener
281
class JSONRPCServer(networkserver.AsyncSocketServer):
        """Socket server holding the longpoll state shared by its JSONRPCHandler connections.

        LPRequest is a small state machine: False when no longpoll is pending,
        1 once wakeLongpoll() has requested one, and 2 while _actualLP is
        scheduled for a later time.

        ``wakeup`` and ``schedule`` are used but not defined here; they
        presumably come from networkserver.AsyncSocketServer.
        """

        logger = logging.getLogger('JSONRPCServer')

        waker = True

        def __init__(self, *a, **ka):
                """Create the server; JSONRPCHandler is the default RequestHandlerClass."""
                ka.setdefault('RequestHandlerClass', JSONRPCHandler)
                super().__init__(*a, **ka)

                self.SecretUser = None
                self.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff

                # Longpoll id: counter plus the string handed to/compared with clients
                self._LPId = 0
                self.LPId = '%d' % (time(),)
                self.LPRequest = False
                # Parked handlers, keyed by id(handler)
                self._LPClients = {}
                # Earliest time at which the next longpoll may be delivered
                self._LPWaitTime = time() + 15

                # Longpoll counts per remote host and per username
                self.LPTracking = {}
                self.LPTrackingByUser = {}

        def final_init(self):
                """Give the handler class its final_init hook, passing this server."""
                JSONRPCHandler.final_init(self)

        def pre_schedule(self):
                """Act on a freshly requested longpoll (LPRequest == 1); otherwise do nothing."""
                if self.LPRequest != 1:
                        return
                self._LPsch()

        def wakeLongpoll(self):
                """Request delivery of a longpoll to all parked clients.

                A request made while another one is still pending is logged and
                ignored.  Otherwise a new LPId is issued and wakeup() is called.
                """
                if not self.LPRequest:
                        self.LPRequest = 1
                        self._LPId += 1
                        self.LPId = '%d %d' % (time(), self._LPId)
                        self.wakeup()
                        return
                self.logger.info('Ignoring longpoll attempt while another is waiting')

        def _LPsch(self):
                """Deliver the longpoll now, or schedule it for _LPWaitTime if that is still ahead."""
                current = time()
                if self._LPWaitTime <= current:
                        self._actualLP()
                        return
                remaining = self._LPWaitTime - current
                self.logger.info('Waiting %.3g seconds to longpoll' % (remaining,))
                self.schedule(self._actualLP, self._LPWaitTime)
                self.LPRequest = 2

        def _actualLP(self):
                """Wake every parked client and set the earliest time for the next longpoll.

                The client table is emptied before waking anyone.  A client whose
                wakeLongpoll raises is not counted as woken; errors other than
                socket.error are logged at debug level.
                """
                self.LPRequest = False
                parked = tuple(self._LPClients.values())
                self._LPClients = {}
                if not parked:
                        self.logger.info('Nobody to longpoll')
                        return
                woken = len(parked)
                self.logger.debug("%d clients to wake up..." % (woken,))

                started = time()

                for handler in parked:
                        try:
                                handler.wakeLongpoll()
                        except socket.error:
                                # Ignore socket errors; let the main event loop take care of them later
                                woken -= 1
                        except BaseException:
                                woken -= 1
                                self.logger.debug('Error waking longpoll handler:\n' + traceback.format_exc())

                finished = time()
                self.logger.info('Longpoll woke up %d clients in %.3f seconds' % (woken, finished - started))
                # TODO: make configurable: minimum time between longpolls
                self._LPWaitTime = finished + 5

        def TopLPers(self, n = 0x10):
                """Print (host, count) for the last *n* hosts when ordered by ascending longpoll count."""
                tally = self.LPTracking
                ordered = sorted(tally.keys(), key=lambda host: tally[host])
                for host in ordered[-n:]:
                        print((host, tally[host]))

        def TopLPersByUser(self, n = 0x10):
                """Print (user, count) for the last *n* users when ordered by ascending longpoll count."""
                tally = self.LPTrackingByUser
                ordered = sorted(tally.keys(), key=lambda user: tally[user])
                for user in ordered[-n:]:
                        print((user, tally[user]))