Use BIP22 "obj" transactions when possible
[bitcoin:eloipool.git] / merklemaker.py
1 # Eloipool - Python Bitcoin pool server
2 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
3 #
4 # This program is free software: you can redistribute it and/or modify
5 # it under the terms of the GNU Affero General Public License as
6 # published by the Free Software Foundation, either version 3 of the
7 # License, or (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU Affero General Public License for more details.
13 #
14 # You should have received a copy of the GNU Affero General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16
17 from binascii import b2a_hex
18 from bitcoin.script import countSigOps
19 from bitcoin.txn import Txn
20 from collections import deque
21 from queue import Queue
22 import jsonrpc
23 import logging
24 from math import log
25 from merkletree import MerkleTree
26 from struct import pack
27 import threading
28 from time import sleep, time
29 import traceback
30
# Shared state for merkleMaker.makeCoinbase():
#   [0] = the last whole-second timestamp a coinbase was generated for
#   [1] = counter of additional coinbases generated since that timestamp was recorded
_makeCoinbase = [0, 0]
32
class merkleMaker(threading.Thread):
        """Background thread that polls upstream for block templates and keeps
        queues of ready-to-use merkle roots for getMRD()/getMC() callers."""
        
        # None until the first poll; False once a BIP 22 style getmemorypool call
        # has succeeded; True after falling back to the argument-less legacy call
        OldGMP = None
        # Request object passed to the upstream getmemorypool call
        GMPReq = {
                'capabilities': [
                        'coinbasevalue',
                        'coinbase/append',
                        'coinbase',
                        'generation',
                        'time',
                        'transactions/remove',
                        'prevblock',
                ],
                # Ask for transactions as objects rather than bare hex strings
                'tx': 'obj',
        }
