2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012 Luke Dashjr <luke-jr+eloipool@utopios.org>
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 if not hasattr(config, 'ServerName'):
21 config.ServerName = 'Unnamed Eloipool'
23 if not hasattr(config, 'ShareTarget'):
24 config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
29 if len(logging.root.handlers) == 0:
31 format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
34 for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
35 logging.getLogger(infoOnly).setLevel(logging.INFO)
37 def RaiseRedFlags(reason):
38 logging.getLogger('redflag').critical(reason)
42 from bitcoin.node import BitcoinLink, BitcoinNode
43 bcnode = BitcoinNode(config.UpstreamNetworkId)
44 bcnode.userAgent += b'Eloipool:0.1/'
47 UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
50 from bitcoin.script import BitcoinScript
51 from bitcoin.txn import Txn
52 from base58 import b58decode
53 from struct import pack
57 def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
60 if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
63 cmd = config.CoinbaserCmd
64 cmd = cmd.replace('%d', str(coinbaseValue))
65 p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
66 nout = int(p.stdout.readline())
68 amount = int(p.stdout.readline())
69 addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
70 pkScript = BitcoinScript.toAddress(addr)
71 txn.addOutput(amount, pkScript)
74 coinbased = coinbaseValue + 1
75 if coinbased >= coinbaseValue:
76 logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
79 coinbaseValue -= coinbased
81 pkScript = BitcoinScript.toAddress(config.TrackerAddr)
82 txn.addOutput(coinbaseValue, pkScript)
85 # TODO: red flag on dupe coinbase
89 import jsonrpc_getwork
90 from util import Bits2Target
104 jsonrpc_getwork._CheckForDupesHACK = {}
105 global MM, networkTarget, server
106 bits = MM.currentBlock[2]
110 networkTarget = Bits2Target(bits)
115 from merklemaker import merkleMaker
117 MM.__dict__.update(config.__dict__)
118 MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False) # FIXME
119 MM.clearCoinbaseTxn.assemble()
120 MM.makeCoinbaseTxn = makeCoinbaseTxn
121 MM.onBlockChange = blockChanged
122 MM.onBlockUpdate = updateBlocks
125 from binascii import b2a_hex
126 from copy import deepcopy
127 from math import ceil, log
128 from struct import pack, unpack
130 from time import time
131 from util import RejectedShare, dblsha, hash2int, swap32
136 if hasattr(config, 'GotWorkURI'):
137 gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
139 if not hasattr(config, 'DynamicTargetting'):
140 config.DynamicTargetting = 0
142 config.DynamicTargetGoal *= 2
144 def submitGotwork(info):
146 gotwork.gotwork(info)
148 checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
150 def getTarget(username, now):
151 if not config.DynamicTargetting:
153 if username in userStatus:
154 status = userStatus[username]
156 userStatus[username] = [None, now, 0]
158 (targetIn, lastUpdate, work) = status
159 if work <= config.DynamicTargetGoal:
160 if now < lastUpdate + 120 and (targetIn is None or targetIn >= networkTarget):
164 getTarget.logger.debug("No shares from '%s', resetting to minimum target")
165 userStatus[username] = [None, now, 0]
168 deltaSec = now - lastUpdate
169 target = targetIn or config.ShareTarget
170 target = int(target * config.DynamicTargetGoal * deltaSec / 120 / work)
171 if target >= config.ShareTarget:
174 if target < networkTarget:
175 target = networkTarget
176 if config.DynamicTargetting == 2:
177 # Round target to a power of two :)
178 truebits = log(target, 2)
179 if target <= 2**int(truebits):
180 # Workaround for bug in Python's math.log function
181 truebits = int(truebits)
182 target = 2**ceil(truebits) - 1
183 if target == config.ShareTarget:
185 if target != targetIn:
186 pfx = 'Retargetting %s' % (repr(username),)
187 getTarget.logger.debug("%s from: %064x" % (pfx, targetIn or config.ShareTarget,))
188 getTarget.logger.debug("%s to: %064x" % (pfx, target or config.ShareTarget,))
189 userStatus[username] = [target, now, 0]
191 getTarget.logger = logging.getLogger('getTarget')
193 def RegisterWork(username, wli, wld):
195 target = getTarget(username, now)
196 wld = tuple(wld) + (target,)
197 workLog.setdefault(username, {})[wli] = (wld, now)
198 return target or config.ShareTarget
200 def getBlockHeader(username):
202 (merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD[:6]
203 timestamp = pack('<L', int(time()))
204 hdr = b'\2\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
205 workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
206 target = RegisterWork(username, merkleRoot, MRD)
207 return (hdr, workLog[username][merkleRoot], target)
209 def getBlockTemplate(username):
211 (dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
212 wliPos = coinbase[0] + 2
213 wliLen = coinbase[wliPos - 1]
214 wli = coinbase[wliPos:wliPos+wliLen]
215 target = RegisterWork(username, wli, MC)
216 return (MC, workLog[username][wli], target)
223 from bitcoin.varlen import varlenEncode, varlenDecode
225 def assembleBlock(blkhdr, txlist):
227 payload += varlenEncode(len(txlist))
232 def blockSubmissionThread(payload, blkhash):
233 myblock = (blkhash, payload[4:36])
234 payload = b2a_hex(payload).decode('ascii')
238 rv = UpstreamBitcoindJSONRPC.submitblock(payload)
242 rv = UpstreamBitcoindJSONRPC.getmemorypool(payload)
252 # FIXME: This will show "Method not found" on pre-BIP22 servers
253 RaiseRedFlags(traceback.format_exc())
255 if MM.currentBlock[0] not in myblock:
256 RaiseRedFlags('Giving up on submitting block upstream')
259 # FIXME: The returned value could be a list of multiple responses
260 RaiseRedFlags('Upstream block submission failed: %s' % (rv,))
262 def checkShare(share):
263 shareTime = share['time'] = time()
267 (prevBlock, height, bits) = MM.currentBlock
268 sharePrevBlock = data[4:36]
269 if sharePrevBlock != prevBlock:
270 if sharePrevBlock == MM.lastBlock[0]:
271 raise RejectedShare('stale-prevblk')
272 raise RejectedShare('bad-prevblk')
275 username = share['username']
276 if username not in workLog:
277 raise RejectedShare('unknown-user')
279 if data[72:76] != bits:
280 raise RejectedShare('bad-diffbits')
282 # Note that we should accept miners reducing version to 1 if they don't understand 2 yet
283 # FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
284 if data[1:4] != b'\0\0\0' or data[0] > 2:
285 raise RejectedShare('bad-version')
287 shareMerkleRoot = data[36:68]
288 if 'blkdata' in share:
289 pl = share['blkdata']
290 (txncount, pl) = varlenDecode(pl)
291 cbtxn = bitcoin.txn.Txn(pl)
292 cbtxn.disassemble(retExtra=True)
293 coinbase = cbtxn.getCoinbase()
294 wliPos = coinbase[0] + 2
295 wliLen = coinbase[wliPos - 1]
296 wli = coinbase[wliPos:wliPos+wliLen]
300 wli = shareMerkleRoot
304 MWL = workLog[username]
306 raise RejectedShare('unknown-work')
307 (wld, issueT) = MWL[wli]
310 if data in DupeShareHACK:
311 raise RejectedShare('duplicate')
312 DupeShareHACK[data] = None
314 blkhash = dblsha(data)
315 if blkhash[28:] != b'\0\0\0\0':
316 raise RejectedShare('H-not-zero')
317 blkhashn = hash2int(blkhash)
320 logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
321 logfunc('BLKHASH: %64x' % (blkhashn,))
322 logfunc(' TARGET: %64x' % (networkTarget,))
324 workMerkleTree = wld[1]
325 workCoinbase = wld[2]
328 # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
329 txlist = workMerkleTree.data
330 txlist = [deepcopy(txlist[0]),] + txlist[1:]
332 cbtxn.setCoinbase(workCoinbase)
335 if blkhashn <= networkTarget:
336 logfunc("Submitting upstream")
337 RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree) ) )
339 payload = assembleBlock(data, txlist)
341 payload = share['data'] + share['blkdata']
342 logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
344 threading.Thread(target=blockSubmissionThread, args=(payload, blkhash)).start()
345 bcnode.submitBlock(payload)
346 share['upstreamResult'] = True
347 MM.updateBlock(blkhash)
350 if gotwork and blkhashn <= config.GotWorkTarget:
352 coinbaseMrkl = cbtxn.data
353 coinbaseMrkl += blkhash
354 steps = workMerkleTree._steps
355 coinbaseMrkl += pack('B', len(steps))
358 coinbaseMrkl += b"\0\0\0\0"
360 info['hash'] = b2a_hex(blkhash).decode('ascii')
361 info['header'] = b2a_hex(data).decode('ascii')
362 info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
363 thr = threading.Thread(target=submitGotwork, args=(info,))
367 checkShare.logger.warning('Failed to build gotwork request')
369 if workTarget is None:
370 workTarget = config.ShareTarget
371 if blkhashn > workTarget:
372 raise RejectedShare('high-hash')
373 share['target'] = workTarget
374 share['_targethex'] = '%064x' % (workTarget,)
376 shareTimestamp = unpack('<L', data[68:72])[0]
377 if shareTime < issueT - 120:
378 raise RejectedShare('stale-work')
379 if shareTimestamp < shareTime - 300:
380 raise RejectedShare('time-too-old')
381 if shareTimestamp > shareTime + 7200:
382 raise RejectedShare('time-too-new')
384 if config.DynamicTargetting and username in userStatus:
385 # NOTE: userStatus[username] only doesn't exist across restarts
386 status = userStatus[username]
387 target = status[0] or config.ShareTarget
388 if target == workTarget:
389 userStatus[username][2] += 1
391 userStatus[username][2] += float(target) / workTarget
394 cbpre = cbtxn.getCoinbase()
395 cbpreLen = len(cbpre)
396 if coinbase[:cbpreLen] != cbpre:
397 raise RejectedShare('bad-cb-prefix')
399 # Filter out known "I support" flags, to prevent exploits
400 for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
401 if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
402 raise RejectedShare('bad-cb-flag')
404 if len(coinbase) > 100:
405 raise RejectedShare('bad-cb-length')
407 cbtxn.setCoinbase(coinbase)
409 if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
410 raise RejectedShare('bad-txnmrklroot')
412 allowed = assembleBlock(data, txlist)
413 if allowed != share['data'] + share['blkdata']:
414 raise RejectedShare('bad-txns')
415 checkShare.logger = logging.getLogger('checkShare')
417 def receiveShare(share):
418 # TODO: username => userid
421 except RejectedShare as rej:
422 share['rejectReason'] = str(rej)
425 if '_origdata' in share:
426 share['solution'] = share['_origdata']
428 share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
429 for i in loggersShare:
432 def newBlockNotification():
433 logging.getLogger('newBlockNotification').info('Received new block notification')
434 MM.updateMerkleTree()
435 # TODO: Force RESPOND TO LONGPOLLS?
438 def newBlockNotificationSIGNAL(signum, frame):
439 # Use a new thread, in case the signal handler is called with locks held
440 thr = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
444 from signal import signal, SIGUSR1
445 signal(SIGUSR1, newBlockNotificationSIGNAL)
453 from time import sleep
456 SAVE_STATE_FILENAME = 'eloipool.worklog'
459 logger = logging.getLogger('stopServers')
461 if hasattr(stopServers, 'already'):
462 logger.debug('Already tried to stop servers before')
464 stopServers.already = True
466 logger.info('Stopping servers...')
467 global bcnode, server
468 servers = (bcnode, server)
475 logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
481 sl.append(s.__class__.__name__)
486 logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
491 for fd in s._fd.keys():
494 def saveState(t = None):
495 logger = logging.getLogger('saveState')
497 # Then, save data needed to resume work
498 logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
502 with open(SAVE_STATE_FILENAME, 'wb') as f:
504 pickle.dump(DupeShareHACK, f)
505 pickle.dump(workLog, f)
510 logger.error('Failed to save work\n' + traceback.format_exc())
512 os.unlink(SAVE_STATE_FILENAME)
514 logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
520 logging.getLogger('exit').info('Goodbye...')
521 os.kill(os.getpid(), signal.SIGTERM)
528 logging.getLogger('restart').info('Restarting...')
530 os.execv(sys.argv[0], sys.argv)
532 logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
535 if not os.path.exists(SAVE_STATE_FILENAME):
538 global workLog, DupeShareHACK
540 logger = logging.getLogger('restoreState')
541 s = os.stat(SAVE_STATE_FILENAME)
542 logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
544 with open(SAVE_STATE_FILENAME, 'rb') as f:
548 # Future formats, not supported here
552 # Old format, from 2012-02-02 to 2012-02-03
557 if isinstance(t, dict):
558 # Old format, from 2012-02-03 to 2012-02-03
562 # Current format, from 2012-02-03 onward
563 DupeShareHACK = pickle.load(f)
565 if t + 120 >= time():
566 workLog = pickle.load(f)
568 logger.debug('Skipping restore of expired workLog')
570 logger.error('Failed to restore state\n' + traceback.format_exc())
572 logger.info('State restored successfully')
574 logger.info('Total downtime: %g seconds' % (time() - t,))
577 from jsonrpcserver import JSONRPCListener, JSONRPCServer
578 import interactivemode
579 from networkserver import NetworkListener
584 if __name__ == "__main__":
585 if not hasattr(config, 'ShareLogging'):
586 config.ShareLogging = ()
587 if hasattr(config, 'DbOptions'):
588 logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
589 config.ShareLogging = list(config.ShareLogging)
590 config.ShareLogging.append( {
592 'engine': 'postgres',
593 'dbopts': config.DbOptions,
594 'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
596 for i in config.ShareLogging:
597 if not hasattr(i, 'keys'):
599 logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
600 if name == 'postgres':
603 'engine': 'postgres',
604 'dbopts': parameters,
606 elif name == 'logfile':
608 i['thropts'] = parameters
609 if 'filename' in parameters:
610 i['filename'] = parameters['filename']
611 i['thropts'] = dict(i['thropts'])
612 del i['thropts']['filename']
620 fp, pathname, description = imp.find_module(name, sharelogging.__path__)
621 m = imp.load_module(name, fp, pathname, description)
622 lo = getattr(m, name)(**parameters)
623 loggersShare.append(lo.logShare)
625 logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name, sys.exc_info())
628 if not hasattr(config, 'BitcoinNodeAddresses'):
629 config.BitcoinNodeAddresses = ()
630 for a in config.BitcoinNodeAddresses:
631 LSbc.append(NetworkListener(bcnode, a))
633 if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
634 BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
636 import jsonrpc_getblocktemplate
637 import jsonrpc_getwork
638 import jsonrpc_setworkaux
640 server = JSONRPCServer()
641 if hasattr(config, 'JSONRPCAddress'):
642 logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
643 if not hasattr(config, 'JSONRPCAddresses'):
644 config.JSONRPCAddresses = []
645 config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
647 for a in config.JSONRPCAddresses:
648 LS.append(JSONRPCListener(server, a))
649 if hasattr(config, 'SecretUser'):
650 server.SecretUser = config.SecretUser
651 server.aux = MM.CoinbaseAux
652 server.getBlockHeader = getBlockHeader
653 server.getBlockTemplate = getBlockTemplate
654 server.receiveShare = receiveShare
655 server.RaiseRedFlags = RaiseRedFlags
656 server.ShareTarget = config.ShareTarget
658 if hasattr(config, 'TrustedForwarders'):
659 server.TrustedForwarders = config.TrustedForwarders
660 server.ServerName = config.ServerName
666 bcnode_thr = threading.Thread(target=bcnode.serve_forever)
667 bcnode_thr.daemon = True
670 server.serve_forever()