Bugfix: checkShare: Check for work identifier after skipping over height-in-coinbase
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import config
19
# Fill in defaults for optional configuration settings so later code can
# rely on them being present.
if not hasattr(config, 'ServerName'):
	config.ServerName = 'Unnamed Eloipool'

if not hasattr(config, 'ShareTarget'):
	# Default share target: difficulty 1 (32 leading zero bits)
	config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
25
26
27 import logging
28
# If the embedding application hasn't configured logging yet, install a
# default stderr handler at DEBUG, but quiet the chattiest subsystems to INFO.
if len(logging.root.handlers) == 0:
	logging.basicConfig(
		format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
		level=logging.DEBUG,
	)
	for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
		logging.getLogger(infoOnly).setLevel(logging.INFO)
36
def RaiseRedFlags(reason):
	"""Log *reason* at critical severity on the 'redflag' logger.

	Returns the reason unchanged, so callers can log and raise/return
	in a single expression.
	"""
	redflagLogger = logging.getLogger('redflag')
	redflagLogger.critical(reason)
	return reason
40
41
from bitcoin.node import BitcoinLink, BitcoinNode
# P2P node used to announce solved blocks directly on the Bitcoin network
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
# JSON-RPC proxy to the upstream bitcoind (getmemorypool, block submission)
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
48
49
50 from bitcoin.script import BitcoinScript
51 from bitcoin.txn import Txn
52 from base58 import b58decode
53 from struct import pack
54 import subprocess
55 from time import time
56
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
	"""Build the coinbase transaction paying out coinbaseValue (base units).

	If useCoinbaser is true and config.CoinbaserCmd is set, that command is
	run with '%d' replaced by the total value; it must print the number of
	extra outputs, then an (amount, address) line pair for each.  Whatever
	remains after the coinbaser's outputs is paid to config.TrackerAddr.
	Any coinbaser failure (crash, bad output, or spending the entire value)
	discards all coinbaser outputs and pays everything to the tracker.
	"""
	txn = Txn.new()
	
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		try:
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			# NOTE: shell=True runs a pool-operator-configured command; not untrusted input
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			nout = int(p.stdout.readline())
			for i in range(nout):
				amount = int(p.stdout.readline())
				addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
				pkScript = BitcoinScript.toAddress(addr)
				txn.addOutput(amount, pkScript)
				coinbased += amount
		except Exception:
			# Force the overspend check below to fail so the partial outputs
			# are discarded.  (Was a bare except:, which also swallowed
			# KeyboardInterrupt/SystemExit.)
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased
	
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)
	
	# TODO
	# TODO: red flag on dupe coinbase
	return txn
87
88
89 import jsonrpc_getwork
90 from util import Bits2Target
91
# Per-user log of recently issued work: username -> {work identifier: (work, issue time)}
workLog = {}
# Integer target derived from the current block's nBits (None until known)
networkTarget = None
# Header data of already-accepted shares, used to reject duplicates
DupeShareHACK = {}
95
server = None  # JSONRPCServer instance; assigned in __main__
def updateBlocks():
	# Nudge the JSON-RPC server so longpolling clients get fresh work
	server.wakeLongpoll()
99
def blockChanged():
	"""Reset per-block state when the chain tip changes.

	Clears duplicate-share tracking, recomputes the network target from the
	new block's nBits, wipes outstanding work, and wakes longpollers.
	"""
	global DupeShareHACK
	DupeShareHACK = {}
	jsonrpc_getwork._CheckForDupesHACK = {}
	global MM, networkTarget, server
	bits = MM.currentBlock[2]
	networkTarget = None if bits is None else Bits2Target(bits)
	workLog.clear()
	updateBlocks()
