New DelayLogForUpstream configuration option, and try submitblock(data) before getmemorypool(data) when submitting blocks upstream
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
import config

# Fall back to a generic pool name when the config does not provide one.
config.ServerName = getattr(config, 'ServerName', 'Unnamed Eloipool')
23
import logging

# Only install a default logging configuration if nothing has configured
# the root logger yet (e.g. when embedded in another application).
if len(logging.root.handlers) == 0:
        logging.basicConfig(
                format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
                level=logging.DEBUG,
        )
        # These channels are too chatty at DEBUG; restrict them to INFO
        for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
                logging.getLogger(infoOnly).setLevel(logging.INFO)
33
def RaiseRedFlags(reason):
        """Log *reason* at critical severity on the 'redflag' channel.

        The reason is returned unchanged so callers can raise or propagate it.
        """
        redflagLogger = logging.getLogger('redflag')
        redflagLogger.critical(reason)
        return reason
37
38
from bitcoin.node import BitcoinLink, BitcoinNode
# Peer-to-peer Bitcoin node, used to announce solved blocks directly on the network
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
# JSON-RPC proxy to the upstream bitcoind (submitblock / getmemorypool)
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
45
46
47 from bitcoin.script import BitcoinScript
48 from bitcoin.txn import Txn
49 from base58 import b58decode
50 from struct import pack
51 import subprocess
52 from time import time
53
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
        """Build the coinbase transaction paying out coinbaseValue (base units).

        If useCoinbaser is true and config.CoinbaserCmd is set, that shell
        command is run with '%d' replaced by coinbaseValue.  Its stdout must be
        a line with the number of outputs, followed by alternating amount and
        address lines.  Whatever the coinbaser does not allocate is paid to
        config.TrackerAddr.  If the coinbaser fails, or would allocate the full
        value (leaving nothing for the tracker output), its outputs are
        discarded and the entire value goes to config.TrackerAddr instead.
        """
        txn = Txn.new()
        
        if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
                coinbased = 0
                try:
                        cmd = config.CoinbaserCmd
                        cmd = cmd.replace('%d', str(coinbaseValue))
                        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
                        nout = int(p.stdout.readline())
                        for i in range(nout):
                                amount = int(p.stdout.readline())
                                addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
                                pkScript = BitcoinScript.toAddress(addr)
                                txn.addOutput(amount, pkScript)
                                coinbased += amount
                except Exception:
                        # Was a bare `except:`; narrowed so KeyboardInterrupt and
                        # SystemExit still propagate.  The sentinel value forces
                        # the failure branch below.
                        coinbased = coinbaseValue + 1
                if coinbased >= coinbaseValue:
                        logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
                        txn.outputs = []
                else:
                        coinbaseValue -= coinbased
        
        pkScript = BitcoinScript.toAddress(config.TrackerAddr)
        txn.addOutput(coinbaseValue, pkScript)
        
        # TODO
        # TODO: red flag on dupe coinbase
        return txn
84
85
import jsonrpc_getwork
from util import Bits2Target

# username -> { work-identifier -> (work, issue-time) }; used by checkShare
# to match submitted shares to the work that was handed out
workLog = {}
# Current network target as an integer; set by blockChanged()
networkTarget = None
# Headers already submitted for the current block, to reject duplicates
DupeShareHACK = {}
92
server = None  # JSONRPCServer instance; assigned in the __main__ section

def updateBlocks():
        """Wake the JSON-RPC server's longpoll connections, if the server exists yet."""
        if not server:
                return
        server.wakeLongpoll()
97
def blockChanged():
        """Reset per-block state when the network's best block changes.

        Clears the duplicate-share records and outstanding work, recomputes the
        network target from the new block's nBits, and wakes longpollers.
        """
        global DupeShareHACK
        DupeShareHACK = {}
        jsonrpc_getwork._CheckForDupesHACK = {}
        global MM, networkTarget, server
        networkTarget = Bits2Target(MM.currentBlock[1])
        workLog.clear()
        updateBlocks()
