Merge branch 'bugfix_race_coinbase_2' into bugfix_race_coinbase_3
[bitcoin:eloipool.git] / jsonrpcserver.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 import asynchat
18 from base64 import b64decode
19 from binascii import a2b_hex, b2a_hex
20 from copy import deepcopy
21 from datetime import datetime
22 from email.utils import formatdate
23 import json
24 import logging
# Optional accelerator: 'midstate' is used only when it imports cleanly and
# passes a known-answer self-test; otherwise the name is bound to None and
# getwork replies simply omit the midstate field.
try:
        import midstate
        # Explicit comparison instead of `assert`, which `python -O` strips and
        # would let a broken module through unchecked.
        if midstate.SHA256(b'This is just a test, ignore it. I am making it over 64-bytes long.')[:8] != (0x755f1a94, 0x999b270c, 0xf358c014, 0xfd39caeb, 0x0dcc9ebc, 0x4694cd1a, 0x8e95678e, 0x75fac450):
                raise ImportError("'midstate' module failed its self-test")
except Exception:
        # Exception (not a bare except) so Ctrl-C / SystemExit during start-up
        # are not silently converted into "no midstate support".
        logging.getLogger('jsonrpcserver').warning('Error importing \'midstate\' module; work will not provide midstates')
        midstate = None
31 import networkserver
32 import os
33 import re
34 import socket
35 from struct import pack
36 import threading
37 from time import mktime, time, sleep
38 import traceback
39 from util import RejectedShare, swap32
40
class WithinLongpoll(BaseException):
        """Signal raised by doLongpoll to unwind out of request handling.

        found_terminator catches it and skips the usual reset_request(), leaving
        the connection parked until the longpoll is answered.  It derives from
        BaseException rather than Exception, so `except Exception` handlers do
        not intercept it.
        """
43
44 # TODO: keepalive/close
# Module-wide record of the work headers already handed out, keyed by the
# header with bytes 68-72 removed; doJSON_getwork consults it to refuse
# issuing the same work twice.  Nothing in this file removes entries.
_CheckForDupesHACK = {}
46 class JSONRPCHandler(networkserver.SocketHandler):
        # Reason phrases for the status codes this handler emits; sendReply
        # falls back to 'Eligius' for any code not listed here.
        HTTPStatus = {
                200: 'OK',
                401: 'Unauthorized',
                404: 'Not Found',
                405: 'Method Not Allowed',
                500: 'Internal Server Error',
        }

        # Header overrides used for longpoll replies: sendReply skips headers
        # whose value is None, so this suppresses the default X-Long-Polling.
        LPHeaders = {
                'X-Long-Polling': None,
        }

        # Logger shared by every handler instance
        logger = logging.getLogger('JSONRPCHandler')
