Bugfix: Tolerate flags in coinbase prefix we provided
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import config
19
20
21 import logging
22
# Root logger emits everything from DEBUG upwards...
logging.basicConfig(level=logging.DEBUG)
# ...except for the loggers named below, which are limited to INFO and above
for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker'):
        logging.getLogger(infoOnly).setLevel(logging.INFO)
26
def RaiseRedFlags(reason):
        """Log `reason` at CRITICAL level on the 'redflag' logger and return it unchanged."""
        redflagLogger = logging.getLogger('redflag')
        redflagLogger.critical(reason)
        return reason
30
31
from bitcoin.node import BitcoinLink, BitcoinNode
# Node object for the network identified by config.UpstreamNetworkId;
# our name/version is appended to its user agent string
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
# JSON-RPC proxy for config.UpstreamURI (blockSubmissionThread calls getmemorypool on it)
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
38
39
40 from bitcoin.script import BitcoinScript
41 from bitcoin.txn import Txn
42 from base58 import b58decode
43 from struct import pack
44 import subprocess
45 from time import time
46
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
        """Build the coinbase transaction that pays out `coinbaseValue`.

        When `useCoinbaser` is true and config.CoinbaserCmd is set, that command
        is run through the shell with '%d' replaced by coinbaseValue.  Its stdout
        is parsed as: one line with the number of outputs, then for each output a
        line with the amount followed by a line with the destination address.
        Those outputs are added first; the remaining value goes to
        config.TrackerAddr.

        If the coinbaser raises, or claims the whole value (or more), all of its
        outputs are discarded and the full coinbaseValue goes to
        config.TrackerAddr.

        Returns the Txn (not yet assembled).
        """
        txn = Txn.new()
        
        if useCoinbaser and getattr(config, 'CoinbaserCmd', None):
                logger = logging.getLogger('makeCoinbaseTxn')
                coinbased = 0
                try:
                        cmd = config.CoinbaserCmd
                        cmd = cmd.replace('%d', str(coinbaseValue))
                        # NOTE: shell=True is acceptable only because cmd comes from local config
                        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
                        try:
                                nout = int(p.stdout.readline())
                                for i in range(nout):
                                        amount = int(p.stdout.readline())
                                        addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
                                        pkScript = BitcoinScript.toAddress(addr)
                                        txn.addOutput(amount, pkScript)
                                        coinbased += amount
                        finally:
                                # Release the pipe and reap the child so neither leaks
                                p.stdout.close()
                                p.wait()
                except Exception:
                        logger.error('Coinbaser raised an exception', exc_info=True)
                        # Force the failure branch below
                        coinbased = coinbaseValue + 1
                if coinbased >= coinbaseValue:
                        logger.error('Coinbaser failed!')
                        txn.outputs = []
                else:
                        coinbaseValue -= coinbased
        
        pkScript = BitcoinScript.toAddress(config.TrackerAddr)
        txn.addOutput(coinbaseValue, pkScript)
        
        # TODO
        # TODO: red flag on dupe coinbase
        return txn
77
78
79 from util import Bits2Target
80
# Work handed out to miners: username -> {work id -> (work data, time issued)}
workLog = {}
# Target derived from the current block's bits; assigned in blockChanged()
networkTarget = None
# Keys are share headers already seen; checkShare() uses it to reject duplicates
DupeShareHACK = {}
84
server = None
def updateBlocks():
        """Call server.wakeLongpoll() if a server object has been set; otherwise do nothing."""
        if not server:
                return
        server.wakeLongpoll()
89
def blockChanged():
        """Reset per-block state after the current block changes.

        Empties the duplicate-share table, recomputes networkTarget from the
        bits of MM.currentBlock, clears workLog and then calls updateBlocks().
        """
        global DupeShareHACK, MM, networkTarget, server
        DupeShareHACK = {}
        currentBits = MM.currentBlock[1]
        networkTarget = Bits2Target(currentBits)
        workLog.clear()
        updateBlocks()
