Merge branch 'worklog_prune'
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
import config

# Backfill optional configuration attributes so the rest of the pool can
# assume they exist.
if not hasattr(config, 'ServerName'):
        config.ServerName = 'Unnamed Eloipool'

if not hasattr(config, 'ShareTarget'):
        # Default share target: accept any hash with 32 leading zero bits
        config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
26
import logging

# Only install our default logging setup if nothing configured logging already
if len(logging.root.handlers) == 0:
        logging.basicConfig(
                format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
                level=logging.DEBUG,
        )
        # These loggers are too chatty at DEBUG; limit them to INFO and above
        for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
                logging.getLogger(infoOnly).setLevel(logging.INFO)
36
def RaiseRedFlags(reason):
        """Log *reason* as a critical 'redflag' event and hand it back.

        Returning the message lets callers embed the call in an expression,
        e.g. ``raise SomeError(RaiseRedFlags(msg))``.
        """
        redflag_logger = logging.getLogger('redflag')
        redflag_logger.critical(reason)
        return reason
40
41
from bitcoin.node import BitcoinLink, BitcoinNode
# Peer-to-peer node, used to announce solved blocks directly on the network
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
# JSON-RPC proxy to the upstream bitcoind (submitblock/getmemorypool etc.)
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
48
49
50 from bitcoin.script import BitcoinScript
51 from bitcoin.txn import Txn
52 from base58 import b58decode
53 from struct import pack
54 import subprocess
55 from time import time
56
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
        """Build the coinbase transaction paying out coinbaseValue (satoshis).

        If useCoinbaser and config.CoinbaserCmd is set, that command is run
        (with '%d' replaced by coinbaseValue) and is expected to print a count
        N followed by N (amount, address) line pairs; those outputs are added
        first and their total deducted from the pool's own output.  Any error
        or over-allocation discards the coinbaser outputs entirely.  The
        remainder is paid to config.TrackerAddr.
        """
        txn = Txn.new()
        
        if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
                coinbased = 0
                p = None
                try:
                        cmd = config.CoinbaserCmd
                        cmd = cmd.replace('%d', str(coinbaseValue))
                        # NOTE: shell=True runs a config-supplied command line;
                        # CoinbaserCmd must be trusted (it comes from local config)
                        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
                        nout = int(p.stdout.readline())
                        for i in range(nout):
                                amount = int(p.stdout.readline())
                                addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
                                pkScript = BitcoinScript.toAddress(addr)
                                txn.addOutput(amount, pkScript)
                                coinbased += amount
                except Exception:
                        # Log why the coinbaser output was rejected, then force
                        # the failure path below by over-allocating
                        logging.getLogger('makeCoinbaseTxn').debug('Coinbaser error\n' + traceback.format_exc())
                        coinbased = coinbaseValue + 1
                finally:
                        # Reap the child so it doesn't linger as a zombie and
                        # release the pipe fd (best-effort)
                        if p is not None:
                                try:
                                        p.stdout.close()
                                        p.wait()
                                except Exception:
                                        pass
                if coinbased >= coinbaseValue:
                        logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
                        txn.outputs = []
                else:
                        coinbaseValue -= coinbased
        
        pkScript = BitcoinScript.toAddress(config.TrackerAddr)
        txn.addOutput(coinbaseValue, pkScript)
        
        # TODO
        # TODO: red flag on dupe coinbase
        return txn
87
88
import jsonrpc_getwork
from util import Bits2Target

# username -> { work-identifier -> (work, issue-time) }; entries expire after
# 120s (see the pruner thread and checkShare's stale-work check)
workLog = {}
# Current network block target as an int, or None when unknown
networkTarget = None
# Header data of every share accepted this block, used to reject duplicates
DupeShareHACK = {}

# The JSONRPCServer instance; assigned in __main__ below
server = None
def updateBlocks():
        # Wake longpoll connections so miners fetch fresh work
        server.wakeLongpoll()
def blockChanged():
        """Reset per-block state after a new best block: all outstanding work
        and duplicate-share tracking is now stale."""
        global DupeShareHACK
        DupeShareHACK = {}
        jsonrpc_getwork._CheckForDupesHACK = {}
        global MM, networkTarget, server
        # currentBlock is (prevBlock, height, bits); bits may be None early on
        bits = MM.currentBlock[2]
        if bits is None:
                networkTarget = None
        else:
                networkTarget = Bits2Target(bits)
        workLog.clear()
        updateBlocks()
