Implemented authentication modules, and added 2 modules: allowall and simplefile
[bitcoin:eloipool.git] / stratumserver.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2013  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 from binascii import b2a_hex
18 import collections
19 from copy import deepcopy
20 import json
21 import logging
22 import networkserver
23 import socket
24 import struct
25 from time import time
26 import traceback
27 from util import RejectedShare, swap32, target2bdiff
28
class StratumError(BaseException):
    """Error that is reported back to a Stratum client as a JSON-RPC error.

    Attributes:
        StratumErrNo: numeric error code placed first in the error triple.
        StratumErrMsg: human-readable message placed second.
        StratumTB: when true, the request dispatcher attaches a formatted
            traceback as the third element; otherwise it sends None.
    """

    def __init__(self, errno, msg, tb = True):
        # Deliberately derives from BaseException (not Exception): handlers
        # in this module are ordered around that.
        self.StratumErrNo, self.StratumErrMsg, self.StratumTB = errno, msg, tb
34
# Maps a share-rejection reason string to the numeric Stratum error code
# sent to the client. Reasons missing from this table are reported with
# the generic code 20 by the mining.submit handler.
StratumCodes = dict((
    ('stale-prevblk', 21),
    ('stale-work', 21),
    ('duplicate', 22),
    ('H-not-zero', 23),
    ('high-hash', 23),
))
42
class StratumHandler(networkserver.SocketHandler):
    """Connection handler for one Stratum client.

    Messages are newline-terminated JSON objects. An object with a
    'method' key is dispatched to the matching ``_stratum_<method>``
    method (dots replaced by underscores); an object without one is
    treated as a reply to a request this side sent and is dispatched to
    ``_stratumreply_<id>`` if such a method exists.

    Instance state:
        Usernames: dict whose keys are the usernames authorized on this
            connection (values are always None).
        lastBDiff: last difficulty announced via mining.set_difficulty.
        JobTargets: OrderedDict of job id -> target for recently sent jobs.
        UA: client user agent, if known.
        extranonce1: bytes assigned by mining.subscribe, None before that.
    """

    logger = logging.getLogger('StratumHandler')

    def __init__(self, *a, **ka):
        super().__init__(*a, **ka)
        self.remoteHost = self.addr[0]
        self.changeTask(None)
        self.set_terminator(b"\n")
        self.Usernames = {}
        self.lastBDiff = None
        self.JobTargets = collections.OrderedDict()
        self.UA = None
        # Set by mining.subscribe; None lets mining.submit detect clients
        # that never subscribed.
        self.extranonce1 = None

    def sendReply(self, ob):
        """Serialize ``ob`` as one JSON line and queue it for sending."""
        return self.push(json.dumps(ob).encode('ascii') + b"\n")

    def found_terminator(self):
        """Parse one complete input line and dispatch it.

        Input that is not ASCII, not valid JSON, or not a JSON object gets
        the connection booted. Handler failures are turned into JSON-RPC
        error replies rather than propagated.
        """
        raw = b"".join(self.incoming)
        self.incoming = []

        if not raw:
            return

        try:
            # Decoding happens inside the try: UnicodeDecodeError is a
            # ValueError, so non-ASCII garbage is handled like bad JSON.
            rpc = json.loads(raw.decode('ascii'))
        except ValueError:
            self.boot()
            return
        if not isinstance(rpc, dict):
            # Valid JSON, but not a request/reply object.
            self.boot()
            return

        if 'method' not in rpc:
            self._dispatchReply(rpc)
            return

        rpcid = rpc.get('id')
        method = rpc['method']
        handler = None
        if isinstance(method, str):
            handler = getattr(self, '_stratum_%s' % (method.replace('.', '_'),), None)
        if handler is None:
            self.sendReply({
                'error': [-3, "Method '%s' not found" % (method,), None],
                'id': rpcid,
                'result': None,
            })
            return

        try:
            # A missing or null 'params' is treated as "no arguments".
            rv = handler(*(rpc.get('params') or ()))
        except StratumError as e:
            self.sendReply({
                'error': (e.StratumErrNo, e.StratumErrMsg, traceback.format_exc() if e.StratumTB else None),
                'id': rpcid,
                'result': None,
            })
            return
        except (KeyboardInterrupt, SystemExit):
            # Never convert an interpreter shutdown into an error reply.
            raise
        except BaseException as e:
            # NOTE(review): the full traceback is sent to the remote peer.
            fexc = traceback.format_exc()
            self.sendReply({
                'error': (20, str(e), fexc),
                'id': rpcid,
                'result': None,
            })
            if not hasattr(e, 'StratumQuiet'):
                self.logger.debug(fexc)
            return

        self.sendReply({
            'error': None,
            'id': rpcid,
            'result': rv,
        })

    def _dispatchReply(self, rpc):
        """Route a method-less message to ``_stratumreply_<id>`` if defined."""
        handler = getattr(self, '_stratumreply_%s' % (rpc.get('id'),), None)
        if handler is None:
            return
        try:
            handler(rpc)
        except (KeyboardInterrupt, SystemExit):
            raise
        except BaseException:
            self.logger.debug(traceback.format_exc())

    def sendJob(self):
        """Send the server's current job, preceded by a difficulty update
        when the difficulty differs from the last one announced."""
        target = self.server.defaultTarget
        if len(self.Usernames) == 1:
            # A per-user target is only consulted when exactly one user is
            # authorized on this connection.
            dtarget = self.server.getTarget(next(iter(self.Usernames)), time())
            if dtarget is not None:
                target = dtarget
        bdiff = target2bdiff(target)
        if self.lastBDiff != bdiff:
            self.sendReply({
                'id': None,
                'method': 'mining.set_difficulty',
                'params': [
                    bdiff
                ],
            })
            self.lastBDiff = bdiff
        self.push(self.server.JobBytes)
        if len(self.JobTargets) > 4:
            # Drop the oldest remembered job to keep the map bounded.
            self.JobTargets.popitem(last=False)
        self.JobTargets[self.server.JobId] = target

    def requestStratumUA(self):
        """Ask the client for its version; the answer arrives as reply id 7."""
        self.sendReply({
            'id': 7,
            'method': 'client.get_version',
            'params': (),
        })

    def _stratumreply_7(self, rpc):
        """Store the client.get_version result (or the whole reply if the
        result is missing or empty) as the user agent."""
        self.UA = rpc.get('result') or rpc

    def _stratum_mining_subscribe(self, UA = None, session_id = None):
        """Register this connection for job notifications.

        Args:
            UA: optional user agent string supplied by the client.
            session_id: optional value some clients send to resume a
                session; accepted for compatibility and ignored.

        Returns:
            [subscriptions, extranonce1 as hex, 4], where 4 is the second
            extranonce size announced to the client.
        """
        if UA is not None:
            self.UA = UA
        xid = struct.pack('@P', id(self))
        self.extranonce1 = xid
        xid = b2a_hex(xid).decode('ascii')
        self.server._Clients[id(self)] = self
        self.changeTask(self.sendJob, 0)
        return [
            [
                ['mining.notify', '%s1' % (xid,)],
                ['mining.set_difficulty', '%s2' % (xid,)],
            ],
            xid,
            4,
        ]

    def close(self):
        """Unregister from the server's client table, then close."""
        try:
            del self.server._Clients[id(self)]
        except Exception:
            # Best effort: the connection may never have subscribed.
            pass
        super().close()

    def _stratum_mining_submit(self, username, jobid, extranonce2, ntime, nonce):
        """Build a share from the submission and pass it to the server.

        Returns:
            True when the server accepts the share.

        Raises:
            StratumError: 24 if ``username`` is not authorized on this
                connection, 25 if the client has not subscribed, or the
                code mapped from the RejectedShare reason (20 if unmapped).
        """
        if username not in self.Usernames:
            raise StratumError(24, 'unauthorized-user', False)
        extranonce1 = getattr(self, 'extranonce1', None)
        if extranonce1 is None:
            # Without mining.subscribe there is no extranonce1 to build
            # the share from.
            raise StratumError(25, 'not-subscribed', False)
        share = {
            'username': username,
            'remoteHost': self.remoteHost,
            'jobid': jobid,
            'extranonce1': extranonce1,
            'extranonce2': bytes.fromhex(extranonce2),
            'ntime': bytes.fromhex(ntime),
            'nonce': bytes.fromhex(nonce),
            'userAgent': self.UA,
            'submitProtocol': 'stratum',
        }
        if jobid in self.JobTargets:
            share['target'] = self.JobTargets[jobid]
        try:
            self.server.receiveShare(share)
        except RejectedShare as rej:
            rej = str(rej)
            errno = StratumCodes.get(rej, 20)
            raise StratumError(errno, rej, False)
        return True

    def _stratum_mining_authorize(self, username, password = None):
        """Check credentials with the server and remember the username.

        Any failure inside the authentication check is treated as a
        rejection. On success a client.get_version request is scheduled.

        Returns:
            Whatever the server's checkAuthentication returned, or False
            if it raised.
        """
        try:
            valid = self.server.checkAuthentication(username, password)
        except (KeyboardInterrupt, SystemExit):
            raise
        except BaseException:
            # Fail closed, but leave a trace for the operator.
            self.logger.debug(traceback.format_exc())
            valid = False
        if valid:
            self.Usernames[username] = None
            self.changeTask(self.requestStratumUA, 0)
        return valid

    def _stratum_mining_get_transactions(self, jobid):
        """Return the hex-encoded data of every transaction in the job's
        merkle tree except the first one.

        Raises:
            KeyError: if the server does not know ``jobid``; the exception
                is tagged StratumQuiet so the dispatcher does not log it.
        """
        try:
            (MC, wld) = self.server.getExistingStratumJob(jobid)
        except KeyError as e:
            e.StratumQuiet = True
            raise
        merkleTree = MC[1]
        return [b2a_hex(txn.data).decode('ascii') for txn in merkleTree.data[1:]]