106
107
from merklemaker import merkleMaker
# The merkle maker thread keeps the current block template / merkle tree fresh
MM = merkleMaker()
MM.__dict__.update(config.__dict__)
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
MM.start()
117
118
119 from binascii import b2a_hex
120 from copy import deepcopy
121 from struct import pack, unpack
122 from time import time
123 from util import PendingUpstream, RejectedShare, dblsha, LEhash2int, swap32
124 import jsonrpc
125 import threading
126 import traceback
127
# Optional JSON-RPC service notified of every solved block ("gotwork" hack)
gotwork = None
if hasattr(config, 'GotWorkURI'):
        gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

# When enabled, block-solving shares are not logged until the upstream
# submission finishes, so the share log can include the upstream result
if not hasattr(config, 'DelayLogForUpstream'):
        config.DelayLogForUpstream = False
134
def submitGotwork(info):
        """Best-effort notification of a solved block to the gotwork service.

        Failures are logged and swallowed; gotwork is purely informational.
        """
        try:
                gotwork.gotwork(info)
        except Exception:
                # Was a bare `except:`; narrowed so interpreter-exit exceptions propagate
                checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
140
def getBlockHeader(username):
        """Issue getwork-style work: an 80-byte header prototype for *username*.

        The work is recorded in workLog (keyed by merkle root) so the share can
        be matched and validated later.  Returns (header, worklog-entry).
        """
        MRD = MM.getMRD()
        (merkleRoot, _merkleTree, _coinbase, prevBlock, bits, _rollPrevBlk) = MRD
        timestamp = pack('<L', int(time()))
        # Trailing 4 bytes sit in the nonce position for the miner to overwrite
        hdr = b'\1\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
        entry = (MRD, time())
        workLog.setdefault(username, {})[merkleRoot] = entry
        return (hdr, entry)
148
def getBlockTemplate(username):
        """Issue getmemorypool-style work (MC) for *username* and record it.

        The worklog key is the length-prefixed identifier at the start of the
        coinbase script (first byte = length, then that many bytes).
        """
        MC = MM.getMC()
        coinbase = MC[2]
        idLen = coinbase[0]
        workId = coinbase[1:1 + idLen]
        workLog.setdefault(username, {})[workId] = (MC, time())
        return MC
156
# Active share loggers (built from config.ShareLogging in __main__)
loggersShare = []

# Debug records: RBDs holds raw data for every upstream submission,
# RBPs the payloads of real (network-target) blocks
RBDs = []
RBPs = []
161
from bitcoin.varlen import varlenEncode, varlenDecode
import bitcoin.txn
def assembleBlock(blkhdr, txlist):
        """Serialize a full block: header, transaction-count varint, then each raw txn."""
        parts = [blkhdr, varlenEncode(len(txlist))]
        parts.extend(tx.data for tx in txlist)
        return b''.join(parts)
170
def blockSubmissionThread(payload, share):
        # Runs in its own thread: push a solved block to the upstream bitcoind.
        # Tries submitblock first, falling back to getmemorypool-style
        # submission; retries indefinitely until one call completes.
        payload = b2a_hex(payload).decode('ascii')
        while True:
                try:
                        # Any truthy return is treated as a reject reason below
                        reason = UpstreamBitcoindJSONRPC.submitblock(payload)
                        break
                except:
                        pass
                try:
                        # Old-style submission: getmemorypool(data) returns a boolean
                        reason = UpstreamBitcoindJSONRPC.getmemorypool(payload)
                        reason = None if reason else 'rejected'
                        break
                except:
                        pass
        if reason:
                blockSubmissionThread.logger.warning('Upstream block submission failed: %s' % (reason,))
        # With DelayLogForUpstream, checkShare left the share unlogged; record
        # the upstream outcome on it and log it now
        if share['upstreamRejectReason'] is PendingUpstream:
                share['upstreamRejectReason'] = reason
                share['upstreamResult'] = not reason
                logShare(share)
