Implemented authentication modules, and added 2 modules: allowall and simplefile
[bitcoin:eloipool.git] / jsonrpcserver.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2013  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 from binascii import b2a_hex
18 import httpserver
19 import json
20 import logging
21 import networkserver
22 import socket
23 from time import time
24 import traceback
25
# Local alias for httpserver.AsyncRequest. In this module it is raised by
# JSONRPCHandler.doLongpoll once a request has been registered as a waiting
# long-poll client, and re-raised untouched by handle_request.
WithinLongpoll = httpserver.AsyncRequest
27
class _SentJSONError(BaseException):
    """Exception that carries a single payload in its ``rv`` attribute.

    It derives from BaseException rather than Exception, so handlers written
    as ``except Exception`` do not intercept it.

    NOTE(review): nothing in this module raises or catches it; presumably
    ``rv`` is the value a caller should return once an error reply has
    already been sent -- confirm against the code that uses it.
    """

    def __init__(self, rv: object) -> None:
        # The payload is stored as-is; no validation or copying is done.
        self.rv = rv
31
class JSONRPCHandler(httpserver.HTTPHandler):
    """HTTP handler that serves JSON-RPC calls and long-poll requests.

    Requests to one of ``JSONRPCURIs`` are decoded as JSON-RPC and dispatched
    to a ``doJSON_<method>`` attribute of the handler.  Requests whose path
    starts with ``/LP`` may instead be parked as long-poll clients on the
    server until ``wakeLongpoll`` is called.
    """

    # Quirks present on every connection.
    # NOTE(review): assumed to seed ``self.quirks`` in the base handler --
    # confirm in httpserver.
    default_quirks = {
        'NELH': None,  # FIXME: identify which clients have a problem with this
    }

    # Headers sent with long-poll replies; a None value for X-Long-Polling
    # is passed through to the base sendReply unchanged.
    LPHeaders = {
        'X-Long-Polling': None,
    }

    # Request paths that are treated as JSON-RPC endpoints.
    JSONRPCURIs = (b'/', b'/LP', b'/LP/')

    logger = logging.getLogger('JSONRPCHandler')

    def final_init(server):
        """Hook invoked as ``JSONRPCHandler.final_init(server)``; no-op here."""
        pass

    def __init__(self, *a, **ka):
        super().__init__(*a, **ka)
        self.UA = None
        # (ip, username) pair currently counted in the server's long-poll
        # tracking tables, or None when this handler is not counted.
        self._LPTracked = None

    def sendReply(self, status=200, body=b'', headers=None, *a, **ka):
        """Send a reply, filling in JSON-RPC specific default headers.

        A body starting with ``{`` gets a JSON Content-Type.  Successful
        replies on a JSON-RPC URI additionally advertise the long-poll path
        and, when the body is empty, a JSON Content-Type.  Headers supplied
        by the caller always win over these defaults.
        """
        headers = dict(headers) if headers else {}
        if body and body[0] == 123:  # b'{'
            headers.setdefault('Content-Type', 'application/json')
        if status == 200 and self.path in self.JSONRPCURIs:
            if not body:
                headers.setdefault('Content-Type', 'application/json')
            headers.setdefault('X-Long-Polling', '/LP')
        return super().sendReply(status, body, headers, *a, **ka)

    def fmtError(self, reason='', code=100):
        """Return a UTF-8 encoded JSON-RPC error object for *reason*/*code*."""
        message = json.dumps(reason)
        reply = r'{"result":null,"id":null,"error":{"name":"JSONRPCError","code":%d,"message":%s}}' % (code, message)
        return reply.encode('utf8')

    def doError(self, reason='', code=100):
        """Send *reason* as a JSON-RPC error with HTTP status 500."""
        return self.sendReply(500, self.fmtError(reason, code))

    def checkAuthentication(self, un, pw):
        """Decode the byte credentials as UTF-8 and defer to the server."""
        return self.server.checkAuthentication(un.decode('utf8'), pw.decode('utf8'))

    # Clients for which the 'midstate' quirk is set (see doHeader_user_agent).
    _MidstateNotAdv = (b'phoenix', b'poclbm', b'gminor')

    def doHeader_user_agent(self, value):
        """Record the User-Agent header and derive client quirks from it.

        *value* is the raw header bytes, expected to look like
        ``name/version``.  A missing or unparsable version is treated as
        ``0.0.0`` so that the quirk checks below never fail on it.
        """
        self.reqinfo['UA'] = value
        self.UA = value.decode('latin-1')  # technically ASCII, but latin-1 ignores errors
        quirks = self.quirks
        (UA, v, *x) = value.split(b'/', 1) + [None]

        # Temporary HACK to keep working with older gmp-proxy
        # NOTE: This will go away someday.
        if UA == b'AuthServiceProxy':
            # SubmitBlock Boolean
            quirks['SBB'] = None

        try:
            # Indexing bytes yields an int in Python 3, so the prefix has to
            # be compared as a one-byte slice for the test to ever match.
            if v[:1] == b'v':
                v = v[1:]
            # Pad so that at least three components are always available.
            v = tuple(map(int, v.split(b'.'))) + (0, 0, 0)
        except (TypeError, ValueError):
            # No version part (v is None) or a non-numeric component.
            v = (0, 0, 0)
        if UA in self._MidstateNotAdv:
            if UA == b'phoenix':
                # Compare only the first three components: the padding above
                # makes the full tuple longer than (1, 50, 0).
                if v[:3] != (1, 50, 0):
                    quirks['midstate'] = None
                # NOTE(review): this is a component-wise test, not an ordered
                # version comparison -- confirm that is what is intended.
                if v[0] < 2 and v[1] < 8 and v[2] < 1:
                    quirks['NELH'] = None
            else:
                quirks['midstate'] = None

    def doHeader_x_minimum_wait(self, value):
        """Store the X-Minimum-Wait header as an integer in ``reqinfo``."""
        self.reqinfo['MinWait'] = int(value)

    def doHeader_x_mining_extensions(self, value):
        """Store the space-separated, lower-cased mining extension names."""
        self.extensions = value.decode('ascii').lower().split(' ')

    def processLP(self, lpid):
        """Enter long-polling unless *lpid* is a string differing from the
        server's current ``LPId``, in which case return without waiting."""
        lpw = self.server.LPId
        if isinstance(lpid, str):
            if lpw != lpid:
                return
        self.doLongpoll()

    def doLongpoll(self, *a):
        """Park this request as a long-poll client.

        *a* is remembered as the argument tuple for the later
        ``_doJSON_i`` call made by ``wakeLongpoll``.

        Raises:
            WithinLongpoll: always, once the client has been registered.
        """
        timeNow = time()

        self._LP = True
        self._LPCall = a
        if 'NELH' not in self.quirks:
            # [NOT No] Early Longpoll Headers: send the headers and the
            # opening '{' now as a first chunk, then keep the connection
            # alive with periodic one-byte chunks.
            self.sendReply(200, body=None, headers=self.LPHeaders)
            self.push(b"1\r\n{\r\n")
            self.changeTask(self._chunkedKA, timeNow + 45)
        else:
            self.changeTask(None)

        waitTime = self.reqinfo.get('MinWait', 15)  # TODO: make default configurable
        self.waitTime = waitTime + timeNow

        totfromme = self.LPTrack()
        self.server._LPClients[id(self)] = self
        self.logger.debug("New LP client; %d total; %d from %s" % (len(self.server._LPClients), totfromme, self.remoteHost))

        raise WithinLongpoll

    def _chunkedKA(self):
        """Push a single-space chunk and reschedule itself 45 seconds out."""
        # Keepalive via chunked transfer encoding
        self.push(b"1\r\n \r\n")
        self.changeTask(self._chunkedKA, time() + 45)

    def LPTrack(self):
        """Count this handler in the server's per-IP and per-user tables.

        Returns:
            The number of long-poll clients now counted for this handler's
            remote host.
        """
        ip = self.remoteHost
        user = self.Username
        byip = self.server.LPTracking
        byuser = self.server.LPTrackingByUser
        byip[ip] = byip.get(ip, 0) + 1
        byuser[user] = byuser.get(user, 0) + 1
        # Remember exactly which keys were incremented so that LPUntrack
        # reverses this registration, and does so only once.
        self._LPTracked = (ip, user)
        return byip[ip]

    def LPUntrack(self):
        """Undo the most recent ``LPTrack``; a no-op if nothing is tracked.

        Being idempotent matters because both ``wakeLongpoll`` and
        ``cleanupLP`` call this, and the latter can still run after the
        former if the reply fails before ``reset_request`` is reached.
        """
        tracked = getattr(self, '_LPTracked', None)
        if tracked is None:
            return
        self._LPTracked = None
        ip, user = tracked
        self._LPDecrement(self.server.LPTracking, ip)
        self._LPDecrement(self.server.LPTrackingByUser, user)

    @staticmethod
    def _LPDecrement(counts, key):
        """Decrement ``counts[key]``, dropping the entry when it reaches zero
        so the tracking tables do not grow with every host/user ever seen."""
        remaining = counts.get(key, 0) - 1
        if remaining > 0:
            counts[key] = remaining
        else:
            counts.pop(key, None)

    def cleanupLP(self):
        """Deregister a pending long-poll when the connection is closed."""
        # Called when the connection is closed
        if not self._LP:
            return
        self.changeTask(None)
        # The server may already have taken this handler out of its table.
        self.server._LPClients.pop(id(self), None)
        self.LPUntrack()

    def wakeLongpoll(self, wantClear=False):
        """Answer the parked long-poll request.

        If the client's minimum wait has not elapsed yet, the wake-up is
        rescheduled for ``self.waitTime`` instead.  Otherwise the remembered
        call is executed and its reply is sent, either as the final chunks
        of an already started chunked response or as a complete reply.
        """
        now = time()
        if now < self.waitTime:
            self.changeTask(lambda: self.wakeLongpoll(wantClear), self.waitTime)
            return
        self.changeTask(None)

        self.LPUntrack()

        self.server.tls.wantClear = wantClear
        try:
            rv = self._doJSON_i(*self._LPCall, longpoll=True)
        except WithinLongpoll:
            # Not sure why this would happen right now, but handle it sanely...
            return
        finally:
            self.server.tls.wantClear = False
        if 'NELH' not in self.quirks:
            rv = rv[1:]  # strip the '{' we already sent
            # One chunk with the rest of the body, then the terminating chunk.
            self.push(('%x' % len(rv)).encode('utf8') + b"\r\n" + rv + b"\r\n0\r\n\r\n")
            self.reset_request()
            return

        try:
            self.sendReply(200, body=rv, headers=self.LPHeaders, tryCompression=False)
            # Reached only if sendReply returned instead of raising.
            raise httpserver.RequestNotHandled
        except httpserver.RequestHandled:
            # Expected
            pass
        finally:
            self.reset_request()

    def _doJSON_i(self, reqid, method, params, longpoll=False):
        """Invoke handler attribute *method* with *params* and build the reply.

        With ``longpoll=True`` the encoded reply (or encoded error) bytes are
        returned to the caller; otherwise the reply or error is sent and the
        result of ``sendReply`` is returned.

        Raises:
            WithinLongpoll: propagated from the called method after the call
                has been remembered in ``self._LPCall``.
        """
        report = self.fmtError if longpoll else self.doError
        try:
            rv = getattr(self, method)(*params)
        except WithinLongpoll:
            self._LPCall = (reqid, method, params)
            raise
        except Exception as e:
            self.logger.error(("Error during JSON-RPC call (UA=%s, IP=%s): %s%s\n" % (self.reqinfo.get('UA'), self.remoteHost, method, params)) + traceback.format_exc())
            return report(r'Service error: %s' % (e,))
        rv = {'id': reqid, 'error': None, 'result': rv}
        try:
            rv = json.dumps(rv)
        except Exception:
            return report(r'Error encoding reply in JSON')
        rv = rv.encode('utf8')
        return rv if longpoll else self.sendReply(200, rv, headers=self._JSONHeaders)

    def doJSON(self, data, longpoll=False):
        """Decode a JSON-RPC request body and dispatch it.

        *data* is the raw request body.  An empty body on a long-poll request
        is treated as a ``getwork`` long-poll with id 1.  Malformed requests
        are answered through ``doError``.
        """
        # TODO: handle JSON errors
        try:
            data = data.decode('utf8')
        except UnicodeDecodeError as e:
            return self.doError(str(e))
        if longpoll and not data:
            self.JSONRPCId = 1
            self.JSONRPCMethod = 'getwork'
            self._JSONHeaders = {}
            return self.doLongpoll(1, 'doJSON_getwork', ())
        try:
            data = json.loads(data)
            method = str(data['method']).lower()
            self.JSONRPCId = jsonid = data['id']
            self.JSONRPCMethod = method
            method = 'doJSON_' + method
        except ValueError:
            return self.doError(r'Parse error')
        except (TypeError, KeyError):
            # Not a JSON object, or an object lacking 'method' / 'id'.
            return self.doError(r'Bad call')
        if not hasattr(self, method):
            return self.doError(r'Procedure not found')
        # TODO: handle errors as JSON-RPC
        self._JSONHeaders = {}
        params = data.setdefault('params', ())
        procfun = self._doJSON_i
        if longpoll and not params:
            procfun = self.doLongpoll
        return procfun(jsonid, method, params)

    def handle_close(self):
        """Drop any pending long-poll registration, then close as usual."""
        self.cleanupLP()
        super().handle_close()

    def handle_request(self):
        """Route one HTTP request.

        Only GET and POST are accepted (405 otherwise).  Paths outside
        ``JSONRPCURIs`` go to ``handle_src_request`` when they start with
        ``/src/`` and get a 404 otherwise.  Requests without a username are
        sent to ``doAuthenticate``; everything else is handled as JSON-RPC.
        """
        if self.method not in (b'GET', b'POST'):
            return self.sendReply(405)
        if self.path not in self.JSONRPCURIs:
            if isinstance(self.path, bytes) and self.path[:5] == b'/src/':
                return self.handle_src_request()
            return self.sendReply(404)
        if not self.Username:
            return self.doAuthenticate()
        try:
            data = b''.join(self.incoming)
            return self.doJSON(data, self.path[:3] == b'/LP')
        except (socket.error, WithinLongpoll, httpserver.RequestHandled):
            # Socket failures and request control flow are for the caller.
            raise
        except BaseException:
            # Deliberately as broad as the bare ``except`` it replaces.
            # NOTE(review): control-flow exceptions in this codebase may not
            # derive from Exception, so this is not narrowed.
            self.logger.error(traceback.format_exc())
            return self.doError('uncaught error')

    def reset_request(self):
        """Clear per-request long-poll and JSON-RPC state."""
        self._LP = False
        self.JSONRPCMethod = None
        super().reset_request()
