Fix MWL = workLog[None]: KeyError: None
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2013  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 # Portions written by Peter Leurs <kinlo@triplemining.com>
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU Affero General Public License for more details.
15 #
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
import argparse
import importlib

# --- Configuration bootstrap -------------------------------------------------
# Settings come from a plain Python module: "config" by default, or
# "config_<name>" when -c/--config <name> is given on the command line.
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', '--config', help='Config name to load from config_<ARG>.py')
args = argparser.parse_args()
configmod = 'config'
if args.config is not None:  # idiomatic "is not None"
	configmod = 'config_%s' % (args.config,)
# importlib.import_module both imports and returns the module; the separate
# __import__() call before it was redundant.
config = importlib.import_module(configmod)

# Fill in defaults for optional settings the config module may omit.
if not hasattr(config, 'ServerName'):
	config.ServerName = 'Unnamed Eloipool'

if not hasattr(config, 'ShareTarget'):
	config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
36
37 import logging
38 import logging.handlers
39
# --- Logging setup -----------------------------------------------------------
# Everything logs through the root logger; the chattiest subsystems are capped
# at INFO unless a handler was already installed externally.
rootlogger = logging.getLogger(None)
logformat = getattr(config, 'LogFormat', '%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s')
logformatter = logging.Formatter(logformat)
if not rootlogger.handlers:  # idiomatic truthiness check (was len(...) == 0)
	logging.basicConfig(
		format=logformat,
		level=logging.DEBUG,
	)
	for infoOnly in (
		'checkShare',
		'getTarget',
		'JSONRPCHandler',
		'JSONRPCServer',
		'merkleMaker',
		'StratumServer',
		'Waker for JSONRPCServer',
		'Waker for StratumServer',
		'WorkLogPruner'
	):
		logging.getLogger(infoOnly).setLevel(logging.INFO)
if getattr(config, 'LogToSysLog', False):
	# Unix only: /dev/log is the local syslog socket
	sysloghandler = logging.handlers.SysLogHandler(address = '/dev/log')
	rootlogger.addHandler(sysloghandler)
if hasattr(config, 'LogFile'):
	if isinstance(config.LogFile, str):
		filehandler = logging.FileHandler(config.LogFile)
	else:
		# A dict of kwargs selects a TimedRotatingFileHandler instead
		filehandler = logging.handlers.TimedRotatingFileHandler(**config.LogFile)
	filehandler.setFormatter(logformatter)
	rootlogger.addHandler(filehandler)
70
def RaiseRedFlags(reason):
	"""Log *reason* as a critical 'redflag' event and hand it back to the caller."""
	logging.getLogger('redflag').critical(reason)
	return reason
74
75
from bitcoin.node import BitcoinLink, BitcoinNode
# Peer-to-peer node used to hear about new blocks and to announce blocks we solve.
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'
# MM (the merkleMaker) is created later in this file; the lambda defers the lookup.
bcnode.newBlock = lambda blkhash: MM.updateMerkleTree()
80
81 import jsonrpc
82
83 try:
84         import jsonrpc.authproxy
85         jsonrpc.authproxy.USER_AGENT = 'Eloipool/0.1'
86 except:
87         pass
88
89
90 from bitcoin.script import BitcoinScript
91 from bitcoin.txn import Txn
92 from base58 import b58decode
93 from binascii import b2a_hex
94 from struct import pack
95 import subprocess
96 from time import time
97
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True, prevBlockHex = None):
	"""Build the coinbase transaction paying out coinbaseValue satoshis.

	If config.CoinbaserCmd is set (and useCoinbaser is true), the command is
	run to split the reward: it prints the number of outputs, then
	amount/address pairs, one per line.  Whatever the coinbaser does not claim
	(and all of it when the coinbaser fails) goes to config.TrackerAddr.
	"""
	txn = Txn.new()
	
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		p = None
		try:
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			cmd = cmd.replace('%p', prevBlockHex or '""')
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			nout = int(p.stdout.readline())
			for i in range(nout):
				amount = int(p.stdout.readline())
				addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
				pkScript = BitcoinScript.toAddress(addr)
				txn.addOutput(amount, pkScript)
				coinbased += amount
		except:
			# Log the failure instead of swallowing it silently; the sentinel
			# value (> coinbaseValue) routes us into the error branch below.
			logging.getLogger('makeCoinbaseTxn').debug('Coinbaser error', exc_info=True)
			coinbased = coinbaseValue + 1
		finally:
			# Reap the child so we don't leak a zombie process / pipe fd
			if p is not None:
				try:
					p.stdout.close()
					p.wait()
				except:
					pass
		if coinbased >= coinbaseValue:
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased
	
	# Remainder (or everything) goes to the pool's tracker address
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)
	
	# TODO: red flag on dupe coinbase
	return txn
