Bugfix: Check fd still has a handler object, in case it has been destroyed in the...
[bitcoin:eloipool.git] / merklemaker.py
1 from binascii import a2b_hex, b2a_hex
2 from bitcoin.script import countSigOps
3 from bitcoin.txn import Txn
4 from collections import deque
5 from queue import Queue
6 import jsonrpc
7 import logging
8 from merkletree import MerkleTree
9 from struct import pack
10 import threading
11 from time import sleep, time
12 import traceback
13
14 _makeCoinbase = [0, 0]
15
16 class merkleMaker(threading.Thread):
17         def __init__(self, *a, **k):
18                 super().__init__(*a, **k)
19                 self.daemon = True
20                 self.logger = logging.getLogger('merkleMaker')
21                 self.CoinbasePrefix = b''
22                 self.CoinbaseAux = {}
23         
24         def _prepare(self):
25                 self.access = jsonrpc.ServiceProxy(self.UpstreamURI)
26                 
27                 self.currentBlock = (None, None)
28                 self.currentMerkleTree = None
29                 self.merkleRoots = deque(maxlen=self.WorkQueueSizeRegular[1])
30                 self.clearMerkleTree = MerkleTree([self.clearCoinbaseTxn])
31                 self.clearMerkleRoots = Queue(self.WorkQueueSizeLongpoll[1])
32                 
33                 self.nextMerkleUpdate = 0
34                 global now
35                 now = time()
36                 self.updateMerkleTree()
37         
38         def updateBlock(self, newBlock, bits = None, _HBH = None):
39                 if newBlock == self.currentBlock[0]:
40                         return
41                 
42                 if bits is None:
43                         bits = self.currentBlock[1]
44                 if _HBH is None:
45                         _HBH = (b2a_hex(newBlock[::-1]).decode('utf8'), b2a_hex(bits[::-1]).decode('utf8'))
46                 self.logger.debug('New block: %s (bits: %s)' % _HBH)
47                 self.merkleRoots.clear()
48                 self.currentMerkleTree = self.clearMerkleTree
49                 self.lastBlock = self.currentBlock
50                 self.currentBlock = (newBlock, bits)
51                 self.needMerkle = 2
52                 self.onBlockChange()
53         
54         def updateMerkleTree(self):
55                 global now
56                 self.logger.debug('Polling bitcoind for memorypool')
57                 self.nextMerkleUpdate = now + self.TxnUpdateRetryWait
58                 MP = self.access.getmemorypool()
59                 prevBlock = a2b_hex(MP['previousblockhash'])[::-1]
60                 bits = a2b_hex(MP['bits'])[::-1]
61                 if (prevBlock, bits) != self.currentBlock:
62                         self.updateBlock(prevBlock, bits, _HBH=(MP['previousblockhash'], MP['bits']))
63                 # TODO: cache Txn or at least txid from previous merkle roots?
64                 txnlist = [a for a in map(a2b_hex, MP['transactions'])]
65                 
66                 t = self.makeCoinbaseTxn(MP['coinbasevalue'])
67                 t.setCoinbase(b'\0\0')
68                 t.assemble()
69                 txnlist.insert(0, t.data)
70                 
71                 txnlistsz = sum(map(len, txnlist))
72                 while txnlistsz > 934464:  # TODO: 1 "MB" limit - 64 KB breathing room
73                         self.logger.debug('Trimming transaction for size limit')
74                         txnlistsz -= len(txnlist.pop())
75                 
76                 txnlistsz = sum(map(countSigOps, txnlist))
77                 while txnlistsz > 19488:  # TODO: 20k limit - 0x200 breathing room
78                         self.logger.debug('Trimming transaction for SigOp limit')
79                         txnlistsz -= countSigOps(txnlist.pop())
80                 
81                 txnlist = [a for a in map(Txn, txnlist[1:])]
82                 txnlist.insert(0, t)
83                 txnlist = list(txnlist)
84                 newMerkleTree = MerkleTree(txnlist)
85                 if newMerkleTree.merkleRoot() != self.currentMerkleTree.merkleRoot():
86                         self.logger.debug('Updating merkle tree')
87                         self.currentMerkleTree = newMerkleTree
88                 self.nextMerkleUpdate = now + self.MinimumTxnUpdateWait
89                 
90                 if self.needMerkle == 2:
91                         self.needMerkle = 1
92         
93         def makeCoinbase(self):
94                 now = int(time())
95                 if now > _makeCoinbase[0]:
96                         _makeCoinbase[0] = now
97                         _makeCoinbase[1] = 0
98                 else:
99                         _makeCoinbase[1] += 1
100                 rv = self.CoinbasePrefix
101                 rv += pack('>L', now) + pack('>Q', _makeCoinbase[1]).lstrip(b'\0')
102                 for v in self.CoinbaseAux.values():
103                         rv += v
104                 return rv[:100]
105         
106         def makeMerkleRoot(self, merkleTree):
107                 t = merkleTree.data[0]
108                 cb = self.makeCoinbase()
109                 t.setCoinbase(cb)
110                 t.assemble()
111                 merkleRoot = merkleTree.merkleRoot()
112                 return (merkleRoot, merkleTree, cb)
113         
114         _doing_last = None
115         def _doing(self, what):
116                 if self._doing_last == what:
117                         self._doing_i += 1
118                         return
119                 global now
120                 if self._doing_last:
121                         self.logger.debug("Switching from (%4dx in %5.3f seconds) %s => %s" % (self._doing_i, now - self._doing_s, self._doing_last, what))
122                 self._doing_last = what
123                 self._doing_i = 1
124                 self._doing_s = now
125         
126         def merkleMaker_I(self):
127                 global now
128                 
129                 # First, update merkle tree if we haven't for a while and aren't crunched for time
130                 now = time()
131                 if self.nextMerkleUpdate <= now and self.clearMerkleRoots.qsize() > self.WorkQueueSizeLongpoll[0] and len(self.merkleRoots) > self.WorkQueueSizeRegular[0]:
132                         self.updateMerkleTree()
133                 # Next, fill up the longpoll queue first, since it can be used as a failover for the main queue
134                 elif not self.clearMerkleRoots.full():
135                         self._doing('blank merkle roots')
136                         self.clearMerkleRoots.put(self.makeMerkleRoot(self.clearMerkleTree))
137                 # Next, fill up the main queue (until they're all current)
138                 elif len(self.merkleRoots) < self.WorkQueueSizeRegular[1] or self.merkleRoots[0][1] != self.currentMerkleTree:
139                         self._doing('regular merkle roots')
140                         self.merkleRoots.append(self.makeMerkleRoot(self.currentMerkleTree))
141                 else:
142                         if self.needMerkle == 1:
143                                 self.onBlockUpdate()
144                                 self.needMerkle = False
145                         self._doing('idle')
146                         # TODO: rather than sleepspin, block until MinimumTxnUpdateWait expires or threading.Condition(?)
147                         sleep(self.IdleSleepTime)
148         
149         def run(self):
150                 while True:
151                         try:
152                                 self.merkleMaker_I()
153                                 self._THISISUGLY._flushrecv()
154                         except:
155                                 self.logger.critical(traceback.format_exc())
156         
157         def start(self, *a, **k):
158                 self._prepare()
159                 super().start(*a, **k)
160         
161         def getMRD(self):
162                 (prevBlock, bits) = self.currentBlock
163                 try:
164                         MRD = self.merkleRoots.pop()
165                         rollPrevBlk = False
166                 except IndexError:
167                         MRD = self.clearMerkleRoots.get()
168                         rollPrevBlk = True
169                 (merkleRoot, merkleTree, cb) = MRD
170                 return (merkleRoot, merkleTree, cb, prevBlock, bits, rollPrevBlk)