2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012 Luke Dashjr <luke-jr+eloipool@utopios.org>
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 if not hasattr(config, 'ServerName'):
21 config.ServerName = 'Unnamed Eloipool'
23 if not hasattr(config, 'ShareTarget'):
24 config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
29 if len(logging.root.handlers) == 0:
31 format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
34 for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
35 logging.getLogger(infoOnly).setLevel(logging.INFO)
37 def RaiseRedFlags(reason):
38 logging.getLogger('redflag').critical(reason)
42 from bitcoin.node import BitcoinLink, BitcoinNode
43 bcnode = BitcoinNode(config.UpstreamNetworkId)
44 bcnode.userAgent += b'Eloipool:0.1/'
47 UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
50 from bitcoin.script import BitcoinScript
51 from bitcoin.txn import Txn
52 from base58 import b58decode
53 from struct import pack
57 def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
60 if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
63 cmd = config.CoinbaserCmd
64 cmd = cmd.replace('%d', str(coinbaseValue))
65 p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
66 nout = int(p.stdout.readline())
68 amount = int(p.stdout.readline())
69 addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
70 pkScript = BitcoinScript.toAddress(addr)
71 txn.addOutput(amount, pkScript)
74 coinbased = coinbaseValue + 1
75 if coinbased >= coinbaseValue:
76 logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
79 coinbaseValue -= coinbased
81 pkScript = BitcoinScript.toAddress(config.TrackerAddr)
82 txn.addOutput(coinbaseValue, pkScript)
85 # TODO: red flag on dupe coinbase
89 import jsonrpc_getwork
90 from util import Bits2Target
104 jsonrpc_getwork._CheckForDupesHACK = {}
105 global MM, networkTarget, server
106 bits = MM.currentBlock[2]
110 networkTarget = Bits2Target(bits)
112 server.wakeLongpoll(wantClear=True)
115 from time import sleep, time
118 def _WorkLogPruner_I(wl):
122 userwork = wl[username]
123 for wli in tuple(userwork.keys()):
124 if now > userwork[wli][1] + 120:
127 WorkLogPruner.logger.debug('Pruned %d jobs' % (pruned,))
129 def WorkLogPruner(wl):
135 WorkLogPruner.logger.error(traceback.format_exc())
136 WorkLogPruner.logger = logging.getLogger('WorkLogPruner')
139 from merklemaker import merkleMaker
141 MM.__dict__.update(config.__dict__)
142 MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False) # FIXME
143 MM.clearCoinbaseTxn.assemble()
144 MM.makeCoinbaseTxn = makeCoinbaseTxn
145 MM.onBlockChange = blockChanged
146 MM.onBlockUpdate = updateBlocks
149 from binascii import b2a_hex
150 from copy import deepcopy
152 from merklemaker import MakeBlockHeader
153 from struct import pack, unpack
155 from time import time
156 from util import RejectedShare, dblsha, hash2int, swap32, target2pdiff
161 if hasattr(config, 'GotWorkURI'):
162 gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
164 if not hasattr(config, 'DynamicTargetting'):
165 config.DynamicTargetting = 0
167 if not hasattr(config, 'DynamicTargetWindow'):
168 config.DynamicTargetWindow = 120
169 config.DynamicTargetGoal *= config.DynamicTargetWindow / 60
171 def submitGotwork(info):
173 gotwork.gotwork(info)
175 checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
177 def getTarget(username, now):
178 if not config.DynamicTargetting:
180 if username in userStatus:
181 status = userStatus[username]
183 userStatus[username] = [None, now, 0]
185 (targetIn, lastUpdate, work) = status
186 if work <= config.DynamicTargetGoal:
187 if now < lastUpdate + config.DynamicTargetWindow and (targetIn is None or targetIn >= networkTarget):
191 getTarget.logger.debug("No shares from '%s', resetting to minimum target")
192 userStatus[username] = [None, now, 0]
195 deltaSec = now - lastUpdate
196 target = targetIn or config.ShareTarget
197 target = int(target * config.DynamicTargetGoal * deltaSec / config.DynamicTargetWindow / work)
198 if target >= config.ShareTarget:
201 if target < networkTarget:
202 target = networkTarget
203 if config.DynamicTargetting == 2:
204 # Round target to a power of two :)
205 target = 2**int(log(target, 2) + 1) - 1
206 if target == config.ShareTarget:
208 if target != targetIn:
209 pfx = 'Retargetting %s' % (repr(username),)
210 tin = targetIn or config.ShareTarget
211 getTarget.logger.debug("%s from: %064x (pdiff %s)" % (pfx, tin, target2pdiff(tin)))
212 tgt = target or config.ShareTarget
213 getTarget.logger.debug("%s to: %064x (pdiff %s)" % (pfx, tgt, target2pdiff(tgt)))
214 userStatus[username] = [target, now, 0]
216 getTarget.logger = logging.getLogger('getTarget')
218 def RegisterWork(username, wli, wld):
220 target = getTarget(username, now)
221 wld = tuple(wld) + (target,)
222 workLog.setdefault(username, {})[wli] = (wld, now)
223 return target or config.ShareTarget
225 def getBlockHeader(username):
228 hdr = MakeBlockHeader(MRD)
229 workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
230 target = RegisterWork(username, merkleRoot, MRD)
231 return (hdr, workLog[username][merkleRoot], target)
233 def getBlockTemplate(username, p_magic = None):
234 if server.tls.wantClear:
236 elif p_magic and username not in workLog:
241 MC = MM.getMC(wantClear)
242 (dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
243 wliPos = coinbase[0] + 2
244 wliLen = coinbase[wliPos - 1]
245 wli = coinbase[wliPos:wliPos+wliLen]
246 target = RegisterWork(username, wli, MC)
247 return (MC, workLog[username][wli], target)
254 from bitcoin.varlen import varlenEncode, varlenDecode
256 from merklemaker import assembleBlock
259 def blockSubmissionThread(payload, blkhash, share):
260 myblock = (blkhash, payload[4:36])
261 payload = b2a_hex(payload).decode('ascii')
266 rv = UpstreamBitcoindJSONRPC.submitblock(payload)
268 except BaseException as gbterr:
270 rv = UpstreamBitcoindJSONRPC.getmemorypool(payload)
276 except BaseException as e2:
280 # FIXME: This will show "Method not found" on pre-BIP22 servers
281 RaiseRedFlags(traceback.format_exc())
283 if MM.currentBlock[0] not in myblock:
284 RBFs.append( (('next block', MM.currentBlock, now, (gbterr, gmperr)), payload, blkhash, share) )
285 RaiseRedFlags('Giving up on submitting block upstream')
288 # FIXME: The returned value could be a list of multiple responses
289 RBFs.append( (('upstream reject', rv, time()), payload, blkhash, share) )
290 RaiseRedFlags('Upstream block submission failed: %s' % (rv,))
292 def checkShare(share):
293 shareTime = share['time'] = time()
297 (prevBlock, height, bits) = MM.currentBlock
298 sharePrevBlock = data[4:36]
299 if sharePrevBlock != prevBlock:
300 if sharePrevBlock == MM.lastBlock[0]:
301 raise RejectedShare('stale-prevblk')
302 raise RejectedShare('bad-prevblk')
305 username = share['username']
306 if username not in workLog:
307 raise RejectedShare('unknown-user')
309 if data[72:76] != bits:
310 raise RejectedShare('bad-diffbits')
312 # Note that we should accept miners reducing version to 1 if they don't understand 2 yet
313 # FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
314 if data[1:4] != b'\0\0\0' or data[0] > 2:
315 raise RejectedShare('bad-version')
317 shareMerkleRoot = data[36:68]
318 if 'blkdata' in share:
319 pl = share['blkdata']
320 (txncount, pl) = varlenDecode(pl)
321 cbtxn = bitcoin.txn.Txn(pl)
322 othertxndata = cbtxn.disassemble(retExtra=True)
323 coinbase = cbtxn.getCoinbase()
324 wliPos = coinbase[0] + 2
325 wliLen = coinbase[wliPos - 1]
326 wli = coinbase[wliPos:wliPos+wliLen]
330 wli = shareMerkleRoot
335 MWL = workLog[username]
337 raise RejectedShare('unknown-work')
338 (wld, issueT) = MWL[wli]
341 share['issuetime'] = issueT
343 if data in DupeShareHACK:
344 raise RejectedShare('duplicate')
345 DupeShareHACK[data] = None
347 blkhash = dblsha(data)
348 if blkhash[28:] != b'\0\0\0\0':
349 raise RejectedShare('H-not-zero')
350 blkhashn = hash2int(blkhash)
353 logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
354 logfunc('BLKHASH: %64x' % (blkhashn,))
355 logfunc(' TARGET: %64x' % (networkTarget,))
357 workMerkleTree = wld[1]
358 workCoinbase = wld[2]
359 workTarget = wld[6] if len(wld) > 6 else None
361 # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
362 txlist = workMerkleTree.data
363 txlist = [deepcopy(txlist[0]),] + txlist[1:]
365 cbtxn.setCoinbase(coinbase or workCoinbase)
368 if blkhashn <= networkTarget:
369 logfunc("Submitting upstream")
370 RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree, share, wld) ) )
372 payload = assembleBlock(data, txlist)
374 payload = share['data']
375 if len(othertxndata):
376 payload += share['blkdata']
378 payload += assembleBlock(data, txlist)[80:]
379 logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
381 threading.Thread(target=blockSubmissionThread, args=(payload, blkhash, share)).start()
382 bcnode.submitBlock(payload)
383 share['upstreamResult'] = True
384 MM.updateBlock(blkhash)
387 if gotwork and blkhashn <= config.GotWorkTarget:
389 coinbaseMrkl = cbtxn.data
390 coinbaseMrkl += blkhash
391 steps = workMerkleTree._steps
392 coinbaseMrkl += pack('B', len(steps))
395 coinbaseMrkl += b"\0\0\0\0"
397 info['hash'] = b2a_hex(blkhash).decode('ascii')
398 info['header'] = b2a_hex(data).decode('ascii')
399 info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
400 thr = threading.Thread(target=submitGotwork, args=(info,))
404 checkShare.logger.warning('Failed to build gotwork request')
406 if workTarget is None:
407 workTarget = config.ShareTarget
408 if blkhashn > workTarget:
409 raise RejectedShare('high-hash')
410 share['target'] = workTarget
411 share['_targethex'] = '%064x' % (workTarget,)
413 shareTimestamp = unpack('<L', data[68:72])[0]
414 if shareTime < issueT - 120:
415 raise RejectedShare('stale-work')
416 if shareTimestamp < shareTime - 300:
417 raise RejectedShare('time-too-old')
418 if shareTimestamp > shareTime + 7200:
419 raise RejectedShare('time-too-new')
421 if config.DynamicTargetting and username in userStatus:
422 # NOTE: userStatus[username] only doesn't exist across restarts
423 status = userStatus[username]
424 target = status[0] or config.ShareTarget
425 if target == workTarget:
426 userStatus[username][2] += 1
428 userStatus[username][2] += float(target) / workTarget
432 cbpreLen = len(cbpre)
433 if coinbase[:cbpreLen] != cbpre:
434 raise RejectedShare('bad-cb-prefix')
436 # Filter out known "I support" flags, to prevent exploits
437 for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
438 if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
439 raise RejectedShare('bad-cb-flag')
441 if len(coinbase) > 100:
442 raise RejectedShare('bad-cb-length')
444 if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
445 raise RejectedShare('bad-txnmrklroot')
447 if len(othertxndata):
448 allowed = assembleBlock(data, txlist)[80:]
449 if allowed != share['blkdata']:
450 raise RejectedShare('bad-txns')
451 checkShare.logger = logging.getLogger('checkShare')
453 def receiveShare(share):
454 # TODO: username => userid
457 except RejectedShare as rej:
458 share['rejectReason'] = str(rej)
461 if '_origdata' in share:
462 share['solution'] = share['_origdata']
464 share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
465 for i in loggersShare:
468 def newBlockNotification():
469 logging.getLogger('newBlockNotification').info('Received new block notification')
470 MM.updateMerkleTree()
471 # TODO: Force RESPOND TO LONGPOLLS?
474 def newBlockNotificationSIGNAL(signum, frame):
475 # Use a new thread, in case the signal handler is called with locks held
476 thr = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
480 from signal import signal, SIGUSR1
481 signal(SIGUSR1, newBlockNotificationSIGNAL)
489 from time import sleep
492 SAVE_STATE_FILENAME = 'eloipool.worklog'
495 logger = logging.getLogger('stopServers')
497 if hasattr(stopServers, 'already'):
498 logger.debug('Already tried to stop servers before')
500 stopServers.already = True
502 logger.info('Stopping servers...')
503 global bcnode, server
504 servers = (bcnode, server)
511 logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
517 sl.append(s.__class__.__name__)
522 logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
527 for fd in s._fd.keys():
531 for i in loggersShare:
532 if hasattr(i, 'stop'):
535 def saveState(t = None):
536 logger = logging.getLogger('saveState')
538 # Then, save data needed to resume work
539 logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
543 with open(SAVE_STATE_FILENAME, 'wb') as f:
545 pickle.dump(DupeShareHACK, f)
546 pickle.dump(workLog, f)
551 logger.error('Failed to save work\n' + traceback.format_exc())
553 os.unlink(SAVE_STATE_FILENAME)
555 logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
562 logging.getLogger('exit').info('Goodbye...')
563 os.kill(os.getpid(), signal.SIGTERM)
571 logging.getLogger('restart').info('Restarting...')
573 os.execv(sys.argv[0], sys.argv)
575 logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
578 if not os.path.exists(SAVE_STATE_FILENAME):
581 global workLog, DupeShareHACK
583 logger = logging.getLogger('restoreState')
584 s = os.stat(SAVE_STATE_FILENAME)
585 logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
587 with open(SAVE_STATE_FILENAME, 'rb') as f:
591 # Future formats, not supported here
595 # Old format, from 2012-02-02 to 2012-02-03
600 if isinstance(t, dict):
601 # Old format, from 2012-02-03 to 2012-02-03
605 # Current format, from 2012-02-03 onward
606 DupeShareHACK = pickle.load(f)
608 if t + 120 >= time():
609 workLog = pickle.load(f)
611 logger.debug('Skipping restore of expired workLog')
613 logger.error('Failed to restore state\n' + traceback.format_exc())
615 logger.info('State restored successfully')
617 logger.info('Total downtime: %g seconds' % (time() - t,))
620 from jsonrpcserver import JSONRPCListener, JSONRPCServer
621 import interactivemode
622 from networkserver import NetworkListener
627 if __name__ == "__main__":
628 if not hasattr(config, 'ShareLogging'):
629 config.ShareLogging = ()
630 if hasattr(config, 'DbOptions'):
631 logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
632 config.ShareLogging = list(config.ShareLogging)
633 config.ShareLogging.append( {
635 'engine': 'postgres',
636 'dbopts': config.DbOptions,
637 'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
639 for i in config.ShareLogging:
640 if not hasattr(i, 'keys'):
642 logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
643 if name == 'postgres':
646 'engine': 'postgres',
647 'dbopts': parameters,
649 elif name == 'logfile':
651 i['thropts'] = parameters
652 if 'filename' in parameters:
653 i['filename'] = parameters['filename']
654 i['thropts'] = dict(i['thropts'])
655 del i['thropts']['filename']
663 fp, pathname, description = imp.find_module(name, sharelogging.__path__)
664 m = imp.load_module(name, fp, pathname, description)
665 lo = getattr(m, name)(**parameters)
666 loggersShare.append(lo)
668 logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name, sys.exc_info())
671 if not hasattr(config, 'BitcoinNodeAddresses'):
672 config.BitcoinNodeAddresses = ()
673 for a in config.BitcoinNodeAddresses:
674 LSbc.append(NetworkListener(bcnode, a))
676 if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
677 BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
679 import jsonrpc_getblocktemplate
680 import jsonrpc_getwork
681 import jsonrpc_setworkaux
683 server = JSONRPCServer()
684 server.tls = threading.local()
685 server.tls.wantClear = False
686 if hasattr(config, 'JSONRPCAddress'):
687 logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
688 if not hasattr(config, 'JSONRPCAddresses'):
689 config.JSONRPCAddresses = []
690 config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
692 for a in config.JSONRPCAddresses:
693 LS.append(JSONRPCListener(server, a))
694 if hasattr(config, 'SecretUser'):
695 server.SecretUser = config.SecretUser
696 server.aux = MM.CoinbaseAux
697 server.getBlockHeader = getBlockHeader
698 server.getBlockTemplate = getBlockTemplate
699 server.receiveShare = receiveShare
700 server.RaiseRedFlags = RaiseRedFlags
701 server.ShareTarget = config.ShareTarget
703 if hasattr(config, 'TrustedForwarders'):
704 server.TrustedForwarders = config.TrustedForwarders
705 server.ServerName = config.ServerName
711 prune_thr = threading.Thread(target=WorkLogPruner, args=(workLog,))
712 prune_thr.daemon = True
715 bcnode_thr = threading.Thread(target=bcnode.serve_forever)
716 bcnode_thr.daemon = True
719 server.serve_forever()