112
113
114 from time import sleep, time
115 import traceback
116
117 def _WorkLogPruner_I(wl):
118         now = time()
119         pruned = 0
120         for username in wl:
121                 userwork = wl[username]
122                 for wli in tuple(userwork.keys()):
123                         if now > userwork[wli][1] + 120:
124                                 del userwork[wli]
125                                 pruned += 1
126         WorkLogPruner.logger.debug('Pruned %d jobs' % (pruned,))
127
def WorkLogPruner(wl):
        """Daemon-thread body: prune expired entries from wl once a minute, forever.

        Errors in a prune pass are logged and the loop continues.
        """
        while True:
                try:
                        sleep(60)
                        _WorkLogPruner_I(wl)
                except Exception:
                        # Exception (not a bare except) so SystemExit and
                        # KeyboardInterrupt can still unwind the thread
                        WorkLogPruner.logger.error(traceback.format_exc())
WorkLogPruner.logger = logging.getLogger('WorkLogPruner')
136
137
from merklemaker import merkleMaker
# The merkle maker maintains block templates from the upstream bitcoind
MM = merkleMaker()
# Pass all config attributes straight through to the merkle maker
MM.__dict__.update(config.__dict__)
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
146
147
148 from binascii import b2a_hex
149 from copy import deepcopy
150 from struct import pack, unpack
151 import threading
152 from time import time
153 from util import RejectedShare, dblsha, hash2int, swap32
154 import jsonrpc
155 import traceback
156
# Optional JSON-RPC service notified of block-solving shares ("gotwork")
gotwork = None
if hasattr(config, 'GotWorkURI'):
        gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

def submitGotwork(info):
        # Best-effort: a failed gotwork submission is logged, never fatal
        try:
                gotwork.gotwork(info)
        except:
                checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
