Allow coinbase/append mutation for getmemorypool mining
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import config
19
20
21 import logging
22
# Root logger emits everything from DEBUG upward...
logging.basicConfig(level=logging.DEBUG)
# ...except these named loggers, which are limited to INFO and above.
for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker'):
        logging.getLogger(infoOnly).setLevel(logging.INFO)
26
def RaiseRedFlags(reason):
        """Log `reason` at CRITICAL level on the 'redflag' logger and return it unchanged."""
        redflagLogger = logging.getLogger('redflag')
        redflagLogger.critical(reason)
        return reason
30
31
from bitcoin.node import BitcoinLink, BitcoinNode
# Node object for the network selected by config.UpstreamNetworkId
bcnode = BitcoinNode(config.UpstreamNetworkId)
# Append this pool's identifier to the node's user agent
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
# JSON-RPC proxy pointed at config.UpstreamURI (used for block submission below)
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
38
39
40 from bitcoin.script import BitcoinScript
41 from bitcoin.txn import Txn
42 from base58 import b58decode
43 from struct import pack
44 import subprocess
45 from time import time
46
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
        """Build a coinbase transaction distributing `coinbaseValue`.

        When `useCoinbaser` is true and config.CoinbaserCmd is set, that
        command is run through the shell with '%d' replaced by
        `coinbaseValue`.  Its stdout is read as: one line holding the number
        of outputs, then for each output a line with the amount followed by a
        line with the address.  Each pair becomes an output of the
        transaction, and the remainder of `coinbaseValue` is paid to
        config.TrackerAddr.

        If the command fails in any way, or its outputs add up to
        `coinbaseValue` or more, its outputs are discarded and the whole
        value is paid to config.TrackerAddr.

        Returns the Txn object (not assembled).
        """
        txn = Txn.new()
        
        if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
                logger = logging.getLogger('makeCoinbaseTxn')
                coinbased = 0
                p = None
                try:
                        cmd = config.CoinbaserCmd
                        cmd = cmd.replace('%d', str(coinbaseValue))
                        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
                        nout = int(p.stdout.readline())
                        for i in range(nout):
                                amount = int(p.stdout.readline())
                                addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
                                pkScript = BitcoinScript.toAddress(addr)
                                txn.addOutput(amount, pkScript)
                                coinbased += amount
                except Exception:
                        # Record why the coinbaser broke; the generic error below
                        # does not say what actually went wrong.
                        logger.warning('Coinbaser command raised an exception', exc_info=True)
                        # Force the "failed" branch below
                        coinbased = coinbaseValue + 1
                finally:
                        if p is not None:
                                # Release the pipe and reap the child so that neither a file
                                # descriptor nor a zombie process is left behind per call.
                                p.stdout.close()
                                p.wait()
                if coinbased >= coinbaseValue:
                        logger.error('Coinbaser failed!')
                        txn.outputs = []
                else:
                        coinbaseValue -= coinbased
        
        pkScript = BitcoinScript.toAddress(config.TrackerAddr)
        txn.addOutput(coinbaseValue, pkScript)
        
        # TODO
        # TODO: red flag on dupe coinbase
        return txn
77
78
79 from util import Bits2Target
80
# Work handed out to miners: workLog[username][work id] = (work tuple, issue time).
# Filled by getBlockHeader()/getBlockTemplate(), consulted by checkShare().
workLog = {}
# Current network target as computed by blockChanged(); None until the first call
networkTarget = None
# Share headers already seen since the last block change (only the keys are used)
DupeShareHACK = {}
84
# JSON-RPC server instance; stays None until the __main__ section creates it
server = None
def updateBlocks():
        """Call server.wakeLongpoll() if a server has been set; otherwise do nothing."""
        if not server:
                return
        server.wakeLongpoll()