60         
61         def sendReply(self, status=200, body=b'', headers=None):
62                 buf = "HTTP/1.1 %d %s\r\n" % (status, self.HTTPStatus.get(status, 'Eligius'))
63                 headers = dict(headers) if headers else {}
64                 headers['Date'] = formatdate(timeval=mktime(datetime.now().timetuple()), localtime=False, usegmt=True)
65                 headers.setdefault('Server', 'Eloipool')
66                 if body is None:
67                         headers.setdefault('Transfer-Encoding', 'chunked')
68                         body = b''
69                 else:
70                         headers['Content-Length'] = len(body)
71                 if status == 200:
72                         headers.setdefault('Content-Type', 'application/json')
73                         headers.setdefault('X-Long-Polling', '/LP')
74                         headers.setdefault('X-Roll-NTime', 'expire=120')
75                 elif body and body[0] == 123:  # b'{'
76                         headers.setdefault('Content-Type', 'application/json')
77                 for k, v in headers.items():
78                         if v is None: continue
79                         buf += "%s: %s\r\n" % (k, v)
80                 buf += "\r\n"
81                 buf = buf.encode('utf8')
82                 buf += body
83                 self.push(buf)
84         
85         def doError(self, reason = '', code = 100):
86                 reason = json.dumps(reason)
87                 reason = r'{"result":null,"id":null,"error":{"name":"JSONRPCError","code":%d,"message":%s}}' % (code, reason)
88                 return self.sendReply(500, reason.encode('utf8'))
89         
90         def doHeader_authorization(self, value):
91                 value = value.split(b' ')
92                 if len(value) != 2 or value[0] != b'Basic':
93                         return self.doError('Bad Authorization header')
94                 value = b64decode(value[1])
95                 value = value.split(b':')[0]
96                 self.Username = value.decode('utf8')
97         
98         def doHeader_connection(self, value):
99                 if value == b'close':
100                         self.quirks['close'] = None
101         
102         def doHeader_content_length(self, value):
103                 self.CL = int(value)
104         
105         def doHeader_user_agent(self, value):
106                 self.reqinfo['UA'] = value
107                 quirks = self.quirks
108                 try:
109                         if value[:9] == b'phoenix/v':
110                                 v = tuple(map(int, value[9:].split(b'.')))
111                                 if v[0] < 2 and v[1] < 8 and v[2] < 1:
112                                         quirks['NELH'] = None
113                 except:
114                         pass
115                 self.quirks = quirks
116         
117         def doHeader_x_minimum_wait(self, value):
118                 self.reqinfo['MinWait'] = int(value)
119         
120         def doHeader_x_mining_extensions(self, value):
121                 self.extensions = value.decode('ascii').lower().split(' ')
122         
123         def doAuthenticate(self):
124                 self.sendReply(401, headers={'WWW-Authenticate': 'Basic realm="Eligius"'})
125         
126         def doLongpoll(self):
127                 timeNow = time()
128                 
129                 self._LP = True
130                 if 'NELH' not in self.quirks:
131                         # [NOT No] Early Longpoll Headers
132                         self.sendReply(200, body=None, headers=self.LPHeaders)
133                         self.push(b"1\r\n{\r\n")
134                         self.changeTask(self._chunkedKA, timeNow + 45)
135                 else:
136                         self.changeTask(None)
137                 
138                 waitTime = self.reqinfo.get('MinWait', 15)  # TODO: make default configurable
139                 self.waitTime = waitTime + timeNow
140                 
141                 totfromme = self.LPTrack()
142                 self.server._LPClients[id(self)] = self
143                 self.logger.debug("New LP client; %d total; %d from %s" % (len(self.server._LPClients), totfromme, self.addr[0]))
144                 
145                 raise WithinLongpoll
146         
147         def _chunkedKA(self):
148                 # Keepalive via chunked transfer encoding
149                 self.push(b"1\r\n \r\n")
150                 self.changeTask(self._chunkedKA, time() + 45)
151         
152         def LPTrack(self):
153                 myip = self.addr[0]
154                 if myip not in self.server.LPTracking:
155                         self.server.LPTracking[myip] = 0
156                 self.server.LPTracking[myip] += 1
157                 return self.server.LPTracking[myip]
158         
159         def LPUntrack(self):
160                 self.server.LPTracking[self.addr[0]] -= 1
161         
162         def cleanupLP(self):
163                 # Called when the connection is closed
164                 if not self._LP:
165                         return
166                 self.changeTask(None)
167                 try:
168                         del self.server._LPClients[id(self)]
169                 except KeyError:
170                         pass
171                 self.LPUntrack()
172         
173         def wakeLongpoll(self):
174                 now = time()
175                 if now < self.waitTime:
176                         self.changeTask(self.wakeLongpoll, self.waitTime)
177                         return
178                 else:
179                         self.changeTask(None)
180                 
181                 self.LPUntrack()
182                 
183                 rv = self.doJSON_getwork()
184                 rv['submitold'] = True
185                 rv = {'id': 1, 'error': None, 'result': rv}
186                 rv = json.dumps(rv)
187                 rv = rv.encode('utf8')
188                 if 'NELH' not in self.quirks:
189                         rv = rv[1:]  # strip the '{' we already sent
190                         self.push(('%x' % len(rv)).encode('utf8') + b"\r\n" + rv + b"\r\n0\r\n\r\n")
191                 else:
192                         self.sendReply(200, body=rv, headers=self.LPHeaders)
193                 
194                 self.reset_request()
195         
196         def doJSON(self, data):
197                 # TODO: handle JSON errors
198                 data = data.decode('utf8')
199                 try:
200                         data = json.loads(data)
201                         method = 'doJSON_' + str(data['method']).lower()
202                 except ValueError:
203                         return self.doError(r'Parse error')
204                 except TypeError:
205                         return self.doError(r'Bad call')
206                 if not hasattr(self, method):
207                         return self.doError(r'Procedure not found')
208                 # TODO: handle errors as JSON-RPC
209                 self._JSONHeaders = {}
210                 params = data.setdefault('params', ())
211                 try:
212                         rv = getattr(self, method)(*tuple(data['params']))
213                 except Exception as e:
214                         self.logger.error(("Error during JSON-RPC call: %s%s\n" % (method, params)) + traceback.format_exc())
215                         return self.doError(r'Service error: %s' % (e,))
216                 if rv is None:
217                         # response was already sent (eg, authentication request)
218                         return
219                 rv = {'id': data['id'], 'error': None, 'result': rv}
220                 try:
221                         rv = json.dumps(rv)
222                 except:
223                         return self.doError(r'Error encoding reply in JSON')
224                 rv = rv.encode('utf8')
225                 return self.sendReply(200, rv, headers=self._JSONHeaders)
226         
        # Base getwork result; doJSON_getwork copies it per request and prepends
        # the hex-encoded block header to 'data'.
        # NOTE(review): the 'data' suffix and 'hash1' look like SHA-256 padding
        # in getwork word order — confirm before changing these constants.
        getwork_rv_template = {
                'data': '000000800000000000000000000000000000000000000000000000000000000000000000000000000000000080020000',
                'target': 'ffffffffffffffffffffffffffffffffffffffffffffffffffffffff00000000',
                'hash1': '00000000000000000000000000000000000000000000000000000000000000000000008000000000000000000000000000000000000000000000000000010000',
        }
