Dump block payloads in hex
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
import config

# Fill in defaults for optional configuration values so the rest of the
# pool code can assume they exist.
if not hasattr(config, 'ServerName'):
	config.ServerName = 'Unnamed Eloipool'

if not hasattr(config, 'ShareTarget'):
	# Default share target: difficulty 1 (32 leading zero bits)
	config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
26
import logging

# Configure logging only if the embedding environment has not already done so
if len(logging.root.handlers) == 0:
	logging.basicConfig(
		format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
		level=logging.DEBUG,
	)
	# Quiet the chattiest subsystems down to INFO
	for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
		logging.getLogger(infoOnly).setLevel(logging.INFO)
36
def RaiseRedFlags(reason):
	"""Log *reason* as a critical 'redflag' event and hand it back unchanged.

	Returning the reason lets callers embed the call in an expression,
	e.g. ``raise SomeError(RaiseRedFlags(msg))``.
	"""
	redflagLogger = logging.getLogger('redflag')
	redflagLogger.critical(reason)
	return reason
40
41
from bitcoin.node import BitcoinLink, BitcoinNode
# P2P node handle, used to announce solved blocks directly to the network
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
# JSON-RPC proxy to the upstream bitcoind (used for getmemorypool-based
# block submission in blockSubmissionThread below)
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
48
49
50 from bitcoin.script import BitcoinScript
51 from bitcoin.txn import Txn
52 from base58 import b58decode
53 from struct import pack
54 import subprocess
55 from time import time
56
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
	"""Build the coinbase transaction paying out *coinbaseValue* (satoshis).

	If *useCoinbaser* is true and config.CoinbaserCmd is set, run that
	command (with '%d' replaced by the total value) and parse its stdout:
	a line with output count N, then N pairs of (amount, address) lines,
	each becoming a txn output.  Whatever the coinbaser does not claim
	goes to config.TrackerAddr.  If the coinbaser fails in any way, or
	tries to claim the whole reward (or more), all of its outputs are
	discarded and the full value goes to the tracker address.
	"""
	txn = Txn.new()
	
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		try:
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			# NOTE: shell=True runs a config-supplied command string; the
			# config file is trusted, but keep CoinbaserCmd out of
			# untrusted hands.
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			nout = int(p.stdout.readline())
			for i in range(nout):
				amount = int(p.stdout.readline())
				addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
				pkScript = BitcoinScript.toAddress(addr)
				txn.addOutput(amount, pkScript)
				coinbased += amount
		except Exception:
			# Was a bare except:, which would also swallow SystemExit and
			# KeyboardInterrupt; any spawn/parse failure trips the
			# "claimed too much" sentinel below so outputs are discarded.
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			# Coinbaser failed or tried to claim the entire reward
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased
	
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)
	
	# TODO
	# TODO: red flag on dupe coinbase
	return txn
87
88
89 import jsonrpc_getwork
90 from util import Bits2Target
91
# Shared mutable state:
workLog = {}  # username -> { work-log id -> (work data, issue time) }
networkTarget = None  # current network block target as an int, or None
DupeShareHACK = {}  # dict-as-set of raw share headers already seen (dupe filter)

server = None  # JSONRPCServer instance; assigned in the __main__ section below
def updateBlocks():
	# Wake any hanging longpoll connections so miners fetch fresh work
	server.wakeLongpoll()
99
def blockChanged():
	"""Reset all per-block state when the best chain tip changes."""
	global DupeShareHACK, MM, networkTarget, server
	# Duplicate-share tracking from the old block is no longer relevant
	DupeShareHACK = {}
	jsonrpc_getwork._CheckForDupesHACK = {}
	bits = MM.currentBlock[2]
	networkTarget = None if bits is None else Bits2Target(bits)
	# All outstanding work is now stale
	workLog.clear()
	updateBlocks()