129
130
131 import jsonrpc_getwork
132 from util import Bits2Target
133
workLog = {}        # username -> {work-id -> (work-data, issue-time)}; stratum work lives under key None
userStatus = {}     # username -> [target, last-update-time, share-count] for dynamic targetting
networkTarget = None    # current network-difficulty target (int); set by blockChanged()
DupeShareHACK = {}      # set-like dict of share data seen this block, to reject duplicates

# Filled in by the launcher code further down; None until then
server = None
stratumsrv = None
def updateBlocks():
	# Notify all miners (longpoll + stratum) that fresh work is available
	server.wakeLongpoll()
	stratumsrv.updateJob()
144
def blockChanged():
	"""React to a new best block: recompute the network target and flush stale state."""
	global MM, networkTarget, server
	bits = MM.currentBlock[2]
	networkTarget = None if bits is None else Bits2Target(bits)
	if MM.lastBlock != (None, None, None):
		# A real block change (not first startup): every outstanding job and
		# the duplicate-share record are now stale.
		global DupeShareHACK
		DupeShareHACK = {}
		jsonrpc_getwork._CheckForDupesHACK = {}
		workLog.clear()
	server.wakeLongpoll(wantClear=True)
	stratumsrv.updateJob(wantClear=True)
159
160
161 from time import sleep, time
162 import traceback
163
def _WorkLogPruner_I(wl):
	"""One pruning pass: drop work entries issued more than 120 seconds ago."""
	now = time()
	pruned = 0
	for userwork in wl.values():
		stale = [wli for wli in userwork if now > userwork[wli][1] + 120]
		for wli in stale:
			del userwork[wli]
		pruned += len(stale)
	WorkLogPruner.logger.debug('Pruned %d jobs' % (pruned,))

def WorkLogPruner(wl):
	"""Daemon loop: prune the work log once a minute, logging (not raising) errors."""
	while True:
		try:
			sleep(60)
			_WorkLogPruner_I(wl)
		except:
			WorkLogPruner.logger.error(traceback.format_exc())
WorkLogPruner.logger = logging.getLogger('WorkLogPruner')
183
184
from merklemaker import merkleMaker
# The merkleMaker keeps block templates fresh from the upstream bitcoind(s)
MM = merkleMaker()
# Copy every config setting onto the merkleMaker instance as attributes
MM.__dict__.update(config.__dict__)
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
191
192
193 from binascii import b2a_hex
194 from copy import deepcopy
195 from math import ceil, log
196 from merklemaker import MakeBlockHeader
197 from struct import pack, unpack
198 import threading
199 from time import time
200 from util import PendingUpstream, RejectedShare, bdiff1target, dblsha, LEhash2int, swap32, target2bdiff, target2pdiff
201 import jsonrpc
202 import traceback
203
gotwork = None
# Optional JSON-RPC endpoint that is notified of shares near the block target
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

if not hasattr(config, 'DelayLogForUpstream'):
	config.DelayLogForUpstream = False

if not hasattr(config, 'DynamicTargetting'):
	config.DynamicTargetting = 0
else:
	if not hasattr(config, 'DynamicTargetWindow'):
		config.DynamicTargetWindow = 120
	# Scale the per-minute share goal to the configured window length
	config.DynamicTargetGoal *= config.DynamicTargetWindow / 60
217
def submitGotwork(info):
	"""Best-effort notification of *info* to the configured gotwork service."""
	try:
		gotwork.gotwork(info)
	except:
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())

if not hasattr(config, 'GotWorkTarget'):
	# 0 effectively disables the gotwork hash check in checkShare
	config.GotWorkTarget = 0
226
def clampTarget(target, DTMode):
	"""Clamp a proposed share target according to dynamic-target mode *DTMode*.

	Returns an int target, or None as shorthand for config.ShareTarget.
	"""
	# ShareTarget is the minimum
	if target is None or target > config.ShareTarget:
		target = config.ShareTarget
	
	# Never target above upstream(s), as we'd lose blocks
	target = max(target, networkTarget, config.GotWorkTarget)
	
	if DTMode == 2:
		# Ceil target to a power of two :)
		truebits = log(target, 2)
		if target <= 2**int(truebits):
			# Workaround for bug in Python's math.log function
			truebits = int(truebits)
		target = 2**ceil(truebits) - 1
	elif DTMode == 3:
		# Round target to multiple of bdiff 1
		# NOTE: floor division is required here; Python 3 true division ('/')
		# would yield a float, losing precision on a 256-bit target and
		# breaking the integer comparison against ShareTarget below.
		target = bdiff1target // int(round(target2bdiff(target)))
	
	# Return None for ShareTarget to save memory
	if target == config.ShareTarget:
		return None
	return target