112
113
from merklemaker import merkleMaker
# Background merkle-tree maintainer; configured directly from the config module
MM = merkleMaker()
MM.__dict__.update(config.__dict__)
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
122
123
124 from binascii import b2a_hex
125 from copy import deepcopy
126 from struct import pack, unpack
127 import threading
128 from time import time
129 from util import RejectedShare, dblsha, hash2int, swap32
130 import jsonrpc
131 import traceback
132
# Optional JSON-RPC endpoint notified of proof-of-work meeting GotWorkTarget
gotwork = None
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
136
def submitGotwork(info):
	"""Best-effort push of *info* to the configured gotwork service.

	Runs on a daemon thread; failures are logged and otherwise ignored.
	"""
	try:
		gotwork.gotwork(info)
	except Exception:
		# Was a bare except:, which would also trap KeyboardInterrupt/SystemExit
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
142
def getBlockHeader(username):
	"""Issue getwork-style work for *username*.

	Returns (80-byte block header, worklog entry).  The merkle root serves
	as the work identifier for matching the share back up in checkShare.
	"""
	MRD = MM.getMRD()
	(merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
	timestamp = pack('<L', int(time()))
	# Version-1 header; b'iolE' is a placeholder nonce the miner overwrites
	hdr = b'\1\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
	workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
	return (hdr, workLog[username][merkleRoot])
150
def getBlockTemplate(username):
	"""Issue getmemorypool-style work for *username*, returning the MC tuple.

	The work identifier (length-prefixed at the start of the coinbase data)
	is recorded in workLog so checkShare can find this work again.
	"""
	MC = MM.getMC()
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC
	# First byte of the coinbase data is the work identifier's length
	wliLen = coinbase[0]
	wli = coinbase[1:wliLen+1]
	workLog.setdefault(username, {})[wli] = (MC, time())
	return MC
158
# Callables invoked with each processed share (populated from config.ShareLogging)
loggersShare = []

# Debug records of real (network-target) blocks: raw data, and the payloads submitted
RBDs = []
RBPs = []
163
164 from bitcoin.varlen import varlenEncode, varlenDecode
165 import bitcoin.txn
def assembleBlock(blkhdr, txlist):
	"""Serialize a full block: header, transaction-count varint, raw txns."""
	parts = [blkhdr, varlenEncode(len(txlist))]
	parts.extend(tx.data for tx in txlist)
	return b''.join(parts)
172
def blockSubmissionThread(payload):
	"""Submit a solved block to the upstream bitcoind, retrying until the RPC succeeds.

	The previous version busy-looped with a bare except:, silently swallowing
	every error and spinning the CPU if bitcoind was unreachable; now each
	failure is logged and followed by a short delay.  Raises a red flag if
	bitcoind accepts the call but rejects the block.
	"""
	while True:
		try:
			rv = UpstreamBitcoindJSONRPC.getmemorypool(b2a_hex(payload).decode('ascii'))
			break
		except Exception:
			checkShare.logger.warning('Error submitting block upstream; retrying\n' + traceback.format_exc())
			sleep(0.1)
	if not rv:
		RaiseRedFlags('Upstream rejected block!')
182
# Share target as a 64-digit hex string, precomputed for share logging
_STA = '%064x' % (config.ShareTarget,)
def checkShare(share):
	"""Validate a submitted share, raising RejectedShare on any problem.

	Side effects: records the receive time and matched work in *share*,
	tracks duplicates in DupeShareHACK, and — if the hash meets the network
	target — submits the full block upstream and over the p2p node.
	"""
	shareTime = share['time'] = time()
	
	data = share['data']
	data = data[:80]  # block header only; any txn data lives in share['blkdata']
	(prevBlock, height, bits) = MM.currentBlock
	sharePrevBlock = data[4:36]
	if sharePrevBlock != prevBlock:
		# Work for the immediately previous block is merely stale; anything
		# else is outright wrong
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')
	
	# TODO: use userid
	username = share['username']
	if username not in workLog:
		raise RejectedShare('unknown-user')
	
	if data[72:76] != bits:
		raise RejectedShare('bad-diffbits')
	if data[:4] != b'\1\0\0\0':
		# Only version-1 headers are ever issued (see getBlockHeader)
		raise RejectedShare('bad-version')
	
	shareMerkleRoot = data[36:68]
	if 'blkdata' in share:
		# getmemorypool-style share: full transaction data follows the header
		pl = share['blkdata']
		(txncount, pl) = varlenDecode(pl)
		cbtxn = bitcoin.txn.Txn(pl)
		cbtxn.disassemble(retExtra=True)
		coinbase = cbtxn.getCoinbase()
		# The work identifier sits after the height-in-coinbase data:
		# coinbase[0] is the height push length, so skip that push plus its
		# opcode, then read the identifier's own length byte
		wliPos = coinbase[0] + 2
		wliLen = coinbase[wliPos - 1]
		wli = coinbase[wliPos:wliPos+wliLen]
		mode = 'MC'
		moden = 1
	else:
		# getwork-style share: the merkle root itself is the work identifier
		wli = shareMerkleRoot
		mode = 'MRD'
		moden = 0
	
	MWL = workLog[username]
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld
	
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
	blkhash = dblsha(data)
	if blkhash[28:] != b'\0\0\0\0':
		# Trivial check: last four bytes of the hash must be zero
		raise RejectedShare('H-not-zero')
	blkhashn = hash2int(blkhash)
	
	global networkTarget
	# Block-solving shares are logged at info; ordinary shares at debug
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
	
	workMerkleTree = wld[1]
	workCoinbase = wld[2]
	
	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	txlist = workMerkleTree.data
	txlist = [deepcopy(txlist[0]),] + txlist[1:]
	cbtxn = txlist[0]
	cbtxn.setCoinbase(workCoinbase)
	cbtxn.assemble()
	
	if blkhashn <= networkTarget:
		# This share solves a real block: record it and submit everywhere
		logfunc("Submitting upstream")
		RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree) ) )
		if not moden:
			payload = assembleBlock(data, txlist)
		else:
			payload = share['data'] + share['blkdata']
		logfunc('Real block payload: %s' % (payload,))
		RBPs.append(payload)
		threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
		bcnode.submitBlock(payload)
		share['upstreamResult'] = True
		MM.updateBlock(blkhash)
	
	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			# Build coinbase txn + hash + merkle steps blob for the gotwork service
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')
	
	if blkhashn > config.ShareTarget:
		raise RejectedShare('high-hash')
	share['target'] = config.ShareTarget
	share['_targethex'] = _STA
	
	shareTimestamp = unpack('<L', data[68:72])[0]
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
	if moden:
		# Full verification for getmemorypool shares: the submitted coinbase
		# must extend the one we issued, contain no disallowed flags, and
		# reproduce the claimed merkle root and block serialization
		cbpre = cbtxn.getCoinbase()
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')
		
		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
				raise RejectedShare('bad-cb-flag')
		
		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')
		
		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')
		
		allowed = assembleBlock(data, txlist)
		if allowed != share['data'] + share['blkdata']:
			raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