166
def getBlockHeader(username):
        """Issue getwork-style work: an 80-byte header template for username.

        Returns (header bytes, worklog entry).  The work is remembered in
        workLog keyed by merkle root so checkShare can find it again.
        """
        MRD = MM.getMRD()
        (merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
        timestamp = pack('<L', int(time()))
        # Version-2 header; trailing b'iolE' ('Eloi' reversed) fills the nonce
        # field, which miners overwrite anyway
        hdr = b'\2\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
        workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
        return (hdr, workLog[username][merkleRoot])
174
def getBlockTemplate(username):
        """Issue getblocktemplate-style work (BIP 22) for username."""
        MC = MM.getMC()
        (dummy, merkleTree, coinbase, prevBlock, bits) = MC
        # The work-log identifier (wli) is embedded in the coinbase scriptsig;
        # presumably coinbase[0] is the length of the leading push, so the wli
        # length byte follows it — checkShare extracts it the same way
        wliPos = coinbase[0] + 2
        wliLen = coinbase[wliPos - 1]
        wli = coinbase[wliPos:wliPos+wliLen]
        workLog.setdefault(username, {})[wli] = (MC, time())
        return MC
183
# Share-logger callbacks; each is invoked with every processed share
loggersShare = []

# Debug records: every assembled block solution (RBDs) and every payload
# actually submitted upstream (RBPs)
RBDs = []
RBPs = []
188
189 from bitcoin.varlen import varlenEncode, varlenDecode
190 import bitcoin.txn
def assembleBlock(blkhdr, txlist):
        """Serialize a full block: header, varint txn count, then each txn's data."""
        parts = [blkhdr, varlenEncode(len(txlist))]
        parts.extend(txn.data for txn in txlist)
        return b''.join(parts)
197
def blockSubmissionThread(payload, blkhash):
        """Submit a solved block upstream, retrying until accepted or obsolete."""
        # (our block hash, our prev-block hash) — used to detect obsolescence
        myblock = (blkhash, payload[4:36])
        payload = b2a_hex(payload).decode('ascii')
        nexterr = 0
        while True:
                try:
                        # BIP 22 submission
                        rv = UpstreamBitcoindJSONRPC.submitblock(payload)
                        break
                except:
                        try:
                                # Fall back to pre-BIP22 getmemorypool submission
                                rv = UpstreamBitcoindJSONRPC.getmemorypool(payload)
                                if rv is True:
                                        rv = None
                                elif rv is False:
                                        rv = 'rejected'
                                break
                        except:
                                pass
                        now = time()
                        # Rate-limit red-flag spam to one per 5 seconds
                        if now > nexterr:
                                # FIXME: This will show "Method not found" on pre-BIP22 servers
                                RaiseRedFlags(traceback.format_exc())
                                nexterr = now + 5
                        # Stop retrying once the network's best block is neither
                        # the block we solved nor its parent
                        if MM.currentBlock[0] not in myblock:
                                RaiseRedFlags('Giving up on submitting block upstream')
                                return
        if rv:
                # FIXME: The returned value could be a list of multiple responses
                RaiseRedFlags('Upstream block submission failed: %s' % (rv,))
227
# Share target as a 64-digit hex string, precomputed for share records
_STA = '%064x' % (config.ShareTarget,)
def checkShare(share):
        """Validate a submitted share; raise RejectedShare on any problem.

        Side effects: stamps `share` with time/work/target info, records the
        header in DupeShareHACK, and — when the share solves a block — submits
        the block upstream (JSON-RPC thread + p2p node) before continuing.
        """
        shareTime = share['time'] = time()
        
        data = share['data']
        data = data[:80]  # the 80-byte block header only
        (prevBlock, height, bits) = MM.currentBlock
        sharePrevBlock = data[4:36]
        if sharePrevBlock != prevBlock:
                # Shares for the immediately-previous block are merely stale
                if sharePrevBlock == MM.lastBlock[0]:
                        raise RejectedShare('stale-prevblk')
                raise RejectedShare('bad-prevblk')
        
        # TODO: use userid
        username = share['username']
        if username not in workLog:
                raise RejectedShare('unknown-user')
        
        if data[72:76] != bits:
                raise RejectedShare('bad-diffbits')
        
        # Note that we should accept miners reducing version to 1 if they don't understand 2 yet
        # FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
        if data[1:4] != b'\0\0\0' or data[0] > 2:
                raise RejectedShare('bad-version')
        
        shareMerkleRoot = data[36:68]
        if 'blkdata' in share:
                # getblocktemplate-style share: recover the work id (wli) from
                # the submitted coinbase (same layout as getBlockTemplate)
                pl = share['blkdata']
                (txncount, pl) = varlenDecode(pl)
                cbtxn = bitcoin.txn.Txn(pl)
                cbtxn.disassemble(retExtra=True)
                coinbase = cbtxn.getCoinbase()
                wliPos = coinbase[0] + 2
                wliLen = coinbase[wliPos - 1]
                wli = coinbase[wliPos:wliPos+wliLen]
                mode = 'MC'
                moden = 1
        else:
                # getwork-style share: work is keyed by merkle root
                wli = shareMerkleRoot
                mode = 'MRD'
                moden = 0
        
        MWL = workLog[username]
        if wli not in MWL:
                raise RejectedShare('unknown-work')
        (wld, issueT) = MWL[wli]
        share[mode] = wld
        
        if data in DupeShareHACK:
                raise RejectedShare('duplicate')
        DupeShareHACK[data] = None
        
        blkhash = dblsha(data)
        # Cheap sanity check: the final 4 hash bytes must be zero
        if blkhash[28:] != b'\0\0\0\0':
                raise RejectedShare('H-not-zero')
        blkhashn = hash2int(blkhash)
        
        global networkTarget
        # Block solutions are logged at INFO, ordinary shares at DEBUG
        logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
        logfunc('BLKHASH: %64x' % (blkhashn,))
        logfunc(' TARGET: %64x' % (networkTarget,))
        
        workMerkleTree = wld[1]
        workCoinbase = wld[2]
        
        # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
        txlist = workMerkleTree.data
        txlist = [deepcopy(txlist[0]),] + txlist[1:]
        cbtxn = txlist[0]
        cbtxn.setCoinbase(workCoinbase)
        cbtxn.assemble()
        
        if blkhashn <= networkTarget:
                # This share solves a block!  Submit it before any further
                # share-level rejection can occur.
                logfunc("Submitting upstream")
                RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree) ) )
                if not moden:
                        payload = assembleBlock(data, txlist)
                else:
                        payload = share['data'] + share['blkdata']
                logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
                RBPs.append(payload)
                threading.Thread(target=blockSubmissionThread, args=(payload, blkhash)).start()
                bcnode.submitBlock(payload)
                share['upstreamResult'] = True
                MM.updateBlock(blkhash)
        
        # Gotwork hack...
        if gotwork and blkhashn <= config.GotWorkTarget:
                try:
                        # coinbase txn + block hash + merkle-step count + steps + sentinel
                        coinbaseMrkl = cbtxn.data
                        coinbaseMrkl += blkhash
                        steps = workMerkleTree._steps
                        coinbaseMrkl += pack('B', len(steps))
                        for step in steps:
                                coinbaseMrkl += step
                        coinbaseMrkl += b"\0\0\0\0"
                        info = {}
                        info['hash'] = b2a_hex(blkhash).decode('ascii')
                        info['header'] = b2a_hex(data).decode('ascii')
                        info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
                        thr = threading.Thread(target=submitGotwork, args=(info,))
                        thr.daemon = True
                        thr.start()
                except:
                        checkShare.logger.warning('Failed to build gotwork request')
        
        if blkhashn > config.ShareTarget:
                raise RejectedShare('high-hash')
        share['target'] = config.ShareTarget
        share['_targethex'] = _STA
        
        # Timestamp sanity: work must be fresh and the header time plausible
        shareTimestamp = unpack('<L', data[68:72])[0]
        if shareTime < issueT - 120:
                raise RejectedShare('stale-work')
        if shareTimestamp < shareTime - 300:
                raise RejectedShare('time-too-old')
        if shareTimestamp > shareTime + 7200:
                raise RejectedShare('time-too-new')
        
        if moden:
                # Extra checks for getblocktemplate shares: the submitted
                # coinbase must extend the one we issued, and the whole block
                # must match what we would assemble ourselves
                cbpre = cbtxn.getCoinbase()
                cbpreLen = len(cbpre)
                if coinbase[:cbpreLen] != cbpre:
                        raise RejectedShare('bad-cb-prefix')
                
                # Filter out known "I support" flags, to prevent exploits
                for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
                        if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
                                raise RejectedShare('bad-cb-flag')
                
                if len(coinbase) > 100:
                        raise RejectedShare('bad-cb-length')
                
                cbtxn.setCoinbase(coinbase)
                cbtxn.assemble()
                if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
                        raise RejectedShare('bad-txnmrklroot')
                
                allowed = assembleBlock(data, txlist)
                if allowed != share['data'] + share['blkdata']:
                        raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