250
def getTarget(username, now, DTMode = None, RequestedTarget = None):
	"""Return the share target for *username* at time *now* (None = ShareTarget).

	Implements dynamic targetting: userStatus tracks [target, lastUpdate,
	workCount] per user, and the target is rescaled so each user produces
	about DynamicTargetGoal shares per DynamicTargetWindow seconds.
	"""
	if DTMode is None:
		DTMode = config.DynamicTargetting
	if not DTMode:
		return None
	if username in userStatus:
		status = userStatus[username]
	else:
		# No record, use default target
		RequestedTarget = clampTarget(RequestedTarget, DTMode)
		userStatus[username] = [RequestedTarget, now, 0]
		return RequestedTarget
	(targetIn, lastUpdate, work) = status
	if work <= config.DynamicTargetGoal:
		if now < lastUpdate + config.DynamicTargetWindow and (targetIn is None or targetIn >= networkTarget):
			# No reason to change it just yet
			return clampTarget(targetIn, DTMode)
		if not work:
			# No shares received, reset to minimum
			if targetIn:
				getTarget.logger.debug("No shares from %s, resetting to minimum target" % (repr(username),))
				userStatus[username] = [None, now, 0]
			return clampTarget(None, DTMode)
	
	# Rescale the target proportionally to the observed share rate
	deltaSec = now - lastUpdate
	target = targetIn or config.ShareTarget
	target = int(target * config.DynamicTargetGoal * deltaSec / config.DynamicTargetWindow / work)
	target = clampTarget(target, DTMode)
	if target != targetIn:
		pfx = 'Retargetting %s' % (repr(username),)
		tin = targetIn or config.ShareTarget
		getTarget.logger.debug("%s from: %064x (pdiff %s)" % (pfx, tin, target2pdiff(tin)))
		tgt = target or config.ShareTarget
		getTarget.logger.debug("%s   to: %064x (pdiff %s)" % (pfx, tgt, target2pdiff(tgt)))
	userStatus[username] = [target, now, 0]
	return target
getTarget.logger = logging.getLogger('getTarget')
288
def TopTargets(n = 0x10):
	"""Print (to stdout) the n users with the lowest (i.e. hardest) targets."""
	with_target = [k for k, v in userStatus.items() if v[0] is not None]
	# Descending by target; list.sort is stable, and reverse=True preserves
	# the relative order of equal elements, so this matches sorting by -target.
	with_target.sort(key=lambda k: userStatus[k][0], reverse=True)
	pdiff_cache = {}
	def _pdiff(t):
		if t not in pdiff_cache:
			pdiff_cache[t] = target2pdiff(t)
		return pdiff_cache[t]
	for k in with_target[-n:]:
		tgt = userStatus[k][0]
		print('%-34s %064x %3d' % (k, tgt, _pdiff(tgt)))
300
def RegisterWork(username, wli, wld, RequestedTarget = None):
	"""Record issued work *wld* under (username, wli) and return its share target."""
	now = time()
	target = getTarget(username, now, RequestedTarget=RequestedTarget)
	# Store the work with its target appended, plus the issue time
	wld = tuple(wld) + (target,)
	workLog.setdefault(username, {})[wli] = (wld, now)
	return target or config.ShareTarget
307
def getBlockHeader(username):
	"""Issue getwork-style work: returns (header bytes, work-log entry, target).

	RegisterWork records the MRD in workLog (with the target appended); the
	previous direct workLog insert here was a dead store, immediately
	overwritten by RegisterWork's own insert under the same key.
	"""
	MRD = MM.getMRD()
	merkleRoot = MRD[0]
	hdr = MakeBlockHeader(MRD)
	target = RegisterWork(username, merkleRoot, MRD)
	return (hdr, workLog[username][merkleRoot], target)
