Bugfix: getBlockTemplate: Get work identifier after height-in-coinbase
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
import config

# Fill in defaults for optional configuration values so the rest of the
# code can assume they exist.
if not hasattr(config, 'ServerName'):
	config.ServerName = 'Unnamed Eloipool'

if not hasattr(config, 'ShareTarget'):
	# Highest hash value accepted as a valid share (difficulty-1 target).
	config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
26
import logging

# Only configure logging if nothing else (e.g. an importing script) already has:
# default everything to DEBUG on stderr, but quiet the chattiest subsystems to INFO.
if len(logging.root.handlers) == 0:
	logging.basicConfig(
		format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
		level=logging.DEBUG,
	)
	for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
		logging.getLogger(infoOnly).setLevel(logging.INFO)
36
def RaiseRedFlags(reason):
	"""Log *reason* as a critical 'redflag' event and return it unchanged.

	Passing the reason back lets callers write ``raise RaiseRedFlags(exc)``.
	"""
	redflag = logging.getLogger('redflag')
	redflag.critical(reason)
	return reason
40
41
from bitcoin.node import BitcoinLink, BitcoinNode
# P2P node used to relay solved blocks directly to the bitcoin network.
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
# JSON-RPC proxy to the upstream bitcoind (getmemorypool / block submission).
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
48
49
50 from bitcoin.script import BitcoinScript
51 from bitcoin.txn import Txn
52 from base58 import b58decode
53 from struct import pack
54 import subprocess
55 from time import time
56
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
	"""Build the coinbase transaction paying out *coinbaseValue* (base units).

	If config.CoinbaserCmd is set (and useCoinbaser is true), it is run via
	the shell with '%d' replaced by the coinbase value; its stdout must be a
	count N followed by N (amount, address) line pairs. Those outputs are
	added first and their total deducted from the tracker payout. Any
	coinbaser error — or it attempting to spend >= the full value — voids
	all coinbaser outputs and logs 'Coinbaser failed!'.

	The remaining value goes to config.TrackerAddr. Returns an unassembled
	Txn; the coinbase scriptsig is filled in later by the merkle maker.
	"""
	txn = Txn.new()
	
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		p = None
		try:
			# NOTE: shell=True is deliberate — CoinbaserCmd is operator
			# configuration, not untrusted input.
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			nout = int(p.stdout.readline())
			for i in range(nout):
				amount = int(p.stdout.readline())
				addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
				pkScript = BitcoinScript.toAddress(addr)
				txn.addOutput(amount, pkScript)
				coinbased += amount
		except:
			# Deliberate best-effort: force the failure branch below
			coinbased = coinbaseValue + 1
		finally:
			# BUGFIX: reap the coinbaser process; previously the pipe was
			# left open and the child lingered as a zombie.
			if p is not None:
				try:
					p.stdout.close()
					p.wait()
				except:
					pass
		if coinbased >= coinbaseValue:
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased
	
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)
	
	# TODO
	# TODO: red flag on dupe coinbase
	return txn
87
88
89 import jsonrpc_getwork
90 from util import Bits2Target
91
# username -> { work identifier -> (work data, issue timestamp) }
workLog = {}
# Integer target of the current network block; None until nBits is known.
networkTarget = None
# Share header data already seen this block, used to reject duplicates.
DupeShareHACK = {}

# JSON-RPC server instance; assigned in the __main__ section below.
server = None
def updateBlocks():
	# Wake any pending longpoll connections so miners fetch fresh work.
	server.wakeLongpoll()
99
def blockChanged():
	"""Reset per-block state when the network's best block changes.

	Clears duplicate-share tracking, recomputes the network target from the
	new block's nBits, discards all outstanding work, and wakes longpolls.
	"""
	global DupeShareHACK
	DupeShareHACK = {}
	jsonrpc_getwork._CheckForDupesHACK = {}
	global MM, networkTarget, server
	bits = MM.currentBlock[2]
	# nBits may be unknown briefly around startup/block transitions
	networkTarget = None if bits is None else Bits2Target(bits)
	workLog.clear()
	updateBlocks()
