Merge branch 'bugfix_submitblock_rv' into bugfix_submitblock_rv_2
[bitcoin:eloipool.git] / jsonrpcserver.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 import httpserver
18 import json
19 import logging
20 import networkserver
21 import socket
22 from time import time
23 import traceback
24
25 WithinLongpoll = httpserver.AsyncRequest
26
27 class JSONRPCHandler(httpserver.HTTPHandler):
28         default_quirks = {
29                 'NELH': None,  # FIXME: identify which clients have a problem with this
30         }
31         
32         LPHeaders = {
33                 'X-Long-Polling': None,
34         }
35         
36         logger = logging.getLogger('JSONRPCHandler')
37         
38         def sendReply(self, status=200, body=b'', headers=None):
39                 headers = dict(headers) if headers else {}
40                 if status == 200:
41                         headers.setdefault('Content-Type', 'application/json')
42                         headers.setdefault('X-Long-Polling', '/LP')
43                         headers.setdefault('X-Roll-NTime', 'expire=120')
44                 elif body and body[0] == 123:  # b'{'
45                         headers.setdefault('Content-Type', 'application/json')
46                 return super().sendReply(status, body, headers)
47         
48         def doError(self, reason = '', code = 100):
49                 reason = json.dumps(reason)
50                 reason = r'{"result":null,"id":null,"error":{"name":"JSONRPCError","code":%d,"message":%s}}' % (code, reason)
51                 return self.sendReply(500, reason.encode('utf8'))
52         
53         def checkAuthentication(self, un, pw):
54                 return bool(un)
55         
56         def doHeader_user_agent(self, value):
57                 self.reqinfo['UA'] = value
58                 quirks = self.quirks
59                 try:
60                         if value[:9] == b'phoenix/v':
61                                 v = tuple(map(int, value[9:].split(b'.')))
62                                 if v[0] < 2 and v[1] < 8 and v[2] < 1:
63                                         quirks['NELH'] = None
64                 except:
65                         pass
66                 self.quirks = quirks
67         
68         def doHeader_x_minimum_wait(self, value):
69                 self.reqinfo['MinWait'] = int(value)
70         
71         def doHeader_x_mining_extensions(self, value):
72                 self.extensions = value.decode('ascii').lower().split(' ')
73         
74         def doLongpoll(self):
75                 timeNow = time()
76                 
77                 self._LP = True
78                 if 'NELH' not in self.quirks:
79                         # [NOT No] Early Longpoll Headers
80                         self.sendReply(200, body=None, headers=self.LPHeaders)
81                         self.push(b"1\r\n{\r\n")
82                         self.changeTask(self._chunkedKA, timeNow + 45)
83                 else:
84                         self.changeTask(None)
85                 
86                 waitTime = self.reqinfo.get('MinWait', 15)  # TODO: make default configurable
87                 self.waitTime = waitTime + timeNow
88                 
89                 totfromme = self.LPTrack()
90                 self.server._LPClients[id(self)] = self
91                 self.logger.debug("New LP client; %d total; %d from %s" % (len(self.server._LPClients), totfromme, self.addr[0]))
92                 
93                 raise WithinLongpoll
94         
95         def _chunkedKA(self):
96                 # Keepalive via chunked transfer encoding
97                 self.push(b"1\r\n \r\n")
98                 self.changeTask(self._chunkedKA, time() + 45)
99         
100         def LPTrack(self):
101                 myip = self.addr[0]
102                 if myip not in self.server.LPTracking:
103                         self.server.LPTracking[myip] = 0
104                 self.server.LPTracking[myip] += 1
105                 return self.server.LPTracking[myip]
106         
107         def LPUntrack(self):
108                 self.server.LPTracking[self.addr[0]] -= 1
109         
110         def cleanupLP(self):
111                 # Called when the connection is closed
112                 if not self._LP:
113                         return
114                 self.changeTask(None)
115                 try:
116                         del self.server._LPClients[id(self)]
117                 except KeyError:
118                         pass
119                 self.LPUntrack()
120         
121         def wakeLongpoll(self):
122                 now = time()
123                 if now < self.waitTime:
124                         self.changeTask(self.wakeLongpoll, self.waitTime)
125                         return
126                 else:
127                         self.changeTask(None)
128                 
129                 self.LPUntrack()
130                 
131                 rv = self.doJSON_getwork()
132                 rv['submitold'] = True
133                 rv = {'id': 1, 'error': None, 'result': rv}
134                 rv = json.dumps(rv)
135                 rv = rv.encode('utf8')
136                 if 'NELH' not in self.quirks:
137                         rv = rv[1:]  # strip the '{' we already sent
138                         self.push(('%x' % len(rv)).encode('utf8') + b"\r\n" + rv + b"\r\n0\r\n\r\n")
139                 else:
140                         self.sendReply(200, body=rv, headers=self.LPHeaders)
141                 
142                 self.reset_request()
143         
144         def doJSON(self, data):
145                 # TODO: handle JSON errors
146                 data = data.decode('utf8')
147                 try:
148                         data = json.loads(data)
149                         method = 'doJSON_' + str(data['method']).lower()
150                 except ValueError:
151                         return self.doError(r'Parse error')
152                 except TypeError:
153                         return self.doError(r'Bad call')
154                 if not hasattr(self, method):
155                         return self.doError(r'Procedure not found')
156                 # TODO: handle errors as JSON-RPC
157                 self._JSONHeaders = {}
158                 params = data.setdefault('params', ())
159                 try:
160                         rv = getattr(self, method)(*tuple(data['params']))
161                 except Exception as e:
162                         self.logger.error(("Error during JSON-RPC call: %s%s\n" % (method, params)) + traceback.format_exc())
163                         return self.doError(r'Service error: %s' % (e,))
164                 if rv is None:
165                         # response was already sent (eg, authentication request)
166                         return
167                 rv = {'id': data['id'], 'error': None, 'result': rv}
168                 try:
169                         rv = json.dumps(rv)
170                 except:
171                         return self.doError(r'Error encoding reply in JSON')
172                 rv = rv.encode('utf8')
173                 return self.sendReply(200, rv, headers=self._JSONHeaders)
174         
175         def handle_close(self):
176                 self.cleanupLP()
177                 super().handle_close()
178         
179         def handle_request(self):
180                 if not self.Username:
181                         return self.doAuthenticate()
182                 if not self.method in (b'GET', b'POST'):
183                         return self.sendReply(405)
184                 if not self.path in (b'/', b'/LP', b'/LP/'):
185                         return self.sendReply(404)
186                 try:
187                         if self.path[:3] == b'/LP':
188                                 return self.doLongpoll()
189                         data = b''.join(self.incoming)
190                         return self.doJSON(data)
191                 except socket.error:
192                         raise
193                 except WithinLongpoll:
194                         raise
195                 except:
196                         self.logger.error(traceback.format_exc())
197                         return self.doError('uncaught error')
198         
199         def reset_request(self):
200                 self._LP = False
201                 super().reset_request()
202         
203 setattr(JSONRPCHandler, 'doHeader_user-agent', JSONRPCHandler.doHeader_user_agent);
204 setattr(JSONRPCHandler, 'doHeader_x-minimum-wait', JSONRPCHandler.doHeader_x_minimum_wait);
205 setattr(JSONRPCHandler, 'doHeader_x-mining-extensions', JSONRPCHandler.doHeader_x_mining_extensions);
206
207 JSONRPCListener = networkserver.NetworkListener
208
209 class JSONRPCServer(networkserver.AsyncSocketServer):
210         logger = logging.getLogger('JSONRPCServer')
211         
212         waker = True
213         
214         def __init__(self, *a, **ka):
215                 ka.setdefault('RequestHandlerClass', JSONRPCHandler)
216                 super().__init__(*a, **ka)
217                 
218                 self.SecretUser = None
219                 
220                 self.LPRequest = False
221                 self._LPClients = {}
222                 self._LPWaitTime = time() + 15
223                 
224                 self.LPTracking = {}
225         
226         def pre_schedule(self):
227                 if self.LPRequest == 1:
228                         self._LPsch()
229         
230         def wakeLongpoll(self):
231                 if self.LPRequest:
232                         self.logger.info('Ignoring longpoll attempt while another is waiting')
233                         return
234                 self.LPRequest = 1
235                 self.wakeup()
236         
237         def _LPsch(self):
238                 now = time()
239                 if self._LPWaitTime > now:
240                         delay = self._LPWaitTime - now
241                         self.logger.info('Waiting %.3g seconds to longpoll' % (delay,))
242                         self.schedule(self._actualLP, self._LPWaitTime)
243                         self.LPRequest = 2
244                 else:
245                         self._actualLP()
246         
247         def _actualLP(self):
248                 self.LPRequest = False
249                 C = tuple(self._LPClients.values())
250                 self._LPClients = {}
251                 if not C:
252                         self.logger.info('Nobody to longpoll')
253                         return
254                 OC = len(C)
255                 self.logger.debug("%d clients to wake up..." % (OC,))
256                 
257                 now = time()
258                 
259                 for ic in C:
260                         try:
261                                 ic.wakeLongpoll()
262                         except socket.error:
263                                 OC -= 1
264                                 # Ignore socket errors; let the main event loop take care of them later
265                         except:
266                                 OC -= 1
267                                 self.logger.debug('Error waking longpoll handler:\n' + traceback.format_exc())
268                 
269                 self._LPWaitTime = time()
270                 self.logger.info('Longpoll woke up %d clients in %.3f seconds' % (OC, self._LPWaitTime - now))
271                 self._LPWaitTime += 5  # TODO: make configurable: minimum time between longpolls
272         
273         def TopLPers(self, n = 0x10):
274                 tmp = list(self.LPTracking.keys())
275                 tmp.sort(key=lambda k: self.LPTracking[k])
276                 for jerk in map(lambda k: (k, self.LPTracking[k]), tmp[-n:]):
277                         print(jerk)