2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012 Luke Dashjr <luke-jr+eloipool@utopios.org>
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU Affero General Public License for more details.
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
# Root logging configuration: DEBUG and above is passed by the root logger.
logging.basicConfig(level=logging.DEBUG)
# These three named loggers are limited to INFO and above.
# The loop variable keeps its original module-level name.
for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker'):
    logging.getLogger(infoOnly).setLevel(logging.INFO)
def RaiseRedFlags(reason):
    """Log *reason* at CRITICAL level on the 'redflag' logger.

    reason -- the message to log; it is passed to the logger unchanged.
    Returns None.
    """
    logging.getLogger('redflag').critical(reason)
# Create the BitcoinNode instance for the network id given in
# config.UpstreamNetworkId and append this program's identifier to its
# userAgent bytes.
32 from bitcoin.node import BitcoinLink, BitcoinNode
33 bcnode = BitcoinNode(config.UpstreamNetworkId)
34 bcnode.userAgent += b'Eloipool:0.1/'
# JSON-RPC proxy for the endpoint in config.UpstreamURI; blockSubmissionThread
# calls getmemorypool through it.
# NOTE(review): the import of `jsonrpc` is not visible in this listing.
37 UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
40 from bitcoin.script import BitcoinScript
41 from bitcoin.txn import Txn
42 from base58 import b58decode
43 from struct import pack
# Build the coinbase transaction object `t` for a payout of `coinbaseValue`.
# When useCoinbaser is true and config.CoinbaserCmd is set, an external
# command supplies extra outputs; the value left over is paid to
# config.TrackerAddr.
# NOTE(review): several statements of this function are not visible in this
# listing (the embedded numbering has gaps), including the creation of `t`,
# the initialisation/accumulation of `coinbased`, and any return statement.
# The comments below describe the visible lines only.
47 def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
50 if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
# Replace the literal '%d' in the configured command with the coinbase value
# and run the result through the shell, capturing its stdout.
# NOTE(review): no wait()/close on `p` is visible here - confirm the child
# process is reaped.
53 cmd = config.CoinbaserCmd
54 cmd = cmd.replace('%d', str(coinbaseValue))
55 p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
# First stdout line: an integer stored as nout.
56 nout = int(p.stdout.readline())
# Then an integer amount line and an address line; the address is turned into
# an output script and added to `t` with that amount.
# presumably this pair is read nout times - the loop header is not visible; confirm
58 amount = int(p.stdout.readline())
59 addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
60 pkScript = BitcoinScript.toAddress(addr)
61 t.addOutput(amount, pkScript)
# Forces the total above coinbaseValue so the check below reports a failure.
# presumably the error path of the coinbaser call - the enclosing construct is
# not visible; confirm
64 coinbased = coinbaseValue + 1
# The coinbaser may not claim the whole value (or more).
65 if coinbased >= coinbaseValue:
66 logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
# Deduct what the coinbaser outputs consumed.
69 coinbaseValue -= coinbased
# The remaining value goes to the address in config.TrackerAddr.
71 pkScript = BitcoinScript.toAddress(config.TrackerAddr)
72 t.addOutput(coinbaseValue, pkScript)
75 # TODO: red flag on dupe coinbase
79 from util import Bits2Target
# Recompute the module-level networkTarget from element 1 of MM.currentBlock
# (the `bits` value, per the unpacking in checkShare) using Bits2Target.
# NOTE(review): the enclosing def line is not visible in this listing;
# MM.onBlockChange is later set to `blockChanged`, which may be this
# function - confirm.
93 global MM, networkTarget, server
94 networkTarget = Bits2Target(MM.currentBlock[1])
99 from time import sleep, time
# One pruning pass over the work log `wl`, a mapping of username to that
# user's work mapping. Entries whose stored time (index 1 of the value) is
# more than 120 seconds older than `now` are selected, and the number pruned
# is logged at debug level.
# NOTE(review): the lines that set `now` and `pruned`, iterate over usernames
# and remove the selected entries are not visible in this listing.
102 def _WorkLogPruner_I(wl):
106 userwork = wl[username]
# Iterates over a tuple snapshot of the keys.
# presumably so entries can be removed while looping - confirm
107 for wli in tuple(userwork.keys()):
108 if now > userwork[wli][1] + 120:
111 WorkLogPruner.logger.debug('Pruned %d jobs' % (pruned,))
# Thread target started from the __main__ block with workLog as its argument.
# Logs a formatted traceback on WorkLogPruner.logger when something fails.
# NOTE(review): the loop, the call to _WorkLogPruner_I and the try/except
# lines are not visible in this listing.
113 def WorkLogPruner(wl):
119 WorkLogPruner.logger.error(traceback.format_exc())
# The logger is stored as a function attribute; _WorkLogPruner_I uses it too.
120 WorkLogPruner.logger = logging.getLogger('WorkLogPruner')
# Configure the merkleMaker object MM.
# NOTE(review): the line that creates MM is not visible in this listing.
123 from merklemaker import merkleMaker
# Copy every attribute of the config module onto MM.
125 MM.__dict__.update(config.__dict__)
# Coinbase transaction built with the coinbaser disabled and a fixed value of
# 5000000000 (marked FIXME by the author), then assembled.
126 MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False) # FIXME
127 MM.clearCoinbaseTxn.assemble()
# Attach the coinbase builder and two block callbacks as attributes of MM.
# NOTE(review): no def for blockChanged or updateBlocks is visible in this
# listing.
128 MM.makeCoinbaseTxn = makeCoinbaseTxn
129 MM.onBlockChange = blockChanged
130 MM.onBlockUpdate = updateBlocks
134 from binascii import b2a_hex
135 from copy import deepcopy
136 from struct import pack, unpack
137 from time import time
138 from util import RejectedShare, dblsha, hash2int
# Optional "gotwork" reporting: when config.GotWorkURI exists, create a
# JSON-RPC proxy for it.
# NOTE(review): checkShare tests `gotwork` unconditionally, so a default
# assignment presumably precedes this line (not visible) - confirm.
144 if hasattr(config, 'GotWorkURI'):
145 gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
# Send `info` to the gotwork service; checkShare runs this on its own thread.
# A warning with the traceback is logged on checkShare.logger.
# NOTE(review): the try/except lines around the call are not visible in this
# listing; presumably the warning is the except branch - confirm.
147 def submitGotwork(info):
149 gotwork.gotwork(info)
151 checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
# Optional share database: when config.DbOptions exists, open a psycopg2
# connection, passing the options as keyword arguments.
# NOTE(review): the psycopg2 import is not visible in this listing.
154 if hasattr(config, 'DbOptions'):
156 db = psycopg2.connect(**config.DbOptions)
# Build a block header for `username` and remember the work it came from.
# NOTE(review): the line that obtains MRD and the return statement are not
# visible in this listing.
158 def getBlockHeader(username):
160 (merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
# Current time packed as a little-endian unsigned 32-bit integer.
161 timestamp = pack('<L', int(time()))
# Field order: version bytes, previous block, merkle root, timestamp, bits,
# then the four bytes b'iolE'. checkShare slices submitted data in the same
# order.
162 hdr = b'\1\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
# Record (MRD, issue time) under this user's merkle root; checkShare looks the
# work up there and _WorkLogPruner_I reads the time.
163 workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
# Map a truth value to the string 'Y' or 'N'.
# NOTE(review): the def line is not visible in this listing; the share-insert
# code calls YN(...), presumably this helper - confirm.
169 return 'Y' if b else 'N'
# Insert one row describing `share` into the `shares` table.
# NOTE(review): the enclosing def line and the creation of the cursor `dbc`
# are not visible in this listing.
# Missing keys fall back to '?' for the remote host and None for the reject
# reason and upstream result.
175 rem_host = share.get('remoteHost', '?')
176 username = share['username']
177 reason = share.get('rejectReason', None)
178 upstreamResult = share.get('upstreamResult', None)
179 solution = share['_origdata']
# NOTE(review): the hex conversion is commented out, yet the SQL below applies
# decode(..., 'hex') - confirm share['_origdata'] is already hex text.
180 #solution = b2a_hex(solution).decode('utf8')
# Values are passed as query parameters, not formatted into the SQL text.
181 stmt = "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values (%s, %s, %s, %s, %s, decode(%s, 'hex'))"
# our_result is 'Y' when there is no reject reason; upstream_result is 'Y'
# only when upstreamResult is truthy.
182 params = (rem_host, username, YN(not reason), YN(upstreamResult), reason, solution)
183 dbc.execute(stmt, params)
189 from bitcoin.varlen import varlenEncode
# Build a block payload from the header `blkhdr` and the transaction list
# `txlist`.
# NOTE(review): only one step is visible in this listing; the initialisation
# of `payload`, the appending of transactions and the return are not.
190 def assembleBlock(blkhdr, txlist):
# Append the number of transactions in varlen encoding.
192 payload += varlenEncode(len(txlist))
# Thread target (started by checkShare): submit the block `payload`, as an
# ASCII hex string, to the upstream getmemorypool JSON-RPC method.
# NOTE(review): lines around the call are not visible in this listing, so
# retry or error handling cannot be confirmed from here.
197 def blockSubmissionThread(payload):
200 UpstreamBitcoindJSONRPC.getmemorypool(b2a_hex(payload).decode('ascii'))
# Validate one submitted share and act on it.
# `share` is a dict: this code reads share['username'], writes share['time']
# and, for block-solving shares, share['upstreamResult'].
# Each failed check raises RejectedShare with a short reason code.
# NOTE(review): many lines of this function are not visible in this listing
# (the embedded numbering has gaps), including where `data`, `txlist` and
# `info` are assigned. The comments below describe the visible lines only.
205 def checkShare(share):
# Bytes 4..35 of the submitted data must equal the current previous-block
# value; a share built on MM.lastBlock[0] is reported as stale, not bad.
208 (prevBlock, bits) = MM.currentBlock
209 sharePrevBlock = data[4:36]
210 if sharePrevBlock != prevBlock:
211 if sharePrevBlock == MM.lastBlock[0]:
212 raise RejectedShare('stale-prevblk')
213 raise RejectedShare('bad-prevblk')
215 shareMerkleRoot = data[36:68]
# Only users with an entry in workLog (added by getBlockHeader) may submit.
217 username = share['username']
218 if username not in workLog:
219 raise RejectedShare('unknown-user')
# Bytes 72..75 must equal the current bits value, and the first four bytes
# must be 01 00 00 00.
221 if data[72:76] != bits:
222 raise RejectedShare('bad-diffbits')
223 if data[:4] != b'\1\0\0\0':
224 raise RejectedShare('bad-version')
# Look the work up by merkle root; the stored value is (MRD, time recorded by
# getBlockHeader).
226 MWL = workLog[username]
227 if shareMerkleRoot not in MWL:
228 raise RejectedShare('unknown-work')
229 (MRD, t) = MWL[shareMerkleRoot]
# Duplicate detection: DupeShareHACK is a mapping used as a set of
# already-seen `data` values.
232 if data in DupeShareHACK:
233 raise RejectedShare('duplicate')
234 DupeShareHACK[data] = None
# Timestamp field: bytes 68..71 as a little-endian unsigned 32-bit integer.
236 shareTimestamp = unpack('<L', data[68:72])[0]
# The receipt time is also stored on the share.
237 shareTime = share['time'] = time()
# NOTE(review): as written this rejects only when the receipt time is more
# than 120 s *before* the time the work was recorded; a staleness test would
# normally compare the other way round - confirm intent.
238 if shareTime < t - 120:
239 raise RejectedShare('stale-work')
# The submitted timestamp may be at most 300 s behind and 7200 s ahead of the
# receipt time.
240 if shareTimestamp < shareTime - 300:
241 raise RejectedShare('time-too-old')
242 if shareTimestamp > shareTime + 7200:
243 raise RejectedShare('time-too-new')
# The last four bytes of dblsha(data) must be zero for the share to count.
245 blkhash = dblsha(data)
246 if blkhash[28:] != b'\0\0\0\0':
247 raise RejectedShare('H-not-zero')
248 blkhashn = hash2int(blkhash)
# Hash and target are logged at info level when the hash meets networkTarget,
# otherwise at debug level.
251 logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
252 logfunc('BLKHASH: %64x' % (blkhashn,))
253 logfunc(' TARGET: %64x' % (networkTarget,))
# NOTE(review): `t` held the work's recorded time above; here it must have
# been rebound, in lines not visible, to an object with setCoinbase() and
# .data - presumably the coinbase transaction; confirm.
257 t.setCoinbase(MRD[2])
# Hash meets the network target: submit the block upstream.
260 if blkhashn <= networkTarget:
261 logfunc("Submitting upstream")
# A deep copy of (data, txlist) is appended to RBDs.
262 RBDs.append( deepcopy( (data, txlist) ) )
263 payload = assembleBlock(data, txlist)
264 logfunc('Real block payload: %s' % (payload,))
# Submitted twice: over JSON-RPC on a separate thread and through bcnode.
266 threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
267 bcnode.submitBlock(payload)
268 share['upstreamResult'] = True
269 MM.updateBlock(blkhash)
# Optional gotwork report for hashes at or below config.GotWorkTarget.
272 if gotwork and blkhashn <= config.GotWorkTarget:
# coinbaseMrkl is t.data, then the block hash, then a one-byte count of
# MRD[1]._steps, then (after lines not visible here) four zero bytes.
274 coinbaseMrkl = t.data
275 coinbaseMrkl += blkhash
276 steps = MRD[1]._steps
277 coinbaseMrkl += pack('B', len(steps))
280 coinbaseMrkl += b"\0\0\0\0"
# The request carries hex-encoded hash, header and coinbaseMrkl and is sent
# from its own thread.
282 info['hash'] = b2a_hex(blkhash).decode('ascii')
283 info['header'] = b2a_hex(data).decode('ascii')
284 info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
285 thr = threading.Thread(target=submitGotwork, args=(info,))
289 checkShare.logger.warning('Failed to build gotwork request')
# The logger is stored as a function attribute; submitGotwork uses it too.
292 checkShare.logger = logging.getLogger('checkShare')
# Entry point for submitted shares; installed as server.receiveShare in the
# __main__ block.
# A RejectedShare is caught and its text stored as share['rejectReason'].
# NOTE(review): the try body (presumably the checkShare call) and whatever
# follows the handler are not visible in this listing.
294 def receiveShare(share):
295 # TODO: username => userid
298 except RejectedShare as rej:
299 share['rejectReason'] = str(rej)
def newBlockNotification(signum, frame):
    """Signal handler: log the notification and call MM.updateMerkleTree().

    signum, frame -- the standard signal-handler arguments; neither is used.
    """
    logging.getLogger('newBlockNotification').info('Received new block notification')
    MM.updateMerkleTree()
    # TODO: Force RESPOND TO LONGPOLLS?
# Install newBlockNotification as the handler for SIGUSR1.
# NOTE(review): this import binds the name `signal` to the function
# signal.signal, while later code uses signal.SIGTERM, which needs the
# module - confirm a later `import signal` (not visible here) rebinds it.
310 from signal import signal, SIGUSR1
311 signal(SIGUSR1, newBlockNotification)
319 from time import sleep
# File name (relative path) used to save and restore the pickled work state.
322 SAVE_STATE_FILENAME = 'eloipool.worklog'
# --- stopServers fragment ---
# NOTE(review): the def line and most of the body are not visible in this
# listing; the name is taken from the logger name below.
# Visible behaviour: log the shutdown, gather bcnode and server, collect
# server class names into `sl` for the "taking too long" error message, and
# iterate over the keys of each server's _fd mapping (what is done per fd is
# not visible).
325 logger = logging.getLogger('stopServers')
327 logger.info('Stopping servers...')
328 global bcnode, server
329 servers = (bcnode, server)
339 sl.append(s.__class__.__name__)
344 logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
349 for fd in s._fd.keys():
# --- saveState fragment ---
# NOTE(review): the def line and the try/except lines are not visible in this
# listing; the name is taken from the logger name below.
# Pickles the tuple (workLog, DupeShareHACK) into SAVE_STATE_FILENAME.
353 logger = logging.getLogger('saveState')
355 # Then, save data needed to resume work
356 logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
360 with open(SAVE_STATE_FILENAME, 'wb') as f:
361 pickle.dump( (workLog, DupeShareHACK), f )
# Failure path: log the traceback, then remove the state file.
# presumably so a partial file is not picked up on resume - confirm
366 logger.error('Failed to save work\n' + traceback.format_exc())
368 os.unlink(SAVE_STATE_FILENAME)
370 logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
# --- exit fragment (def line not visible in this listing) ---
# Logs a farewell and sends SIGTERM to this process.
# NOTE(review): `signal.SIGTERM` needs `signal` to be the module, but an
# earlier `from signal import signal, SIGUSR1` binds it to a function -
# confirm an `import signal` (not visible) comes in between.
375 logging.getLogger('exit').info('Goodbye...')
376 os.kill(os.getpid(), signal.SIGTERM)
# --- restart fragment (def line not visible in this listing) ---
# Re-executes the program in place with the same argument vector; the error
# is logged if the exec fails.
382 logging.getLogger('restart').info('Restarting...')
384 os.execv(sys.argv[0], sys.argv)
386 logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
# --- restoreState fragment ---
# NOTE(review): the def line, the try/except lines and the statement
# restoring workLog are not visible in this listing; the name is taken from
# the logger name below.
# Does nothing special unless SAVE_STATE_FILENAME exists.
# the body of this test is not visible - presumably an early return; confirm
389 if not os.path.exists(SAVE_STATE_FILENAME):
392 global workLog, DupeShareHACK
394 logger = logging.getLogger('restoreState')
395 logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, os.stat(SAVE_STATE_FILENAME).st_size))
# NOTE(review): pickle.load can execute arbitrary code from the file; this is
# only safe if the state file can be written by this program alone.
397 with open(SAVE_STATE_FILENAME, 'rb') as f:
398 data = pickle.load(f)
# Element 1 of the saved tuple is the duplicate-share mapping, matching the
# order in which it is pickled.
400 DupeShareHACK = data[1]
402 logger.error('Failed to restore state\n' + traceback.format_exc())
404 logger.info('State restored successfully')
407 from jsonrpcserver import JSONRPCListener, JSONRPCServer
408 import interactivemode
409 from networkserver import NetworkListener
# Script entry point: set up listeners, wire the JSON-RPC server to the
# functions defined in this file, create the background threads and serve.
# NOTE(review): some lines are not visible in this listing, including the
# creation of the LSbc and LS lists and the start() calls for the two threads.
412 if __name__ == "__main__":
# One NetworkListener per address in config.BitcoinNodeAddresses; the
# attribute defaults to an empty tuple when absent.
414 if not hasattr(config, 'BitcoinNodeAddresses'):
415 config.BitcoinNodeAddresses = ()
416 for a in config.BitcoinNodeAddresses:
417 LSbc.append(NetworkListener(bcnode, a))
# Optional BitcoinLink from bcnode to config.UpstreamBitcoindNode.
419 if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
420 BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
422 server = JSONRPCServer()
# A single config.JSONRPCAddress is put at the front of
# config.JSONRPCAddresses, creating that list if it does not exist.
423 if hasattr(config, 'JSONRPCAddress'):
424 if not hasattr(config, 'JSONRPCAddresses'):
425 config.JSONRPCAddresses = []
426 config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
# One JSONRPCListener per configured address.
428 for a in config.JSONRPCAddresses:
429 LS.append(JSONRPCListener(server, a))
430 if hasattr(config, 'SecretUser'):
431 server.SecretUser = config.SecretUser
# Hand the server the values and callables it uses from this file.
432 server.aux = MM.CoinbaseAux
433 server.getBlockHeader = getBlockHeader
434 server.receiveShare = receiveShare
435 server.RaiseRedFlags = RaiseRedFlags
# Daemon threads for work-log pruning and the bcnode loop.
439 prune_thr = threading.Thread(target=WorkLogPruner, args=(workLog,))
440 prune_thr.daemon = True
443 bcnode_thr = threading.Thread(target=bcnode.serve_forever)
444 bcnode_thr.daemon = True
# The JSON-RPC server loop is called directly here, not on a new thread.
447 server.serve_forever()