112
113
from merklemaker import merkleMaker
# The merkle maker tracks the chain tip and prepares work for miners
MM = merkleMaker()
MM.__dict__.update(config.__dict__)  # inherit every config setting directly
# 5000000000 satoshis = 50 BTC; presumably the block subsidy — see FIXME
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
122
123
124 from binascii import b2a_hex
125 from copy import deepcopy
126 from struct import pack, unpack
127 import threading
128 from time import time
129 from util import RejectedShare, dblsha, hash2int, swap32
130 import jsonrpc
131 import traceback
132
# Optional service notified of shares beating config.GotWorkTarget (see checkShare)
gotwork = None
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
136
def submitGotwork(info):
	"""Best-effort push of solved-share *info* to the gotwork service.

	Runs in a daemon thread (see checkShare); failures are logged and
	otherwise ignored.
	"""
	try:
		gotwork.gotwork(info)
	except Exception:
		# Was a bare except:, which would also swallow SystemExit and
		# KeyboardInterrupt; best-effort, so just log and carry on.
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
142
def getBlockHeader(username):
	"""Issue getwork-style work: an 80-byte block header for *username*.

	The issued work is recorded in workLog keyed by merkle root so the
	corresponding share can be validated later; returns (header, log entry).
	"""
	MRD = MM.getMRD()
	merkleRoot = MRD[0]
	timestamp = pack('<L', int(time()))
	# Version-2 header; the trailing 4 bytes are a placeholder nonce
	hdr = b'\2\0\0\0' + MRD[3] + merkleRoot + timestamp + MRD[4] + b'iolE'
	userWork = workLog.setdefault(username, {})
	userWork[merkleRoot] = (MRD, time())
	return (hdr, userWork[merkleRoot])
150
def getBlockTemplate(username):
	"""Issue getmemorypool-style work for *username* and log it.

	The work-log identifier is a length-prefixed blob at the start of the
	work's coinbase; shares are matched back to this entry in checkShare.
	"""
	MC = MM.getMC()
	coinbase = MC[2]
	wliLen = coinbase[0]
	wli = coinbase[1:1 + wliLen]
	workLog.setdefault(username, {})[wli] = (MC, time())
	return MC
158
loggersShare = []  # share-logger callables; each is invoked with the share dict

RBDs = []  # inputs of every block submission (deep copies); not consumed here
RBPs = []  # serialized payload of every block submission; not consumed here
163
164 from bitcoin.varlen import varlenEncode, varlenDecode
165 import bitcoin.txn
def assembleBlock(blkhdr, txlist):
	"""Serialize a full block: header, varint txn count, then each raw txn."""
	parts = [blkhdr, varlenEncode(len(txlist))]
	parts.extend(tx.data for tx in txlist)
	return b''.join(parts)
172
def blockSubmissionThread(payload):
	"""Submit a solved block upstream via JSON-RPC; runs in its own thread.

	Retries indefinitely on transport errors (losing a solved block is
	worse than a stuck thread), but pauses between attempts instead of
	busy-spinning, and logs each failure.  Red-flags an explicit
	rejection by the upstream node.
	"""
	while True:
		try:
			rv = UpstreamBitcoindJSONRPC.getmemorypool(b2a_hex(payload).decode('ascii'))
			break
		except Exception:
			# Was a bare except: with an immediate retry, which silently
			# busy-spins if bitcoind stays unreachable.
			logging.getLogger('blockSubmission').error('Error submitting block upstream, retrying\n' + traceback.format_exc())
			sleep(1)
	if not rv:
		RaiseRedFlags('Upstream rejected block!')