315
def getBlockTemplate(username, p_magic = None, RequestedTarget = None):
	"""Issue GBT-style work: returns (merkle-cache data, work-log entry, target)."""
	if server.tls.wantClear:
		wantClear = True
	elif p_magic and username not in workLog:
		# First request from this user this block: force fresh work and
		# report it back to the caller through p_magic[0]
		wantClear = True
		p_magic[0] = True
	else:
		wantClear = False
	MC = MM.getMC(wantClear)
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
	# The work identifier is embedded inside the coinbase scriptsig
	wliPos = coinbase[0] + 2
	wliLen = coinbase[wliPos - 1]
	wli = coinbase[wliPos:wliPos+wliLen]
	target = RegisterWork(username, wli, MC, RequestedTarget=RequestedTarget)
	return (MC, workLog[username][wli], target)
331
def getStratumJob(jobid, wantClear = False):
	"""Issue stratum work under *jobid*; all stratum work is keyed by user None."""
	MC = MM.getMC(wantClear)
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
	now = time()
	# setdefault: the None entry may not exist yet (or was cleared by blockChanged)
	workLog.setdefault(None, {})[jobid] = (MC, now)
	return (MC, workLog[None][jobid])

def getExistingStratumJob(jobid):
	"""Look up previously-issued stratum work; raises KeyError if unknown."""
	wld = workLog[None][jobid]
	return (wld[0], wld)
342
loggersShare = []     # share-logger plugins; each receives logShare(share)
authenticators = []   # authentication plugins; each receives checkAuthentication()

RBDs = []   # "Real Block Data": deep copies of everything for each block found
RBPs = []   # "Real Block Payloads": serialized blocks we submitted

from bitcoin.varlen import varlenEncode, varlenDecode
import bitcoin.txn
from merklemaker import assembleBlock

if not hasattr(config, 'BlockSubmissions'):
	# None means: submit found blocks to every template source
	config.BlockSubmissions = None

RBFs = []   # "Real Block Failures": diagnostic records of failed submissions
def blockSubmissionThread(payload, blkhash, share):
	"""Submit a solved block *payload* to the upstream server(s), with fallback.

	Tries BIP 22 submitblock first, then both getmemorypool variants.  Keeps
	retrying across the server list until one accepts, or until the network
	has moved past a block that is not ours.  Updates share['upstreamReject-
	Reason']/['upstreamResult'] and logs the share if logging was deferred.
	"""
	if config.BlockSubmissions is None:
		servers = list(a for b in MM.TemplateSources for a in b)
	else:
		servers = list(config.BlockSubmissions)
	
	if hasattr(share['merkletree'], 'source_uri'):
		# Prefer submitting back to the server the template came from
		servers.insert(0, {
			'access': jsonrpc.ServiceProxy(share['merkletree'].source_uri),
			'name': share['merkletree'].source,
		})
	elif not servers:
		servers = list(a for b in MM.TemplateSources for a in b)
	
	myblock = (blkhash, payload[4:36])
	payload = b2a_hex(payload).decode('ascii')
	nexterr = 0
	tries = 0
	success = False
	while len(servers):
		tries += 1
		TS = servers.pop(0)
		UpstreamBitcoindJSONRPC = TS['access']
		try:
			# BIP 22 standard submitblock
			reason = UpstreamBitcoindJSONRPC.submitblock(payload)
		except BaseException as gbterr:
			gbterr_fmt = traceback.format_exc()
			try:
				try:
					# bitcoind 0.5/0.6 getmemorypool
					reason = UpstreamBitcoindJSONRPC.getmemorypool(payload)
				except:
					# Old BIP 22 draft getmemorypool
					reason = UpstreamBitcoindJSONRPC.getmemorypool(payload, {})
				if reason is True:
					reason = None
				elif reason is False:
					reason = 'rejected'
			except BaseException as gmperr:
				now = time()
				if now > nexterr:
					# FIXME: This will show "Method not found" on pre-BIP22 servers
					RaiseRedFlags(gbterr_fmt)
					# Rate-limit red flags to one per 5 seconds
					nexterr = now + 5
				if MM.currentBlock[0] not in myblock and tries > len(servers):
					# The network moved on and every server failed at least
					# once: give up on this block entirely
					RBFs.append( (('next block', MM.currentBlock, now, (gbterr, gmperr)), payload, blkhash, share) )
					RaiseRedFlags('Giving up on submitting block to upstream \'%s\'' % (TS['name'],))
					if share['upstreamRejectReason'] is PendingUpstream:
						share['upstreamRejectReason'] = 'GAVE UP'
						share['upstreamResult'] = False
						logShare(share)
					return
				
				# Requeue this server for another attempt later
				servers.append(TS)
				continue
		
		# At this point, we have a reason back
		if reason:
			# FIXME: The returned value could be a list of multiple responses
			msg = 'Upstream \'%s\' block submission failed: %s' % (TS['name'], reason,)
			if success and reason in ('stale-prevblk', 'bad-prevblk', 'orphan', 'duplicate'):
				# no big deal
				blockSubmissionThread.logger.debug(msg)
			else:
				RBFs.append( (('upstream reject', reason, time()), payload, blkhash, share) )
				RaiseRedFlags(msg)
		else:
			blockSubmissionThread.logger.debug('Upstream \'%s\' accepted block' % (TS['name'],))
			success = True
		if share['upstreamRejectReason'] is PendingUpstream:
			share['upstreamRejectReason'] = reason
			share['upstreamResult'] = not reason
			logShare(share)
