Merge branch 'bugfix_race_coinbase_2' into bugfix_race_coinbase_3
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import config
19
20
21 import logging
22
# Root logger emits everything from DEBUG upwards...
logging.basicConfig(level=logging.DEBUG)
# ...except for these named loggers, which are limited to INFO and above.
for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
        logging.getLogger(infoOnly).setLevel(logging.INFO)
26
def RaiseRedFlags(reason):
        """Log `reason` at CRITICAL level on the 'redflag' logger.

        Returns `reason` unchanged so the call can be used inline.
        """
        redflag = logging.getLogger('redflag')
        redflag.critical(reason)
        return reason
30
31
from bitcoin.node import BitcoinLink, BitcoinNode
# Node object for the configured network; checkShare hands found blocks to it
# via bcnode.submitBlock, and __main__ serves it on its own thread.
bcnode = BitcoinNode(config.UpstreamNetworkId)
# Append this pool's identifier to the node's user-agent bytes.
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
# JSON-RPC proxy to the upstream bitcoind (used by blockSubmissionThread).
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
38
39
40 from bitcoin.script import BitcoinScript
41 from bitcoin.txn import Txn
42 from base58 import b58decode
43 from struct import pack
44 import subprocess
45 from time import time
46
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
        """Build the coinbase transaction paying out `coinbaseValue`.

        When `useCoinbaser` is true and config.CoinbaserCmd is set, that
        command is run through the shell (with '%d' replaced by the value) and
        its stdout is parsed as: a line with the number of outputs, followed by
        an amount line and an address line per output.  Whatever the coinbaser
        does not claim is paid to config.TrackerAddr.

        If the coinbaser raises, or claims the whole value or more, all of its
        outputs are discarded and the full value goes to config.TrackerAddr.

        Returns the (unassembled) Txn object.
        """
        txn = Txn.new()
        
        if useCoinbaser and getattr(config, 'CoinbaserCmd', None):
                logger = logging.getLogger('makeCoinbaseTxn')
                coinbased = 0
                try:
                        cmd = config.CoinbaserCmd.replace('%d', str(coinbaseValue))
                        # shell=True is intentional: the command comes from the
                        # operator's own config and may use shell syntax.
                        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
                        try:
                                nout = int(p.stdout.readline())
                                for i in range(nout):
                                        amount = int(p.stdout.readline())
                                        addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
                                        pkScript = BitcoinScript.toAddress(addr)
                                        txn.addOutput(amount, pkScript)
                                        coinbased += amount
                        finally:
                                # Always release the pipe and reap the child so
                                # neither a descriptor nor a zombie is left behind.
                                p.stdout.close()
                                p.wait()
                except Exception:
                        logger.warning('Coinbaser raised an exception', exc_info=True)
                        # Force the failure branch below.
                        coinbased = coinbaseValue + 1
                if coinbased >= coinbaseValue:
                        logger.error('Coinbaser failed!')
                        txn.outputs = []
                else:
                        coinbaseValue -= coinbased
        
        pkScript = BitcoinScript.toAddress(config.TrackerAddr)
        txn.addOutput(coinbaseValue, pkScript)
        
        # TODO
        # TODO: red flag on dupe coinbase
        return txn
77
78
79 import jsonrpcserver
80 from util import Bits2Target
81
# username -> {work id: (work tuple, issue time)}; filled by getBlockHeader and
# getBlockTemplate, emptied by blockChanged.
workLog = {}
# Target derived from the current block's bits; set by blockChanged.
networkTarget = None
# Keys are the 80-byte headers of shares already seen (see checkShare); the
# values are unused.  Reset by blockChanged.
DupeShareHACK = {}

# The JSONRPCServer instance; assigned in the __main__ block.
server = None
def updateBlocks():
        """Call server.wakeLongpoll(); a no-op while no server has been created."""
        if not server:
                return
        server.wakeLongpoll()
90
def blockChanged():
        """Reset per-block state after the current block has changed.

        Empties both duplicate-share tables, recomputes networkTarget from the
        bits of MM.currentBlock, forgets all issued work, then calls
        updateBlocks().
        """
        global DupeShareHACK, MM, networkTarget, server
        DupeShareHACK = {}
        jsonrpcserver._CheckForDupesHACK = {}
        currentBits = MM.currentBlock[1]
        networkTarget = Bits2Target(currentBits)
        workLog.clear()
        updateBlocks()