blockSubmissionThread.logger = logging.getLogger('blockSubmission')
192
def checkShare(share):
        """Validate a submitted share; raise RejectedShare on any problem.

        Side effects: records the header in DupeShareHACK, kicks off an
        upstream submission thread for block/upstream-target shares, and may
        update MM's view of the current block when a real block is found.
        """
        shareTime = share['time'] = time()
        
        data = share['data']
        data = data[:80]  # only the block header is validated here
        (prevBlock, bits) = MM.currentBlock
        sharePrevBlock = data[4:36]
        if sharePrevBlock != prevBlock:
                if sharePrevBlock == MM.lastBlock[0]:
                        raise RejectedShare('stale-prevblk')
                raise RejectedShare('bad-prevblk')
        
        # TODO: use userid
        username = share['username']
        if username not in workLog:
                raise RejectedShare('unknown-user')
        
        if data[72:76] != bits:
                raise RejectedShare('bad-diffbits')
        if data[:4] != b'\1\0\0\0':
                raise RejectedShare('bad-version')
        
        shareMerkleRoot = data[36:68]
        if 'blkdata' in share:
                # getmemorypool-style share: identify the work via the
                # length-prefixed identifier at the front of the coinbase
                pl = share['blkdata']
                (txncount, pl) = varlenDecode(pl)
                cbtxn = bitcoin.txn.Txn(pl)
                cbtxn.disassemble(retExtra=True)
                coinbase = cbtxn.getCoinbase()
                wliLen = coinbase[0]
                wli = coinbase[1:wliLen+1]
                mode = 'MC'
                moden = 1
        else:
                # getwork-style share: work is keyed by the merkle root itself
                wli = shareMerkleRoot
                mode = 'MRD'
                moden = 0
        
        MWL = workLog[username]
        if wli not in MWL:
                raise RejectedShare('unknown-work')
        (wld, issueT) = MWL[wli]
        share[mode] = wld
        
        if data in DupeShareHACK:
                raise RejectedShare('duplicate')
        DupeShareHACK[data] = None
        
        blkhash = dblsha(data)
        if blkhash[28:] != b'\0\0\0\0':
                raise RejectedShare('H-not-zero')
        blkhashn = LEhash2int(blkhash)
        
        workMerkleTree = wld[1]
        workCoinbase = wld[2]
        
        global networkTarget
        # Block-solving shares get logged at INFO, ordinary shares at DEBUG
        logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
        if workMerkleTree.upstreamTarget != networkTarget:
                logfunc('pTARGET: %64x' % (workMerkleTree.upstreamTarget,))
        logfunc('BLKHASH: %64x' % (blkhashn,))
        logfunc(' TARGET: %64x' % (networkTarget,))
        
        # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
        txlist = workMerkleTree.data
        txlist = [deepcopy(txlist[0]),] + txlist[1:]
        cbtxn = txlist[0]
        cbtxn.setCoinbase(workCoinbase)
        cbtxn.assemble()
        
        isBlock = blkhashn <= networkTarget
        if isBlock or blkhashn <= workMerkleTree.upstreamTarget:
                logfunc("Submitting upstream")
                if not moden:
                        RBDs.append( deepcopy( (data, txlist) ) )
                        payload = assembleBlock(data, txlist)
                else:
                        RBDs.append( deepcopy( (data, txlist, share['blkdata']) ) )
                        payload = share['data'] + share['blkdata']
                if isBlock:
                        logfunc('Real block payload: %s' % (payload,))
                        RBPs.append(payload)
                # With DelayLogForUpstream, logging is deferred until the
                # submission thread knows the upstream result
                if config.DelayLogForUpstream:
                        share['upstreamRejectReason'] = PendingUpstream
                else:
                        share['upstreamRejectReason'] = None
                        share['upstreamResult'] = True
                threading.Thread(target=blockSubmissionThread, args=(payload, share)).start()
                if isBlock:
                        # Also announce directly on the p2p network
                        bcnode.submitBlock(payload)
                        MM.updateBlock(blkhash)
        
        # Gotwork hack...
        if gotwork and blkhashn <= config.GotWorkTarget:
                try:
                        coinbaseMrkl = cbtxn.data
                        coinbaseMrkl += blkhash
                        steps = workMerkleTree._steps
                        coinbaseMrkl += pack('B', len(steps))
                        for step in steps:
                                coinbaseMrkl += step
                        coinbaseMrkl += b"\0\0\0\0"
                        info = {}
                        info['hash'] = b2a_hex(blkhash).decode('ascii')
                        info['header'] = b2a_hex(data).decode('ascii')
                        info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
                        thr = threading.Thread(target=submitGotwork, args=(info,))
                        thr.daemon = True
                        thr.start()
                except:
                        checkShare.logger.warning('Failed to build gotwork request')
        
        # Timestamp sanity checks (done after submission so a found block is
        # never lost to a clock quibble)
        shareTimestamp = unpack('<L', data[68:72])[0]
        if shareTime < issueT - 120:
                raise RejectedShare('stale-work')
        if shareTimestamp < shareTime - 300:
                raise RejectedShare('time-too-old')
        if shareTimestamp > shareTime + 7200:
                raise RejectedShare('time-too-new')
        
        if moden:
                # Full validation for getmemorypool shares
                cbpre = cbtxn.getCoinbase()
                cbpreLen = len(cbpre)
                if coinbase[:cbpreLen] != cbpre:
                        raise RejectedShare('bad-cb-prefix')
                
                # Filter out known "I support" flags, to prevent exploits
                for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
                        if coinbase.find(ff) > cbpreLen - len(ff):
                                raise RejectedShare('bad-cb-flag')
                
                if len(coinbase) > 100:
                        raise RejectedShare('bad-cb-length')
                
                # The submitted block must match exactly what we would assemble
                cbtxn.setCoinbase(coinbase)
                cbtxn.assemble()
                if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
                        raise RejectedShare('bad-txnmrklroot')
                
                allowed = assembleBlock(data, txlist)
                if allowed != share['data'] + share['blkdata']:
                        raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
