#!/usr/bin/python3
# Eloipool - Python Bitcoin pool server
# Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import config

if not hasattr(config, 'ServerName'):
	config.ServerName = 'Unnamed Eloipool'

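# Any hash at or below ShareTarget counts as a share.  The default below
# accepts any block header hash with 32 leading zero bits (roughly difficulty 1).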
if not hasattr(config, 'ShareTarget'):
	config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff


import logging

if len(logging.root.handlers) == 0:
	logging.basicConfig(
		format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
		level=logging.DEBUG,
	)
	for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
		logging.getLogger(infoOnly).setLevel(logging.INFO)

def RaiseRedFlags(reason):
	logging.getLogger('redflag').critical(reason)
	return reason


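# Native Bitcoin P2P node: used to push solved blocks straight to the network
# and to accept peer connections (see config.BitcoinNodeAddresses below).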
from bitcoin.node import BitcoinLink, BitcoinNode
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

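# JSON-RPC proxy for the upstream bitcoind, used to submit solved blocks
# (see blockSubmissionThread below).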
import jsonrpc
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)


from bitcoin.script import BitcoinScript
from bitcoin.txn import Txn
from base58 import b58decode
from struct import pack
import subprocess
from time import time

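# Build the generation (coinbase) transaction paying out coinbaseValue.
# If config.CoinbaserCmd is set, it is run with '%d' replaced by the total
# value; its stdout must give the number of extra outputs, then an amount line
# and an address line for each.  Whatever is left over goes to
# config.TrackerAddr.  If the command fails or tries to spend more than
# coinbaseValue, its outputs are discarded entirely.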
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
	txn = Txn.new()

	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		try:
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			nout = int(p.stdout.readline())
			for i in range(nout):
				amount = int(p.stdout.readline())
				addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
				pkScript = BitcoinScript.toAddress(addr)
				txn.addOutput(amount, pkScript)
				coinbased += amount
		except:
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased

	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)

	# TODO
	# TODO: red flag on dupe coinbase
	return txn


import jsonrpc_getwork
from util import Bits2Target

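# workLog maps username -> work identifier -> (work data, issue time), so that
# incoming shares can be matched back to the work that produced them.
# DupeShareHACK remembers every share's header bytes to reject exact duplicates.
# Both are thrown away whenever the network moves to a new block.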
workLog = {}
networkTarget = None
DupeShareHACK = {}

server = None
def updateBlocks():
	server.wakeLongpoll()

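# Called by merkleMaker when the network finds a new block: reset duplicate
# tracking, recompute the network target from the new nBits, drop all stale
# work, and wake any longpolling clients.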
def blockChanged():
	global DupeShareHACK
	DupeShareHACK = {}
	jsonrpc_getwork._CheckForDupesHACK = {}
	global MM, networkTarget, server
	bits = MM.currentBlock[2]
	if bits is None:
		networkTarget = None
	else:
		networkTarget = Bits2Target(bits)
	workLog.clear()
	updateBlocks()


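# merkleMaker maintains the current block template and merkle tree (started
# below via MM.start()); it takes its settings from the config module and
# calls back into the hooks assigned here.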
from merklemaker import merkleMaker
MM = merkleMaker()
MM.__dict__.update(config.__dict__)
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks


from binascii import b2a_hex
from copy import deepcopy
from struct import pack, unpack
import threading
from time import time
from util import RejectedShare, dblsha, hash2int, swap32
import jsonrpc
import traceback

gotwork = None
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

def submitGotwork(info):
	try:
		gotwork.gotwork(info)
	except:
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())

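# getwork-style work: build an 80-byte block header (version 2) with the
# current time and a placeholder nonce, and remember the MRD under its merkle
# root so a returned share can be matched back to this work.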
def getBlockHeader(username):
	MRD = MM.getMRD()
	(merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
	timestamp = pack('<L', int(time()))
	hdr = b'\2\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
	workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
	return (hdr, workLog[username][merkleRoot])

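# getblocktemplate-style work: the work-log identifier is embedded
# (length-prefixed) at the start of the coinbase scriptsig, so a submitted
# block can be matched back to the template it was built from.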
def getBlockTemplate(username):
	MC = MM.getMC()
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC
	wliLen = coinbase[0]
	wli = coinbase[1:wliLen+1]
	workLog.setdefault(username, {})[wli] = (MC, time())
	return MC

loggersShare = []

RBDs = []
RBPs = []

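# Serialize a complete block: 80-byte header, varint transaction count, then
# each transaction's raw data.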
from bitcoin.varlen import varlenEncode, varlenDecode
import bitcoin.txn
def assembleBlock(blkhdr, txlist):
	payload = blkhdr
	payload += varlenEncode(len(txlist))
	for tx in txlist:
		payload += tx.data
	return payload

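# Submit a solved block upstream, preferring BIP 22 submitblock and falling
# back to getmemorypool(data) for older bitcoinds.  Keep retrying (flagging the
# error at most every 5 seconds) until it succeeds or the network has already
# moved past this block.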
def blockSubmissionThread(payload, blkhash):
	myblock = (blkhash, payload[4:36])
	payload = b2a_hex(payload).decode('ascii')
	nexterr = 0
	while True:
		try:
			rv = UpstreamBitcoindJSONRPC.submitblock(payload)
			break
		except:
			try:
				rv = UpstreamBitcoindJSONRPC.getmemorypool(payload)
				if rv is True:
					rv = None
				elif rv is False:
					rv = 'rejected'
				break
			except:
				pass
			now = time()
			if now > nexterr:
				# FIXME: This will show "Method not found" on pre-BIP22 servers
				RaiseRedFlags(traceback.format_exc())
				nexterr = now + 5
			if MM.currentBlock[0] not in myblock:
				RaiseRedFlags('Giving up on submitting block upstream')
				return
	if rv:
		# FIXME: The returned value could be a list of multiple responses
		RaiseRedFlags('Upstream block submission failed: %s' % (rv,))

_STA = '%064x' % (config.ShareTarget,)
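# Validate an incoming share: check the previous-block, user, nBits and version
# fields, look up the work it was issued against, reject duplicates, and verify
# the proof of work.  Shares that also meet the network target are assembled
# into a full block and submitted upstream.  GBT (MC-mode) shares additionally
# have their coinbase and transaction list checked against the issued template.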
def checkShare(share):
	shareTime = share['time'] = time()

	data = share['data']
	data = data[:80]
	(prevBlock, height, bits) = MM.currentBlock
	sharePrevBlock = data[4:36]
	if sharePrevBlock != prevBlock:
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')

	# TODO: use userid
	username = share['username']
	if username not in workLog:
		raise RejectedShare('unknown-user')

	if data[72:76] != bits:
		raise RejectedShare('bad-diffbits')

	# Note that we should accept miners reducing version to 1 if they don't understand 2 yet
	# FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
	if data[1:4] != b'\0\0\0' or data[0] > 2:
		raise RejectedShare('bad-version')

	shareMerkleRoot = data[36:68]
	if 'blkdata' in share:
		pl = share['blkdata']
		(txncount, pl) = varlenDecode(pl)
		cbtxn = bitcoin.txn.Txn(pl)
		cbtxn.disassemble(retExtra=True)
		coinbase = cbtxn.getCoinbase()
		wliPos = coinbase[0] + 2
		wliLen = coinbase[wliPos - 1]
		wli = coinbase[wliPos:wliPos+wliLen]
		mode = 'MC'
		moden = 1
	else:
		wli = shareMerkleRoot
		mode = 'MRD'
		moden = 0

	MWL = workLog[username]
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld

	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None

	blkhash = dblsha(data)
	if blkhash[28:] != b'\0\0\0\0':
		raise RejectedShare('H-not-zero')
	blkhashn = hash2int(blkhash)

	global networkTarget
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))

	workMerkleTree = wld[1]
	workCoinbase = wld[2]

	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	txlist = workMerkleTree.data
	txlist = [deepcopy(txlist[0]),] + txlist[1:]
	cbtxn = txlist[0]
	cbtxn.setCoinbase(workCoinbase)
	cbtxn.assemble()

	if blkhashn <= networkTarget:
		logfunc("Submitting upstream")
		RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree) ) )
		if not moden:
			payload = assembleBlock(data, txlist)
		else:
			payload = share['data'] + share['blkdata']
		logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
		RBPs.append(payload)
		threading.Thread(target=blockSubmissionThread, args=(payload, blkhash)).start()
		bcnode.submitBlock(payload)
		share['upstreamResult'] = True
		MM.updateBlock(blkhash)

	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')

	if blkhashn > config.ShareTarget:
		raise RejectedShare('high-hash')
	share['target'] = config.ShareTarget
	share['_targethex'] = _STA

	shareTimestamp = unpack('<L', data[68:72])[0]
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')

	if moden:
		cbpre = cbtxn.getCoinbase()
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')

		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
				raise RejectedShare('bad-cb-flag')

		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')

		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')

		allowed = assembleBlock(data, txlist)
		if allowed != share['data'] + share['blkdata']:
			raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')

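# Entry point for shares from the JSON-RPC server: run checkShare, record the
# reject reason if any, and hand the share (accepted or rejected) to every
# configured share logger.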
def receiveShare(share):
	# TODO: username => userid
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	finally:
		if '_origdata' in share:
			share['solution'] = share['_origdata']
		else:
			share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
		for i in loggersShare:
			i(share)

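# New-block notifications: sending this process SIGUSR1 (e.g. from a bitcoind
# -blocknotify script) triggers an immediate merkle tree refresh.  The refresh
# runs in a separate thread because the signal may arrive while locks are held.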
def newBlockNotification():
	logging.getLogger('newBlockNotification').info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
	pass

def newBlockNotificationSIGNAL(signum, frame):
	# Use a new thread, in case the signal handler is called with locks held
	thr = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
	thr.daemon = True
	thr.start()

from signal import signal, SIGUSR1
signal(SIGUSR1, newBlockNotificationSIGNAL)


import os
import os.path
import pickle
import signal
import sys
from time import sleep
import traceback

SAVE_STATE_FILENAME = 'eloipool.worklog'

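# Ask both servers to stop, wake them so they notice, wait a few seconds for
# their loops to exit, then close any file descriptors they still hold open.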
def stopServers():
	logger = logging.getLogger('stopServers')

	if hasattr(stopServers, 'already'):
		logger.debug('Already tried to stop servers before')
		return
	stopServers.already = True

	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server)
	for s in servers:
		s.keepgoing = False
	for s in servers:
		try:
			s.wakeup()
		except:
			logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
	i = 0
	while True:
		sl = []
		for s in servers:
			if s.running:
				sl.append(s.__class__.__name__)
		if not sl:
			break
		i += 1
		if i >= 0x100:
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
			break
		sleep(0.01)

	for s in servers:
		for fd in s._fd.keys():
			os.close(fd)

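# Pickle the shutdown timestamp, DupeShareHACK, and workLog to disk so that a
# restart can pick up roughly where it left off.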
def saveState(t = None):
	logger = logging.getLogger('saveState')

	# Save the data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
				break

def exit():
	t = time()
	stopServers()
	saveState(t)
	logging.getLogger('exit').info('Goodbye...')
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)

def restart():
	t = time()
	stopServers()
	saveState(t)
	logging.getLogger('restart').info('Restarting...')
	try:
		os.execv(sys.argv[0], sys.argv)
	except:
		logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())

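# Load the state written by saveState(), handling the older on-disk formats;
# the workLog is only restored if the save is less than two minutes old.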
def restoreState():
	if not os.path.exists(SAVE_STATE_FILENAME):
		return

	global workLog, DupeShareHACK

	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if type(t) == tuple:
				if len(t) > 2:
					# Future formats, not supported here
					ver = t[3]
					# TODO

				# Old format, from 2012-02-02 to 2012-02-03
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					# Old format, from 2012-02-03 to 2012-02-03
					DupeShareHACK = t
					t = None
				else:
					# Current format, from 2012-02-03 onward
					DupeShareHACK = pickle.load(f)

				if t and t + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))


from jsonrpcserver import JSONRPCListener, JSONRPCServer
import interactivemode
from networkserver import NetworkListener
import threading
import sharelogging
import imp

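# Startup: normalize the share-logging configuration, open the P2P and JSON-RPC
# listeners, wire the server up to the work/share functions defined above,
# restore any saved state, and start serving.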
if __name__ == "__main__":
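	# Backward compatibility: convert the deprecated DbOptions variable and old
	# tuple-style ShareLogging entries into the current dict format, then load
	# each share logger from the sharelogging package.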
	if not hasattr(config, 'ShareLogging'):
		config.ShareLogging = ()
	if hasattr(config, 'DbOptions'):
		logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
		config.ShareLogging = list(config.ShareLogging)
		config.ShareLogging.append( {
			'type': 'sql',
			'engine': 'postgres',
			'dbopts': config.DbOptions,
			'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
		} )
	for i in config.ShareLogging:
		if not hasattr(i, 'keys'):
			name, parameters = i
			logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
			if name == 'postgres':
				name = 'sql'
				i = {
					'engine': 'postgres',
					'dbopts': parameters,
				}
			elif name == 'logfile':
				i = {}
				i['thropts'] = parameters
				if 'filename' in parameters:
					i['filename'] = parameters['filename']
					i['thropts'] = dict(i['thropts'])
					del i['thropts']['filename']
			else:
				i = parameters
			i['type'] = name

		name = i['type']
		parameters = i
		try:
			fp, pathname, description = imp.find_module(name, sharelogging.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			loggersShare.append(lo.logShare)
		except:
			logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name, sys.exc_info())

	LSbc = []
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))

	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)

	import jsonrpc_getblocktemplate
	import jsonrpc_getwork
	import jsonrpc_setworkaux

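	# The JSON-RPC server dispatches getwork, getblocktemplate, and share
	# submissions (the jsonrpc_* modules imported above provide the method
	# handlers) to the callbacks attached below.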
	server = JSONRPCServer()
	if hasattr(config, 'JSONRPCAddress'):
		logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
	LS = []
	for a in config.JSONRPCAddresses:
		LS.append(JSONRPCListener(server, a))
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	server.aux = MM.CoinbaseAux
	server.getBlockHeader = getBlockHeader
	server.getBlockTemplate = getBlockTemplate
	server.receiveShare = receiveShare
	server.RaiseRedFlags = RaiseRedFlags
	server.ShareTarget = config.ShareTarget

	if hasattr(config, 'TrustedForwarders'):
		server.TrustedForwarders = config.TrustedForwarders
	server.ServerName = config.ServerName

	MM.start()

	restoreState()

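	# The P2P node runs in a background thread; the JSON-RPC server takes over
	# the main thread from here on.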
	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()

	server.serve_forever()