blockSubmissionThread.logger = logging.getLogger('blockSubmission')
432
def checkData(share):
	"""Sanity-check the 80-byte header in share['data'] against the current block.

	Raises RejectedShare on a stale/bad previous block, wrong difficulty
	bits, or an unsupported block version.
	"""
	data = share['data']
	data = data[:80]
	(prevBlock, height, bits) = MM.currentBlock
	sharePrevBlock = data[4:36]
	if sharePrevBlock != prevBlock:
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')
	
	if data[72:76] != bits:
		raise RejectedShare('bad-diffbits')
	
	# Note that we should accept miners reducing version to 1 if they don't understand 2 yet
	# FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
	if data[1:4] != b'\0\0\0' or data[0] > 2:
		raise RejectedShare('bad-version')
450
def buildStratumData(share, merkleroot):
	"""Assemble the 80-byte block header for a stratum share into share['data']."""
	(prevBlock, height, bits) = MM.currentBlock
	
	# Version 2 header; ntime and nonce arrive big-endian from stratum and
	# are byte-reversed into the header's little-endian layout.
	data = b''.join((
		b'\x02\0\0\0',
		prevBlock,
		merkleroot,
		share['ntime'][::-1],
		bits,
		share['nonce'][::-1],
	))
	
	share['data'] = data
	return data
463
def IsJobValid(wli, wluser = None):
	"""Return True if work id *wli* is still known for *wluser* (None = stratum)."""
	if wluser not in workLog:
		return False
	if wli not in workLog[wluser]:
		return False
	(wld, issueT) = workLog[wluser][wli]
	# NOTE(review): this only rejects when the clock is more than 120s BEFORE
	# the issue time (clock skew), matching the stale-work check in checkShare;
	# expiry of old work is handled by WorkLogPruner, not here.
	if time() < issueT - 120:
		return False
	return True
