Merge branch 'bugfix_hicb_getMC'
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
import config

# Backward-compatible defaults so older config.py files keep working
# without edits.
if not hasattr(config, 'ServerName'):
	config.ServerName = 'Unnamed Eloipool'

# Default per-share target (maximum accepted hash value as an integer);
# presumably a difficulty-1-style target — confirm against pool policy.
if not hasattr(config, 'ShareTarget'):
	config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff

26
import logging

# Configure logging only if nothing (e.g. config.py or an embedder) has
# already installed handlers on the root logger.
if len(logging.root.handlers) == 0:
	logging.basicConfig(
		format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
		level=logging.DEBUG,
	)
	# These loggers are too chatty at DEBUG; cap them at INFO by default.
	for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
		logging.getLogger(infoOnly).setLevel(logging.INFO)
36
def RaiseRedFlags(reason):
	"""Report a critical operational problem.

	Logs *reason* at CRITICAL level under the 'redflag' logger and returns
	it unchanged, so callers can pass an exception/message through while
	flagging it.
	"""
	redflag = logging.getLogger('redflag')
	redflag.critical(reason)
	return reason
40
41
from bitcoin.node import BitcoinLink, BitcoinNode
# P2P node instance; used to announce solved blocks directly to the network
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
# JSON-RPC proxy to the upstream bitcoind (used for block submission)
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
48
49
50 from bitcoin.script import BitcoinScript
51 from bitcoin.txn import Txn
52 from base58 import b58decode
53 from struct import pack
54 import subprocess
55 from time import time
56
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
	"""Build the coinbase transaction for a new block template.

	coinbaseValue: total reward (in base units) available to distribute.
	useCoinbaser: when True and config.CoinbaserCmd is set, run the external
	coinbaser command to split part of the reward to extra outputs.

	The coinbaser protocol: the command prints the number of outputs, then
	for each output an amount line and an address line.  Whatever value the
	coinbaser does not claim is paid to config.TrackerAddr.  If the coinbaser
	fails or claims >= the full value, its outputs are discarded entirely.

	Returns the (not yet assembled) Txn.
	"""
	txn = Txn.new()
	
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		try:
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			# NOTE: shell=True with an operator-supplied command; config is trusted
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			nout = int(p.stdout.readline())
			for i in range(nout):
				amount = int(p.stdout.readline())
				addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
				pkScript = BitcoinScript.toAddress(addr)
				txn.addOutput(amount, pkScript)
				coinbased += amount
		except Exception:
			# Was a bare ``except:``; narrowed so SystemExit/KeyboardInterrupt
			# still propagate.  Force the sanity check below to trip.
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			# Coinbaser claimed more than is available (or errored out):
			# drop its outputs and pay everything to the tracker address.
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased
	
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)
	
	# TODO
	# TODO: red flag on dupe coinbase
	return txn
87
88
import jsonrpc_getwork
from util import Bits2Target

# username -> { work identifier -> (work data, issue time) }
workLog = {}
# Current block's full target as an int (from nBits); None until known
networkTarget = None
# Raw share data already seen this block, for duplicate rejection
DupeShareHACK = {}

# JSONRPCServer instance; assigned in __main__ below
server = None
def updateBlocks():
	# Wake any longpolling clients so they refetch work
	server.wakeLongpoll()
99
def blockChanged():
	"""Reset per-block state when the chain tip changes.

	Clears duplicate-share tracking and the work log, recomputes the
	network target from the new block's nBits, and wakes longpollers.
	"""
	global DupeShareHACK, MM, networkTarget, server
	# Duplicate tracking is only meaningful within a single block
	DupeShareHACK = {}
	jsonrpc_getwork._CheckForDupesHACK = {}
	bits = MM.currentBlock[2]
	networkTarget = None if bits is None else Bits2Target(bits)
	workLog.clear()
	updateBlocks()
