Initial support for stratum miners
[bitcoin:eloipool.git] / stratumserver.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 from binascii import b2a_hex
18 from copy import deepcopy
19 import json
20 import logging
21 import networkserver
22 #import re
23 import struct
24 from time import time
25 #import traceback
26 from util import RejectedShare, swap32
27
28 class StratumError(BaseException):
29         def __init__(self, errno, msg):
30                 self.StratumErrNo = errno
31                 self.StratumErrMsg = msg
32
33 StratumCodes = {
34         'stale-prevblk': 21,
35         'stale-work': 21,
36         'duplicate': 22,
37         'H-not-zero': 23,
38         'high-hash': 23,
39 }
40
41 class StratumHandler(networkserver.SocketHandler):
42         logger = logging.getLogger('StratumHandler')
43         
44         def __init__(self, *a, **ka):
45                 super().__init__(*a, **ka)
46                 self.remoteHost = self.addr[0]
47                 self.changeTask(None)
48                 self.set_terminator(b"\n")
49                 self.Usernames = {}
50         
51         def sendReply(self, ob):
52                 return self.push(json.dumps(ob).encode('ascii') + b"\n")
53         
54         def found_terminator(self):
55                 inbuf = b"".join(self.incoming).decode('ascii')
56                 self.incoming = []
57                 
58                 if not inbuf:
59                         return
60                 
61                 rpc = json.loads(inbuf)
62                 funcname = '_stratum_%s' % (rpc['method'].replace('.', '_'),)
63                 if not hasattr(self, funcname):
64                         self.sendReply({
65                                 'error': [-3, "Method '%s' not found" % (rpc['method'],), None],
66                                 'id': rpc['id'],
67                                 'result': None,
68                         })
69                         return
70                 
71                 try:
72                         rv = getattr(self, funcname)(*rpc['params'])
73                 except StratumError as e:
74                         self.sendReply({
75                                 'error': (e.StratumErrNo, e.StratumErrMsg, None),
76                                 'id': rpc['id'],
77                                 'result': None,
78                         })
79                         return
80                 
81                 self.sendReply({
82                         'error': None,
83                         'id': rpc['id'],
84                         'result': rv,
85                 })
86         
87         def sendJob(self):
88                 self.push(self.server.JobBytes)
89         
90         def _stratum_mining_subscribe(self):
91                 xid = struct.pack('@P', id(self))
92                 self.extranonce1 = xid
93                 xid = b2a_hex(xid).decode('ascii')
94                 self.server._Clients[id(self)] = self
95                 self.changeTask(self.sendJob, 0)
96                 return [
97                         [
98                                 ['mining.notify', '%s1' % (xid,)],
99                                 ['mining.set_difficulty', '%s2' % (xid,)],
100                         ],
101                         xid,
102                         4,
103                 ]
104         
105         def handle_close(self):
106                 try:
107                         del self.server._Clients[id(self)]
108                 except:
109                         pass
110                 super().handle_close()
111         
112         def _stratum_mining_submit(self, username, jobid, extranonce2, ntime, nonce):
113                 if username not in self.Usernames:
114                         pass # TODO
115                 share = {
116                         'username': username,
117                         'remoteHost': self.remoteHost,
118                         'jobid': jobid,
119                         'extranonce1': self.extranonce1,
120                         'extranonce2': bytes.fromhex(extranonce2),
121                         'ntime': bytes.fromhex(ntime),
122                         'nonce': bytes.fromhex(nonce),
123                 }
124                 try:
125                         self.server.receiveShare(share)
126                 except RejectedShare as rej:
127                         rej = str(rej)
128                         errno = StratumCodes.get(rej, 20)
129                         raise StratumError(errno, rej)
130                 return True
131         
132         def checkAuthentication(self, username, password):
133                 return True
134         
135         def _stratum_mining_authorize(self, username, password = None):
136                 try:
137                         valid = self.checkAuthentication(username, password)
138                 except:
139                         valid = False
140                 if valid:
141                         self.Usernames[username] = None
142                 return valid
143
144 class StratumServer(networkserver.AsyncSocketServer):
145         logger = logging.getLogger('StratumServer')
146         
147         waker = True
148         
149         extranonce1null = struct.pack('@P', 0)
150         
151         def __init__(self, *a, **ka):
152                 ka.setdefault('RequestHandlerClass', StratumHandler)
153                 super().__init__(*a, **ka)
154                 
155                 self._Clients = {}
156                 self._JobId = 0
157                 self.JobId = '%d' % (time(),)
158                 self.WakeRequest = None
159         
160         def updateJob(self, wantClear = False):
161                 self._JobId += 1
162                 JobId = '%d %d' % (time(), self._JobId)
163                 (MC, wld) = self.getStratumJob(JobId, wantClear=wantClear)
164                 (height, merkleTree, cb, prevBlock, bits) = MC[:5]
165                 
166                 if len(cb) > 96 - len(self.extranonce1null) - 4:
167                         self.logger.warning('Coinbase too big for Stratum: GIVING CLIENTS INVALID JOBS')
168                         # TODO: shutdown stratum
169                         # TODO: restart automatically when coinbase works?
170                 
171                 txn = deepcopy(merkleTree.data[0])
172                 cb += self.extranonce1null + b'Eloi'
173                 txn.setCoinbase(cb)
174                 txn.assemble()
175                 pos = txn.data.index(cb) + len(cb)
176                 
177                 steps = list(b2a_hex(h).decode('ascii') for h in merkleTree._steps)
178                 
179                 self.JobBytes = json.dumps({
180                         'id': None,
181                         'method': 'mining.notify',
182                         'params': [
183                                 JobId,
184                                 b2a_hex(swap32(prevBlock)).decode('ascii'),
185                                 b2a_hex(txn.data[:pos - len(self.extranonce1null) - 4]).decode('ascii'),
186                                 b2a_hex(txn.data[pos:]).decode('ascii'),
187                                 steps,
188                                 '00000002',
189                                 b2a_hex(bits[::-1]).decode('ascii'),
190                                 b2a_hex(struct.pack('>L', int(time()))).decode('ascii'),
191                                 False
192                         ],
193                 }).encode('ascii') + b"\n"
194                 self.JobId = JobId
195                 
196                 self.WakeRequest = 1
197                 self.wakeup()
198         
199         def pre_schedule(self):
200                 if self.WakeRequest:
201                         self._wakeNodes()
202         
203         def _wakeNodes(self):
204                 self.WakeRequest = None
205                 C = self._Clients
206                 if not C:
207                         self.logger.info('Nobody to wake up')
208                         return
209                 OC = len(C)
210                 self.logger.debug("%d clients to wake up..." % (OC,))
211                 
212                 now = time()
213                 
214                 for ic in list(C.values()):
215                         try:
216                                 ic.sendJob()
217                         except socket.error:
218                                 OC -= 1
219                                 # Ignore socket errors; let the main event loop take care of them later
220                         except:
221                                 OC -= 1
222                                 self.logger.debug('Error sending new job:\n' + traceback.format_exc())
223                 
224                 self.logger.info('New job sent to %d clients in %.3f seconds' % (OC, time() - now))