Merge commit 'd5eaf7b' into stratum_sidmanager
[bitcoin:eloipool.git] / stratumserver.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2013  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 from binascii import b2a_hex
18 import collections
19 from copy import deepcopy
20 import json
21 import logging
22 import networkserver
23 import socket
24 import struct
25 from time import time
26 import traceback
27 from util import RejectedShare, swap32, target2bdiff, UniqueSessionIdManager
28
class StratumError(BaseException):
        """Failure that is reported to the stratum client as a JSON-RPC error.

        The constructor stores three attributes which the request dispatcher
        reads when it builds the error reply:

        StratumErrNo  -- numeric error code (first element of the error triple)
        StratumErrMsg -- message text (second element of the error triple)
        StratumTB     -- when true, a formatted traceback is attached as the
                         third element; when false, None is sent instead
        """

        def __init__(self, errno, msg, tb = True):
                # Only the three reporting attributes are stored here;
                # no base-class initialiser is called.
                (self.StratumErrNo, self.StratumErrMsg, self.StratumTB) = (errno, msg, tb)
34
# Numeric error code sent to the client for each share-rejection reason.
# Reasons are grouped by the code they share; a reason that is not listed
# here is given code 20 by the .get(..., 20) lookup in mining.submit.
StratumCodes = {
        reason: code
        for (code, reasons) in (
                (21, ('stale-prevblk', 'stale-work')),
                (22, ('duplicate',)),
                (23, ('H-not-zero', 'high-hash')),
        )
        for reason in reasons
}
42
class StratumHandler(networkserver.SocketHandler):
        """Connection handler speaking newline-delimited JSON-RPC (stratum) to one client.

        Incoming requests naming method 'a.b' are dispatched to the method
        `_stratum_a_b`; incoming messages without a 'method' key are treated as
        replies to our own requests and dispatched to `_stratumreply_<id>`.

        Relies on members that are not defined in this class (addr, incoming,
        push, boot, changeTask, set_terminator, server) -- presumably provided by
        networkserver.SocketHandler.
        """
        logger = logging.getLogger('StratumHandler')

        def __init__(self, *a, **ka):
                super().__init__(*a, **ka)
                self.remoteHost = self.addr[0]
                self.changeTask(None)
                self.set_terminator(b"\n")
                # Usernames authorised on this connection (only the keys are used)
                self.Usernames = {}
                # Difficulty last announced to the client via mining.set_difficulty
                self.lastBDiff = None
                # jobid -> target that applied when the job was sent (bounded in sendJob)
                self.JobTargets = collections.OrderedDict()
                # Client version information, once known
                self.UA = None

        def sendReply(self, ob):
                """Serialise `ob` as one line of JSON and queue it for sending."""
                return self.push(json.dumps(ob).encode('ascii') + b"\n")

        def found_terminator(self):
                """Handle one complete line received from the client.

                Input that is not an ASCII-encoded JSON object gets the client booted.
                Requests always receive a reply carrying 'error', 'id' and 'result';
                a request without an 'id' is answered with 'id': None.
                """
                raw = b"".join(self.incoming)
                self.incoming = []

                if not raw:
                        return

                try:
                        rpc = json.loads(raw.decode('ascii'))
                except ValueError:
                        # Malformed JSON, or non-ASCII bytes
                        # (UnicodeDecodeError is a subclass of ValueError)
                        self.boot()
                        return
                if not isinstance(rpc, dict):
                        # Well-formed JSON, but not a request/reply object
                        self.boot()
                        return

                rpcid = rpc.get('id')

                if 'method' not in rpc:
                        # Assume this is a reply to our request
                        replyHandler = getattr(self, '_stratumreply_%s' % (rpcid,), None)
                        if replyHandler is None:
                                return
                        try:
                                replyHandler(rpc)
                        except BaseException:
                                # BaseException on purpose: StratumError derives from it
                                self.logger.debug(traceback.format_exc())
                        return

                method = rpc['method']
                handler = None
                if isinstance(method, str):
                        handler = getattr(self, '_stratum_%s' % (method.replace('.', '_'),), None)
                if handler is None:
                        self.sendReply({
                                'error': [-3, "Method '%s' not found" % (method,), None],
                                'id': rpcid,
                                'result': None,
                        })
                        return

                try:
                        # A missing or null 'params' is treated as "no arguments"
                        rv = handler(*(rpc.get('params') or ()))
                except StratumError as e:
                        self.sendReply({
                                'error': (e.StratumErrNo, e.StratumErrMsg, traceback.format_exc() if e.StratumTB else None),
                                'id': rpcid,
                                'result': None,
                        })
                        return
                except BaseException as e:
                        # Any other failure (including bad argument counts/types from
                        # the client) is reported with the generic code 20
                        fexc = traceback.format_exc()
                        self.sendReply({
                                'error': (20, str(e), fexc),
                                'id': rpcid,
                                'result': None,
                        })
                        if not hasattr(e, 'StratumQuiet'):
                                self.logger.debug(fexc)
                        return

                self.sendReply({
                        'error': None,
                        'id': rpcid,
                        'result': rv,
                })

        def sendJob(self):
                """Send the server's current job, preceded by a difficulty update if needed.

                The server default target is used unless exactly one username is
                authorised and server.getTarget returns a non-None target for it.
                """
                target = self.server.defaultTarget
                if len(self.Usernames) == 1:
                        dtarget = self.server.getTarget(next(iter(self.Usernames)), time())
                        if dtarget is not None:
                                target = dtarget
                bdiff = target2bdiff(target)
                if self.lastBDiff != bdiff:
                        # Only announce the difficulty when it actually changed
                        self.sendReply({
                                'id': None,
                                'method': 'mining.set_difficulty',
                                'params': [
                                        bdiff
                                ],
                        })
                        self.lastBDiff = bdiff
                self.push(self.server.JobBytes)
                # Remember at most five job targets: drop the oldest before adding
                if len(self.JobTargets) > 4:
                        self.JobTargets.popitem(False)
                self.JobTargets[self.server.JobId] = target

        def requestStratumUA(self):
                """Ask the client for its version; the reply arrives in _stratumreply_7."""
                self.sendReply({
                        'id': 7,
                        'method': 'client.get_version',
                        'params': (),
                })

        def _stratumreply_7(self, rpc):
                """Store the client.get_version result (or the whole reply if it has none)."""
                self.UA = rpc.get('result') or rpc

        def _stratum_mining_subscribe(self, UA = None, xid = None):
                """Register the client for job notifications and assign its extranonce1.

                UA  -- optional client version string; recorded in self.UA when given
                xid -- optional second parameter; accepted so that such clients are
                       not rejected, but otherwise ignored
                       (NOTE(review): presumably a session id to resume -- confirm)

                Returns [subscriptions, extranonce1 as hex, 4].
                """
                if UA is not None:
                        self.UA = UA
                if not hasattr(self, '_sid'):
                        self._sid = UniqueSessionIdManager.get()
                sidBytes = struct.pack('=I', self._sid)  # NOTE: Assumes sessionids are 4 bytes
                self.extranonce1 = sidBytes
                sidHex = b2a_hex(sidBytes).decode('ascii')
                self.server._Clients[id(self)] = self
                # Schedule the first job to be sent right away
                self.changeTask(self.sendJob, 0)
                return [
                        [
                                ['mining.notify', '%s1' % (sidHex,)],
                                ['mining.set_difficulty', '%s2' % (sidHex,)],
                        ],
                        sidHex,
                        4,
                ]

        def close(self):
                """Release the session id, deregister from the server, then close."""
                if hasattr(self, '_sid'):
                        UniqueSessionIdManager.put(self._sid)
                        delattr(self, '_sid')
                try:
                        del self.server._Clients[id(self)]
                except (AttributeError, KeyError):
                        # Never subscribed (or no client registry): nothing to remove
                        pass
                super().close()

        def _stratum_mining_submit(self, username, jobid, extranonce2, ntime, nonce):
                """Build a share from the client's submission and hand it to the server.

                Returns True when server.receiveShare accepts it.
                Raises StratumError 24 for an unauthorised username, 25 when the
                connection has not subscribed yet (so no extranonce1 exists), and
                the code mapped in StratumCodes (default 20) for a RejectedShare.
                """
                if username not in self.Usernames:
                        raise StratumError(24, 'unauthorized-user', False)
                extranonce1 = getattr(self, 'extranonce1', None)
                if extranonce1 is None:
                        # extranonce1 is only assigned by mining.subscribe
                        raise StratumError(25, 'not-subscribed', False)
                share = {
                        'username': username,
                        'remoteHost': self.remoteHost,
                        'jobid': jobid,
                        'extranonce1': extranonce1,
                        'extranonce2': bytes.fromhex(extranonce2),
                        'ntime': bytes.fromhex(ntime),
                        'nonce': bytes.fromhex(nonce),
                        'userAgent': self.UA,
                        'submitProtocol': 'stratum',
                }
                if jobid in self.JobTargets:
                        share['target'] = self.JobTargets[jobid]
                try:
                        self.server.receiveShare(share)
                except RejectedShare as rej:
                        rej = str(rej)
                        errno = StratumCodes.get(rej, 20)
                        raise StratumError(errno, rej, False)
                return True

        def _stratum_mining_authorize(self, username, password = None):
                """Authorise `username` via server.checkAuthentication; return its verdict.

                A failure inside the authentication check counts as "not valid".
                On success the username is recorded and a client version query is
                scheduled.
                """
                try:
                        valid = self.server.checkAuthentication(username, password)
                except (Exception, StratumError):
                        # Treat a failing check as a denial, but leave a trace of why
                        self.logger.debug(traceback.format_exc())
                        valid = False
                if valid:
                        self.Usernames[username] = None
                        self.changeTask(self.requestStratumUA, 0)
                return valid

        def _stratum_mining_get_transactions(self, jobid):
                """Return the hex-encoded data of every transaction of a job except the first.

                A KeyError from server.getExistingStratumJob is re-raised, marked
                StratumQuiet so the dispatcher does not log it.
                """
                try:
                        (MC, wld) = self.server.getExistingStratumJob(jobid)
                except KeyError as e:
                        e.StratumQuiet = True
                        raise
                merkleTree = MC[1]
                return [b2a_hex(txn.data).decode('ascii') for txn in merkleTree.data[1:]]
