# Merge branch 'up_bip23_propose'
# [bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
import config

# Backfill optional configuration knobs with defaults so the rest of the
# pool can assume they always exist.
config.ServerName = getattr(config, 'ServerName', 'Unnamed Eloipool')
# Default share target: accept any share whose top 32 hash bits are zero
config.ShareTarget = getattr(config, 'ShareTarget', 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff)
25
26
import logging

# Install a default logging configuration only if the embedding application
# (or an earlier import) has not already set up handlers.
if len(logging.root.handlers) == 0:
	logging.basicConfig(
		format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
		level=logging.DEBUG,
	)
	# These loggers are too chatty at DEBUG; cap them at INFO by default
	for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
		logging.getLogger(infoOnly).setLevel(logging.INFO)
36
def RaiseRedFlags(reason):
	"""Log *reason* at CRITICAL on the 'redflag' logger and return it unchanged.

	Returning the reason lets callers reuse the message (e.g. to raise it).
	"""
	logger = logging.getLogger('redflag')
	logger.critical(reason)
	return reason
40
41
from bitcoin.node import BitcoinLink, BitcoinNode
# P2P node handle, used to announce blocks we solve directly to the network
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
# JSON-RPC proxy to the upstream bitcoind (submitblock/getmemorypool, etc.)
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
48
49
50 from bitcoin.script import BitcoinScript
51 from bitcoin.txn import Txn
52 from base58 import b58decode
53 from struct import pack
54 import subprocess
55 from time import time
56
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
	"""Build the coinbase transaction paying out *coinbaseValue* satoshis.

	If config.CoinbaserCmd is set (and useCoinbaser is true), it is run as a
	shell command with '%d' replaced by the coinbase value; its stdout must
	be a count line followed by (amount, address) line pairs, each becoming
	an extra output.  Whatever the coinbaser does not claim -- or the whole
	value, if it misbehaves -- goes to config.TrackerAddr.

	Returns an unassembled Txn; the caller still sets the coinbase script.
	"""
	txn = Txn.new()
	
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		try:
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			# NOTE: shell=True with a config-supplied command; the config
			# file is trusted, but never interpolate untrusted data into it
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			nout = int(p.stdout.readline())
			for i in range(nout):
				amount = int(p.stdout.readline())
				addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
				pkScript = BitcoinScript.toAddress(addr)
				txn.addOutput(amount, pkScript)
				coinbased += amount
		except Exception:
			# Was a silent bare except; record why the coinbaser failed and
			# force the sanity check below to take the fallback path
			logging.getLogger('makeCoinbaseTxn').debug('Coinbaser failed\n' + traceback.format_exc())
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			# Coinbaser tried to claim more than the block is worth (or
			# errored out): ignore its outputs entirely
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased
	
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)
	
	# TODO
	# TODO: red flag on dupe coinbase
	return txn
87
88
import jsonrpc_getwork
from util import Bits2Target

# username -> { work identifier -> (work data, issue timestamp) }
workLog = {}
# Current block's proof-of-work target (int), or None when not yet known
networkTarget = None
# Raw share headers already seen this block, used to reject duplicates
DupeShareHACK = {}

# JSONRPCServer instance; assigned in the __main__ block below
server = None
def updateBlocks():
	server.wakeLongpoll()
99
def blockChanged():
	"""React to a new best block: all outstanding work and dupe state is stale."""
	global DupeShareHACK
	DupeShareHACK = {}
	jsonrpc_getwork._CheckForDupesHACK = {}
	global MM, networkTarget, server
	# currentBlock is (prevBlock, height, bits)
	bits = MM.currentBlock[2]
	if bits is None:
		networkTarget = None
	else:
		networkTarget = Bits2Target(bits)
	workLog.clear()
	updateBlocks()
