# Merge branch 'bip23_submitcbonly'
# [bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
import config

# Fill in optional configuration values with sensible defaults.
if not hasattr(config, 'ServerName'):
	config.ServerName = 'Unnamed Eloipool'

if not hasattr(config, 'ShareTarget'):
	# Difficulty-1 target: the easiest share the pool will accept by default.
	config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
26
27 import logging
28
29 if len(logging.root.handlers) == 0:
30         logging.basicConfig(
31                 format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
32                 level=logging.DEBUG,
33         )
34         for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
35                 logging.getLogger(infoOnly).setLevel(logging.INFO)
36
def RaiseRedFlags(reason):
	"""Log *reason* at CRITICAL on the 'redflag' logger and return it unchanged."""
	redflag = logging.getLogger('redflag')
	redflag.critical(reason)
	return reason
40
41
from bitcoin.node import BitcoinLink, BitcoinNode
# Bitcoin P2P node used for block submission and (optionally) peering.
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
# JSON-RPC proxy to the upstream bitcoind (work source and block submission).
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
48
49
50 from bitcoin.script import BitcoinScript
51 from bitcoin.txn import Txn
52 from base58 import b58decode
53 from struct import pack
54 import subprocess
55 from time import time
56
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
	"""Build the coinbase transaction paying out coinbaseValue (base units).

	If useCoinbaser is true and config.CoinbaserCmd is set, that external
	command is run (with '%d' replaced by the total value) to produce extra
	outputs; whatever it does not claim goes to config.TrackerAddr.  If the
	coinbaser claims more than the total, or fails in any way, its outputs
	are discarded and the whole value goes to the tracker address.
	"""
	txn = Txn.new()
	
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		try:
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			# NOTE: shell=True with an operator-supplied command string;
			# CoinbaserCmd is trusted configuration, not untrusted input.
			# The with-block ensures the pipe is closed and the child reaped
			# (the original leaked the stdout pipe).
			with subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE) as p:
				# Protocol: first line is the output count, then for each
				# output a line with the amount and a line with the address.
				nout = int(p.stdout.readline())
				for i in range(nout):
					amount = int(p.stdout.readline())
					addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
					pkScript = BitcoinScript.toAddress(addr)
					txn.addOutput(amount, pkScript)
					coinbased += amount
		except Exception:
			# Force the failure path below.  (A bare except here would also
			# have swallowed KeyboardInterrupt/SystemExit.)
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased
	
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)
	
	# TODO: red flag on dupe coinbase
	return txn
87
88
89 import jsonrpc_getwork
90 from util import Bits2Target
91
# username -> {work-log id: (work data, issue time)} for all issued work.
workLog = {}
# Current network (block) target; None until the first block is known.
networkTarget = None
# Exact share header bytes already seen this block, used to reject duplicates.
DupeShareHACK = {}

# JSON-RPC server instance; assigned in the __main__ section below.
server = None
def updateBlocks():
	"""Wake any longpolling miners so they refetch fresh work."""
	server.wakeLongpoll()
99
def blockChanged():
	"""React to a new best block: reset per-block state and wake longpollers."""
	global DupeShareHACK
	global MM, networkTarget, server
	# Per-block duplicate-share caches start fresh on every block.
	DupeShareHACK = {}
	jsonrpc_getwork._CheckForDupesHACK = {}
	bits = MM.currentBlock[2]
	networkTarget = None if bits is None else Bits2Target(bits)
	# All previously issued work is now stale.
	workLog.clear()
	updateBlocks()
112
113
114 from time import sleep, time
115 import traceback
116
def _WorkLogPruner_I(wl):
	"""One pruning pass: delete work entries issued more than 120s ago.

	wl maps username -> {work-log id: (work data, issue time)}; stale
	entries are removed in place.
	"""
	now = time()
	pruned = 0
	# Snapshot the key views before iterating: server threads add entries to
	# both wl and the per-user dicts concurrently, and mutating a dict during
	# iteration raises RuntimeError.  The inner loop already did this; the
	# outer loop now does too.
	for username in tuple(wl.keys()):
		userwork = wl[username]
		for wli in tuple(userwork.keys()):
			if now > userwork[wli][1] + 120:
				del userwork[wli]
				pruned += 1
	WorkLogPruner.logger.debug('Pruned %d jobs' % (pruned,))