473
def checkShare(share):
	"""Validate a submitted share (getwork, GBT, or stratum) and handle blocks.

	Fills in share['time'], ['issuetime'], ['merkletree'], ['target'], and
	related fields; raises RejectedShare with a reason string on any failure.
	Shares meeting the network target are assembled into a block and
	submitted upstream asynchronously.
	"""
	shareTime = share['time'] = time()
	
	username = share['username']
	if 'data' in share:
		# getwork/GBT
		checkData(share)
		data = share['data']
		
		if username not in workLog:
			raise RejectedShare('unknown-user')
		MWL = workLog[username]
		
		shareMerkleRoot = data[36:68]
		if 'blkdata' in share:
			# GBT: full block data attached; the work id is embedded in the
			# coinbase scriptsig of the first transaction
			pl = share['blkdata']
			(txncount, pl) = varlenDecode(pl)
			cbtxn = bitcoin.txn.Txn(pl)
			othertxndata = cbtxn.disassemble(retExtra=True)
			coinbase = cbtxn.getCoinbase()
			wliPos = coinbase[0] + 2
			wliLen = coinbase[wliPos - 1]
			wli = coinbase[wliPos:wliPos+wliLen]
			mode = 'MC'
			moden = 1
		else:
			# getwork: the merkle root itself identifies the work
			wli = shareMerkleRoot
			mode = 'MRD'
			moden = 0
			coinbase = None
	else:
		# Stratum
		wli = share['jobid']
		# Placeholder merkle root; replaced below once the real coinbase is built
		buildStratumData(share, b'\0' * 32)
		mode = 'MC'
		moden = 1
		othertxndata = b''
		if None not in workLog:
			# We haven't yet sent any stratum work for this block
			raise RejectedShare('unknown-work')
		MWL = workLog[None]
	
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld
	
	share['issuetime'] = issueT
	
	(workMerkleTree, workCoinbase) = wld[1:3]
	share['merkletree'] = workMerkleTree
	if 'jobid' in share:
		# Stratum: rebuild the real coinbase/merkle root from the extranonces
		cbtxn = deepcopy(workMerkleTree.data[0])
		coinbase = workCoinbase + share['extranonce1'] + share['extranonce2']
		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		data = buildStratumData(share, workMerkleTree.withFirst(cbtxn))
		shareMerkleRoot = data[36:68]
	
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
	blkhash = dblsha(data)
	if blkhash[28:] != b'\0\0\0\0':
		raise RejectedShare('H-not-zero')
	blkhashn = LEhash2int(blkhash)
	
	global networkTarget
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
	
	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	txlist = workMerkleTree.data
	txlist = [deepcopy(txlist[0]),] + txlist[1:]
	cbtxn = txlist[0]
	cbtxn.setCoinbase(coinbase or workCoinbase)
	cbtxn.assemble()
	
	if blkhashn <= networkTarget:
		# This share is an actual block solution!
		logfunc("Submitting upstream")
		RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree, share, wld) ) )
		if not moden:
			payload = assembleBlock(data, txlist)
		else:
			payload = share['data']
			if len(othertxndata):
				payload += share['blkdata']
			else:
				payload += assembleBlock(data, txlist)[80:]
		logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
		RBPs.append(payload)
		threading.Thread(target=blockSubmissionThread, args=(payload, blkhash, share)).start()
		bcnode.submitBlock(payload)
		if config.DelayLogForUpstream:
			share['upstreamRejectReason'] = PendingUpstream
		else:
			share['upstreamRejectReason'] = None
			share['upstreamResult'] = True
		MM.updateBlock(blkhash)
	
	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')
	
	# Determine the target this share had to meet (per-share override,
	# per-work recorded target, or the pool-wide default)
	if 'target' in share:
		workTarget = share['target']
	elif len(wld) > 6:
		workTarget = wld[6]
	else:
		workTarget = None
	
	if workTarget is None:
		workTarget = config.ShareTarget
	if blkhashn > workTarget:
		raise RejectedShare('high-hash')
	share['target'] = workTarget
	share['_targethex'] = '%064x' % (workTarget,)
	
	shareTimestamp = unpack('<L', data[68:72])[0]
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
	if moden:
		# Full-block modes: verify the coinbase and transaction data match the work
		cbpre = workCoinbase
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')
		
		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
				raise RejectedShare('bad-cb-flag')
		
		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')
		
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')
		
		if len(othertxndata):
			allowed = assembleBlock(data, txlist)[80:]
			if allowed != share['blkdata']:
				raise RejectedShare('bad-txns')
	
	if config.DynamicTargetting and username in userStatus:
		# NOTE: userStatus[username] only doesn't exist across restarts
		status = userStatus[username]
		target = status[0] or config.ShareTarget
		if target == workTarget:
			userStatus[username][2] += 1
		else:
			# Credit proportionally when the share was done at a different target
			userStatus[username][2] += float(target) / workTarget
checkShare.logger = logging.getLogger('checkShare')
649
def logShare(share):
	"""Record *share* with every configured share logger."""
	if '_origdata' in share:
		share['solution'] = share['_origdata']
	else:
		# getwork solutions are logged in the byte-swapped form miners submit
		share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
	for i in loggersShare:
		i.logShare(share)
657
def checkAuthentication(username, password):
	"""Return True if any configured authenticator accepts the credentials."""
	# HTTPServer uses bytes, and StratumServer uses str
	if hasattr(username, 'decode'): username = username.decode('utf8')
	if hasattr(password, 'decode'): password = password.decode('utf8')
	
	return any(auth.checkAuthentication(username, password) for auth in authenticators)
667
def receiveShare(share):
	"""Validate *share* via checkShare, recording a reject reason and logging it.

	Re-raises any validation failure after tagging share['rejectReason'];
	logging is deferred while the share awaits an upstream submission result.
	"""
	# TODO: username => userid
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	except BaseException:  # unused "as e" binding removed
		share['rejectReason'] = 'ERROR'
		raise
	finally:
		# Idiomatic "is not" (was "not ... is PendingUpstream")
		if share.get('upstreamRejectReason', None) is not PendingUpstream:
			logShare(share)
681
def newBlockNotification():
	"""Handle an external new-block notification by refreshing the merkle tree."""
	logging.getLogger('newBlockNotification').info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
	pass