112
113
from merklemaker import merkleMaker
# Background work generator: tracks the current block, builds merkle trees.
MM = merkleMaker()
# Pull all configuration straight onto the merkle maker instance.
MM.__dict__.update(config.__dict__)
# Fallback coinbase (no coinbaser) used when clearing work.  FIXME: hardcoded 50 BTC subsidy.
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
122
123
124 from binascii import b2a_hex
125 from copy import deepcopy
126 from struct import pack, unpack
127 import threading
128 from time import time
129 from util import RejectedShare, dblsha, hash2int, swap32
130 import jsonrpc
131 import traceback
132
# Optional external "gotwork" notification service; None when not configured.
gotwork = None
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
136
def submitGotwork(info):
	# Best-effort notification to the external gotwork service; failures are
	# logged but must never affect share processing (runs in a daemon thread).
	try:
		gotwork.gotwork(info)
	except:
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
142
def getBlockHeader(username):
	"""Issue a getwork-style block header to *username*.

	The merkle root doubles as the work identifier; the issued work is
	recorded in workLog so the matching share can be validated later.
	Returns (header bytes, worklog entry).
	"""
	MRD = MM.getMRD()
	merkleRoot, _merkleTree, _coinbase, prevBlock, bits, _rollPrevBlk = MRD
	now = pack('<L', int(time()))
	# version 1 header; b'iolE' is a placeholder nonce the miner overwrites
	hdr = b''.join((b'\1\0\0\0', prevBlock, merkleRoot, now, bits, b'iolE'))
	entry = (MRD, time())
	workLog.setdefault(username, {})[merkleRoot] = entry
	return (hdr, entry)
150
def getBlockTemplate(username):
	"""Issue a getmemorypool-style block template to *username*.

	The work identifier is the extranonce data embedded in the coinbase
	scriptsig just after the height push; it is recorded in workLog so the
	matching share can be validated later. Returns the MC work tuple.
	"""
	MC = MM.getMC()
	coinbase = MC[2]
	# coinbase[0] = length of the height push; the identifier's own length
	# byte follows the pushed height, then the identifier itself
	idPos = coinbase[0] + 2
	idLen = coinbase[idPos - 1]
	workId = coinbase[idPos:idPos + idLen]
	workLog.setdefault(username, {})[workId] = (MC, time())
	return MC
159
# Callbacks (one per configured share logger) invoked with each processed share.
loggersShare = []

# Debug history of real (network-difficulty) block solutions and their payloads.
RBDs = []
RBPs = []
164
165 from bitcoin.varlen import varlenEncode, varlenDecode
166 import bitcoin.txn
def assembleBlock(blkhdr, txlist):
	"""Serialize a full block: header, varint txn count, then each raw txn."""
	parts = [blkhdr, varlenEncode(len(txlist))]
	parts.extend(tx.data for tx in txlist)
	return b''.join(parts)
173
def blockSubmissionThread(payload):
	"""Submit a solved block to upstream bitcoind, retrying until it answers.

	Runs in its own thread. Retries forever on transport errors (a solved
	block is too valuable to drop); raises a red flag if the upstream
	actively rejects the block.
	"""
	while True:
		try:
			rv = UpstreamBitcoindJSONRPC.getmemorypool(b2a_hex(payload).decode('ascii'))
			break
		except:
			# BUGFIX: previously a silent hot-spin; log the failure and back
			# off briefly before retrying so a down upstream doesn't peg a CPU
			logging.getLogger('blockSubmission').warning('Error submitting block upstream, will retry\n' + traceback.format_exc())
			sleep(0.1)
	if not rv:
		RaiseRedFlags('Upstream rejected block!')
