Save original getmemorypool response on merkletree objects, for debugging
[bitcoin:eloipool.git] / merklemaker.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 from binascii import b2a_hex
18 from bitcoin.script import countSigOps
19 from bitcoin.txn import Txn
20 from collections import deque
21 from copy import deepcopy
22 from queue import Queue
23 import jsonrpc
24 import logging
25 from math import log
26 from merkletree import MerkleTree
27 from struct import pack
28 import threading
29 from time import sleep, time
30 import traceback
31
# Module-wide state used by merkleMaker.makeCoinbase to keep coinbases unique:
# [timestamp (whole seconds) last used, number of extra coinbases built within that second]
_makeCoinbase = [0, 0]
33
class merkleMaker(threading.Thread):
        """Worker thread that keeps merkle roots ready for handing out as work.

        It polls the upstream server with getmemorypool (updateMerkleTree),
        builds merkle trees from the result, and keeps two queues of
        precomputed merkle roots topped up (merkleMaker_I), which getMRD()
        hands out.

        NOTE(review): attributes such as UpstreamURI, WorkQueueSizeRegular,
        WorkQueueSizeLongpoll, TxnUpdateRetryWait and IdleSleepTime are read
        but never assigned here; presumably the code creating the instance
        sets them before start() -- confirm against the caller.
        """
        # None until the first poll; afterwards False if upstream answered the
        # BIP 22 style request, True if only the argument-less call worked
        OldGMP = None
        # Request object passed to getmemorypool by updateMerkleTree
        GMPReq = {
                'capabilities': [
                        'coinbasevalue',
                        'coinbase/append',
                        'coinbase',
                        'generation',
                        'time',
                        'transactions/remove',
                        'prevblock',
                ],
                'tx': 'obj',
        }