89
def blockChanged():
        """Reset per-block state after the current block changes.

        Replaces the duplicate-share table with an empty one, recomputes
        networkTarget from MM.currentBlock[1], empties the work log and
        finally calls updateBlocks().
        """
        global DupeShareHACK, networkTarget
        DupeShareHACK = {}
        currentBits = MM.currentBlock[1]
        networkTarget = Bits2Target(currentBits)
        workLog.clear()
        updateBlocks()
97
98
from merklemaker import merkleMaker
MM = merkleMaker()
# Copy every attribute of the config module onto the merkle maker instance
MM.__dict__.update(config.__dict__)
# Coinbase built without the coinbaser, using a hard-coded value (see FIXME)
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
# Callbacks defined above
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
MM.start()
108
109
110 from binascii import b2a_hex
111 from copy import deepcopy
112 from struct import pack, unpack
113 from time import time
114 from util import RejectedShare, dblsha, hash2int, swap32
115 import jsonrpc
116 import threading
117 import traceback
118
# Optional JSON-RPC proxy for "gotwork" notifications; None unless config.GotWorkURI is set
gotwork = None
if hasattr(config, 'GotWorkURI'):
        gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
122
def submitGotwork(info):
        """Send `info` to the gotwork service; on any failure, log a warning with the traceback."""
        try:
                gotwork.gotwork(info)
        except:
                failure = traceback.format_exc()
                checkShare.logger.warning('Failed to submit gotwork\n' + failure)
128
# Optional share database connection; None unless config.DbOptions is set.
# psycopg2 is only imported when a database is configured.
db = None
if hasattr(config, 'DbOptions'):
        import psycopg2
        db = psycopg2.connect(**config.DbOptions)
133
def getBlockHeader(username):
        """Build a block header for `username` and remember the work it came from.

        The header is the version bytes, previous block, merkle root, the
        current time packed as a little-endian 32-bit integer, the bits, and
        four trailing bytes.  The work tuple from MM.getMRD() is stored in
        workLog[username] under its merkle root, together with the issue time.

        Returns the header bytes.
        """
        work = MM.getMRD()
        (merkleRoot, _merkleTree, _coinbase, prevBlock, bits, _rollPrevBlk) = work
        packedTime = pack('<L', int(time()))
        header = b'\1\0\0\0' + prevBlock + merkleRoot + packedTime + bits + b'iolE'
        userWork = workLog.setdefault(username, {})
        userWork[merkleRoot] = (work, time())
        return header
141
def getBlockTemplate(username):
        """Fetch work from MM.getMC(), record it for `username`, and return it.

        The work-log key is taken from the coinbase: its first byte is used as
        a length, and that many following bytes form the key.
        """
        work = MM.getMC()
        (_dummy, _merkleTree, coinbase, _prevBlock, _bits) = work
        idLen = coinbase[0]
        workId = coinbase[1:idLen+1]
        userWork = workLog.setdefault(username, {})
        userWork[workId] = (work, time())
        return work
149
def YN(b):
        """Map a truth value to 'Y' or 'N'; None is passed through as None."""
        if b is not None:
                return 'Y' if b else 'N'
        return None
154
def logShare(share):
        """Insert one row describing `share` into the `shares` table.

        Does nothing when no database is configured (db is None).

        Values read from `share`:
          'remoteHost'     -> rem_host (defaults to '?')
          'username'       -> username (required)
          'rejectReason'   -> reason; our_result is 'Y' when there is none
          'upstreamResult' -> upstream_result ('Y'/'N', or NULL when absent)
          '_origdata'      -> solution, used as-is when present; otherwise the
                              hex form of swap32(share['data'])

        Any database error is re-raised after the transaction has been rolled
        back, so one failed insert cannot leave the shared connection stuck in
        an aborted transaction.
        """
        if db is None:
                return
        rem_host = share.get('remoteHost', '?')
        username = share['username']
        reason = share.get('rejectReason', None)
        upstreamResult = share.get('upstreamResult', None)
        if '_origdata' in share:
                solution = share['_origdata']
        else:
                solution = b2a_hex(swap32(share['data'])).decode('utf8')
        # The solution is passed as hex text and converted to binary by the SQL decode()
        stmt = "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values (%s, %s, %s, %s, %s, decode(%s, 'hex'))"
        params = (rem_host, username, YN(not reason), YN(upstreamResult), reason, solution)
        dbc = db.cursor()
        try:
                dbc.execute(stmt, params)
                db.commit()
        except BaseException:
                # Leave the connection usable for the next share, then report the error
                db.rollback()
                raise
        finally:
                dbc.close()