323
def receiveShare(share):
	"""Process one submitted share: validate, then feed to all share loggers.

	RejectedShare propagates to the caller after being recorded in
	share['rejectReason']; share loggers run regardless of acceptance.
	"""
	# TODO: username => userid
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	finally:
		# Record the solution in the form the miner originally submitted it
		if '_origdata' in share:
			share['solution'] = share['_origdata']
		else:
			share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
		for i in loggersShare:
			i(share)
338
def newBlockNotification():
	"""Handle an external new-block notification by refreshing the merkle tree."""
	logging.getLogger('newBlockNotification').info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
344
def newBlockNotificationSIGNAL(signum, frame):
	"""Signal handler: dispatch newBlockNotification on a daemon thread."""
	# Use a new thread, in case the signal handler is called with locks held
	thr = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
	thr.daemon = True
	thr.start()
350
from signal import signal, SIGUSR1
# SIGUSR1 triggers a merkle-tree refresh (e.g. from bitcoind -blocknotify).
# NOTE(review): the name 'signal' is rebound to the module by the later
# 'import signal'; registration here runs first, so this still works.
signal(SIGUSR1, newBlockNotificationSIGNAL)
353
354
355 import os
356 import os.path
357 import pickle
358 import signal
359 import sys
360 from time import sleep
361 import traceback
362
# File used to persist DupeShareHACK/workLog across restarts (see saveState/restoreState)
SAVE_STATE_FILENAME = 'eloipool.worklog'
364
def stopServers():
	"""Shut down the p2p node and the JSON-RPC server, then close their sockets.

	Idempotent: repeated calls return immediately.  Polls up to 0x100 times
	at 10ms intervals for the server loops to exit before giving up.
	"""
	logger = logging.getLogger('stopServers')
	
	if hasattr(stopServers, 'already'):
		logger.debug('Already tried to stop servers before')
		return
	stopServers.already = True
	
	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server)
	# Ask each server loop to stop, then wake it so it notices
	for s in servers:
		s.keepgoing = False
	for s in servers:
		try:
			s.wakeup()
		except:
			logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
	i = 0
	while True:
		sl = []
		for s in servers:
			if s.running:
				sl.append(s.__class__.__name__)
		if not sl:
			break
		i += 1
		if i >= 0x100:
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
			break
		sleep(0.01)
	
	# Close any file descriptors the servers still hold open
	for s in servers:
		for fd in s._fd.keys():
			os.close(fd)
400
def saveState(t = None):
	"""Persist resume state (shutdown time, DupeShareHACK, workLog) to disk.

	Retries on failure; after 0x10000 consecutive failures it logs the
	error, removes any partial state file, and gives up.  (Previously the
	failure path never broke out of the while loop, so a persistent write
	error spun forever re-logging the same error.)

	t: time() at shutdown, used by restoreState to judge workLog freshness.
	"""
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except Exception:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					# Don't leave a truncated/partial state file behind
					os.unlink(SAVE_STATE_FILENAME)
				except Exception:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
				break