211
class StratumServer(networkserver.AsyncSocketServer):
    """Server that builds Stratum jobs and pushes them to subscribed clients.

    The job source (getStratumJob), job validity check (IsJobValid) and
    defaultTarget are not defined in this class and must be provided
    elsewhere.

    Instance state:
        _Clients: dict of id(handler) -> handler for subscribed connections.
        _JobId: counter incremented for every job built.
        JobId: identifier string of the current job.
        JobBytes: the encoded mining.notify line for the current job
            (set by updateJob).
        WakeRequest: truthy when clients should be sent the current job
            on the next pre_schedule call.
        UpdateTask: handle of the scheduled next updateJob call, if any.
    """

    logger = logging.getLogger('StratumServer')

    waker = True
    schMT = True

    # All-zero placeholder with the same width as a handler's extranonce1.
    extranonce1null = struct.pack('@P', 0)

    def __init__(self, *a, **ka):
        ka.setdefault('RequestHandlerClass', StratumHandler)
        super().__init__(*a, **ka)

        self._Clients = {}
        self._JobId = 0
        self.JobId = '%d' % (time(),)
        self.WakeRequest = None
        self.UpdateTask = None

    def checkAuthentication(self, username, password):
        """Default authentication policy: accept everyone."""
        return True

    def updateJob(self, wantClear = False):
        """Build a new job, publish it, and schedule the next update.

        If the coinbase is too long to leave room for the extranonce
        placeholders, the server enters rejecting mode, boots all clients
        and retries in 10 seconds. Otherwise the job is encoded into
        JobBytes, clients are woken, and the next update is scheduled 55
        seconds out.

        Args:
            wantClear: passed through to getStratumJob.
        """
        if self.UpdateTask:
            try:
                self.rmSchedule(self.UpdateTask)
            except (KeyboardInterrupt, SystemExit):
                raise
            except BaseException:
                # Best effort: the task may already have run or been removed.
                pass

        self._JobId += 1
        JobId = '%d %d' % (time(), self._JobId)
        (MC, wld) = self.getStratumJob(JobId, wantClear=wantClear)
        (height, merkleTree, cb, prevBlock, bits) = MC[:5]

        padlen = len(self.extranonce1null) + 4
        if len(cb) > 96 - padlen:
            if not self.rejecting:
                self.logger.warning('Coinbase too big for stratum: disabling')
            self.rejecting = True
            self.boot_all()
            self.UpdateTask = self.schedule(self.updateJob, time() + 10)
            return
        elif self.rejecting:
            self.rejecting = False
            self.logger.info('Coinbase small enough for stratum again: reenabling')

        # Work on a copy so the shared merkle tree's coinbase transaction
        # is not modified.
        txn = deepcopy(merkleTree.data[0])
        # Append placeholder bytes; the two slices below cut the assembled
        # transaction immediately before and after them.
        cb += self.extranonce1null + b'Eloi'
        txn.setCoinbase(cb)
        txn.assemble()
        pos = txn.data.index(cb) + len(cb)

        steps = [b2a_hex(h).decode('ascii') for h in merkleTree._steps]

        self.JobBytes = json.dumps({
            'id': None,
            'method': 'mining.notify',
            'params': [
                JobId,
                b2a_hex(swap32(prevBlock)).decode('ascii'),
                b2a_hex(txn.data[:pos - padlen]).decode('ascii'),
                b2a_hex(txn.data[pos:]).decode('ascii'),
                steps,
                '00000002',
                b2a_hex(bits[::-1]).decode('ascii'),
                b2a_hex(struct.pack('>L', int(time()))).decode('ascii'),
                # True when the previously published job is no longer valid.
                not self.IsJobValid(self.JobId)
            ],
        }).encode('ascii') + b"\n"
        self.JobId = JobId

        self.WakeRequest = 1
        self.wakeup()

        self.UpdateTask = self.schedule(self.updateJob, time() + 55)

    def pre_schedule(self):
        """Deliver the current job to clients if a wake-up was requested."""
        if self.WakeRequest:
            self._wakeNodes()

    def _wakeNodes(self):
        """Send the current job to every subscribed client.

        Failures for individual clients are absorbed so one bad connection
        cannot stop delivery to the rest.
        """
        self.WakeRequest = None
        clients = self._Clients
        if not clients:
            self.logger.debug('Nobody to wake up')
            return
        delivered = len(clients)
        self.logger.debug('%d clients to wake up...', delivered)

        started = time()

        # Iterate over a snapshot: the table may change while sending.
        for client in list(clients.values()):
            try:
                client.sendJob()
            except socket.error:
                delivered -= 1
                # Ignore socket errors; let the main event loop take care of them later
            except (KeyboardInterrupt, SystemExit):
                raise
            except BaseException:
                delivered -= 1
                self.logger.debug('Error sending new job:\n%s', traceback.format_exc())

        self.logger.debug('New job sent to %d clients in %.3f seconds', delivered, time() - started)

    def getTarget(*a, **ka):
        """Default per-user target hook: returns None, meaning no override."""
        return None