Merge branch 'stratum_sidmanager'
[bitcoin:eloipool.git] / stratumserver.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2013  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 from binascii import b2a_hex
18 import collections
19 from copy import deepcopy
20 import json
21 import logging
22 import networkserver
23 import socket
24 import struct
25 from time import time
26 import traceback
27 from util import RejectedShare, swap32, target2bdiff, UniqueSessionIdManager
28
class StratumError(BaseException):
        """Error to be reported to a Stratum client as a JSON-RPC error triple.

        Attributes set on the instance:
          StratumErrNo  -- numeric error code placed first in the reply
          StratumErrMsg -- message placed second in the reply
          StratumTB     -- when true, the request dispatcher appends a
                           formatted traceback to the reply; otherwise None
        """

        def __init__(self, errno, msg, tb = True):
                # NOTE: BaseException.__init__ is deliberately not invoked, so
                # .args stays exactly as the constructor arguments were given.
                (self.StratumErrNo, self.StratumErrMsg, self.StratumTB) = (errno, msg, tb)
34
# Share-rejection reason -> numeric Stratum error code.  Reasons not listed
# here are reported with code 20 by the submit handler.
StratumCodes = {
        reason: code
        for (code, reasons) in (
                (21, ('stale-prevblk', 'stale-work')),
                (22, ('duplicate',)),
                (23, ('H-not-zero', 'high-hash')),
        )
        for reason in reasons
}
42
class StratumHandler(networkserver.SocketHandler):
        """Handler for one client connection speaking newline-delimited JSON-RPC.

        Each received line is parsed as JSON.  Messages carrying a 'method'
        key are dispatched to the ``_stratum_<method>`` method of this class
        (dots in the method name become underscores); messages without one
        are treated as replies to requests we sent and are routed to
        ``_stratumreply_<id>`` when such a method exists.
        """

        logger = logging.getLogger('StratumHandler')

        def __init__(self, *a, **ka):
                super().__init__(*a, **ka)
                self.remoteHost = self.addr[0]
                self.changeTask(None)
                self.set_terminator(b"\n")
                # Authorized usernames (used as a set; values are always None)
                self.Usernames = {}
                # Last difficulty announced via mining.set_difficulty
                self.lastBDiff = None
                # jobid -> target for the most recently sent jobs
                self.JobTargets = collections.OrderedDict()
                self.UA = None

        def sendReply(self, ob):
                """Serialize ob as one JSON line and queue it for sending."""
                return self.push(json.dumps(ob).encode('ascii') + b"\n")

        def found_terminator(self):
                """Process one complete line received from the client.

                Input that is not ASCII, not valid JSON, or not a JSON object
                gets the connection booted.  Everything that goes wrong while
                running a request is reported back to the client as a
                JSON-RPC error instead of escaping into the event loop.
                """
                raw = b"".join(self.incoming)
                self.incoming = []

                if not raw:
                        return

                try:
                        # UnicodeDecodeError is a ValueError too, so non-ASCII
                        # bytes are handled the same way as malformed JSON.
                        rpc = json.loads(raw.decode('ascii'))
                except ValueError:
                        self.boot()
                        return
                if not isinstance(rpc, dict):
                        # Valid JSON, but not a request/reply object
                        self.boot()
                        return

                # A missing id is tolerated; it is echoed back as null
                reqid = rpc.get('id')

                if 'method' not in rpc:
                        # Assume this is a reply to our request
                        replyHandler = getattr(self, '_stratumreply_%s' % (reqid,), None)
                        if replyHandler is None:
                                return
                        try:
                                replyHandler(rpc)
                        except BaseException:
                                self.logger.debug(traceback.format_exc())
                        return

                method = rpc['method']
                handler = None
                if isinstance(method, str):
                        handler = getattr(self, '_stratum_%s' % (method.replace('.', '_'),), None)
                if handler is None:
                        self.sendReply({
                                'error': [-3, "Method '%s' not found" % (method,), None],
                                'id': reqid,
                                'result': None,
                        })
                        return

                try:
                        # A missing 'params' key raises KeyError here and is
                        # reported to the client by the generic branch below.
                        rv = handler(*rpc['params'])
                except StratumError as e:
                        self.sendReply({
                                'error': (e.StratumErrNo, e.StratumErrMsg, traceback.format_exc() if e.StratumTB else None),
                                'id': reqid,
                                'result': None,
                        })
                        return
                except BaseException as e:
                        # Kept as BaseException: StratumError itself derives
                        # from BaseException, so other errors raised by
                        # handlers may as well.
                        fexc = traceback.format_exc()
                        self.sendReply({
                                'error': (20, str(e), fexc),
                                'id': reqid,
                                'result': None,
                        })
                        if not hasattr(e, 'StratumQuiet'):
                                self.logger.debug(fexc)
                        return

                self.sendReply({
                        'error': None,
                        'id': reqid,
                        'result': rv,
                })

        def sendJob(self):
                """Push the server's current job, preceded by a difficulty update if needed.

                The target is the server default unless exactly one username
                is authorized and the server returns a target for it.  Targets
                of the most recently sent jobs are remembered per job id.
                """
                target = self.server.defaultTarget
                if len(self.Usernames) == 1:
                        dtarget = self.server.getTarget(next(iter(self.Usernames)), time())
                        if dtarget is not None:
                                target = dtarget
                bdiff = target2bdiff(target)
                if self.lastBDiff != bdiff:
                        self.sendReply({
                                'id': None,
                                'method': 'mining.set_difficulty',
                                'params': [
                                        bdiff
                                ],
                        })
                        self.lastBDiff = bdiff
                self.push(self.server.JobBytes)
                if len(self.JobTargets) > 4:
                        # Drop the oldest remembered job
                        self.JobTargets.popitem(False)
                self.JobTargets[self.server.JobId] = target

        def requestStratumUA(self):
                """Ask the client for its version; the answer arrives in _stratumreply_7."""
                self.sendReply({
                        'id': 7,
                        'method': 'client.get_version',
                        'params': (),
                })

        def _stratumreply_7(self, rpc):
                """Record the client's answer to client.get_version as its user agent."""
                # Falls back to the whole reply object when 'result' is empty
                self.UA = rpc.get('result') or rpc

        def _stratum_mining_subscribe(self, UA = None, xid = None):
                """Handle mining.subscribe: allocate a session id and schedule the first job.

                Returns the subscription list, the hex extranonce1 and the
                extranonce2 size (4).  The xid argument is accepted but
                ignored.
                """
                if UA is not None:
                        self.UA = UA
                if not hasattr(self, '_sid'):
                        self._sid = UniqueSessionIdManager.get()
                # NOTE(review): this lookup uses the session id as key, while
                # the registry below is keyed by id(self) -- confirm which key
                # is intended before relying on this duplicate check.
                if self.server._Clients.get(self._sid) not in (self, None):
                        del self._sid
                        raise self.server.RaiseRedFlags(RuntimeError('issuing duplicate sessionid'))
                xid = struct.pack('=I', self._sid)  # NOTE: Assumes sessionids are 4 bytes
                self.extranonce1 = xid
                xid = b2a_hex(xid).decode('ascii')
                self.server._Clients[id(self)] = self
                self.changeTask(self.sendJob, 0)
                return [
                        [
                                ['mining.notify', '%s1' % (xid,)],
                                ['mining.set_difficulty', '%s2' % (xid,)],
                        ],
                        xid,
                        4,
                ]

        def close(self):
                """Release the session id, unregister from the server and close."""
                if hasattr(self, '_sid'):
                        UniqueSessionIdManager.put(self._sid)
                        delattr(self, '_sid')
                try:
                        del self.server._Clients[id(self)]
                except (KeyError, AttributeError):
                        # Never subscribed (or no client registry available):
                        # there is nothing to unregister.
                        pass
                super().close()

        def _stratum_mining_submit(self, username, jobid, extranonce2, ntime, nonce):
                """Handle mining.submit: build a share and hand it to the server.

                Returns True when the server accepts the share.

                Raises StratumError:
                  24 if username has not been authorized on this connection,
                  25 if the client has not subscribed yet,
                  the StratumCodes entry (default 20) when the server rejects
                  the share.
                """
                if username not in self.Usernames:
                        raise StratumError(24, 'unauthorized-user', False)
                if not hasattr(self, 'extranonce1'):
                        # extranonce1 only exists after mining.subscribe
                        raise StratumError(25, 'not-subscribed', False)
                share = {
                        'username': username,
                        'remoteHost': self.remoteHost,
                        'jobid': jobid,
                        'extranonce1': self.extranonce1,
                        'extranonce2': bytes.fromhex(extranonce2),
                        'ntime': bytes.fromhex(ntime),
                        'nonce': bytes.fromhex(nonce),
                        'userAgent': self.UA,
                        'submitProtocol': 'stratum',
                }
                if jobid in self.JobTargets:
                        share['target'] = self.JobTargets[jobid]
                try:
                        self.server.receiveShare(share)
                except RejectedShare as rej:
                        reason = str(rej)
                        raise StratumError(StratumCodes.get(reason, 20), reason, False)
                return True

        def _stratum_mining_authorize(self, username, password = None):
                """Handle mining.authorize; returns the server's verdict.

                A failing authentication check counts as "not authorized".
                On success the username is remembered and a user-agent query
                is scheduled.
                """
                try:
                        valid = self.server.checkAuthentication(username, password)
                except Exception:
                        # Treat a broken check as a refusal, but leave a trace
                        self.logger.debug(traceback.format_exc())
                        valid = False
                if valid:
                        self.Usernames[username] = None
                        self.changeTask(self.requestStratumUA, 0)
                return valid

        def _stratum_mining_get_transactions(self, jobid):
                """Handle mining.get_transactions: hex data of a job's non-coinbase transactions."""
                try:
                        (MC, wld) = self.server.getExistingStratumJob(jobid)
                except KeyError as e:
                        # Unknown job: still reported to the client, but not logged
                        e.StratumQuiet = True
                        raise
                merkleTree = MC[1]
                # Index 0 is skipped; the job builder treats it as the coinbase
                return [b2a_hex(txn.data).decode('ascii') for txn in merkleTree.data[1:]]
