1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2012 Luke Dashjr <luke-jr+eloipool@utopios.org>
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
# --- imports ---
# NOTE(review): the embedded original numbering is non-contiguous in this
# chunk; import lines appear to be missing between these.
18 from base64 import b64decode
19 from binascii import a2b_hex, b2a_hex
20 from copy import deepcopy
21 from datetime import datetime
22 from email.utils import formatdate
# NOTE(review): the next two lines look like the interior of a try/except
# guarding an optional 'import midstate'; the surrounding lines are not
# visible here — confirm against the full file.
# Self-test of the midstate module against a known SHA256 midstate vector.
27 assert midstate.SHA256(b'This is just a test, ignore it. I am making it over 64-bytes long.')[:8] == (0x755f1a94, 0x999b270c, 0xf358c014, 0xfd39caeb, 0x0dcc9ebc, 0x4694cd1a, 0x8e95678e, 0x75fac450)
# Emitted when the optional midstate module is unavailable; getwork replies
# then omit the 'midstate' field.
29 logging.getLogger('jsonrpcserver').warning('Error importing \'midstate\' module; work will not provide midstates')
35 from struct import pack
37 from time import mktime, time, sleep
39 from util import RejectedShare, swap32
# Control-flow exception: raised to unwind a request that has been parked
# as a longpoll client (BaseException so ordinary handlers don't eat it).
# NOTE(review): the class body line is missing from this view.
41 class WithinLongpoll(BaseException):
44 # TODO: keepalive/close
# Module-level dedupe table for issued work headers (see doJSON_getwork);
# grows without bound — acknowledged as a HACK by its name.
45 _CheckForDupesHACK = {}
# Per-connection HTTP/JSON-RPC handler for miners (getwork/getmemorypool).
46 class JSONRPCHandler(networkserver.SocketHandler):
# NOTE(review): the opening of the HTTPStatus mapping (status code ->
# reason phrase) is not visible in this chunk; only two entries remain.
51 	405: 'Method Not Allowed',
52 	500: 'Internal Server Error',
# NOTE(review): fragment of the LPHeaders dict (headers sent on longpoll
# responses); its surrounding lines are missing here.
56 	'X-Long-Polling': None,
59 logger = logging.getLogger('JSONRPCHandler')
# Assemble and send an HTTP/1.1 response with the given status, body and
# headers.  NOTE(review): several lines are missing from this view (the
# conditions selecting chunked vs Content-Length, and the final push of
# buf+body) — do not rely on the visible control flow being complete.
61 def sendReply(self, status=200, body=b'', headers=None):
62 buf = "HTTP/1.1 %d %s\r\n" % (status, self.HTTPStatus.get(status, 'Eligius'))
63 headers = dict(headers) if headers else {}
# RFC-1123 date, always GMT.
64 headers['Date'] = formatdate(timeval=mktime(datetime.now().timetuple()), localtime=False, usegmt=True)
65 headers.setdefault('Server', 'Eloipool')
# presumably taken when body is None (longpoll keep-open) — TODO confirm,
# the guarding condition is not visible here.
67 headers.setdefault('Transfer-Encoding', 'chunked')
70 headers['Content-Length'] = len(body)
72 headers.setdefault('Content-Type', 'application/json')
73 headers.setdefault('X-Long-Polling', '/LP')
74 headers.setdefault('X-Roll-NTime', 'expire=120')
# Bodies starting with '{' are treated as JSON even off the main path.
75 elif body and body[0] == 123: # b'{'
76 headers.setdefault('Content-Type', 'application/json')
# A None header value suppresses that header entirely.
77 for k, v in headers.items():
78 if v is None: continue
79 buf += "%s: %s\r\n" % (k, v)
81 buf = buf.encode('utf8')
# NOTE(review): the lines actually transmitting buf (and body) are missing
# from this view.
85 def doError(self, reason = '', code = 100):
86 reason = json.dumps(reason)
87 reason = r'{"result":null,"id":null,"error":{"name":"JSONRPCError","code":%d,"message":%s}}' % (code, reason)
88 return self.sendReply(500, reason.encode('utf8'))
90 def doHeader_authorization(self, value):
91 value = value.split(b' ')
92 if len(value) != 2 or value[0] != b'Basic':
93 return self.doError('Bad Authorization header')
94 value = b64decode(value[1])
95 value = value.split(b':')[0]
96 self.Username = value.decode('utf8')
# Honour 'Connection: close'.  NOTE(review): the comparison line guarding
# the quirk assignment is missing from this view.
98 def doHeader_connection(self, value):
100 self.quirks['close'] = None
# Record the request Content-Length.  NOTE(review): the method body
# (presumably assigning int(value) to self.CL) is missing from this view.
102 def doHeader_content_length(self, value):
# Record the client User-Agent and apply client-specific workarounds.
# NOTE(review): lines are missing between the assignment and the phoenix
# version check (likely a try/except around the version parse).
105 def doHeader_user_agent(self, value):
106 self.reqinfo['UA'] = value
# Old phoenix miners (< 1.7.something per the triple check below) choke on
# early longpoll headers, hence the 'NELH' (No Early Longpoll Headers) quirk.
109 if value[:9] == b'phoenix/v':
110 v = tuple(map(int, value[9:].split(b'.')))
# NOTE(review): this per-component comparison looks suspicious as a version
# check (e.g. 1.9.0 passes the v[1] < 8 test) — verify against upstream
# before relying on it.
111 if v[0] < 2 and v[1] < 8 and v[2] < 1:
112 quirks['NELH'] = None
117 def doHeader_x_minimum_wait(self, value):
118 self.reqinfo['MinWait'] = int(value)
120 def doHeader_x_mining_extensions(self, value):
121 self.extensions = value.decode('ascii').lower().split(' ')
123 def doAuthenticate(self):
124 self.sendReply(401, headers={'WWW-Authenticate': 'Basic realm="Eligius"'})
# Park this connection as a longpoll client until new work arrives.
# NOTE(review): several lines are missing from this view (setup before the
# quirk check, and the tail raising WithinLongpoll).
126 def doLongpoll(self):
# Unless the client has the NELH quirk, send headers and the opening '{'
# chunk immediately, and keep the chunked stream alive periodically.
130 if 'NELH' not in self.quirks:
131 # [NOT No] Early Longpoll Headers
132 self.sendReply(200, body=None, headers=self.LPHeaders)
133 self.push(b"1\r\n{\r\n")
134 self.changeTask(self._chunkedKA, timeNow + 45)
136 self.changeTask(None)
# Respect the client-requested minimum wait (X-Minimum-Wait header).
138 waitTime = self.reqinfo.get('MinWait', 15) # TODO: make default configurable
139 self.waitTime = waitTime + timeNow
# Register with the server's longpoll client table and log per-IP counts.
141 totfromme = self.LPTrack()
142 self.server._LPClients[id(self)] = self
143 self.logger.debug("New LP client; %d total; %d from %s" % (len(self.server._LPClients), totfromme, self.addr[0]))
147 def _chunkedKA(self):
148 # Keepalive via chunked transfer encoding
149 self.push(b"1\r\n \r\n")
150 self.changeTask(self._chunkedKA, time() + 45)
# NOTE(review): the def lines for LPTrack/LPUntrack are missing from this
# view; these are their bodies.  'myip' is presumably the peer address
# (self.addr[0]) — confirm against the full file.
# LPTrack: bump and return the per-IP count of parked longpoll clients.
154 if myip not in self.server.LPTracking:
155 self.server.LPTracking[myip] = 0
156 self.server.LPTracking[myip] += 1
157 return self.server.LPTracking[myip]
# LPUntrack: decrement the per-IP count when a longpoll client goes away.
160 self.server.LPTracking[self.addr[0]] -= 1
# NOTE(review): fragment of a connection-teardown method whose def line is
# missing from this view; it cancels pending tasks and deregisters this
# handler from the server's longpoll client table.
163 # Called when the connection is closed
166 self.changeTask(None)
168 del self.server._LPClients[id(self)]
# Deliver fresh work to this parked longpoll client.  NOTE(review): lines
# are missing throughout (the 'now' assignment, returns after rescheduling,
# and the branch structure around the two reply paths).
173 def wakeLongpoll(self):
# Not yet past the client's minimum wait: reschedule for later.
175 if now < self.waitTime:
176 self.changeTask(self.wakeLongpoll, self.waitTime)
179 self.changeTask(None)
# Build a getwork response; submitold tells the miner stale shares from
# the previous work are still acceptable.
183 rv = self.doJSON_getwork()
184 rv['submitold'] = True
185 rv = {'id': 1, 'error': None, 'result': rv}
# presumably json.dumps(rv) happens on a missing line before this encode —
# TODO confirm.
187 rv = rv.encode('utf8')
# Clients without the NELH quirk already received the opening '{' chunk in
# doLongpoll, so strip it and finish the chunked stream.
188 if 'NELH' not in self.quirks:
189 rv = rv[1:] # strip the '{' we already sent
190 self.push(('%x' % len(rv)).encode('utf8') + b"\r\n" + rv + b"\r\n0\r\n\r\n")
# NELH clients get a complete, non-chunked response instead.
192 self.sendReply(200, body=rv, headers=self.LPHeaders)
# Dispatch a JSON-RPC request body to the matching doJSON_<method> handler
# and send the JSON-encoded reply.  NOTE(review): the try/except framing
# around loads/dispatch/dumps is missing from this view — the bare 'return
# self.doError(...)' lines below are except-branches of those blocks.
196 def doJSON(self, data):
197 # TODO: handle JSON errors
198 data = data.decode('utf8')
200 data = json.loads(data)
# Method names are matched case-insensitively.
201 method = 'doJSON_' + str(data['method']).lower()
203 return self.doError(r'Parse error')
205 return self.doError(r'Bad call')
206 if not hasattr(self, method):
207 return self.doError(r'Procedure not found')
208 # TODO: handle errors as JSON-RPC
# Handlers may stash extra response headers here (e.g. X-Reject-Reason).
209 self._JSONHeaders = {}
210 params = data.setdefault('params', ())
212 rv = getattr(self, method)(*tuple(data['params']))
213 except Exception as e:
214 self.logger.error(("Error during JSON-RPC call: %s%s\n" % (method, params)) + traceback.format_exc())
215 return self.doError(r'Service error: %s' % (e,))
# A handler returning None means it already sent its own response.
217 # response was already sent (eg, authentication request)
219 rv = {'id': data['id'], 'error': None, 'result': rv}
# presumably json.dumps(rv) sits on a missing line before the encode —
# TODO confirm.
223 return self.doError(r'Error encoding reply in JSON')
224 rv = rv.encode('utf8')
225 return self.sendReply(200, rv, headers=self._JSONHeaders)
# Static portion of a getwork reply: padded header tail, maximum target,
# and the legacy hash1 field.  NOTE(review): the closing brace of this
# dict is missing from this view.
227 getwork_rv_template = {
228 'data': '000000800000000000000000000000000000000000000000000000000000000000000000000000000000000080020000',
229 'target': 'ffffffffffffffffffffffffffffffffffffffffffffffffffffffff00000000',
230 'hash1': '00000000000000000000000000000000000000000000000000000000000000000000008000000000000000000000000000000000000000000000000000010000',
# Handle a 'getwork' call: with data it is a share submission, without it
# a work request.  NOTE(review): several lines are missing (the 'if data'
# guard, target/extension handling, and the final return of rv).
232 def doJSON_getwork(self, data=None):
234 return self.doJSON_submitwork(data)
235 rv = dict(self.getwork_rv_template)
236 hdr = self.server.getBlockHeader(self.Username)
238 # FIXME: this assumption breaks with internal rollntime
239 # NOTE: noncerange needs to set nonce to start value at least
# Duplicate-work guard: key on the header minus its nonce bytes [68:72].
240 global _CheckForDupesHACK
241 uhdr = hdr[:68] + hdr[72:]
242 if uhdr in _CheckForDupesHACK:
243 raise self.server.RaiseRedFlags(RuntimeError('issuing duplicate work'))
244 _CheckForDupesHACK[uhdr] = None
# Header is byte-swapped per getwork convention, then padded with the
# template tail.  NOTE(review): 'data' here is likely assigned into
# rv['data'] on a missing line — confirm.
246 data = b2a_hex(swap32(hdr)).decode('utf8') + rv['data']
247 # TODO: endian shuffle etc
# Provide a midstate only when the client did not advertise the extension
# (clients with the extension compute it themselves).
249 if midstate and 'midstate' not in self.extensions:
250 h = midstate.SHA256(hdr)[:8]
251 rv['midstate'] = b2a_hex(pack('<LLLLLLLL', *h)).decode('ascii')
# Handle a getwork share submission.  NOTE(review): the lines opening the
# share dict (and its 'data' key) and the success return are missing from
# this view.
254 def doJSON_submitwork(self, datax):
# getwork submissions are byte-swapped; only the 80-byte header matters.
255 data = swap32(a2b_hex(datax))[:80]
259 'username': self.Username,
260 'remoteHost': self.addr[0],
# Rejected shares surface to the miner via the X-Reject-Reason header.
263 self.server.receiveShare(share)
264 except RejectedShare as rej:
265 self._JSONHeaders['X-Reject-Reason'] = str(rej)
# Static portion of a getmemorypool (BIP 22 precursor) reply.
# NOTE(review): additional keys and the closing brace are missing from
# this view.
269 getmemorypool_rv_template = {
273 'noncerange': '00000000ffffffff',
274 'target': '00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff',
# Handle 'getmemorypool': with data it is a block submission, without it a
# block-template request.  NOTE(review): lines are missing (the 'if data'
# guard, tl initialization, 'now' assignment, coinbase manipulation, and
# the final return of rv).
277 def doJSON_getmemorypool(self, data=None):
279 return self.doJSON_submitblock(data)
281 rv = dict(self.getmemorypool_rv_template)
282 MC = self.server.getBlockTemplate(self.Username)
283 (dummy, merkleTree, cb, prevBlock, bits) = MC
# Hashes/bits are transmitted hex-encoded in reversed (big-endian) order.
284 rv['previousblockhash'] = b2a_hex(prevBlock[::-1]).decode('ascii')
# Skip the coinbase (index 0); it is reported via 'coinbasetxn' below.
286 for txn in merkleTree.data[1:]:
287 tl.append(b2a_hex(txn.data).decode('ascii'))
288 rv['transactions'] = tl
291 # FIXME: ensure mintime is always >= real mintime, both here and in share acceptance
292 rv['mintime'] = now - 180
293 rv['maxtime'] = now + 120
294 rv['bits'] = b2a_hex(bits[::-1]).decode('ascii')
# Deep-copy the coinbase so per-request edits don't touch the shared tree.
295 t = deepcopy(merkleTree.data[0])
298 rv['coinbasetxn'] = b2a_hex(t.data).decode('ascii')
# Handle a getmemorypool block submission.  NOTE(review): the lines
# decoding 'data' and opening the share dict (including its 'data' key for
# the 80-byte header) are missing from this view.
301 def doJSON_submitblock(self, data):
# Everything past the 80-byte header is the serialized transaction data.
305 'blkdata': data[80:],
306 'username': self.Username,
307 'remoteHost': self.addr[0],
# Rejected blocks surface to the miner via the X-Reject-Reason header.
310 self.server.receiveShare(share)
311 except RejectedShare as rej:
312 self._JSONHeaders['X-Reject-Reason'] = str(rej)
# Privileged call: set or clear an auxiliary chain value on the server.
# Only the configured SecretUser may call it.  NOTE(review): the return
# after doAuthenticate and the 'if hexv' branch lines are missing from
# this view.
316 def doJSON_setworkaux(self, k, hexv = None):
317 if self.Username != self.server.SecretUser:
318 self.doAuthenticate()
# hexv provided: store the decoded value; omitted: delete the key.
321 self.server.aux[k] = a2b_hex(hexv)
323 del self.server.aux[k]
# Connection closed: cancel pending tasks and defer to the base handler.
# NOTE(review): a line between these two (likely self.changeTask(None)) is
# missing from this view.
326 def handle_close(self):
328 super().handle_close()
# Route a fully-received HTTP request: authenticate, validate method/path,
# then longpoll or JSON-RPC dispatch.  NOTE(review): the try framing whose
# except branches appear at the bottom is missing from this view.
330 def handle_request(self):
# No credentials yet: challenge with 401.
331 if not self.Username:
332 return self.doAuthenticate()
333 if not self.method in (b'GET', b'POST'):
334 return self.sendReply(405)
335 if not self.path in (b'/', b'/LP', b'/LP/'):
336 return self.sendReply(404)
# /LP paths park the connection as a longpoll client.
338 if self.path[:3] == b'/LP':
339 return self.doLongpoll()
340 data = b''.join(self.incoming)
341 return self.doJSON(data)
# WithinLongpoll is control flow, not an error — let it pass silently.
344 except WithinLongpoll:
347 self.logger.error(traceback.format_exc())
348 return self.doError('uncaught error')
# Parse the raw request head: request line into method/path, then each
# header line into a doHeader_* callback.  NOTE(review): many lines are
# missing (state resets, path assignment, the short-request guard, and the
# loop statement over the remaining header lines).
350 def parse_headers(self, hs):
# Tolerate both \r\n and bare \n line endings.
355 hs = re.split(br'\r?\n', hs)
356 data = hs.pop(0).split(b' ')
358 self.method = data[0]
# Anything other than exactly HTTP/1.1 gets conservative quirks: close the
# connection after the response and skip early longpoll headers.
366 if data[2:] != [b'HTTP/1.1']:
367 self.quirks['close'] = None
368 self.quirks['NELH'] = None # FIXME: identify which clients have a problem with this
# Header dispatch: 'X-Some-Header' -> self.doHeader_x-some-header(value)
# (dash variants are aliased to the underscore methods below the class).
374 data = tuple(map(lambda a: a.strip(), data.split(b':', 1)))
375 method = 'doHeader_' + data[0].decode('ascii').lower()
376 if hasattr(self, method):
377 getattr(self, method)(data[1])
# asynchat callback: a terminator was reached.  First time it's the blank
# line ending the headers; second time it's the body of length self.CL.
# NOTE(review): lines are missing (incoming reset, the no-match return,
# the CL check, and the try framing for handle_request).
379 def found_terminator(self):
380 if self.reading_headers:
381 inbuf = b"".join(self.incoming)
# Skip stray leading newlines before the request line (RFC tolerance).
383 m = re.match(br'^[\r\n]+', inbuf)
385 inbuf = inbuf[len(m.group(0)):]
389 self.reading_headers = False
390 self.parse_headers(inbuf)
# With a Content-Length, read that many body bytes next.
392 self.set_terminator(self.CL)
# Body complete: stop terminating and process the request.
395 self.set_terminator(None)
397 self.handle_request()
399 except WithinLongpoll:
# Log unexpected socket errors at debug level.  NOTE(review): the tail of
# this method (presumably closing the connection) is missing from this view.
402 def handle_error(self):
403 self.logger.debug(traceback.format_exc())
# Borrow asynchat's terminator accessors without inheriting from it; used
# by handle_readbuf below.
406 get_terminator = asynchat.async_chat.get_terminator
407 set_terminator = asynchat.async_chat.set_terminator
# Consume self.ac_in_buffer according to the current terminator, feeding
# collect_incoming_data/found_terminator — adapted from asynchat's
# handle_read loop.  NOTE(review): numerous lines are missing from this
# view (None/zero terminator checks, the int-terminator branch structure,
# several else/if lines, and loop control); the visible structure is
# incomplete.
409 def handle_readbuf(self):
410 while self.ac_in_buffer:
411 lb = len(self.ac_in_buffer)
412 terminator = self.get_terminator()
414 # no terminator, collect it all
415 self.collect_incoming_data (self.ac_in_buffer)
416 self.ac_in_buffer = b''
# Numeric terminator: a byte count remaining to read.
417 elif isinstance(terminator, int):
421 self.collect_incoming_data (self.ac_in_buffer)
422 self.ac_in_buffer = b''
423 self.terminator = self.terminator - lb
425 self.collect_incoming_data (self.ac_in_buffer[:n])
426 self.ac_in_buffer = self.ac_in_buffer[n:]
428 self.found_terminator()
431 # 1) end of buffer matches terminator exactly:
432 # collect data, transition
433 # 2) end of buffer matches some prefix:
434 # collect data to the prefix
435 # 3) end of buffer does not match any prefix:
437 # NOTE: this supports multiple different terminators, but
438 # NOT ones that are prefixes of others...
# Normalize a single terminator to a 1-tuple so both cases share the
# search code below.
439 if isinstance(self.ac_in_buffer, type(terminator)):
440 terminator = (terminator,)
441 termidx = tuple(map(self.ac_in_buffer.find, terminator))
# Earliest match among all candidate terminators wins.
443 index = min(x for x in termidx if x >= 0)
447 # we found the terminator
449 # don't bother reporting the empty string (source of subtle bugs)
450 self.collect_incoming_data (self.ac_in_buffer[:index])
451 specific_terminator = terminator[termidx.index(index)]
452 terminator_len = len(specific_terminator)
453 self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
454 # This does the Right Thing if the terminator is changed here.
455 self.found_terminator()
457 # check for a prefix of the terminator
458 termidx = tuple(map(lambda a: asynchat.find_prefix_at_end (self.ac_in_buffer, a), terminator))
462 # we found a prefix, collect up to the prefix
463 self.collect_incoming_data (self.ac_in_buffer[:-index])
464 self.ac_in_buffer = self.ac_in_buffer[-index:]
467 # no prefix, collect it all
468 self.collect_incoming_data (self.ac_in_buffer)
469 self.ac_in_buffer = b''
# Prepare this connection for the next HTTP request on the same socket.
# NOTE(review): lines are missing (incoming reset, quirks reset, and the
# body of the 'close' branch — presumably closing the connection).
471 def reset_request(self):
# Headers end at a blank line; accept both bare-\n and \r\n forms.
473 self.set_terminator( (b"\n\n", b"\r\n\r\n") )
474 self.reading_headers = True
# Idle requests time out after 150 seconds.
476 self.changeTask(self.handle_timeout, time() + 150)
477 if 'close' in self.quirks:
480 def collect_incoming_data(self, data):
481 asynchat.async_chat._collect_incoming_data(self, data)
# Per-connection setup.  NOTE(review): the tail of this constructor
# (initialization of per-request state such as quirks) is missing from
# this view.
483 def __init__(self, *a, **ka):
484 super().__init__(*a, **ka)
# HTTP header names contain '-', which cannot appear in a Python method
# name.  Alias each dash-form doHeader_* attribute to its underscore-form
# implementation so parse_headers' dynamic lookup finds them.
for _dashed, _impl in (
	('doHeader_content-length', JSONRPCHandler.doHeader_content_length),
	('doHeader_user-agent', JSONRPCHandler.doHeader_user_agent),
	('doHeader_x-minimum-wait', JSONRPCHandler.doHeader_x_minimum_wait),
	('doHeader_x-mining-extensions', JSONRPCHandler.doHeader_x_mining_extensions),
):
	setattr(JSONRPCHandler, _dashed, _impl)
del _dashed, _impl
# The JSON-RPC service needs no listener specialization; reuse the generic one.
493 JSONRPCListener = networkserver.NetworkListener
# Pool-wide JSON-RPC server: owns longpoll client tracking and scheduling.
495 class JSONRPCServer(networkserver.AsyncSocketServer):
496 logger = logging.getLogger('JSONRPCServer')
# Server setup: default handler class, longpoll state, and rate limiting.
# NOTE(review): several initializer lines are missing from this view
# (including _LPClients/LPTracking initialization).
500 def __init__(self, *a, **ka):
501 ka.setdefault('RequestHandlerClass', JSONRPCHandler)
502 super().__init__(*a, **ka)
# Username of the privileged user allowed to call setworkaux; None disables.
504 self.SecretUser = None
506 self.LPRequest = False
# Earliest time the first longpoll broadcast may fire.
508 self._LPWaitTime = time() + 15
# Kick a pending longpoll broadcast before each scheduler pass.
# NOTE(review): the body after this condition is missing from this view.
512 def pre_schedule(self):
513 if self.LPRequest == 1:
# Request a longpoll broadcast of new work to all parked clients, rate
# limited by _LPWaitTime.  NOTE(review): lines are missing (the pending-
# request guard, 'now' assignment, and returns between branches).
516 def wakeLongpoll(self):
518 self.logger.info('Ignoring longpoll attempt while another is waiting')
# Too soon since the last broadcast: schedule the actual wakeup for later.
525 if self._LPWaitTime > now:
526 delay = self._LPWaitTime - now
527 self.logger.info('Waiting %.3g seconds to longpoll' % (delay,))
528 self.schedule(self._actualLP, self._LPWaitTime)
# NOTE(review): the lines below are the body of _actualLP; its def line
# (and several interior lines, including the loop over clients and OC
# assignment) are missing from this view.
534 self.LPRequest = False
# Snapshot the client table — handlers deregister themselves as they wake.
535 C = tuple(self._LPClients.values())
538 self.logger.info('Nobody to longpoll')
541 self.logger.debug("%d clients to wake up..." % (OC,))
550 # Ignore socket errors; let the main event loop take care of them later
553 self.logger.debug('Error waking longpoll handler:\n' + traceback.format_exc())
555 self._LPWaitTime = time()
556 self.logger.info('Longpoll woke up %d clients in %.3f seconds' % (OC, self._LPWaitTime - now))
557 self._LPWaitTime += 5 # TODO: make configurable: minimum time between longpolls
559 def TopLPers(self, n = 0x10):
560 tmp = list(self.LPTracking.keys())
561 tmp.sort(key=lambda k: self.LPTracking[k])
562 for jerk in map(lambda k: (k, self.LPTracking[k]), tmp[-n:]):