283         
# The dashed attribute names below are not valid Python identifiers, so they
# can only be attached with setattr.  Each one is bound to the handler method
# whose name uses underscores instead.
# NOTE(review): presumably the HTTP layer looks handlers up by the literal
# header name -- confirm in httpserver.
for _alias, _handler in (
    ('doHeader_user-agent', JSONRPCHandler.doHeader_user_agent),
    ('doHeader_x-minimum-wait', JSONRPCHandler.doHeader_x_minimum_wait),
    ('doHeader_x-mining-extensions', JSONRPCHandler.doHeader_x_mining_extensions),
):
    setattr(JSONRPCHandler, _alias, _handler)
# Do not leave the loop variables behind as module attributes.
del _alias, _handler
287
# Module-level alias: the listener for this server is networkserver's
# NetworkListener, re-exported under a JSON-RPC specific name.
JSONRPCListener = networkserver.NetworkListener
289
class JSONRPCServer(networkserver.AsyncSocketServer):
    """Socket server that owns the long-poll state for its handlers.

    ``LPRequest`` moves through three values: ``False`` when no long-poll is
    pending, ``1`` once ``wakeLongpoll`` has requested one, and ``2`` while
    the wake-up is scheduled for a later time.
    """

    logger = logging.getLogger('JSONRPCServer')

    waker = True

    def __init__(self, *a, **ka):
        """Create the server, defaulting the handler class to JSONRPCHandler."""
        ka.setdefault('RequestHandlerClass', JSONRPCHandler)
        super().__init__(*a, **ka)

        self.SecretUser = None
        self.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff

        # Long-poll bookkeeping.
        self._LPId = 0
        self.LPId = '%d' % (time(),)
        self.LPRequest = False
        self._LPClients = {}  # id(handler) -> handler waiting to be woken
        self._LPWaitTime = time() + 15  # earliest time of the next wake-up

        # Counts of waiting long-poll clients, maintained by the handlers.
        self.LPTracking = {}
        self.LPTrackingByUser = {}

    def checkAuthentication(self, username, password):
        """Accept every username/password pair."""
        return True

    def final_init(self):
        """Forward the final-initialisation hook to the handler class."""
        JSONRPCHandler.final_init(self)

    def pre_schedule(self):
        """Start a freshly requested long-poll (``LPRequest == 1``)."""
        if self.LPRequest == 1:
            self._LPsch()

    def wakeLongpoll(self, wantClear = False):
        """Request that all waiting long-poll clients be woken.

        The request is ignored, with a log message, while an earlier one is
        still pending.  Otherwise a new ``LPId`` is issued, *wantClear* is
        remembered for the handlers, and the server is woken up.
        """
        if self.LPRequest:
            self.logger.info('Ignoring longpoll attempt while another is waiting')
            return
        self._LPId += 1
        self.LPId = '%d %d' % (time(), self._LPId)
        self._LPWantClear = wantClear
        self.LPRequest = 1
        self.wakeup()

    def _LPsch(self):
        """Wake clients now, or schedule the wake-up for ``_LPWaitTime``."""
        now = time()
        if self._LPWaitTime <= now:
            self._actualLP()
            return
        remaining = self._LPWaitTime - now
        self.logger.info('Waiting %.3g seconds to longpoll' % (remaining,))
        self.schedule(self._actualLP, self._LPWaitTime)
        self.LPRequest = 2

    def _actualLP(self):
        """Wake every registered long-poll client and reset the client table.

        Handlers that fail while being woken are not counted as woken; the
        next wake-up is allowed no sooner than five seconds after this one
        finishes.
        """
        self.LPRequest = False
        waiting = tuple(self._LPClients.values())
        self._LPClients = {}
        if not waiting:
            self.logger.info('Nobody to longpoll')
            return
        woken = len(waiting)
        self.logger.debug("%d clients to wake up..." % (woken,))

        started = time()

        for handler in waiting:
            self.lastHandler = handler
            try:
                handler.wakeLongpoll(self._LPWantClear)
            except socket.error:
                # Ignore socket errors; let the main event loop take care of them later
                woken -= 1
            except:
                # Kept as a catch-all on purpose: one failing handler must
                # not stop the remaining clients from being woken.
                woken -= 1
                self.logger.debug('Error waking longpoll handler:\n' + traceback.format_exc())

        self._LPWaitTime = time()
        elapsed = self._LPWaitTime - started
        self.logger.info('Longpoll woke up %d clients in %.3f seconds' % (woken, elapsed))
        self._LPWaitTime += 5  # TODO: make configurable: minimum time between longpolls

    @staticmethod
    def _printTop(counts, n):
        """Print the ``(key, count)`` pairs of the *n* largest counts in
        ascending order of count."""
        ranked = sorted(counts, key=counts.__getitem__)
        for key in ranked[-n:]:
            print((key, counts[key]))

    def TopLPers(self, n = 0x10):
        """Print the *n* remote hosts with the most tracked long-polls."""
        self._printTop(self.LPTracking, n)

    def TopLPersByUser(self, n = 0x10):
        """Print the *n* usernames with the most tracked long-polls."""
        self._printTop(self.LPTrackingByUser, n)