182
_STA = '%064x' % (config.ShareTarget,)  # share target as 64-digit hex, precomputed
def checkShare(share):
	"""Validate a submitted share, raising RejectedShare on any problem.

	share must contain 'data' (raw block header bytes) and 'username';
	getmemorypool-style shares also carry 'blkdata' (serialized txn list).
	The dict is annotated in place with 'time', the matched work entry
	(under 'MC' or 'MRD'), 'target', '_targethex' and, for solved blocks,
	'upstreamResult'.  Solving shares are submitted upstream (JSON-RPC
	thread + P2P node) as a side effect.
	"""
	shareTime = share['time'] = time()
	
	data = share['data']
	data = data[:80]  # only the 80-byte block header is inspected here
	(prevBlock, height, bits) = MM.currentBlock
	sharePrevBlock = data[4:36]
	if sharePrevBlock != prevBlock:
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')
	
	# TODO: use userid
	username = share['username']
	if username not in workLog:
		raise RejectedShare('unknown-user')
	
	if data[72:76] != bits:
		raise RejectedShare('bad-diffbits')
	
	# Note that we should accept miners reducing version to 1 if they don't understand 2 yet
	# FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
	if data[1:4] != b'\0\0\0' or data[0] > 2:
		raise RejectedShare('bad-version')
	
	shareMerkleRoot = data[36:68]
	if 'blkdata' in share:
		# getmemorypool share: recover the work-log id from the coinbase prefix
		pl = share['blkdata']
		(txncount, pl) = varlenDecode(pl)
		cbtxn = bitcoin.txn.Txn(pl)
		cbtxn.disassemble(retExtra=True)
		coinbase = cbtxn.getCoinbase()
		wliLen = coinbase[0]
		wli = coinbase[1:wliLen+1]
		mode = 'MC'
		moden = 1
	else:
		# getwork share: the merkle root itself identifies the issued work
		wli = shareMerkleRoot
		mode = 'MRD'
		moden = 0
	
	MWL = workLog[username]
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld
	
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
	blkhash = dblsha(data)
	# Cheap pre-filter: the last 4 hash bytes must be zero for any
	# plausible share (presumably hash2int reads blkhash little-endian,
	# making these the high-order bytes — confirm in util)
	if blkhash[28:] != b'\0\0\0\0':
		raise RejectedShare('H-not-zero')
	blkhashn = hash2int(blkhash)
	
	global networkTarget
	# Log at info level only when the share actually solves a block
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
	
	workMerkleTree = wld[1]
	workCoinbase = wld[2]
	
	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	# Rebuild the issued txn list with our coinbase, deep-copying the
	# coinbase txn so the shared merkle tree data is not mutated
	txlist = workMerkleTree.data
	txlist = [deepcopy(txlist[0]),] + txlist[1:]
	cbtxn = txlist[0]
	cbtxn.setCoinbase(workCoinbase)
	cbtxn.assemble()
	
	if blkhashn <= networkTarget:
		# Block solved: submit via JSON-RPC (in a thread) and the P2P node.
		# Note this happens BEFORE the share-policy checks further down.
		logfunc("Submitting upstream")
		RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree) ) )
		if not moden:
			payload = assembleBlock(data, txlist)
		else:
			payload = share['data'] + share['blkdata']
		logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
		RBPs.append(payload)
		threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
		bcnode.submitBlock(payload)
		share['upstreamResult'] = True
		MM.updateBlock(blkhash)
	
	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			# Blob layout: raw coinbase txn, block hash, merkle step count
			# (1 byte), the steps themselves, then 4 zero bytes
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')
	
	if blkhashn > config.ShareTarget:
		raise RejectedShare('high-hash')
	share['target'] = config.ShareTarget
	share['_targethex'] = _STA
	
	shareTimestamp = unpack('<L', data[68:72])[0]
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
	if moden:
		# Full verification of getmemorypool shares: the coinbase must
		# extend the issued prefix, carry no forbidden flags, and
		# reproduce both the claimed merkle root and the exact block
		cbpre = cbtxn.getCoinbase()
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')
		
		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
				raise RejectedShare('bad-cb-flag')
		
		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')
		
		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')
		
		allowed = assembleBlock(data, txlist)
		if allowed != share['data'] + share['blkdata']:
			raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