112
113
from merklemaker import merkleMaker
MM = merkleMaker()
# Inherit every configuration attribute onto the merkle maker
MM.__dict__.update(config.__dict__)
# Fallback coinbase (hard-coded 50 BTC subsidy, no coinbaser)  # FIXME
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
122
123
124 from binascii import b2a_hex
125 from copy import deepcopy
126 from struct import pack, unpack
127 import threading
128 from time import time
129 from util import RejectedShare, dblsha, hash2int, swap32
130 import jsonrpc
131 import traceback
132
# Optional JSON-RPC service notified of near-block-quality shares
gotwork = None
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
136
def submitGotwork(info):
	# Best-effort notification; a gotwork failure must never affect the
	# share-processing path, so all errors are logged and swallowed.
	try:
		gotwork.gotwork(info)
	except:
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
142
def getBlockHeader(username):
	"""Issue getwork-style work for *username*.

	Builds a version-2 block header (timestamp now, placeholder nonce
	b'iolE'), records the issued work in workLog keyed by merkle root,
	and returns (header bytes, worklog entry).
	"""
	MRD = MM.getMRD()
	(merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
	timestamp = pack('<L', int(time()))
	hdr = b''.join((b'\2\0\0\0', prevBlock, merkleRoot, timestamp, bits, b'iolE'))
	entry = (MRD, time())
	workLog.setdefault(username, {})[merkleRoot] = entry
	return (hdr, entry)
150
def getBlockTemplate(username):
	"""Issue getmemorypool-style work for *username*.

	The work identifier is length-prefixed at the start of the coinbase
	scriptSig; it is extracted and used as the workLog key.
	"""
	MC = MM.getMC()
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC
	wliLen = coinbase[0]
	wli = coinbase[1:1 + wliLen]
	workLog.setdefault(username, {})[wli] = (MC, time())
	return MC
158
# Registered share-logger callbacks; each is called with the share dict
loggersShare = []

# Debug records of real (network-target) block solutions and payloads
RBDs = []
RBPs = []
163
164 from bitcoin.varlen import varlenEncode, varlenDecode
165 import bitcoin.txn
def assembleBlock(blkhdr, txlist):
	"""Serialize a complete block: header, varint txn count, raw txns."""
	parts = [blkhdr, varlenEncode(len(txlist))]
	parts.extend(tx.data for tx in txlist)
	return b''.join(parts)
172
def blockSubmissionThread(payload):
	"""Submit an assembled block to upstream bitcoind, retrying until it
	answers; raises a red flag if upstream rejects the block.

	Runs on its own thread (spawned from checkShare).
	"""
	while True:
		try:
			rv = UpstreamBitcoindJSONRPC.getmemorypool(b2a_hex(payload).decode('ascii'))
			break
		except Exception:
			# Was a bare ``except:`` with no delay, which busy-spun the CPU
			# whenever upstream was unreachable; back off briefly instead.
			sleep(0.1)
	if not rv:
		RaiseRedFlags('Upstream rejected block!')
182
# Share target as a zero-padded 64-hex-digit string, cached for annotation
_STA = '%064x' % (config.ShareTarget,)
def checkShare(share):
	"""Validate a submitted share, raising RejectedShare on any problem.

	Side effects: records the share in DupeShareHACK, submits any
	network-difficulty solution upstream (JSON-RPC thread + P2P node), and
	annotates the share dict ('time', MRD/MC work, 'target', '_targethex',
	'upstreamResult').
	"""
	shareTime = share['time'] = time()
	
	data = share['data']
	# Only the 80-byte block header is validated here
	data = data[:80]
	(prevBlock, height, bits) = MM.currentBlock
	sharePrevBlock = data[4:36]
	if sharePrevBlock != prevBlock:
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')
	
	# TODO: use userid
	username = share['username']
	if username not in workLog:
		raise RejectedShare('unknown-user')
	
	if data[72:76] != bits:
		raise RejectedShare('bad-diffbits')
	
	# Note that we should accept miners reducing version to 1 if they don't understand 2 yet
	# FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
	if data[1:4] != b'\0\0\0' or data[0] > 2:
		raise RejectedShare('bad-version')
	
	shareMerkleRoot = data[36:68]
	if 'blkdata' in share:
		# getmemorypool-style share: full block data follows the header.
		# The work identifier is length-prefixed inside the coinbase.
		pl = share['blkdata']
		(txncount, pl) = varlenDecode(pl)
		cbtxn = bitcoin.txn.Txn(pl)
		cbtxn.disassemble(retExtra=True)
		coinbase = cbtxn.getCoinbase()
		wliPos = coinbase[0] + 2
		wliLen = coinbase[wliPos - 1]
		wli = coinbase[wliPos:wliPos+wliLen]
		mode = 'MC'
		moden = 1
	else:
		# getwork-style share: identified by its merkle root
		wli = shareMerkleRoot
		mode = 'MRD'
		moden = 0
	
	MWL = workLog[username]
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld
	
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
	blkhash = dblsha(data)
	# Even minimum-difficulty work must have the last 32 bits of hash zero
	if blkhash[28:] != b'\0\0\0\0':
		raise RejectedShare('H-not-zero')
	blkhashn = hash2int(blkhash)
	
	global networkTarget
	# Block-quality hashes are logged at INFO, ordinary shares at DEBUG
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
	
	workMerkleTree = wld[1]
	workCoinbase = wld[2]
	
	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	txlist = workMerkleTree.data
	# Deep-copy the coinbase txn so mutating it doesn't corrupt shared work
	txlist = [deepcopy(txlist[0]),] + txlist[1:]
	cbtxn = txlist[0]
	cbtxn.setCoinbase(workCoinbase)
	cbtxn.assemble()
	
	if blkhashn <= networkTarget:
		# A real block solution: submit upstream via JSON-RPC (own thread)
		# and announce it directly on the P2P network.
		logfunc("Submitting upstream")
		RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree) ) )
		if not moden:
			payload = assembleBlock(data, txlist)
		else:
			payload = share['data'] + share['blkdata']
		logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
		RBPs.append(payload)
		threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
		bcnode.submitBlock(payload)
		share['upstreamResult'] = True
		MM.updateBlock(blkhash)
	
	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')
	
	# Share-difficulty check comes after block submission on purpose:
	# a real block is submitted even if it somehow misses the share target
	if blkhashn > config.ShareTarget:
		raise RejectedShare('high-hash')
	share['target'] = config.ShareTarget
	share['_targethex'] = _STA
	
	shareTimestamp = unpack('<L', data[68:72])[0]
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
	if moden:
		# Full-block (MC) shares: verify the miner only appended to the
		# coinbase we issued and did not alter the transaction set.
		cbpre = cbtxn.getCoinbase()
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')
		
		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
				raise RejectedShare('bad-cb-flag')
		
		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')
		
		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')
		
		allowed = assembleBlock(data, txlist)
		if allowed != share['data'] + share['blkdata']:
			raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