183
# Share target as a 64-hex-digit string, precomputed once for share metadata.
_STA = '%064x' % (config.ShareTarget,)
def checkShare(share):
	"""Validate a submitted share, raising RejectedShare on any failure.

	Side effects: records the share in DupeShareHACK, annotates *share*
	with timing/target/work metadata, and — when the hash meets the network
	target — submits the full block upstream and via the P2P node.
	Validation order matters: cheap header checks first, then work lookup,
	duplicate check, hash checks, and finally (for block-template shares)
	full coinbase/txn reconstruction.
	"""
	shareTime = share['time'] = time()
	
	# Only the 80-byte header is hashed/validated here
	data = share['data']
	data = data[:80]
	(prevBlock, height, bits) = MM.currentBlock
	sharePrevBlock = data[4:36]
	if sharePrevBlock != prevBlock:
		# Work for the immediately previous block is merely stale, not bogus
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')
	
	# TODO: use userid
	username = share['username']
	if username not in workLog:
		raise RejectedShare('unknown-user')
	
	if data[72:76] != bits:
		raise RejectedShare('bad-diffbits')
	# Only block version 1 is issued by getBlockHeader
	if data[:4] != b'\1\0\0\0':
		raise RejectedShare('bad-version')
	
	shareMerkleRoot = data[36:68]
	if 'blkdata' in share:
		# getmemorypool-style share: recover the work identifier from the
		# coinbase scriptsig (after the height push), mirroring getBlockTemplate
		pl = share['blkdata']
		(txncount, pl) = varlenDecode(pl)
		cbtxn = bitcoin.txn.Txn(pl)
		cbtxn.disassemble(retExtra=True)
		coinbase = cbtxn.getCoinbase()
		wliPos = coinbase[0] + 2
		wliLen = coinbase[wliPos - 1]
		wli = coinbase[wliPos:wliPos+wliLen]
		mode = 'MC'
		moden = 1
	else:
		# getwork-style share: the merkle root itself identifies the work
		wli = shareMerkleRoot
		mode = 'MRD'
		moden = 0
	
	MWL = workLog[username]
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld
	
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
	blkhash = dblsha(data)
	# Quick check: the high 32 bits of the hash must be zero for any share
	if blkhash[28:] != b'\0\0\0\0':
		raise RejectedShare('H-not-zero')
	blkhashn = hash2int(blkhash)
	
	global networkTarget
	# Block solutions get logged at info; ordinary shares at debug
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
	
	workMerkleTree = wld[1]
	workCoinbase = wld[2]
	
	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	txlist = workMerkleTree.data
	txlist = [deepcopy(txlist[0]),] + txlist[1:]
	cbtxn = txlist[0]
	cbtxn.setCoinbase(workCoinbase)
	cbtxn.assemble()
	
	if blkhashn <= networkTarget:
		# Real block solution: record it and submit both via RPC and P2P
		logfunc("Submitting upstream")
		RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree) ) )
		if not moden:
			payload = assembleBlock(data, txlist)
		else:
			payload = share['data'] + share['blkdata']
		logfunc('Real block payload: %s' % (payload,))
		RBPs.append(payload)
		threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
		bcnode.submitBlock(payload)
		share['upstreamResult'] = True
		MM.updateBlock(blkhash)
	
	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			# coinbase txn + block hash + merkle branch, as the service expects
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')
	
	if blkhashn > config.ShareTarget:
		raise RejectedShare('high-hash')
	share['target'] = config.ShareTarget
	share['_targethex'] = _STA
	
	shareTimestamp = unpack('<L', data[68:72])[0]
	# Reject work issued more than 2 minutes ago, and timestamps outside
	# [-5 minutes, +2 hours] of the server clock
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
	if moden:
		# Full block-template validation: the miner-supplied coinbase must
		# extend the one we issued
		cbpre = cbtxn.getCoinbase()
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')
		
		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
				raise RejectedShare('bad-cb-flag')
		
		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')
		
		# Rebuild the block with the miner's coinbase and confirm both the
		# merkle root and the exact serialization match what was submitted
		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')
		
		allowed = assembleBlock(data, txlist)
		if allowed != share['data'] + share['blkdata']:
			raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
324
def receiveShare(share):
	"""Validate *share* and hand it to every configured share logger.

	RejectedShare is re-raised after recording its reason on the share;
	the 'solution' field is filled in either way before logging.
	"""
	# TODO: username => userid
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	finally:
		# Prefer the miner's original submission; otherwise re-derive it
		share['solution'] = share['_origdata'] if '_origdata' in share else b2a_hex(swap32(share['data'])).decode('utf8')
		for logShare in loggersShare:
			logShare(share)
