UniqueSessionIdManager: Support for delaying releases of session ids, and picking...
[bitcoin:eloipool.git] / stratumserver.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 from binascii import b2a_hex
18 import collections
19 from copy import deepcopy
20 import json
21 import logging
22 import networkserver
23 import socket
24 import struct
25 from time import time
26 import traceback
27 from util import RejectedShare, swap32, target2bdiff, UniqueSessionIdManager
28
29 class StratumError(BaseException):
30         def __init__(self, errno, msg, tb = True):
31                 self.StratumErrNo = errno
32                 self.StratumErrMsg = msg
33                 self.StratumTB = tb
34
35 StratumCodes = {
36         'stale-prevblk': 21,
37         'stale-work': 21,
38         'duplicate': 22,
39         'H-not-zero': 23,
40         'high-hash': 23,
41 }
42
43 class StratumHandler(networkserver.SocketHandler):
44         logger = logging.getLogger('StratumHandler')
45         
46         def __init__(self, *a, **ka):
47                 super().__init__(*a, **ka)
48                 self.remoteHost = self.addr[0]
49                 self.changeTask(None)
50                 self.set_terminator(b"\n")
51                 self.Usernames = {}
52                 self.lastBDiff = None
53                 self.JobTargets = collections.OrderedDict()
54         
55         def sendReply(self, ob):
56                 return self.push(json.dumps(ob).encode('ascii') + b"\n")
57         
58         def found_terminator(self):
59                 inbuf = b"".join(self.incoming).decode('ascii')
60                 self.incoming = []
61                 
62                 if not inbuf:
63                         return
64                 
65                 try:
66                         rpc = json.loads(inbuf)
67                 except ValueError:
68                         self.boot()
69                         return
70                 funcname = '_stratum_%s' % (rpc['method'].replace('.', '_'),)
71                 if not hasattr(self, funcname):
72                         self.sendReply({
73                                 'error': [-3, "Method '%s' not found" % (rpc['method'],), None],
74                                 'id': rpc['id'],
75                                 'result': None,
76                         })
77                         return
78                 
79                 try:
80                         rv = getattr(self, funcname)(*rpc['params'])
81                 except StratumError as e:
82                         self.sendReply({
83                                 'error': (e.StratumErrNo, e.StratumErrMsg, traceback.format_exc() if e.StratumTB else None),
84                                 'id': rpc['id'],
85                                 'result': None,
86                         })
87                         return
88                 except BaseException as e:
89                         fexc = traceback.format_exc()
90                         self.sendReply({
91                                 'error': (20, str(e), fexc),
92                                 'id': rpc['id'],
93                                 'result': None,
94                         })
95                         if not hasattr(e, 'StratumQuiet'):
96                                 self.logger.debug(fexc)
97                         return
98                 
99                 self.sendReply({
100                         'error': None,
101                         'id': rpc['id'],
102                         'result': rv,
103                 })
104         
105         def sendJob(self):
106                 target = self.server.defaultTarget
107                 if len(self.Usernames) == 1:
108                         dtarget = self.server.getTarget(next(iter(self.Usernames)), time())
109                         if not dtarget is None:
110                                 target = dtarget
111                 bdiff = target2bdiff(target)
112                 if self.lastBDiff != bdiff:
113                         self.sendReply({
114                                 'id': None,
115                                 'method': 'mining.set_difficulty',
116                                 'params': [
117                                         bdiff
118                                 ],
119                         })
120                         self.lastBDiff = bdiff
121                 self.push(self.server.JobBytes)
122                 if len(self.JobTargets) > 4:
123                         self.JobTargets.popitem(False)
124                 self.JobTargets[self.server.JobId] = target
125         
126         def _stratum_mining_subscribe(self):
127                 if not hasattr(self, '_sid'):
128                         self._sid = UniqueSessionIdManager.get()
129                 xid = struct.pack('=I', self._sid)  # NOTE: Assumes sessionids are 4 bytes
130                 self.extranonce1 = xid
131                 xid = b2a_hex(xid).decode('ascii')
132                 self.server._Clients[id(self)] = self
133                 self.changeTask(self.sendJob, 0)
134                 return [
135                         [
136                                 ['mining.notify', '%s1' % (xid,)],
137                                 ['mining.set_difficulty', '%s2' % (xid,)],
138                         ],
139                         xid,
140                         4,
141                 ]
142         
143         def handle_close(self):
144                 if hasattr(self, '_sid'):
145                         UniqueSessionIdManager.put(self._sid)
146                         delattr(self, '_sid')
147                 try:
148                         del self.server._Clients[id(self)]
149                 except:
150                         pass
151                 super().handle_close()
152         
153         def _stratum_mining_submit(self, username, jobid, extranonce2, ntime, nonce):
154                 if username not in self.Usernames:
155                         raise StratumError(24, 'unauthorized-user', False)
156                 share = {
157                         'username': username,
158                         'remoteHost': self.remoteHost,
159                         'jobid': jobid,
160                         'extranonce1': self.extranonce1,
161                         'extranonce2': bytes.fromhex(extranonce2),
162                         'ntime': bytes.fromhex(ntime),
163                         'nonce': bytes.fromhex(nonce),
164                 }
165                 if jobid in self.JobTargets:
166                         share['target'] = self.JobTargets[jobid]
167                 try:
168                         self.server.receiveShare(share)
169                 except RejectedShare as rej:
170                         rej = str(rej)
171                         errno = StratumCodes.get(rej, 20)
172                         raise StratumError(errno, rej, False)
173                 return True
174         
175         def checkAuthentication(self, username, password):
176                 return True
177         
178         def _stratum_mining_authorize(self, username, password = None):
179                 try:
180                         valid = self.checkAuthentication(username, password)
181                 except:
182                         valid = False
183                 if valid:
184                         self.Usernames[username] = None
185                 return valid
186         
187         def _stratum_mining_get_transactions(self, jobid):
188                 try:
189                         (MC, wld) = self.server.getExistingStratumJob(jobid)
190                 except KeyError as e:
191                         e.StratumQuiet = True
192                         raise
193                 (height, merkleTree, cb, prevBlock, bits) = MC[:5]
194                 return list(b2a_hex(txn.data).decode('ascii') for txn in merkleTree.data[1:])
195
196 class StratumServer(networkserver.AsyncSocketServer):
197         logger = logging.getLogger('StratumServer')
198         
199         waker = True
200         schMT = True
201         
202         extranonce1null = struct.pack('=I', 0)  # NOTE: Assumes sessionids are 4 bytes
203         
204         def __init__(self, *a, **ka):
205                 ka.setdefault('RequestHandlerClass', StratumHandler)
206                 super().__init__(*a, **ka)
207                 
208                 self._Clients = {}
209                 self._JobId = 0
210                 self.JobId = '%d' % (time(),)
211                 self.WakeRequest = None
212                 self.UpdateTask = None
213         
214         def updateJob(self, wantClear = False):
215                 if self.UpdateTask:
216                         try:
217                                 self.rmSchedule(self.UpdateTask)
218                         except:
219                                 pass
220                 
221                 self._JobId += 1
222                 JobId = '%d %d' % (time(), self._JobId)
223                 (MC, wld) = self.getStratumJob(JobId, wantClear=wantClear)
224                 (height, merkleTree, cb, prevBlock, bits) = MC[:5]
225                 
226                 if len(cb) > 96 - len(self.extranonce1null) - 4:
227                         if not self.rejecting:
228                                 self.logger.warning('Coinbase too big for stratum: disabling')
229                         self.rejecting = True
230                         self.boot_all()
231                         self.UpdateTask = self.schedule(self.updateJob, time() + 10)
232                         return
233                 elif self.rejecting:
234                         self.rejecting = False
235                         self.logger.info('Coinbase small enough for stratum again: reenabling')
236                 
237                 txn = deepcopy(merkleTree.data[0])
238                 cb += self.extranonce1null + b'Eloi'
239                 txn.setCoinbase(cb)
240                 txn.assemble()
241                 pos = txn.data.index(cb) + len(cb)
242                 
243                 steps = list(b2a_hex(h).decode('ascii') for h in merkleTree._steps)
244                 
245                 self.JobBytes = json.dumps({
246                         'id': None,
247                         'method': 'mining.notify',
248                         'params': [
249                                 JobId,
250                                 b2a_hex(swap32(prevBlock)).decode('ascii'),
251                                 b2a_hex(txn.data[:pos - len(self.extranonce1null) - 4]).decode('ascii'),
252                                 b2a_hex(txn.data[pos:]).decode('ascii'),
253                                 steps,
254                                 '00000002',
255                                 b2a_hex(bits[::-1]).decode('ascii'),
256                                 b2a_hex(struct.pack('>L', int(time()))).decode('ascii'),
257                                 not self.IsJobValid(self.JobId)
258                         ],
259                 }).encode('ascii') + b"\n"
260                 self.JobId = JobId
261                 
262                 self.WakeRequest = 1
263                 self.wakeup()
264                 
265                 self.UpdateTask = self.schedule(self.updateJob, time() + 55)
266         
267         def pre_schedule(self):
268                 if self.WakeRequest:
269                         self._wakeNodes()
270         
271         def _wakeNodes(self):
272                 self.WakeRequest = None
273                 C = self._Clients
274                 if not C:
275                         self.logger.debug('Nobody to wake up')
276                         return
277                 OC = len(C)
278                 self.logger.debug("%d clients to wake up..." % (OC,))
279                 
280                 now = time()
281                 
282                 for ic in list(C.values()):
283                         try:
284                                 ic.sendJob()
285                         except socket.error:
286                                 OC -= 1
287                                 # Ignore socket errors; let the main event loop take care of them later
288                         except:
289                                 OC -= 1
290                                 self.logger.debug('Error sending new job:\n' + traceback.format_exc())
291                 
292                 self.logger.debug('New job sent to %d clients in %.3f seconds' % (OC, time() - now))
293         
294         def getTarget(*a, **ka):
295                 return None