#!/usr/bin/python3
# Eloipool - Python Bitcoin pool server
# Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import config

if not hasattr(config, 'ServerName'):
        config.ServerName = 'Unnamed Eloipool'

if not hasattr(config, 'ShareTarget'):
        config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff


import logging

if len(logging.root.handlers) == 0:
        logging.basicConfig(
                format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
                level=logging.DEBUG,
        )
        for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
                logging.getLogger(infoOnly).setLevel(logging.INFO)

def RaiseRedFlags(reason):
        logging.getLogger('redflag').critical(reason)
        return reason


from bitcoin.node import BitcoinLink, BitcoinNode
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)


from bitcoin.script import BitcoinScript
from bitcoin.txn import Txn
from base58 import b58decode
from struct import pack
import subprocess
from time import time

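# makeCoinbaseTxn builds the generation (coinbase) transaction for new work.
# If config.CoinbaserCmd is set, the command is run with %d replaced by the
# total coinbase value; judging by how its output is parsed below, it is
# expected to print the number of extra outputs, then an amount line and an
# address line for each one. Whatever is left over (or everything, if the
# coinbaser fails) is paid to config.TrackerAddr.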
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
        txn = Txn.new()

        if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
                coinbased = 0
                try:
                        cmd = config.CoinbaserCmd
                        cmd = cmd.replace('%d', str(coinbaseValue))
                        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
                        nout = int(p.stdout.readline())
                        for i in range(nout):
                                amount = int(p.stdout.readline())
                                addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
                                pkScript = BitcoinScript.toAddress(addr)
                                txn.addOutput(amount, pkScript)
                                coinbased += amount
                except:
                        coinbased = coinbaseValue + 1
                if coinbased >= coinbaseValue:
                        logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
                        txn.outputs = []
                else:
                        coinbaseValue -= coinbased

        pkScript = BitcoinScript.toAddress(config.TrackerAddr)
        txn.addOutput(coinbaseValue, pkScript)

        # TODO
        # TODO: red flag on dupe coinbase
        return txn


import jsonrpc_getwork
from util import Bits2Target

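# workLog maps each username to a dict of outstanding work, keyed by the work
# identifier (the merkle root for getwork-style work, or a tag extracted from
# the coinbase for block templates) and holding (work data, issue time)
# tuples. DupeShareHACK remembers every share seen since the last block
# change so exact duplicates can be rejected outright.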
workLog = {}
networkTarget = None
DupeShareHACK = {}

server = None
def updateBlocks():
        server.wakeLongpoll()

def blockChanged():
        global DupeShareHACK
        DupeShareHACK = {}
        jsonrpc_getwork._CheckForDupesHACK = {}
        global MM, networkTarget, server
        bits = MM.currentBlock[2]
        if bits is None:
                networkTarget = None
        else:
                networkTarget = Bits2Target(bits)
        workLog.clear()
        updateBlocks()


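# The merkleMaker instance (MM) maintains the current block template and
# merkle tree (see merklemaker.py). It is configured by copying the whole
# config namespace into its instance dict; the callbacks assigned below wake
# longpollers on template updates and reset per-block state when the chain
# tip changes.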
from merklemaker import merkleMaker
MM = merkleMaker()
MM.__dict__.update(config.__dict__)
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks


from binascii import b2a_hex
from copy import deepcopy
from struct import pack, unpack
import threading
from time import time
from util import RejectedShare, dblsha, hash2int, swap32
import jsonrpc
import traceback

gotwork = None
if hasattr(config, 'GotWorkURI'):
        gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

def submitGotwork(info):
        try:
                gotwork.gotwork(info)
        except:
                checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())

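# getBlockHeader serves classic getwork: it assembles the standard 80-byte
# block header (version | prevBlock | merkleRoot | time | bits | nonce),
# using b'iolE' as a placeholder nonce for the miner to overwrite, and
# records the issued work under its merkle root so the returning share can be
# matched up later.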
def getBlockHeader(username):
        MRD = MM.getMRD()
        (merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
        timestamp = pack('<L', int(time()))
        hdr = b'\2\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
        workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
        return (hdr, workLog[username][merkleRoot])

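# getBlockTemplate serves getblocktemplate work. The work-log identifier is
# pulled out of the coinbase scriptsig: coinbase[0] is taken as the length of
# the leading height push (BIP 34), and the push that follows it is assumed
# to be the tag the merkle maker embedded there to uniquely identify this
# template.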
def getBlockTemplate(username):
        MC = MM.getMC()
        (dummy, merkleTree, coinbase, prevBlock, bits) = MC
        wliPos = coinbase[0] + 2
        wliLen = coinbase[wliPos - 1]
        wli = coinbase[wliPos:wliPos+wliLen]
        workLog.setdefault(username, {})[wli] = (MC, time())
        return MC

loggersShare = []

RBDs = []
RBPs = []

from bitcoin.varlen import varlenEncode, varlenDecode
import bitcoin.txn
def assembleBlock(blkhdr, txlist):
        payload = blkhdr
        payload += varlenEncode(len(txlist))
        for tx in txlist:
                payload += tx.data
        return payload

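# blockSubmissionThread pushes a solved block to the upstream bitcoind. It
# prefers the BIP 22 'submitblock' call and falls back to passing the hex
# data to 'getmemorypool' for older servers, retrying until the network has
# moved on to a different previous block.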
def blockSubmissionThread(payload, blkhash):
        myblock = (blkhash, payload[4:36])
        payload = b2a_hex(payload).decode('ascii')
        nexterr = 0
        while True:
                try:
                        rv = UpstreamBitcoindJSONRPC.submitblock(payload)
                        break
                except:
                        try:
                                rv = UpstreamBitcoindJSONRPC.getmemorypool(payload)
                                if rv is True:
                                        rv = None
                                elif rv is False:
                                        rv = 'rejected'
                                break
                        except:
                                pass
                        now = time()
                        if now > nexterr:
                                # FIXME: This will show "Method not found" on pre-BIP22 servers
                                RaiseRedFlags(traceback.format_exc())
                                nexterr = now + 5
                        if MM.currentBlock[0] not in myblock:
                                RaiseRedFlags('Giving up on submitting block upstream')
                                return
        if rv:
                # FIXME: The returned value could be a list of multiple responses
                RaiseRedFlags('Upstream block submission failed: %s' % (rv,))

_STA = '%064x' % (config.ShareTarget,)
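# checkShare is the core validation path for every submitted share. In rough
# order it checks: previous-block match, known user, encoded difficulty bits,
# block version, known work, duplicate detection, the mandatory zero high
# bytes of the hash, possible upstream block submission, the share target,
# timestamps, and (for getblocktemplate shares) that the miner only appended
# to the coinbase and did not tamper with the transaction set.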
def checkShare(share):
        shareTime = share['time'] = time()

        data = share['data']
        data = data[:80]
        (prevBlock, height, bits) = MM.currentBlock
        sharePrevBlock = data[4:36]
        if sharePrevBlock != prevBlock:
                if sharePrevBlock == MM.lastBlock[0]:
                        raise RejectedShare('stale-prevblk')
                raise RejectedShare('bad-prevblk')

        # TODO: use userid
        username = share['username']
        if username not in workLog:
                raise RejectedShare('unknown-user')

        if data[72:76] != bits:
                raise RejectedShare('bad-diffbits')

        # Note that we should accept miners reducing version to 1 if they don't understand 2 yet
        # FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
        if data[1:4] != b'\0\0\0' or data[0] > 2:
                raise RejectedShare('bad-version')

        shareMerkleRoot = data[36:68]
        if 'blkdata' in share:
                pl = share['blkdata']
                (txncount, pl) = varlenDecode(pl)
                cbtxn = bitcoin.txn.Txn(pl)
                cbtxn.disassemble(retExtra=True)
                coinbase = cbtxn.getCoinbase()
                wliPos = coinbase[0] + 2
                wliLen = coinbase[wliPos - 1]
                wli = coinbase[wliPos:wliPos+wliLen]
                mode = 'MC'
                moden = 1
        else:
                wli = shareMerkleRoot
                mode = 'MRD'
                moden = 0

        MWL = workLog[username]
        if wli not in MWL:
                raise RejectedShare('unknown-work')
        (wld, issueT) = MWL[wli]
        share[mode] = wld

        if data in DupeShareHACK:
                raise RejectedShare('duplicate')
        DupeShareHACK[data] = None

        blkhash = dblsha(data)
        if blkhash[28:] != b'\0\0\0\0':
                raise RejectedShare('H-not-zero')
        blkhashn = hash2int(blkhash)

        global networkTarget
        logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
        logfunc('BLKHASH: %64x' % (blkhashn,))
        logfunc(' TARGET: %64x' % (networkTarget,))

        workMerkleTree = wld[1]
        workCoinbase = wld[2]

        # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
        txlist = workMerkleTree.data
        txlist = [deepcopy(txlist[0]),] + txlist[1:]
        cbtxn = txlist[0]
        cbtxn.setCoinbase(workCoinbase)
        cbtxn.assemble()

        if blkhashn <= networkTarget:
                logfunc("Submitting upstream")
                RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree) ) )
                if not moden:
                        payload = assembleBlock(data, txlist)
                else:
                        payload = share['data'] + share['blkdata']
                logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
                RBPs.append(payload)
                threading.Thread(target=blockSubmissionThread, args=(payload, blkhash)).start()
                bcnode.submitBlock(payload)
                share['upstreamResult'] = True
                MM.updateBlock(blkhash)

        # Gotwork hack...
        if gotwork and blkhashn <= config.GotWorkTarget:
                try:
                        coinbaseMrkl = cbtxn.data
                        coinbaseMrkl += blkhash
                        steps = workMerkleTree._steps
                        coinbaseMrkl += pack('B', len(steps))
                        for step in steps:
                                coinbaseMrkl += step
                        coinbaseMrkl += b"\0\0\0\0"
                        info = {}
                        info['hash'] = b2a_hex(blkhash).decode('ascii')
                        info['header'] = b2a_hex(data).decode('ascii')
                        info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
                        thr = threading.Thread(target=submitGotwork, args=(info,))
                        thr.daemon = True
                        thr.start()
                except:
                        checkShare.logger.warning('Failed to build gotwork request')

        if blkhashn > config.ShareTarget:
                raise RejectedShare('high-hash')
        share['target'] = config.ShareTarget
        share['_targethex'] = _STA

        shareTimestamp = unpack('<L', data[68:72])[0]
        if shareTime < issueT - 120:
                raise RejectedShare('stale-work')
        if shareTimestamp < shareTime - 300:
                raise RejectedShare('time-too-old')
        if shareTimestamp > shareTime + 7200:
                raise RejectedShare('time-too-new')

        if moden:
                cbpre = cbtxn.getCoinbase()
                cbpreLen = len(cbpre)
                if coinbase[:cbpreLen] != cbpre:
                        raise RejectedShare('bad-cb-prefix')

                # Filter out known "I support" flags, to prevent exploits
                for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
                        if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
                                raise RejectedShare('bad-cb-flag')

                if len(coinbase) > 100:
                        raise RejectedShare('bad-cb-length')

                cbtxn.setCoinbase(coinbase)
                cbtxn.assemble()
                if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
                        raise RejectedShare('bad-txnmrklroot')

                allowed = assembleBlock(data, txlist)
                if allowed != share['data'] + share['blkdata']:
                        raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')

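# receiveShare wraps checkShare: rejected shares get their reason recorded
# and re-raised, and every share (accepted or not) is passed to the
# configured share loggers with its solved header attached as 'solution'.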
def receiveShare(share):
        # TODO: username => userid
        try:
                checkShare(share)
        except RejectedShare as rej:
                share['rejectReason'] = str(rej)
                raise
        finally:
                if '_origdata' in share:
                        share['solution'] = share['_origdata']
                else:
                        share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
                for i in loggersShare:
                        i(share)

def newBlockNotification():
        logging.getLogger('newBlockNotification').info('Received new block notification')
        MM.updateMerkleTree()
        # TODO: Force RESPOND TO LONGPOLLS?
        pass

def newBlockNotificationSIGNAL(signum, frame):
        # Use a new thread, in case the signal handler is called with locks held
        thr = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
        thr.daemon = True
        thr.start()

from signal import signal, SIGUSR1
signal(SIGUSR1, newBlockNotificationSIGNAL)


import os
import os.path
import pickle
import signal
import sys
from time import sleep
import traceback

SAVE_STATE_FILENAME = 'eloipool.worklog'

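# stopServers shuts down the bitcoin node and the JSON-RPC server: it clears
# their keepgoing flags, wakes them, waits up to 0x100 * 10 ms for their
# loops to exit, and finally closes any file descriptors they still hold.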
def stopServers():
        logger = logging.getLogger('stopServers')

        if hasattr(stopServers, 'already'):
                logger.debug('Already tried to stop servers before')
                return
        stopServers.already = True

        logger.info('Stopping servers...')
        global bcnode, server
        servers = (bcnode, server)
        for s in servers:
                s.keepgoing = False
        for s in servers:
                try:
                        s.wakeup()
                except:
                        logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
        i = 0
        while True:
                sl = []
                for s in servers:
                        if s.running:
                                sl.append(s.__class__.__name__)
                if not sl:
                        break
                i += 1
                if i >= 0x100:
                        logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
                        break
                sleep(0.01)

        for s in servers:
                for fd in s._fd.keys():
                        os.close(fd)

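# saveState/restoreState persist pool state across restarts. The worklog file
# is a sequence of pickles: the shutdown timestamp, DupeShareHACK, and
# workLog; on restore, workLog is skipped if the saved state is more than two
# minutes old, and older on-disk formats are still recognised.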
def saveState(t = None):
        logger = logging.getLogger('saveState')

        # Then, save data needed to resume work
        logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
        i = 0
        while True:
                try:
                        with open(SAVE_STATE_FILENAME, 'wb') as f:
                                pickle.dump(t, f)
                                pickle.dump(DupeShareHACK, f)
                                pickle.dump(workLog, f)
                        break
                except:
                        i += 1
                        if i >= 0x10000:
                                logger.error('Failed to save work\n' + traceback.format_exc())
                                try:
                                        os.unlink(SAVE_STATE_FILENAME)
                                except:
                                        logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())

def exit():
        t = time()
        stopServers()
        saveState(t)
        logging.getLogger('exit').info('Goodbye...')
        os.kill(os.getpid(), signal.SIGTERM)
        sys.exit(0)

def restart():
        t = time()
        stopServers()
        saveState(t)
        logging.getLogger('restart').info('Restarting...')
        try:
                os.execv(sys.argv[0], sys.argv)
        except:
                logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())

def restoreState():
        if not os.path.exists(SAVE_STATE_FILENAME):
                return

        global workLog, DupeShareHACK

        logger = logging.getLogger('restoreState')
        s = os.stat(SAVE_STATE_FILENAME)
        logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
        try:
                with open(SAVE_STATE_FILENAME, 'rb') as f:
                        t = pickle.load(f)
                        if type(t) == tuple:
                                if len(t) > 2:
                                        # Future formats, not supported here
                                        ver = t[3]
                                        # TODO

                                # Old format, from 2012-02-02 to 2012-02-03
                                workLog = t[0]
                                DupeShareHACK = t[1]
                                t = None
                        else:
                                if isinstance(t, dict):
                                        # Old format, from 2012-02-03 to 2012-02-03
                                        DupeShareHACK = t
                                        t = None
                                else:
                                        # Current format, from 2012-02-03 onward
                                        DupeShareHACK = pickle.load(f)

                                if t + 120 >= time():
                                        workLog = pickle.load(f)
                                else:
                                        logger.debug('Skipping restore of expired workLog')
        except:
                logger.error('Failed to restore state\n' + traceback.format_exc())
                return
        logger.info('State restored successfully')
        if t:
                logger.info('Total downtime: %g seconds' % (time() - t,))


from jsonrpcserver import JSONRPCListener, JSONRPCServer
import interactivemode
from networkserver import NetworkListener
import threading
import sharelogging
import imp

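# Startup sequence: set up share loggers, listeners for the raw bitcoin node,
# and the JSON-RPC server (which is handed the work-generation and share
# callbacks as attributes), start the merkle maker, restore any saved state,
# and then serve until interrupted.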
if __name__ == "__main__":
        if not hasattr(config, 'ShareLogging'):
                config.ShareLogging = ()
        if hasattr(config, 'DbOptions'):
                logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
                config.ShareLogging = list(config.ShareLogging)
                config.ShareLogging.append( {
                        'type': 'sql',
                        'engine': 'postgres',
                        'dbopts': config.DbOptions,
                        'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
                } )
        for i in config.ShareLogging:
                if not hasattr(i, 'keys'):
                        name, parameters = i
                        logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
                        if name == 'postgres':
                                name = 'sql'
                                i = {
                                        'engine': 'postgres',
                                        'dbopts': parameters,
                                }
                        elif name == 'logfile':
                                i = {}
                                i['thropts'] = parameters
                                if 'filename' in parameters:
                                        i['filename'] = parameters['filename']
                                        i['thropts'] = dict(i['thropts'])
                                        del i['thropts']['filename']
                        else:
                                i = parameters
                        i['type'] = name

                name = i['type']
                parameters = i
                try:
                        fp, pathname, description = imp.find_module(name, sharelogging.__path__)
                        m = imp.load_module(name, fp, pathname, description)
                        lo = getattr(m, name)(**parameters)
                        loggersShare.append(lo.logShare)
                except:
                        logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name, sys.exc_info())

        LSbc = []
        if not hasattr(config, 'BitcoinNodeAddresses'):
                config.BitcoinNodeAddresses = ()
        for a in config.BitcoinNodeAddresses:
                LSbc.append(NetworkListener(bcnode, a))

        if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
                BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)

        import jsonrpc_getblocktemplate
        import jsonrpc_getwork
        import jsonrpc_setworkaux

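        # The JSONRPCServer handles miner connections; work generation and
        # share handling are plugged in as plain attributes, and one
        # JSONRPCListener is created per configured bind address.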
        server = JSONRPCServer()
        if hasattr(config, 'JSONRPCAddress'):
                logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
                if not hasattr(config, 'JSONRPCAddresses'):
                        config.JSONRPCAddresses = []
                config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
        LS = []
        for a in config.JSONRPCAddresses:
                LS.append(JSONRPCListener(server, a))
        if hasattr(config, 'SecretUser'):
                server.SecretUser = config.SecretUser
        server.aux = MM.CoinbaseAux
        server.getBlockHeader = getBlockHeader
        server.getBlockTemplate = getBlockTemplate
        server.receiveShare = receiveShare
        server.RaiseRedFlags = RaiseRedFlags
        server.ShareTarget = config.ShareTarget

        if hasattr(config, 'TrustedForwarders'):
                server.TrustedForwarders = config.TrustedForwarders
        server.ServerName = config.ServerName

        MM.start()

        restoreState()

        bcnode_thr = threading.Thread(target=bcnode.serve_forever)
        bcnode_thr.daemon = True
        bcnode_thr.start()

        server.serve_forever()