336
def logShare(share):
        """Fill in share['solution'] (hex header) and dispatch the share to every share logger."""
        if '_origdata' in share:
                solution = share['_origdata']
        else:
                solution = b2a_hex(swap32(share['data'])).decode('utf8')
        share['solution'] = solution
        for shareLogger in loggersShare:
                shareLogger.logShare(share)
344
def receiveShare(share):
        """Validate an incoming share, annotating and logging it.

        RejectedShare propagates to the caller (so the miner gets an error)
        after the reject reason is recorded on the share.  Logging is skipped
        when the share is awaiting an upstream submission result
        (PendingUpstream); blockSubmissionThread logs it once the result is in.
        """
        # TODO: username => userid
        try:
                checkShare(share)
        except RejectedShare as rej:
                share['rejectReason'] = str(rej)
                raise
        finally:
                # Was `not ... is ...` — same behavior, but `is not` avoids the
                # precedence-trap reading
                if share.get('upstreamRejectReason', None) is not PendingUpstream:
                        logShare(share)
355
from signal import signal, SIGUSR1

def newBlockNotification(signum, frame):
        """SIGUSR1 handler: an external process reports that a new block appeared."""
        logging.getLogger('newBlockNotification').info('Received new block notification')
        MM.updateMerkleTree()
        # TODO: Force RESPOND TO LONGPOLLS?

