1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2013 Luke Dashjr <luke-jr+eloipool@utopios.org>
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 from binascii import b2a_hex
26 WithinLongpoll = httpserver.AsyncRequest
28 class _SentJSONError(BaseException):
29 def __init__(self, rv):
32 class JSONRPCHandler(httpserver.HTTPHandler):
34 'NELH': None, # FIXME: identify which clients have a problem with this
38 'X-Long-Polling': None,
41 JSONRPCURIs = (b'/', b'/LP', b'/LP/')
43 logger = logging.getLogger('JSONRPCHandler')
45 def final_init(server):
48 def __init__(self, *a, **ka):
49 super().__init__(*a, **ka)
52 def sendReply(self, status=200, body=b'', headers=None, *a, **ka):
53 headers = dict(headers) if headers else {}
54 if body and body[0] == 123: # b'{'
55 headers.setdefault('Content-Type', 'application/json')
56 if status == 200 and self.path in self.JSONRPCURIs:
58 headers.setdefault('Content-Type', 'application/json')
59 headers.setdefault('X-Long-Polling', '/LP')
60 return super().sendReply(status, body, headers, *a, **ka)
62 def fmtError(self, reason = '', code = 100):
63 reason = json.dumps(reason)
64 reason = r'{"result":null,"id":null,"error":{"name":"JSONRPCError","code":%d,"message":%s}}' % (code, reason)
65 reason = reason.encode('utf8')
68 def doError(self, reason = '', code = 100):
69 reason = self.fmtError(reason, code)
70 return self.sendReply(500, reason)
72 def checkAuthentication(self, un, pw):
73 return self.server.checkAuthentication(un.decode('utf8'), pw.decode('utf8'))
75 _MidstateNotAdv = (b'phoenix', b'poclbm', b'gminor')
76 def doHeader_user_agent(self, value):
77 self.reqinfo['UA'] = value
78 self.UA = value.decode('latin-1') # technically ASCII, but latin-1 ignores errors
80 (UA, v, *x) = value.split(b'/', 1) + [None]
82 # Temporary HACK to keep working with older gmp-proxy
83 # NOTE: This will go away someday.
84 if UA == b'AuthServiceProxy':
89 if v[0] == b'v': v = v[1:]
90 v = tuple(map(int, v.split(b'.'))) + (0,0,0)
93 if UA in self._MidstateNotAdv:
96 quirks['midstate'] = None
97 if v[0] < 2 and v[1] < 8 and v[2] < 1:
100 quirks['midstate'] = None
102 def doHeader_x_minimum_wait(self, value):
103 self.reqinfo['MinWait'] = int(value)
105 def doHeader_x_mining_extensions(self, value):
106 self.extensions = value.decode('ascii').lower().split(' ')
108 def processLP(self, lpid):
109 lpw = self.server.LPId
110 if isinstance(lpid, str):
115 def doLongpoll(self, *a):
120 if 'NELH' not in self.quirks:
121 # [NOT No] Early Longpoll Headers
122 self.sendReply(200, body=None, headers=self.LPHeaders)
123 self.push(b"1\r\n{\r\n")
124 self.changeTask(self._chunkedKA, timeNow + 45)
126 self.changeTask(None)
128 waitTime = self.reqinfo.get('MinWait', 15) # TODO: make default configurable
129 self.waitTime = waitTime + timeNow
131 totfromme = self.LPTrack()
132 self.server._LPClients[id(self)] = self
133 self.logger.debug("New LP client; %d total; %d from %s" % (len(self.server._LPClients), totfromme, self.remoteHost))
137 def _chunkedKA(self):
138 # Keepalive via chunked transfer encoding
139 self.push(b"1\r\n \r\n")
140 self.changeTask(self._chunkedKA, time() + 45)
143 myip = self.remoteHost
144 if myip not in self.server.LPTracking:
145 self.server.LPTracking[myip] = 0
146 self.server.LPTracking[myip] += 1
148 myuser = self.Username
149 if myuser not in self.server.LPTrackingByUser:
150 self.server.LPTrackingByUser[myuser] = 0
151 self.server.LPTrackingByUser[myuser] += 1
153 return self.server.LPTracking[myip]
156 self.server.LPTracking[self.remoteHost] -= 1
157 self.server.LPTrackingByUser[self.Username] -= 1
160 # Called when the connection is closed
163 self.changeTask(None)
165 del self.server._LPClients[id(self)]
170 def wakeLongpoll(self, wantClear = False):
172 if now < self.waitTime:
173 self.changeTask(lambda: self.wakeLongpoll(wantClear), self.waitTime)
176 self.changeTask(None)
180 self.server.tls.wantClear = wantClear
182 rv = self._doJSON_i(*self._LPCall, longpoll=True)
183 except WithinLongpoll:
184 # Not sure why this would happen right now, but handle it sanely...
187 self.server.tls.wantClear = False
188 if 'NELH' not in self.quirks:
189 rv = rv[1:] # strip the '{' we already sent
190 self.push(('%x' % len(rv)).encode('utf8') + b"\r\n" + rv + b"\r\n0\r\n\r\n")
195 self.sendReply(200, body=rv, headers=self.LPHeaders, tryCompression=False)
196 raise httpserver.RequestNotHandled
197 except httpserver.RequestHandled:
203 def _doJSON_i(self, reqid, method, params, longpoll = False):
205 rv = getattr(self, method)(*params)
206 except WithinLongpoll:
207 self._LPCall = (reqid, method, params)
209 except Exception as e:
210 self.logger.error(("Error during JSON-RPC call (UA=%s, IP=%s): %s%s\n" % (self.reqinfo.get('UA'), self.remoteHost, method, params)) + traceback.format_exc())
211 efun = self.fmtError if longpoll else self.doError
212 return efun(r'Service error: %s' % (e,))
213 rv = {'id': reqid, 'error': None, 'result': rv}
217 efun = self.fmtError if longpoll else self.doError
218 return efun(r'Error encoding reply in JSON')
219 rv = rv.encode('utf8')
220 return rv if longpoll else self.sendReply(200, rv, headers=self._JSONHeaders)
222 def doJSON(self, data, longpoll = False):
223 # TODO: handle JSON errors
225 data = data.decode('utf8')
226 except UnicodeDecodeError as e:
227 return self.doError(str(e))
228 if longpoll and not data:
229 self.JSONRPCId = jsonid = 1
230 self.JSONRPCMethod = 'getwork'
231 self._JSONHeaders = {}
232 return self.doLongpoll(1, 'doJSON_getwork', ())
234 data = json.loads(data)
235 method = str(data['method']).lower()
236 self.JSONRPCId = jsonid = data['id']
237 self.JSONRPCMethod = method
238 method = 'doJSON_' + method
240 return self.doError(r'Parse error')
242 return self.doError(r'Bad call')
243 if not hasattr(self, method):
244 return self.doError(r'Procedure not found')
245 # TODO: handle errors as JSON-RPC
246 self._JSONHeaders = {}
247 params = data.setdefault('params', ())
248 procfun = self._doJSON_i
249 if longpoll and not params:
250 procfun = self.doLongpoll
251 return procfun(jsonid, method, params)
253 def handle_close(self):
255 super().handle_close()
257 def handle_request(self):
258 if not self.method in (b'GET', b'POST'):
259 return self.sendReply(405)
260 if not self.path in self.JSONRPCURIs:
261 if isinstance(self.path, bytes) and self.path[:5] == b'/src/':
262 return self.handle_src_request()
263 return self.sendReply(404)
264 if not self.Username:
265 return self.doAuthenticate()
267 data = b''.join(self.incoming)
268 return self.doJSON(data, self.path[:3] == b'/LP')
271 except WithinLongpoll:
273 except httpserver.RequestHandled:
276 self.logger.error(traceback.format_exc())
277 return self.doError('uncaught error')
279 def reset_request(self):
281 self.JSONRPCMethod = None
282 super().reset_request()
284 setattr(JSONRPCHandler, 'doHeader_user-agent', JSONRPCHandler.doHeader_user_agent);
285 setattr(JSONRPCHandler, 'doHeader_x-minimum-wait', JSONRPCHandler.doHeader_x_minimum_wait);
286 setattr(JSONRPCHandler, 'doHeader_x-mining-extensions', JSONRPCHandler.doHeader_x_mining_extensions);
288 JSONRPCListener = networkserver.NetworkListener
290 class JSONRPCServer(networkserver.AsyncSocketServer):
291 logger = logging.getLogger('JSONRPCServer')
295 def __init__(self, *a, **ka):
296 ka.setdefault('RequestHandlerClass', JSONRPCHandler)
297 super().__init__(*a, **ka)
299 self.SecretUser = None
300 self.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
303 self.LPId = '%d' % (time(),)
304 self.LPRequest = False
306 self._LPWaitTime = time() + 15
309 self.LPTrackingByUser = {}
311 def checkAuthentication(self, username, password):
314 def final_init(self):
315 JSONRPCHandler.final_init(self)
317 def pre_schedule(self):
318 if self.LPRequest == 1:
321 def wakeLongpoll(self, wantClear = False):
323 self.logger.info('Ignoring longpoll attempt while another is waiting')
326 self.LPId = '%d %d' % (time(), self._LPId)
327 self._LPWantClear = wantClear
333 if self._LPWaitTime > now:
334 delay = self._LPWaitTime - now
335 self.logger.info('Waiting %.3g seconds to longpoll' % (delay,))
336 self.schedule(self._actualLP, self._LPWaitTime)
342 self.LPRequest = False
343 C = tuple(self._LPClients.values())
346 self.logger.info('Nobody to longpoll')
349 self.logger.debug("%d clients to wake up..." % (OC,))
354 self.lastHandler = ic
356 ic.wakeLongpoll(self._LPWantClear)
359 # Ignore socket errors; let the main event loop take care of them later
362 self.logger.debug('Error waking longpoll handler:\n' + traceback.format_exc())
364 self._LPWaitTime = time()
365 self.logger.info('Longpoll woke up %d clients in %.3f seconds' % (OC, self._LPWaitTime - now))
366 self._LPWaitTime += 5 # TODO: make configurable: minimum time between longpolls
368 def TopLPers(self, n = 0x10):
369 tmp = list(self.LPTracking.keys())
370 tmp.sort(key=lambda k: self.LPTracking[k])
371 for jerk in map(lambda k: (k, self.LPTracking[k]), tmp[-n:]):
374 def TopLPersByUser(self, n = 0x10):
375 tmp = list(self.LPTrackingByUser.keys())
376 tmp.sort(key=lambda k: self.LPTrackingByUser[k])
377 for jerk in map(lambda k: (k, self.LPTrackingByUser[k]), tmp[-n:]):