stratumserver: Truncate overlong extranonce2 in share submissions to workaround BitFo...
[bitcoin:eloipool.git] / stratumserver.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2013  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 import agplcompliance
18 from binascii import b2a_hex
19 import collections
20 from copy import deepcopy
21 import json
22 import logging
23 import networkserver
24 import socket
25 import struct
26 from time import time
27 import traceback
28 from util import RejectedShare, swap32, target2bdiff, UniqueSessionIdManager
29
30 extranonce2sz = 4
31
32 class StratumError(BaseException):
33         def __init__(self, errno, msg, tb = True):
34                 self.StratumErrNo = errno
35                 self.StratumErrMsg = msg
36                 self.StratumTB = tb
37
38 StratumCodes = {
39         'stale-prevblk': 21,
40         'stale-work': 21,
41         'duplicate': 22,
42         'H-not-zero': 23,
43         'high-hash': 23,
44 }
45
46 class StratumHandler(networkserver.SocketHandler):
47         logger = logging.getLogger('StratumHandler')
48         
49         def __init__(self, *a, **ka):
50                 super().__init__(*a, **ka)
51                 self.remoteHost = self.addr[0]
52                 self.changeTask(None)
53                 self.server.schedule(self.sendLicenseNotice, time() + 4, errHandler=self)
54                 self.set_terminator(b"\n")
55                 self.Usernames = {}
56                 self.lastBDiff = None
57                 self.JobTargets = collections.OrderedDict()
58                 self.UA = None
59                 self.LicenseSent = agplcompliance._SourceFiles is None
60         
61         def sendReply(self, ob):
62                 return self.push(json.dumps(ob).encode('ascii') + b"\n")
63         
64         def found_terminator(self):
65                 inbuf = b"".join(self.incoming).decode('ascii')
66                 self.incoming = []
67                 
68                 if not inbuf:
69                         return
70                 
71                 try:
72                         rpc = json.loads(inbuf)
73                 except ValueError:
74                         self.boot()
75                         return
76                 if 'method' not in rpc:
77                         # Assume this is a reply to our request
78                         funcname = '_stratumreply_%s' % (rpc['id'],)
79                         if not hasattr(self, funcname):
80                                 return
81                         try:
82                                 getattr(self, funcname)(rpc)
83                         except BaseException as e:
84                                 self.logger.debug(traceback.format_exc())
85                         return
86                 funcname = '_stratum_%s' % (rpc['method'].replace('.', '_'),)
87                 if not hasattr(self, funcname):
88                         self.sendReply({
89                                 'error': [-3, "Method '%s' not found" % (rpc['method'],), None],
90                                 'id': rpc['id'],
91                                 'result': None,
92                         })
93                         return
94                 
95                 try:
96                         rv = getattr(self, funcname)(*rpc['params'])
97                 except StratumError as e:
98                         self.sendReply({
99                                 'error': (e.StratumErrNo, e.StratumErrMsg, traceback.format_exc() if e.StratumTB else None),
100                                 'id': rpc['id'],
101                                 'result': None,
102                         })
103                         return
104                 except BaseException as e:
105                         fexc = traceback.format_exc()
106                         self.sendReply({
107                                 'error': (20, str(e), fexc),
108                                 'id': rpc['id'],
109                                 'result': None,
110                         })
111                         if not hasattr(e, 'StratumQuiet'):
112                                 self.logger.debug(fexc)
113                         return
114                 
115                 if rpc['id'] is None:
116                         return
117                 
118                 self.sendReply({
119                         'error': None,
120                         'id': rpc['id'],
121                         'result': rv,
122                 })
123         
124         def sendLicenseNotice(self):
125                 if self.fd == -1:
126                         return
127                 if not self.LicenseSent:
128                         self.sendReply({
129                                 'id': 8,
130                                 'method': 'client.show_message',
131                                 'params': ('This stratum server is licensed under the GNU Affero General Public License, version 3. You may download source code over stratum using the server.get_source method.',),
132                         })
133                 self.LicenseSent = True
134         
135         def sendJob(self):
136                 target = self.server.defaultTarget
137                 if len(self.Usernames) == 1:
138                         dtarget = self.server.getTarget(next(iter(self.Usernames)), time())
139                         if not dtarget is None:
140                                 target = dtarget
141                 bdiff = target2bdiff(target)
142                 if self.lastBDiff != bdiff:
143                         self.sendReply({
144                                 'id': None,
145                                 'method': 'mining.set_difficulty',
146                                 'params': [
147                                         bdiff
148                                 ],
149                         })
150                         self.lastBDiff = bdiff
151                 self.push(self.server.JobBytes)
152                 if len(self.JobTargets) > 4:
153                         self.JobTargets.popitem(False)
154                 self.JobTargets[self.server.JobId] = target
155         
156         def requestStratumUA(self):
157                 self.sendReply({
158                         'id': 7,
159                         'method': 'client.get_version',
160                         'params': (),
161                 })
162         
163         def _stratumreply_7(self, rpc):
164                 self.UA = rpc.get('result') or rpc
165         
166         def _stratum_mining_subscribe(self, UA = None, xid = None):
167                 if not UA is None:
168                         self.UA = UA
169                 if not hasattr(self, '_sid'):
170                         self._sid = UniqueSessionIdManager.get()
171                 if self.server._Clients.get(self._sid) not in (self, None):
172                         del self._sid
173                         raise self.server.RaiseRedFlags(RuntimeError('issuing duplicate sessionid'))
174                 xid = struct.pack('=I', self._sid)  # NOTE: Assumes sessionids are 4 bytes
175                 self.extranonce1 = xid
176                 xid = b2a_hex(xid).decode('ascii')
177                 self.server._Clients[id(self)] = self
178                 self.changeTask(self.sendJob, 0)
179                 return [
180                         [
181                                 ['mining.notify', '%s1' % (xid,)],
182                                 ['mining.set_difficulty', '%s2' % (xid,)],
183                         ],
184                         xid,
185                         extranonce2sz,
186                 ]
187         
188         def close(self):
189                 if hasattr(self, '_sid'):
190                         UniqueSessionIdManager.put(self._sid)
191                         delattr(self, '_sid')
192                 try:
193                         del self.server._Clients[id(self)]
194                 except:
195                         pass
196                 super().close()
197         
198         def _stratum_mining_submit(self, username, jobid, extranonce2, ntime, nonce):
199                 if username not in self.Usernames:
200                         raise StratumError(24, 'unauthorized-user', False)
201                 share = {
202                         'username': username,
203                         'remoteHost': self.remoteHost,
204                         'jobid': jobid,
205                         'extranonce1': self.extranonce1,
206                         'extranonce2': bytes.fromhex(extranonce2)[:extranonce2sz],
207                         'ntime': bytes.fromhex(ntime),
208                         'nonce': bytes.fromhex(nonce),
209                         'userAgent': self.UA,
210                         'submitProtocol': 'stratum',
211                 }
212                 if jobid in self.JobTargets:
213                         share['target'] = self.JobTargets[jobid]
214                 try:
215                         self.server.receiveShare(share)
216                 except RejectedShare as rej:
217                         rej = str(rej)
218                         errno = StratumCodes.get(rej, 20)
219                         raise StratumError(errno, rej, False)
220                 return True
221         
222         def _stratum_mining_authorize(self, username, password = None):
223                 try:
224                         valid = self.server.checkAuthentication(username, password)
225                 except:
226                         valid = False
227                 if valid:
228                         self.Usernames[username] = None
229                         self.changeTask(self.requestStratumUA, 0)
230                 return valid
231         
232         def _stratum_mining_get_transactions(self, jobid):
233                 try:
234                         (MC, wld) = self.server.getExistingStratumJob(jobid)
235                 except KeyError as e:
236                         e.StratumQuiet = True
237                         raise
238                 (height, merkleTree, cb, prevBlock, bits) = MC[:5]
239                 return list(b2a_hex(txn.data).decode('ascii') for txn in merkleTree.data[1:])
240         
241         def _stratum_server_get_source(self, path = ''):
242                 s = agplcompliance.get_source(path.encode('utf8'))
243                 if s:
244                         s = list(s)
245                         s[1] = s[1].decode('latin-1')
246                 return s
247
248 class StratumServer(networkserver.AsyncSocketServer):
249         logger = logging.getLogger('StratumServer')
250         
251         waker = True
252         schMT = True
253         
254         extranonce1null = struct.pack('=I', 0)  # NOTE: Assumes sessionids are 4 bytes
255         
256         def __init__(self, *a, **ka):
257                 ka.setdefault('RequestHandlerClass', StratumHandler)
258                 super().__init__(*a, **ka)
259                 
260                 self._Clients = {}
261                 self._JobId = 0
262                 self.JobId = '%d' % (time(),)
263                 self.WakeRequest = None
264                 self.UpdateTask = None
265                 self._PendingQuickUpdates = set()
266         
267         def checkAuthentication(self, username, password):
268                 return True
269         
270         def updateJobOnly(self, wantClear = False, forceClean = False):
271                 self._JobId += 1
272                 JobId = '%d %d' % (time(), self._JobId)
273                 (MC, wld) = self.getStratumJob(JobId, wantClear=wantClear)
274                 (height, merkleTree, cb, prevBlock, bits) = MC[:5]
275                 
276                 if len(cb) > 96 - len(self.extranonce1null) - 4:
277                         if not self.rejecting:
278                                 self.logger.warning('Coinbase too big for stratum: disabling')
279                         self.rejecting = True
280                         self.boot_all()
281                         self.UpdateTask = self.schedule(self.updateJob, time() + 10)
282                         return
283                 elif self.rejecting:
284                         self.rejecting = False
285                         self.logger.info('Coinbase small enough for stratum again: reenabling')
286                 
287                 txn = deepcopy(merkleTree.data[0])
288                 cb += self.extranonce1null + b'Eloi'
289                 txn.setCoinbase(cb)
290                 txn.assemble()
291                 pos = txn.data.index(cb) + len(cb)
292                 
293                 steps = list(b2a_hex(h).decode('ascii') for h in merkleTree._steps)
294                 
295                 self.JobBytes = json.dumps({
296                         'id': None,
297                         'method': 'mining.notify',
298                         'params': [
299                                 JobId,
300                                 b2a_hex(swap32(prevBlock)).decode('ascii'),
301                                 b2a_hex(txn.data[:pos - len(self.extranonce1null) - 4]).decode('ascii'),
302                                 b2a_hex(txn.data[pos:]).decode('ascii'),
303                                 steps,
304                                 '00000002',
305                                 b2a_hex(bits[::-1]).decode('ascii'),
306                                 b2a_hex(struct.pack('>L', int(time()))).decode('ascii'),
307                                 forceClean or not self.IsJobValid(self.JobId)
308                         ],
309                 }).encode('ascii') + b"\n"
310                 self.JobId = JobId
311                 
312         def updateJob(self, wantClear = False):
313                 if self.UpdateTask:
314                         try:
315                                 self.rmSchedule(self.UpdateTask)
316                         except:
317                                 pass
318                 
319                 self.updateJobOnly(wantClear=wantClear)
320                 
321                 self.WakeRequest = 1
322                 self.wakeup()
323                 
324                 self.UpdateTask = self.schedule(self.updateJob, time() + 55)
325         
326         def doQuickUpdate(self):
327                 PQU = self._PendingQuickUpdates
328                 self._PendingQuickUpdates = set()
329                 QUC = 0
330                 for ic in list(self._Clients.values()):
331                         if PQU.intersection(ic.Usernames):
332                                 if self.JobId in ic.JobTargets:
333                                         self.updateJobOnly(wantClear=True, forceClean=True)
334                                 try:
335                                         ic.sendJob()
336                                         QUC += 1
337                                 except socket.error:
338                                         # Ignore socket errors; let the main event loop take care of them later
339                                         pass
340                                 except:
341                                         self.logger.debug('Error sending quickupdate job:\n' + traceback.format_exc())
342                 if QUC:
343                         self.logger.debug("Quickupdated %d clients" % (QUC,))
344         
345         def quickDifficultyUpdate(self, username):
346                 self._PendingQuickUpdates.add(username)
347                 self.schedule(self.doQuickUpdate, time())
348         
349         def pre_schedule(self):
350                 if self.WakeRequest:
351                         self._wakeNodes()
352         
353         def _wakeNodes(self):
354                 self.WakeRequest = None
355                 C = self._Clients
356                 if not C:
357                         self.logger.debug('Nobody to wake up')
358                         return
359                 OC = len(C)
360                 self.logger.debug("%d clients to wake up..." % (OC,))
361                 
362                 now = time()
363                 
364                 for ic in list(C.values()):
365                         try:
366                                 ic.sendJob()
367                         except socket.error:
368                                 OC -= 1
369                                 # Ignore socket errors; let the main event loop take care of them later
370                         except:
371                                 OC -= 1
372                                 self.logger.debug('Error sending new job:\n' + traceback.format_exc())
373                 
374                 self.logger.debug('New job sent to %d clients in %.3f seconds' % (OC, time() - now))
375         
376         def getTarget(*a, **ka):
377                 return None