1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2012 Luke Dashjr <luke-jr+eloipool@utopios.org>
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from base64 import b64decode
19 from binascii import a2b_hex, b2a_hex
20 from copy import deepcopy
21 from datetime import datetime
22 from email.utils import formatdate
# Sanity-check the optional C 'midstate' accelerator module against a known
# SHA256 midstate vector.  NOTE(review): the enclosing try/except ImportError
# lines are elided from this view — the warning below is presumably the
# except-branch fallback when the module is unavailable; confirm framing.
assert midstate.SHA256(b'This is just a test, ignore it. I am making it over 64-bytes long.')[:8] == (0x755f1a94, 0x999b270c, 0xf358c014, 0xfd39caeb, 0x0dcc9ebc, 0x4694cd1a, 0x8e95678e, 0x75fac450)
logging.getLogger('jsonrpcserver').warning('Error importing \'midstate\' module; work will not provide midstates')
35 from struct import pack
37 from time import mktime, time, sleep
39 from util import RejectedShare, swap32
class WithinLongpoll(BaseException):
	"""Control-flow signal: the connection has been parked for longpolling.

	Derives from BaseException so ordinary ``except Exception`` handlers
	do not swallow it.
	"""

class RequestAlreadyHandled(BaseException):
	"""Raised when a reply has already been sent for the current request."""

class RequestHandled(RequestAlreadyHandled):
	"""Signal that the current request was fully answered."""

class RequestNotHandled(BaseException):
	"""Signal that request processing finished without producing a reply."""
# TODO: keepalive/close
# Registry of recently issued work headers (sans ntime), used by
# doJSON_getwork to detect duplicate work issuance.  NOTE(review): no code
# visible in this view ever removes entries, so it appears to grow without
# bound — confirm whether an elided line periodically clears it.
_CheckForDupesHACK = {}
class JSONRPCHandler(networkserver.SocketHandler):
	# Per-connection handler implementing the getwork/getmemorypool JSON-RPC
	# mining protocol over HTTP.  Several class attributes are elided from
	# this view; the fragments below belong to the HTTPStatus reason-phrase
	# map (statuses not listed fall back to 'Eligius' in sendReply) and to
	# the longpoll reply-header template (a None value suppresses a header).
	405: 'Method Not Allowed',
	500: 'Internal Server Error',
	'X-Long-Polling': None,
	logger = logging.getLogger('JSONRPCHandler')
def sendReply(self, status=200, body=b'', headers=None):
	# Build and queue a complete HTTP/1.1 response.  Several guard lines are
	# elided from this view (the duplicate-reply check preceding the raise,
	# the body-is-None chunked branch, and the condition selecting the
	# getwork default headers) — the fragments below are kept verbatim.
	raise RequestAlreadyHandled
	buf = "HTTP/1.1 %d %s\r\n" % (status, self.HTTPStatus.get(status, 'Eligius'))
	headers = dict(headers) if headers else {}
	headers['Date'] = formatdate(timeval=mktime(datetime.now().timetuple()), localtime=False, usegmt=True)
	headers.setdefault('Server', 'Eloipool')
	# body is None => longpoll: keep the connection open and stream chunks
	headers.setdefault('Transfer-Encoding', 'chunked')
	headers['Content-Length'] = len(body)
	# Default getwork-protocol headers (longpoll URI, ntime rolling)
	headers.setdefault('Content-Type', 'application/json')
	headers.setdefault('X-Long-Polling', '/LP')
	headers.setdefault('X-Roll-NTime', 'expire=120')
	elif body and body[0] == 123: # b'{'
		# Body looks like JSON; label it as such
		headers.setdefault('Content-Type', 'application/json')
	for k, v in headers.items():
		if v is None: continue  # None value suppresses the header entirely
		buf += "%s: %s\r\n" % (k, v)
	buf = buf.encode('utf8')
