1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2013 Luke Dashjr <luke-jr+eloipool@utopios.org>
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from binascii import b2a_hex
20 from copy import deepcopy
28 from util import RejectedShare, swap32, target2bdiff, UniqueSessionIdManager
32 class StratumError(BaseException):
33 def __init__(self, errno, msg, tb = True):
34 self.StratumErrNo = errno
35 self.StratumErrMsg = msg
46 class StratumHandler(networkserver.SocketHandler):
47 logger = logging.getLogger('StratumHandler')
49 def __init__(self, *a, **ka):
50 super().__init__(*a, **ka)
51 self.remoteHost = self.addr[0]
53 self.server.schedule(self.sendLicenseNotice, time() + 4, errHandler=self)
54 self.set_terminator(b"\n")
57 self.JobTargets = collections.OrderedDict()
59 self.LicenseSent = agplcompliance._SourceFiles is None
61 def sendReply(self, ob):
62 return self.push(json.dumps(ob).encode('ascii') + b"\n")
64 def found_terminator(self):
65 inbuf = b"".join(self.incoming).decode('ascii')
72 rpc = json.loads(inbuf)
76 if 'method' not in rpc:
77 # Assume this is a reply to our request
78 funcname = '_stratumreply_%s' % (rpc['id'],)
79 if not hasattr(self, funcname):
82 getattr(self, funcname)(rpc)
83 except BaseException as e:
84 self.logger.debug(traceback.format_exc())
86 funcname = '_stratum_%s' % (rpc['method'].replace('.', '_'),)
87 if not hasattr(self, funcname):
89 'error': [-3, "Method '%s' not found" % (rpc['method'],), None],
96 rv = getattr(self, funcname)(*rpc['params'])
97 except StratumError as e:
99 'error': (e.StratumErrNo, e.StratumErrMsg, traceback.format_exc() if e.StratumTB else None),
104 except BaseException as e:
105 fexc = traceback.format_exc()
107 'error': (20, str(e), fexc),
111 if not hasattr(e, 'StratumQuiet'):
112 self.logger.debug(fexc)
115 if rpc['id'] is None:
124 def sendLicenseNotice(self):
127 if not self.LicenseSent:
130 'method': 'client.show_message',
131 'params': ('This stratum server is licensed under the GNU Affero General Public License, version 3. You may download source code over stratum using the server.get_source method.',),
133 self.LicenseSent = True
136 target = self.server.defaultTarget
137 if len(self.Usernames) == 1:
138 dtarget = self.server.getTarget(next(iter(self.Usernames)), time())
139 if not dtarget is None:
141 bdiff = target2bdiff(target)
142 if self.lastBDiff != bdiff:
145 'method': 'mining.set_difficulty',
150 self.lastBDiff = bdiff
151 self.push(self.server.JobBytes)
152 if len(self.JobTargets) > 4:
153 self.JobTargets.popitem(False)
154 self.JobTargets[self.server.JobId] = target
156 def requestStratumUA(self):
159 'method': 'client.get_version',
163 def _stratumreply_7(self, rpc):
164 self.UA = rpc.get('result') or rpc
166 def _stratum_mining_subscribe(self, UA = None, xid = None):
169 if not hasattr(self, '_sid'):
170 self._sid = UniqueSessionIdManager.get()
171 if self.server._Clients.get(self._sid) not in (self, None):
173 raise self.server.RaiseRedFlags(RuntimeError('issuing duplicate sessionid'))
174 xid = struct.pack('=I', self._sid) # NOTE: Assumes sessionids are 4 bytes
175 self.extranonce1 = xid
176 xid = b2a_hex(xid).decode('ascii')
177 self.server._Clients[id(self)] = self
178 self.changeTask(self.sendJob, 0)
181 ['mining.notify', '%s1' % (xid,)],
182 ['mining.set_difficulty', '%s2' % (xid,)],
189 if hasattr(self, '_sid'):
190 UniqueSessionIdManager.put(self._sid)
191 delattr(self, '_sid')
193 del self.server._Clients[id(self)]
198 def _stratum_mining_submit(self, username, jobid, extranonce2, ntime, nonce):
199 if username not in self.Usernames:
200 raise StratumError(24, 'unauthorized-user', False)
202 'username': username,
203 'remoteHost': self.remoteHost,
205 'extranonce1': self.extranonce1,
206 'extranonce2': bytes.fromhex(extranonce2)[:extranonce2sz],
207 'ntime': bytes.fromhex(ntime),
208 'nonce': bytes.fromhex(nonce),
209 'userAgent': self.UA,
210 'submitProtocol': 'stratum',
212 if jobid in self.JobTargets:
213 share['target'] = self.JobTargets[jobid]
215 self.server.receiveShare(share)
216 except RejectedShare as rej:
218 errno = StratumCodes.get(rej, 20)
219 raise StratumError(errno, rej, False)
222 def _stratum_mining_authorize(self, username, password = None):
224 valid = self.server.checkAuthentication(username, password)
228 self.Usernames[username] = None
229 self.changeTask(self.requestStratumUA, 0)
232 def _stratum_mining_get_transactions(self, jobid):
234 (MC, wld) = self.server.getExistingStratumJob(jobid)
235 except KeyError as e:
236 e.StratumQuiet = True
238 (height, merkleTree, cb, prevBlock, bits) = MC[:5]
239 return list(b2a_hex(txn.data).decode('ascii') for txn in merkleTree.data[1:])
241 def _stratum_server_get_source(self, path = ''):
242 s = agplcompliance.get_source(path.encode('utf8'))
245 s[1] = s[1].decode('latin-1')
248 class StratumServer(networkserver.AsyncSocketServer):
249 logger = logging.getLogger('StratumServer')
254 extranonce1null = struct.pack('=I', 0) # NOTE: Assumes sessionids are 4 bytes
256 def __init__(self, *a, **ka):
257 ka.setdefault('RequestHandlerClass', StratumHandler)
258 super().__init__(*a, **ka)
262 self.JobId = '%d' % (time(),)
263 self.WakeRequest = None
264 self.UpdateTask = None
265 self._PendingQuickUpdates = set()
267 def checkAuthentication(self, username, password):
270 def updateJobOnly(self, wantClear = False, forceClean = False):
272 JobId = '%d %d' % (time(), self._JobId)
273 (MC, wld) = self.getStratumJob(JobId, wantClear=wantClear)
274 (height, merkleTree, cb, prevBlock, bits) = MC[:5]
276 if len(cb) > 96 - len(self.extranonce1null) - 4:
277 if not self.rejecting:
278 self.logger.warning('Coinbase too big for stratum: disabling')
279 self.rejecting = True
281 self.UpdateTask = self.schedule(self.updateJob, time() + 10)
284 self.rejecting = False
285 self.logger.info('Coinbase small enough for stratum again: reenabling')
287 txn = deepcopy(merkleTree.data[0])
288 cb += self.extranonce1null + b'Eloi'
291 pos = txn.data.index(cb) + len(cb)
293 steps = list(b2a_hex(h).decode('ascii') for h in merkleTree._steps)
295 self.JobBytes = json.dumps({
297 'method': 'mining.notify',
300 b2a_hex(swap32(prevBlock)).decode('ascii'),
301 b2a_hex(txn.data[:pos - len(self.extranonce1null) - 4]).decode('ascii'),
302 b2a_hex(txn.data[pos:]).decode('ascii'),
305 b2a_hex(bits[::-1]).decode('ascii'),
306 b2a_hex(struct.pack('>L', int(time()))).decode('ascii'),
307 forceClean or not self.IsJobValid(self.JobId)
309 }).encode('ascii') + b"\n"
312 def updateJob(self, wantClear = False):
315 self.rmSchedule(self.UpdateTask)
319 self.updateJobOnly(wantClear=wantClear)
324 self.UpdateTask = self.schedule(self.updateJob, time() + 55)
326 def doQuickUpdate(self):
327 PQU = self._PendingQuickUpdates
328 self._PendingQuickUpdates = set()
330 for ic in list(self._Clients.values()):
331 if PQU.intersection(ic.Usernames):
332 if self.JobId in ic.JobTargets:
333 self.updateJobOnly(wantClear=True, forceClean=True)
338 # Ignore socket errors; let the main event loop take care of them later
341 self.logger.debug('Error sending quickupdate job:\n' + traceback.format_exc())
343 self.logger.debug("Quickupdated %d clients" % (QUC,))
345 def quickDifficultyUpdate(self, username):
346 self._PendingQuickUpdates.add(username)
347 self.schedule(self.doQuickUpdate, time())
349 def pre_schedule(self):
353 def _wakeNodes(self):
354 self.WakeRequest = None
357 self.logger.debug('Nobody to wake up')
360 self.logger.debug("%d clients to wake up..." % (OC,))
364 for ic in list(C.values()):
369 # Ignore socket errors; let the main event loop take care of them later
372 self.logger.debug('Error sending new job:\n' + traceback.format_exc())
374 self.logger.debug('New job sent to %d clients in %.3f seconds' % (OC, time() - now))
376 def getTarget(*a, **ka):