232         def doJSON_getwork(self, data=None):
233                 if not data is None:
234                         return self.doJSON_submitwork(data)
235                 rv = dict(self.getwork_rv_template)
236                 hdr = self.server.getBlockHeader(self.Username)
237                 
238                 # FIXME: this assumption breaks with internal rollntime
239                 # NOTE: noncerange needs to set nonce to start value at least
240                 global _CheckForDupesHACK
241                 uhdr = hdr[:68] + hdr[72:]
242                 if uhdr in _CheckForDupesHACK:
243                         raise self.server.RaiseRedFlags(RuntimeError('issuing duplicate work'))
244                 _CheckForDupesHACK[uhdr] = None
245                 
246                 data = b2a_hex(swap32(hdr)).decode('utf8') + rv['data']
247                 # TODO: endian shuffle etc
248                 rv['data'] = data
249                 if midstate and 'midstate' not in self.extensions:
250                         h = midstate.SHA256(hdr)[:8]
251                         rv['midstate'] = b2a_hex(pack('<LLLLLLLL', *h)).decode('ascii')
252                 return rv
253         
254         def doJSON_submitwork(self, datax):
255                 data = swap32(a2b_hex(datax))[:80]
256                 share = {
257                         'data': data,
258                         '_origdata' : datax,
259                         'username': self.Username,
260                         'remoteHost': self.addr[0],
261                 }
262                 try:
263                         self.server.receiveShare(share)
264                 except RejectedShare as rej:
265                         self._JSONHeaders['X-Reject-Reason'] = str(rej)
266                         return False
267                 return True
268         
        # Fixed fields of a getmemorypool result.  doJSON_getmemorypool makes a
        # shallow copy per request (so the 'mutable' list object is shared) and
        # adds the per-request fields to it.
        getmemorypool_rv_template = {
                'mutable': [
                        'coinbase/append',
                ],
                'noncerange': '00000000ffffffff',
                'target': '00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff',
                'version': 1,
        }