371
def receiveShare(share):
        """Process one share: validate it, then notify all share loggers.

        Loggers run in the finally block, so they see rejected shares too
        (with 'rejectReason' set) while the rejection still propagates.
        """
        # TODO: username => userid
        try:
                checkShare(share)
        except RejectedShare as rej:
                share['rejectReason'] = str(rej)
                raise
        finally:
                if '_origdata' in share:
                        share['solution'] = share['_origdata']
                else:
                        share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
                for i in loggersShare:
                        i(share)
386
def newBlockNotification():
        """React to an external new-block notification by refreshing work."""
        logging.getLogger('newBlockNotification').info('Received new block notification')
        MM.updateMerkleTree()
        # TODO: Force RESPOND TO LONGPOLLS?
        pass
392
def newBlockNotificationSIGNAL(signum, frame):
        """Signal handler: run newBlockNotification on a fresh daemon thread.

        A new thread is used in case the signal handler is invoked while
        locks are held.
        """
        notify_thr = threading.Thread(
                target=newBlockNotification,
                name='newBlockNotification via signal %s' % (signum,),
        )
        notify_thr.daemon = True
        notify_thr.start()
398
from signal import signal, SIGUSR1
# SIGUSR1 (e.g. from a blocknotify script) triggers an immediate work update.
# NOTE: the later `import signal` rebinds the name `signal` to the module.
signal(SIGUSR1, newBlockNotificationSIGNAL)
401
402
403 import os
404 import os.path
405 import pickle
406 import signal
407 import sys
408 from time import sleep
409 import traceback
410
# File used to persist DupeShareHACK/workLog across restarts
SAVE_STATE_FILENAME = 'eloipool.worklog'

