Update to use current BIP22 method names for upstream
[bitcoin:eloipool.git] / eloipool.py
#!/usr/bin/python3
# Eloipool - Python Bitcoin pool server
# Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import config

if not hasattr(config, 'ServerName'):
    config.ServerName = 'Unnamed Eloipool'

if not hasattr(config, 'ShareTarget'):
    config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff


import logging

if len(logging.root.handlers) == 0:
    logging.basicConfig(
        format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
        level=logging.DEBUG,
    )
    for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
        logging.getLogger(infoOnly).setLevel(logging.INFO)

def RaiseRedFlags(reason):
    logging.getLogger('redflag').critical(reason)
    return reason


from bitcoin.node import BitcoinLink, BitcoinNode
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)


from bitcoin.script import BitcoinScript
from bitcoin.txn import Txn
from base58 import b58decode
from struct import pack
import subprocess
from time import time

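# Build the generation ("coinbase") transaction for new work.  If a
# config.CoinbaserCmd is configured, it is run with %d replaced by the block
# value in base units and, as this code reads its output, is expected to print
# one line with the number of extra outputs, then an amount line and an
# address line for each of them; whatever it does not claim is paid to
# config.TrackerAddr.  A purely illustrative exchange might look like:
#   2
#   100000000
#   <first payout address>
#   50000000
#   <second payout address>
# If the coinbaser fails or claims at least the whole block value, its outputs
# are discarded and the full value goes to the tracker address.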
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
    txn = Txn.new()

    if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
        coinbased = 0
        try:
            cmd = config.CoinbaserCmd
            cmd = cmd.replace('%d', str(coinbaseValue))
            p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
            nout = int(p.stdout.readline())
            for i in range(nout):
                amount = int(p.stdout.readline())
                addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
                pkScript = BitcoinScript.toAddress(addr)
                txn.addOutput(amount, pkScript)
                coinbased += amount
        except:
            coinbased = coinbaseValue + 1
        if coinbased >= coinbaseValue:
            logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
            txn.outputs = []
        else:
            coinbaseValue -= coinbased

    pkScript = BitcoinScript.toAddress(config.TrackerAddr)
    txn.addOutput(coinbaseValue, pkScript)

    # TODO
    # TODO: red flag on dupe coinbase
    return txn


import jsonrpc_getwork
from util import Bits2Target

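# Outstanding work and duplicate tracking.  workLog maps each username to a
# dict of {work identifier: (work data, issue time)}; DupeShareHACK remembers
# every 80-byte share header seen since the last block change so repeats can
# be rejected.  networkTarget is recomputed from the current block's nBits.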
workLog = {}
networkTarget = None
DupeShareHACK = {}

server = None
def updateBlocks():
    server.wakeLongpoll()

def blockChanged():
    global DupeShareHACK
    DupeShareHACK = {}
    jsonrpc_getwork._CheckForDupesHACK = {}
    global MM, networkTarget, server
    bits = MM.currentBlock[2]
    if bits is None:
        networkTarget = None
    else:
        networkTarget = Bits2Target(bits)
    workLog.clear()
    updateBlocks()


from merklemaker import merkleMaker
MM = merkleMaker()
MM.__dict__.update(config.__dict__)
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks


from binascii import b2a_hex
from copy import deepcopy
from struct import pack, unpack
import threading
from time import time
from util import RejectedShare, dblsha, hash2int, swap32
import jsonrpc
import traceback

gotwork = None
if hasattr(config, 'GotWorkURI'):
    gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

def submitGotwork(info):
    try:
        gotwork.gotwork(info)
    except:
        checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())

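# getwork-style work: returns an 80-byte version-2 block header (with a
# placeholder nonce) and records the issued work in workLog, keyed by the
# merkle root, so the matching share can be found again in checkShare.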
def getBlockHeader(username):
    MRD = MM.getMRD()
    (merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
    timestamp = pack('<L', int(time()))
    hdr = b'\2\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
    workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
    return (hdr, workLog[username][merkleRoot])

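# getblocktemplate (BIP 22) work: returns the current template and records it
# in workLog, keyed by the work identifier embedded at the start of the
# template's coinbase data.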
def getBlockTemplate(username):
    MC = MM.getMC()
    (dummy, merkleTree, coinbase, prevBlock, bits) = MC
    wliLen = coinbase[0]
    wli = coinbase[1:wliLen+1]
    workLog.setdefault(username, {})[wli] = (MC, time())
    return MC

loggersShare = []

RBDs = []
RBPs = []

from bitcoin.varlen import varlenEncode, varlenDecode
import bitcoin.txn
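# Serialize a complete block: the 80-byte header, a varint transaction count,
# then each raw transaction.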
def assembleBlock(blkhdr, txlist):
    payload = blkhdr
    payload += varlenEncode(len(txlist))
    for tx in txlist:
        payload += tx.data
    return payload

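# Push a solved block upstream using the BIP 22 submitblock method, falling
# back to the older getmemorypool(data) call for pre-BIP22 bitcoinds.  Errors
# are retried (with rate-limited red flags) until the network's current block
# is no longer this block or its parent, at which point submission is
# abandoned.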
def blockSubmissionThread(payload, blkhash):
    myblock = (blkhash, payload[4:36])
    payload = b2a_hex(payload).decode('ascii')
    nexterr = 0
    while True:
        try:
            rv = UpstreamBitcoindJSONRPC.submitblock(payload)
            break
        except:
            try:
                rv = UpstreamBitcoindJSONRPC.getmemorypool(payload)
                if rv is True:
                    rv = None
                elif rv is False:
                    rv = 'rejected'
                break
            except:
                pass
            now = time()
            if now > nexterr:
                # FIXME: This will show "Method not found" on pre-BIP22 servers
                RaiseRedFlags(traceback.format_exc())
                nexterr = now + 5
            if MM.currentBlock[0] not in myblock:
                RaiseRedFlags('Giving up on submitting block upstream')
                return
    if rv:
        # FIXME: The returned value could be a list of multiple responses
        RaiseRedFlags('Upstream block submission failed: %s' % (rv,))

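# Validate a submitted share against the work we issued: check the previous
# block, user, nBits, block version (1 and 2 are accepted for now), duplicate
# submissions, the hash itself, and the share/timestamp limits.  Shares that
# meet the network target are assembled into a full block and submitted
# upstream; getblocktemplate-mode shares additionally have their coinbase
# prefix, appended flags, length, merkle root and transaction list verified.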
_STA = '%064x' % (config.ShareTarget,)
def checkShare(share):
    shareTime = share['time'] = time()

    data = share['data']
    data = data[:80]
    (prevBlock, height, bits) = MM.currentBlock
    sharePrevBlock = data[4:36]
    if sharePrevBlock != prevBlock:
        if sharePrevBlock == MM.lastBlock[0]:
            raise RejectedShare('stale-prevblk')
        raise RejectedShare('bad-prevblk')

    # TODO: use userid
    username = share['username']
    if username not in workLog:
        raise RejectedShare('unknown-user')

    if data[72:76] != bits:
        raise RejectedShare('bad-diffbits')

    # Note that we should accept miners reducing version to 1 if they don't understand 2 yet
    # FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
    if data[1:4] != b'\0\0\0' or data[0] > 2:
        raise RejectedShare('bad-version')

    shareMerkleRoot = data[36:68]
    if 'blkdata' in share:
        pl = share['blkdata']
        (txncount, pl) = varlenDecode(pl)
        cbtxn = bitcoin.txn.Txn(pl)
        cbtxn.disassemble(retExtra=True)
        coinbase = cbtxn.getCoinbase()
        wliLen = coinbase[0]
        wli = coinbase[1:wliLen+1]
        mode = 'MC'
        moden = 1
    else:
        wli = shareMerkleRoot
        mode = 'MRD'
        moden = 0

    MWL = workLog[username]
    if wli not in MWL:
        raise RejectedShare('unknown-work')
    (wld, issueT) = MWL[wli]
    share[mode] = wld

    if data in DupeShareHACK:
        raise RejectedShare('duplicate')
    DupeShareHACK[data] = None

    blkhash = dblsha(data)
    if blkhash[28:] != b'\0\0\0\0':
        raise RejectedShare('H-not-zero')
    blkhashn = hash2int(blkhash)

    global networkTarget
    logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
    logfunc('BLKHASH: %64x' % (blkhashn,))
    logfunc(' TARGET: %64x' % (networkTarget,))

    workMerkleTree = wld[1]
    workCoinbase = wld[2]

    # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
    txlist = workMerkleTree.data
    txlist = [deepcopy(txlist[0]),] + txlist[1:]
    cbtxn = txlist[0]
    cbtxn.setCoinbase(workCoinbase)
    cbtxn.assemble()

    if blkhashn <= networkTarget:
        logfunc("Submitting upstream")
        RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree) ) )
        if not moden:
            payload = assembleBlock(data, txlist)
        else:
            payload = share['data'] + share['blkdata']
        logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
        RBPs.append(payload)
        threading.Thread(target=blockSubmissionThread, args=(payload, blkhash)).start()
        bcnode.submitBlock(payload)
        share['upstreamResult'] = True
        MM.updateBlock(blkhash)

    # Gotwork hack...
    if gotwork and blkhashn <= config.GotWorkTarget:
        try:
            coinbaseMrkl = cbtxn.data
            coinbaseMrkl += blkhash
            steps = workMerkleTree._steps
            coinbaseMrkl += pack('B', len(steps))
            for step in steps:
                coinbaseMrkl += step
            coinbaseMrkl += b"\0\0\0\0"
            info = {}
            info['hash'] = b2a_hex(blkhash).decode('ascii')
            info['header'] = b2a_hex(data).decode('ascii')
            info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
            thr = threading.Thread(target=submitGotwork, args=(info,))
            thr.daemon = True
            thr.start()
        except:
            checkShare.logger.warning('Failed to build gotwork request')

    if blkhashn > config.ShareTarget:
        raise RejectedShare('high-hash')
    share['target'] = config.ShareTarget
    share['_targethex'] = _STA

    shareTimestamp = unpack('<L', data[68:72])[0]
    if shareTime < issueT - 120:
        raise RejectedShare('stale-work')
    if shareTimestamp < shareTime - 300:
        raise RejectedShare('time-too-old')
    if shareTimestamp > shareTime + 7200:
        raise RejectedShare('time-too-new')

    if moden:
        cbpre = cbtxn.getCoinbase()
        cbpreLen = len(cbpre)
        if coinbase[:cbpreLen] != cbpre:
            raise RejectedShare('bad-cb-prefix')

        # Filter out known "I support" flags, to prevent exploits
        for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
            if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
                raise RejectedShare('bad-cb-flag')

        if len(coinbase) > 100:
            raise RejectedShare('bad-cb-length')

        cbtxn.setCoinbase(coinbase)
        cbtxn.assemble()
        if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
            raise RejectedShare('bad-txnmrklroot')

        allowed = assembleBlock(data, txlist)
        if allowed != share['data'] + share['blkdata']:
            raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')

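# Entry point for shares arriving from the JSON-RPC server: run checkShare,
# record the rejection reason if it fails, and in every case hand the share
# to each configured share logger.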
def receiveShare(share):
    # TODO: username => userid
    try:
        checkShare(share)
    except RejectedShare as rej:
        share['rejectReason'] = str(rej)
        raise
    finally:
        if '_origdata' in share:
            share['solution'] = share['_origdata']
        else:
            share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
        for i in loggersShare:
            i(share)

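# External new-block notification (delivered via SIGUSR1 below): refresh the
# merkle tree so new work builds on the latest block.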
def newBlockNotification():
    logging.getLogger('newBlockNotification').info('Received new block notification')
    MM.updateMerkleTree()
    # TODO: Force RESPOND TO LONGPOLLS?
    pass

def newBlockNotificationSIGNAL(signum, frame):
    # Use a new thread, in case the signal handler is called with locks held
    thr = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
    thr.daemon = True
    thr.start()

from signal import signal, SIGUSR1
signal(SIGUSR1, newBlockNotificationSIGNAL)


import os
import os.path
import pickle
import signal
import sys
from time import sleep
import traceback

SAVE_STATE_FILENAME = 'eloipool.worklog'

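# Shutdown helper: ask the bitcoin node and the JSON-RPC server to stop, wait
# briefly for them to wind down, then close their file descriptors.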
def stopServers():
    logger = logging.getLogger('stopServers')

    if hasattr(stopServers, 'already'):
        logger.debug('Already tried to stop servers before')
        return
    stopServers.already = True

    logger.info('Stopping servers...')
    global bcnode, server
    servers = (bcnode, server)
    for s in servers:
        s.keepgoing = False
    for s in servers:
        try:
            s.wakeup()
        except:
            logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
    i = 0
    while True:
        sl = []
        for s in servers:
            if s.running:
                sl.append(s.__class__.__name__)
        if not sl:
            break
        i += 1
        if i >= 0x100:
            logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
            break
        sleep(0.01)

    for s in servers:
        for fd in s._fd.keys():
            os.close(fd)

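# Persist the shutdown time, DupeShareHACK, and workLog with pickle so a
# quick restart can resume the outstanding work.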
def saveState(t = None):
    logger = logging.getLogger('saveState')

    # Then, save data needed to resume work
    logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
    i = 0
    while True:
        try:
            with open(SAVE_STATE_FILENAME, 'wb') as f:
                pickle.dump(t, f)
                pickle.dump(DupeShareHACK, f)
                pickle.dump(workLog, f)
            break
        except:
            i += 1
            if i >= 0x10000:
                logger.error('Failed to save work\n' + traceback.format_exc())
                try:
                    os.unlink(SAVE_STATE_FILENAME)
                except:
                    logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
                break  # give up rather than re-logging and retrying forever

def exit():
    t = time()
    stopServers()
    saveState(t)
    logging.getLogger('exit').info('Goodbye...')
    os.kill(os.getpid(), signal.SIGTERM)
    sys.exit(0)

def restart():
    t = time()
    stopServers()
    saveState(t)
    logging.getLogger('restart').info('Restarting...')
    try:
        os.execv(sys.argv[0], sys.argv)
    except:
        logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())

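# Load a previously saved state file, accepting the older 2012-02 formats as
# well; the saved workLog is only restored if the state is less than two
# minutes old.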
def restoreState():
    if not os.path.exists(SAVE_STATE_FILENAME):
        return

    global workLog, DupeShareHACK

    logger = logging.getLogger('restoreState')
    s = os.stat(SAVE_STATE_FILENAME)
    logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
    try:
        with open(SAVE_STATE_FILENAME, 'rb') as f:
            t = pickle.load(f)
            if type(t) == tuple:
                if len(t) > 2:
                    # Future formats, not supported here
                    ver = t[3]
                    # TODO

                # Old format, from 2012-02-02 to 2012-02-03
                workLog = t[0]
                DupeShareHACK = t[1]
                t = None
            else:
                if isinstance(t, dict):
                    # Old format, from 2012-02-03 to 2012-02-03
                    DupeShareHACK = t
                    t = None
                else:
                    # Current format, from 2012-02-03 onward
                    DupeShareHACK = pickle.load(f)

                if t + 120 >= time():
                    workLog = pickle.load(f)
                else:
                    logger.debug('Skipping restore of expired workLog')
    except:
        logger.error('Failed to restore state\n' + traceback.format_exc())
        return
    logger.info('State restored successfully')
    if t:
        logger.info('Total downtime: %g seconds' % (time() - t,))


from jsonrpcserver import JSONRPCListener, JSONRPCServer
import interactivemode
from networkserver import NetworkListener
import threading
import sharelogging
import imp

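# Startup: build share loggers from config.ShareLogging (with backward
# compatibility for the deprecated DbOptions and tuple-style settings), start
# listeners for the bitcoin P2P node and the JSON-RPC server, attach the
# work/share callbacks, restore any saved state, and serve.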
if __name__ == "__main__":
    if not hasattr(config, 'ShareLogging'):
        config.ShareLogging = ()
    if hasattr(config, 'DbOptions'):
        logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
        config.ShareLogging = list(config.ShareLogging)
        config.ShareLogging.append( {
            'type': 'sql',
            'engine': 'postgres',
            'dbopts': config.DbOptions,
            'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
        } )
    for i in config.ShareLogging:
        if not hasattr(i, 'keys'):
            name, parameters = i
            logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
            if name == 'postgres':
                name = 'sql'
                i = {
                    'engine': 'postgres',
                    'dbopts': parameters,
                }
            elif name == 'logfile':
                i = {}
                i['thropts'] = parameters
                if 'filename' in parameters:
                    i['filename'] = parameters['filename']
                    i['thropts'] = dict(i['thropts'])
                    del i['thropts']['filename']
            else:
                i = parameters
            i['type'] = name

        name = i['type']
        parameters = i
        try:
            fp, pathname, description = imp.find_module(name, sharelogging.__path__)
            m = imp.load_module(name, fp, pathname, description)
            lo = getattr(m, name)(**parameters)
            loggersShare.append(lo.logShare)
        except:
            logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name, sys.exc_info())

    LSbc = []
    if not hasattr(config, 'BitcoinNodeAddresses'):
        config.BitcoinNodeAddresses = ()
    for a in config.BitcoinNodeAddresses:
        LSbc.append(NetworkListener(bcnode, a))

    if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
        BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)

    import jsonrpc_getmemorypool
    import jsonrpc_getwork
    import jsonrpc_setworkaux

    server = JSONRPCServer()
    if hasattr(config, 'JSONRPCAddress'):
        logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
        if not hasattr(config, 'JSONRPCAddresses'):
            config.JSONRPCAddresses = []
        config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
    LS = []
    for a in config.JSONRPCAddresses:
        LS.append(JSONRPCListener(server, a))
    if hasattr(config, 'SecretUser'):
        server.SecretUser = config.SecretUser
    server.aux = MM.CoinbaseAux
    server.getBlockHeader = getBlockHeader
    server.getBlockTemplate = getBlockTemplate
    server.receiveShare = receiveShare
    server.RaiseRedFlags = RaiseRedFlags
    server.ShareTarget = config.ShareTarget

    if hasattr(config, 'TrustedForwarders'):
        server.TrustedForwarders = config.TrustedForwarders
    server.ServerName = config.ServerName

    MM.start()

    restoreState()

    bcnode_thr = threading.Thread(target=bcnode.serve_forever)
    bcnode_thr.daemon = True
    bcnode_thr.start()

    server.serve_forever()