277         def doJSON_getmemorypool(self, data=None):
278                 if not data is None:
279                         return self.doJSON_submitblock(data)
280                 
281                 rv = dict(self.getmemorypool_rv_template)
282                 MC = self.server.getBlockTemplate(self.Username)
283                 (dummy, merkleTree, cb, prevBlock, bits) = MC
284                 rv['previousblockhash'] = b2a_hex(prevBlock[::-1]).decode('ascii')
285                 tl = []
286                 for txn in merkleTree.data[1:]:
287                         tl.append(b2a_hex(txn.data).decode('ascii'))
288                 rv['transactions'] = tl
289                 now = int(time())
290                 rv['time'] = now
291                 # FIXME: ensure mintime is always >= real mintime, both here and in share acceptance
292                 rv['mintime'] = now - 180
293                 rv['maxtime'] = now + 120
294                 rv['bits'] = b2a_hex(bits[::-1]).decode('ascii')
295                 t = deepcopy(merkleTree.data[0])
296                 t.setCoinbase(cb)
297                 t.assemble()
298                 rv['coinbasetxn'] = b2a_hex(t.data).decode('ascii')
299                 return rv
300         
301         def doJSON_submitblock(self, data):
302                 data = a2b_hex(data)
303                 share = {
304                         'data': data[:80],
305                         'blkdata': data[80:],
306                         'username': self.Username,
307                         'remoteHost': self.addr[0],
308                 }
309                 try:
310                         self.server.receiveShare(share)
311                 except RejectedShare as rej:
312                         self._JSONHeaders['X-Reject-Reason'] = str(rej)
313                         return False
314                 return True
315         
316         def doJSON_setworkaux(self, k, hexv = None):
317                 if self.Username != self.server.SecretUser:
318                         self.doAuthenticate()
319                         return None
320                 if hexv:
321                         self.server.aux[k] = a2b_hex(hexv)
322                 else:
323                         del self.server.aux[k]
324                 return True
325         
326         def handle_close(self):
327                 self.cleanupLP()
328                 super().handle_close()
329         
330         def handle_request(self):
331                 if not self.Username:
332                         return self.doAuthenticate()
333                 if not self.method in (b'GET', b'POST'):
334                         return self.sendReply(405)
335                 if not self.path in (b'/', b'/LP', b'/LP/'):
336                         return self.sendReply(404)
337                 try:
338                         if self.path[:3] == b'/LP':
339                                 return self.doLongpoll()
340                         data = b''.join(self.incoming)
341                         return self.doJSON(data)
342                 except socket.error:
343                         raise
344                 except WithinLongpoll:
345                         raise
346                 except:
347                         self.logger.error(traceback.format_exc())
348                         return self.doError('uncaught error')
349         
350         def parse_headers(self, hs):
351                 self.CL = None
352                 self.Username = None
353                 self.method = None
354                 self.path = None
355                 hs = re.split(br'\r?\n', hs)
356                 data = hs.pop(0).split(b' ')
357                 try:
358                         self.method = data[0]
359                         self.path = data[1]
360                 except IndexError:
361                         self.close()
362                         return
363                 self.extensions = []
364                 self.reqinfo = {}
365                 self.quirks = {}
366                 if data[2:] != [b'HTTP/1.1']:
367                         self.quirks['close'] = None
368                 self.quirks['NELH'] = None  # FIXME: identify which clients have a problem with this
369                 while True:
370                         try:
371                                 data = hs.pop(0)
372                         except IndexError:
373                                 break
374                         data = tuple(map(lambda a: a.strip(), data.split(b':', 1)))
375                         method = 'doHeader_' + data[0].decode('ascii').lower()
376                         if hasattr(self, method):
377                                 getattr(self, method)(data[1])
378         
379         def found_terminator(self):
380                 if self.reading_headers:
381                         inbuf = b"".join(self.incoming)
382                         self.incoming = []
383                         m = re.match(br'^[\r\n]+', inbuf)
384                         if m:
385                                 inbuf = inbuf[len(m.group(0)):]
386                         if not inbuf:
387                                 return
388                         
389                         self.reading_headers = False
390                         self.parse_headers(inbuf)
391                         if self.CL:
392                                 self.set_terminator(self.CL)
393                                 return
394                 
395                 self.set_terminator(None)
396                 try:
397                         self.handle_request()
398                         self.reset_request()
399                 except WithinLongpoll:
400                         pass
401         
402         def handle_error(self):
403                 self.logger.debug(traceback.format_exc())
404                 self.handle_close()
405         
        # Borrow asynchat's terminator accessors as-is; handle_readbuf calls
        # get_terminator() and adjusts self.terminator directly.
        get_terminator = asynchat.async_chat.get_terminator
        set_terminator = asynchat.async_chat.set_terminator