325
def receiveShare(share):
	"""Check a share and fan it out to every configured share logger.

	Re-raises RejectedShare after recording the reject reason; in all
	cases (accept or reject) the share gains a 'solution' field and is
	handed to each logger in loggersShare.
	"""
	# TODO: username => userid
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	finally:
		if '_origdata' in share:
			solution = share['_origdata']
		else:
			solution = b2a_hex(swap32(share['data'])).decode('utf8')
		share['solution'] = solution
		for logShare in loggersShare:
			logShare(share)
340
def newBlockNotification():
	"""React to an external new-block hint by rebuilding the merkle tree."""
	logging.getLogger('newBlockNotification').info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
346
def newBlockNotificationSIGNAL(signum, frame):
	"""Signal handler for new-block hints.

	Defers to a daemon thread, since the signal may interrupt code that
	is holding locks the notification handler would need.
	"""
	notifier = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
	notifier.daemon = True
	notifier.start()
352
from signal import signal, SIGUSR1
# Rebuild work on SIGUSR1 (e.g. from a bitcoind -blocknotify script).
# NOTE: 'import signal' in the section below later rebinds the name
# 'signal' to the module; this call runs first, so it still works.
signal(SIGUSR1, newBlockNotificationSIGNAL)
355
356
357 import os
358 import os.path
359 import pickle
360 import signal
361 import sys
362 from time import sleep
363 import traceback
364
# Shutdown state (timestamp, DupeShareHACK, workLog) is pickled to this file
SAVE_STATE_FILENAME = 'eloipool.worklog'
366
def stopServers():
	"""Shut down the P2P node and JSON-RPC server, then close their sockets.

	Idempotent: repeat calls return immediately.  Waits up to 0x100 polls
	of 10ms for the server loops to stop before giving up.
	"""
	logger = logging.getLogger('stopServers')
	
	if hasattr(stopServers, 'already'):
		logger.debug('Already tried to stop servers before')
		return
	stopServers.already = True
	
	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server)
	# Ask both servers to stop, then wake them so they notice the flag
	for s in servers:
		s.keepgoing = False
	for s in servers:
		try:
			s.wakeup()
		except:
			logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
	# Poll until neither server reports running, or patience runs out
	i = 0
	while True:
		sl = []
		for s in servers:
			if s.running:
				sl.append(s.__class__.__name__)
		if not sl:
			break
		i += 1
		if i >= 0x100:
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
			break
		sleep(0.01)
	
	# Close any file descriptors the servers still hold
	for s in servers:
		for fd in s._fd.keys():
			os.close(fd)
402
def saveState(t = None):
	"""Pickle (t, DupeShareHACK, workLog) to SAVE_STATE_FILENAME.

	*t* is the shutdown timestamp (or None); restoreState uses it to
	decide whether the saved workLog is still fresh.  Retries on failure;
	after 0x10000 consecutive failures it gives up, removes any partial
	state file (so a later restore doesn't load garbage), and returns.
	"""
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except Exception:
			# Was a bare except: — narrowed so Ctrl-C still works
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except Exception:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
				# BUGFIX: this break was missing, so a persistent failure
				# looped forever, logging and unlinking on every iteration
				break
424
def exit():
	# NOTE: shadows the exit() builtin; kept as-is for interface compatibility.
	# Stop the servers, persist state, then terminate the whole process.
	t = time()
	stopServers()
	saveState(t)
	logging.getLogger('exit').info('Goodbye...')
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)
432
def restart():
	# Stop the servers, persist state, then replace this process with a
	# fresh copy of itself (state is restored on startup by restoreState)
	t = time()
	stopServers()
	saveState(t)
	logging.getLogger('restart').info('Restarting...')
	try:
		os.execv(sys.argv[0], sys.argv)
	except:
		logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
