Merge branch 'bugfix_notrim'
[bitcoin:eloipool.git] / merklemaker.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 from binascii import b2a_hex
18 from bitcoin.script import countSigOps
19 from bitcoin.txn import Txn
20 from collections import deque
21 from queue import Queue
22 import jsonrpc
23 import logging
24 from math import log
25 from merkletree import MerkleTree
26 from struct import pack
27 import threading
28 from time import sleep, time
29 import traceback
30
# Module-wide state used by merkleMaker.makeCoinbase() to keep coinbases unique:
# [0] is the last whole-second timestamp a coinbase was generated for,
# [1] counts the additional coinbases generated within that same second.
_makeCoinbase = [0, 0]
32
33 class merkleMaker(threading.Thread):
34         def __init__(self, *a, **k):
35                 super().__init__(*a, **k)
36                 self.daemon = True
37                 self.logger = logging.getLogger('merkleMaker')
38                 self.CoinbasePrefix = b''
39                 self.CoinbaseAux = {}
40                 self.isOverflowed = False
41                 self.overflowed = 0
42         
43         def _prepare(self):
44                 self.access = jsonrpc.ServiceProxy(self.UpstreamURI)
45                 
46                 self.currentBlock = (None, None)
47                 self.currentMerkleTree = None
48                 self.merkleRoots = deque(maxlen=self.WorkQueueSizeRegular[1])
49                 self.LowestMerkleRoots = self.WorkQueueSizeRegular[1]
50                 self.clearMerkleTree = MerkleTree([self.clearCoinbaseTxn])
51                 self.clearMerkleRoots = Queue(self.WorkQueueSizeLongpoll[1])
52                 self.LowestClearMerkleRoots = self.WorkQueueSizeLongpoll[1]
53                 
54                 if not hasattr(self, 'WarningDelay'):
55                         self.WarningDelay = max(15, self.MinimumTxnUpdateWait * 2)
56                 if not hasattr(self, 'WarningDelayTxnLongpoll'):
57                         self.WarningDelayTxnLongpoll = self.WarningDelay
58                 if not hasattr(self, 'WarningDelayMerkleUpdate'):
59                         self.WarningDelayMerkleUpdate = self.WarningDelay
60                 
61                 self.lastMerkleUpdate = 0
62                 self.nextMerkleUpdate = 0
63                 self.lastWarning = {}
64                 global now
65                 now = time()
66                 self.updateMerkleTree()
67         
68         def updateBlock(self, newBlock, bits = None, _HBH = None):
69                 if newBlock == self.currentBlock[0]:
70                         if bits in (None, self.currentBlock[1]):
71                                 return
72                         self.logger.error('Was working on block with wrong specs: %s (bits: %s->%s)' % (
73                                 b2a_hex(newBlock[::-1]).decode('utf8'),
74                                 b2a_hex(self.currentBlock[1][::-1]).decode('utf8'),
75                                 b2a_hex(bits[::-1]).decode('utf8'),
76                         ))
77                 
78                 if bits is None:
79                         bits = self.currentBlock[1]
80                 if _HBH is None:
81                         _HBH = (b2a_hex(newBlock[::-1]).decode('utf8'), b2a_hex(bits[::-1]).decode('utf8'))
82                 self.logger.info('New block: %s (bits: %s)' % _HBH)
83                 self.merkleRoots.clear()
84                 self.currentMerkleTree = self.clearMerkleTree
85                 if self.currentBlock[0] != newBlock:
86                         self.lastBlock = self.currentBlock
87                 self.currentBlock = (newBlock, bits)
88                 self.needMerkle = 2
89                 self.onBlockChange()
90         
        def updateMerkleTree(self):
                """Poll upstream for the memory pool and rebuild the current merkle tree.

                Calls getmemorypool on self.access, switches blocks through
                updateBlock() when the reported previous block or bits differ from
                self.currentBlock, then builds a MerkleTree from a fresh coinbase
                transaction plus the reported transactions (possibly trimmed, see
                below). Uses the module-level timestamp `now`, which the caller is
                expected to have refreshed.
                """
                global now
                self.logger.debug('Polling bitcoind for memorypool')
                # Set the retry deadline before the RPC: if the call below raises,
                # this value stays in place; on success it is overwritten further down.
                self.nextMerkleUpdate = now + self.TxnUpdateRetryWait
                MP = self.access.getmemorypool()
                # Hex strings from the RPC are byte-reversed for internal use
                prevBlock = bytes.fromhex(MP['previousblockhash'])[::-1]
                bits = bytes.fromhex(MP['bits'])[::-1]
                if (prevBlock, bits) != self.currentBlock:
                        self.updateBlock(prevBlock, bits, _HBH=(MP['previousblockhash'], MP['bits']))
                # TODO: cache Txn or at least txid from previous merkle roots?
                txnlist = [a for a in map(bytes.fromhex, MP['transactions'])]
                
                # Assemble the coinbase with a two-byte placeholder so its serialized
                # data can be counted alongside the other transactions.
                cbtxn = self.makeCoinbaseTxn(MP['coinbasevalue'])
                cbtxn.setCoinbase(b'\0\0')
                cbtxn.assemble()
                txnlist.insert(0, cbtxn.data)
                
                # Total serialized size of all transactions (coinbase included)
                txnlistsz = sum(map(len, txnlist))
                if txnlistsz > 934464:  # 1 "MB" limit - 64 KB breathing room
                        # FIXME: Try to safely truncate the block
                        W = 'Making blocks over 1 MB size limit (%d bytes)' % (txnlistsz,)
                        self._floodWarning(now, 'SizeLimit', lambda: W, W, logf=self.logger.error)
                
                # NOTE: txnlistsz is reused here and now holds the total SigOp count
                txnlistsz = sum(map(countSigOps, txnlist))
                if txnlistsz > 19488:  # 20k limit - 0x200 breathing room
                        # FIXME: Try to safely truncate the block
                        W = 'Making blocks over 20k SigOp limit (%d)' % (txnlistsz,)
                        self._floodWarning(now, 'SigOpLimit', lambda: W, W, logf=self.logger.error)
                
                # txncount: everything available; idealtxncount: how many we would like to keep
                txncount = len(txnlist)
                idealtxncount = txncount
                if hasattr(self, 'Greedy') and self.Greedy and 'transactionfees' in MP:
                        feeinfo = MP['transactionfees']
                        # Prepend an entry for the coinbase so feeinfo indexes line up with txnlist
                        feeinfo.insert(0, -MP['coinbasevalue'])
                        # Aim to cut off extra zero-fee transactions on the end
                        # NOTE: not cutting out ones intermixed, in case of dependencies
                        feeinfoLen = len(feeinfo)
                        if feeinfoLen > txncount:
                                feeinfoLen = txncount
                        elif feeinfoLen < txncount:
                                # Transactions without any fee entry are dropped from the ideal count
                                idealtxncount -= txncount - feeinfoLen
                        # Walk backwards (never reaching index 0, the coinbase) until a non-zero fee
                        for i in range(feeinfoLen - 1, 0, -1):
                                if feeinfo[i]:
                                        break
                                idealtxncount -= 1
                
                # Largest power of two not exceeding idealtxncount.
                # NOTE(review): this goes through floating-point log(); something like
                # idealtxncount.bit_length() would avoid any rounding concern - confirm.
                pot = 2**int(log(idealtxncount, 2))
                if pot < idealtxncount:
                        if pot * 2 <= txncount:
                                # Enough transactions exist to round up to the next power of two
                                pot *= 2
                        else:
                                # Cannot reach a power of two; keep the ideal count and warn
                                pot = idealtxncount
                                POTWarn = "Making merkle tree with %d transactions (ideal: %d; max: %d)" % (pot, idealtxncount, txncount)
                                self._floodWarning(now, 'Non-POT', lambda: POTWarn, POTWarn)
                txnlist = txnlist[:pot]
                
                # Wrap the raw transactions in Txn objects; slot 0 is the coinbase object itself
                txnlist = [a for a in map(Txn, txnlist[1:])]
                txnlist.insert(0, cbtxn)
                txnlist = list(txnlist)
                newMerkleTree = MerkleTree(txnlist)
                if newMerkleTree.merkleRoot() != self.currentMerkleTree.merkleRoot():
                        self.logger.debug('Updating merkle tree')
                        self.currentMerkleTree = newMerkleTree
                self.lastMerkleUpdate = now
                self.nextMerkleUpdate = now + self.MinimumTxnUpdateWait
                
                # updateBlock() sets needMerkle to 2; move it to 1 and record when,
                # so merkleMaker_I() can call onBlockUpdate() and time its warning.
                if self.needMerkle == 2:
                        self.needMerkle = 1
                        self.needMerkleSince = now