97
98
from merklemaker import merkleMaker
MM = merkleMaker()
# Copy every attribute of the config module onto the merkle maker instance
MM.__dict__.update(config.__dict__)
# Coinbase built without the coinbaser, for a hard-coded value of 5000000000
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
# Hooks the merkle maker is given: coinbase factory and block change/update callbacks
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
MM.start()
108
109
110 from binascii import b2a_hex
111 from copy import deepcopy
112 from struct import pack, unpack
113 from time import time
114 from util import RejectedShare, dblsha, hash2int, swap32
115 import jsonrpc
116 import threading
117 import traceback
118
# Optional gotwork service proxy; stays None unless config.GotWorkURI is defined
gotwork = None
if hasattr(config, 'GotWorkURI'):
        gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
122
def submitGotwork(info):
        """Pass `info` to gotwork.gotwork(); any failure is logged as a warning instead of propagating."""
        try:
                gotwork.gotwork(info)
        except:
                failure = 'Failed to submit gotwork\n' + traceback.format_exc()
                checkShare.logger.warning(failure)
128
# Optional share database connection; stays None unless config.DbOptions is defined
db = None
if hasattr(config, 'DbOptions'):
        # psycopg2 is imported only when a database is configured
        import psycopg2
        db = psycopg2.connect(**config.DbOptions)
133
def getBlockHeader(username):
        """Issue a block header to `username` and record the work it came from.

        The header is version 1, the previous block, the merkle root, the
        current time, the bits and a fixed trailing 4 bytes.  The work is filed
        in workLog under the user's name, keyed by merkle root.
        """
        MRD = MM.getMRD()
        (merkleRoot, _merkleTree, _coinbase, prevBlock, bits, _rollPrevBlk) = MRD
        timestamp = pack('<L', int(time()))
        hdr = b''.join((b'\1\0\0\0', prevBlock, merkleRoot, timestamp, bits, b'iolE'))
        userWork = workLog.setdefault(username, {})
        userWork[merkleRoot] = (MRD, time())
        return hdr
141
def getBlockTemplate(username):
        """Issue MM.getMC() work to `username` and record it in workLog.

        The work is keyed by the length-prefixed field at the start of the
        coinbase: byte 0 gives its length, the following bytes are the key.
        Returns the work tuple unchanged.
        """
        MC = MM.getMC()
        (_dummy, _merkleTree, coinbase, _prevBlock, _bits) = MC
        wliEnd = coinbase[0] + 1
        wli = coinbase[1:wliEnd]
        userWork = workLog.setdefault(username, {})
        userWork[wli] = (MC, time())
        return MC
149
def YN(b):
        """Map a truth value to 'Y' or 'N'; None is passed through as None."""
        if b is not None:
                return 'Y' if b else 'N'
        return None
154
def logShare(share):
        """Insert one row describing `share` into the shares table.

        Does nothing when no database is configured (db is None).  Reads
        'username' (required) plus the optional 'remoteHost', 'rejectReason',
        'upstreamResult' and '_origdata' keys; without '_origdata' the solution
        is the hex of the word-swapped share['data'].

        If the insert or commit fails, the transaction is rolled back so later
        inserts on the same connection are not refused, and the error is
        re-raised.  The cursor is always closed.
        """
        if db is None:
                return
        rem_host = share.get('remoteHost', '?')
        username = share['username']
        reason = share.get('rejectReason', None)
        upstreamResult = share.get('upstreamResult', None)
        if '_origdata' in share:
                solution = share['_origdata']
        else:
                solution = b2a_hex(swap32(share['data'])).decode('utf8')
        stmt = "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values (%s, %s, %s, %s, %s, decode(%s, 'hex'))"
        params = (rem_host, username, YN(not reason), YN(upstreamResult), reason, solution)
        dbc = db.cursor()
        try:
                dbc.execute(stmt, params)
                db.commit()
        except Exception:
                # Leave the connection usable for the next share
                db.rollback()
                raise
        finally:
                dbc.close()
172
# Deep copies of the header and transaction list (plus submitted block data,
# when present) for every block solution checkShare() has seen
RBDs = []
# The raw payloads checkShare() submitted upstream for those solutions
RBPs = []
175
176 from bitcoin.varlen import varlenEncode, varlenDecode
177 import bitcoin.txn
def assembleBlock(blkhdr, txlist):
        """Return the block payload: header, encoded transaction count, then each transaction's data.

        `txlist` items must expose their serialized form as `.data`.
        """
        # One join instead of repeated += avoids re-copying the payload per transaction
        return blkhdr + varlenEncode(len(txlist)) + b''.join(tx.data for tx in txlist)