48         
49         def __init__(self, *a, **k):
50                 super().__init__(*a, **k)
51                 self.daemon = True
52                 self.logger = logging.getLogger('merkleMaker')
53                 self.CoinbasePrefix = b''
54                 self.CoinbaseAux = {}
55                 self.isOverflowed = False
56                 self.lastWarning = {}
57                 self.MinimumTxnUpdateWait = 5
58                 self.overflowed = 0
59         
60         def _prepare(self):
61                 self.access = jsonrpc.ServiceProxy(self.UpstreamURI)
62                 
63                 self.currentBlock = (None, None)
64                 self.currentMerkleTree = None
65                 self.merkleRoots = deque(maxlen=self.WorkQueueSizeRegular[1])
66                 self.LowestMerkleRoots = self.WorkQueueSizeRegular[1]
67                 self.clearMerkleTree = MerkleTree([self.clearCoinbaseTxn])
68                 self.clearMerkleRoots = Queue(self.WorkQueueSizeLongpoll[1])
69                 self.LowestClearMerkleRoots = self.WorkQueueSizeLongpoll[1]
70                 
71                 if not hasattr(self, 'WarningDelay'):
72                         self.WarningDelay = max(15, self.MinimumTxnUpdateWait * 2)
73                 if not hasattr(self, 'WarningDelayTxnLongpoll'):
74                         self.WarningDelayTxnLongpoll = self.WarningDelay
75                 if not hasattr(self, 'WarningDelayMerkleUpdate'):
76                         self.WarningDelayMerkleUpdate = self.WarningDelay
77                 
78                 self.lastMerkleUpdate = 0
79                 self.nextMerkleUpdate = 0
80                 global now
81                 now = time()
82                 self.updateMerkleTree()
83         
84         def updateBlock(self, newBlock, bits = None, _HBH = None):
85                 if newBlock == self.currentBlock[0]:
86                         if bits in (None, self.currentBlock[1]):
87                                 return
88                         self.logger.error('Was working on block with wrong specs: %s (bits: %s->%s)' % (
89                                 b2a_hex(newBlock[::-1]).decode('utf8'),
90                                 b2a_hex(self.currentBlock[1][::-1]).decode('utf8'),
91                                 b2a_hex(bits[::-1]).decode('utf8'),
92                         ))
93                 
94                 if bits is None:
95                         bits = self.currentBlock[1]
96                 if _HBH is None:
97                         _HBH = (b2a_hex(newBlock[::-1]).decode('utf8'), b2a_hex(bits[::-1]).decode('utf8'))
98                 self.logger.info('New block: %s (bits: %s)' % _HBH)
99                 self.merkleRoots.clear()
100                 self.currentMerkleTree = self.clearMerkleTree
101                 if self.currentBlock[0] != newBlock:
102                         self.lastBlock = self.currentBlock
103                 self.currentBlock = (newBlock, bits)
104                 self.needMerkle = 2
105                 self.onBlockChange()
106         
107         def _trimBlock(self, MP, txnlist, txninfo, floodn, msgf):
108                 fee = txninfo[-1].get('fee', None)
109                 if fee is None:
110                         raise self._floodCritical(now, floodn, doin=msgf('fees unknown'))
111                 if fee:
112                         # FIXME: coinbasevalue is *not* guaranteed to exist here
113                         MP['coinbasevalue'] -= fee
114                 
115                 txnlist[-1:] = ()
116                 txninfo[-1:] = ()
117                 
118                 return True
119         
120         def _APOT(self, txninfopot, MP, POTInfo):
121                 feeTxnsTrimmed = 0
122                 feesTrimmed = 0
123                 for txn in txninfopot:
124                         if txn.get('fee') is None:
125                                 self._floodWarning(now, 'APOT-No-Fees', doin='Upstream didn\'t provide fee information required for aggressive POT', logf=self.logger.info)
126                                 return
127                         if not txn['fee']:
128                                 continue
129                         feesTrimmed += txn['fee']
130                         feeTxnsTrimmed += 1
131                 MP['coinbasevalue'] -= feesTrimmed
132                 
133                 POTInfo[2] = [feeTxnsTrimmed, feesTrimmed]
134                 self._floodWarning(now, 'POT-Trimming-Fees', doin='Aggressive POT trimming %d transactions with %d.%08d BTC total fees' % (feeTxnsTrimmed, feesTrimmed//100000000, feesTrimmed % 100000000), logf=self.logger.debug)
135                 
136                 return True
137         
138         def _makeBlockSafe(self, MP, txnlist, txninfo):
139                 blocksize = sum(map(len, txnlist)) + 80
140                 while blocksize > 934464:  # 1 "MB" limit - 64 KB breathing room
141                         txnsize = len(txnlist[-1])
142                         self._trimBlock(MP, txnlist, txninfo, 'SizeLimit', lambda x: 'Making blocks over 1 MB size limit (%d bytes; %s)' % (blocksize, x))
143                         blocksize -= txnsize
144                 
145                 # NOTE: This check doesn't work at all without BIP22 transaction obj format
146                 blocksigops = sum(a.get('sigops', 0) for a in txninfo)
147                 while blocksigops > 19488:  # 20k limit - 0x200 breathing room
148                         txnsigops = txninfo[-1]['sigops']
149                         self._trimBlock(MP, txnlist, txninfo, 'SigOpLimit', lambda x: 'Making blocks over 20k SigOp limit (%d; %s)' % (blocksigops, x))
150                         blocksigops -= txnsigops
151                 
152                 POTMode = getattr(self, 'POT', 1)
153                 txncount = len(txnlist) + 1
154                 if POTMode:
155                         feetxncount = txncount
156                         for i in range(txncount - 2, -1, -1):
157                                 if 'fee' not in txninfo[i] or txninfo[i]['fee']:
158                                         break
159                                 feetxncount -= 1
160                         
161                         if getattr(self, 'Greedy', None):
162                                 # Aim to cut off extra zero-fee transactions on the end
163                                 # NOTE: not cutting out ones intermixed, in case of dependencies
164                                 idealtxncount = feetxncount
165                         else:
166                                 idealtxncount = txncount
167                         
168                         pot = 2**int(log(idealtxncount, 2))
169                         POTInfo = MP['POTInfo'] = [[idealtxncount, feetxncount, txncount], [pot, None], None]
170                         if pot < idealtxncount:
171                                 if pot * 2 <= txncount:
172                                         pot *= 2
173                                 elif pot >= feetxncount:
174                                         pass
175                                 elif POTMode > 1 and self._APOT(txninfo[pot-1:], MP, POTInfo):
176                                         # Trimmed even transactions with fees
177                                         pass
178                                 else:
179                                         pot = idealtxncount
180                                         self._floodWarning(now, 'Non-POT', doin='Making merkle tree with %d transactions (ideal: %d; max: %d)' % (pot, idealtxncount, txncount))
181                         POTInfo[1][1] = pot
182                         pot -= 1
183                         txnlist[pot:] = ()
184                         txninfo[pot:] = ()
185         
186         def updateMerkleTree(self):
187                 global now
188                 self.logger.debug('Polling bitcoind for memorypool')
189                 self.nextMerkleUpdate = now + self.TxnUpdateRetryWait
190                 
191                 try:
192                         MP = self.access.getmemorypool(self.GMPReq)
193                         self.OldGMP = False
194                         oMP = None
195                 except:
196                         MP = False
197                         try:
198                                 oMP = self.access.getmemorypool()
199                         except:
200                                 oMP = False
201                         if oMP is False:
202                                 # This way, we get the error from the BIP22 call if the old one fails too
203                                 raise
204                 if MP is False:
205                         # Pre-BIP22 server (bitcoind <0.7 or Eloipool <20120513)
206                         if not self.OldGMP:
207                                 self.OldGMP = True
208                                 self.logger.warning('Upstream server is not BIP 22 compliant')
209                         MP = oMP or self.access.getmemorypool()
210                 
211                 oMP = deepcopy(MP)
212                 
213                 prevBlock = bytes.fromhex(MP['previousblockhash'])[::-1]
214                 bits = bytes.fromhex(MP['bits'])[::-1]
215                 if (prevBlock, bits) != self.currentBlock:
216                         self.updateBlock(prevBlock, bits, _HBH=(MP['previousblockhash'], MP['bits']))
217                 
218                 txnlist = MP['transactions']
219                 if len(txnlist) and isinstance(txnlist[0], dict):
220                         txninfo = txnlist
221                         txnlist = tuple(a['data'] for a in txnlist)
222                         txninfo.insert(0, {
223                         })
224                 elif 'transactionfees' in MP:
225                         # Backward compatibility with pre-BIP22 gmp_fees branch
226                         txninfo = [{'fee':a} for a in MP['transactionfees']]
227                 else:
228                         # Backward compatibility with pre-BIP22 hex-only (bitcoind <0.7, Eloipool <future)
229                         txninfo = [{}] * len(txnlist)
230                 # TODO: cache Txn or at least txid from previous merkle roots?
231                 txnlist = [a for a in map(bytes.fromhex, txnlist)]
232                 
233                 self._makeBlockSafe(MP, txnlist, txninfo)
234                 
235                 cbtxn = self.makeCoinbaseTxn(MP['coinbasevalue'])
236                 cbtxn.setCoinbase(b'\0\0')
237                 cbtxn.assemble()
238                 txnlist.insert(0, cbtxn.data)
239                 
240                 txnlist = [a for a in map(Txn, txnlist[1:])]
241                 txnlist.insert(0, cbtxn)
242                 txnlist = list(txnlist)
243                 newMerkleTree = MerkleTree(txnlist)
244                 if newMerkleTree.merkleRoot() != self.currentMerkleTree.merkleRoot():
245                         newMerkleTree.POTInfo = MP.get('POTInfo')
246                         newMerkleTree.oMP = oMP
247                         self.logger.debug('Updating merkle tree')
248                         self.currentMerkleTree = newMerkleTree
249                 self.lastMerkleUpdate = now
250                 self.nextMerkleUpdate = now + self.MinimumTxnUpdateWait
251                 
252                 if self.needMerkle == 2:
253                         self.needMerkle = 1
254                         self.needMerkleSince = now
255         
256         def makeCoinbase(self):
257                 now = int(time())
258                 if now > _makeCoinbase[0]:
259                         _makeCoinbase[0] = now
260                         _makeCoinbase[1] = 0
261                 else:
262                         _makeCoinbase[1] += 1
263                 rv = self.CoinbasePrefix
264                 rv += pack('>L', now) + pack('>Q', _makeCoinbase[1]).lstrip(b'\0')
265                 # NOTE: Not using varlenEncode, since this is always guaranteed to be < 100
266                 rv = bytes( (len(rv),) ) + rv
267                 for v in self.CoinbaseAux.values():
268                         rv += v
269                 if len(rv) > 100:
270                         t = time()
271                         if self.overflowed < t - 300:
272                                 self.logger.warning('Overflowing coinbase data! %d bytes long' % (len(rv),))
273                                 self.overflowed = t
274                                 self.isOverflowed = True
275                         rv = rv[:100]
276                 else:
277                         self.isOverflowed = False
278                 return rv
279         
280         def makeMerkleRoot(self, merkleTree):
281                 cbtxn = merkleTree.data[0]
282                 cb = self.makeCoinbase()
283                 cbtxn.setCoinbase(cb)
284                 cbtxn.assemble()
285                 merkleRoot = merkleTree.merkleRoot()
286                 return (merkleRoot, merkleTree, cb)
287         
288         _doing_last = None
289         def _doing(self, what):
290                 if self._doing_last == what:
291                         self._doing_i += 1
292                         return
293                 global now
294                 if self._doing_last:
295                         self.logger.debug("Switching from (%4dx in %5.3f seconds) %s => %s" % (self._doing_i, now - self._doing_s, self._doing_last, what))
296                 self._doing_last = what
297                 self._doing_i = 1
298                 self._doing_s = now
299         
300         def _floodWarning(self, now, wid, wmsgf = None, doin = True, logf = None):
301                 if doin is True:
302                         doin = self._doing_last
303                         def a(f = wmsgf):
304                                 return lambda: "%s (doing %s)" % (f(), doin)
305                         wmsgf = a()
306                 winfo = self.lastWarning.setdefault(wid, [0, None])
307                 (lastTime, lastDoing) = winfo
308                 if now <= lastTime + max(5, self.MinimumTxnUpdateWait) and doin == lastDoing:
309                         return
310                 winfo[0] = now
311                 nowDoing = doin
312                 winfo[1] = nowDoing
313                 if logf is None:
314                         logf = self.logger.warning
315                 logf(wmsgf() if wmsgf else doin)
316         
317         def merkleMaker_I(self):
318                 global now
319                 
320                 # First, update merkle tree if we haven't for a while and aren't crunched for time
321                 now = time()
322                 if self.nextMerkleUpdate <= now and self.clearMerkleRoots.qsize() > self.WorkQueueSizeLongpoll[0] and len(self.merkleRoots) > self.WorkQueueSizeRegular[0]:
323                         self.updateMerkleTree()
324                 # Next, fill up the longpoll queue first, since it can be used as a failover for the main queue
325                 elif not self.clearMerkleRoots.full():
326                         self._doing('blank merkle roots')
327                         self.clearMerkleRoots.put(self.makeMerkleRoot(self.clearMerkleTree))
328                 # Next, fill up the main queue (until they're all current)
329                 elif len(self.merkleRoots) < self.WorkQueueSizeRegular[1] or self.merkleRoots[0][1] != self.currentMerkleTree:
330                         if self.needMerkle == 1 and len(self.merkleRoots) >= self.WorkQueueSizeRegular[1]:
331                                 self.onBlockUpdate()
332                                 self.needMerkle = False
333                         self._doing('regular merkle roots')
334                         self.merkleRoots.append(self.makeMerkleRoot(self.currentMerkleTree))
335                 else:
336                         if self.needMerkle == 1:
337                                 self.onBlockUpdate()
338                                 self.needMerkle = False
339                         self._doing('idle')
340                         # TODO: rather than sleepspin, block until MinimumTxnUpdateWait expires or threading.Condition(?)
341                         sleep(self.IdleSleepTime)
342                 if self.needMerkle == 1 and now > self.needMerkleSince + self.WarningDelayTxnLongpoll:
343                         self._floodWarning(now, 'NeedMerkle', lambda: 'Transaction-longpoll requested %d seconds ago, and still not ready. Is your server fast enough to keep up with your configured WorkQueueSizeRegular maximum?' % (now - self.needMerkleSince,))
344                 if now > self.nextMerkleUpdate + self.WarningDelayMerkleUpdate:
345                         self._floodWarning(now, 'MerkleUpdate', lambda: "Haven't updated the merkle tree in at least %d seconds! Is your server fast enough to keep up with your configured work queue minimums?" % (now - self.lastMerkleUpdate,))
346         
347         def run(self):
348                 while True:
349                         try:
350                                 self.merkleMaker_I()
351                         except:
352                                 self.logger.critical(traceback.format_exc())
353         
354         def start(self, *a, **k):
355                 self._prepare()
356                 super().start(*a, **k)
357         
358         def getMRD(self):
359                 (prevBlock, bits) = self.currentBlock
360                 try:
361                         MRD = self.merkleRoots.pop()
362                         self.LowestMerkleRoots = min(len(self.merkleRoots), self.LowestMerkleRoots)
363                         rollPrevBlk = False
364                 except IndexError:
365                         qsz = self.clearMerkleRoots.qsize()
366                         if qsz < 0x10:
367                                 self.logger.warning('clearMerkleRoots running out! only %d left' % (qsz,))
368                         MRD = self.clearMerkleRoots.get()
369                         self.LowestClearMerkleRoots = min(self.clearMerkleRoots.qsize(), self.LowestClearMerkleRoots)
370                         rollPrevBlk = True
371                 (merkleRoot, merkleTree, cb) = MRD
372                 return (merkleRoot, merkleTree, cb, prevBlock, bits, rollPrevBlk)
373         
374         def getMC(self):
375                 (prevBlock, bits) = self.currentBlock
376                 mt = self.currentMerkleTree
377                 cb = self.makeCoinbase()
378                 return (None, mt, cb, prevBlock, bits)
379
380 # merkleMaker tests
def _test():
        """Self-test for merkleMaker._makeBlockSafe, including aggressive POT trimming.

        Uses a fixed 'now' and a filtering logger so that expected warnings
        do not clutter the output. Raises AssertionError on failure.
        """
        global now
        now = 1337039788
        MM = merkleMaker()
        reallogger = MM.logger
        class fakelogger:
                # LO: 0 = log everything, 1 = hide warnings/errors, 2 = hide critical as well
                LO = False
                def critical(self, *a):
                        if self.LO > 1: return
                        reallogger.critical(*a)
                def error(self, *a):
                        if self.LO: return
                        reallogger.error(*a)
                def warning(self, *a):
                        if self.LO: return
                        reallogger.warning(*a)
                def info(self, *a):
                        pass
                def debug(self, *a):
                        pass
        MM.logger = fakelogger()

        # _makeBlockSafe tests
        MP = {
                'coinbasevalue':50,
        }
        txnlist = [b'\0', b'\x01', b'\x02']
        txninfo = [{'fee':0, 'sigops':1}, {'fee':5, 'sigops':10000}, {'fee':0, 'sigops':10001}]
        def MBS(LO = 0):
                # Run _makeBlockSafe on copies; LO >= 2 means an exception is expected
                m = deepcopy( (MP, txnlist, txninfo) )
                MM.logger.LO = LO
                try:
                        MM._makeBlockSafe(*m)
                except:
                        if LO < 2:
                                raise
                else:
                        assert LO < 2  # An expected error wasn't thrown
                if 'POTInfo' in m[0]:
                        del m[0]['POTInfo']
                return m
        MM.POT = 0
        assert MBS() == (MP, txnlist[:2], txninfo[:2])
        txninfo[2]['fee'] = 1
        MPx = deepcopy(MP)
        MPx['coinbasevalue'] -= 1
        assert MBS() == (MPx, txnlist[:2], txninfo[:2])
        txninfo[2]['sigops'] = 1
        assert MBS(1) == (MP, txnlist, txninfo)
        # APOT tests
        MM.POT = 2
        txnlist.append(b'\x03')
        txninfo.append({'fee':1, 'sigops':0})
        MPx = deepcopy(MP)
        MPx['coinbasevalue'] -= 1
        assert MBS() == (MPx, txnlist[:3], txninfo[:3])
435
# The self-tests run every time this module is loaded
_test()