2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012 Luke Dashjr <luke-jr+eloipool@utopios.org>
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 if not hasattr(config, 'ServerName'):
21 config.ServerName = 'Unnamed Eloipool'
23 if not hasattr(config, 'ShareTarget'):
24 config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
29 if len(logging.root.handlers) == 0:
31 format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
34 for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
35 logging.getLogger(infoOnly).setLevel(logging.INFO)
37 def RaiseRedFlags(reason):
38 logging.getLogger('redflag').critical(reason)
42 from bitcoin.node import BitcoinLink, BitcoinNode
43 bcnode = BitcoinNode(config.UpstreamNetworkId)
44 bcnode.userAgent += b'Eloipool:0.1/'
47 UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
50 from bitcoin.script import BitcoinScript
51 from bitcoin.txn import Txn
52 from base58 import b58decode
53 from struct import pack
57 def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
60 if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
63 cmd = config.CoinbaserCmd
64 cmd = cmd.replace('%d', str(coinbaseValue))
65 p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
66 nout = int(p.stdout.readline())
68 amount = int(p.stdout.readline())
69 addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
70 pkScript = BitcoinScript.toAddress(addr)
71 txn.addOutput(amount, pkScript)
74 coinbased = coinbaseValue + 1
75 if coinbased >= coinbaseValue:
76 logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
79 coinbaseValue -= coinbased
81 pkScript = BitcoinScript.toAddress(config.TrackerAddr)
82 txn.addOutput(coinbaseValue, pkScript)
85 # TODO: red flag on dupe coinbase
89 import jsonrpc_getwork
90 from util import Bits2Target
104 jsonrpc_getwork._CheckForDupesHACK = {}
105 global MM, networkTarget, server
106 bits = MM.currentBlock[2]
110 networkTarget = Bits2Target(bits)
115 from time import sleep, time
118 def _WorkLogPruner_I(wl):
122 userwork = wl[username]
123 for wli in tuple(userwork.keys()):
124 if now > userwork[wli][1] + 120:
127 WorkLogPruner.logger.debug('Pruned %d jobs' % (pruned,))
129 def WorkLogPruner(wl):
135 WorkLogPruner.logger.error(traceback.format_exc())
136 WorkLogPruner.logger = logging.getLogger('WorkLogPruner')
139 from merklemaker import merkleMaker
141 MM.__dict__.update(config.__dict__)
142 MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False) # FIXME
143 MM.clearCoinbaseTxn.assemble()
144 MM.makeCoinbaseTxn = makeCoinbaseTxn
145 MM.onBlockChange = blockChanged
146 MM.onBlockUpdate = updateBlocks
149 from binascii import b2a_hex
150 from copy import deepcopy
152 from merklemaker import MakeBlockHeader
153 from struct import pack, unpack
155 from time import time
156 from util import RejectedShare, dblsha, hash2int, swap32, target2pdiff
161 if hasattr(config, 'GotWorkURI'):
162 gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
164 if not hasattr(config, 'DynamicTargetting'):
165 config.DynamicTargetting = 0
167 if not hasattr(config, 'DynamicTargetWindow'):
168 config.DynamicTargetWindow = 120
169 config.DynamicTargetGoal *= config.DynamicTargetWindow / 60
171 def submitGotwork(info):
173 gotwork.gotwork(info)
175 checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
177 def getTarget(username, now):
178 if not config.DynamicTargetting:
180 if username in userStatus:
181 status = userStatus[username]
183 userStatus[username] = [None, now, 0]
185 (targetIn, lastUpdate, work) = status
186 if work <= config.DynamicTargetGoal:
187 if now < lastUpdate + config.DynamicTargetWindow and (targetIn is None or targetIn >= networkTarget):
191 getTarget.logger.debug("No shares from '%s', resetting to minimum target")
192 userStatus[username] = [None, now, 0]
195 deltaSec = now - lastUpdate
196 target = targetIn or config.ShareTarget
197 target = int(target * config.DynamicTargetGoal * deltaSec / config.DynamicTargetWindow / work)
198 if target >= config.ShareTarget:
201 if target < networkTarget:
202 target = networkTarget
203 if config.DynamicTargetting == 2:
204 # Round target to a power of two :)
205 target = 2**int(log(target, 2) + 1) - 1
206 if target == config.ShareTarget:
208 if target != targetIn:
209 pfx = 'Retargetting %s' % (repr(username),)
210 tin = targetIn or config.ShareTarget
211 getTarget.logger.debug("%s from: %064x (pdiff %s)" % (pfx, tin, target2pdiff(tin)))
212 tgt = target or config.ShareTarget
213 getTarget.logger.debug("%s to: %064x (pdiff %s)" % (pfx, tgt, target2pdiff(tgt)))
214 userStatus[username] = [target, now, 0]
216 getTarget.logger = logging.getLogger('getTarget')
218 def RegisterWork(username, wli, wld):
220 target = getTarget(username, now)
221 wld = tuple(wld) + (target,)
222 workLog.setdefault(username, {})[wli] = (wld, now)
223 return target or config.ShareTarget
225 def getBlockHeader(username):
228 hdr = MakeBlockHeader(MRD)
229 workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
230 target = RegisterWork(username, merkleRoot, MRD)
231 return (hdr, workLog[username][merkleRoot], target)
233 def getBlockTemplate(username):
235 (dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
236 wliPos = coinbase[0] + 2
237 wliLen = coinbase[wliPos - 1]
238 wli = coinbase[wliPos:wliPos+wliLen]
239 target = RegisterWork(username, wli, MC)
240 return (MC, workLog[username][wli], target)
247 from bitcoin.varlen import varlenEncode, varlenDecode
249 from merklemaker import assembleBlock
251 def blockSubmissionThread(payload, blkhash):
252 myblock = (blkhash, payload[4:36])
253 payload = b2a_hex(payload).decode('ascii')
257 rv = UpstreamBitcoindJSONRPC.submitblock(payload)
261 rv = UpstreamBitcoindJSONRPC.getmemorypool(payload)
271 # FIXME: This will show "Method not found" on pre-BIP22 servers
272 RaiseRedFlags(traceback.format_exc())
274 if MM.currentBlock[0] not in myblock:
275 RaiseRedFlags('Giving up on submitting block upstream')
278 # FIXME: The returned value could be a list of multiple responses
279 RaiseRedFlags('Upstream block submission failed: %s' % (rv,))
281 def checkShare(share):
282 shareTime = share['time'] = time()
286 (prevBlock, height, bits) = MM.currentBlock
287 sharePrevBlock = data[4:36]
288 if sharePrevBlock != prevBlock:
289 if sharePrevBlock == MM.lastBlock[0]:
290 raise RejectedShare('stale-prevblk')
291 raise RejectedShare('bad-prevblk')
294 username = share['username']
295 if username not in workLog:
296 raise RejectedShare('unknown-user')
298 if data[72:76] != bits:
299 raise RejectedShare('bad-diffbits')
301 # Note that we should accept miners reducing version to 1 if they don't understand 2 yet
302 # FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
303 if data[1:4] != b'\0\0\0' or data[0] > 2:
304 raise RejectedShare('bad-version')
306 shareMerkleRoot = data[36:68]
307 if 'blkdata' in share:
308 pl = share['blkdata']
309 (txncount, pl) = varlenDecode(pl)
310 cbtxn = bitcoin.txn.Txn(pl)
311 othertxndata = cbtxn.disassemble(retExtra=True)
312 coinbase = cbtxn.getCoinbase()
313 wliPos = coinbase[0] + 2
314 wliLen = coinbase[wliPos - 1]
315 wli = coinbase[wliPos:wliPos+wliLen]
319 wli = shareMerkleRoot
323 MWL = workLog[username]
325 raise RejectedShare('unknown-work')
326 (wld, issueT) = MWL[wli]
329 if data in DupeShareHACK:
330 raise RejectedShare('duplicate')
331 DupeShareHACK[data] = None
333 blkhash = dblsha(data)
334 if blkhash[28:] != b'\0\0\0\0':
335 raise RejectedShare('H-not-zero')
336 blkhashn = hash2int(blkhash)
339 logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
340 logfunc('BLKHASH: %64x' % (blkhashn,))
341 logfunc(' TARGET: %64x' % (networkTarget,))
343 workMerkleTree = wld[1]
344 workCoinbase = wld[2]
347 # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
348 txlist = workMerkleTree.data
349 txlist = [deepcopy(txlist[0]),] + txlist[1:]
351 cbtxn.setCoinbase(workCoinbase)
354 if blkhashn <= networkTarget:
355 logfunc("Submitting upstream")
356 RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree) ) )
358 payload = assembleBlock(data, txlist)
360 payload = share['data'] + share['blkdata']
361 logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
363 threading.Thread(target=blockSubmissionThread, args=(payload, blkhash)).start()
364 bcnode.submitBlock(payload)
365 share['upstreamResult'] = True
366 MM.updateBlock(blkhash)
369 if gotwork and blkhashn <= config.GotWorkTarget:
371 coinbaseMrkl = cbtxn.data
372 coinbaseMrkl += blkhash
373 steps = workMerkleTree._steps
374 coinbaseMrkl += pack('B', len(steps))
377 coinbaseMrkl += b"\0\0\0\0"
379 info['hash'] = b2a_hex(blkhash).decode('ascii')
380 info['header'] = b2a_hex(data).decode('ascii')
381 info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
382 thr = threading.Thread(target=submitGotwork, args=(info,))
386 checkShare.logger.warning('Failed to build gotwork request')
388 if workTarget is None:
389 workTarget = config.ShareTarget
390 if blkhashn > workTarget:
391 raise RejectedShare('high-hash')
392 share['target'] = workTarget
393 share['_targethex'] = '%064x' % (workTarget,)
395 shareTimestamp = unpack('<L', data[68:72])[0]
396 if shareTime < issueT - 120:
397 raise RejectedShare('stale-work')
398 if shareTimestamp < shareTime - 300:
399 raise RejectedShare('time-too-old')
400 if shareTimestamp > shareTime + 7200:
401 raise RejectedShare('time-too-new')
403 if config.DynamicTargetting and username in userStatus:
404 # NOTE: userStatus[username] only doesn't exist across restarts
405 status = userStatus[username]
406 target = status[0] or config.ShareTarget
407 if target == workTarget:
408 userStatus[username][2] += 1
410 userStatus[username][2] += float(target) / workTarget
413 cbpre = cbtxn.getCoinbase()
414 cbpreLen = len(cbpre)
415 if coinbase[:cbpreLen] != cbpre:
416 raise RejectedShare('bad-cb-prefix')
418 # Filter out known "I support" flags, to prevent exploits
419 for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
420 if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
421 raise RejectedShare('bad-cb-flag')
423 if len(coinbase) > 100:
424 raise RejectedShare('bad-cb-length')
426 cbtxn.setCoinbase(coinbase)
428 if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
429 raise RejectedShare('bad-txnmrklroot')
431 if len(othertxndata):
432 allowed = assembleBlock(data, txlist)[80:]
433 if allowed != share['blkdata']:
434 raise RejectedShare('bad-txns')
435 checkShare.logger = logging.getLogger('checkShare')
437 def receiveShare(share):
438 # TODO: username => userid
441 except RejectedShare as rej:
442 share['rejectReason'] = str(rej)
445 if '_origdata' in share:
446 share['solution'] = share['_origdata']
448 share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
449 for i in loggersShare:
452 def newBlockNotification():
453 logging.getLogger('newBlockNotification').info('Received new block notification')
454 MM.updateMerkleTree()
455 # TODO: Force RESPOND TO LONGPOLLS?
458 def newBlockNotificationSIGNAL(signum, frame):
459 # Use a new thread, in case the signal handler is called with locks held
460 thr = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
464 from signal import signal, SIGUSR1
465 signal(SIGUSR1, newBlockNotificationSIGNAL)
473 from time import sleep
476 SAVE_STATE_FILENAME = 'eloipool.worklog'
479 logger = logging.getLogger('stopServers')
481 if hasattr(stopServers, 'already'):
482 logger.debug('Already tried to stop servers before')
484 stopServers.already = True
486 logger.info('Stopping servers...')
487 global bcnode, server
488 servers = (bcnode, server)
495 logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
501 sl.append(s.__class__.__name__)
506 logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
511 for fd in s._fd.keys():
514 def saveState(t = None):
515 logger = logging.getLogger('saveState')
517 # Then, save data needed to resume work
518 logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
522 with open(SAVE_STATE_FILENAME, 'wb') as f:
524 pickle.dump(DupeShareHACK, f)
525 pickle.dump(workLog, f)
530 logger.error('Failed to save work\n' + traceback.format_exc())
532 os.unlink(SAVE_STATE_FILENAME)
534 logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
540 logging.getLogger('exit').info('Goodbye...')
541 os.kill(os.getpid(), signal.SIGTERM)
548 logging.getLogger('restart').info('Restarting...')
550 os.execv(sys.argv[0], sys.argv)
552 logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
555 if not os.path.exists(SAVE_STATE_FILENAME):
558 global workLog, DupeShareHACK
560 logger = logging.getLogger('restoreState')
561 s = os.stat(SAVE_STATE_FILENAME)
562 logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
564 with open(SAVE_STATE_FILENAME, 'rb') as f:
568 # Future formats, not supported here
572 # Old format, from 2012-02-02 to 2012-02-03
577 if isinstance(t, dict):
578 # Old format, from 2012-02-03 to 2012-02-03
582 # Current format, from 2012-02-03 onward
583 DupeShareHACK = pickle.load(f)
585 if t + 120 >= time():
586 workLog = pickle.load(f)
588 logger.debug('Skipping restore of expired workLog')
590 logger.error('Failed to restore state\n' + traceback.format_exc())
592 logger.info('State restored successfully')
594 logger.info('Total downtime: %g seconds' % (time() - t,))
597 from jsonrpcserver import JSONRPCListener, JSONRPCServer
598 import interactivemode
599 from networkserver import NetworkListener
604 if __name__ == "__main__":
605 if not hasattr(config, 'ShareLogging'):
606 config.ShareLogging = ()
607 if hasattr(config, 'DbOptions'):
608 logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
609 config.ShareLogging = list(config.ShareLogging)
610 config.ShareLogging.append( {
612 'engine': 'postgres',
613 'dbopts': config.DbOptions,
614 'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
616 for i in config.ShareLogging:
617 if not hasattr(i, 'keys'):
619 logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
620 if name == 'postgres':
623 'engine': 'postgres',
624 'dbopts': parameters,
626 elif name == 'logfile':
628 i['thropts'] = parameters
629 if 'filename' in parameters:
630 i['filename'] = parameters['filename']
631 i['thropts'] = dict(i['thropts'])
632 del i['thropts']['filename']
640 fp, pathname, description = imp.find_module(name, sharelogging.__path__)
641 m = imp.load_module(name, fp, pathname, description)
642 lo = getattr(m, name)(**parameters)
643 loggersShare.append(lo.logShare)
645 logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name, sys.exc_info())
648 if not hasattr(config, 'BitcoinNodeAddresses'):
649 config.BitcoinNodeAddresses = ()
650 for a in config.BitcoinNodeAddresses:
651 LSbc.append(NetworkListener(bcnode, a))
653 if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
654 BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
656 import jsonrpc_getblocktemplate
657 import jsonrpc_getwork
658 import jsonrpc_setworkaux
660 server = JSONRPCServer()
661 if hasattr(config, 'JSONRPCAddress'):
662 logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
663 if not hasattr(config, 'JSONRPCAddresses'):
664 config.JSONRPCAddresses = []
665 config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
667 for a in config.JSONRPCAddresses:
668 LS.append(JSONRPCListener(server, a))
669 if hasattr(config, 'SecretUser'):
670 server.SecretUser = config.SecretUser
671 server.aux = MM.CoinbaseAux
672 server.getBlockHeader = getBlockHeader
673 server.getBlockTemplate = getBlockTemplate
674 server.receiveShare = receiveShare
675 server.RaiseRedFlags = RaiseRedFlags
676 server.ShareTarget = config.ShareTarget
678 if hasattr(config, 'TrustedForwarders'):
679 server.TrustedForwarders = config.TrustedForwarders
680 server.ServerName = config.ServerName
686 prune_thr = threading.Thread(target=WorkLogPruner, args=(workLog,))
687 prune_thr.daemon = True
690 bcnode_thr = threading.Thread(target=bcnode.serve_forever)
691 bcnode_thr.daemon = True
694 server.serve_forever()