221
class StratumServer(networkserver.AsyncSocketServer):
        """Socket server that builds mining.notify jobs and pushes them to clients.

        Subscribed handlers register themselves in ``_Clients``; every call
        to updateJob() serializes a fresh job into ``JobBytes`` and requests
        a wake-up so that all registered clients receive it.
        """

        logger = logging.getLogger('StratumServer')

        # Flags read by the base class -- presumably they enable the waker
        # socket and multi-threaded scheduling; confirm in networkserver.
        waker = True
        schMT = True

        extranonce1null = struct.pack('=I', 0)  # NOTE: Assumes sessionids are 4 bytes

        def __init__(self, *a, **ka):
                # StratumHandler is used unless the caller picks another class
                ka.setdefault('RequestHandlerClass', StratumHandler)
                super().__init__(*a, **ka)

                self.UpdateTask = None
                self.WakeRequest = None
                self._JobId = 0
                self.JobId = '%d' % (time(),)
                self._Clients = {}

        def checkAuthentication(self, username, password):
                """Default authentication hook: accepts every username/password."""
                return True

        def updateJob(self, wantClear = False):
                """Build a new job, publish it as JobBytes/JobId and wake the clients.

                Any pending scheduled update is cancelled first.  When the
                coinbase is too large, all clients are booted and a retry is
                scheduled in 10 seconds; otherwise the next regular update is
                scheduled 55 seconds out.
                """
                pendingTask = self.UpdateTask
                if pendingTask:
                        try:
                                self.rmSchedule(pendingTask)
                        except BaseException:
                                # Best effort: failure to unschedule is ignored
                                pass

                self._JobId += 1
                newJobId = '%d %d' % (time(), self._JobId)
                (job, _wld) = self.getStratumJob(newJobId, wantClear=wantClear)
                (_height, merkleTree, coinbase, prevBlock, bits) = job[:5]

                if len(coinbase) > 96 - len(self.extranonce1null) - 4:
                        if not self.rejecting:
                                self.logger.warning('Coinbase too big for stratum: disabling')
                        self.rejecting = True
                        self.boot_all()
                        self.UpdateTask = self.schedule(self.updateJob, time() + 10)
                        return
                if self.rejecting:
                        self.rejecting = False
                        self.logger.info('Coinbase small enough for stratum again: reenabling')

                # Placeholder extranonce1 plus a fixed tag, appended to the coinbase
                suffix = self.extranonce1null + b'Eloi'
                txn = deepcopy(merkleTree.data[0])
                coinbase += suffix
                txn.setCoinbase(coinbase)
                txn.assemble()
                # Offset just past the coinbase inside the assembled transaction
                pos = txn.data.index(coinbase) + len(coinbase)

                steps = [b2a_hex(step).decode('ascii') for step in merkleTree._steps]

                # The transaction is split around the suffix: everything before
                # it, then everything after the coinbase.
                params = [
                        newJobId,
                        b2a_hex(swap32(prevBlock)).decode('ascii'),
                        b2a_hex(txn.data[:pos - len(suffix)]).decode('ascii'),
                        b2a_hex(txn.data[pos:]).decode('ascii'),
                        steps,
                        '00000002',
                        b2a_hex(bits[::-1]).decode('ascii'),
                        b2a_hex(struct.pack('>L', int(time()))).decode('ascii'),
                        # Evaluated against the previous job id, before it is replaced
                        not self.IsJobValid(self.JobId),
                ]
                notification = {
                        'id': None,
                        'method': 'mining.notify',
                        'params': params,
                }
                self.JobBytes = json.dumps(notification).encode('ascii') + b"\n"
                self.JobId = newJobId

                self.WakeRequest = 1
                self.wakeup()

                self.UpdateTask = self.schedule(self.updateJob, time() + 55)

        def pre_schedule(self):
                """Deliver the pending job to clients if a wake-up was requested."""
                if not self.WakeRequest:
                        return
                self._wakeNodes()

        def _wakeNodes(self):
                """Send the current job to every registered client, logging the outcome."""
                self.WakeRequest = None
                clients = self._Clients
                if not clients:
                        self.logger.debug('Nobody to wake up')
                        return
                total = len(clients)
                self.logger.debug("%d clients to wake up...", total)

                started = time()
                failures = 0

                # Iterate over a snapshot of the registered handlers
                for handler in list(clients.values()):
                        try:
                                handler.sendJob()
                        except socket.error:
                                # Ignore socket errors; let the main event loop take care of them later
                                failures += 1
                        except BaseException:
                                failures += 1
                                self.logger.debug('Error sending new job:\n' + traceback.format_exc())

                self.logger.debug('New job sent to %d clients in %.3f seconds', total - failures, time() - started)

        def getTarget(*a, **ka):
                """Default per-user target hook: always None, i.e. no override."""
                return None