160         
161         def makeCoinbase(self):
162                 now = int(time())
163                 if now > _makeCoinbase[0]:
164                         _makeCoinbase[0] = now
165                         _makeCoinbase[1] = 0
166                 else:
167                         _makeCoinbase[1] += 1
168                 rv = self.CoinbasePrefix
169                 rv += pack('>L', now) + pack('>Q', _makeCoinbase[1]).lstrip(b'\0')
170                 # NOTE: Not using varlenEncode, since this is always guaranteed to be < 100
171                 rv = bytes( (len(rv),) ) + rv
172                 for v in self.CoinbaseAux.values():
173                         rv += v
174                 if len(rv) > 100:
175                         t = time()
176                         if self.overflowed < t - 300:
177                                 self.logger.warning('Overflowing coinbase data! %d bytes long' % (len(rv),))
178                                 self.overflowed = t
179                                 self.isOverflowed = True
180                         rv = rv[:100]
181                 else:
182                         self.isOverflowed = False
183                 return rv
184         
185         def makeMerkleRoot(self, merkleTree):
186                 cbtxn = merkleTree.data[0]
187                 cb = self.makeCoinbase()
188                 cbtxn.setCoinbase(cb)
189                 cbtxn.assemble()
190                 merkleRoot = merkleTree.merkleRoot()
191                 return (merkleRoot, merkleTree, cb)
192         
        # Label most recently passed to _doing(); None until the first call
        _doing_last = None