112
113
114 from time import sleep, time
115 import traceback
116
117 def _WorkLogPruner_I(wl):
118         now = time()
119         pruned = 0
120         for username in wl:
121                 userwork = wl[username]
122                 for wli in tuple(userwork.keys()):
123                         if now > userwork[wli][1] + 120:
124                                 del userwork[wli]
125                                 pruned += 1
126         WorkLogPruner.logger.debug('Pruned %d jobs' % (pruned,))
127
128 def WorkLogPruner(wl):
129         while True:
130                 try:
131                         sleep(60)
132                         _WorkLogPruner_I(wl)
133                 except:
134                         WorkLogPruner.logger.error(traceback.format_exc())
135 WorkLogPruner.logger = logging.getLogger('WorkLogPruner')
136
137
from merklemaker import merkleMaker
# The merkleMaker thread tracks upstream templates and builds work for miners
MM = merkleMaker()
# Adopt all config settings as merkleMaker attributes
MM.__dict__.update(config.__dict__)
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
146
147
148 from binascii import b2a_hex
149 from copy import deepcopy
150 from merklemaker import MakeBlockHeader
151 from struct import pack, unpack
152 import threading
153 from time import time
154 from util import RejectedShare, dblsha, hash2int, swap32
155 import jsonrpc
156 import traceback
157
# Optional JSON-RPC endpoint notified of low-hash "gotwork" proofs
gotwork = None
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

def submitGotwork(info):
	"""Best-effort delivery of a gotwork notification; logs instead of raising."""
	try:
		gotwork.gotwork(info)
	except Exception:
		# Narrowed from a bare except; gotwork delivery is non-critical
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
167
def getBlockHeader(username):
	"""Issue getwork-style work for *username*.

	Logs the work under its merkle root and returns (header, log entry).
	"""
	MRD = MM.getMRD()
	merkleRoot = MRD[0]
	hdr = MakeBlockHeader(MRD)
	entry = (MRD, time())
	workLog.setdefault(username, {})[merkleRoot] = entry
	return (hdr, entry)
174
def getBlockTemplate(username):
	"""Issue BIP22 block-template work for *username* and record it in the work log."""
	MC = MM.getMC()
	coinbase = MC[2]
	# Work identifier: a length-prefixed blob at a fixed offset in the
	# coinbase script (checkShare extracts it the same way)
	idPos = coinbase[0] + 2
	idLen = coinbase[idPos - 1]
	wli = coinbase[idPos:idPos + idLen]
	userlog = workLog.setdefault(username, {})
	userlog[wli] = (MC, time())
	return MC
183
# Share-logger callbacks; each is invoked with every processed share
loggersShare = []

# Debug archives of real (network-target) block solutions: raw data, payloads
RBDs = []
RBPs = []

from bitcoin.varlen import varlenEncode, varlenDecode
import bitcoin.txn
from merklemaker import assembleBlock
192
def blockSubmissionThread(payload, blkhash):
	"""Persistently push a solved block to the upstream bitcoind.

	Retries forever, trying BIP22 submitblock first and falling back to
	getmemorypool(data) for older servers, until one succeeds or the chain
	has moved past our block.
	"""
	# Hashes identifying "our" block: its own hash and its prevblock hash,
	# so we can tell when further submission attempts are pointless
	myblock = (blkhash, payload[4:36])
	payload = b2a_hex(payload).decode('ascii')
	nexterr = 0  # earliest time we will red-flag another submission error
	while True:
		try:
			# BIP22 submission path
			rv = UpstreamBitcoindJSONRPC.submitblock(payload)
			break
		except:
			try:
				# Pre-BIP22 fallback: getmemorypool(data) returns a bool
				rv = UpstreamBitcoindJSONRPC.getmemorypool(payload)
				if rv is True:
					rv = None
				elif rv is False:
					rv = 'rejected'
				break
			except:
				pass
			# Rate-limit error reporting to one red flag per 5 seconds
			now = time()
			if now > nexterr:
				# FIXME: This will show "Method not found" on pre-BIP22 servers
				RaiseRedFlags(traceback.format_exc())
				nexterr = now + 5
			# Stop retrying once the current best block is neither ours
			# nor the one we were building on
			if MM.currentBlock[0] not in myblock:
				RaiseRedFlags('Giving up on submitting block upstream')
				return
	if rv:
		# rv is an error/reject reason from upstream (None means accepted)
		# FIXME: The returned value could be a list of multiple responses
		RaiseRedFlags('Upstream block submission failed: %s' % (rv,))