422
def exit():
	"""Clean shutdown: stop servers, save state, then terminate the process."""
	# NOTE: shadows the builtin exit(); kept for compatibility with callers
	t = time()
	stopServers()
	saveState(t)
	logging.getLogger('exit').info('Goodbye...')
	# NOTE(review): SIGTERM is sent before sys.exit — presumably to take down
	# anything sys.exit alone wouldn't (e.g. other threads); confirm intent
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)
430
def restart():
	"""Stop servers, save state, and re-exec this process in place."""
	t = time()
	stopServers()
	saveState(t)
	logging.getLogger('restart').info('Restarting...')
	try:
		# Replace the current process image; only returns on failure
		os.execv(sys.argv[0], sys.argv)
	except:
		logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
440
def restoreState():
	"""Load saved pool state from SAVE_STATE_FILENAME, if present.

	Understands three historical formats:
	- tuple (workLog, DupeShareHACK): 2012-02-02 to 2012-02-03
	- bare dict (DupeShareHACK only): 2012-02-03
	- current: pickled shutdown time, then DupeShareHACK, then workLog
	workLog is only restored if the state was saved under 120 seconds ago.
	"""
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if type(t) == tuple:
				if len(t) > 2:
					# Future formats, not supported here
					# NOTE(review): guarded by len > 2 but indexes [3] —
					# IndexError for len == 3; verify the intended index
					ver = t[3]
					# TODO
				
				# Old format, from 2012-02-02 to 2012-02-03
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					# Old format, from 2012-02-03 to 2012-02-03
					DupeShareHACK = t
					t = None
				else:
					# Current format, from 2012-02-03 onward
					DupeShareHACK = pickle.load(f)
				
				# t is None for the old dict-only format, which carries no
				# workLog.  (Previously this unconditionally evaluated
				# t + 120, raising TypeError and falsely logging a restore
				# failure for that format.)
				if t is not None and t + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except Exception:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))
482
483
484 from jsonrpcserver import JSONRPCListener, JSONRPCServer
485 import interactivemode
486 from networkserver import NetworkListener
487 import threading
488 import sharelogging
489 import imp
490
if __name__ == "__main__":
	# --- Share logging configuration (with backward-compatibility shims) ---
	if not hasattr(config, 'ShareLogging'):
		config.ShareLogging = ()
	if hasattr(config, 'DbOptions'):
		# Deprecated DbOptions translates to a postgres 'sql' share logger
		logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
		config.ShareLogging = list(config.ShareLogging)
		config.ShareLogging.append( {
			'type': 'sql',
			'engine': 'postgres',
			'dbopts': config.DbOptions,
			'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
		} )
	for i in config.ShareLogging:
		if not hasattr(i, 'keys'):
			# Old-style (name, parameters) tuple entry; convert to dict form
			name, parameters = i
			logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
			if name == 'postgres':
				name = 'sql'
				i = {
					'engine': 'postgres',
					'dbopts': parameters,
				}
			elif name == 'logfile':
				i = {}
				i['thropts'] = parameters
				if 'filename' in parameters:
					i['filename'] = parameters['filename']
					i['thropts'] = dict(i['thropts'])
					del i['thropts']['filename']
			else:
				i = parameters
			i['type'] = name
		
		name = i['type']
		parameters = i
		try:
			# Load the named share logger module from the sharelogging package
			fp, pathname, description = imp.find_module(name, sharelogging.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			loggersShare.append(lo.logShare)
		except:
			logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())

	# --- Bitcoin p2p node listeners and upstream link ---
	LSbc = []
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))
	
	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
	
	# NOTE(review): imported for side effects, presumably registering their
	# JSON-RPC methods with the server — confirm
	import jsonrpc_getmemorypool
	import jsonrpc_getwork
	import jsonrpc_setworkaux
	
	# --- JSON-RPC server setup and wiring ---
	server = JSONRPCServer()
	if hasattr(config, 'JSONRPCAddress'):
		logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
	LS = []
	for a in config.JSONRPCAddresses:
		LS.append(JSONRPCListener(server, a))
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	server.aux = MM.CoinbaseAux
	server.getBlockHeader = getBlockHeader
	server.getBlockTemplate = getBlockTemplate
	server.receiveShare = receiveShare
	server.RaiseRedFlags = RaiseRedFlags
	server.ShareTarget = config.ShareTarget
	
	if hasattr(config, 'TrustedForwarders'):
		server.TrustedForwarders = config.TrustedForwarders
	server.ServerName = config.ServerName
	
	# Start the merkle maker, restore saved state, then serve until shutdown
	MM.start()
	
	restoreState()
	
	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()
	
	server.serve_forever()