def newBlockNotificationSIGNAL(signum, frame):
	# Use a new thread, in case the signal handler is called with locks held
	thr = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
	thr.daemon = True
	thr.start()

from signal import signal, SIGUSR1
# SIGUSR1 is the conventional "new block" poke (e.g. from bitcoind's -blocknotify)
signal(SIGUSR1, newBlockNotificationSIGNAL)
696
697
698 import os
699 import os.path
700 import pickle
701 import signal
702 import sys
703 from time import sleep
704 import traceback
705
if getattr(config, 'SaveStateFilename', None) is None:
	# Work log and dupe-share state are pickled here on shutdown/restart
	config.SaveStateFilename = 'eloipool.worklog'
708
def stopServers():
	"""Shut down the bitcoin node and HTTP/stratum servers, then close their fds."""
	logger = logging.getLogger('stopServers')
	
	if hasattr(stopServers, 'already'):
		logger.debug('Already tried to stop servers before')
		return
	stopServers.already = True
	
	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server, stratumsrv)
	for s in servers:
		s.keepgoing = False
	for s in servers:
		try:
			s.wakeup()
		except:
			logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
	i = 0
	# Poll for the server threads to finish (10ms at a time, up to 0x100 tries)
	while True:
		sl = []
		for s in servers:
			if s.running:
				sl.append(s.__class__.__name__)
		if not sl:
			break
		i += 1
		if i >= 0x100:
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
			break
		sleep(0.01)
	
	# Close any file descriptors the servers still hold
	for s in servers:
		for fd in s._fd.keys():
			os.close(fd)
744
def stopLoggers():
	"""Give every configured share logger a chance to stop/flush on shutdown."""
	for share_logger in loggersShare:
		# Not every logger backend supports stopping; only call it if present.
		if hasattr(share_logger, 'stop'):
			share_logger.stop()
749
def saveState(SAVE_STATE_FILENAME, t = None):
	"""Persist resume data to disk: save-time t, DupeShareHACK, and workLog.

	Writes three consecutive pickles to SAVE_STATE_FILENAME. Retries on any
	failure; after 0x10000 consecutive failures it logs the error, tries to
	unlink the (possibly corrupt) file, and gives up so shutdown can proceed.
	"""
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
				# BUGFIX: previously the loop kept retrying (and re-logging)
				# forever after giving up, hanging exit()/restart(). Stop here.
				break
771
def exit():
	"""Cleanly stop servers and loggers, save state, and terminate the process."""
	shutdown_time = time()
	stopServers()
	stopLoggers()
	saveState(config.SaveStateFilename, t=shutdown_time)
	logging.getLogger('exit').info('Goodbye...')
	# Deliver SIGTERM to ourselves first, then exit the interpreter.
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)
780
def restart():
	"""Save state, then re-exec this process in place to restart the pool."""
	shutdown_time = time()
	stopServers()
	stopLoggers()
	saveState(config.SaveStateFilename, t=shutdown_time)
	logging.getLogger('restart').info('Restarting...')
	try:
		# Replace the current process image; state is restored on startup.
		os.execv(sys.argv[0], sys.argv)
	except:
		logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
791
def restoreState(SAVE_STATE_FILENAME):
	"""Restore saved pool state: DupeShareHACK always, workLog only if fresh.

	Supports three on-disk formats:
	 - tuple (2012-02-02 to 2012-02-03): (workLog, DupeShareHACK)
	 - bare dict (2012-02-03): DupeShareHACK only
	 - current (2012-02-03 onward): pickled save time, then DupeShareHACK,
	   then workLog

	Any error during restore is logged and the function returns quietly.
	"""
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if type(t) == tuple:
				if len(t) > 2:
					# Future formats, not supported here
					# NOTE(review): t[3] raises IndexError when len(t) == 3;
					# unreachable for known formats, left as-is — TODO confirm.
					ver = t[3]
					# TODO
				
				# Old format, from 2012-02-02 to 2012-02-03
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					# Old format, from 2012-02-03 to 2012-02-03
					DupeShareHACK = t
					t = None
				else:
					# Current format, from 2012-02-03 onward
					DupeShareHACK = pickle.load(f)
				
				# BUGFIX: t is None for the old dict format; 'None + 120' raised
				# TypeError and aborted the restore. Treat an unknown save time
				# as expired and skip the stale workLog instead.
				if t is not None and t + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))