172
# For every share that meets the network target, checkShare() appends a deep
# copy of the block's parts here...
RBDs = []
# ...and the exact payload that was submitted upstream here.
RBPs = []
175
176 from bitcoin.varlen import varlenEncode, varlenDecode
177 import bitcoin.txn
def assembleBlock(blkhdr, txlist):
        """Return `blkhdr` followed by the varlen-encoded transaction count and each transaction's .data."""
        pieces = [varlenEncode(len(txlist))]
        pieces.extend(tx.data for tx in txlist)
        return blkhdr + b''.join(pieces)
184
def blockSubmissionThread(payload):
        """Submit a block to the upstream bitcoind, retrying until a call completes.

        `payload` is the raw block as bytes; it is hex-encoded once and passed
        to the upstream `getmemorypool` JSON-RPC call.  The return value of
        that call is ignored.  Each failed attempt is logged with its
        traceback and followed by a short pause, so an unreachable bitcoind
        does not turn this thread into a silent busy loop.

        Intended to be run as a thread target; returns None.
        """
        from time import sleep
        logger = logging.getLogger('blockSubmissionThread')
        # Encode outside the retry loop: a payload that cannot be encoded will
        # never succeed, so it must not be retried forever.
        hexPayload = b2a_hex(payload).decode('ascii')
        while True:
                try:
                        UpstreamBitcoindJSONRPC.getmemorypool(hexPayload)
                        break
                except Exception:
                        logger.warning('Upstream block submission failed; retrying', exc_info=True)
                        sleep(1)