184
def blockSubmissionThread(payload):
        """Submit `payload` (raw block bytes) upstream via getmemorypool, retrying until a call succeeds.

        Retries happen immediately, as before.  Failures are now logged, at most
        once every 5 seconds, instead of being silently discarded.
        """
        logger = logging.getLogger('blockSubmissionThread')
        # The encoding never changes between retries, so do it once
        hexPayload = b2a_hex(payload).decode('ascii')
        nextLogTime = 0
        while True:
                try:
                        UpstreamBitcoindJSONRPC.getmemorypool(hexPayload)
                        break
                except Exception:
                        now = time()
                        if now >= nextLogTime:
                                logger.error('Failed to submit block upstream; retrying', exc_info=True)
                                nextLogTime = now + 5
192
def checkShare(share):
        """Validate a submitted share, raising RejectedShare when it is unacceptable.

        `share` is a dict.  This function reads 'data' (the block header is its
        first 80 bytes), 'username' and, when the miner sent the whole block,
        'blkdata'.  It stores the matched work under 'MRD' or 'MC', sets 'time',
        and sets 'upstreamResult' when the share solves a block.

        A share whose hash meets the network target is submitted upstream before
        the remaining checks run.  Accepted shares are recorded with logShare();
        nothing is logged here for rejected ones.
        """
        data = share['data']
        data = data[:80]
        (prevBlock, bits) = MM.currentBlock
        sharePrevBlock = data[4:36]
        if sharePrevBlock != prevBlock:
                if sharePrevBlock == MM.lastBlock[0]:
                        raise RejectedShare('stale-prevblk')
                raise RejectedShare('bad-prevblk')
        
        # TODO: use userid
        username = share['username']
        if username not in workLog:
                raise RejectedShare('unknown-user')
        
        if data[72:76] != bits:
                raise RejectedShare('bad-diffbits')
        if data[:4] != b'\1\0\0\0':
                raise RejectedShare('bad-version')
        
        shareMerkleRoot = data[36:68]
        if 'blkdata' in share:
                # Full block supplied: the work id is the length-prefixed field
                # at the start of the submitted coinbase
                pl = share['blkdata']
                (txncount, pl) = varlenDecode(pl)
                cbtxn = bitcoin.txn.Txn(pl)
                cbtxn.disassemble(retExtra=True)
                coinbase = cbtxn.getCoinbase()
                wliLen = coinbase[0]
                wli = coinbase[1:wliLen+1]
                mode = 'MC'
                moden = 1
        else:
                # Header only: the work id is the merkle root
                wli = shareMerkleRoot
                mode = 'MRD'
                moden = 0
        
        MWL = workLog[username]
        if wli not in MWL:
                raise RejectedShare('unknown-work')
        (wld, t) = MWL[wli]
        share[mode] = wld
        
        if data in DupeShareHACK:
                raise RejectedShare('duplicate')
        DupeShareHACK[data] = None
        
        shareTimestamp = unpack('<L', data[68:72])[0]
        shareTime = share['time'] = time()
        if shareTime < t - 120:
                raise RejectedShare('stale-work')
        if shareTimestamp < shareTime - 300:
                raise RejectedShare('time-too-old')
        if shareTimestamp > shareTime + 7200:
                raise RejectedShare('time-too-new')
        
        blkhash = dblsha(data)
        if blkhash[28:] != b'\0\0\0\0':
                raise RejectedShare('H-not-zero')
        blkhashn = hash2int(blkhash)
        
        global networkTarget
        # Block solutions are logged at INFO, ordinary shares at DEBUG
        logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
        logfunc('BLKHASH: %64x' % (blkhashn,))
        logfunc(' TARGET: %64x' % (networkTarget,))
        
        workMerkleTree = wld[1]
        workCoinbase = wld[2]
        
        # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
        txlist = workMerkleTree.data
        cbtxn = txlist[0]
        cbtxn.setCoinbase(workCoinbase)
        cbtxn.assemble()
        
        if blkhashn <= networkTarget:
                logfunc("Submitting upstream")
                if not moden:
                        RBDs.append( deepcopy( (data, txlist) ) )
                        payload = assembleBlock(data, txlist)
                else:
                        RBDs.append( deepcopy( (data, txlist, share['blkdata']) ) )
                        payload = share['data'] + share['blkdata']
                logfunc('Real block payload: %s' % (payload,))
                RBPs.append(payload)
                threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
                bcnode.submitBlock(payload)
                share['upstreamResult'] = True
                MM.updateBlock(blkhash)
        
        # Gotwork hack...
        if gotwork and blkhashn <= config.GotWorkTarget:
                try:
                        coinbaseMrkl = cbtxn.data
                        coinbaseMrkl += blkhash
                        steps = workMerkleTree._steps
                        coinbaseMrkl += pack('B', len(steps))
                        for step in steps:
                                coinbaseMrkl += step
                        coinbaseMrkl += b"\0\0\0\0"
                        info = {}
                        info['hash'] = b2a_hex(blkhash).decode('ascii')
                        info['header'] = b2a_hex(data).decode('ascii')
                        info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
                        thr = threading.Thread(target=submitGotwork, args=(info,))
                        thr.daemon = True
                        thr.start()
                except:
                        checkShare.logger.warning('Failed to build gotwork request')
        
        if moden:
                # The submitted coinbase must start with the coinbase we issued
                cbpre = cbtxn.getCoinbase()
                cbpreLen = len(cbpre)
                if coinbase[:cbpreLen] != cbpre:
                        raise RejectedShare('bad-cb-prefix')
                
                # Filter out known "I support" flags, to prevent exploits
                # A flag lying wholly inside the prefix we provided is tolerated.
                # Searching from the first offset that reaches past the prefix
                # catches every later occurrence (not just the first one in the
                # coinbase) and never mistakes "not found" for a match.
                for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
                        if coinbase.find(ff, max(0, cbpreLen - len(ff) + 1)) != -1:
                                raise RejectedShare('bad-cb-flag')
                
                if len(coinbase) > 100:
                        raise RejectedShare('bad-cb-length')
                
                # Rebuild the block with the submitted coinbase and require an exact match
                cbtxn = deepcopy(cbtxn)
                cbtxn.setCoinbase(coinbase)
                cbtxn.assemble()
                if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
                        raise RejectedShare('bad-txnmrklroot')
                
                txlist = [cbtxn,] + txlist[1:]
                allowed = assembleBlock(data, txlist)
                if allowed != share['data'] + share['blkdata']:
                        raise RejectedShare('bad-txns')
        
        logShare(share)