99
100
from merklemaker import merkleMaker
MM = merkleMaker()
# Expose every name in the config module as an attribute of the merkle maker.
MM.__dict__.update(config.__dict__)
# Coinbase built without the coinbaser for a hard-coded value of 5000000000
# (presumably the block subsidy in base units -- see the FIXME).
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
# Hooks handed to the merkle maker (presumably invoked by it when it needs a
# coinbase and when the block changes or is updated -- confirm in merklemaker).
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
MM.start()
110
111
112 from binascii import b2a_hex
113 from copy import deepcopy
114 from struct import pack, unpack
115 from time import time
116 from util import RejectedShare, dblsha, hash2int, swap32
117 import jsonrpc
118 import threading
119 import traceback
120
# Optional JSON-RPC proxy that receives "gotwork" reports from checkShare;
# stays None unless config.GotWorkURI is defined.
gotwork = None
if hasattr(config, 'GotWorkURI'):
        gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
124
def submitGotwork(info):
        """Send `info` to the gotwork service; failures are logged, never raised."""
        try:
                gotwork.gotwork(info)
        except:
                failure = 'Failed to submit gotwork\n' + traceback.format_exc()
                checkShare.logger.warning(failure)
130
# Optional PostgreSQL connection used by logShare; stays None (share logging
# disabled) unless config.DbOptions is defined.
db = None
if hasattr(config, 'DbOptions'):
        # Imported lazily so psycopg2 is only required when a database is configured.
        import psycopg2
        db = psycopg2.connect(**config.DbOptions)
135
def getBlockHeader(username):
        """Issue new header-only work to `username`.

        The header is the concatenation of version bytes b'\\1\\0\\0\\0', the
        previous block, the merkle root, the current time as a little-endian
        uint32, the bits, and the literal b'iolE'.  The work is remembered in
        workLog under the user's name, keyed by merkle root, together with the
        time it was issued.

        Returns the header bytes.
        """
        work = MM.getMRD()
        (merkleRoot, _merkleTree, _coinbase, prevBlock, bits, _rollPrevBlk) = work
        packedNow = pack('<L', int(time()))
        header = b''.join((b'\1\0\0\0', prevBlock, merkleRoot, packedNow, bits, b'iolE'))
        userWork = workLog.setdefault(username, {})
        userWork[merkleRoot] = (work, time())
        return header
143
def getBlockTemplate(username):
        """Issue new template work to `username`.

        The work id is read from the coinbase: its first byte is a length, and
        that many following bytes are the id.  The work is remembered in
        workLog under the user's name, keyed by that id, together with the time
        it was issued.

        Returns the tuple obtained from MM.getMC().
        """
        work = MM.getMC()
        (_dummy, _merkleTree, coinbase, _prevBlock, _bits) = work
        idLength = coinbase[0]
        workId = coinbase[1:1 + idLength]
        userWork = workLog.setdefault(username, {})
        userWork[workId] = (work, time())
        return work
151
def YN(b):
        """Map a tri-state flag to 'Y' / 'N', passing None through unchanged."""
        if b is None:
                return None
        if b:
                return 'Y'
        return 'N'
156
def logShare(share):
        """Insert one row describing `share` into the `shares` table.

        Does nothing when no database is configured (db is None).

        Reads from `share`: 'username' (required), 'remoteHost' (default '?'),
        'rejectReason' and 'upstreamResult' (default None), and either
        '_origdata' (used as-is) or 'data' (word-swapped and hex-encoded) for
        the solution column.

        If the insert or commit fails, the transaction is rolled back so the
        shared connection stays usable for later shares, and the original
        exception is re-raised.
        """
        if db is None:
                return
        rem_host = share.get('remoteHost', '?')
        username = share['username']
        reason = share.get('rejectReason', None)
        upstreamResult = share.get('upstreamResult', None)
        if '_origdata' in share:
                solution = share['_origdata']
        else:
                solution = b2a_hex(swap32(share['data'])).decode('utf8')
        stmt = "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values (%s, %s, %s, %s, %s, decode(%s, 'hex'))"
        # our_result is 'Y' exactly when there is no reject reason.
        params = (rem_host, username, YN(not reason), YN(upstreamResult), reason, solution)
        dbc = db.cursor()
        try:
                dbc.execute(stmt, params)
                db.commit()
        except Exception:
                # Without a rollback the connection would stay in an aborted
                # transaction and every later insert would fail too.
                db.rollback()
                raise
        finally:
                dbc.close()