def WorkLogPruner(wl):
	"""Thread target: prune the work log roughly once a minute, forever."""
	while True:
		try:
			sleep(60)
			_WorkLogPruner_I(wl)
		except Exception:
			# Keep the pruner alive no matter what; just log and retry.
			WorkLogPruner.logger.error(traceback.format_exc())
WorkLogPruner.logger = logging.getLogger('WorkLogPruner')
136
137
from merklemaker import merkleMaker
# The merkle maker produces work (merkle trees / block templates) from the
# upstream bitcoind; it is configured directly from the config module.
MM = merkleMaker()
MM.__dict__.update(config.__dict__)
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
146
147
148 from binascii import b2a_hex
149 from copy import deepcopy
150 from merklemaker import MakeBlockHeader
151 from struct import pack, unpack
152 import threading
153 from time import time
154 from util import RejectedShare, dblsha, hash2int, swap32
155 import jsonrpc
156 import traceback
157
# Optional 'gotwork' JSON-RPC service that gets notified of strong shares.
gotwork = None
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
161
def submitGotwork(info):
	"""Best-effort push of share info to the configured gotwork service.

	Failures are logged and never propagated (this runs on a daemon thread).
	"""
	try:
		gotwork.gotwork(info)
	except Exception:
		# Narrowed from a bare except: still best-effort, but no longer
		# swallows KeyboardInterrupt/SystemExit.
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
167
def getBlockHeader(username):
	"""Issue getwork-style work: a block header for *username*.

	Records the merkle root -> (MRD, issue time) pair in the user's work
	log so the eventual share can be matched back to its job.
	"""
	MRD = MM.getMRD()
	merkleRoot = MRD[0]
	hdr = MakeBlockHeader(MRD)
	entry = (MRD, time())
	userLog = workLog.setdefault(username, {})
	userLog[merkleRoot] = entry
	return (hdr, entry)
174
def getBlockTemplate(username):
	"""Issue getblocktemplate-style work for *username* and log it."""
	template = MM.getMC()
	coinbase = template[2]
	# The work-log identifier lives in the coinbase scriptsig: byte 0 is the
	# length of the leading (height) push, the byte after that push is the
	# identifier's length, and the identifier itself follows.
	idPos = coinbase[0] + 2
	idLen = coinbase[idPos - 1]
	wli = coinbase[idPos:idPos + idLen]
	workLog.setdefault(username, {})[wli] = (template, time())
	return template
183
# Registered share-logger callbacks; each is invoked with every share dict.
loggersShare = []

# Debug records of real (solved) blocks: raw data tuples and final payloads.
RBDs = []
RBPs = []
188
189 from bitcoin.varlen import varlenEncode, varlenDecode
190 import bitcoin.txn
191 from merklemaker import assembleBlock
192
def blockSubmissionThread(payload, blkhash):
	"""Thread target: submit a solved block upstream until accepted or stale.

	Tries submitblock first, falling back to the getmemorypool(data) form
	for older servers.  Retries forever, red-flagging at most every 5
	seconds, and gives up only once the network has moved past this block.
	"""
	myblock = (blkhash, payload[4:36])
	payload = b2a_hex(payload).decode('ascii')
	nexterr = 0
	while True:
		try:
			rv = UpstreamBitcoindJSONRPC.submitblock(payload)
			break
		except Exception:
			try:
				# Fallback submission path; its boolean result is mapped to
				# submitblock's convention (None = accepted).
				rv = UpstreamBitcoindJSONRPC.getmemorypool(payload)
				if rv is True:
					rv = None
				elif rv is False:
					rv = 'rejected'
				break
			except Exception:
				pass
			now = time()
			if now > nexterr:
				# FIXME: This will show "Method not found" on pre-BIP22 servers
				RaiseRedFlags(traceback.format_exc())
				nexterr = now + 5
			if MM.currentBlock[0] not in myblock:
				RaiseRedFlags('Giving up on submitting block upstream')
				return
	if rv:
		# FIXME: The returned value could be a list of multiple responses
		RaiseRedFlags('Upstream block submission failed: %s' % (rv,))