192
def checkShare(share):
        """Validate a submitted share, submit it upstream if it solves a block, and log it.

        `share` is a dict.  Keys read: 'data' (the first 80 bytes are used as
        the block header), 'username', and optionally 'blkdata' (transaction
        data following the header; its presence selects "MC" mode, otherwise
        "MRD" mode is used).

        Keys written: 'time' (when the share was checked), 'MC' or 'MRD' (the
        logged work tuple), and 'upstreamResult' (True, only when the block
        was submitted upstream).

        Raises RejectedShare with one of these reasons: stale-prevblk,
        bad-prevblk, unknown-user, bad-diffbits, bad-version, unknown-work,
        duplicate, stale-work, time-too-old, time-too-new, H-not-zero, and in
        MC mode bad-cb-prefix, bad-cb-flag, bad-cb-length, bad-txnmrklroot,
        bad-txns.

        On success the share is passed to logShare().  Rejected shares are not
        logged here.
        """
        data = share['data']
        data = data[:80]
        (prevBlock, bits) = MM.currentBlock
        sharePrevBlock = data[4:36]
        if sharePrevBlock != prevBlock:
                # Distinguish work on the immediately preceding block from anything else
                if sharePrevBlock == MM.lastBlock[0]:
                        raise RejectedShare('stale-prevblk')
                raise RejectedShare('bad-prevblk')
        
        # TODO: use userid
        username = share['username']
        if username not in workLog:
                raise RejectedShare('unknown-user')
        
        if data[72:76] != bits:
                raise RejectedShare('bad-diffbits')
        if data[:4] != b'\1\0\0\0':
                raise RejectedShare('bad-version')
        
        shareMerkleRoot = data[36:68]
        if 'blkdata' in share:
                # MC mode: the work-log key comes from the submitted coinbase, using
                # the same first-byte-is-length rule as getBlockTemplate()
                pl = share['blkdata']
                (txncount, pl) = varlenDecode(pl)
                cbtxn = bitcoin.txn.Txn(pl)
                cbtxn.disassemble(retExtra=True)
                coinbase = cbtxn.getCoinbase()
                wliLen = coinbase[0]
                wli = coinbase[1:wliLen+1]
                mode = 'MC'
                moden = 1
        else:
                # MRD mode: the work-log key is the merkle root, as stored by getBlockHeader()
                wli = shareMerkleRoot
                mode = 'MRD'
                moden = 0
        
        MWL = workLog[username]
        if wli not in MWL:
                raise RejectedShare('unknown-work')
        (wld, t) = MWL[wli]
        share[mode] = wld
        
        # The header is recorded as seen before the remaining checks run
        if data in DupeShareHACK:
                raise RejectedShare('duplicate')
        DupeShareHACK[data] = None
        
        shareTimestamp = unpack('<L', data[68:72])[0]
        shareTime = share['time'] = time()
        # NOTE(review): t is the time the work was issued and shareTime is "now",
        # so this only triggers when the issue time is more than 120 seconds
        # ahead of the current clock - confirm this is the intended test.
        if shareTime < t - 120:
                raise RejectedShare('stale-work')
        if shareTimestamp < shareTime - 300:
                raise RejectedShare('time-too-old')
        if shareTimestamp > shareTime + 7200:
                raise RejectedShare('time-too-new')
        
        blkhash = dblsha(data)
        # Minimum requirement for any share: the last four hash bytes are zero
        if blkhash[28:] != b'\0\0\0\0':
                raise RejectedShare('H-not-zero')
        blkhashn = hash2int(blkhash)
        
        global networkTarget
        # Log at INFO when the share meets the network target, otherwise at DEBUG
        logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
        logfunc('BLKHASH: %64x' % (blkhashn,))
        logfunc(' TARGET: %64x' % (networkTarget,))
        
        workMerkleTree = wld[1]
        workCoinbase = wld[2]
        
        # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
        txlist = workMerkleTree.data
        cbtxn = txlist[0]
        cbtxn.setCoinbase(workCoinbase)
        cbtxn.assemble()
        
        if blkhashn <= networkTarget:
                # NOTE(review): in MC mode this submission happens before the
                # coinbase/transaction validation further down.
                logfunc("Submitting upstream")
                if not moden:
                        RBDs.append( deepcopy( (data, txlist) ) )
                        payload = assembleBlock(data, txlist)
                else:
                        RBDs.append( deepcopy( (data, txlist, share['blkdata']) ) )
                        payload = share['data'] + share['blkdata']
                logfunc('Real block payload: %s' % (payload,))
                RBPs.append(payload)
                # Submit both over JSON-RPC (in a background thread) and through bcnode
                threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
                bcnode.submitBlock(payload)
                share['upstreamResult'] = True
                MM.updateBlock(blkhash)
        
        # Gotwork hack...
        if gotwork and blkhashn <= config.GotWorkTarget:
                try:
                        # Coinbase transaction, block hash, merkle step count, the steps, then four zero bytes
                        coinbaseMrkl = cbtxn.data
                        coinbaseMrkl += blkhash
                        steps = workMerkleTree._steps
                        coinbaseMrkl += pack('B', len(steps))
                        for step in steps:
                                coinbaseMrkl += step
                        coinbaseMrkl += b"\0\0\0\0"
                        info = {}
                        info['hash'] = b2a_hex(blkhash).decode('ascii')
                        info['header'] = b2a_hex(data).decode('ascii')
                        info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
                        thr = threading.Thread(target=submitGotwork, args=(info,))
                        thr.daemon = True
                        thr.start()
                except Exception:
                        # Best effort only, but keep the traceback so failures can be diagnosed
                        checkShare.logger.warning('Failed to build gotwork request\n' + traceback.format_exc())
        
        if moden:
                # The submitted coinbase must start with the coinbase that was handed out
                cbpre = cbtxn.getCoinbase()
                if coinbase[:len(cbpre)] != cbpre:
                        raise RejectedShare('bad-cb-prefix')
                
                # Filter out known "I support" flags, to prevent exploits
                for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
                        if ff in coinbase:
                                raise RejectedShare('bad-cb-flag')
                
                if len(coinbase) > 100:
                        raise RejectedShare('bad-cb-length')
                
                # Work on a copy so the logged work's coinbase transaction is left alone
                cbtxn = deepcopy(cbtxn)
                cbtxn.setCoinbase(coinbase)
                cbtxn.assemble()
                if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
                        raise RejectedShare('bad-txnmrklroot')
                
                # Apart from the coinbase, the block must contain exactly the logged transactions
                txlist = [cbtxn,] + txlist[1:]
                allowed = assembleBlock(data, txlist)
                if allowed != share['data'] + share['blkdata']:
                        raise RejectedShare('bad-txns')
        
        logShare(share)