174
# Kept in memory for every share that met the network target (see checkShare):
# RBDs holds deep copies of the header/transaction data, RBPs the raw payloads
# that were submitted upstream.
RBDs = []
RBPs = []
177
178 from bitcoin.varlen import varlenEncode, varlenDecode
179 import bitcoin.txn
def assembleBlock(blkhdr, txlist):
        """Serialize a block: header, encoded transaction count, then each txn's data.

        `blkhdr` is the header bytes; every item of `txlist` must expose its
        serialized form as `.data`.  Returns the concatenated payload.
        """
        # Join once instead of growing a bytes object per transaction, which
        # copies the whole payload on every append.
        parts = [blkhdr, varlenEncode(len(txlist))]
        parts.extend(tx.data for tx in txlist)
        return b''.join(parts)
186
def blockSubmissionThread(payload):
        """Submit a found block to the upstream bitcoind, retrying until it succeeds.

        `payload` is the raw block bytes; it is hex-encoded and passed to the
        upstream `getmemorypool` call.  Intended to run on its own thread: a
        failing upstream is retried indefinitely, since giving up would lose
        the block.
        """
        logger = logging.getLogger('blockSubmissionThread')
        # Encode once, outside the retry loop: an encoding error can never be
        # fixed by retrying and must not be swallowed forever.
        hexPayload = b2a_hex(payload).decode('ascii')
        attempts = 0
        while True:
                attempts += 1
                try:
                        UpstreamBitcoindJSONRPC.getmemorypool(hexPayload)
                        break
                except Exception:
                        # Log only the first failure so a long outage does not
                        # flood the log, then keep retrying.
                        if attempts == 1:
                                logger.warning('Upstream block submission failed; retrying until it succeeds\n' + traceback.format_exc())
        if attempts > 1:
                logger.info('Upstream block submission succeeded after %d attempts' % (attempts,))