checkShare.logger = logging.getLogger('checkShare')
329
def receiveShare(share):
        """Run checkShare() on `share`; on rejection, store the reason, log the share and re-raise."""
        # TODO: username => userid
        try:
                checkShare(share)
        except RejectedShare as rejection:
                share['rejectReason'] = str(rejection)
                logShare(share)
                raise
        # TODO
339
def newBlockNotification(signum, frame):
        """Signal handler: log the notification and ask the merkle maker to update its merkle tree."""
        notifyLogger = logging.getLogger('newBlockNotification')
        notifyLogger.info('Received new block notification')
        MM.updateMerkleTree()
        # TODO: Force RESPOND TO LONGPOLLS?
345
from signal import signal, SIGUSR1
# Sending SIGUSR1 to this process runs newBlockNotification
signal(SIGUSR1, newBlockNotification)
348
349
350 import os
351 import os.path
352 import pickle
353 import signal
354 import sys
355 from time import sleep
356 import traceback
357
# File that saveState() writes and restoreState() reads
SAVE_STATE_FILENAME = 'eloipool.worklog'
359
def stopServers():
        """Ask bcnode and server to stop, wait briefly for them, then close their file descriptors.

        Each server has keepgoing cleared and wakeup() called.  Their `running`
        attribute is then polled up to 0x100 times, 0.01s apart; if any is still
        running after that an error is logged and shutdown continues anyway.
        """
        global bcnode, server
        logger = logging.getLogger('stopServers')
        
        logger.info('Stopping servers...')
        servers = (bcnode, server)
        for s in servers:
                s.keepgoing = False
        for s in servers:
                s.wakeup()
        
        for attempt in range(1, 0x100 + 1):
                stillRunning = [s.__class__.__name__ for s in servers if s.running]
                if not stillRunning:
                        break
                if attempt >= 0x100:
                        logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(stillRunning)))
                        break
                sleep(0.01)
        
        for s in servers:
                for fd in s._fd.keys():
                        os.close(fd)