222
_STA = '%064x' % (config.ShareTarget,)  # cached hex form of the share target
def checkShare(share):
	"""Validate a submitted share in place; raise RejectedShare on failure.

	share carries at least 'data' (the 80-byte block header) and 'username';
	getblocktemplate-style submissions also carry 'blkdata' (txn count plus
	serialized transactions).  Annotates share with 'time', the matched work
	(under 'MRD' or 'MC'), 'target', '_targethex', and 'upstreamResult' when
	a real block is found.  Solving shares are submitted upstream as a side
	effect before the remaining share-level checks run.
	"""
	shareTime = share['time'] = time()
	
	data = share['data']
	data = data[:80]  # header only; any transaction data lives in 'blkdata'
	(prevBlock, height, bits) = MM.currentBlock
	sharePrevBlock = data[4:36]
	if sharePrevBlock != prevBlock:
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')
	
	# TODO: use userid
	username = share['username']
	if username not in workLog:
		raise RejectedShare('unknown-user')
	
	if data[72:76] != bits:
		raise RejectedShare('bad-diffbits')
	
	# Note that we should accept miners reducing version to 1 if they don't understand 2 yet
	# FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
	if data[1:4] != b'\0\0\0' or data[0] > 2:
		raise RejectedShare('bad-version')
	
	shareMerkleRoot = data[36:68]
	if 'blkdata' in share:
		# GBT share: recover the work-log id (wli) from the extranonce area
		# of the submitted coinbase (same layout as in getBlockTemplate).
		pl = share['blkdata']
		(txncount, pl) = varlenDecode(pl)
		cbtxn = bitcoin.txn.Txn(pl)
		othertxndata = cbtxn.disassemble(retExtra=True)
		coinbase = cbtxn.getCoinbase()
		wliPos = coinbase[0] + 2
		wliLen = coinbase[wliPos - 1]
		wli = coinbase[wliPos:wliPos+wliLen]
		mode = 'MC'
		moden = 1
	else:
		# getwork share: the merkle root itself identifies the job.
		wli = shareMerkleRoot
		mode = 'MRD'
		moden = 0
	
	MWL = workLog[username]
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld
	
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
	blkhash = dblsha(data)
	if blkhash[28:] != b'\0\0\0\0':
		# Hash bytes are little-endian, so the last 4 bytes are the most
		# significant 32 bits; they must be zero for any valid share.
		raise RejectedShare('H-not-zero')
	blkhashn = hash2int(blkhash)
	
	global networkTarget
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
	
	workMerkleTree = wld[1]
	workCoinbase = wld[2]
	
	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	txlist = workMerkleTree.data
	txlist = [deepcopy(txlist[0]),] + txlist[1:]
	cbtxn = txlist[0]
	cbtxn.setCoinbase(workCoinbase)
	cbtxn.assemble()
	
	if blkhashn <= networkTarget:
		# This share solves a real block: record it for debugging and submit
		# it upstream before the remaining (cheaper) rejection checks.
		logfunc("Submitting upstream")
		RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree) ) )
		if not moden:
			payload = assembleBlock(data, txlist)
		else:
			payload = share['data'] + share['blkdata']
		logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
		RBPs.append(payload)
		threading.Thread(target=blockSubmissionThread, args=(payload, blkhash)).start()
		bcnode.submitBlock(payload)
		share['upstreamResult'] = True
		MM.updateBlock(blkhash)
	
	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')
	
	if blkhashn > config.ShareTarget:
		raise RejectedShare('high-hash')
	share['target'] = config.ShareTarget
	share['_targethex'] = _STA
	
	# Timestamp sanity: reject work issued over 2 minutes ago, and header
	# times more than 5 minutes behind or 2 hours ahead of our clock.
	shareTimestamp = unpack('<L', data[68:72])[0]
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
	if moden:
		# Full GBT validation: the submitted coinbase must extend the one we
		# issued, avoid forbidden flag strings, and reproduce the merkle root.
		cbpre = cbtxn.getCoinbase()
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')
		
		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
				raise RejectedShare('bad-cb-flag')
		
		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')
		
		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')
		
		if len(othertxndata):
			allowed = assembleBlock(data, txlist)[80:]
			if allowed != share['blkdata']:
				raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