194
def checkShare(share):
        """Validate a submitted share and act on it.

        `share` is a dict.  Keys read: 'data' (the first 80 bytes are the block
        header), 'username', and optionally 'blkdata' (the transaction part of
        a full block, which selects "MC" mode; otherwise "MRD" mode is used).
        Keys written: 'MC' or 'MRD' (the matching issued work), 'time', and
        'upstreamResult' when the share meets the network target.

        Side effects: the header is recorded in DupeShareHACK; a share meeting
        the network target is appended to RBDs/RBPs, submitted upstream both
        through a blockSubmissionThread and bcnode.submitBlock, and reported to
        MM.updateBlock; a share meeting config.GotWorkTarget is reported to the
        gotwork service on a daemon thread; an accepted share is passed to
        logShare.

        Raises RejectedShare with one of these reasons: stale-prevblk,
        bad-prevblk, unknown-user, bad-diffbits, bad-version, unknown-work,
        duplicate, H-not-zero, stale-work, time-too-old, time-too-new,
        bad-cb-prefix, bad-cb-flag, bad-cb-length, bad-txnmrklroot, bad-txns.
        """
        data = share['data']
        data = data[:80]
        (prevBlock, bits) = MM.currentBlock
        sharePrevBlock = data[4:36]
        if sharePrevBlock != prevBlock:
                # Distinguish work on the immediately preceding block from
                # work on something unknown.
                if sharePrevBlock == MM.lastBlock[0]:
                        raise RejectedShare('stale-prevblk')
                raise RejectedShare('bad-prevblk')
        
        # TODO: use userid
        username = share['username']
        if username not in workLog:
                raise RejectedShare('unknown-user')
        
        if data[72:76] != bits:
                raise RejectedShare('bad-diffbits')
        if data[:4] != b'\1\0\0\0':
                raise RejectedShare('bad-version')
        
        shareMerkleRoot = data[36:68]
        if 'blkdata' in share:
                # MC mode: the work id is embedded in the submitted coinbase
                # (first byte is its length), mirroring getBlockTemplate.
                pl = share['blkdata']
                (txncount, pl) = varlenDecode(pl)
                cbtxn = bitcoin.txn.Txn(pl)
                cbtxn.disassemble(retExtra=True)
                coinbase = cbtxn.getCoinbase()
                wliLen = coinbase[0]
                wli = coinbase[1:wliLen+1]
                mode = 'MC'
                moden = 1
        else:
                # MRD mode: the work id is the merkle root, mirroring getBlockHeader.
                wli = shareMerkleRoot
                mode = 'MRD'
                moden = 0
        
        MWL = workLog[username]
        if wli not in MWL:
                raise RejectedShare('unknown-work')
        (wld, issueT) = MWL[wli]
        share[mode] = wld
        
        # The header is remembered before the remaining checks run, so a
        # resubmission of a share rejected below is reported as 'duplicate'.
        if data in DupeShareHACK:
                raise RejectedShare('duplicate')
        DupeShareHACK[data] = None
        
        shareTime = share['time'] = time()
        
        blkhash = dblsha(data)
        # Minimum share difficulty: the last four bytes of the hash must be zero.
        if blkhash[28:] != b'\0\0\0\0':
                raise RejectedShare('H-not-zero')
        blkhashn = hash2int(blkhash)
        
        global networkTarget
        # Shares that solve a block are logged at INFO, everything else at DEBUG.
        logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
        logfunc('BLKHASH: %64x' % (blkhashn,))
        logfunc(' TARGET: %64x' % (networkTarget,))
        
        workMerkleTree = wld[1]
        workCoinbase = wld[2]
        
        # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
        txlist = workMerkleTree.data
        # Copy only the coinbase transaction so the issued work's own copy is
        # not modified by setCoinbase below.
        txlist = [deepcopy(txlist[0]),] + txlist[1:]
        cbtxn = txlist[0]
        cbtxn.setCoinbase(workCoinbase)
        cbtxn.assemble()
        
        # NOTE: a block-solving share is submitted upstream here, before the
        # timestamp and coinbase checks further down; that order is kept as-is.
        if blkhashn <= networkTarget:
                logfunc("Submitting upstream")
                if not moden:
                        RBDs.append( deepcopy( (data, txlist) ) )
                        payload = assembleBlock(data, txlist)
                else:
                        RBDs.append( deepcopy( (data, txlist, share['blkdata']) ) )
                        payload = share['data'] + share['blkdata']
                logfunc('Real block payload: %s' % (payload,))
                RBPs.append(payload)
                threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
                bcnode.submitBlock(payload)
                share['upstreamResult'] = True
                MM.updateBlock(blkhash)
        
        # Gotwork hack...
        if gotwork and blkhashn <= config.GotWorkTarget:
                try:
                        # coinbaseMrkl = coinbase txn data, block hash, number
                        # of merkle steps (one byte), the steps, four zero bytes.
                        coinbaseMrkl = cbtxn.data
                        coinbaseMrkl += blkhash
                        steps = workMerkleTree._steps
                        coinbaseMrkl += pack('B', len(steps))
                        for step in steps:
                                coinbaseMrkl += step
                        coinbaseMrkl += b"\0\0\0\0"
                        info = {}
                        info['hash'] = b2a_hex(blkhash).decode('ascii')
                        info['header'] = b2a_hex(data).decode('ascii')
                        info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
                        thr = threading.Thread(target=submitGotwork, args=(info,))
                        thr.daemon = True
                        thr.start()
                except Exception:
                        # Best effort: a gotwork failure must not reject the
                        # share, but the cause is logged like in submitGotwork.
                        checkShare.logger.warning('Failed to build gotwork request\n' + traceback.format_exc())
        
        shareTimestamp = unpack('<L', data[68:72])[0]
        if shareTime < issueT - 120:
                raise RejectedShare('stale-work')
        if shareTimestamp < shareTime - 300:
                raise RejectedShare('time-too-old')
        if shareTimestamp > shareTime + 7200:
                raise RejectedShare('time-too-new')
        
        if moden:
                # The submitted coinbase must start with the coinbase that was issued.
                cbpre = cbtxn.getCoinbase()
                cbpreLen = len(cbpre)
                if coinbase[:cbpreLen] != cbpre:
                        raise RejectedShare('bad-cb-prefix')
                
                # Filter out known "I support" flags, to prevent exploits
                for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
                        if coinbase.find(ff) > cbpreLen - len(ff):
                                raise RejectedShare('bad-cb-flag')
                
                if len(coinbase) > 100:
                        raise RejectedShare('bad-cb-length')
                
                # Rebuild the coinbase transaction with the submitted coinbase
                # and check it against the header's merkle root and the full block.
                cbtxn.setCoinbase(coinbase)
                cbtxn.assemble()
                if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
                        raise RejectedShare('bad-txnmrklroot')
                
                allowed = assembleBlock(data, txlist)
                if allowed != share['data'] + share['blkdata']:
                        raise RejectedShare('bad-txns')
        
        logShare(share)