194         def _doing(self, what):
195                 if self._doing_last == what:
196                         self._doing_i += 1
197                         return
198                 global now
199                 if self._doing_last:
200                         self.logger.debug("Switching from (%4dx in %5.3f seconds) %s => %s" % (self._doing_i, now - self._doing_s, self._doing_last, what))
201                 self._doing_last = what
202                 self._doing_i = 1
203                 self._doing_s = now
204         
205         def _floodWarning(self, now, wid, wmsgf, doin = True, logf = None):
206                 if doin is True:
207                         doin = self._doing_last
208                         def a(f = wmsgf):
209                                 return lambda: "%s (doing %s)" % (f(), doin)
210                         wmsgf = a()
211                 winfo = self.lastWarning.setdefault(wid, [0, None])
212                 (lastTime, lastDoing) = winfo
213                 if now <= lastTime + max(5, self.MinimumTxnUpdateWait) and doin == lastDoing:
214                         return
215                 winfo[0] = now
216                 nowDoing = doin
217                 winfo[1] = nowDoing
218                 if logf is None:
219                         logf = self.logger.warning
220                 logf(wmsgf())
221         
        def merkleMaker_I(self):
                """Perform one iteration of the main loop.

                Exactly one of four actions is taken per call, in priority order:
                refresh the merkle tree, add one entry to the clear (longpoll)
                queue, add one entry to the regular queue, or sleep for
                IdleSleepTime. Afterwards, rate-limited warnings are issued if
                work has been pending for too long. The module-level timestamp
                `now` is refreshed at the start and used throughout.
                """
                global now
                
                # First, update merkle tree if we haven't for a while and aren't crunched for time
                now = time()
                # "Not crunched" = both queues hold more than their configured minimum ([0])
                if self.nextMerkleUpdate <= now and self.clearMerkleRoots.qsize() > self.WorkQueueSizeLongpoll[0] and len(self.merkleRoots) > self.WorkQueueSizeRegular[0]:
                        self.updateMerkleTree()
                # Next, fill up the longpoll queue first, since it can be used as a failover for the main queue
                elif not self.clearMerkleRoots.full():
                        self._doing('blank merkle roots')
                        self.clearMerkleRoots.put(self.makeMerkleRoot(self.clearMerkleTree))
                # Next, fill up the main queue (until they're all current)
                # merkleRoots[0] is the oldest entry; element [1] of an entry is the tree it was built from
                elif len(self.merkleRoots) < self.WorkQueueSizeRegular[1] or self.merkleRoots[0][1] != self.currentMerkleTree:
                        # Queue is full but still holds entries from an older tree: tell
                        # listeners now rather than waiting for every entry to be replaced
                        if self.needMerkle == 1 and len(self.merkleRoots) >= self.WorkQueueSizeRegular[1]:
                                self.onBlockUpdate()
                                self.needMerkle = False
                        self._doing('regular merkle roots')
                        self.merkleRoots.append(self.makeMerkleRoot(self.currentMerkleTree))
                else:
                        # Both queues are full and current: nothing to do
                        if self.needMerkle == 1:
                                self.onBlockUpdate()
                                self.needMerkle = False
                        self._doing('idle')
                        # TODO: rather than sleepspin, block until MinimumTxnUpdateWait expires or threading.Condition(?)
                        sleep(self.IdleSleepTime)
                # needMerkle is still 1 only if onBlockUpdate() has not been called yet
                if self.needMerkle == 1 and now > self.needMerkleSince + self.WarningDelayTxnLongpoll:
                        self._floodWarning(now, 'NeedMerkle', lambda: 'Transaction-longpoll requested %d seconds ago, and still not ready. Is your server fast enough to keep up with your configured WorkQueueSizeRegular maximum?' % (now - self.needMerkleSince,))
                # The scheduled merkle tree refresh is overdue by more than the warning delay
                if now > self.nextMerkleUpdate + self.WarningDelayMerkleUpdate:
                        self._floodWarning(now, 'MerkleUpdate', lambda: "Haven't updated the merkle tree in at least %d seconds! Is your server fast enough to keep up with your configured work queue minimums?" % (now - self.lastMerkleUpdate,))