367
def receiveShare(share):
	"""Validate a share and hand it to every registered share logger.

	Raises RejectedShare (after annotating share['rejectReason']) when the
	share is invalid.  share['solution'] is always filled in and all share
	loggers always run, whether the share was accepted or not.
	"""
	# TODO: username => userid
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	finally:
		share['solution'] = (
			share['_origdata'] if '_origdata' in share
			else b2a_hex(swap32(share['data'])).decode('utf8')
		)
		for logShare in loggersShare:
			logShare(share)
382
def newBlockNotification():
	"""Handle an external new-block notification by refreshing the merkle tree."""
	logging.getLogger('newBlockNotification').info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
388
def newBlockNotificationSIGNAL(signum, frame):
	"""SIGUSR1 handler: defer the real work to a fresh daemon thread, since
	the signal may arrive while arbitrary locks are held."""
	thr = threading.Thread(
		target=newBlockNotification,
		name='newBlockNotification via signal %s' % (signum,),
	)
	thr.daemon = True
	thr.start()
394
from signal import signal, SIGUSR1
# SIGUSR1 triggers an immediate merkle-tree refresh (new-block notification).
signal(SIGUSR1, newBlockNotificationSIGNAL)
397
398
399 import os
400 import os.path
401 import pickle
402 import signal
403 import sys
404 from time import sleep
405 import traceback
406
# File used to persist DupeShareHACK/workLog across restarts (see saveState).
SAVE_STATE_FILENAME = 'eloipool.worklog'
408
def stopServers():
	"""Shut down the bitcoin node and JSON-RPC servers.

	Idempotent: repeat calls are no-ops.  Asks each server to stop, waits up
	to 0x100 * 10ms for their loops to exit, then closes any file
	descriptors they still hold.
	"""
	logger = logging.getLogger('stopServers')
	
	if hasattr(stopServers, 'already'):
		logger.debug('Already tried to stop servers before')
		return
	stopServers.already = True
	
	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server)
	for s in servers:
		s.keepgoing = False
	for s in servers:
		try:
			s.wakeup()
		except Exception:
			# Narrowed from a bare except; keep trying the other servers.
			logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
	i = 0
	while True:
		sl = [s.__class__.__name__ for s in servers if s.running]
		if not sl:
			break
		i += 1
		if i >= 0x100:
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
			break
		sleep(0.01)
	
	for s in servers:
		# Snapshot the keys: closing descriptors must not be confused by any
		# concurrent mutation of the server's fd map.
		for fd in tuple(s._fd.keys()):
			os.close(fd)
444
def saveState(t = None):
	"""Persist resume data (timestamp, DupeShareHACK, workLog) to disk.

	Retries on failure; after 0x10000 consecutive failures it gives up,
	removes any partial state file, and returns.
	"""
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except Exception:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except Exception:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
				# BUGFIX: actually stop retrying once we have given up; the
				# original loop never terminated after hitting the limit.
				break
466
def exit():
	"""Clean shutdown: stop servers, save state, then terminate the process."""
	shutdownTime = time()
	stopServers()
	saveState(shutdownTime)
	logging.getLogger('exit').info('Goodbye...')
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)
474
def restart():
	"""Stop servers, save state, and re-exec this process in place."""
	shutdownTime = time()
	stopServers()
	saveState(shutdownTime)
	logging.getLogger('restart').info('Restarting...')
	try:
		os.execv(sys.argv[0], sys.argv)
	except:
		logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
