Longpoll tracking by username
[bitcoin:eloipool.git] / jsonrpcserver.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 import httpserver
18 import json
19 import logging
20 import networkserver
21 import socket
22 from time import time
23 import traceback
24
25 WithinLongpoll = httpserver.AsyncRequest
26
27 class _SentJSONError(BaseException):
28         def __init__(self, rv):
29                 self.rv = rv
30
31 class JSONRPCHandler(httpserver.HTTPHandler):
32         default_quirks = {
33                 'NELH': None,  # FIXME: identify which clients have a problem with this
34         }
35         
36         LPHeaders = {
37                 'X-Long-Polling': None,
38         }
39         
40         JSONRPCURIs = (b'/', b'/LP', b'/LP/')
41         
42         logger = logging.getLogger('JSONRPCHandler')
43         
44         def sendReply(self, status=200, body=b'', headers=None):
45                 headers = dict(headers) if headers else {}
46                 if body and body[0] == 123:  # b'{'
47                         headers.setdefault('Content-Type', 'application/json')
48                 if status == 200 and self.path in self.JSONRPCURIs:
49                         if not body:
50                                 headers.setdefault('Content-Type', 'application/json')
51                         headers.setdefault('X-Long-Polling', '/LP')
52                         if self.JSONRPCMethod == 'getwork':
53                                 # FIXME: Move this to jsonrpc_getwork somehow
54                                 headers.setdefault('X-Roll-NTime', 'expire=120')
55                 return super().sendReply(status, body, headers)
56         
57         def fmtError(self, reason = '', code = 100):
58                 reason = json.dumps(reason)
59                 reason = r'{"result":null,"id":null,"error":{"name":"JSONRPCError","code":%d,"message":%s}}' % (code, reason)
60                 reason = reason.encode('utf8')
61                 return reason
62         
63         def doError(self, reason = '', code = 100):
64                 reason = self.fmtError(reason, code)
65                 return self.sendReply(500, reason)
66         
67         def checkAuthentication(self, un, pw):
68                 return bool(un)
69         
70         _MidstateNotAdv = (b'phoenix', b'poclbm', b'gminor')
71         def doHeader_user_agent(self, value):
72                 self.reqinfo['UA'] = value
73                 quirks = self.quirks
74                 (UA, v, *x) = value.split(b'/', 1) + [None]
75                 
76                 # Temporary HACK to keep working with older gmp-proxy
77                 # NOTE: This will go away someday.
78                 if UA == b'AuthServiceProxy':
79                         # SubmitBlock Boolean
80                         quirks['SBB'] = None
81                 
82                 try:
83                         if v[0] == b'v': v = v[1:]
84                         v = tuple(map(int, v.split(b'.'))) + (0,0,0)
85                 except:
86                         pass
87                 if UA in self._MidstateNotAdv:
88                         if UA == b'phoenix':
89                                 if v != (1, 50, 0):
90                                         quirks['midstate'] = None
91                                 if v[0] < 2 and v[1] < 8 and v[2] < 1:
92                                         quirks['NELH'] = None
93                         else:
94                                 quirks['midstate'] = None
95         
96         def doHeader_x_minimum_wait(self, value):
97                 self.reqinfo['MinWait'] = int(value)
98         
99         def doHeader_x_mining_extensions(self, value):
100                 self.extensions = value.decode('ascii').lower().split(' ')
101         
102         def processLP(self, lpid):
103                 lpw = self.server.LPId
104                 if isinstance(lpid, str):
105                         if lpw != lpid:
106                                 return
107                 self.doLongpoll()
108         
109         def doLongpoll(self, *a):
110                 timeNow = time()
111                 
112                 self._LP = True
113                 self._LPCall = a
114                 if 'NELH' not in self.quirks:
115                         # [NOT No] Early Longpoll Headers
116                         self.sendReply(200, body=None, headers=self.LPHeaders)
117                         self.push(b"1\r\n{\r\n")
118                         self.changeTask(self._chunkedKA, timeNow + 45)
119                 else:
120                         self.changeTask(None)
121                 
122                 waitTime = self.reqinfo.get('MinWait', 15)  # TODO: make default configurable
123                 self.waitTime = waitTime + timeNow
124                 
125                 totfromme = self.LPTrack()
126                 self.server._LPClients[id(self)] = self
127                 self.logger.debug("New LP client; %d total; %d from %s" % (len(self.server._LPClients), totfromme, self.remoteHost))
128                 
129                 raise WithinLongpoll
130         
131         def _chunkedKA(self):
132                 # Keepalive via chunked transfer encoding
133                 self.push(b"1\r\n \r\n")
134                 self.changeTask(self._chunkedKA, time() + 45)
135         
136         def LPTrack(self):
137                 myip = self.remoteHost
138                 if myip not in self.server.LPTracking:
139                         self.server.LPTracking[myip] = 0
140                 self.server.LPTracking[myip] += 1
141                 
142                 myuser = self.Username
143                 if myuser not in self.server.LPTrackingByUser:
144                         self.server.LPTrackingByUser[myuser] = 0
145                 self.server.LPTrackingByUser[myuser] += 1
146                 
147                 return self.server.LPTracking[myip]
148         
149         def LPUntrack(self):
150                 self.server.LPTracking[self.remoteHost] -= 1
151                 self.server.LPTrackingByUser[self.Username] -= 1
152         
153         def cleanupLP(self):
154                 # Called when the connection is closed
155                 if not self._LP:
156                         return
157                 self.changeTask(None)
158                 try:
159                         del self.server._LPClients[id(self)]
160                 except KeyError:
161                         pass
162                 self.LPUntrack()
163         
164         def wakeLongpoll(self):
165                 now = time()
166                 if now < self.waitTime:
167                         self.changeTask(self.wakeLongpoll, self.waitTime)
168                         return
169                 else:
170                         self.changeTask(None)
171                 
172                 self.LPUntrack()
173                 
174                 rv = self._doJSON_i(*self._LPCall, longpoll=True)
175                 if 'NELH' not in self.quirks:
176                         rv = rv[1:]  # strip the '{' we already sent
177                         self.push(('%x' % len(rv)).encode('utf8') + b"\r\n" + rv + b"\r\n0\r\n\r\n")
178                         self.reset_request()
179                         return
180                 
181                 try:
182                         self.sendReply(200, body=rv, headers=self.LPHeaders)
183                         raise httpserver.RequestNotHandled
184                 except httpserver.RequestHandled:
185                         # Expected
186                         pass
187                 finally:
188                         self.reset_request()
189         
190         def _doJSON_i(self, reqid, method, params, longpoll = False):
191                 try:
192                         rv = getattr(self, method)(*params)
193                 except WithinLongpoll:
194                         self._LPCall = (reqid, method, params)
195                         raise
196                 except Exception as e:
197                         self.logger.error(("Error during JSON-RPC call: %s%s\n" % (method, params)) + traceback.format_exc())
198                         efun = self.fmtError if longpoll else self.doError
199                         return efun(r'Service error: %s' % (e,))
200                 try:
201                         rv.setdefault('submitold', True)
202                 except:
203                         pass
204                 rv = {'id': reqid, 'error': None, 'result': rv}
205                 try:
206                         rv = json.dumps(rv)
207                 except:
208                         efun = self.fmtError if longpoll else self.doError
209                         return efun(r'Error encoding reply in JSON')
210                 rv = rv.encode('utf8')
211                 return rv if longpoll else self.sendReply(200, rv, headers=self._JSONHeaders)
212         
213         def doJSON(self, data, longpoll = False):
214                 # TODO: handle JSON errors
215                 data = data.decode('utf8')
216                 if longpoll and not data:
217                         self.JSONRPCId = jsonid = 1
218                         self.JSONRPCMethod = 'getwork'
219                         self._JSONHeaders = {}
220                         return self.doLongpoll(1, 'doJSON_getwork', ())
221                 try:
222                         data = json.loads(data)
223                         method = str(data['method']).lower()
224                         self.JSONRPCId = jsonid = data['id']
225                         self.JSONRPCMethod = method
226                         method = 'doJSON_' + method
227                 except ValueError:
228                         return self.doError(r'Parse error')
229                 except TypeError:
230                         return self.doError(r'Bad call')
231                 if not hasattr(self, method):
232                         return self.doError(r'Procedure not found')
233                 # TODO: handle errors as JSON-RPC
234                 self._JSONHeaders = {}
235                 params = data.setdefault('params', ())
236                 procfun = self._doJSON_i
237                 if longpoll and not params:
238                         procfun = self.doLongpoll
239                 return procfun(jsonid, method, params)
240         
241         def handle_close(self):
242                 self.cleanupLP()
243                 super().handle_close()
244         
245         def handle_request(self):
246                 if not self.method in (b'GET', b'POST'):
247                         return self.sendReply(405)
248                 if not self.path in self.JSONRPCURIs:
249                         if isinstance(self.path, bytes) and self.path[:5] == b'/src/':
250                                 return self.handle_src_request()
251                         return self.sendReply(404)
252                 if not self.Username:
253                         return self.doAuthenticate()
254                 try:
255                         data = b''.join(self.incoming)
256                         return self.doJSON(data, self.path[:3] == b'/LP')
257                 except socket.error:
258                         raise
259                 except WithinLongpoll:
260                         raise
261                 except httpserver.RequestHandled:
262                         raise
263                 except:
264                         self.logger.error(traceback.format_exc())
265                         return self.doError('uncaught error')
266         
267         def reset_request(self):
268                 self._LP = False
269                 self.JSONRPCMethod = None
270                 super().reset_request()
271         
272 setattr(JSONRPCHandler, 'doHeader_user-agent', JSONRPCHandler.doHeader_user_agent);
273 setattr(JSONRPCHandler, 'doHeader_x-minimum-wait', JSONRPCHandler.doHeader_x_minimum_wait);
274 setattr(JSONRPCHandler, 'doHeader_x-mining-extensions', JSONRPCHandler.doHeader_x_mining_extensions);
275
276 JSONRPCListener = networkserver.NetworkListener
277
278 class JSONRPCServer(networkserver.AsyncSocketServer):
279         logger = logging.getLogger('JSONRPCServer')
280         
281         waker = True
282         
283         def __init__(self, *a, **ka):
284                 ka.setdefault('RequestHandlerClass', JSONRPCHandler)
285                 super().__init__(*a, **ka)
286                 
287                 self.SecretUser = None
288                 
289                 self._LPId = 0
290                 self.LPId = '%d' % (time(),)
291                 self.LPRequest = False
292                 self._LPClients = {}
293                 self._LPWaitTime = time() + 15
294                 
295                 self.LPTracking = {}
296                 self.LPTrackingByUser = {}
297         
298         def pre_schedule(self):
299                 if self.LPRequest == 1:
300                         self._LPsch()
301         
302         def wakeLongpoll(self):
303                 if self.LPRequest:
304                         self.logger.info('Ignoring longpoll attempt while another is waiting')
305                         return
306                 self.LPRequest = 1
307                 self._LPId += 1
308                 self.LPId = '%d %d' % (time(), self._LPId)
309                 self.wakeup()
310         
311         def _LPsch(self):
312                 now = time()
313                 if self._LPWaitTime > now:
314                         delay = self._LPWaitTime - now
315                         self.logger.info('Waiting %.3g seconds to longpoll' % (delay,))
316                         self.schedule(self._actualLP, self._LPWaitTime)
317                         self.LPRequest = 2
318                 else:
319                         self._actualLP()
320         
321         def _actualLP(self):
322                 self.LPRequest = False
323                 C = tuple(self._LPClients.values())
324                 self._LPClients = {}
325                 if not C:
326                         self.logger.info('Nobody to longpoll')
327                         return
328                 OC = len(C)
329                 self.logger.debug("%d clients to wake up..." % (OC,))
330                 
331                 now = time()
332                 
333                 for ic in C:
334                         try:
335                                 ic.wakeLongpoll()
336                         except socket.error:
337                                 OC -= 1
338                                 # Ignore socket errors; let the main event loop take care of them later
339                         except:
340                                 OC -= 1
341                                 self.logger.debug('Error waking longpoll handler:\n' + traceback.format_exc())
342                 
343                 self._LPWaitTime = time()
344                 self.logger.info('Longpoll woke up %d clients in %.3f seconds' % (OC, self._LPWaitTime - now))
345                 self._LPWaitTime += 5  # TODO: make configurable: minimum time between longpolls
346         
347         def TopLPers(self, n = 0x10):
348                 tmp = list(self.LPTracking.keys())
349                 tmp.sort(key=lambda k: self.LPTracking[k])
350                 for jerk in map(lambda k: (k, self.LPTracking[k]), tmp[-n:]):
351                         print(jerk)
352         
353         def TopLPersByUser(self, n = 0x10):
354                 tmp = list(self.LPTrackingByUser.keys())
355                 tmp.sort(key=lambda k: self.LPTrackingByUser[k])
356                 for jerk in map(lambda k: (k, self.LPTrackingByUser[k]), tmp[-n:]):
357                         print(jerk)