251         
252         def run(self):
253                 while True:
254                         try:
255                                 self.merkleMaker_I()
256                         except:
257                                 self.logger.critical(traceback.format_exc())
258         
        def start(self, *a, **k):
                """Run _prepare() in the calling thread, then start the worker thread.

                All arguments are forwarded unchanged to the parent class's start().
                """
                self._prepare()
                super().start(*a, **k)
262         
263         def getMRD(self):
264                 (prevBlock, bits) = self.currentBlock
265                 try:
266                         MRD = self.merkleRoots.pop()
267                         self.LowestMerkleRoots = min(len(self.merkleRoots), self.LowestMerkleRoots)
268                         rollPrevBlk = False
269                 except IndexError:
270                         qsz = self.clearMerkleRoots.qsize()
271                         if qsz < 0x10:
272                                 self.logger.warning('clearMerkleRoots running out! only %d left' % (qsz,))
273                         MRD = self.clearMerkleRoots.get()
274                         self.LowestClearMerkleRoots = min(self.clearMerkleRoots.qsize(), self.LowestClearMerkleRoots)
275                         rollPrevBlk = True
276                 (merkleRoot, merkleTree, cb) = MRD
277                 return (merkleRoot, merkleTree, cb, prevBlock, bits, rollPrevBlk)
278         
279         def getMC(self):
280                 (prevBlock, bits) = self.currentBlock
281                 mt = self.currentMerkleTree
282                 cb = self.makeCoinbase()
283                 return (None, mt, cb, prevBlock, bits)