222
# Share target as a 64-digit hex string, precomputed for share annotation
_STA = '%064x' % (config.ShareTarget,)
def checkShare(share):
	"""Validate a submitted share, raising RejectedShare on any problem.

	Side effects: records the share header in DupeShareHACK, submits the
	block upstream when it meets the network target, and annotates *share*
	(time/target/_targethex/upstreamResult/mode entry) for the loggers.
	"""
	shareTime = share['time'] = time()
	
	data = share['data']
	data = data[:80]  # the 80-byte block header
	(prevBlock, height, bits) = MM.currentBlock
	sharePrevBlock = data[4:36]
	if sharePrevBlock != prevBlock:
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')
	
	# TODO: use userid
	username = share['username']
	if username not in workLog:
		raise RejectedShare('unknown-user')
	
	if data[72:76] != bits:
		raise RejectedShare('bad-diffbits')
	
	# Note that we should accept miners reducing version to 1 if they don't understand 2 yet
	# FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
	if data[1:4] != b'\0\0\0' or data[0] > 2:
		raise RejectedShare('bad-version')
	
	# Classify the share: full block data present means BIP22 template work
	# ('MC'); header-only means getwork ('MRD')
	shareMerkleRoot = data[36:68]
	if 'blkdata' in share:
		pl = share['blkdata']
		(txncount, pl) = varlenDecode(pl)
		cbtxn = bitcoin.txn.Txn(pl)
		cbtxn.disassemble(retExtra=True)
		coinbase = cbtxn.getCoinbase()
		# Extract the work-log identifier embedded in the coinbase
		# (same layout getBlockTemplate wrote)
		wliPos = coinbase[0] + 2
		wliLen = coinbase[wliPos - 1]
		wli = coinbase[wliPos:wliPos+wliLen]
		mode = 'MC'
		moden = 1
	else:
		wli = shareMerkleRoot
		mode = 'MRD'
		moden = 0
	
	MWL = workLog[username]
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld
	
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
	# Cheap proof-of-work check: top 32 bits of the hash must be zero
	blkhash = dblsha(data)
	if blkhash[28:] != b'\0\0\0\0':
		raise RejectedShare('H-not-zero')
	blkhashn = hash2int(blkhash)
	
	global networkTarget
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
	
	workMerkleTree = wld[1]
	workCoinbase = wld[2]
	
	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	txlist = workMerkleTree.data
	txlist = [deepcopy(txlist[0]),] + txlist[1:]
	cbtxn = txlist[0]
	cbtxn.setCoinbase(workCoinbase)
	cbtxn.assemble()
	
	if blkhashn <= networkTarget:
		# This share solves a real block -- get it onto the network ASAP,
		# via both the upstream RPC and our own P2P node
		logfunc("Submitting upstream")
		RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree) ) )
		if not moden:
			payload = assembleBlock(data, txlist)
		else:
			payload = share['data'] + share['blkdata']
		logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
		RBPs.append(payload)
		threading.Thread(target=blockSubmissionThread, args=(payload, blkhash)).start()
		bcnode.submitBlock(payload)
		share['upstreamResult'] = True
		MM.updateBlock(blkhash)
	
	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			# Build coinbase + blkhash + merkle-step list for the gotwork call
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')
	
	if blkhashn > config.ShareTarget:
		raise RejectedShare('high-hash')
	share['target'] = config.ShareTarget
	share['_targethex'] = _STA
	
	# Timestamp sanity: work must be fresh and the header time roughly now
	shareTimestamp = unpack('<L', data[68:72])[0]
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
	if moden:
		# Full-block (BIP22) submission: the miner may only have appended to
		# our coinbase, and must not have tampered with the transaction set
		cbpre = cbtxn.getCoinbase()
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')
		
		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
				raise RejectedShare('bad-cb-flag')
		
		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')
		
		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')
		
		# Reassemble the block ourselves; any byte difference means the
		# miner altered something beyond the coinbase
		allowed = assembleBlock(data, txlist)
		if allowed != share['data'] + share['blkdata']:
			raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