def doError(self, reason = '', code = 100):
	"""Send a JSON-RPC error reply (HTTP 500) carrying *reason* and *code*."""
	message = json.dumps(reason)
	payload = r'{"result":null,"id":null,"error":{"name":"JSONRPCError","code":%d,"message":%s}}' % (code, message)
	return self.sendReply(500, payload.encode('utf8'))
def doHeader_authorization(self, value):
	"""Parse an HTTP Basic Authorization header and record the username.

	Only the username (the part before the first ':') is kept; the password
	is deliberately ignored.  A header with the wrong scheme, invalid
	base64, or a non-UTF-8 username is answered with a JSON-RPC error
	instead of letting the exception escape the header parser.
	"""
	value = value.split(b' ')
	if len(value) != 2 or value[0] != b'Basic':
		return self.doError('Bad Authorization header')
	try:
		value = b64decode(value[1])
		value = value.split(b':')[0]
		self.Username = value.decode('utf8')
	except ValueError:
		# binascii.Error (bad base64 padding) and UnicodeDecodeError are
		# both ValueError subclasses; treat either as a malformed header.
		return self.doError('Bad Authorization header')
def doHeader_content_length(self, value):
	# Body elided in this view; presumably stores int(value) for use as the
	# POST-body terminator length — TODO confirm against the full source.

def doHeader_user_agent(self, value):
	# Record the client's User-Agent and enable per-miner protocol quirks.
	# (quirk-dict setup lines elided in this view)
	self.reqinfo['UA'] = value
	if value[:9] == b'phoenix/v':
		v = tuple(map(int, value[9:].split(b'.')))
		# NOTE(review): this requires EVERY component to be small, which is
		# not a lexicographic version comparison (e.g. 1.9.0 passes the
		# v[0] test but fails v[1] < 8) — confirm the intended cutoff.
		if v[0] < 2 and v[1] < 8 and v[2] < 1:
			quirks['NELH'] = None
def doHeader_x_minimum_wait(self, value):
	"""Record the client's requested minimum longpoll delay (seconds)."""
	min_wait = int(value)
	self.reqinfo['MinWait'] = min_wait
def doHeader_x_mining_extensions(self, value):
	"""Store the space-separated X-Mining-Extensions list, lowercased."""
	requested = value.decode('ascii').lower()
	self.extensions = requested.split(' ')
def doAuthenticate(self):
	"""Reply 401 and challenge the client for HTTP Basic credentials."""
	challenge = {'WWW-Authenticate': 'Basic realm="Eligius"'}
	self.sendReply(401, headers=challenge)
def doLongpoll(self):
	# Park this connection until new work arrives (wakeLongpoll).  Lines
	# elided in this view include the timeNow assignment, the else-branch
	# for the NELH quirk, and the trailing raise WithinLongpoll.
	if 'NELH' not in self.quirks:
		# [NOT No] Early Longpoll Headers
		# Send headers and the opening '{' of the eventual JSON body now,
		# then keep the chunked connection alive until work is ready.
		self.sendReply(200, body=None, headers=self.LPHeaders)
		self.push(b"1\r\n{\r\n")
		self.changeTask(self._chunkedKA, timeNow + 45)
	self.changeTask(None)
	waitTime = self.reqinfo.get('MinWait', 15) # TODO: make default configurable
	self.waitTime = waitTime + timeNow
	# Register with the server so wakeLongpoll can find us
	totfromme = self.LPTrack()
	self.server._LPClients[id(self)] = self
	self.logger.debug("New LP client; %d total; %d from %s" % (len(self.server._LPClients), totfromme, self.addr[0]))
def _chunkedKA(self):
	"""Keep a parked longpoll connection alive via chunked transfer encoding.

	Sends a one-byte chunk (a single space) and reschedules itself.
	"""
	keepalive = b"1\r\n \r\n"
	self.push(keepalive)
	next_ping = time() + 45  # repeat every 45 seconds until woken/closed
	self.changeTask(self._chunkedKA, next_ping)
	# --- longpoll connection accounting (method def lines elided) ---
	# Fragment of LPTrack(): count longpoll connections per client IP and
	# return the new total for this IP.
	if myip not in self.server.LPTracking:
		self.server.LPTracking[myip] = 0
	self.server.LPTracking[myip] += 1
	return self.server.LPTracking[myip]

	# Fragment of LPUntrack(): drop one connection from this IP's count.
	self.server.LPTracking[self.addr[0]] -= 1

	# Called when the connection is closed
	# Fragment: cancel any pending task and deregister from the server's
	# longpoll client table.
	self.changeTask(None)
	del self.server._LPClients[id(self)]