checkShare.logger = logging.getLogger('checkShare')
331
def receiveShare(share):
        """Run checkShare on `share`.

        A rejected share gets its reason stored under 'rejectReason' and is
        logged via logShare before the RejectedShare is re-raised to the caller.
        """
        # TODO: username => userid
        try:
                checkShare(share)
        except RejectedShare as rejection:
                share['rejectReason'] = str(rejection)
                logShare(share)
                raise
        # TODO
341
def newBlockNotification(signum, frame):
        """Signal handler: log the notification and refresh the merkle tree.

        `signum` and `frame` are the standard signal-handler arguments and are
        not used.
        """
        notifyLog = logging.getLogger('newBlockNotification')
        notifyLog.info('Received new block notification')
        MM.updateMerkleTree()
        # TODO: Force RESPOND TO LONGPOLLS?
347
# NOTE: `signal` is the function here; a later `import signal` in this file
# rebinds the name to the module.
from signal import signal, SIGUSR1
# Sending SIGUSR1 to the process runs newBlockNotification.
signal(SIGUSR1, newBlockNotification)
350
351
352 import os
353 import os.path
354 import pickle
355 import signal
356 import sys
357 from time import sleep
358 import traceback
359
# Relative path of the pickle file written by saveState and read by restoreState.
SAVE_STATE_FILENAME = 'eloipool.worklog'
361
def stopServers():
        """Ask the node and JSON-RPC servers to stop, then close their descriptors.

        Each server is flagged with keepgoing = False and woken up; the
        function then polls `running` every 10 ms, giving up (with an error
        log) after 0x100 polls.  Finally every descriptor in each server's
        `_fd` table is closed.
        """
        logger = logging.getLogger('stopServers')
        
        logger.info('Stopping servers...')
        global bcnode, server
        # `server` is still None if the JSON-RPC server was never created.
        servers = tuple(s for s in (bcnode, server) if s is not None)
        for s in servers:
                s.keepgoing = False
        for s in servers:
                s.wakeup()
        i = 0
        while True:
                sl = [s.__class__.__name__ for s in servers if s.running]
                if not sl:
                        break
                i += 1
                if i >= 0x100:
                        logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
                        break
                sleep(0.01)
        
        for s in servers:
                # Iterate over a snapshot: a server that failed to stop may
                # still be changing its descriptor table.
                for fd in list(s._fd.keys()):
                        try:
                                os.close(fd)
                        except OSError:
                                # One bad descriptor must not prevent closing the rest.
                                logger.debug('Failed to close fd %r\n' % (fd,) + traceback.format_exc())
389
def saveState(t = None):
        """Pickle `t`, DupeShareHACK and workLog (in that order) to SAVE_STATE_FILENAME.

        `t` is the timestamp to store with the state (restoreState uses it to
        report downtime).  A failed attempt is retried immediately, up to
        0x10000 times -- presumably because the tables can be modified by other
        threads while they are being pickled.  After the last failed attempt
        the error is logged, the partial file is removed, and the function
        returns without having saved anything.
        """
        logger = logging.getLogger('saveState')
        
        # Then, save data needed to resume work
        logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
        i = 0
        while True:
                try:
                        with open(SAVE_STATE_FILENAME, 'wb') as f:
                                pickle.dump(t, f)
                                pickle.dump(DupeShareHACK, f)
                                pickle.dump(workLog, f)
                        break
                except Exception:
                        i += 1
                        if i >= 0x10000:
                                logger.error('Failed to save work\n' + traceback.format_exc())
                                try:
                                        os.unlink(SAVE_STATE_FILENAME)
                                except Exception:
                                        logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
                                # Give up: without this the loop would retry forever.
                                return
411
def exit():
        """Stop the servers, save state, then terminate the process.

        The time recorded in the saved state is taken before the servers are
        stopped.  The process sends itself SIGTERM; sys.exit(0) follows in case
        it is still running afterwards.
        """
        shutdownStarted = time()
        stopServers()
        saveState(shutdownStarted)
        goodbyeLog = logging.getLogger('exit')
        goodbyeLog.info('Goodbye...')
        ownPid = os.getpid()
        os.kill(ownPid, signal.SIGTERM)
        sys.exit(0)
