1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2012 Luke Dashjr <luke-jr+eloipool@utopios.org>
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU Affero General Public License for more details.
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 from binascii import b2a_hex
18 from bitcoin.script import countSigOps
19 from bitcoin.txn import Txn
20 from collections import deque
21 from copy import deepcopy
22 from queue import Queue
26 from merkletree import MerkleTree
27 from struct import pack
29 from time import sleep, time
32 _makeCoinbase = [0, 0]
34 class merkleMaker(threading.Thread):
43 'transactions/remove',
49 def __init__(self, *a, **k):
50 super().__init__(*a, **k)
52 self.logger = logging.getLogger('merkleMaker')
53 self.CoinbasePrefix = b''
55 self.isOverflowed = False
57 self.MinimumTxnUpdateWait = 5
61 self.access = jsonrpc.ServiceProxy(self.UpstreamURI)
63 self.currentBlock = (None, None)
64 self.currentMerkleTree = None
65 self.merkleRoots = deque(maxlen=self.WorkQueueSizeRegular[1])
66 self.LowestMerkleRoots = self.WorkQueueSizeRegular[1]
67 self.clearMerkleTree = MerkleTree([self.clearCoinbaseTxn])
68 self.clearMerkleRoots = Queue(self.WorkQueueSizeLongpoll[1])
69 self.LowestClearMerkleRoots = self.WorkQueueSizeLongpoll[1]
71 if not hasattr(self, 'WarningDelay'):
72 self.WarningDelay = max(15, self.MinimumTxnUpdateWait * 2)
73 if not hasattr(self, 'WarningDelayTxnLongpoll'):
74 self.WarningDelayTxnLongpoll = self.WarningDelay
75 if not hasattr(self, 'WarningDelayMerkleUpdate'):
76 self.WarningDelayMerkleUpdate = self.WarningDelay
78 self.lastMerkleUpdate = 0
79 self.nextMerkleUpdate = 0
82 self.updateMerkleTree()
84 def updateBlock(self, newBlock, bits = None, _HBH = None):
85 if newBlock == self.currentBlock[0]:
86 if bits in (None, self.currentBlock[1]):
88 self.logger.error('Was working on block with wrong specs: %s (bits: %s->%s)' % (
89 b2a_hex(newBlock[::-1]).decode('utf8'),
90 b2a_hex(self.currentBlock[1][::-1]).decode('utf8'),
91 b2a_hex(bits[::-1]).decode('utf8'),
95 bits = self.currentBlock[1]
97 _HBH = (b2a_hex(newBlock[::-1]).decode('utf8'), b2a_hex(bits[::-1]).decode('utf8'))
98 self.logger.info('New block: %s (bits: %s)' % _HBH)
99 self.merkleRoots.clear()
100 self.currentMerkleTree = self.clearMerkleTree
101 if self.currentBlock[0] != newBlock:
102 self.lastBlock = self.currentBlock
103 self.currentBlock = (newBlock, bits)
107 def _trimBlock(self, MP, txnlist, txninfo, floodn, msgf):
108 fee = txninfo[-1].get('fee', None)
110 raise self._floodCritical(now, floodn, doin=msgf('fees unknown'))
112 # FIXME: coinbasevalue is *not* guaranteed to exist here
113 MP['coinbasevalue'] -= fee
120 def _APOT(self, txninfopot, MP, POTInfo):
123 for txn in txninfopot:
124 if txn.get('fee') is None:
125 self._floodWarning(now, 'APOT-No-Fees', doin='Upstream didn\'t provide fee information required for aggressive POT', logf=self.logger.info)
129 feesTrimmed += txn['fee']
131 MP['coinbasevalue'] -= feesTrimmed
133 POTInfo[2] = [feeTxnsTrimmed, feesTrimmed]
134 self._floodWarning(now, 'POT-Trimming-Fees', doin='Aggressive POT trimming %d transactions with %d.%08d BTC total fees' % (feeTxnsTrimmed, feesTrimmed//100000000, feesTrimmed % 100000000), logf=self.logger.debug)
138 def _makeBlockSafe(self, MP, txnlist, txninfo):
139 blocksize = sum(map(len, txnlist)) + 80
140 while blocksize > 934464: # 1 "MB" limit - 64 KB breathing room
141 txnsize = len(txnlist[-1])
142 self._trimBlock(MP, txnlist, txninfo, 'SizeLimit', lambda x: 'Making blocks over 1 MB size limit (%d bytes; %s)' % (blocksize, x))
145 # NOTE: This check doesn't work at all without BIP22 transaction obj format
146 blocksigops = sum(a.get('sigops', 0) for a in txninfo)
147 while blocksigops > 19488: # 20k limit - 0x200 breathing room
148 txnsigops = txninfo[-1]['sigops']
149 self._trimBlock(MP, txnlist, txninfo, 'SigOpLimit', lambda x: 'Making blocks over 20k SigOp limit (%d; %s)' % (blocksigops, x))
150 blocksigops -= txnsigops
152 POTMode = getattr(self, 'POT', 1)
153 txncount = len(txnlist) + 1
155 feetxncount = txncount
156 for i in range(txncount - 2, -1, -1):
157 if 'fee' not in txninfo[i] or txninfo[i]['fee']:
161 if getattr(self, 'Greedy', None):
162 # Aim to cut off extra zero-fee transactions on the end
163 # NOTE: not cutting out ones intermixed, in case of dependencies
164 idealtxncount = feetxncount
166 idealtxncount = txncount
168 pot = 2**int(log(idealtxncount, 2))
169 POTInfo = MP['POTInfo'] = [[idealtxncount, feetxncount, txncount], [pot, None], None]
170 if pot < idealtxncount:
171 if pot * 2 <= txncount:
173 elif pot >= feetxncount:
175 elif POTMode > 1 and self._APOT(txninfo[pot-1:], MP, POTInfo):
176 # Trimmed even transactions with fees
180 self._floodWarning(now, 'Non-POT', doin='Making merkle tree with %d transactions (ideal: %d; max: %d)' % (pot, idealtxncount, txncount))
186 def updateMerkleTree(self):
188 self.logger.debug('Polling bitcoind for memorypool')
189 self.nextMerkleUpdate = now + self.TxnUpdateRetryWait
192 MP = self.access.getmemorypool(self.GMPReq)
198 oMP = self.access.getmemorypool()
202 # This way, we get the error from the BIP22 call if the old one fails too
205 # Pre-BIP22 server (bitcoind <0.7 or Eloipool <20120513)
208 self.logger.warning('Upstream server is not BIP 22 compliant')
209 MP = oMP or self.access.getmemorypool()
213 prevBlock = bytes.fromhex(MP['previousblockhash'])[::-1]
214 bits = bytes.fromhex(MP['bits'])[::-1]
215 if (prevBlock, bits) != self.currentBlock:
216 self.updateBlock(prevBlock, bits, _HBH=(MP['previousblockhash'], MP['bits']))
218 txnlist = MP['transactions']
219 if len(txnlist) and isinstance(txnlist[0], dict):
221 txnlist = tuple(a['data'] for a in txnlist)
224 elif 'transactionfees' in MP:
225 # Backward compatibility with pre-BIP22 gmp_fees branch
226 txninfo = [{'fee':a} for a in MP['transactionfees']]
228 # Backward compatibility with pre-BIP22 hex-only (bitcoind <0.7, Eloipool <future)
229 txninfo = [{}] * len(txnlist)
230 # TODO: cache Txn or at least txid from previous merkle roots?
231 txnlist = [a for a in map(bytes.fromhex, txnlist)]
233 self._makeBlockSafe(MP, txnlist, txninfo)
235 cbtxn = self.makeCoinbaseTxn(MP['coinbasevalue'])
236 cbtxn.setCoinbase(b'\0\0')
238 txnlist.insert(0, cbtxn.data)
240 txnlist = [a for a in map(Txn, txnlist[1:])]
241 txnlist.insert(0, cbtxn)
242 txnlist = list(txnlist)
243 newMerkleTree = MerkleTree(txnlist)
244 if newMerkleTree.merkleRoot() != self.currentMerkleTree.merkleRoot():
245 newMerkleTree.POTInfo = MP.get('POTInfo')
246 newMerkleTree.oMP = oMP
247 self.logger.debug('Updating merkle tree')
248 self.currentMerkleTree = newMerkleTree
249 self.lastMerkleUpdate = now
250 self.nextMerkleUpdate = now + self.MinimumTxnUpdateWait
252 if self.needMerkle == 2:
254 self.needMerkleSince = now
256 def makeCoinbase(self):
258 if now > _makeCoinbase[0]:
259 _makeCoinbase[0] = now
262 _makeCoinbase[1] += 1
263 rv = self.CoinbasePrefix
264 rv += pack('>L', now) + pack('>Q', _makeCoinbase[1]).lstrip(b'\0')
265 # NOTE: Not using varlenEncode, since this is always guaranteed to be < 100
266 rv = bytes( (len(rv),) ) + rv
267 for v in self.CoinbaseAux.values():
271 if self.overflowed < t - 300:
272 self.logger.warning('Overflowing coinbase data! %d bytes long' % (len(rv),))
274 self.isOverflowed = True
277 self.isOverflowed = False
280 def makeMerkleRoot(self, merkleTree):
281 cbtxn = merkleTree.data[0]
282 cb = self.makeCoinbase()
283 cbtxn.setCoinbase(cb)
285 merkleRoot = merkleTree.merkleRoot()
286 return (merkleRoot, merkleTree, cb)
289 def _doing(self, what):
290 if self._doing_last == what:
295 self.logger.debug("Switching from (%4dx in %5.3f seconds) %s => %s" % (self._doing_i, now - self._doing_s, self._doing_last, what))
296 self._doing_last = what
300 def _floodWarning(self, now, wid, wmsgf = None, doin = True, logf = None):
302 doin = self._doing_last
304 return lambda: "%s (doing %s)" % (f(), doin)
306 winfo = self.lastWarning.setdefault(wid, [0, None])
307 (lastTime, lastDoing) = winfo
308 if now <= lastTime + max(5, self.MinimumTxnUpdateWait) and doin == lastDoing:
314 logf = self.logger.warning
315 logf(wmsgf() if wmsgf else doin)
317 def merkleMaker_I(self):
320 # First, update merkle tree if we haven't for a while and aren't crunched for time
322 if self.nextMerkleUpdate <= now and self.clearMerkleRoots.qsize() > self.WorkQueueSizeLongpoll[0] and len(self.merkleRoots) > self.WorkQueueSizeRegular[0]:
323 self.updateMerkleTree()
324 # Next, fill up the longpoll queue first, since it can be used as a failover for the main queue
325 elif not self.clearMerkleRoots.full():
326 self._doing('blank merkle roots')
327 self.clearMerkleRoots.put(self.makeMerkleRoot(self.clearMerkleTree))
328 # Next, fill up the main queue (until they're all current)
329 elif len(self.merkleRoots) < self.WorkQueueSizeRegular[1] or self.merkleRoots[0][1] != self.currentMerkleTree:
330 if self.needMerkle == 1 and len(self.merkleRoots) >= self.WorkQueueSizeRegular[1]:
332 self.needMerkle = False
333 self._doing('regular merkle roots')
334 self.merkleRoots.append(self.makeMerkleRoot(self.currentMerkleTree))
336 if self.needMerkle == 1:
338 self.needMerkle = False
340 # TODO: rather than sleepspin, block until MinimumTxnUpdateWait expires or threading.Condition(?)
341 sleep(self.IdleSleepTime)
342 if self.needMerkle == 1 and now > self.needMerkleSince + self.WarningDelayTxnLongpoll:
343 self._floodWarning(now, 'NeedMerkle', lambda: 'Transaction-longpoll requested %d seconds ago, and still not ready. Is your server fast enough to keep up with your configured WorkQueueSizeRegular maximum?' % (now - self.needMerkleSince,))
344 if now > self.nextMerkleUpdate + self.WarningDelayMerkleUpdate:
345 self._floodWarning(now, 'MerkleUpdate', lambda: "Haven't updated the merkle tree in at least %d seconds! Is your server fast enough to keep up with your configured work queue minimums?" % (now - self.lastMerkleUpdate,))
352 self.logger.critical(traceback.format_exc())
354 def start(self, *a, **k):
356 super().start(*a, **k)
359 (prevBlock, bits) = self.currentBlock
361 MRD = self.merkleRoots.pop()
362 self.LowestMerkleRoots = min(len(self.merkleRoots), self.LowestMerkleRoots)
365 qsz = self.clearMerkleRoots.qsize()
367 self.logger.warning('clearMerkleRoots running out! only %d left' % (qsz,))
368 MRD = self.clearMerkleRoots.get()
369 self.LowestClearMerkleRoots = min(self.clearMerkleRoots.qsize(), self.LowestClearMerkleRoots)
371 (merkleRoot, merkleTree, cb) = MRD
372 return (merkleRoot, merkleTree, cb, prevBlock, bits, rollPrevBlk)
375 (prevBlock, bits) = self.currentBlock
376 mt = self.currentMerkleTree
377 cb = self.makeCoinbase()
378 return (None, mt, cb, prevBlock, bits)
385 reallogger = MM.logger
388 def critical(self, *a):
389 if self.LO > 1: return
390 reallogger.critical(*a)
391 def warning(self, *a):
393 reallogger.warning(*a)
396 MM.logger = fakelogger()
400 # _makeBlockSafe tests
401 from copy import deepcopy
405 txnlist = [b'\0', b'\x01', b'\x02']
406 txninfo = [{'fee':0, 'sigops':1}, {'fee':5, 'sigops':10000}, {'fee':0, 'sigops':10001}]
408 m = deepcopy( (MP, txnlist, txninfo) )
411 MM._makeBlockSafe(*m)
416 assert LO < 2 # An expected error wasn't thrown
417 if 'POTInfo' in m[0]:
421 assert MBS() == (MP, txnlist[:2], txninfo[:2])
422 txninfo[2]['fee'] = 1
424 MPx['coinbasevalue'] -= 1
425 assert MBS() == (MPx, txnlist[:2], txninfo[:2])
426 txninfo[2]['sigops'] = 1
427 assert MBS(1) == (MP, txnlist, txninfo)
430 txnlist.append(b'\x03')
431 txninfo.append({'fee':1, 'sigops':0})
433 MPx['coinbasevalue'] -= 1
434 assert MBS() == (MPx, txnlist[:3], txninfo[:3])