408         
        def handle_readbuf(self):
                """Feed self.ac_in_buffer to the collector according to the current terminator.

                Loops until the buffer is empty or ends in a partial terminator:
                  - no terminator: everything is collected;
                  - int terminator: that many bytes are collected, then
                    found_terminator() is called;
                  - bytes terminator, or a tuple of them: data up to the earliest
                    terminator is collected and found_terminator() is called.
                    When the buffer ends with the start of a terminator, that
                    part is kept back for the next call.
                found_terminator() may change the terminator; it is re-read on
                every iteration.
                NOTE(review): appears modelled on asynchat's handle_read, extended
                to accept several alternative terminators — confirm.
                """
                while self.ac_in_buffer:
                        lb = len(self.ac_in_buffer)
                        terminator = self.get_terminator()
                        if not terminator:
                                # no terminator, collect it all
                                self.collect_incoming_data (self.ac_in_buffer)
                                self.ac_in_buffer = b''
                        elif isinstance(terminator, int):
                                # numeric terminator
                                n = terminator
                                if lb < n:
                                        # not enough yet: take it all and remember how much is still owed
                                        self.collect_incoming_data (self.ac_in_buffer)
                                        self.ac_in_buffer = b''
                                        self.terminator = self.terminator - lb
                                else:
                                        self.collect_incoming_data (self.ac_in_buffer[:n])
                                        self.ac_in_buffer = self.ac_in_buffer[n:]
                                        self.terminator = 0
                                        self.found_terminator()
                        else:
                                # 3 cases:
                                # 1) end of buffer matches terminator exactly:
                                #    collect data, transition
                                # 2) end of buffer matches some prefix:
                                #    collect data to the prefix
                                # 3) end of buffer does not match any prefix:
                                #    collect data
                                # NOTE: this supports multiple different terminators, but
                                #       NOT ones that are prefixes of others...
                                # A single bytes terminator is wrapped so both forms share one code path
                                if isinstance(self.ac_in_buffer, type(terminator)):
                                        terminator = (terminator,)
                                termidx = tuple(map(self.ac_in_buffer.find, terminator))
                                try:
                                        # earliest position at which any terminator occurs
                                        index = min(x for x in termidx if x >= 0)
                                except ValueError:
                                        # min() of an empty sequence: no terminator present
                                        index = -1
                                if index != -1:
                                        # we found the terminator
                                        if index > 0:
                                                # don't bother reporting the empty string (source of subtle bugs)
                                                self.collect_incoming_data (self.ac_in_buffer[:index])
                                        specific_terminator = terminator[termidx.index(index)]
                                        terminator_len = len(specific_terminator)
                                        self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
                                        # This does the Right Thing if the terminator is changed here.
                                        self.found_terminator()
                                else:
                                        # check for a prefix of the terminator
                                        termidx = tuple(map(lambda a: asynchat.find_prefix_at_end (self.ac_in_buffer, a), terminator))
                                        index = max(termidx)
                                        if index:
                                                if index != lb:
                                                        # we found a prefix, collect up to the prefix
                                                        self.collect_incoming_data (self.ac_in_buffer[:-index])
                                                        self.ac_in_buffer = self.ac_in_buffer[-index:]
                                                # wait for more data to complete (or rule out) the terminator
                                                break
                                        else:
                                                # no prefix, collect it all
                                                self.collect_incoming_data (self.ac_in_buffer)
                                                self.ac_in_buffer = b''