47         
48         def __init__(self, *a, **k):
49                 super().__init__(*a, **k)
50                 self.daemon = True
51                 self.logger = logging.getLogger('merkleMaker')
52                 self.CoinbasePrefix = b''
53                 self.CoinbaseAux = {}
54                 self.isOverflowed = False
55                 self.overflowed = 0
56         
57         def _prepare(self):
58                 self.access = jsonrpc.ServiceProxy(self.UpstreamURI)
59                 
60                 self.currentBlock = (None, None)
61                 self.currentMerkleTree = None
62                 self.merkleRoots = deque(maxlen=self.WorkQueueSizeRegular[1])
63                 self.LowestMerkleRoots = self.WorkQueueSizeRegular[1]
64                 self.clearMerkleTree = MerkleTree([self.clearCoinbaseTxn])
65                 self.clearMerkleRoots = Queue(self.WorkQueueSizeLongpoll[1])
66                 self.LowestClearMerkleRoots = self.WorkQueueSizeLongpoll[1]
67                 
68                 if not hasattr(self, 'WarningDelay'):
69                         self.WarningDelay = max(15, self.MinimumTxnUpdateWait * 2)
70                 if not hasattr(self, 'WarningDelayTxnLongpoll'):
71                         self.WarningDelayTxnLongpoll = self.WarningDelay
72                 if not hasattr(self, 'WarningDelayMerkleUpdate'):
73                         self.WarningDelayMerkleUpdate = self.WarningDelay
74                 
75                 self.lastMerkleUpdate = 0
76                 self.nextMerkleUpdate = 0
77                 self.lastWarning = {}
78                 global now
79                 now = time()
80                 self.updateMerkleTree()
81         
82         def updateBlock(self, newBlock, bits = None, _HBH = None):
83                 if newBlock == self.currentBlock[0]:
84                         if bits in (None, self.currentBlock[1]):
85                                 return
86                         self.logger.error('Was working on block with wrong specs: %s (bits: %s->%s)' % (
87                                 b2a_hex(newBlock[::-1]).decode('utf8'),
88                                 b2a_hex(self.currentBlock[1][::-1]).decode('utf8'),
89                                 b2a_hex(bits[::-1]).decode('utf8'),
90                         ))
91                 
92                 if bits is None:
93                         bits = self.currentBlock[1]
94                 if _HBH is None:
95                         _HBH = (b2a_hex(newBlock[::-1]).decode('utf8'), b2a_hex(bits[::-1]).decode('utf8'))
96                 self.logger.info('New block: %s (bits: %s)' % _HBH)
97                 self.merkleRoots.clear()
98                 self.currentMerkleTree = self.clearMerkleTree
99                 if self.currentBlock[0] != newBlock:
100                         self.lastBlock = self.currentBlock
101                 self.currentBlock = (newBlock, bits)
102                 self.needMerkle = 2
103                 self.onBlockChange()
104         
105         def updateMerkleTree(self):
106                 global now
107                 self.logger.debug('Polling bitcoind for memorypool')
108                 self.nextMerkleUpdate = now + self.TxnUpdateRetryWait
109                 
110                 try:
111                         MP = self.access.getmemorypool(self.GMPReq)
112                         self.OldGMP = False
113                         oMP = None
114                 except:
115                         MP = False
116                         try:
117                                 oMP = self.access.getmemorypool()
118                         except:
119                                 oMP = False
120                         if oMP is False:
121                                 # This way, we get the error from the BIP22 call if the old one fails too
122                                 raise
123                 if MP is False:
124                         # Pre-BIP22 server (bitcoind <0.7 or Eloipool <20120513)
125                         if not self.OldGMP:
126                                 self.OldGMP = True
127                                 self.logger.warning('Upstream server is not BIP 22 compliant')
128                         MP = oMP or self.access.getmemorypool()
129                 
130                 prevBlock = bytes.fromhex(MP['previousblockhash'])[::-1]
131                 bits = bytes.fromhex(MP['bits'])[::-1]
132                 if (prevBlock, bits) != self.currentBlock:
133                         self.updateBlock(prevBlock, bits, _HBH=(MP['previousblockhash'], MP['bits']))
134                 
135                 txnlist = MP['transactions']
136                 if len(txnlist) and isinstance(txnlist[0], dict):
137                         txninfo = txnlist
138                         txnlist = tuple(a['data'] for a in txnlist)
139                         txninfo.insert(0, {
140                         })
141                 elif 'transactionfees' in MP:
142                         # Backward compatibility with pre-BIP22 gmp_fees branch
143                         txninfo = [{'fee':a} for a in MP['transactionfees']]
144                 else:
145                         # Backward compatibility with pre-BIP22 hex-only (bitcoind <0.7, Eloipool <future)
146                         txninfo = [{}] * len(txnlist)
147                 # TODO: cache Txn or at least txid from previous merkle roots?
148                 txnlist = [a for a in map(bytes.fromhex, txnlist)]
149                 
150                 cbtxn = self.makeCoinbaseTxn(MP['coinbasevalue'])
151                 cbtxn.setCoinbase(b'\0\0')
152                 cbtxn.assemble()
153                 txnlist.insert(0, cbtxn.data)
154                 
155                 txnlistsz = sum(map(len, txnlist))
156                 if txnlistsz > 934464:  # 1 "MB" limit - 64 KB breathing room
157                         # FIXME: Try to safely truncate the block
158                         W = 'Making blocks over 1 MB size limit (%d bytes)' % (txnlistsz,)
159                         self._floodWarning(now, 'SizeLimit', lambda: W, W, logf=self.logger.error)
160                 
161                 txnlistsz = sum(map(countSigOps, txnlist))
162                 if txnlistsz > 19488:  # 20k limit - 0x200 breathing room
163                         # FIXME: Try to safely truncate the block
164                         W = 'Making blocks over 20k SigOp limit (%d)' % (txnlistsz,)
165                         self._floodWarning(now, 'SigOpLimit', lambda: W, W, logf=self.logger.error)
166                 
167                 txncount = len(txnlist)
168                 idealtxncount = txncount
169                 if hasattr(self, 'Greedy') and self.Greedy:
170                         # Aim to cut off extra zero-fee transactions on the end
171                         # NOTE: not cutting out ones intermixed, in case of dependencies
172                         for i in range(len(txninfo) - 1, 0, -1):
173                                 if 'fee' not in txninfo[i] or txninfo[i]['fee']:
174                                         break
175                                 idealtxncount -= 1
176                 
177                 pot = 2**int(log(idealtxncount, 2))
178                 if pot < idealtxncount:
179                         if pot * 2 <= txncount:
180                                 pot *= 2
181                         else:
182                                 pot = idealtxncount
183                                 POTWarn = "Making merkle tree with %d transactions (ideal: %d; max: %d)" % (pot, idealtxncount, txncount)
184                                 self._floodWarning(now, 'Non-POT', lambda: POTWarn, POTWarn)
185                 txnlist = txnlist[:pot]
186                 
187                 txnlist = [a for a in map(Txn, txnlist[1:])]
188                 txnlist.insert(0, cbtxn)
189                 txnlist = list(txnlist)
190                 newMerkleTree = MerkleTree(txnlist)
191                 if newMerkleTree.merkleRoot() != self.currentMerkleTree.merkleRoot():
192                         self.logger.debug('Updating merkle tree')
193                         self.currentMerkleTree = newMerkleTree
194                 self.lastMerkleUpdate = now
195                 self.nextMerkleUpdate = now + self.MinimumTxnUpdateWait
196                 
197                 if self.needMerkle == 2:
198                         self.needMerkle = 1
199                         self.needMerkleSince = now
200         
201         def makeCoinbase(self):
202                 now = int(time())
203                 if now > _makeCoinbase[0]:
204                         _makeCoinbase[0] = now
205                         _makeCoinbase[1] = 0
206                 else:
207                         _makeCoinbase[1] += 1
208                 rv = self.CoinbasePrefix
209                 rv += pack('>L', now) + pack('>Q', _makeCoinbase[1]).lstrip(b'\0')
210                 # NOTE: Not using varlenEncode, since this is always guaranteed to be < 100
211                 rv = bytes( (len(rv),) ) + rv
212                 for v in self.CoinbaseAux.values():
213                         rv += v
214                 if len(rv) > 100:
215                         t = time()
216                         if self.overflowed < t - 300:
217                                 self.logger.warning('Overflowing coinbase data! %d bytes long' % (len(rv),))
218                                 self.overflowed = t
219                                 self.isOverflowed = True
220                         rv = rv[:100]
221                 else:
222                         self.isOverflowed = False
223                 return rv
224         
225         def makeMerkleRoot(self, merkleTree):
226                 cbtxn = merkleTree.data[0]
227                 cb = self.makeCoinbase()
228                 cbtxn.setCoinbase(cb)
229                 cbtxn.assemble()
230                 merkleRoot = merkleTree.merkleRoot()
231                 return (merkleRoot, merkleTree, cb)
232         
233         _doing_last = None
234         def _doing(self, what):
235                 if self._doing_last == what:
236                         self._doing_i += 1
237                         return
238                 global now
239                 if self._doing_last:
240                         self.logger.debug("Switching from (%4dx in %5.3f seconds) %s => %s" % (self._doing_i, now - self._doing_s, self._doing_last, what))
241                 self._doing_last = what
242                 self._doing_i = 1
243                 self._doing_s = now
244         
245         def _floodWarning(self, now, wid, wmsgf, doin = True, logf = None):
246                 if doin is True:
247                         doin = self._doing_last
248                         def a(f = wmsgf):
249                                 return lambda: "%s (doing %s)" % (f(), doin)
250                         wmsgf = a()
251                 winfo = self.lastWarning.setdefault(wid, [0, None])
252                 (lastTime, lastDoing) = winfo
253                 if now <= lastTime + max(5, self.MinimumTxnUpdateWait) and doin == lastDoing:
254                         return
255                 winfo[0] = now
256                 nowDoing = doin
257                 winfo[1] = nowDoing
258                 if logf is None:
259                         logf = self.logger.warning
260                 logf(wmsgf())
261         
262         def merkleMaker_I(self):
263                 global now
264                 
265                 # First, update merkle tree if we haven't for a while and aren't crunched for time
266                 now = time()
267                 if self.nextMerkleUpdate <= now and self.clearMerkleRoots.qsize() > self.WorkQueueSizeLongpoll[0] and len(self.merkleRoots) > self.WorkQueueSizeRegular[0]:
268                         self.updateMerkleTree()
269                 # Next, fill up the longpoll queue first, since it can be used as a failover for the main queue
270                 elif not self.clearMerkleRoots.full():
271                         self._doing('blank merkle roots')
272                         self.clearMerkleRoots.put(self.makeMerkleRoot(self.clearMerkleTree))
273                 # Next, fill up the main queue (until they're all current)
274                 elif len(self.merkleRoots) < self.WorkQueueSizeRegular[1] or self.merkleRoots[0][1] != self.currentMerkleTree:
275                         if self.needMerkle == 1 and len(self.merkleRoots) >= self.WorkQueueSizeRegular[1]:
276                                 self.onBlockUpdate()
277                                 self.needMerkle = False
278                         self._doing('regular merkle roots')
279                         self.merkleRoots.append(self.makeMerkleRoot(self.currentMerkleTree))
280                 else:
281                         if self.needMerkle == 1:
282                                 self.onBlockUpdate()
283                                 self.needMerkle = False
284                         self._doing('idle')
285                         # TODO: rather than sleepspin, block until MinimumTxnUpdateWait expires or threading.Condition(?)
286                         sleep(self.IdleSleepTime)
287                 if self.needMerkle == 1 and now > self.needMerkleSince + self.WarningDelayTxnLongpoll:
288                         self._floodWarning(now, 'NeedMerkle', lambda: 'Transaction-longpoll requested %d seconds ago, and still not ready. Is your server fast enough to keep up with your configured WorkQueueSizeRegular maximum?' % (now - self.needMerkleSince,))
289                 if now > self.nextMerkleUpdate + self.WarningDelayMerkleUpdate:
290                         self._floodWarning(now, 'MerkleUpdate', lambda: "Haven't updated the merkle tree in at least %d seconds! Is your server fast enough to keep up with your configured work queue minimums?" % (now - self.lastMerkleUpdate,))
291         
292         def run(self):
293                 while True:
294                         try:
295                                 self.merkleMaker_I()
296                         except:
297                                 self.logger.critical(traceback.format_exc())
298         
        def start(self, *a, **k):
                """Prepare working state (including the initial template poll) and
                then start the thread; arguments are forwarded to Thread.start()."""
                # _prepare() runs before the thread exists, so an error there
                # propagates to the caller instead of being lost in the worker
                self._prepare()
                super().start(*a, **k)
302         
303         def getMRD(self):
304                 (prevBlock, bits) = self.currentBlock
305                 try:
306                         MRD = self.merkleRoots.pop()
307                         self.LowestMerkleRoots = min(len(self.merkleRoots), self.LowestMerkleRoots)
308                         rollPrevBlk = False
309                 except IndexError:
310                         qsz = self.clearMerkleRoots.qsize()
311                         if qsz < 0x10:
312                                 self.logger.warning('clearMerkleRoots running out! only %d left' % (qsz,))
313                         MRD = self.clearMerkleRoots.get()
314                         self.LowestClearMerkleRoots = min(self.clearMerkleRoots.qsize(), self.LowestClearMerkleRoots)
315                         rollPrevBlk = True
316                 (merkleRoot, merkleTree, cb) = MRD
317                 return (merkleRoot, merkleTree, cb, prevBlock, bits, rollPrevBlk)
318         
319         def getMC(self):
320                 (prevBlock, bits) = self.currentBlock
321                 mt = self.currentMerkleTree
322                 cb = self.makeCoinbase()
323                 return (None, mt, cb, prevBlock, bits)