484
def restoreState():
	"""Load state previously written by saveState(), if present.

	Always restores DupeShareHACK; restores workLog only when the save is
	less than 120 seconds old (older work would be rejected as stale
	anyway).  Understands several historical on-disk formats from early
	2012.  NOTE: the state file is unpickled, so it must only ever be
	written by this pool (trusted local data).
	"""
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if type(t) == tuple:
				if len(t) > 2:
					# Future formats, not supported here
					# NOTE(review): t[3] raises IndexError when len(t) == 3
					# (presumably t[2] was meant); harmless today because the
					# outer except catches it, but fix before supporting
					# future versions.
					ver = t[3]
					# TODO
				
				# Old format, from 2012-02-02 to 2012-02-03
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					# Old format, from 2012-02-03 to 2012-02-03
					DupeShareHACK = t
					t = None
				else:
					# Current format, from 2012-02-03 onward:
					# t is the save timestamp; DupeShareHACK follows.
					DupeShareHACK = pickle.load(f)
				
				# NOTE(review): in the dict branch above t is None here, so
				# t + 120 raises TypeError and workLog restore is skipped via
				# the outer except — verify whether that was intended.
				if t + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))
526
527
528 from jsonrpcserver import JSONRPCListener, JSONRPCServer
529 import interactivemode
530 from networkserver import NetworkListener
531 import threading
532 import sharelogging
533 import imp
534
if __name__ == "__main__":
	# --- Share logging setup -------------------------------------------------
	# Fill in defaults and convert deprecated configuration variables into
	# their modern ShareLogging equivalents.
	if not hasattr(config, 'ShareLogging'):
		config.ShareLogging = ()
	if hasattr(config, 'DbOptions'):
		logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
		config.ShareLogging = list(config.ShareLogging)
		config.ShareLogging.append( {
			'type': 'sql',
			'engine': 'postgres',
			'dbopts': config.DbOptions,
			'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
		} )
	# Instantiate one share logger per ShareLogging entry, loading each
	# implementation from the sharelogging package by its 'type' name.
	for i in config.ShareLogging:
		if not hasattr(i, 'keys'):
			# Old (name, parameters) tuple form
			name, parameters = i
			logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
			if name == 'postgres':
				name = 'sql'
				i = {
					'engine': 'postgres',
					'dbopts': parameters,
				}
			elif name == 'logfile':
				i = {}
				i['thropts'] = parameters
				if 'filename' in parameters:
					i['filename'] = parameters['filename']
					i['thropts'] = dict(i['thropts'])
					del i['thropts']['filename']
			else:
				i = parameters
			i['type'] = name
		
		name = i['type']
		parameters = i
		try:
			fp, pathname, description = imp.find_module(name, sharelogging.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			loggersShare.append(lo.logShare)
		except:
			logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())

	# --- Bitcoin P2P node listeners and optional upstream node link ---------
	LSbc = []
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))
	
	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
	
	# Importing these modules registers their JSON-RPC methods as a side effect.
	import jsonrpc_getblocktemplate
	import jsonrpc_getwork
	import jsonrpc_setworkaux
	
	# --- Miner-facing JSON-RPC server setup ---------------------------------
	server = JSONRPCServer()
	if hasattr(config, 'JSONRPCAddress'):
		logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
	LS = []
	for a in config.JSONRPCAddresses:
		LS.append(JSONRPCListener(server, a))
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	server.aux = MM.CoinbaseAux
	server.getBlockHeader = getBlockHeader
	server.getBlockTemplate = getBlockTemplate
	server.receiveShare = receiveShare
	server.RaiseRedFlags = RaiseRedFlags
	server.ShareTarget = config.ShareTarget
	
	if hasattr(config, 'TrustedForwarders'):
		server.TrustedForwarders = config.TrustedForwarders
	server.ServerName = config.ServerName
	
	# --- Start everything ---------------------------------------------------
	# Work generation first, then state restore, then the background threads
	# (work-log pruner and bitcoin node), and finally the blocking RPC serve.
	MM.start()
	
	restoreState()
	
	prune_thr = threading.Thread(target=WorkLogPruner, args=(workLog,))
	prune_thr.daemon = True
	prune_thr.start()
	
	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()
	
	server.serve_forever()