470         
471         def reset_request(self):
472                 self.incoming = []
473                 self.set_terminator( (b"\n\n", b"\r\n\r\n") )
474                 self.reading_headers = True
475                 self._LP = False
476                 self.changeTask(self.handle_timeout, time() + 150)
477                 if 'close' in self.quirks:
478                         self.close()
479         
480         def collect_incoming_data(self, data):
481                 asynchat.async_chat._collect_incoming_data(self, data)
482         
483         def __init__(self, *a, **ka):
484                 super().__init__(*a, **ka)
485                 self.quirks = {}
486                 self.reset_request()
487         
# parse_headers looks handlers up by the literal header name, and names with
# '-' cannot be written as method names; register the underscore-named
# handlers a second time under their hyphenated names.
for _hdr in ('content-length', 'user-agent', 'x-minimum-wait', 'x-mining-extensions'):
        setattr(JSONRPCHandler, 'doHeader_' + _hdr, getattr(JSONRPCHandler, 'doHeader_' + _hdr.replace('-', '_')))
del _hdr
492
# Plain alias: the generic network listener is exposed under this module's name.
JSONRPCListener = networkserver.NetworkListener
494
495 class JSONRPCServer(networkserver.AsyncSocketServer):
        # Logger shared by all server instances
        logger = logging.getLogger('JSONRPCServer')

        # NOTE(review): presumably tells the base server to provide the wakeup
        # mechanism that wakeLongpoll relies on (self.wakeup()) — confirm in
        # networkserver.
        waker = True
499         
500         def __init__(self, *a, **ka):
501                 ka.setdefault('RequestHandlerClass', JSONRPCHandler)
502                 super().__init__(*a, **ka)
503                 
504                 self.SecretUser = None
505                 
506                 self.LPRequest = False
507                 self._LPClients = {}
508                 self._LPWaitTime = time() + 15
509                 
510                 self.LPTracking = {}
511         
512         def pre_schedule(self):
513                 if self.LPRequest == 1:
514                         self._LPsch()
515         
516         def wakeLongpoll(self):
517                 if self.LPRequest:
518                         self.logger.info('Ignoring longpoll attempt while another is waiting')
519                         return
520                 self.LPRequest = 1
521                 self.wakeup()
522         
523         def _LPsch(self):
524                 now = time()
525                 if self._LPWaitTime > now:
526                         delay = self._LPWaitTime - now
527                         self.logger.info('Waiting %.3g seconds to longpoll' % (delay,))
528                         self.schedule(self._actualLP, self._LPWaitTime)
529                         self.LPRequest = 2
530                 else:
531                         self._actualLP()
532         
533         def _actualLP(self):
534                 self.LPRequest = False
535                 C = tuple(self._LPClients.values())
536                 self._LPClients = {}
537                 if not C:
538                         self.logger.info('Nobody to longpoll')
539                         return
540                 OC = len(C)
541                 self.logger.debug("%d clients to wake up..." % (OC,))
542                 
543                 now = time()
544                 
545                 for ic in C:
546                         try:
547                                 ic.wakeLongpoll()
548                         except socket.error:
549                                 OC -= 1
550                                 # Ignore socket errors; let the main event loop take care of them later
551                         except:
552                                 OC -= 1
553                                 self.logger.debug('Error waking longpoll handler:\n' + traceback.format_exc())
554                 
555                 self._LPWaitTime = time()
556                 self.logger.info('Longpoll woke up %d clients in %.3f seconds' % (OC, self._LPWaitTime - now))
557                 self._LPWaitTime += 5  # TODO: make configurable: minimum time between longpolls
558         
559         def TopLPers(self, n = 0x10):
560                 tmp = list(self.LPTracking.keys())
561                 tmp.sort(key=lambda k: self.LPTracking[k])
562                 for jerk in map(lambda k: (k, self.LPTracking[k]), tmp[-n:]):
563                         print(jerk)