366
def receiveShare(share):
	"""Process one share: validate it, then hand it to every share logger.

	Re-raises RejectedShare after tagging the share with its reject reason;
	the loggers always run, whether the share was accepted or not.
	"""
	# TODO: username => userid
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	finally:
		if '_origdata' in share:
			share['solution'] = share['_origdata']
		else:
			share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
		for logShare in loggersShare:
			logShare(share)
381
def newBlockNotification():
	"""Handle an external hint that a new block appeared on the network."""
	logging.getLogger('newBlockNotification').info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
387
def newBlockNotificationSIGNAL(signum, frame):
	"""Signal-handler shim around newBlockNotification.

	Runs the real handler on a fresh daemon thread, since the signal may
	arrive while arbitrary locks are held.
	"""
	thr = threading.Thread(
		target=newBlockNotification,
		name='newBlockNotification via signal %s' % (signum,),
	)
	thr.daemon = True
	thr.start()
393
from signal import signal, SIGUSR1
# SIGUSR1 -> re-poll upstream for a new block (e.g. from a blocknotify script)
signal(SIGUSR1, newBlockNotificationSIGNAL)
396
397
398 import os
399 import os.path
400 import pickle
401 import signal
402 import sys
403 from time import sleep
404 import traceback
405
# File used to persist DupeShareHACK/workLog across restarts
SAVE_STATE_FILENAME = 'eloipool.worklog'

def stopServers():
	"""Shut down the P2P node and JSON-RPC server, then close their sockets.

	Idempotent: a second call is a no-op.  Gives the servers up to ~2.56s
	(0x100 iterations of 10ms) to wind down before giving up.
	"""
	logger = logging.getLogger('stopServers')
	
	if hasattr(stopServers, 'already'):
		logger.debug('Already tried to stop servers before')
		return
	stopServers.already = True
	
	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server)
	# Ask every server loop to exit, then wake them so they notice
	for s in servers:
		s.keepgoing = False
	for s in servers:
		try:
			s.wakeup()
		except:
			logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
	i = 0
	while True:
		sl = []
		for s in servers:
			if s.running:
				sl.append(s.__class__.__name__)
		if not sl:
			break
		i += 1
		if i >= 0x100:
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
			break
		sleep(0.01)
	
	# Forcefully close any file descriptors the servers still hold
	for s in servers:
		for fd in s._fd.keys():
			os.close(fd)
443
def saveState(t = None):
	"""Persist resume data (timestamp, DupeShareHACK, workLog) to disk.

	Retries up to 0x10000 times on write failure; if it still fails, the
	(possibly corrupt) state file is removed so a restart won't load junk.
	"""
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except Exception:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except Exception:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
				# Bug fix: without this break, a persistent write failure
				# kept this loop (and the error logging) spinning forever
				break
465
def exit():
	"""Gracefully shut down: stop servers, save state, then terminate the process."""
	shutdownStart = time()
	stopServers()
	saveState(shutdownStart)
	logging.getLogger('exit').info('Goodbye...')
	# Deliver SIGTERM to the whole process before exiting this thread
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)
473
def restart():
	"""Stop servers, save state, and re-exec this program in place."""
	restartStart = time()
	stopServers()
	saveState(restartStart)
	logging.getLogger('restart').info('Restarting...')
	try:
		os.execv(sys.argv[0], sys.argv)
	except:
		logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
