Merge branch 'master' into coinbase_height
[bitcoin:eloipool.git] / merklemaker.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 from binascii import b2a_hex
18 from bitcoin.script import countSigOps
19 from bitcoin.txn import Txn
20 from collections import deque
21 from copy import deepcopy
22 from queue import Queue
23 import jsonrpc
24 import logging
25 from math import log
26 from merkletree import MerkleTree
27 from struct import pack
28 import threading
29 from time import sleep, time
30 import traceback
31
32 _makeCoinbase = [0, 0]
33
34 class merkleMaker(threading.Thread):
35         OldGMP = None
36         GMPReq = {
37                 'capabilities': [
38                         'coinbasevalue',
39                         'coinbase/append',
40                         'coinbase',
41                         'generation',
42                         'time',
43                         'transactions/remove',
44                         'prevblock',
45                 ],
46                 'tx': 'obj',
47         }
48         
49         def __init__(self, *a, **k):
50                 super().__init__(*a, **k)
51                 self.daemon = True
52                 self.logger = logging.getLogger('merkleMaker')
53                 self.CoinbasePrefix = b''
54                 self.CoinbaseAux = {}
55                 self.isOverflowed = False
56                 self.lastWarning = {}
57                 self.MinimumTxnUpdateWait = 5
58                 self.overflowed = 0
59                 self.DifficultyChangeMod = 2016
60         
61         def _prepare(self):
62                 self.access = jsonrpc.ServiceProxy(self.UpstreamURI)
63                 
64                 self.currentBlock = (None, None, None)
65                 
66                 self.currentMerkleTree = None
67                 self.merkleRoots = deque(maxlen=self.WorkQueueSizeRegular[1])
68                 self.LowestMerkleRoots = self.WorkQueueSizeRegular[1]
69                 
70                 if not hasattr(self, 'WorkQueueSizeClear'):
71                         self.WorkQueueSizeClear = self.WorkQueueSizeLongpoll
72                 self._MaxClearSize = max(self.WorkQueueSizeClear[1], self.WorkQueueSizeLongpoll[1])
73                 self.clearMerkleTree = MerkleTree([self.clearCoinbaseTxn])
74                 self.clearMerkleRoots = Queue(self._MaxClearSize)
75                 self.LowestClearMerkleRoots = self.WorkQueueSizeClear[1]
76                 self.nextMerkleRoots = Queue(self._MaxClearSize)
77                 
78                 if not hasattr(self, 'WarningDelay'):
79                         self.WarningDelay = max(15, self.MinimumTxnUpdateWait * 2)
80                 if not hasattr(self, 'WarningDelayTxnLongpoll'):
81                         self.WarningDelayTxnLongpoll = self.WarningDelay
82                 if not hasattr(self, 'WarningDelayMerkleUpdate'):
83                         self.WarningDelayMerkleUpdate = self.WarningDelay
84                 
85                 self.lastMerkleUpdate = 0
86                 self.nextMerkleUpdate = 0
87                 global now
88                 now = time()
89                 self.updateMerkleTree()
90         
91         def updateBlock(self, newBlock, height = None, bits = None, _HBH = None):
92                 if newBlock == self.currentBlock[0]:
93                         if height in (None, self.currentBlock[1]) and bits in (None, self.currentBlock[2]):
94                                 return
95                         if not self.currentBlock[2] is None:
96                                 self.logger.error('Was working on block with wrong specs: %s (height: %d->%d; bits: %s->%s' % (
97                                         b2a_hex(newBlock[::-1]).decode('utf8'),
98                                         self.currentBlock[1],
99                                         height,
100                                         b2a_hex(self.currentBlock[2][::-1]).decode('utf8'),
101                                         b2a_hex(bits[::-1]).decode('utf8'),
102                                 ))
103                 
104                 if height is None:
105                         height = self.currentBlock[1] + 1
106                 if bits is None:
107                         if height % self.DifficultyChangeMod == 1 or self.currentBlock[2] is None:
108                                 self.logger.warning('New block: %s (height %d; bits: UNKNOWN)' % (b2a_hex(newBlock[::-1]).decode('utf8'), height))
109                         else:
110                                 bits = self.currentBlock[2]
111                         
112                         # Pretend to be 1 lower height, so we possibly retain nextMerkleRoots
113                         height -= 1
114                         self.clearMerkleRoots = Queue(0)
115                 else:
116                         if _HBH is None:
117                                 _HBH = (b2a_hex(newBlock[::-1]).decode('utf8'), b2a_hex(bits[::-1]).decode('utf8'))
118                         self.logger.info('New block: %s (height: %d; bits: %s)' % (_HBH[0], height, _HBH[1]))
119                         
120                         if self.currentBlock[1] != height:
121                                 if self.currentBlock[1] == height - 1:
122                                         self.clearMerkleRoots = self.nextMerkleRoots
123                                 else:
124                                         if self.currentBlock[1]:
125                                                 self.logger.warning('Change from height %d->%d; no longpoll merkleroots available!' % (self.currentBlock[1], height))
126                                         self.clearMerkleRoots = Queue(self.WorkQueueSizeClear[1])
127                                 self.nextMerkleRoots = Queue(self._MaxClearSize)
128                 self.merkleRoots.clear()
129                 self.currentMerkleTree = self.clearMerkleTree
130                 if self.currentBlock[0] != newBlock:
131                         self.lastBlock = self.currentBlock
132                 self.currentBlock = (newBlock, height, bits)
133                 self.needMerkle = 2
134                 self.onBlockChange()
135         
136         def _trimBlock(self, MP, txnlist, txninfo, floodn, msgf):
137                 fee = txninfo[-1].get('fee', None)
138                 if fee is None:
139                         raise self._floodCritical(now, floodn, doin=msgf('fees unknown'))
140                 if fee:
141                         # FIXME: coinbasevalue is *not* guaranteed to exist here
142                         MP['coinbasevalue'] -= fee
143                 
144                 txnlist[-1:] = ()
145                 txninfo[-1:] = ()
146                 
147                 return True
148         
149         def _APOT(self, txninfopot, MP, POTInfo):
150                 feeTxnsTrimmed = 0
151                 feesTrimmed = 0
152                 for txn in txninfopot:
153                         if txn.get('fee') is None:
154                                 self._floodWarning(now, 'APOT-No-Fees', doin='Upstream didn\'t provide fee information required for aggressive POT', logf=self.logger.info)
155                                 return
156                         if not txn['fee']:
157                                 continue
158                         feesTrimmed += txn['fee']
159                         feeTxnsTrimmed += 1
160                 MP['coinbasevalue'] -= feesTrimmed
161                 
162                 POTInfo[2] = [feeTxnsTrimmed, feesTrimmed]
163                 self._floodWarning(now, 'POT-Trimming-Fees', doin='Aggressive POT trimming %d transactions with %d.%08d BTC total fees' % (feeTxnsTrimmed, feesTrimmed//100000000, feesTrimmed % 100000000), logf=self.logger.debug)
164                 
165                 return True
166         
167         def _makeBlockSafe(self, MP, txnlist, txninfo):
168                 blocksize = sum(map(len, txnlist)) + 80
169                 while blocksize > 934464:  # 1 "MB" limit - 64 KB breathing room
170                         txnsize = len(txnlist[-1])
171                         self._trimBlock(MP, txnlist, txninfo, 'SizeLimit', lambda x: 'Making blocks over 1 MB size limit (%d bytes; %s)' % (blocksize, x))
172                         blocksize -= txnsize
173                 
174                 # NOTE: This check doesn't work at all without BIP22 transaction obj format
175                 blocksigops = sum(a.get('sigops', 0) for a in txninfo)
176                 while blocksigops > 19488:  # 20k limit - 0x200 breathing room
177                         txnsigops = txninfo[-1]['sigops']
178                         self._trimBlock(MP, txnlist, txninfo, 'SigOpLimit', lambda x: 'Making blocks over 20k SigOp limit (%d; %s)' % (blocksigops, x))
179                         blocksigops -= txnsigops
180                 
181                 POTMode = getattr(self, 'POT', 1)
182                 txncount = len(txnlist) + 1
183                 if POTMode:
184                         feetxncount = txncount
185                         for i in range(txncount - 2, -1, -1):
186                                 if 'fee' not in txninfo[i] or txninfo[i]['fee']:
187                                         break
188                                 feetxncount -= 1
189                         
190                         if getattr(self, 'Greedy', None):
191                                 # Aim to cut off extra zero-fee transactions on the end
192                                 # NOTE: not cutting out ones intermixed, in case of dependencies
193                                 idealtxncount = feetxncount
194                         else:
195                                 idealtxncount = txncount
196                         
197                         pot = 2**int(log(idealtxncount, 2))
198                         POTInfo = MP['POTInfo'] = [[idealtxncount, feetxncount, txncount], [pot, None], None]
199                         if pot < idealtxncount:
200                                 if pot * 2 <= txncount:
201                                         pot *= 2
202                                 elif pot >= feetxncount:
203                                         pass
204                                 elif POTMode > 1 and self._APOT(txninfo[pot-1:], MP, POTInfo):
205                                         # Trimmed even transactions with fees
206                                         pass
207                                 else:
208                                         pot = idealtxncount
209                                         self._floodWarning(now, 'Non-POT', doin='Making merkle tree with %d transactions (ideal: %d; max: %d)' % (pot, idealtxncount, txncount))
210                         POTInfo[1][1] = pot
211                         pot -= 1
212                         txnlist[pot:] = ()
213                         txninfo[pot:] = ()
214         
215         def updateMerkleTree(self):
216                 global now
217                 self.logger.debug('Polling bitcoind for memorypool')
218                 self.nextMerkleUpdate = now + self.TxnUpdateRetryWait
219                 
220                 try:
221                         MP = self.access.getmemorypool(self.GMPReq)
222                         self.OldGMP = False
223                         oMP = None
224                 except:
225                         MP = False
226                         try:
227                                 oMP = self.access.getmemorypool()
228                         except:
229                                 oMP = False
230                         if oMP is False:
231                                 # This way, we get the error from the BIP22 call if the old one fails too
232                                 raise
233                 if MP is False:
234                         # Pre-BIP22 server (bitcoind <0.7 or Eloipool <20120513)
235                         if not self.OldGMP:
236                                 self.OldGMP = True
237                                 self.logger.warning('Upstream server is not BIP 22 compliant')
238                         MP = oMP or self.access.getmemorypool()
239                 
240                 oMP = deepcopy(MP)
241                 
242                 prevBlock = bytes.fromhex(MP['previousblockhash'])[::-1]
243                 if 'height' in MP:
244                         height = MP['height']
245                 else:
246                         height = self.access.getinfo()['blocks'] + 1
247                 bits = bytes.fromhex(MP['bits'])[::-1]
248                 if (prevBlock, bits) != self.currentBlock:
249                         self.updateBlock(prevBlock, height, bits, _HBH=(MP['previousblockhash'], MP['bits']))
250                 
251                 txnlist = MP['transactions']
252                 if len(txnlist) and isinstance(txnlist[0], dict):
253                         txninfo = txnlist
254                         txnlist = tuple(a['data'] for a in txnlist)
255                         txninfo.insert(0, {
256                         })
257                 elif 'transactionfees' in MP:
258                         # Backward compatibility with pre-BIP22 gmp_fees branch
259                         txninfo = [{'fee':a} for a in MP['transactionfees']]
260                 else:
261                         # Backward compatibility with pre-BIP22 hex-only (bitcoind <0.7, Eloipool <future)
262                         txninfo = [{}] * len(txnlist)
263                 # TODO: cache Txn or at least txid from previous merkle roots?
264                 txnlist = [a for a in map(bytes.fromhex, txnlist)]
265                 
266                 self._makeBlockSafe(MP, txnlist, txninfo)
267                 
268                 cbtxn = self.makeCoinbaseTxn(MP['coinbasevalue'])
269                 cbtxn.setCoinbase(b'\0\0')
270                 cbtxn.assemble()
271                 txnlist.insert(0, cbtxn.data)
272                 
273                 txnlist = [a for a in map(Txn, txnlist[1:])]
274                 txnlist.insert(0, cbtxn)
275                 txnlist = list(txnlist)
276                 newMerkleTree = MerkleTree(txnlist)
277                 if newMerkleTree.merkleRoot() != self.currentMerkleTree.merkleRoot():
278                         newMerkleTree.POTInfo = MP.get('POTInfo')
279                         newMerkleTree.oMP = oMP
280                         self.logger.debug('Updating merkle tree')
281                         self.currentMerkleTree = newMerkleTree
282                 self.lastMerkleUpdate = now
283                 self.nextMerkleUpdate = now + self.MinimumTxnUpdateWait
284                 
285                 if self.needMerkle == 2:
286                         self.needMerkle = 1
287                         self.needMerkleSince = now
288         
289         def makeCoinbase(self):
290                 now = int(time())
291                 if now > _makeCoinbase[0]:
292                         _makeCoinbase[0] = now
293                         _makeCoinbase[1] = 0
294                 else:
295                         _makeCoinbase[1] += 1
296                 rv = self.CoinbasePrefix
297                 rv += pack('>L', now) + pack('>Q', _makeCoinbase[1]).lstrip(b'\0')
298                 # NOTE: Not using varlenEncode, since this is always guaranteed to be < 100
299                 rv = bytes( (len(rv),) ) + rv
300                 for v in self.CoinbaseAux.values():
301                         rv += v
302                 if len(rv) > 95:
303                         t = time()
304                         if self.overflowed < t - 300:
305                                 self.logger.warning('Overflowing coinbase data! %d bytes long' % (len(rv),))
306                                 self.overflowed = t
307                                 self.isOverflowed = True
308                         rv = rv[:95]
309                 else:
310                         self.isOverflowed = False
311                 return rv
312         
313         def makeMerkleRoot(self, merkleTree, height):
314                 cbtxn = merkleTree.data[0]
315                 cb = self.makeCoinbase()
316                 cbtxn.setCoinbase(cb, height=height)
317                 cbtxn.assemble()
318                 merkleRoot = merkleTree.merkleRoot()
319                 return (merkleRoot, merkleTree, cbtxn.getCoinbase())
320         
321         _doing_last = None
322         def _doing(self, what):
323                 if self._doing_last == what:
324                         self._doing_i += 1
325                         return
326                 global now
327                 if self._doing_last:
328                         self.logger.debug("Switching from (%4dx in %5.3f seconds) %s => %s" % (self._doing_i, now - self._doing_s, self._doing_last, what))
329                 self._doing_last = what
330                 self._doing_i = 1
331                 self._doing_s = now
332         
333         def _floodWarning(self, now, wid, wmsgf = None, doin = True, logf = None):
334                 if doin is True:
335                         doin = self._doing_last
336                         def a(f = wmsgf):
337                                 return lambda: "%s (doing %s)" % (f(), doin)
338                         wmsgf = a()
339                 winfo = self.lastWarning.setdefault(wid, [0, None])
340                 (lastTime, lastDoing) = winfo
341                 if now <= lastTime + max(5, self.MinimumTxnUpdateWait) and doin == lastDoing:
342                         return
343                 winfo[0] = now
344                 nowDoing = doin
345                 winfo[1] = nowDoing
346                 if logf is None:
347                         logf = self.logger.warning
348                 logf(wmsgf() if wmsgf else doin)
349         
350         def makeClear(self):
351                 self._doing('clear merkle roots')
352                 self.clearMerkleRoots.put(self.makeMerkleRoot(self.clearMerkleTree, height=self.currentBlock[1]))
353         
354         def makeNext(self):
355                 self._doing('longpoll merkle roots')
356                 self.nextMerkleRoots.put(self.makeMerkleRoot(self.clearMerkleTree, height=self.currentBlock[1] + 1))
357         
358         def makeRegular(self):
359                 self._doing('regular merkle roots')
360                 self.merkleRoots.append(self.makeMerkleRoot(self.currentMerkleTree, height=self.currentBlock[1]))
361         
362         def merkleMaker_II(self):
363                 global now
364                 
365                 # No bits = no mining :(
366                 if self.currentBlock[2] is None:
367                         return self.updateMerkleTree()
368                 
369                 # First, ensure we have the minimum clear, next, and regular (in that order)
370                 if self.clearMerkleRoots.qsize() < self.WorkQueueSizeClear[0]:
371                         return self.makeClear()
372                 if self.nextMerkleRoots.qsize() < self.WorkQueueSizeLongpoll[0]:
373                         return self.makeNext()
374                 if len(self.merkleRoots) < self.WorkQueueSizeRegular[0]:
375                         return self.makeRegular()
376                 
377                 # If we've met the minimum requirements, consider updating the merkle tree
378                 if self.nextMerkleUpdate <= now:
379                         return self.updateMerkleTree()
380                 
381                 # Finally, fill up clear, next, and regular until we've met the maximums
382                 if self.clearMerkleRoots.qsize() < self.WorkQueueSizeClear[1]:
383                         return self.makeClear()
384                 if self.nextMerkleRoots.qsize() < self.WorkQueueSizeLongpoll[1]:
385                         return self.makeNext()
386                 if len(self.merkleRoots) < self.WorkQueueSizeRegular[1] or self.merkleRoots[0][1] != self.currentMerkleTree:
387                         return self.makeRegular()
388                 
389                 # Nothing left to do, fire onBlockUpdate event (if appropriate) and sleep
390                 if self.needMerkle == 1:
391                         self.onBlockUpdate()
392                         self.needMerkle = False
393                 self._doing('idle')
394                 # TODO: rather than sleepspin, block until MinimumTxnUpdateWait expires or threading.Condition(?)
395                 sleep(self.IdleSleepTime)
396         
397         def merkleMaker_I(self):
398                 global now
399                 now = time()
400                 
401                 self.merkleMaker_II()
402                 
403                 if self.needMerkle == 1 and now > self.needMerkleSince + self.WarningDelayTxnLongpoll:
404                         self._floodWarning(now, 'NeedMerkle', lambda: 'Transaction-longpoll requested %d seconds ago, and still not ready. Is your server fast enough to keep up with your configured WorkQueueSizeRegular maximum?' % (now - self.needMerkleSince,))
405                 if now > self.nextMerkleUpdate + self.WarningDelayMerkleUpdate:
406                         self._floodWarning(now, 'MerkleUpdate', lambda: "Haven't updated the merkle tree in at least %d seconds! Is your server fast enough to keep up with your configured work queue minimums?" % (now - self.lastMerkleUpdate,))
407         
408         def run(self):
409                 while True:
410                         try:
411                                 self.merkleMaker_I()
412                         except:
413                                 self.logger.critical(traceback.format_exc())
414         
415         def start(self, *a, **k):
416                 self._prepare()
417                 super().start(*a, **k)
418         
419         def getMRD(self):
420                 try:
421                         MRD = self.merkleRoots.pop()
422                         self.LowestMerkleRoots = min(len(self.merkleRoots), self.LowestMerkleRoots)
423                         rollPrevBlk = False
424                 except IndexError:
425                         qsz = self.clearMerkleRoots.qsize()
426                         if qsz < 0x10:
427                                 self.logger.warning('clearMerkleRoots running out! only %d left' % (qsz,))
428                         MRD = self.clearMerkleRoots.get()
429                         self.LowestClearMerkleRoots = min(self.clearMerkleRoots.qsize(), self.LowestClearMerkleRoots)
430                         rollPrevBlk = True
431                 (merkleRoot, merkleTree, cb) = MRD
432                 (prevBlock, height, bits) = self.currentBlock
433                 return (merkleRoot, merkleTree, cb, prevBlock, bits, rollPrevBlk)
434         
435         def getMC(self):
436                 (prevBlock, bits) = self.currentBlock
437                 mt = self.currentMerkleTree
438                 cb = self.makeCoinbase()
439                 return (None, mt, cb, prevBlock, bits)
440
441 # merkleMaker tests
442 def _test():
443         global now
444         now = 1337039788
445         MM = merkleMaker()
446         reallogger = MM.logger
447         class fakelogger:
448                 LO = False
449                 def critical(self, *a):
450                         if self.LO > 1: return
451                         reallogger.critical(*a)
452                 def warning(self, *a):
453                         if self.LO: return
454                         reallogger.warning(*a)
455                 def debug(self, *a):
456                         pass
457         MM.logger = fakelogger()
458         class NMTClass:
459                 pass
460         
461         # _makeBlockSafe tests
462         from copy import deepcopy
463         MP = {
464                 'coinbasevalue':50,
465         }
466         txnlist = [b'\0', b'\x01', b'\x02']
467         txninfo = [{'fee':0, 'sigops':1}, {'fee':5, 'sigops':10000}, {'fee':0, 'sigops':10001}]
468         def MBS(LO = 0):
469                 m = deepcopy( (MP, txnlist, txninfo) )
470                 MM.logger.LO = LO
471                 try:
472                         MM._makeBlockSafe(*m)
473                 except:
474                         if LO < 2:
475                                 raise
476                 else:
477                         assert LO < 2  # An expected error wasn't thrown
478                 if 'POTInfo' in m[0]:
479                         del m[0]['POTInfo']
480                 return m
481         MM.POT = 0
482         assert MBS() == (MP, txnlist[:2], txninfo[:2])
483         txninfo[2]['fee'] = 1
484         MPx = deepcopy(MP)
485         MPx['coinbasevalue'] -= 1
486         assert MBS() == (MPx, txnlist[:2], txninfo[:2])
487         txninfo[2]['sigops'] = 1
488         assert MBS(1) == (MP, txnlist, txninfo)
489         # APOT tests
490         MM.POT = 2
491         txnlist.append(b'\x03')
492         txninfo.append({'fee':1, 'sigops':0})
493         MPx = deepcopy(MP)
494         MPx['coinbasevalue'] -= 1
495         assert MBS() == (MPx, txnlist[:3], txninfo[:3])
496
497 _test()