339
def newBlockNotification():
	"""Handle an external new-block notification by refreshing the merkle tree."""
	logging.getLogger('newBlockNotification').info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
345
def newBlockNotificationSIGNAL(signum, frame):
	"""Signal-safe wrapper: defer the real work to a fresh daemon thread."""
	# Use a new thread, in case the signal handler is called with locks held
	notifier = threading.Thread(
		target=newBlockNotification,
		name='newBlockNotification via signal %s' % (signum,),
	)
	notifier.daemon = True
	notifier.start()
351
from signal import signal, SIGUSR1
# SIGUSR1 => treat as a new-block notification (e.g. bitcoind -blocknotify).
# NOTE(review): the later `import signal` below rebinds the name `signal` to
# the module; registration here happens first, so this still works — confirm
# before reordering imports.
signal(SIGUSR1, newBlockNotificationSIGNAL)
354
355
356 import os
357 import os.path
358 import pickle
359 import signal
360 import sys
361 from time import sleep
362 import traceback
363
# File used to persist DupeShareHACK/workLog state across restarts.
SAVE_STATE_FILENAME = 'eloipool.worklog'
365
def stopServers():
	"""Stop the P2P node and JSON-RPC server, then close their descriptors.

	Idempotent: remembers (via a function attribute) that it already ran.
	Waits up to ~2.56s (0x100 * 10ms) for the server loops to exit before
	giving up and force-closing file descriptors anyway.
	"""
	logger = logging.getLogger('stopServers')
	
	if hasattr(stopServers, 'already'):
		logger.debug('Already tried to stop servers before')
		return
	stopServers.already = True
	
	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server)
	# Ask every server loop to stop, then wake them so they notice
	for s in servers:
		s.keepgoing = False
	for s in servers:
		try:
			s.wakeup()
		except:
			logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
	# Poll until all loops report stopped, bounded at 0x100 iterations
	i = 0
	while True:
		sl = []
		for s in servers:
			if s.running:
				sl.append(s.__class__.__name__)
		if not sl:
			break
		i += 1
		if i >= 0x100:
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
			break
		sleep(0.01)
	
	# Force-close any file descriptors the servers still hold
	for s in servers:
		for fd in s._fd.keys():
			os.close(fd)
401
def saveState(t = None):
	"""Persist resume state (timestamp *t*, DupeShareHACK, workLog) to disk.

	Writes three sequential pickles to SAVE_STATE_FILENAME. Retries silently
	on failure; after 0x10000 attempts it logs the error, removes any partial
	state file (so a later resume won't load garbage), and gives up.
	"""
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
				# BUGFIX: give up after exhausting retries; previously there
				# was no break here, so a persistent failure looped (and
				# re-logged the error) forever
				break
423
def exit():
	# Graceful shutdown: stop networking, persist work state, then terminate.
	# (Intentionally shadows the builtin `exit`; invoked from interactive mode.)
	t = time()
	stopServers()
	saveState(t)
	logging.getLogger('exit').info('Goodbye...')
	# SIGTERM first so any remaining threads/handlers see a normal termination
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)
431
def restart():
	# In-place restart: stop networking, persist work state, then re-exec
	# this process with the same argv (state is restored on startup).
	t = time()
	stopServers()
	saveState(t)
	logging.getLogger('restart').info('Restarting...')
	try:
		# NOTE(review): assumes sys.argv[0] is an executable path reachable
		# from the current working directory — confirm for daemonized setups
		os.execv(sys.argv[0], sys.argv)
	except:
		logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