def wakeLongpoll(self):
	# Deliver a fresh getwork result to this parked longpoll connection.
	# Elided in this view: the 'now' assignment, try framing, and the
	# JSON serialization line preceding the encode.
	if now < self.waitTime:
		# Honor the client's X-Minimum-Wait: come back later.
		self.changeTask(self.wakeLongpoll, self.waitTime)
	self.changeTask(None)
	rv = self.doJSON_getwork()
	rv['submitold'] = True  # shares on the old work remain acceptable
	rv = {'id': 1, 'error': None, 'result': rv}
	rv = rv.encode('utf8')
	if 'NELH' not in self.quirks:
		# Headers and the opening brace were already streamed by doLongpoll
		rv = rv[1:] # strip the '{' we already sent
		self.push(('%x' % len(rv)).encode('utf8') + b"\r\n" + rv + b"\r\n0\r\n\r\n")
	self.sendReply(200, body=rv, headers=self.LPHeaders)
	raise RequestNotHandled
	except RequestHandled:
def doJSON(self, data):
	# TODO: handle JSON errors
	# Decode a JSON-RPC request body, dispatch to doJSON_<method>, and send
	# the JSON-encoded reply.  The try/except framing around the parse and
	# the final json.dumps are partly elided from this view.
	data = data.decode('utf8')
	data = json.loads(data)
	method = 'doJSON_' + str(data['method']).lower()
	# (except branches for the parse above — framing elided)
	return self.doError(r'Parse error')
	return self.doError(r'Bad call')
	if not hasattr(self, method):
		return self.doError(r'Procedure not found')
	# TODO: handle errors as JSON-RPC
	# Handlers may add reply headers (e.g. X-Reject-Reason) via this dict
	self._JSONHeaders = {}
	params = data.setdefault('params', ())
	rv = getattr(self, method)(*tuple(data['params']))
	except Exception as e:
		self.logger.error(("Error during JSON-RPC call: %s%s\n" % (method, params)) + traceback.format_exc())
		return self.doError(r'Service error: %s' % (e,))
	rv = {'id': data['id'], 'error': None, 'result': rv}
	return self.doError(r'Error encoding reply in JSON')
	rv = rv.encode('utf8')
	return self.sendReply(200, rv, headers=self._JSONHeaders)