833
834
835 from jsonrpcserver import JSONRPCListener, JSONRPCServer
836 import interactivemode
837 from networkserver import NetworkListener
838 import threading
839 import sharelogging
840 import authentication
841 from stratumserver import StratumServer
842 import imp
843
if __name__ == "__main__":
	# --- Share logging setup (with backward compatibility for old configs) ---
	if not hasattr(config, 'ShareLogging'):
		config.ShareLogging = ()
	if hasattr(config, 'DbOptions'):
		# Legacy DbOptions variable: translate it into an equivalent 'sql' share logger.
		logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
		config.ShareLogging = list(config.ShareLogging)
		config.ShareLogging.append( {
			'type': 'sql',
			'engine': 'postgres',
			'dbopts': config.DbOptions,
			'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
		} )
	for i in config.ShareLogging:
		if not hasattr(i, 'keys'):
			# Legacy (name, parameters) tuple form; rewrite it into the dict form.
			name, parameters = i
			logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
			if name == 'postgres':
				name = 'sql'
				i = {
					'engine': 'postgres',
					'dbopts': parameters,
				}
			elif name == 'logfile':
				i = {}
				i['thropts'] = parameters
				if 'filename' in parameters:
					i['filename'] = parameters['filename']
					i['thropts'] = dict(i['thropts'])
					del i['thropts']['filename']
			else:
				i = parameters
			i['type'] = name
		
		name = i['type']
		parameters = i
		try:
			# Dynamically load sharelogging/<name>.py and instantiate its <name> class.
			fp, pathname, description = imp.find_module(name, sharelogging.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			loggersShare.append(lo)
		except:
			logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())
	
	# --- Authentication setup (defaults to accepting all workers) ---
	if not hasattr(config, 'Authentication'):
		config.Authentication = ({'module': 'allowall'},)
	
	for i in config.Authentication:
		name = i['module']
		parameters = i
		try:
			# Same dynamic-load pattern as share loggers, from the authentication package.
			fp, pathname, description = imp.find_module(name, authentication.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			authenticators.append(lo)
		except:
			logging.getLogger('authentication').error("Error setting up authentication module %s: %s", name, sys.exc_info())
	
	# --- Bitcoin P2P node listeners and optional upstream node link ---
	LSbc = []
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))
	
	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
	
	# Imported for their side effect of registering JSON-RPC method handlers.
	import jsonrpc_getblocktemplate
	import jsonrpc_getwork
	import jsonrpc_setworkaux
	
	# --- JSON-RPC (getwork/GBT) server wiring ---
	server = JSONRPCServer()
	server.tls = threading.local()
	server.tls.wantClear = False
	if hasattr(config, 'JSONRPCAddress'):
		# Legacy single-address variable; prepend it to the addresses list.
		logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
	LS = []
	for a in config.JSONRPCAddresses:
		LS.append(JSONRPCListener(server, a))
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	server.aux = MM.CoinbaseAux
	server.getBlockHeader = getBlockHeader
	server.getBlockTemplate = getBlockTemplate
	server.receiveShare = receiveShare
	server.RaiseRedFlags = RaiseRedFlags
	server.ShareTarget = config.ShareTarget
	server.checkAuthentication = checkAuthentication
	
	if hasattr(config, 'TrustedForwarders'):
		server.TrustedForwarders = config.TrustedForwarders
	server.ServerName = config.ServerName
	
	# --- Stratum server wiring ---
	stratumsrv = StratumServer()
	stratumsrv.getStratumJob = getStratumJob
	stratumsrv.getExistingStratumJob = getExistingStratumJob
	stratumsrv.receiveShare = receiveShare
	stratumsrv.RaiseRedFlags = RaiseRedFlags
	stratumsrv.getTarget = getTarget
	stratumsrv.defaultTarget = config.ShareTarget
	stratumsrv.IsJobValid = IsJobValid
	stratumsrv.checkAuthentication = checkAuthentication
	if not hasattr(config, 'StratumAddresses'):
		config.StratumAddresses = ()
	for a in config.StratumAddresses:
		NetworkListener(stratumsrv, a)
	
	# --- Startup: merkle maker, saved state, worker threads, main loop ---
	MM.start()
	
	restoreState(config.SaveStateFilename)
	
	prune_thr = threading.Thread(target=WorkLogPruner, args=(workLog,))
	prune_thr.daemon = True
	prune_thr.start()
	
	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()
	
	stratum_thr = threading.Thread(target=stratumsrv.serve_forever)
	stratum_thr.daemon = True
	stratum_thr.start()
	
	# JSON-RPC server runs on the main thread and blocks until shutdown.
	server.serve_forever()