441
def restoreState():
	"""Restore DupeShareHACK/workLog from SAVE_STATE_FILENAME, if it exists.

	Understands three historical on-disk formats:
	- tuple: (workLog, DupeShareHACK) — 2012-02-02 .. 2012-02-03
	- bare dict: DupeShareHACK only — 2012-02-03
	- current: timestamp pickle, then DupeShareHACK, then workLog
	A workLog saved more than 120 seconds ago is considered expired and
	skipped. Failures are logged and otherwise ignored.
	"""
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if type(t) == tuple:
				if len(t) > 2:
					# Future formats, not supported here
					# NOTE(review): t[3] needs len(t) > 3; looks like it
					# should be t[2] — confirm when a versioned format lands
					ver = t[3]
					# TODO
				
				# Old format, from 2012-02-02 to 2012-02-03
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					# Old format, from 2012-02-03 to 2012-02-03
					DupeShareHACK = t
					t = None
				else:
					# Current format, from 2012-02-03 onward
					DupeShareHACK = pickle.load(f)
				
				# BUGFIX: t is None for the old dict-only format, which made
				# `t + 120` raise TypeError and spuriously report a failed
				# restore; that format has no workLog to load anyway
				if t is not None and t + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))
483
484
485 from jsonrpcserver import JSONRPCListener, JSONRPCServer
486 import interactivemode
487 from networkserver import NetworkListener
488 import threading
489 import sharelogging
490 import imp
491
492 if __name__ == "__main__":
493         if not hasattr(config, 'ShareLogging'):
494                 config.ShareLogging = ()
495         if hasattr(config, 'DbOptions'):
496                 logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
497                 config.ShareLogging = list(config.ShareLogging)
498                 config.ShareLogging.append( {
499                         'type': 'sql',
500                         'engine': 'postgres',
501                         'dbopts': config.DbOptions,
502                         'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
503                 } )
504         for i in config.ShareLogging:
505                 if not hasattr(i, 'keys'):
506                         name, parameters = i
507                         logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
508                         if name == 'postgres':
509                                 name = 'sql'
510                                 i = {
511                                         'engine': 'postgres',
512                                         'dbopts': parameters,
513                                 }
514                         elif name == 'logfile':
515                                 i = {}
516                                 i['thropts'] = parameters
517                                 if 'filename' in parameters:
518                                         i['filename'] = parameters['filename']
519                                         i['thropts'] = dict(i['thropts'])
520                                         del i['thropts']['filename']
521                         else:
522                                 i = parameters
523                         i['type'] = name
524                 
525                 name = i['type']
526                 parameters = i
527                 try:
528                         fp, pathname, description = imp.find_module(name, sharelogging.__path__)
529                         m = imp.load_module(name, fp, pathname, description)
530                         lo = getattr(m, name)(**parameters)
531                         loggersShare.append(lo.logShare)
532                 except:
533                         logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())
534
535         LSbc = []
536         if not hasattr(config, 'BitcoinNodeAddresses'):
537                 config.BitcoinNodeAddresses = ()
538         for a in config.BitcoinNodeAddresses:
539                 LSbc.append(NetworkListener(bcnode, a))
540         
541         if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
542                 BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
543         
544         import jsonrpc_getmemorypool
545         import jsonrpc_getwork
546         import jsonrpc_setworkaux
547         
548         server = JSONRPCServer()
549         if hasattr(config, 'JSONRPCAddress'):
550                 logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
551                 if not hasattr(config, 'JSONRPCAddresses'):
552                         config.JSONRPCAddresses = []
553                 config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
554         LS = []
555         for a in config.JSONRPCAddresses:
556                 LS.append(JSONRPCListener(server, a))
557         if hasattr(config, 'SecretUser'):
558                 server.SecretUser = config.SecretUser
559         server.aux = MM.CoinbaseAux
560         server.getBlockHeader = getBlockHeader
561         server.getBlockTemplate = getBlockTemplate
562         server.receiveShare = receiveShare
563         server.RaiseRedFlags = RaiseRedFlags
564         server.ShareTarget = config.ShareTarget
565         
566         if hasattr(config, 'TrustedForwarders'):
567                 server.TrustedForwarders = config.TrustedForwarders
568         server.ServerName = config.ServerName
569         
570         MM.start()
571         
572         restoreState()
573         
574         bcnode_thr = threading.Thread(target=bcnode.serve_forever)
575         bcnode_thr.daemon = True
576         bcnode_thr.start()
577         
578         server.serve_forever()