getwork_rv_template = {
	# Template for getwork replies.  'data' here is the SHA256 padding tail
	# appended after the hex block header; 'target' and 'hash1' are the
	# legacy getwork protocol constants.  (Closing brace elided in this view.)
	'data': '000000800000000000000000000000000000000000000000000000000000000000000000000000000000000080020000',
	'target': 'ffffffffffffffffffffffffffffffffffffffffffffffffffffffff00000000',
	'hash1': '00000000000000000000000000000000000000000000000000000000000000000000008000000000000000000000000000000000000000000000000000010000',
def doJSON_getwork(self, data=None):
	# getwork: if called with data it is a share submission; otherwise issue
	# new work.  (The condition line and the final return are elided.)
	return self.doJSON_submitwork(data)
	rv = dict(self.getwork_rv_template)
	hdr = self.server.getBlockHeader(self.Username)
	# FIXME: this assumption breaks with internal rollntime
	# NOTE: noncerange needs to set nonce to start value at least
	global _CheckForDupesHACK
	# Strip bytes 68:72 (the ntime field) so rolled timestamps do not defeat
	# the duplicate-work check.
	uhdr = hdr[:68] + hdr[72:]
	if uhdr in _CheckForDupesHACK:
		raise self.server.RaiseRedFlags(RuntimeError('issuing duplicate work'))
	_CheckForDupesHACK[uhdr] = None
	data = b2a_hex(swap32(hdr)).decode('utf8') + rv['data']
	# TODO: endian shuffle etc
	# Provide a midstate only when the accelerator module imported and the
	# client did not advertise its own midstate support.
	if midstate and 'midstate' not in self.extensions:
		h = midstate.SHA256(hdr)[:8]
		rv['midstate'] = b2a_hex(pack('<LLLLLLLL', *h)).decode('ascii')
def doJSON_submitwork(self, datax):
	# Validate and submit a returned getwork solution.  The share-dict
	# construction and the try/return framing are partly elided.
	data = swap32(a2b_hex(datax))[:80]
	'username': self.Username,
	'remoteHost': self.addr[0],
	self.server.receiveShare(share)
	except RejectedShare as rej:
		# Surface the rejection reason to the miner as a reply header
		self._JSONHeaders['X-Reject-Reason'] = str(rej)
getmemorypool_rv_template = {
	# Template for getmemorypool replies; full 32-bit nonce range and the
	# protocol's maximum target.  (Other entries/closing brace elided.)
	'noncerange': '00000000ffffffff',
	'target': '00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff',
def doJSON_getmemorypool(self, data=None):
	# getmemorypool: with data it is a block submission; without, build a
	# block template reply.  Elided in this view: the condition line, the
	# tl = [] initializer, the 'now' assignment, and the final return.
	return self.doJSON_submitblock(data)
	rv = dict(self.getmemorypool_rv_template)
	MC = self.server.getBlockTemplate(self.Username)
	(dummy, merkleTree, cb, prevBlock, bits) = MC
	rv['previousblockhash'] = b2a_hex(prevBlock[::-1]).decode('ascii')
	# All non-coinbase transactions, hex-encoded
	for txn in merkleTree.data[1:]:
		tl.append(b2a_hex(txn.data).decode('ascii'))
	rv['transactions'] = tl
	# FIXME: ensure mintime is always >= real mintime, both here and in share acceptance
	rv['mintime'] = now - 180
	rv['maxtime'] = now + 120
	rv['bits'] = b2a_hex(bits[::-1]).decode('ascii')
	# Copy the coinbase so elided mutation below cannot affect the tree
	t = deepcopy(merkleTree.data[0])
	rv['coinbasetxn'] = b2a_hex(t.data).decode('ascii')
def doJSON_submitblock(self, data):
	# Submit a complete block: first 80 bytes are the header, the remainder
	# is block data.  The share-dict construction and try framing are
	# partly elided in this view.
	'blkdata': data[80:],
	'username': self.Username,
	'remoteHost': self.addr[0],
	self.server.receiveShare(share)
	except RejectedShare as rej:
		# Surface the rejection reason to the miner as a reply header
		self._JSONHeaders['X-Reject-Reason'] = str(rej)
def doJSON_setworkaux(self, k, hexv = None):
	# Privileged call: set (hexv given) or clear (hexv None) auxiliary work
	# data under key k — used for merged mining.  The branch lines between
	# set and delete are elided in this view.
	if self.Username != self.server.SecretUser:
		self.doAuthenticate()
	self.server.aux[k] = a2b_hex(hexv)
	del self.server.aux[k]
def handle_close(self):
	# Connection teardown: (an LPUntrack/cleanup line is elided here) then
	# defer to the base class close handling.
	super().handle_close()
def handle_request(self):
	# Route a fully parsed HTTP request: require auth, accept only GET/POST
	# on '/', '/LP', or '/LP/', then either park for longpoll or run the
	# JSON-RPC body.  The try framing and pass branches are partly elided.
	if not self.Username:
		return self.doAuthenticate()
	if not self.method in (b'GET', b'POST'):
		return self.sendReply(405)
	if not self.path in (b'/', b'/LP', b'/LP/'):
		return self.sendReply(404)
	if self.path[:3] == b'/LP':
		return self.doLongpoll()
	data = b''.join(self.incoming)
	return self.doJSON(data)
	except WithinLongpoll:
	except RequestHandled:
	self.logger.error(traceback.format_exc())
	return self.doError('uncaught error')
def parse_headers(self, hs):
	# Parse the HTTP request line and headers, dispatching each header to a
	# doHeader_<name> method when one exists.  Elided in this view: state
	# resets, the path assignment, and the per-header loop line whose body
	# follows (which is why 'data' is reused below).
	hs = re.split(br'\r?\n', hs)
	data = hs.pop(0).split(b' ')
	self.method = data[0]
	# Per-header loop body: split 'Name: value' and normalize the name
	data = tuple(map(lambda a: a.strip(), data.split(b':', 1)))
	method = 'doHeader_' + data[0].decode('ascii').lower()
	if hasattr(self, method):
		getattr(self, method)(data[1])
	except RequestAlreadyHandled:
		# Ignore multiple errors and such
def found_terminator(self):
	# Called when the current terminator is reached: either the blank line
	# ending the headers, or the end of a POST body.  Several branch and
	# try-framing lines are elided from this view.
	if self.reading_headers:
		inbuf = b"".join(self.incoming)
		# Tolerate leading blank lines before the request line
		m = re.match(br'^[\r\n]+', inbuf)
		inbuf = inbuf[len(m.group(0)):]
		self.reading_headers = False
		self.parse_headers(inbuf)
		# Content-Length known: read exactly that many body bytes next
		self.set_terminator(self.CL)
		self.set_terminator(None)
		self.handle_request()
		raise RequestNotHandled
	except RequestHandled:
	except WithinLongpoll:
		self.logger.error(traceback.format_exc())
def handle_error(self):
	# Log unexpected socket-level errors at debug level.  NOTE(review):
	# continuation lines (e.g. a handle_close call) may be elided here.
	self.logger.debug(traceback.format_exc())

# Reuse asynchat's terminator bookkeeping without inheriting from it
get_terminator = asynchat.async_chat.get_terminator
set_terminator = asynchat.async_chat.set_terminator
def handle_readbuf(self):
	# Consume self.ac_in_buffer according to the current terminator.
	# This is adapted from asynchat.async_chat.handle_read; several
	# condition/else lines are elided from this view, so indentation of the
	# fragments below is reconstructed.
	while self.ac_in_buffer:
		lb = len(self.ac_in_buffer)
		terminator = self.get_terminator()
		# no terminator, collect it all
		self.collect_incoming_data (self.ac_in_buffer)
		self.ac_in_buffer = b''
		elif isinstance(terminator, int):
			# Numeric terminator: collect exactly that many bytes
			self.collect_incoming_data (self.ac_in_buffer)
			self.ac_in_buffer = b''
			self.terminator = self.terminator - lb
			self.collect_incoming_data (self.ac_in_buffer[:n])
			self.ac_in_buffer = self.ac_in_buffer[n:]
			self.found_terminator()
		# 1) end of buffer matches terminator exactly:
		#    collect data, transition
		# 2) end of buffer matches some prefix:
		#    collect data to the prefix
		# 3) end of buffer does not match any prefix:
		# NOTE: this supports multiple different terminators, but
		#       NOT ones that are prefixes of others...
		if isinstance(self.ac_in_buffer, type(terminator)):
			# Single terminator given: normalize to a 1-tuple
			terminator = (terminator,)
		termidx = tuple(map(self.ac_in_buffer.find, terminator))
		index = min(x for x in termidx if x >= 0)
		# we found the terminator
		# don't bother reporting the empty string (source of subtle bugs)
		self.collect_incoming_data (self.ac_in_buffer[:index])
		specific_terminator = terminator[termidx.index(index)]
		terminator_len = len(specific_terminator)
		self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
		# This does the Right Thing if the terminator is changed here.
		self.found_terminator()
		# check for a prefix of the terminator
		termidx = tuple(map(lambda a: asynchat.find_prefix_at_end (self.ac_in_buffer, a), terminator))
		# we found a prefix, collect up to the prefix
		self.collect_incoming_data (self.ac_in_buffer[:-index])
		self.ac_in_buffer = self.ac_in_buffer[-index:]
		# no prefix, collect it all
		self.collect_incoming_data (self.ac_in_buffer)
		self.ac_in_buffer = b''
def reset_request(self):
	# Reset per-request state for HTTP keepalive: expect a fresh header
	# block (terminated by a blank line) and arm a 15-second idle timeout.
	# (Some state-reset lines are elided from this view.)
	self.replySent = False
	self.set_terminator( (b"\n\n", b"\r\n\r\n") )
	self.reading_headers = True
	self.changeTask(self.handle_timeout, time() + 15)
def collect_incoming_data(self, data):
	"""Buffer received bytes using asynchat's stock collector."""
	collector = asynchat.async_chat._collect_incoming_data
	collector(self, data)
def __init__(self, *a, **ka):
	# Per-connection state initialization (quirk/state lines elided here).
	super().__init__(*a, **ka)

# HTTP header names contain hyphens, which are not valid in Python
# identifiers, so the hyphenated doHeader_* aliases used by parse_headers'
# getattr dispatch must be attached via setattr.
setattr(JSONRPCHandler, 'doHeader_content-length', JSONRPCHandler.doHeader_content_length);
setattr(JSONRPCHandler, 'doHeader_user-agent', JSONRPCHandler.doHeader_user_agent);
setattr(JSONRPCHandler, 'doHeader_x-minimum-wait', JSONRPCHandler.doHeader_x_minimum_wait);
setattr(JSONRPCHandler, 'doHeader_x-mining-extensions', JSONRPCHandler.doHeader_x_mining_extensions);
# The stock listener needs no customization for JSON-RPC
JSONRPCListener = networkserver.NetworkListener

class JSONRPCServer(networkserver.AsyncSocketServer):
	# Async server that accepts mining connections and coordinates
	# longpolling across all JSONRPCHandler instances.  Several attribute
	# initializations are elided from this view.
	logger = logging.getLogger('JSONRPCServer')

	def __init__(self, *a, **ka):
		ka.setdefault('RequestHandlerClass', JSONRPCHandler)
		super().__init__(*a, **ka)
		# Username allowed to call privileged methods (setworkaux); None
		# disables them until configured by the operator.
		self.SecretUser = None
		self.LPRequest = False
		# Earliest time the next longpoll broadcast may run
		self._LPWaitTime = time() + 15
	def pre_schedule(self):
		# Scheduler hook: kick off a pending longpoll broadcast.
		# (Branch body elided in this view.)
		if self.LPRequest == 1:

	def wakeLongpoll(self):
		# Request a longpoll broadcast to all parked clients, rate-limited
		# by _LPWaitTime.  Condition/else framing and the 'now' assignment
		# are elided from this view.
		self.logger.info('Ignoring longpoll attempt while another is waiting')
		if self._LPWaitTime > now:
			delay = self._LPWaitTime - now
			self.logger.info('Waiting %.3g seconds to longpoll' % (delay,))
			self.schedule(self._actualLP, self._LPWaitTime)

		# Fragments of _actualLP(): wake every parked longpoll client and
		# record timing for the rate limiter.
		self.LPRequest = False
		C = tuple(self._LPClients.values())
		self.logger.info('Nobody to longpoll')
		self.logger.debug("%d clients to wake up..." % (OC,))
		self._LPWaitTime = time()
		self.logger.info('Longpoll woke up %d clients in %.3f seconds' % (OC, self._LPWaitTime - now))
		self._LPWaitTime += 5 # TODO: make configurable: minimum time between longpolls

	def TopLPers(self, n = 0x10):
		# Report the n client IPs holding the most longpoll connections,
		# busiest last.  (Loop body continues past this view.)
		tmp = list(self.LPTracking.keys())
		tmp.sort(key=lambda k: self.LPTracking[k])
		for jerk in map(lambda k: (k, self.LPTracking[k]), tmp[-n:]):