checkShare.logger = logging.getLogger('checkShare')
328
def receiveShare(share):
        """Run checkShare() on `share`; when it is rejected, record the reason, log the share and re-raise."""
        # TODO: username => userid
        try:
                checkShare(share)
        except RejectedShare as rejection:
                reasonText = str(rejection)
                share['rejectReason'] = reasonText
                logShare(share)
                raise
        # TODO
338
def newBlockNotification(signum, frame):
        """Signal handler: log the notification and ask the merkle maker to update its merkle tree."""
        notifyLogger = logging.getLogger('newBlockNotification')
        notifyLogger.info('Received new block notification')
        MM.updateMerkleTree()
        # TODO: Force RESPOND TO LONGPOLLS?
344
from signal import signal, SIGUSR1
# SIGUSR1 triggers newBlockNotification().
# NOTE: the name `signal` is the function here; the later `import signal`
# rebinds it to the module.
signal(SIGUSR1, newBlockNotification)
347
348
349 import os
350 import os.path
351 import pickle
352 import signal
353 import sys
354 from time import sleep
355 import traceback
356
# File (relative to the working directory) written by saveState() and read by restoreState()
SAVE_STATE_FILENAME = 'eloipool.worklog'
358
def stopServers():
        """Ask bcnode and the JSON-RPC server to stop, wait briefly for them, then close their fds.

        Both servers get keepgoing = False and a wakeup() call.  Their
        `running` flags are then polled up to 0x100 times, 0.01 seconds apart;
        if any is still running after that, an error naming them is logged and
        the wait is abandoned.  Finally every descriptor in each server's _fd
        is closed.
        """
        logger = logging.getLogger('stopServers')
        
        logger.info('Stopping servers...')
        servers = (bcnode, server)
        for srv in servers:
                srv.keepgoing = False
        for srv in servers:
                srv.wakeup()
        for attempt in range(1, 0x100 + 1):
                stillRunning = [srv.__class__.__name__ for srv in servers if srv.running]
                if not stillRunning:
                        break
                if attempt == 0x100:
                        logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(stillRunning)))
                        break
                sleep(0.01)
        
        for srv in servers:
                for fd in srv._fd.keys():
                        os.close(fd)
386
def saveState():
        """Pickle (workLog, DupeShareHACK) to SAVE_STATE_FILENAME.

        A failed attempt is retried immediately, up to 0x10000 attempts in
        total (presumably because the dicts may still be changing while they
        are pickled - confirm).  Once the limit is reached the failure is
        logged, the possibly incomplete file is removed so a later restore
        does not read it, and the function returns instead of retrying
        forever.
        """
        logger = logging.getLogger('saveState')
        
        # Then, save data needed to resume work
        logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
        attempts = 0
        while True:
                try:
                        with open(SAVE_STATE_FILENAME, 'wb') as f:
                                pickle.dump( (workLog, DupeShareHACK), f )
                        return
                except Exception:
                        attempts += 1
                        if attempts < 0x10000:
                                continue
                        logger.error('Failed to save work\n' + traceback.format_exc())
                        try:
                                os.unlink(SAVE_STATE_FILENAME)
                        except Exception:
                                logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
                        # Give up for real; without this the loop never terminated
                        return
