StratumHandler: Safety check against issuing the same SessionId to multiple clients
[bitcoin:eloipool.git] / stratumserver.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2013  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 from binascii import b2a_hex
18 import collections
19 from copy import deepcopy
20 import json
21 import logging
22 import networkserver
23 import socket
24 import struct
25 from time import time
26 import traceback
27 from util import RejectedShare, swap32, target2bdiff, UniqueSessionIdManager
28
class StratumError(BaseException):
    """Error to be reported to a Stratum client as a JSON-RPC error triple.

    Attributes:
        StratumErrNo: numeric error code sent as the first error element.
        StratumErrMsg: message sent as the second error element.
        StratumTB: when true, the handler includes a formatted traceback
            as the third error element; otherwise it sends None.
    """

    def __init__(self, errno, msg, tb=True):
        # Forward the code and message to BaseException so that str(),
        # repr(), formatted tracebacks and copy/pickle reconstruction all
        # carry them (previously ``args`` was left empty).
        super().__init__(errno, msg)
        self.StratumErrNo = errno
        self.StratumErrMsg = msg
        self.StratumTB = tb
34
# Share-rejection reason string -> Stratum error code.  Reasons that are not
# listed here are reported with the generic code 20 by the submit handler.
StratumCodes = {
    reason: code
    for code, reasons in (
        (21, ('stale-prevblk', 'stale-work')),
        (22, ('duplicate',)),
        (23, ('H-not-zero', 'high-hash')),
    )
    for reason in reasons
}
42
class StratumHandler(networkserver.SocketHandler):
    """Per-connection handler for newline-delimited JSON-RPC (Stratum).

    Every received line is parsed as JSON.  An object with a 'method' key is
    dispatched to ``_stratum_<method>`` (dots replaced by underscores) and
    its return value or error is sent back; an object without one is treated
    as a reply to a request issued by this side and is dispatched to
    ``_stratumreply_<id>`` if such a method exists.
    """

    logger = logging.getLogger('StratumHandler')

    def __init__(self, *a, **ka):
        super().__init__(*a, **ka)
        self.remoteHost = self.addr[0]
        self.changeTask(None)
        self.set_terminator(b"\n")
        # Keys are the usernames authorized on this connection; values unused.
        self.Usernames = {}
        # Difficulty most recently announced with mining.set_difficulty.
        self.lastBDiff = None
        # jobid -> target in effect when that job was sent to this client.
        self.JobTargets = collections.OrderedDict()
        # User agent reported by the client, if any.
        self.UA = None

    def sendReply(self, ob):
        """Serialize *ob* as one JSON line and queue it for sending."""
        return self.push(json.dumps(ob).encode('ascii') + b"\n")

    def found_terminator(self):
        """Parse one complete input line and dispatch it.

        Input that is not ASCII, not valid JSON, or not a JSON object gets
        the connection booted.  Handler exceptions are reported to the
        client as a JSON-RPC error rather than propagated.
        """
        raw = b"".join(self.incoming)
        self.incoming = []

        if not raw:
            return

        try:
            # UnicodeDecodeError is a ValueError subclass, so undecodable
            # bytes are handled the same way as malformed JSON.
            rpc = json.loads(raw.decode('ascii'))
        except ValueError:
            self.boot()
            return
        if not isinstance(rpc, dict):
            # Valid JSON, but not an object: nothing sensible to dispatch.
            self.boot()
            return

        # The peer controls the message, so 'id' may legitimately be absent.
        rpcid = rpc.get('id')

        if 'method' not in rpc:
            # Assume this is a reply to our request
            handler = getattr(self, '_stratumreply_%s' % (rpcid,), None)
            if handler is None:
                return
            try:
                handler(rpc)
            except BaseException:
                self.logger.debug(traceback.format_exc())
            return

        method = rpc['method']
        handler = None
        if isinstance(method, str):
            handler = getattr(self, '_stratum_%s' % (method.replace('.', '_'),), None)
        if handler is None:
            self.sendReply({
                'error': [-3, "Method '%s' not found" % (method,), None],
                'id': rpcid,
                'result': None,
            })
            return

        try:
            # A missing or null 'params' is treated as an empty argument list.
            rv = handler(*(rpc.get('params') or ()))
        except StratumError as e:
            self.sendReply({
                'error': (e.StratumErrNo, e.StratumErrMsg, traceback.format_exc() if e.StratumTB else None),
                'id': rpcid,
                'result': None,
            })
            return
        except BaseException as e:
            fexc = traceback.format_exc()
            self.sendReply({
                'error': (20, str(e), fexc),
                'id': rpcid,
                'result': None,
            })
            # Exceptions tagged with a StratumQuiet attribute are not logged.
            if not hasattr(e, 'StratumQuiet'):
                self.logger.debug(fexc)
            return

        self.sendReply({
            'error': None,
            'id': rpcid,
            'result': rv,
        })

    def sendJob(self):
        """Send the server's current job, preceded by a difficulty update
        when the difficulty differs from the one last announced."""
        target = self.server.defaultTarget
        # A per-user target is only looked up when exactly one username is
        # authorized on this connection.
        if len(self.Usernames) == 1:
            dtarget = self.server.getTarget(next(iter(self.Usernames)), time())
            if dtarget is not None:
                target = dtarget
        bdiff = target2bdiff(target)
        if self.lastBDiff != bdiff:
            self.sendReply({
                'id': None,
                'method': 'mining.set_difficulty',
                'params': [
                    bdiff
                ],
            })
            self.lastBDiff = bdiff
        self.push(self.server.JobBytes)
        # Bound the per-job target history by dropping the oldest entry.
        if len(self.JobTargets) > 4:
            self.JobTargets.popitem(False)
        self.JobTargets[self.server.JobId] = target

    def requestStratumUA(self):
        """Ask the client for its version string (request id 7)."""
        self.sendReply({
            'id': 7,
            'method': 'client.get_version',
            'params': (),
        })

    def _stratumreply_7(self, rpc):
        """Store the client.get_version reply (or the whole reply object if
        it has no truthy 'result')."""
        self.UA = rpc.get('result') or rpc

    def _stratum_mining_subscribe(self, UA=None, sessionid=None):
        """Handle mining.subscribe.

        Params:
            UA: optional user agent string supplied by the client; recorded
                when given.
            sessionid: optional value some clients send to resume a session;
                accepted for compatibility and ignored.

        Returns the subscription list, the hex extranonce1 and the
        extranonce2 size (4).

        Raises whatever ``server.RaiseRedFlags`` returns if the session id
        obtained for this connection is already registered to another
        handler.
        """
        if UA is not None:
            self.UA = UA
        clients = self.server._Clients
        if not hasattr(self, '_sid'):
            self._sid = UniqueSessionIdManager.get()
        # The registry is keyed by session id so that this lookup actually
        # detects another live handler holding the same id.
        if clients.get(self._sid) not in (self, None):
            # Forget the id without returning it to the pool: it belongs to
            # the other handler.
            del self._sid
            raise self.server.RaiseRedFlags(RuntimeError('issuing duplicate sessionid'))
        extranonce1 = struct.pack('=I', self._sid)  # NOTE: Assumes sessionids are 4 bytes
        self.extranonce1 = extranonce1
        xid = b2a_hex(extranonce1).decode('ascii')
        clients[self._sid] = self
        self.changeTask(self.sendJob, 0)
        return [
            [
                ['mining.notify', '%s1' % (xid,)],
                ['mining.set_difficulty', '%s2' % (xid,)],
            ],
            xid,
            4,
        ]

    def close(self):
        """Unregister from the server, release the session id, then close."""
        try:
            if hasattr(self, '_sid'):
                sid = self._sid
                del self._sid
                clients = self.server._Clients
                # Only remove the registry entry if it is really ours, and do
                # so before the id can be handed out again.
                if clients.get(sid) is self:
                    del clients[sid]
                UniqueSessionIdManager.put(sid)
        finally:
            # The socket must be closed even if bookkeeping above failed.
            super().close()

    def _stratum_mining_submit(self, username, jobid, extranonce2, ntime, nonce):
        """Handle mining.submit: build a share record and pass it to the server.

        Returns True when the server accepts the share.

        Raises:
            StratumError(24): *username* was not authorized on this connection.
            StratumError(25): the client has not subscribed yet, so there is
                no extranonce1 to attribute the share to.
            StratumError: the server rejected the share; the code comes from
                StratumCodes (20 when the reason is not listed).
        """
        if username not in self.Usernames:
            raise StratumError(24, 'unauthorized-user', False)
        if not hasattr(self, 'extranonce1'):
            raise StratumError(25, 'not-subscribed', False)
        share = {
            'username': username,
            'remoteHost': self.remoteHost,
            'jobid': jobid,
            'extranonce1': self.extranonce1,
            'extranonce2': bytes.fromhex(extranonce2),
            'ntime': bytes.fromhex(ntime),
            'nonce': bytes.fromhex(nonce),
            'userAgent': self.UA,
            'submitProtocol': 'stratum',
        }
        # Attach the target that was sent with this job, if still remembered.
        if jobid in self.JobTargets:
            share['target'] = self.JobTargets[jobid]
        try:
            self.server.receiveShare(share)
        except RejectedShare as rej:
            reason = str(rej)
            raise StratumError(StratumCodes.get(reason, 20), reason, False)
        return True

    def _stratum_mining_authorize(self, username, password=None):
        """Handle mining.authorize; returns the server's verdict.

        An exception raised by the authentication check is logged at debug
        level and treated as a failed authorization.  On success the
        username is recorded and a client.get_version request is scheduled.
        """
        try:
            valid = self.server.checkAuthentication(username, password)
        except Exception:
            self.logger.debug(traceback.format_exc())
            valid = False
        if valid:
            self.Usernames[username] = None
            self.changeTask(self.requestStratumUA, 0)
        return valid

    def _stratum_mining_get_transactions(self, jobid):
        """Handle mining.get_transactions: return the hex data of every
        transaction of job *jobid* except the first.

        A KeyError from the job lookup is re-raised tagged with
        ``StratumQuiet`` so that it is reported to the client without being
        logged.
        """
        try:
            (MC, wld) = self.server.getExistingStratumJob(jobid)
        except KeyError as e:
            e.StratumQuiet = True
            raise
        (_height, merkleTree, _cb, _prevBlock, _bits) = MC[:5]
        return [b2a_hex(txn.data).decode('ascii') for txn in merkleTree.data[1:]]