216
class StratumServer(networkserver.AsyncSocketServer):
        """Socket server that builds stratum jobs and pushes them to subscribed clients.

        Uses several members that are not defined in the visible class
        (defaultTarget, getStratumJob, getExistingStratumJob, receiveShare,
        IsJobValid, rejecting, boot_all, schedule, rmSchedule, wakeup) --
        presumably supplied by the base class or assigned by the application.
        """
        logger = logging.getLogger('StratumServer')

        waker = True
        schMT = True

        extranonce1null = struct.pack('=I', 0)  # NOTE: Assumes sessionids are 4 bytes

        def __init__(self, *a, **ka):
                ka.setdefault('RequestHandlerClass', StratumHandler)
                super().__init__(*a, **ka)

                # id(handler) -> handler, for every subscribed connection
                self._Clients = {}
                # Counter making successive job ids unique
                self._JobId = 0
                self.JobId = '%d' % (time(),)
                # Set by updateJob; consumed by pre_schedule/_wakeNodes
                self.WakeRequest = None
                # Handle of the pending scheduled updateJob call, if any
                self.UpdateTask = None

        def checkAuthentication(self, username, password):
                """Default authentication check: accept every username/password."""
                return True

        def updateJob(self, wantClear = False):
                """Build a fresh mining.notify message and request that clients be woken.

                wantClear is passed through to getStratumJob.  The method reschedules
                itself: 55 seconds after a successful update, or 10 seconds after
                finding the coinbase too large (in which case stratum is put into
                rejecting mode and all clients are booted).
                """
                if self.UpdateTask:
                        try:
                                self.rmSchedule(self.UpdateTask)
                        except Exception:
                                # Best effort -- presumably fails when the task is
                                # no longer queued; either way we carry on
                                pass

                self._JobId += 1
                JobId = '%d %d' % (time(), self._JobId)
                (MC, wld) = self.getStratumJob(JobId, wantClear=wantClear)
                (height, merkleTree, cb, prevBlock, bits) = MC[:5]

                # Below, extranonce1 plus a 4-byte placeholder are appended to cb
                if len(cb) > 96 - len(self.extranonce1null) - 4:
                        if not self.rejecting:
                                self.logger.warning('Coinbase too big for stratum: disabling')
                        self.rejecting = True
                        self.boot_all()
                        self.UpdateTask = self.schedule(self.updateJob, time() + 10)
                        return
                elif self.rejecting:
                        self.rejecting = False
                        self.logger.info('Coinbase small enough for stratum again: reenabling')

                txn = deepcopy(merkleTree.data[0])
                cb += self.extranonce1null + b'Eloi'
                txn.setCoinbase(cb)
                txn.assemble()
                # Offset just past the coinbase (including the appended bytes)
                pos = txn.data.index(cb) + len(cb)

                steps = [b2a_hex(h).decode('ascii') for h in merkleTree._steps]

                self.JobBytes = json.dumps({
                        'id': None,
                        'method': 'mining.notify',
                        'params': [
                                JobId,
                                b2a_hex(swap32(prevBlock)).decode('ascii'),
                                # Transaction data up to, but excluding, the appended bytes
                                b2a_hex(txn.data[:pos - len(self.extranonce1null) - 4]).decode('ascii'),
                                # Transaction data following the appended bytes
                                b2a_hex(txn.data[pos:]).decode('ascii'),
                                steps,
                                '00000002',
                                b2a_hex(bits[::-1]).decode('ascii'),
                                b2a_hex(struct.pack('>L', int(time()))).decode('ascii'),
                                # Evaluated against the previous job id (self.JobId is
                                # only replaced below)
                                not self.IsJobValid(self.JobId)
                        ],
                }).encode('ascii') + b"\n"
                self.JobId = JobId

                self.WakeRequest = 1
                self.wakeup()

                self.UpdateTask = self.schedule(self.updateJob, time() + 55)

        def pre_schedule(self):
                """Send the pending job to clients if updateJob requested a wake-up."""
                if self.WakeRequest:
                        self._wakeNodes()

        def _wakeNodes(self):
                """Send the current job to every subscribed client, logging the outcome."""
                self.WakeRequest = None
                C = self._Clients
                if not C:
                        self.logger.debug('Nobody to wake up')
                        return
                OC = len(C)
                self.logger.debug("%d clients to wake up...", OC)

                now = time()

                # Iterate over a snapshot: the registry may change while sending
                for ic in list(C.values()):
                        try:
                                ic.sendJob()
                        except socket.error:
                                OC -= 1
                                # Ignore socket errors; let the main event loop take care of them later
                        except Exception:
                                OC -= 1
                                self.logger.debug('Error sending new job:\n%s', traceback.format_exc())

                self.logger.debug('New job sent to %d clients in %.3f seconds', OC, time() - now)

        def getTarget(*a, **ka):
                """Default per-user target lookup: always None (use the default target).

                Takes arbitrary arguments (no explicit self) so that it can be called
                the same way whether it is this method or a plain function assigned
                over it; the handler calls it with (username, current time).
                """
                return None