419
def restart():
        """Stop the servers, save state, then re-exec this program in place.

        The time recorded in the saved state is taken before the servers are
        stopped.  If the exec fails the error is logged and the function
        returns.
        """
        shutdownStarted = time()
        stopServers()
        saveState(shutdownStarted)
        restartLog = logging.getLogger('restart')
        restartLog.info('Restarting...')
        try:
                os.execv(sys.argv[0], sys.argv)
        except:
                restartLog.error('Failed to exec\n' + traceback.format_exc())
429
def restoreState():
        """Reload workLog and DupeShareHACK from SAVE_STATE_FILENAME, if it exists.

        Three on-disk layouts are recognised by the type of the first pickled
        object:
          * a tuple: (workLog, DupeShareHACK) in a single object;
          * a dict: DupeShareHACK, followed by workLog;
          * anything else: a save timestamp, then DupeShareHACK, then workLog.
        In the last two layouts workLog is only loaded when the file was
        modified within the last 120 seconds.  Any failure is logged and
        leaves whatever had been restored up to that point in place.
        """
        global workLog, DupeShareHACK
        
        if not os.path.exists(SAVE_STATE_FILENAME):
                return
        
        logger = logging.getLogger('restoreState')
        fileInfo = os.stat(SAVE_STATE_FILENAME)
        logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, fileInfo.st_size))
        try:
                # NOTE(review): pickle executes arbitrary code on load; this is
                # only safe while the state file is writable by trusted users.
                with open(SAVE_STATE_FILENAME, 'rb') as f:
                        first = pickle.load(f)
                        if type(first) is tuple:
                                workLog = first[0]
                                DupeShareHACK = first[1]
                                savedAt = None
                        else:
                                if isinstance(first, dict):
                                        DupeShareHACK = first
                                        savedAt = None
                                else:
                                        savedAt = first
                                        DupeShareHACK = pickle.load(f)
                                
                                stillFresh = fileInfo.st_mtime + 120 >= time()
                                if stillFresh:
                                        workLog = pickle.load(f)
                                else:
                                        logger.debug('Skipping restore of expired workLog')
        except:
                logger.error('Failed to restore state\n' + traceback.format_exc())
                return
        logger.info('State restored successfully')
        if savedAt:
                logger.info('Total downtime: %g seconds' % (time() - savedAt,))
463
464
465 from jsonrpcserver import JSONRPCListener, JSONRPCServer
466 import interactivemode
467 from networkserver import NetworkListener
468 import threading
469
# Entry point: create the listeners, wire the share-handling callbacks into the
# JSON-RPC server, restore saved state, and serve until stopped.
if __name__ == "__main__":
        # Listeners for the Bitcoin node; none when the option is not configured.
        LSbc = []
        if not hasattr(config, 'BitcoinNodeAddresses'):
                config.BitcoinNodeAddresses = ()
        for a in config.BitcoinNodeAddresses:
                LSbc.append(NetworkListener(bcnode, a))
        
        if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
                BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
        
        server = JSONRPCServer()
        # Normalise to a fresh list so that a missing option or a tuple in the
        # config neither raises AttributeError nor breaks the insert below.
        config.JSONRPCAddresses = list(getattr(config, 'JSONRPCAddresses', ()))
        if hasattr(config, 'JSONRPCAddress'):
                # The single-address option goes first.
                config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
        if not config.JSONRPCAddresses:
                logging.getLogger('main').warning('No JSONRPCAddress(es) configured; JSON-RPC server has no listeners')
        LS = []
        for a in config.JSONRPCAddresses:
                LS.append(JSONRPCListener(server, a))
        if hasattr(config, 'SecretUser'):
                server.SecretUser = config.SecretUser
        server.aux = MM.CoinbaseAux
        server.getBlockHeader = getBlockHeader
        server.getBlockTemplate = getBlockTemplate
        server.receiveShare = receiveShare
        server.RaiseRedFlags = RaiseRedFlags
        
        restoreState()
        
        # The node runs on a daemon thread; the JSON-RPC server runs on the main thread.
        bcnode_thr = threading.Thread(target=bcnode.serve_forever)
        bcnode_thr.daemon = True
        bcnode_thr.start()
        
        server.serve_forever()