219
class StratumServer(networkserver.AsyncSocketServer):
    """Socket server that builds mining.notify jobs and pushes them to the
    subscribed StratumHandler connections."""

    logger = logging.getLogger('StratumServer')

    waker = True
    schMT = True

    extranonce1null = struct.pack('=I', 0)  # NOTE: Assumes sessionids are 4 bytes

    def __init__(self, *a, **ka):
        if 'RequestHandlerClass' not in ka:
            ka['RequestHandlerClass'] = StratumHandler
        super().__init__(*a, **ka)

        # Registry of subscribed handlers; only its values are used here.
        self._Clients = {}
        # Counter appended to the timestamp to form each new job id.
        self._JobId = 0
        self.JobId = '%d' % (time(),)
        self.WakeRequest = None
        self.UpdateTask = None

    def checkAuthentication(self, username, password):
        """Default authentication check: accept everyone."""
        return True

    def updateJob(self, wantClear = False):
        """Build a fresh job, store its serialized mining.notify line in
        ``JobBytes`` and request that clients be woken.

        Reschedules itself: after 10 seconds when the coinbase is too large
        (in which case all clients are booted), after 55 seconds otherwise.
        """
        pending = self.UpdateTask
        if pending:
            try:
                self.rmSchedule(pending)
            except BaseException:
                # Best effort: failure to cancel the previous task is ignored.
                pass

        self._JobId += 1
        newJobId = '%d %d' % (time(), self._JobId)
        (MC, _wld) = self.getStratumJob(newJobId, wantClear=wantClear)
        (_height, merkleTree, cb, prevBlock, bits) = MC[:5]

        # Room needed after the coinbase for extranonce1 plus the 4-byte
        # b'Eloi' suffix appended below.
        reserved = len(self.extranonce1null) + 4
        if len(cb) + reserved > 96:
            if not self.rejecting:
                self.logger.warning('Coinbase too big for stratum: disabling')
            self.rejecting = True
            self.boot_all()
            self.UpdateTask = self.schedule(self.updateJob, time() + 10)
            return
        if self.rejecting:
            self.rejecting = False
            self.logger.info('Coinbase small enough for stratum again: reenabling')

        cbtxn = deepcopy(merkleTree.data[0])
        cb += self.extranonce1null + b'Eloi'
        cbtxn.setCoinbase(cb)
        cbtxn.assemble()
        # The serialized transaction is split around the placeholder
        # extranonce1 and suffix; the client gets the two surrounding parts.
        tailStart = cbtxn.data.index(cb) + len(cb)
        headEnd = tailStart - len(self.extranonce1null) - 4

        def tohex(raw):
            return b2a_hex(raw).decode('ascii')

        steps = [tohex(h) for h in merkleTree._steps]

        notification = {
            'id': None,
            'method': 'mining.notify',
            'params': [
                newJobId,
                tohex(swap32(prevBlock)),
                tohex(cbtxn.data[:headEnd]),
                tohex(cbtxn.data[tailStart:]),
                steps,
                '00000002',
                tohex(bits[::-1]),
                tohex(struct.pack('>L', int(time()))),
                # Last parameter is true when the previous job id is no
                # longer valid.
                not self.IsJobValid(self.JobId)
            ],
        }
        self.JobBytes = json.dumps(notification).encode('ascii') + b"\n"
        self.JobId = newJobId

        self.WakeRequest = 1
        self.wakeup()

        self.UpdateTask = self.schedule(self.updateJob, time() + 55)

    def pre_schedule(self):
        """Deliver a pending job notification, if one was requested."""
        if not self.WakeRequest:
            return
        self._wakeNodes()

    def _wakeNodes(self):
        """Send the current job to every registered handler and log how many
        were reached and how long it took."""
        self.WakeRequest = None
        clients = self._Clients
        if not clients:
            self.logger.debug('Nobody to wake up')
            return
        reached = len(clients)
        self.logger.debug("%d clients to wake up..." % (reached,))

        started = time()

        # Iterate over a snapshot of the registered handlers.
        for handler in tuple(clients.values()):
            try:
                handler.sendJob()
            except socket.error:
                # Ignore socket errors; let the main event loop take care of them later
                reached -= 1
            except BaseException:
                reached -= 1
                self.logger.debug('Error sending new job:\n' + traceback.format_exc())

        self.logger.debug('New job sent to %d clients in %.3f seconds' % (reached, time() - started))

    def getTarget(*a, **ka):
        """Default per-user target lookup: always None, whatever the
        arguments."""
        return None