2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012 Luke Dashjr <luke-jr+eloipool@utopios.org>
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 if not hasattr(config, 'ServerName'):
21 config.ServerName = 'Unnamed Eloipool'
23 if not hasattr(config, 'ShareTarget'):
24 config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
29 if len(logging.root.handlers) == 0:
31 format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
34 for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
35 logging.getLogger(infoOnly).setLevel(logging.INFO)
37 def RaiseRedFlags(reason):
38 logging.getLogger('redflag').critical(reason)
42 from bitcoin.node import BitcoinLink, BitcoinNode
43 bcnode = BitcoinNode(config.UpstreamNetworkId)
44 bcnode.userAgent += b'Eloipool:0.1/'
47 UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
50 from bitcoin.script import BitcoinScript
51 from bitcoin.txn import Txn
52 from base58 import b58decode
53 from struct import pack
57 def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
60 if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
63 cmd = config.CoinbaserCmd
64 cmd = cmd.replace('%d', str(coinbaseValue))
65 p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
66 nout = int(p.stdout.readline())
68 amount = int(p.stdout.readline())
69 addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
70 pkScript = BitcoinScript.toAddress(addr)
71 txn.addOutput(amount, pkScript)
74 coinbased = coinbaseValue + 1
75 if coinbased >= coinbaseValue:
76 logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
79 coinbaseValue -= coinbased
81 pkScript = BitcoinScript.toAddress(config.TrackerAddr)
82 txn.addOutput(coinbaseValue, pkScript)
85 # TODO: red flag on dupe coinbase
89 import jsonrpc_getwork
90 from util import Bits2Target
104 jsonrpc_getwork._CheckForDupesHACK = {}
105 global MM, networkTarget, server
106 bits = MM.currentBlock[2]
110 networkTarget = Bits2Target(bits)
115 from merklemaker import merkleMaker
117 MM.__dict__.update(config.__dict__)
118 MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False) # FIXME
119 MM.clearCoinbaseTxn.assemble()
120 MM.makeCoinbaseTxn = makeCoinbaseTxn
121 MM.onBlockChange = blockChanged
122 MM.onBlockUpdate = updateBlocks
125 from binascii import b2a_hex
126 from copy import deepcopy
128 from struct import pack, unpack
130 from time import time
131 from util import RejectedShare, dblsha, hash2int, swap32, target2pdiff
136 if hasattr(config, 'GotWorkURI'):
137 gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
139 if not hasattr(config, 'DynamicTargetting'):
140 config.DynamicTargetting = 0
142 if not hasattr(config, 'DynamicTargetWindow'):
143 config.DynamicTargetWindow = 120
144 config.DynamicTargetGoal *= config.DynamicTargetWindow / 60
146 def submitGotwork(info):
148 gotwork.gotwork(info)
150 checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
152 def getTarget(username, now):
153 if not config.DynamicTargetting:
155 if username in userStatus:
156 status = userStatus[username]
158 userStatus[username] = [None, now, 0]
160 (targetIn, lastUpdate, work) = status
161 if work <= config.DynamicTargetGoal:
162 if now < lastUpdate + config.DynamicTargetWindow and (targetIn is None or targetIn >= networkTarget):
166 getTarget.logger.debug("No shares from '%s', resetting to minimum target")
167 userStatus[username] = [None, now, 0]
170 deltaSec = now - lastUpdate
171 target = targetIn or config.ShareTarget
172 target = int(target * config.DynamicTargetGoal * deltaSec / config.DynamicTargetWindow / work)
173 if target >= config.ShareTarget:
176 if target < networkTarget:
177 target = networkTarget
178 if config.DynamicTargetting == 2:
179 # Round target to a power of two :)
180 target = 2**int(log(target, 2) + 1) - 1
181 if target == config.ShareTarget:
183 if target != targetIn:
184 pfx = 'Retargetting %s' % (repr(username),)
185 tin = targetIn or config.ShareTarget
186 getTarget.logger.debug("%s from: %064x (pdiff %s)" % (pfx, tin, target2pdiff(tin)))
187 tgt = target or config.ShareTarget
188 getTarget.logger.debug("%s to: %064x (pdiff %s)" % (pfx, tgt, target2pdiff(tgt)))
189 userStatus[username] = [target, now, 0]
191 getTarget.logger = logging.getLogger('getTarget')
193 def RegisterWork(username, wli, wld):
195 target = getTarget(username, now)
196 wld = tuple(wld) + (target,)
197 workLog.setdefault(username, {})[wli] = (wld, now)
198 return target or config.ShareTarget
200 def getBlockHeader(username):
202 (merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD[:6]
203 timestamp = pack('<L', int(time()))
204 hdr = b'\2\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
205 workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
206 target = RegisterWork(username, merkleRoot, MRD)
207 return (hdr, workLog[username][merkleRoot], target)
209 def getBlockTemplate(username):
211 (dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
212 wliPos = coinbase[0] + 2
213 wliLen = coinbase[wliPos - 1]
214 wli = coinbase[wliPos:wliPos+wliLen]
215 target = RegisterWork(username, wli, MC)
216 return (MC, workLog[username][wli], target)
223 from bitcoin.varlen import varlenEncode, varlenDecode
225 def assembleBlock(blkhdr, txlist):
227 payload += varlenEncode(len(txlist))
232 def blockSubmissionThread(payload, blkhash):
233 myblock = (blkhash, payload[4:36])
234 payload = b2a_hex(payload).decode('ascii')
238 rv = UpstreamBitcoindJSONRPC.submitblock(payload)
242 rv = UpstreamBitcoindJSONRPC.getmemorypool(payload)
252 # FIXME: This will show "Method not found" on pre-BIP22 servers
253 RaiseRedFlags(traceback.format_exc())
255 if MM.currentBlock[0] not in myblock:
256 RaiseRedFlags('Giving up on submitting block upstream')
259 # FIXME: The returned value could be a list of multiple responses
260 RaiseRedFlags('Upstream block submission failed: %s' % (rv,))
262 def checkShare(share):
263 shareTime = share['time'] = time()
267 (prevBlock, height, bits) = MM.currentBlock
268 sharePrevBlock = data[4:36]
269 if sharePrevBlock != prevBlock:
270 if sharePrevBlock == MM.lastBlock[0]:
271 raise RejectedShare('stale-prevblk')
272 raise RejectedShare('bad-prevblk')
275 username = share['username']
276 if username not in workLog:
277 raise RejectedShare('unknown-user')
279 if data[72:76] != bits:
280 raise RejectedShare('bad-diffbits')
282 # Note that we should accept miners reducing version to 1 if they don't understand 2 yet
283 # FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
284 if data[1:4] != b'\0\0\0' or data[0] > 2:
285 raise RejectedShare('bad-version')
287 shareMerkleRoot = data[36:68]
288 if 'blkdata' in share:
289 pl = share['blkdata']
290 (txncount, pl) = varlenDecode(pl)
291 cbtxn = bitcoin.txn.Txn(pl)
292 cbtxn.disassemble(retExtra=True)
293 coinbase = cbtxn.getCoinbase()
294 wliPos = coinbase[0] + 2
295 wliLen = coinbase[wliPos - 1]
296 wli = coinbase[wliPos:wliPos+wliLen]
300 wli = shareMerkleRoot
304 MWL = workLog[username]
306 raise RejectedShare('unknown-work')
307 (wld, issueT) = MWL[wli]
310 if data in DupeShareHACK:
311 raise RejectedShare('duplicate')
312 DupeShareHACK[data] = None
314 blkhash = dblsha(data)
315 if blkhash[28:] != b'\0\0\0\0':
316 raise RejectedShare('H-not-zero')
317 blkhashn = hash2int(blkhash)
320 logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
321 logfunc('BLKHASH: %64x' % (blkhashn,))
322 logfunc(' TARGET: %64x' % (networkTarget,))
324 workMerkleTree = wld[1]
325 workCoinbase = wld[2]
328 # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
329 txlist = workMerkleTree.data
330 txlist = [deepcopy(txlist[0]),] + txlist[1:]
332 cbtxn.setCoinbase(workCoinbase)
335 if blkhashn <= networkTarget:
336 logfunc("Submitting upstream")
337 RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree) ) )
339 payload = assembleBlock(data, txlist)
341 payload = share['data'] + share['blkdata']
342 logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
344 threading.Thread(target=blockSubmissionThread, args=(payload, blkhash)).start()
345 bcnode.submitBlock(payload)
346 share['upstreamResult'] = True
347 MM.updateBlock(blkhash)
350 if gotwork and blkhashn <= config.GotWorkTarget:
352 coinbaseMrkl = cbtxn.data
353 coinbaseMrkl += blkhash
354 steps = workMerkleTree._steps
355 coinbaseMrkl += pack('B', len(steps))
358 coinbaseMrkl += b"\0\0\0\0"
360 info['hash'] = b2a_hex(blkhash).decode('ascii')
361 info['header'] = b2a_hex(data).decode('ascii')
362 info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
363 thr = threading.Thread(target=submitGotwork, args=(info,))
367 checkShare.logger.warning('Failed to build gotwork request')
369 if workTarget is None:
370 workTarget = config.ShareTarget
371 if blkhashn > workTarget:
372 raise RejectedShare('high-hash')
373 share['target'] = workTarget
374 share['_targethex'] = '%064x' % (workTarget,)
376 shareTimestamp = unpack('<L', data[68:72])[0]
377 if shareTime < issueT - 120:
378 raise RejectedShare('stale-work')
379 if shareTimestamp < shareTime - 300:
380 raise RejectedShare('time-too-old')
381 if shareTimestamp > shareTime + 7200:
382 raise RejectedShare('time-too-new')
384 if config.DynamicTargetting and username in userStatus:
385 # NOTE: userStatus[username] only doesn't exist across restarts
386 status = userStatus[username]
387 target = status[0] or config.ShareTarget
388 if target == workTarget:
389 userStatus[username][2] += 1
391 userStatus[username][2] += float(target) / workTarget
394 cbpre = cbtxn.getCoinbase()
395 cbpreLen = len(cbpre)
396 if coinbase[:cbpreLen] != cbpre:
397 raise RejectedShare('bad-cb-prefix')
399 # Filter out known "I support" flags, to prevent exploits
400 for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
401 if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
402 raise RejectedShare('bad-cb-flag')
404 if len(coinbase) > 100:
405 raise RejectedShare('bad-cb-length')
407 cbtxn.setCoinbase(coinbase)
409 if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
410 raise RejectedShare('bad-txnmrklroot')
412 allowed = assembleBlock(data, txlist)
413 if allowed != share['data'] + share['blkdata']:
414 raise RejectedShare('bad-txns')
415 checkShare.logger = logging.getLogger('checkShare')
417 def receiveShare(share):
418 # TODO: username => userid
421 except RejectedShare as rej:
422 share['rejectReason'] = str(rej)
425 if '_origdata' in share:
426 share['solution'] = share['_origdata']
428 share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
429 for i in loggersShare:
432 def newBlockNotification():
433 logging.getLogger('newBlockNotification').info('Received new block notification')
434 MM.updateMerkleTree()
435 # TODO: Force RESPOND TO LONGPOLLS?
438 def newBlockNotificationSIGNAL(signum, frame):
439 # Use a new thread, in case the signal handler is called with locks held
440 thr = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
444 from signal import signal, SIGUSR1
445 signal(SIGUSR1, newBlockNotificationSIGNAL)
453 from time import sleep
456 SAVE_STATE_FILENAME = 'eloipool.worklog'
459 logger = logging.getLogger('stopServers')
461 if hasattr(stopServers, 'already'):
462 logger.debug('Already tried to stop servers before')
464 stopServers.already = True
466 logger.info('Stopping servers...')
467 global bcnode, server
468 servers = (bcnode, server)
475 logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
481 sl.append(s.__class__.__name__)
486 logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
491 for fd in s._fd.keys():
494 def saveState(t = None):
495 logger = logging.getLogger('saveState')
497 # Then, save data needed to resume work
498 logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
502 with open(SAVE_STATE_FILENAME, 'wb') as f:
504 pickle.dump(DupeShareHACK, f)
505 pickle.dump(workLog, f)
510 logger.error('Failed to save work\n' + traceback.format_exc())
512 os.unlink(SAVE_STATE_FILENAME)
514 logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
520 logging.getLogger('exit').info('Goodbye...')
521 os.kill(os.getpid(), signal.SIGTERM)
528 logging.getLogger('restart').info('Restarting...')
530 os.execv(sys.argv[0], sys.argv)
532 logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
535 if not os.path.exists(SAVE_STATE_FILENAME):
538 global workLog, DupeShareHACK
540 logger = logging.getLogger('restoreState')
541 s = os.stat(SAVE_STATE_FILENAME)
542 logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
544 with open(SAVE_STATE_FILENAME, 'rb') as f:
548 # Future formats, not supported here
552 # Old format, from 2012-02-02 to 2012-02-03
557 if isinstance(t, dict):
558 # Old format, from 2012-02-03 to 2012-02-03
562 # Current format, from 2012-02-03 onward
563 DupeShareHACK = pickle.load(f)
565 if t + 120 >= time():
566 workLog = pickle.load(f)
568 logger.debug('Skipping restore of expired workLog')
570 logger.error('Failed to restore state\n' + traceback.format_exc())
572 logger.info('State restored successfully')
574 logger.info('Total downtime: %g seconds' % (time() - t,))
577 from jsonrpcserver import JSONRPCListener, JSONRPCServer
578 import interactivemode
579 from networkserver import NetworkListener
584 if __name__ == "__main__":
585 if not hasattr(config, 'ShareLogging'):
586 config.ShareLogging = ()
587 if hasattr(config, 'DbOptions'):
588 logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
589 config.ShareLogging = list(config.ShareLogging)
590 config.ShareLogging.append( {
592 'engine': 'postgres',
593 'dbopts': config.DbOptions,
594 'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
596 for i in config.ShareLogging:
597 if not hasattr(i, 'keys'):
599 logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
600 if name == 'postgres':
603 'engine': 'postgres',
604 'dbopts': parameters,
606 elif name == 'logfile':
608 i['thropts'] = parameters
609 if 'filename' in parameters:
610 i['filename'] = parameters['filename']
611 i['thropts'] = dict(i['thropts'])
612 del i['thropts']['filename']
620 fp, pathname, description = imp.find_module(name, sharelogging.__path__)
621 m = imp.load_module(name, fp, pathname, description)
622 lo = getattr(m, name)(**parameters)
623 loggersShare.append(lo.logShare)
625 logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name, sys.exc_info())
628 if not hasattr(config, 'BitcoinNodeAddresses'):
629 config.BitcoinNodeAddresses = ()
630 for a in config.BitcoinNodeAddresses:
631 LSbc.append(NetworkListener(bcnode, a))
633 if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
634 BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
636 import jsonrpc_getblocktemplate
637 import jsonrpc_getwork
638 import jsonrpc_setworkaux
640 server = JSONRPCServer()
641 if hasattr(config, 'JSONRPCAddress'):
642 logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
643 if not hasattr(config, 'JSONRPCAddresses'):
644 config.JSONRPCAddresses = []
645 config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
647 for a in config.JSONRPCAddresses:
648 LS.append(JSONRPCListener(server, a))
649 if hasattr(config, 'SecretUser'):
650 server.SecretUser = config.SecretUser
651 server.aux = MM.CoinbaseAux
652 server.getBlockHeader = getBlockHeader
653 server.getBlockTemplate = getBlockTemplate
654 server.receiveShare = receiveShare
655 server.RaiseRedFlags = RaiseRedFlags
656 server.ShareTarget = config.ShareTarget
658 if hasattr(config, 'TrustedForwarders'):
659 server.TrustedForwarders = config.TrustedForwarders
660 server.ServerName = config.ServerName
666 bcnode_thr = threading.Thread(target=bcnode.serve_forever)
667 bcnode_thr.daemon = True
670 server.serve_forever()