406
def exit():
        """Stop the servers, save the work state, then send SIGTERM to this process and call sys.exit(0)."""
        stopServers()
        saveState()
        exitLogger = logging.getLogger('exit')
        exitLogger.info('Goodbye...')
        ownPid = os.getpid()
        os.kill(ownPid, signal.SIGTERM)
        sys.exit(0)
413
def restart():
        """Stop the servers, save the work state, then re-exec this program with its original arguments.

        If the exec fails, the error is logged with its traceback and the
        function returns.
        """
        stopServers()
        saveState()
        restartLogger = logging.getLogger('restart')
        restartLogger.info('Restarting...')
        try:
                os.execv(sys.argv[0], sys.argv)
        except:
                restartLogger.error('Failed to exec\n' + traceback.format_exc())
422
def restoreState():
        """Reload workLog and DupeShareHACK from SAVE_STATE_FILENAME, if that file exists.

        The two globals are only replaced once the whole file has been read
        and both values extracted, so a truncated or malformed file leaves the
        current state untouched.  Failures are logged with their traceback and
        otherwise ignored.
        """
        if not os.path.exists(SAVE_STATE_FILENAME):
                return
        
        global workLog, DupeShareHACK
        
        logger = logging.getLogger('restoreState')
        logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, os.stat(SAVE_STATE_FILENAME).st_size))
        try:
                with open(SAVE_STATE_FILENAME, 'rb') as f:
                        # NOTE: unpickling can execute arbitrary code; this file must only
                        # ever be one written by saveState() and not writable by others.
                        data = pickle.load(f)
                restoredWorkLog = data[0]
                restoredDupes = data[1]
        except Exception:
                logger.error('Failed to restore state\n' + traceback.format_exc())
                return
        workLog = restoredWorkLog
        DupeShareHACK = restoredDupes
        logger.info('State restored successfully')
440
441
442 from jsonrpcserver import JSONRPCListener, JSONRPCServer
443 import interactivemode
444 from networkserver import NetworkListener
445 import threading
446
if __name__ == "__main__":
        # One NetworkListener on bcnode per configured address (none by default)
        if not hasattr(config, 'BitcoinNodeAddresses'):
                config.BitcoinNodeAddresses = ()
        LSbc = [NetworkListener(bcnode, addr) for addr in config.BitcoinNodeAddresses]
        
        # Optional link from bcnode to config.UpstreamBitcoindNode
        if getattr(config, 'UpstreamBitcoindNode', None):
                BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
        
        server = JSONRPCServer()
        # A single JSONRPCAddress is put in front of the JSONRPCAddresses list
        if hasattr(config, 'JSONRPCAddress'):
                if not hasattr(config, 'JSONRPCAddresses'):
                        config.JSONRPCAddresses = []
                config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
        LS = [JSONRPCListener(server, addr) for addr in config.JSONRPCAddresses]
        if hasattr(config, 'SecretUser'):
                server.SecretUser = config.SecretUser
        
        # Hand the server the data and callbacks defined in this module
        server.aux = MM.CoinbaseAux
        server.getBlockHeader = getBlockHeader
        server.getBlockTemplate = getBlockTemplate
        server.receiveShare = receiveShare
        server.RaiseRedFlags = RaiseRedFlags
        
        restoreState()
        
        # bcnode is served from a daemon thread; the JSON-RPC server runs in this thread
        bcnode_thr = threading.Thread(target=bcnode.serve_forever)
        bcnode_thr.daemon = True
        bcnode_thr.start()
        
        server.serve_forever()