483
def restoreState():
	"""Load DupeShareHACK (and, if recent enough, workLog) from the state file.

	Understands several historical on-disk formats; any failure is logged
	and leaves the in-memory defaults untouched.
	"""
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if type(t) == tuple:
				if len(t) > 2:
					# Future formats, not supported here
					# NOTE(review): t[3] requires len(t) > 3; for a 3-tuple
					# this raises IndexError -- looks like an off-by-one
					ver = t[3]
					# TODO
				
				# Old format, from 2012-02-02 to 2012-02-03
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					# Old format, from 2012-02-03 to 2012-02-03
					DupeShareHACK = t
					t = None
				else:
					# Current format, from 2012-02-03 onward
					# t is the save timestamp; next pickle is DupeShareHACK
					DupeShareHACK = pickle.load(f)
				
				# Only restore the work log if it was saved within 2 minutes
				# NOTE(review): in the dict-format branch above t is None here,
				# so t + 120 raises TypeError and the except below aborts the
				# restore -- that old format never restores its workLog
				if t + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))
525
526
527 from jsonrpcserver import JSONRPCListener, JSONRPCServer
528 import interactivemode
529 from networkserver import NetworkListener
530 import threading
531 import sharelogging
532 import imp
533
if __name__ == "__main__":
	# --- Share logging setup (with several backward-compat config shims) ---
	if not hasattr(config, 'ShareLogging'):
		config.ShareLogging = ()
	if hasattr(config, 'DbOptions'):
		# Legacy DbOptions implies a postgres SQL share logger
		logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
		config.ShareLogging = list(config.ShareLogging)
		config.ShareLogging.append( {
			'type': 'sql',
			'engine': 'postgres',
			'dbopts': config.DbOptions,
			'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
		} )
	for i in config.ShareLogging:
		if not hasattr(i, 'keys'):
			# Old-style (name, parameters) tuple; convert to dict form
			name, parameters = i
			logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
			if name == 'postgres':
				name = 'sql'
				i = {
					'engine': 'postgres',
					'dbopts': parameters,
				}
			elif name == 'logfile':
				i = {}
				i['thropts'] = parameters
				# 'filename' is a direct argument; keep the rest as thropts
				if 'filename' in parameters:
					i['filename'] = parameters['filename']
					i['thropts'] = dict(i['thropts'])
					del i['thropts']['filename']
			else:
				i = parameters
			i['type'] = name
		
		# Dynamically load sharelogging/<type>.py and instantiate its
		# same-named class, registering its logShare callback
		name = i['type']
		parameters = i
		try:
			fp, pathname, description = imp.find_module(name, sharelogging.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			loggersShare.append(lo.logShare)
		except:
			logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())

	# --- Bitcoin P2P listeners and upstream node link ---
	LSbc = []
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))
	
	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
	
	# Importing these modules registers their JSON-RPC method handlers
	import jsonrpc_getblocktemplate
	import jsonrpc_getwork
	import jsonrpc_setworkaux
	
	# --- Miner-facing JSON-RPC server setup ---
	server = JSONRPCServer()
	if hasattr(config, 'JSONRPCAddress'):
		# Legacy single-address setting; fold into the addresses list
		logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
	LS = []
	for a in config.JSONRPCAddresses:
		LS.append(JSONRPCListener(server, a))
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	server.aux = MM.CoinbaseAux
	server.getBlockHeader = getBlockHeader
	server.getBlockTemplate = getBlockTemplate
	server.receiveShare = receiveShare
	server.RaiseRedFlags = RaiseRedFlags
	server.ShareTarget = config.ShareTarget
	
	if hasattr(config, 'TrustedForwarders'):
		server.TrustedForwarders = config.TrustedForwarders
	server.ServerName = config.ServerName
	
	# --- Start worker threads, restore saved state, and serve ---
	MM.start()
	
	restoreState()
	
	prune_thr = threading.Thread(target=WorkLogPruner, args=(workLog,))
	prune_thr.daemon = True
	prune_thr.start()
	
	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()
	
	server.serve_forever()