326
def receiveShare(share):
	"""Validate and record one submitted share.

	Delegates validation to checkShare(); on rejection, annotates
	share['rejectReason'] and re-raises.  Whether accepted or rejected,
	the share (with its 'solution' hex filled in) is handed to every
	registered share logger.
	"""
	# TODO: username => userid
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	finally:
		if '_origdata' in share:
			solution = share['_origdata']
		else:
			solution = b2a_hex(swap32(share['data'])).decode('utf8')
		share['solution'] = solution
		for logShare in loggersShare:
			logShare(share)
341
def newBlockNotification():
	# Invoked (e.g. via SIGUSR1) when an external source reports a new block;
	# refreshes the merkle tree so new work builds on the latest tip.
	logging.getLogger('newBlockNotification').info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
	pass
347
def newBlockNotificationSIGNAL(signum, frame):
	"""Signal handler: run newBlockNotification() on a fresh daemon thread.

	A separate thread is used because the signal may arrive while arbitrary
	locks are held; doing the work inline could deadlock.
	"""
	thrName = 'newBlockNotification via signal %s' % (signum,)
	thr = threading.Thread(target=newBlockNotification, name=thrName)
	thr.daemon = True
	thr.start()
353
from signal import signal, SIGUSR1
# NOTE: the later ``import signal`` rebinds the name 'signal' to the module;
# the handler registration below happens first, so it is unaffected.
signal(SIGUSR1, newBlockNotificationSIGNAL)
356
357
358 import os
359 import os.path
360 import pickle
361 import signal
362 import sys
363 from time import sleep
364 import traceback
365
# Pickle file used by saveState()/restoreState() across restarts
SAVE_STATE_FILENAME = 'eloipool.worklog'

def stopServers():
	"""Shut down the P2P node and JSON-RPC server (idempotent).

	Flags each server to stop, wakes them so their event loops notice,
	polls until none report running (giving up after 0x100 * 10 ms), and
	finally closes their remaining file descriptors.
	"""
	logger = logging.getLogger('stopServers')
	
	# Guard against being called twice (e.g. exit() after restart())
	if hasattr(stopServers, 'already'):
		logger.debug('Already tried to stop servers before')
		return
	stopServers.already = True
	
	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server)
	for s in servers:
		s.keepgoing = False
	for s in servers:
		try:
			s.wakeup()
		except:
			logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
	i = 0
	while True:
		sl = []
		for s in servers:
			if s.running:
				sl.append(s.__class__.__name__)
		if not sl:
			break
		i += 1
		if i >= 0x100:
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
			break
		sleep(0.01)
	
	for s in servers:
		# Close remaining fds so a restarted process can rebind its ports
		for fd in s._fd.keys():
			os.close(fd)
403
def saveState(t = None):
	"""Persist resume state (timestamp, DupeShareHACK, workLog) to disk.

	t: shutdown timestamp, stored so restoreState() can expire old work.
	Retries on failure; after 0x10000 failed attempts it logs the error,
	tries to remove any partial state file, and gives up.
	"""
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except Exception:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except Exception:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
				# BUGFIX: previously fell through and retried (and re-logged/
				# re-unlinked) forever on persistent failure; stop here.
				break