442
def restoreState():
	"""Load saved DupeShareHACK/workLog state from SAVE_STATE_FILENAME, if present.

	Understands several historical on-disk formats.  workLog is restored
	only when the save is less than 120 seconds old, since issued work
	goes stale quickly.  Any failure is logged and otherwise ignored.
	"""
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		# NOTE: pickle is only safe because this file is written by
		# saveState above; never point SAVE_STATE_FILENAME at untrusted data
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if type(t) == tuple:
				if len(t) > 2:
					# Future formats, not supported here
					# NOTE(review): t[3] raises IndexError when len(t) == 3;
					# probably meant t[2] — harmless until such a format exists
					ver = t[3]
					# TODO
				
				# Old format, from 2012-02-02 to 2012-02-03
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					# Old format, from 2012-02-03 to 2012-02-03
					DupeShareHACK = t
					t = None
					# NOTE(review): t is None here, so the age check below
					# raises TypeError (caught by the outer except) and
					# workLog is never restored for this legacy format —
					# confirm whether that is intentional
				else:
					# Current format, from 2012-02-03 onward
					DupeShareHACK = pickle.load(f)
				
				if t + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))
484
485
486 from jsonrpcserver import JSONRPCListener, JSONRPCServer
487 import interactivemode
488 from networkserver import NetworkListener
489 import threading
490 import sharelogging
491 import imp
492
if __name__ == "__main__":
	# --- Share logging configuration (with backward-compatibility shims) ---
	if not hasattr(config, 'ShareLogging'):
		config.ShareLogging = ()
	if hasattr(config, 'DbOptions'):
		# Legacy DbOptions config: translate into a 'sql' ShareLogging entry
		logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
		config.ShareLogging = list(config.ShareLogging)
		config.ShareLogging.append( {
			'type': 'sql',
			'engine': 'postgres',
			'dbopts': config.DbOptions,
			'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
		} )
	for i in config.ShareLogging:
		if not hasattr(i, 'keys'):
			# Legacy (name, parameters) tuple entry: convert to a dict form
			name, parameters = i
			logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
			if name == 'postgres':
				name = 'sql'
				i = {
					'engine': 'postgres',
					'dbopts': parameters,
				}
			elif name == 'logfile':
				i = {}
				i['thropts'] = parameters
				if 'filename' in parameters:
					# 'filename' belongs at top level, not in thread options;
					# copy thropts before deleting so the original config
					# dict is not mutated
					i['filename'] = parameters['filename']
					i['thropts'] = dict(i['thropts'])
					del i['thropts']['filename']
			else:
				i = parameters
			i['type'] = name
		
		name = i['type']
		parameters = i
		try:
			# Load sharelogging/<name>.py and instantiate its class of the
			# same name with the configured parameters
			fp, pathname, description = imp.find_module(name, sharelogging.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			loggersShare.append(lo.logShare)
		except:
			logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())

	# --- Bitcoin P2P listeners and upstream link ---
	LSbc = []
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))
	
	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
	
	# presumably these modules register their RPC methods on import —
	# confirm in jsonrpc_*
	import jsonrpc_getmemorypool
	import jsonrpc_getwork
	import jsonrpc_setworkaux
	
	# --- Miner-facing JSON-RPC server setup ---
	server = JSONRPCServer()
	if hasattr(config, 'JSONRPCAddress'):
		# Legacy single-address config: fold into the addresses list
		logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
	LS = []
	for a in config.JSONRPCAddresses:
		LS.append(JSONRPCListener(server, a))
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	# Wire the work/share entry points defined above into the server
	server.aux = MM.CoinbaseAux
	server.getBlockHeader = getBlockHeader
	server.getBlockTemplate = getBlockTemplate
	server.receiveShare = receiveShare
	server.RaiseRedFlags = RaiseRedFlags
	server.ShareTarget = config.ShareTarget
	
	if hasattr(config, 'TrustedForwarders'):
		server.TrustedForwarders = config.TrustedForwarders
	server.ServerName = config.ServerName
	
	MM.start()
	
	restoreState()
	
	# P2P node runs in a background daemon thread; the JSON-RPC server
	# occupies the main thread
	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()
	
	server.serve_forever()