387
def saveState():
        """Pickle (workLog, DupeShareHACK) into SAVE_STATE_FILENAME.

        A failed write is retried immediately, up to 0x10000 attempts.  When
        every attempt fails, the error is logged, the state file is unlinked
        (a failure to unlink is logged too) and the function returns without
        raising.
        """
        logger = logging.getLogger('saveState')
        
        # Then, save data needed to resume work
        logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
        i = 0
        while True:
                try:
                        with open(SAVE_STATE_FILENAME, 'wb') as f:
                                pickle.dump( (workLog, DupeShareHACK), f )
                        break
                except Exception:
                        i += 1
                        if i >= 0x10000:
                                logger.error('Failed to save work\n' + traceback.format_exc())
                                try:
                                        os.unlink(SAVE_STATE_FILENAME)
                                except Exception:
                                        logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
                                # Give up here; without this the loop would retry and log forever
                                return
407
def exit():
        """Stop the servers, save state, then terminate this process (SIGTERM to self, then sys.exit)."""
        stopServers()
        saveState()
        exitLogger = logging.getLogger('exit')
        exitLogger.info('Goodbye...')
        ownPid = os.getpid()
        os.kill(ownPid, signal.SIGTERM)
        sys.exit(0)
414
def restart():
        """Stop the servers, save state, then re-exec this program with its original arguments.

        A failed exec is logged with its traceback; the function then returns.
        """
        stopServers()
        saveState()
        restartLogger = logging.getLogger('restart')
        restartLogger.info('Restarting...')
        try:
                os.execv(sys.argv[0], sys.argv)
        except:
                restartLogger.error('Failed to exec\n' + traceback.format_exc())
423
def restoreState():
        """Reload workLog and DupeShareHACK from SAVE_STATE_FILENAME, if that file exists.

        Any failure while reading is logged with its traceback and otherwise
        ignored.
        """
        global workLog, DupeShareHACK
        
        if not os.path.exists(SAVE_STATE_FILENAME):
                return
        
        logger = logging.getLogger('restoreState')
        logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, os.stat(SAVE_STATE_FILENAME).st_size))
        try:
                # NOTE(review): pickle.load executes whatever the file encodes; the state file must be trusted
                with open(SAVE_STATE_FILENAME, 'rb') as f:
                        data = pickle.load(f)
                workLog = data[0]
                DupeShareHACK = data[1]
        except:
                logger.error('Failed to restore state\n' + traceback.format_exc())
        else:
                logger.info('State restored successfully')
441
442
443 from jsonrpcserver import JSONRPCListener, JSONRPCServer
444 import interactivemode
445 from networkserver import NetworkListener
446 import threading
447
if __name__ == "__main__":
        # One NetworkListener on bcnode per configured address (none by default)
        LSbc = []
        if not hasattr(config, 'BitcoinNodeAddresses'):
                config.BitcoinNodeAddresses = ()
        for a in config.BitcoinNodeAddresses:
                LSbc.append(NetworkListener(bcnode, a))
        
        if getattr(config, 'UpstreamBitcoindNode', None):
                BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
        
        server = JSONRPCServer()
        # Normalise to a fresh list so that a missing JSONRPCAddresses or a
        # tuple value no longer fails; a lone JSONRPCAddress goes first
        config.JSONRPCAddresses = list(getattr(config, 'JSONRPCAddresses', ()))
        if hasattr(config, 'JSONRPCAddress'):
                config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
        if not config.JSONRPCAddresses:
                logging.getLogger('main').warning('No JSONRPCAddress(es) configured; JSON-RPC server has no listeners')
        LS = []
        for a in config.JSONRPCAddresses:
                LS.append(JSONRPCListener(server, a))
        if hasattr(config, 'SecretUser'):
                server.SecretUser = config.SecretUser
        # Hand the JSON-RPC server the callbacks defined in this module
        server.aux = MM.CoinbaseAux
        server.getBlockHeader = getBlockHeader
        server.getBlockTemplate = getBlockTemplate
        server.receiveShare = receiveShare
        server.RaiseRedFlags = RaiseRedFlags
        
        restoreState()
        
        # bcnode runs in a daemon thread; the JSON-RPC server runs in the main thread
        bcnode_thr = threading.Thread(target=bcnode.serve_forever)
        bcnode_thr.daemon = True
        bcnode_thr.start()
        
        server.serve_forever()