signal(SIGUSR1, newBlockNotification)
364
365
366 import os
367 import os.path
368 import pickle
369 import signal
370 import sys
371 from time import sleep
372 import traceback
373
# File used to persist DupeShareHACK/workLog across restarts (see saveState/restoreState)
SAVE_STATE_FILENAME = 'eloipool.worklog'
375
def stopServers():
        """Shut down the p2p node and JSON-RPC server, closing their sockets."""
        logger = logging.getLogger('stopServers')
        
        logger.info('Stopping servers...')
        global bcnode, server
        servers = (bcnode, server)
        # Ask each server loop to exit, then wake them so they notice promptly
        for s in servers:
                s.keepgoing = False
        for s in servers:
                s.wakeup()
        i = 0
        # Poll until both report stopped, up to 0x100 * 10ms (~2.56s)
        while True:
                sl = []
                for s in servers:
                        if s.running:
                                sl.append(s.__class__.__name__)
                if not sl:
                        break
                i += 1
                if i >= 0x100:
                        logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
                        break
                sleep(0.01)
        
        # Close any file descriptors the servers still hold open
        for s in servers:
                for fd in s._fd.keys():
                        os.close(fd)
403
def stopLoggers():
        """Give every share logger that provides a stop() method a chance to shut down."""
        for shareLogger in loggersShare:
                if hasattr(shareLogger, 'stop'):
                        shareLogger.stop()
408
def saveState(t = None):
        """Pickle resume state (timestamp, DupeShareHACK, workLog) to disk.

        Retries on failure; after 0x10000 failed attempts it gives up and
        removes any partial state file so a later restore doesn't load garbage.
        """
        logger = logging.getLogger('saveState')
        
        # Then, save data needed to resume work
        logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
        i = 0
        while True:
                try:
                        with open(SAVE_STATE_FILENAME, 'wb') as f:
                                pickle.dump(t, f)
                                pickle.dump(DupeShareHACK, f)
                                pickle.dump(workLog, f)
                        break
                except Exception:
                        i += 1
                        if i >= 0x10000:
                                logger.error('Failed to save work\n' + traceback.format_exc())
                                try:
                                        os.unlink(SAVE_STATE_FILENAME)
                                except Exception:
                                        logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
                                # BUGFIX: previously fell through here and kept
                                # looping (re-logging and re-unlinking) forever;
                                # give up after cleaning up
                                break
430
def exit():
        """Cleanly shut down: stop servers and loggers, save state, terminate.

        NOTE: shadows the builtin exit().
        """
        t = time()
        stopServers()
        stopLoggers()
        saveState(t)
        logging.getLogger('exit').info('Goodbye...')
        # SIGTERM takes the whole process down even if other threads linger;
        # sys.exit only ends the current thread
        os.kill(os.getpid(), signal.SIGTERM)
        sys.exit(0)
439
def restart():
        """Save state, then replace the current process with a fresh copy of itself."""
        t = time()
        stopServers()
        stopLoggers()
        saveState(t)
        logging.getLogger('restart').info('Restarting...')
        try:
                # execv keeps the same PID; control only returns here if it fails
                os.execv(sys.argv[0], sys.argv)
        except:
                logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
450
def restoreState():
        """Load state written by saveState(), tolerating older file formats.

        Formats handled:
        - tuple: (workLog, DupeShareHACK), no timestamp
        - dict: just DupeShareHACK (oldest format)
        - otherwise: shutdown timestamp, then DupeShareHACK, then workLog
        """
        if not os.path.exists(SAVE_STATE_FILENAME):
                return
        
        global workLog, DupeShareHACK
        
        logger = logging.getLogger('restoreState')
        s = os.stat(SAVE_STATE_FILENAME)
        logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
        try:
                with open(SAVE_STATE_FILENAME, 'rb') as f:
                        t = pickle.load(f)
                        if type(t) == tuple:
                                workLog = t[0]
                                DupeShareHACK = t[1]
                                t = None
                        else:
                                if isinstance(t, dict):
                                        DupeShareHACK = t
                                        t = None
                                else:
                                        DupeShareHACK = pickle.load(f)
                                
                                # Don't resurrect work that is certainly stale
                                if s.st_mtime + 120 >= time():
                                        workLog = pickle.load(f)
                                else:
                                        logger.debug('Skipping restore of expired workLog')
        except:
                logger.error('Failed to restore state\n' + traceback.format_exc())
                return
        logger.info('State restored successfully')
        if t:
                logger.info('Total downtime: %g seconds' % (time() - t,))