def stopServers():
        """Shut down the p2p node and JSON-RPC server, then close their sockets.

        Idempotent: subsequent calls are no-ops.
        """
        logger = logging.getLogger('stopServers')
        
        if hasattr(stopServers, 'already'):
                logger.debug('Already tried to stop servers before')
                return
        stopServers.already = True
        
        logger.info('Stopping servers...')
        global bcnode, server
        servers = (bcnode, server)
        for s in servers:
                s.keepgoing = False
        for s in servers:
                try:
                        s.wakeup()
                except:
                        logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
        # Poll for the server loops to wind down (up to 0x100 * 10ms)
        i = 0
        while True:
                sl = []
                for s in servers:
                        if s.running:
                                sl.append(s.__class__.__name__)
                if not sl:
                        break
                i += 1
                if i >= 0x100:
                        logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
                        break
                sleep(0.01)
        
        # Close any file descriptors still registered with the servers
        for s in servers:
                for fd in s._fd.keys():
                        os.close(fd)
448
def saveState(t = None):
        """Persist resume state (shutdown time, DupeShareHACK, workLog) to disk.

        t is the shutdown timestamp from time(); restoreState uses it to
        decide whether the saved workLog is still fresh enough to restore.
        Retries the write up to 0x10000 times before giving up; on giving up,
        removes any partial file so a resume won't load corrupt state.
        """
        logger = logging.getLogger('saveState')
        
        # Then, save data needed to resume work
        logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
        i = 0
        while True:
                try:
                        with open(SAVE_STATE_FILENAME, 'wb') as f:
                                pickle.dump(t, f)
                                pickle.dump(DupeShareHACK, f)
                                pickle.dump(workLog, f)
                        break
                except Exception:
                        i += 1
                        if i >= 0x10000:
                                logger.error('Failed to save work\n' + traceback.format_exc())
                                try:
                                        os.unlink(SAVE_STATE_FILENAME)
                                except Exception:
                                        logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
                                # BUGFIX: terminate after giving up; previously the
                                # loop kept retrying (and re-logging) forever
                                return
470
def exit():
        # NOTE: intentionally shadows the `exit` builtin
        """Stop servers, save state, then terminate the process."""
        t = time()
        stopServers()
        saveState(t)
        logging.getLogger('exit').info('Goodbye...')
        # SIGTERM first so any remaining threads/handlers see a real signal
        os.kill(os.getpid(), signal.SIGTERM)
        sys.exit(0)
478
def restart():
        """Stop servers, save state, then re-exec this process in place."""
        t = time()
        stopServers()
        saveState(t)
        logging.getLogger('restart').info('Restarting...')
        try:
                os.execv(sys.argv[0], sys.argv)
        except:
                logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
488
def restoreState():
        """Load DupeShareHACK/workLog state written by saveState, if present.

        Understands three historical file formats; the current format is
        (timestamp, DupeShareHACK, workLog) as three sequential pickles.
        Any failure is logged and restore is simply skipped.
        """
        if not os.path.exists(SAVE_STATE_FILENAME):
                return
        
        global workLog, DupeShareHACK
        
        logger = logging.getLogger('restoreState')
        s = os.stat(SAVE_STATE_FILENAME)
        logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
        try:
                with open(SAVE_STATE_FILENAME, 'rb') as f:
                        t = pickle.load(f)
                        if type(t) == tuple:
                                if len(t) > 2:
                                        # Future formats, not supported here
                                        # NOTE(review): t[3] raises IndexError when
                                        # len(t) == 3 — confirm intended index
                                        ver = t[3]
                                        # TODO
                                
                                # Old format, from 2012-02-02 to 2012-02-03
                                workLog = t[0]
                                DupeShareHACK = t[1]
                                t = None
                        else:
                                if isinstance(t, dict):
                                        # Old format, from 2012-02-03 to 2012-02-03
                                        DupeShareHACK = t
                                        # NOTE(review): t = None here makes `t + 120`
                                        # below raise TypeError, aborting the restore
                                        # via the outer except — confirm intended
                                        t = None
                                else:
                                        # Current format, from 2012-02-03 onward
                                        DupeShareHACK = pickle.load(f)
                                
                                # Only restore workLog if saved under 2 minutes ago
                                if t + 120 >= time():
                                        workLog = pickle.load(f)
                                else:
                                        logger.debug('Skipping restore of expired workLog')
        except:
                logger.error('Failed to restore state\n' + traceback.format_exc())
                return
        logger.info('State restored successfully')
        if t:
                logger.info('Total downtime: %g seconds' % (time() - t,))
