StratumHandler: Accept mining.subscribe(UserAgent, SessionID) method form (SessionID...
[bitcoin:eloipool.git] / stratumserver.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2013  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 from binascii import b2a_hex
18 import collections
19 from copy import deepcopy
20 import json
21 import logging
22 import networkserver
23 import socket
24 import struct
25 from time import time
26 import traceback
27 from util import RejectedShare, swap32, target2bdiff
28
29 class StratumError(BaseException):
30         def __init__(self, errno, msg, tb = True):
31                 self.StratumErrNo = errno
32                 self.StratumErrMsg = msg
33                 self.StratumTB = tb
34
35 StratumCodes = {
36         'stale-prevblk': 21,
37         'stale-work': 21,
38         'duplicate': 22,
39         'H-not-zero': 23,
40         'high-hash': 23,
41 }
42
43 class StratumHandler(networkserver.SocketHandler):
44         logger = logging.getLogger('StratumHandler')
45         
46         def __init__(self, *a, **ka):
47                 super().__init__(*a, **ka)
48                 self.remoteHost = self.addr[0]
49                 self.changeTask(None)
50                 self.set_terminator(b"\n")
51                 self.Usernames = {}
52                 self.lastBDiff = None
53                 self.JobTargets = collections.OrderedDict()
54                 self.UA = None
55         
56         def sendReply(self, ob):
57                 return self.push(json.dumps(ob).encode('ascii') + b"\n")
58         
59         def found_terminator(self):
60                 inbuf = b"".join(self.incoming).decode('ascii')
61                 self.incoming = []
62                 
63                 if not inbuf:
64                         return
65                 
66                 try:
67                         rpc = json.loads(inbuf)
68                 except ValueError:
69                         self.boot()
70                         return
71                 if 'method' not in rpc:
72                         # Assume this is a reply to our request
73                         funcname = '_stratumreply_%s' % (rpc['id'],)
74                         if not hasattr(self, funcname):
75                                 return
76                         try:
77                                 getattr(self, funcname)(rpc)
78                         except BaseException as e:
79                                 self.logger.debug(traceback.format_exc())
80                         return
81                 funcname = '_stratum_%s' % (rpc['method'].replace('.', '_'),)
82                 if not hasattr(self, funcname):
83                         self.sendReply({
84                                 'error': [-3, "Method '%s' not found" % (rpc['method'],), None],
85                                 'id': rpc['id'],
86                                 'result': None,
87                         })
88                         return
89                 
90                 try:
91                         rv = getattr(self, funcname)(*rpc['params'])
92                 except StratumError as e:
93                         self.sendReply({
94                                 'error': (e.StratumErrNo, e.StratumErrMsg, traceback.format_exc() if e.StratumTB else None),
95                                 'id': rpc['id'],
96                                 'result': None,
97                         })
98                         return
99                 except BaseException as e:
100                         fexc = traceback.format_exc()
101                         self.sendReply({
102                                 'error': (20, str(e), fexc),
103                                 'id': rpc['id'],
104                                 'result': None,
105                         })
106                         if not hasattr(e, 'StratumQuiet'):
107                                 self.logger.debug(fexc)
108                         return
109                 
110                 self.sendReply({
111                         'error': None,
112                         'id': rpc['id'],
113                         'result': rv,
114                 })
115         
116         def sendJob(self):
117                 target = self.server.defaultTarget
118                 if len(self.Usernames) == 1:
119                         dtarget = self.server.getTarget(next(iter(self.Usernames)), time())
120                         if not dtarget is None:
121                                 target = dtarget
122                 bdiff = target2bdiff(target)
123                 if self.lastBDiff != bdiff:
124                         self.sendReply({
125                                 'id': None,
126                                 'method': 'mining.set_difficulty',
127                                 'params': [
128                                         bdiff
129                                 ],
130                         })
131                         self.lastBDiff = bdiff
132                 self.push(self.server.JobBytes)
133                 if len(self.JobTargets) > 4:
134                         self.JobTargets.popitem(False)
135                 self.JobTargets[self.server.JobId] = target
136         
137         def requestStratumUA(self):
138                 self.sendReply({
139                         'id': 7,
140                         'method': 'client.get_version',
141                         'params': (),
142                 })
143         
144         def _stratumreply_7(self, rpc):
145                 self.UA = rpc.get('result') or rpc
146         
147         def _stratum_mining_subscribe(self, UA = None, xid = None):
148                 if not UA is None:
149                         self.UA = UA
150                 xid = struct.pack('@P', id(self))
151                 self.extranonce1 = xid
152                 xid = b2a_hex(xid).decode('ascii')
153                 self.server._Clients[id(self)] = self
154                 self.changeTask(self.sendJob, 0)
155                 return [
156                         [
157                                 ['mining.notify', '%s1' % (xid,)],
158                                 ['mining.set_difficulty', '%s2' % (xid,)],
159                         ],
160                         xid,
161                         4,
162                 ]
163         
164         def close(self):
165                 try:
166                         del self.server._Clients[id(self)]
167                 except:
168                         pass
169                 super().close()
170         
171         def _stratum_mining_submit(self, username, jobid, extranonce2, ntime, nonce):
172                 if username not in self.Usernames:
173                         raise StratumError(24, 'unauthorized-user', False)
174                 share = {
175                         'username': username,
176                         'remoteHost': self.remoteHost,
177                         'jobid': jobid,
178                         'extranonce1': self.extranonce1,
179                         'extranonce2': bytes.fromhex(extranonce2),
180                         'ntime': bytes.fromhex(ntime),
181                         'nonce': bytes.fromhex(nonce),
182                         'userAgent': self.UA,
183                         'submitProtocol': 'stratum',
184                 }
185                 if jobid in self.JobTargets:
186                         share['target'] = self.JobTargets[jobid]
187                 try:
188                         self.server.receiveShare(share)
189                 except RejectedShare as rej:
190                         rej = str(rej)
191                         errno = StratumCodes.get(rej, 20)
192                         raise StratumError(errno, rej, False)
193                 return True
194         
195         def _stratum_mining_authorize(self, username, password = None):
196                 try:
197                         valid = self.server.checkAuthentication(username, password)
198                 except:
199                         valid = False
200                 if valid:
201                         self.Usernames[username] = None
202                         self.changeTask(self.requestStratumUA, 0)
203                 return valid
204         
205         def _stratum_mining_get_transactions(self, jobid):
206                 try:
207                         (MC, wld) = self.server.getExistingStratumJob(jobid)
208                 except KeyError as e:
209                         e.StratumQuiet = True
210                         raise
211                 (height, merkleTree, cb, prevBlock, bits) = MC[:5]
212                 return list(b2a_hex(txn.data).decode('ascii') for txn in merkleTree.data[1:])
213
214 class StratumServer(networkserver.AsyncSocketServer):
215         logger = logging.getLogger('StratumServer')
216         
217         waker = True
218         schMT = True
219         
220         extranonce1null = struct.pack('@P', 0)
221         
222         def __init__(self, *a, **ka):
223                 ka.setdefault('RequestHandlerClass', StratumHandler)
224                 super().__init__(*a, **ka)
225                 
226                 self._Clients = {}
227                 self._JobId = 0
228                 self.JobId = '%d' % (time(),)
229                 self.WakeRequest = None
230                 self.UpdateTask = None
231         
232         def checkAuthentication(self, username, password):
233                 return True
234         
235         def updateJob(self, wantClear = False):
236                 if self.UpdateTask:
237                         try:
238                                 self.rmSchedule(self.UpdateTask)
239                         except:
240                                 pass
241                 
242                 self._JobId += 1
243                 JobId = '%d %d' % (time(), self._JobId)
244                 (MC, wld) = self.getStratumJob(JobId, wantClear=wantClear)
245                 (height, merkleTree, cb, prevBlock, bits) = MC[:5]
246                 
247                 if len(cb) > 96 - len(self.extranonce1null) - 4:
248                         if not self.rejecting:
249                                 self.logger.warning('Coinbase too big for stratum: disabling')
250                         self.rejecting = True
251                         self.boot_all()
252                         self.UpdateTask = self.schedule(self.updateJob, time() + 10)
253                         return
254                 elif self.rejecting:
255                         self.rejecting = False
256                         self.logger.info('Coinbase small enough for stratum again: reenabling')
257                 
258                 txn = deepcopy(merkleTree.data[0])
259                 cb += self.extranonce1null + b'Eloi'
260                 txn.setCoinbase(cb)
261                 txn.assemble()
262                 pos = txn.data.index(cb) + len(cb)
263                 
264                 steps = list(b2a_hex(h).decode('ascii') for h in merkleTree._steps)
265                 
266                 self.JobBytes = json.dumps({
267                         'id': None,
268                         'method': 'mining.notify',
269                         'params': [
270                                 JobId,
271                                 b2a_hex(swap32(prevBlock)).decode('ascii'),
272                                 b2a_hex(txn.data[:pos - len(self.extranonce1null) - 4]).decode('ascii'),
273                                 b2a_hex(txn.data[pos:]).decode('ascii'),
274                                 steps,
275                                 '00000002',
276                                 b2a_hex(bits[::-1]).decode('ascii'),
277                                 b2a_hex(struct.pack('>L', int(time()))).decode('ascii'),
278                                 not self.IsJobValid(self.JobId)
279                         ],
280                 }).encode('ascii') + b"\n"
281                 self.JobId = JobId
282                 
283                 self.WakeRequest = 1
284                 self.wakeup()
285                 
286                 self.UpdateTask = self.schedule(self.updateJob, time() + 55)
287         
288         def pre_schedule(self):
289                 if self.WakeRequest:
290                         self._wakeNodes()
291         
292         def _wakeNodes(self):
293                 self.WakeRequest = None
294                 C = self._Clients
295                 if not C:
296                         self.logger.debug('Nobody to wake up')
297                         return
298                 OC = len(C)
299                 self.logger.debug("%d clients to wake up..." % (OC,))
300                 
301                 now = time()
302                 
303                 for ic in list(C.values()):
304                         try:
305                                 ic.sendJob()
306                         except socket.error:
307                                 OC -= 1
308                                 # Ignore socket errors; let the main event loop take care of them later
309                         except:
310                                 OC -= 1
311                                 self.logger.debug('Error sending new job:\n' + traceback.format_exc())
312                 
313                 self.logger.debug('New job sent to %d clients in %.3f seconds' % (OC, time() - now))
314         
315         def getTarget(*a, **ka):
316                 return None