484
485
486 from jsonrpcserver import JSONRPCListener, JSONRPCServer
487 import interactivemode
488 from networkserver import NetworkListener
489 import threading
490 import sharelogging
491 import imp
492
if __name__ == "__main__":
        # --- Share logging setup (with backward-compatible config handling) ---
        if not hasattr(config, 'ShareLogging'):
                config.ShareLogging = ()
        if hasattr(config, 'DbOptions'):
                logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
                config.ShareLogging = list(config.ShareLogging)
                config.ShareLogging.append( {
                        'type': 'sql',
                        'engine': 'postgres',
                        'dbopts': config.DbOptions,
                        'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
                } )
        for i in config.ShareLogging:
                if not hasattr(i, 'keys'):
                        # Old (name, parameters) tuple form; convert to a dict
                        name, parameters = i
                        logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
                        if name == 'postgres':
                                name = 'sql'
                                i = {
                                        'engine': 'postgres',
                                        'dbopts': parameters,
                                }
                        elif name == 'logfile':
                                i = {}
                                i['thropts'] = parameters
                                if 'filename' in parameters:
                                        i['filename'] = parameters['filename']
                                        i['thropts'] = dict(i['thropts'])
                                        del i['thropts']['filename']
                        else:
                                i = parameters
                        i['type'] = name
                
                name = i['type']
                parameters = i
                try:
                        # Dynamically load sharelogging/<name>.py and instantiate
                        # the class of the same name with the config parameters
                        fp, pathname, description = imp.find_module(name, sharelogging.__path__)
                        m = imp.load_module(name, fp, pathname, description)
                        lo = getattr(m, name)(**parameters)
                        loggersShare.append(lo)
                except:
                        logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())

        # --- Raw Bitcoin p2p listeners ---
        LSbc = []
        if not hasattr(config, 'BitcoinNodeAddresses'):
                config.BitcoinNodeAddresses = ()
        for a in config.BitcoinNodeAddresses:
                LSbc.append(NetworkListener(bcnode, a))
        
        if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
                BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
        
        # NOTE(review): these imports appear to be for side effects only
        # (registering JSON-RPC method handlers) — confirm before removing
        import jsonrpc_getmemorypool
        import jsonrpc_getwork
        import jsonrpc_setworkaux
        
        # --- JSON-RPC (getwork/getmemorypool) server setup ---
        server = JSONRPCServer()
        if hasattr(config, 'JSONRPCAddress'):
                logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
                if not hasattr(config, 'JSONRPCAddresses'):
                        config.JSONRPCAddresses = []
                config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
        LS = []
        for a in config.JSONRPCAddresses:
                LS.append(JSONRPCListener(server, a))
        if hasattr(config, 'SecretUser'):
                server.SecretUser = config.SecretUser
        server.aux = MM.CoinbaseAux
        server.getBlockHeader = getBlockHeader
        server.getBlockTemplate = getBlockTemplate
        server.receiveShare = receiveShare
        server.RaiseRedFlags = RaiseRedFlags
        
        server.TrustedForwarders = ()
        if hasattr(config, 'TrustedForwarders'):
                server.TrustedForwarders = config.TrustedForwarders
        server.ServerName = config.ServerName
        
        restoreState()
        
        # p2p node runs in a daemon thread; JSON-RPC serves in the main thread
        bcnode_thr = threading.Thread(target=bcnode.serve_forever)
        bcnode_thr.daemon = True
        bcnode_thr.start()
        
        server.serve_forever()