425
def exit():
	"""Clean shutdown: stop servers, save state, then terminate.

	NOTE: shadows the builtin exit().  Sends SIGTERM to our own process
	before sys.exit(0) so any remaining threads/handlers see a real signal.
	"""
	t = time()
	stopServers()
	saveState(t)
	logging.getLogger('exit').info('Goodbye...')
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)
433
def restart():
	"""Stop servers, save state, and re-exec this process in place.

	On exec failure, logs the error and returns (the process keeps running
	with servers already stopped).
	"""
	t = time()
	stopServers()
	saveState(t)
	logger = logging.getLogger('restart')
	logger.info('Restarting...')
	try:
		os.execv(sys.argv[0], sys.argv)
	except:
		logger.error('Failed to exec\n' + traceback.format_exc())
443
def restoreState():
	"""Restore workLog/DupeShareHACK saved by saveState(), if present.

	Handles three historical pickle layouts; skips restoring workLog when
	the saved state is more than 120 seconds old (the work would be stale).
	"""
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if type(t) == tuple:
				if len(t) > 2:
					# Future formats, not supported here
					# NOTE(review): t[3] raises IndexError when len(t) == 3;
					# possibly meant t[2] — confirm before relying on this path.
					ver = t[3]
					# TODO
				
				# Old format, from 2012-02-02 to 2012-02-03
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					# Old format, from 2012-02-03 to 2012-02-03
					DupeShareHACK = t
					t = None
				else:
					# Current format, from 2012-02-03 onward
					DupeShareHACK = pickle.load(f)
				
				# BUGFIX: t is None for the old dict format, and ``None + 120``
				# raised TypeError, spuriously logging 'Failed to restore state'.
				if t is not None and t + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except Exception:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))
485
486
487 from jsonrpcserver import JSONRPCListener, JSONRPCServer
488 import interactivemode
489 from networkserver import NetworkListener
490 import threading
491 import sharelogging
492 import imp
493
494 if __name__ == "__main__":
495         if not hasattr(config, 'ShareLogging'):
496                 config.ShareLogging = ()
497         if hasattr(config, 'DbOptions'):
498                 logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
499                 config.ShareLogging = list(config.ShareLogging)
500                 config.ShareLogging.append( {
501                         'type': 'sql',
502                         'engine': 'postgres',
503                         'dbopts': config.DbOptions,
504                         'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
505                 } )
506         for i in config.ShareLogging:
507                 if not hasattr(i, 'keys'):
508                         name, parameters = i
509                         logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
510                         if name == 'postgres':
511                                 name = 'sql'
512                                 i = {
513                                         'engine': 'postgres',
514                                         'dbopts': parameters,
515                                 }
516                         elif name == 'logfile':
517                                 i = {}
518                                 i['thropts'] = parameters
519                                 if 'filename' in parameters:
520                                         i['filename'] = parameters['filename']
521                                         i['thropts'] = dict(i['thropts'])
522                                         del i['thropts']['filename']
523                         else:
524                                 i = parameters
525                         i['type'] = name
526                 
527                 name = i['type']
528                 parameters = i
529                 try:
530                         fp, pathname, description = imp.find_module(name, sharelogging.__path__)
531                         m = imp.load_module(name, fp, pathname, description)
532                         lo = getattr(m, name)(**parameters)
533                         loggersShare.append(lo.logShare)
534                 except:
535                         logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())
536
537         LSbc = []
538         if not hasattr(config, 'BitcoinNodeAddresses'):
539                 config.BitcoinNodeAddresses = ()
540         for a in config.BitcoinNodeAddresses:
541                 LSbc.append(NetworkListener(bcnode, a))
542         
543         if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
544                 BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
545         
546         import jsonrpc_getmemorypool
547         import jsonrpc_getwork
548         import jsonrpc_setworkaux
549         
550         server = JSONRPCServer()
551         if hasattr(config, 'JSONRPCAddress'):
552                 logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
553                 if not hasattr(config, 'JSONRPCAddresses'):
554                         config.JSONRPCAddresses = []
555                 config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
556         LS = []
557         for a in config.JSONRPCAddresses:
558                 LS.append(JSONRPCListener(server, a))
559         if hasattr(config, 'SecretUser'):
560                 server.SecretUser = config.SecretUser
561         server.aux = MM.CoinbaseAux
562         server.getBlockHeader = getBlockHeader
563         server.getBlockTemplate = getBlockTemplate
564         server.receiveShare = receiveShare
565         server.RaiseRedFlags = RaiseRedFlags
566         server.ShareTarget = config.ShareTarget
567         
568         if hasattr(config, 'TrustedForwarders'):
569                 server.TrustedForwarders = config.TrustedForwarders
570         server.ServerName = config.ServerName
571         
572         MM.start()
573         
574         restoreState()
575         
576         bcnode_thr = threading.Thread(target=bcnode.serve_forever)
577         bcnode_thr.daemon = True
578         bcnode_thr.start()
579         
580         server.serve_forever()