530
531
532 from jsonrpcserver import JSONRPCListener, JSONRPCServer
533 import interactivemode
534 from networkserver import NetworkListener
535 import threading
536 import sharelogging
537 import imp
538
if __name__ == "__main__":
        # --- Share logging setup (with several backward-compat shims) ---
        if not hasattr(config, 'ShareLogging'):
                config.ShareLogging = ()
        if hasattr(config, 'DbOptions'):
                # Upgrade deprecated DbOptions into an equivalent sql ShareLogging entry
                logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
                config.ShareLogging = list(config.ShareLogging)
                config.ShareLogging.append( {
                        'type': 'sql',
                        'engine': 'postgres',
                        'dbopts': config.DbOptions,
                        'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
                } )
        for i in config.ShareLogging:
                if not hasattr(i, 'keys'):
                        # Old tuple-style entry: (name, parameters) — convert to a dict
                        name, parameters = i
                        logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
                        if name == 'postgres':
                                name = 'sql'
                                i = {
                                        'engine': 'postgres',
                                        'dbopts': parameters,
                                }
                        elif name == 'logfile':
                                i = {}
                                i['thropts'] = parameters
                                if 'filename' in parameters:
                                        i['filename'] = parameters['filename']
                                        i['thropts'] = dict(i['thropts'])
                                        del i['thropts']['filename']
                        else:
                                i = parameters
                        i['type'] = name
                
                name = i['type']
                parameters = i
                try:
                        # Load sharelogging/<name>.py and register its logShare callback
                        fp, pathname, description = imp.find_module(name, sharelogging.__path__)
                        m = imp.load_module(name, fp, pathname, description)
                        lo = getattr(m, name)(**parameters)
                        loggersShare.append(lo.logShare)
                except:
                        logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())

        # --- Bitcoin p2p node listeners and upstream link ---
        LSbc = []
        if not hasattr(config, 'BitcoinNodeAddresses'):
                config.BitcoinNodeAddresses = ()
        for a in config.BitcoinNodeAddresses:
                LSbc.append(NetworkListener(bcnode, a))
        
        if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
                BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
        
        # Presumably these modules hook into the JSON-RPC machinery as an
        # import side effect — verify against the jsonrpc_* modules
        import jsonrpc_getblocktemplate
        import jsonrpc_getwork
        import jsonrpc_setworkaux
        
        # --- JSON-RPC server wiring ---
        server = JSONRPCServer()
        if hasattr(config, 'JSONRPCAddress'):
                logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
                if not hasattr(config, 'JSONRPCAddresses'):
                        config.JSONRPCAddresses = []
                config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
        LS = []
        for a in config.JSONRPCAddresses:
                LS.append(JSONRPCListener(server, a))
        if hasattr(config, 'SecretUser'):
                server.SecretUser = config.SecretUser
        server.aux = MM.CoinbaseAux
        server.getBlockHeader = getBlockHeader
        server.getBlockTemplate = getBlockTemplate
        server.receiveShare = receiveShare
        server.RaiseRedFlags = RaiseRedFlags
        server.ShareTarget = config.ShareTarget
        
        if hasattr(config, 'TrustedForwarders'):
                server.TrustedForwarders = config.TrustedForwarders
        server.ServerName = config.ServerName
        
        MM.start()
        
        restoreState()
        
        # Background pruner keeps workLog from growing without bound
        prune_thr = threading.Thread(target=WorkLogPruner, args=(workLog,))
        prune_thr.daemon = True
        prune_thr.start()
        
        bcnode_thr = threading.Thread(target=bcnode.serve_forever)
        bcnode_thr.daemon = True
        bcnode_thr.start()
        
        # Main thread runs the JSON-RPC server loop
        server.serve_forever()
628         
629         server.serve_forever()