Implemented authentication module support, and added two modules: allowall and simplefile
[bitcoin:eloipool.git] / eloipool.py
#!/usr/bin/python3
# Eloipool - Python Bitcoin pool server
# Copyright (C) 2011-2013  Luke Dashjr <luke-jr+eloipool@utopios.org>
# Portions written by Peter Leurs <kinlo@triplemining.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import argparse
import importlib
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', '--config', help='Config name to load from config_<ARG>.py')
args = argparser.parse_args()
configmod = 'config'
if args.config is not None:
        configmod = 'config_%s' % (args.config,)
__import__(configmod)
config = importlib.import_module(configmod)

if not hasattr(config, 'ServerName'):
        config.ServerName = 'Unnamed Eloipool'

if not hasattr(config, 'ShareTarget'):
        config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff

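# The configuration is an ordinary Python module: plain config.py by default, or
# config_<name>.py when started with `-c <name>`.  Only a few attributes get
# fallbacks above; others (for example UpstreamNetworkId and TrackerAddr, which
# are read unconditionally further down) must be supplied by the config module.
# The default ShareTarget above is the pool-difficulty-1 share target.  A minimal
# illustrative sketch (values are placeholders, not recommendations):
#
#   # config_example.py (hypothetical)
#   ServerName = 'Example Pool'
#   UpstreamNetworkId = b'\xf9\xbe\xb4\xd9'   # typically the network magic bytes (mainnet shown)
#   TrackerAddr = '1ExamplePoolPayoutAddressXXXXXXXXX'  # placeholder payout address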

import logging
import logging.handlers

rootlogger = logging.getLogger(None)
logformat = getattr(config, 'LogFormat', '%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s')
logformatter = logging.Formatter(logformat)
if len(rootlogger.handlers) == 0:
        logging.basicConfig(
                format=logformat,
                level=logging.DEBUG,
        )
        for infoOnly in (
                'checkShare',
                'getTarget',
                'JSONRPCHandler',
                'JSONRPCServer',
                'merkleMaker',
                'StratumServer',
                'Waker for JSONRPCServer',
                'Waker for StratumServer',
                'WorkLogPruner'
        ):
                logging.getLogger(infoOnly).setLevel(logging.INFO)
if getattr(config, 'LogToSysLog', False):
        sysloghandler = logging.handlers.SysLogHandler(address = '/dev/log')
        rootlogger.addHandler(sysloghandler)
if hasattr(config, 'LogFile'):
        if isinstance(config.LogFile, str):
                filehandler = logging.FileHandler(config.LogFile)
        else:
                filehandler = logging.handlers.TimedRotatingFileHandler(**config.LogFile)
        filehandler.setFormatter(logformatter)
        rootlogger.addHandler(filehandler)

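# LogFile may be either a plain filename (handled by logging.FileHandler) or a
# dict of keyword arguments passed straight to
# logging.handlers.TimedRotatingFileHandler.  A hedged example of the dict form,
# using standard TimedRotatingFileHandler parameters with illustrative values:
#
#   LogFile = {
#           'filename': '/var/log/eloipool.log',
#           'when': 'midnight',
#           'backupCount': 7,
#   }
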
def RaiseRedFlags(reason):
        logging.getLogger('redflag').critical(reason)
        return reason


from bitcoin.node import BitcoinLink, BitcoinNode
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'
bcnode.newBlock = lambda blkhash: MM.updateMerkleTree()

import jsonrpc

try:
        import jsonrpc.authproxy
        jsonrpc.authproxy.USER_AGENT = 'Eloipool/0.1'
except:
        pass


from bitcoin.script import BitcoinScript
from bitcoin.txn import Txn
from base58 import b58decode
from struct import pack
import subprocess
from time import time

def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
        txn = Txn.new()

        if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
                coinbased = 0
                try:
                        cmd = config.CoinbaserCmd
                        cmd = cmd.replace('%d', str(coinbaseValue))
                        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
                        nout = int(p.stdout.readline())
                        for i in range(nout):
                                amount = int(p.stdout.readline())
                                addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
                                pkScript = BitcoinScript.toAddress(addr)
                                txn.addOutput(amount, pkScript)
                                coinbased += amount
                except:
                        coinbased = coinbaseValue + 1
                if coinbased >= coinbaseValue:
                        logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
                        txn.outputs = []
                else:
                        coinbaseValue -= coinbased

        pkScript = BitcoinScript.toAddress(config.TrackerAddr)
        txn.addOutput(coinbaseValue, pkScript)

        # TODO
        # TODO: red flag on dupe coinbase
        return txn

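# CoinbaserCmd protocol, as implemented above: every literal '%d' in the
# configured command is replaced with the total coinbase value in base units,
# the command is run through the shell, and its stdout is read as a count of
# extra outputs followed by amount/address line pairs.  If anything goes wrong,
# or the extra outputs try to spend the whole coinbase, they are discarded and
# everything goes to TrackerAddr.  A hypothetical sketch:
#
#   CoinbaserCmd = '/usr/local/bin/my-coinbaser %d'   # hypothetical script
#   # which, for a two-output split, might print:
#   #   2
#   #   1000000
#   #   1ExampleAddressAAAAAAAAAAAAAAAAAAA
#   #   2000000
#   #   1ExampleAddressBBBBBBBBBBBBBBBBBBB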

import jsonrpc_getwork
from util import Bits2Target

workLog = {}
userStatus = {}
networkTarget = None
DupeShareHACK = {}

server = None
stratumsrv = None
def updateBlocks():
        server.wakeLongpoll()
        stratumsrv.updateJob()

def blockChanged():
        global MM, networkTarget, server
        bits = MM.currentBlock[2]
        if bits is None:
                networkTarget = None
        else:
                networkTarget = Bits2Target(bits)
        if MM.lastBlock != (None, None, None):
                global DupeShareHACK
                DupeShareHACK = {}
                jsonrpc_getwork._CheckForDupesHACK = {}
                workLog.clear()
        server.wakeLongpoll(wantClear=True)
        stratumsrv.updateJob(wantClear=True)


from time import sleep, time
import traceback

def _WorkLogPruner_I(wl):
        now = time()
        pruned = 0
        for username in wl:
                userwork = wl[username]
                for wli in tuple(userwork.keys()):
                        if now > userwork[wli][1] + 120:
                                del userwork[wli]
                                pruned += 1
        WorkLogPruner.logger.debug('Pruned %d jobs' % (pruned,))

def WorkLogPruner(wl):
        while True:
                try:
                        sleep(60)
                        _WorkLogPruner_I(wl)
                except:
                        WorkLogPruner.logger.error(traceback.format_exc())
WorkLogPruner.logger = logging.getLogger('WorkLogPruner')


from merklemaker import merkleMaker
MM = merkleMaker()
MM.__dict__.update(config.__dict__)
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks


from binascii import b2a_hex
from copy import deepcopy
from math import ceil, log
from merklemaker import MakeBlockHeader
from struct import pack, unpack
import threading
from time import time
from util import PendingUpstream, RejectedShare, bdiff1target, dblsha, LEhash2int, swap32, target2bdiff, target2pdiff
import jsonrpc
import traceback

gotwork = None
if hasattr(config, 'GotWorkURI'):
        gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

if not hasattr(config, 'DelayLogForUpstream'):
        config.DelayLogForUpstream = False

if not hasattr(config, 'DynamicTargetting'):
        config.DynamicTargetting = 0
else:
        if not hasattr(config, 'DynamicTargetWindow'):
                config.DynamicTargetWindow = 120
        config.DynamicTargetGoal *= config.DynamicTargetWindow / 60

def submitGotwork(info):
        try:
                gotwork.gotwork(info)
        except:
                checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())

def clampTarget(target, DTMode):
        # ShareTarget is the minimum
        if target is None or target > config.ShareTarget:
                target = config.ShareTarget

        # Never target above the network, as we'd lose blocks
        if target < networkTarget:
                target = networkTarget

        if DTMode == 2:
                # Ceil target to a power of two :)
                truebits = log(target, 2)
                if target <= 2**int(truebits):
                        # Workaround for bug in Python's math.log function
                        truebits = int(truebits)
                target = 2**ceil(truebits) - 1
        elif DTMode == 3:
                # Round target to multiple of bdiff 1
                target = bdiff1target / int(round(target2bdiff(target)))

        # Return None for ShareTarget to save memory
        if target == config.ShareTarget:
                return None
        return target

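# Dynamic targetting modes, as inferred from clampTarget() above: mode 2 rounds
# the per-user target up to one less than a power of two, mode 3 rounds it to a
# whole multiple of bitcoind difficulty 1 (bdiff), and any other non-zero mode
# appears to use the computed target as-is.  Targets are always clamped between
# the network target (so block solutions are never missed) and
# config.ShareTarget (the easiest share the pool accepts).  Illustrative
# arithmetic only: in mode 2, a computed target of 3 * 2**220 rounds up to
# 2**222 - 1.
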
def getTarget(username, now, DTMode = None, RequestedTarget = None):
        if DTMode is None:
                DTMode = config.DynamicTargetting
        if not DTMode:
                return None
        if username in userStatus:
                status = userStatus[username]
        else:
                # No record, use default target
                RequestedTarget = clampTarget(RequestedTarget, DTMode)
                userStatus[username] = [RequestedTarget, now, 0]
                return RequestedTarget
        (targetIn, lastUpdate, work) = status
        if work <= config.DynamicTargetGoal:
                if now < lastUpdate + config.DynamicTargetWindow and (targetIn is None or targetIn >= networkTarget):
                        # No reason to change it just yet
                        return clampTarget(targetIn, DTMode)
                if not work:
                        # No shares received, reset to minimum
                        if targetIn:
                                getTarget.logger.debug("No shares from %s, resetting to minimum target" % (repr(username),))
                                userStatus[username] = [None, now, 0]
                        return clampTarget(None, DTMode)

        deltaSec = now - lastUpdate
        target = targetIn or config.ShareTarget
        target = int(target * config.DynamicTargetGoal * deltaSec / config.DynamicTargetWindow / work)
        target = clampTarget(target, DTMode)
        if target != targetIn:
                pfx = 'Retargetting %s' % (repr(username),)
                tin = targetIn or config.ShareTarget
                getTarget.logger.debug("%s from: %064x (pdiff %s)" % (pfx, tin, target2pdiff(tin)))
                tgt = target or config.ShareTarget
                getTarget.logger.debug("%s   to: %064x (pdiff %s)" % (pfx, tgt, target2pdiff(tgt)))
        userStatus[username] = [target, now, 0]
        return target
getTarget.logger = logging.getLogger('getTarget')

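# Retargetting formula, from getTarget() above:
#   newTarget = oldTarget * DynamicTargetGoal * elapsed / DynamicTargetWindow / sharesCounted
# i.e. the target is scaled so that, at the observed share rate, the user would
# submit roughly DynamicTargetGoal shares per DynamicTargetWindow seconds.
# Illustrative numbers only: with DynamicTargetGoal = 20 (after the per-window
# scaling applied near the top of this file) and DynamicTargetWindow = 120, a
# user who submitted 40 shares in the 120-second window gets a target half as
# large (twice the difficulty), while 10 shares would double the target.
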
def TopTargets(n = 0x10):
        tmp = list(k for k, v in userStatus.items() if v[0] is not None)
        tmp.sort(key=lambda k: -userStatus[k][0])
        tmp2 = {}
        def t2d(t):
                if t not in tmp2:
                        tmp2[t] = target2pdiff(t)
                return tmp2[t]
        for k in tmp[-n:]:
                tgt = userStatus[k][0]
                print('%-34s %064x %3d' % (k, tgt, t2d(tgt)))

def RegisterWork(username, wli, wld, RequestedTarget = None):
        now = time()
        target = getTarget(username, now, RequestedTarget=RequestedTarget)
        wld = tuple(wld) + (target,)
        workLog.setdefault(username, {})[wli] = (wld, now)
        return target or config.ShareTarget

def getBlockHeader(username):
        MRD = MM.getMRD()
        merkleRoot = MRD[0]
        hdr = MakeBlockHeader(MRD)
        workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
        target = RegisterWork(username, merkleRoot, MRD)
        return (hdr, workLog[username][merkleRoot], target)

def getBlockTemplate(username, p_magic = None, RequestedTarget = None):
        if server.tls.wantClear:
                wantClear = True
        elif p_magic and username not in workLog:
                wantClear = True
                p_magic[0] = True
        else:
                wantClear = False
        MC = MM.getMC(wantClear)
        (dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
        wliPos = coinbase[0] + 2
        wliLen = coinbase[wliPos - 1]
        wli = coinbase[wliPos:wliPos+wliLen]
        target = RegisterWork(username, wli, MC, RequestedTarget=RequestedTarget)
        return (MC, workLog[username][wli], target)

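# Work-log index ("wli"): getwork shares are keyed by merkle root, while GBT and
# stratum shares are keyed by a small tag that merklemaker embeds in the
# coinbase scriptSig.  The slicing above appears to assume the scriptSig begins
# with the BIP 34 height push (coinbase[0] being that push's length byte),
# followed by a second push whose one-byte length prefix gives the tag length;
# checkShare() repeats the same extraction when validating submitted coinbases.
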
def getStratumJob(jobid, wantClear = False):
        MC = MM.getMC(wantClear)
        (dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
        now = time()
        workLog.setdefault(None, {})[jobid] = (MC, now)
        return (MC, workLog[None][jobid])

def getExistingStratumJob(jobid):
        wld = workLog[None][jobid]
        return (wld[0], wld)

loggersShare = []
authenticators = []

RBDs = []
RBPs = []

from bitcoin.varlen import varlenEncode, varlenDecode
import bitcoin.txn
from merklemaker import assembleBlock

if not hasattr(config, 'BlockSubmissions'):
        config.BlockSubmissions = None

RBFs = []
def blockSubmissionThread(payload, blkhash, share):
        if config.BlockSubmissions is None:
                servers = list(a for b in MM.TemplateSources for a in b)
        else:
                servers = list(config.BlockSubmissions)

        if hasattr(share['merkletree'], 'source_uri'):
                servers.insert(0, {
                        'access': jsonrpc.ServiceProxy(share['merkletree'].source_uri),
                        'name': share['merkletree'].source,
                })
        elif not servers:
                servers = list(a for b in MM.TemplateSources for a in b)

        myblock = (blkhash, payload[4:36])
        payload = b2a_hex(payload).decode('ascii')
        nexterr = 0
        tries = 0
        success = False
        while len(servers):
                tries += 1
                TS = servers.pop(0)
                UpstreamBitcoindJSONRPC = TS['access']
                try:
                        # BIP 22 standard submitblock
                        reason = UpstreamBitcoindJSONRPC.submitblock(payload)
                except BaseException as gbterr:
                        gbterr_fmt = traceback.format_exc()
                        try:
                                try:
                                        # bitcoind 0.5/0.6 getmemorypool
                                        reason = UpstreamBitcoindJSONRPC.getmemorypool(payload)
                                except:
                                        # Old BIP 22 draft getmemorypool
                                        reason = UpstreamBitcoindJSONRPC.getmemorypool(payload, {})
                                if reason is True:
                                        reason = None
                                elif reason is False:
                                        reason = 'rejected'
                        except BaseException as gmperr:
                                now = time()
                                if now > nexterr:
                                        # FIXME: This will show "Method not found" on pre-BIP22 servers
                                        RaiseRedFlags(gbterr_fmt)
                                        nexterr = now + 5
                                if MM.currentBlock[0] not in myblock and tries > len(servers):
                                        RBFs.append( (('next block', MM.currentBlock, now, (gbterr, gmperr)), payload, blkhash, share) )
                                        RaiseRedFlags('Giving up on submitting block to upstream \'%s\'' % (TS['name'],))
                                        if share['upstreamRejectReason'] is PendingUpstream:
                                                share['upstreamRejectReason'] = 'GAVE UP'
                                                share['upstreamResult'] = False
                                                logShare(share)
                                        return

                                servers.append(TS)
                                continue

                # At this point, we have a reason back
                if reason:
                        # FIXME: The returned value could be a list of multiple responses
                        msg = 'Upstream \'%s\' block submission failed: %s' % (TS['name'], reason,)
                        if success and reason in ('stale-prevblk', 'bad-prevblk', 'orphan', 'duplicate'):
                                # no big deal
                                blockSubmissionThread.logger.debug(msg)
                        else:
                                RBFs.append( (('upstream reject', reason, time()), payload, blkhash, share) )
                                RaiseRedFlags(msg)
                else:
                        blockSubmissionThread.logger.debug('Upstream \'%s\' accepted block' % (TS['name'],))
                        success = True
                if share['upstreamRejectReason'] is PendingUpstream:
                        share['upstreamRejectReason'] = reason
                        share['upstreamResult'] = not reason
                        logShare(share)
blockSubmissionThread.logger = logging.getLogger('blockSubmission')

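# Block submission strategy, per the thread above: every configured upstream
# (config.BlockSubmissions, or else all template sources) is tried in order,
# with the node the winning template actually came from pushed to the front.
# For each node the BIP 22 submitblock RPC is tried first; if that raises, the
# code falls back to the two historical getmemorypool submission forms used by
# bitcoind 0.5/0.6 and the early BIP 22 drafts.  Nodes that fail outright are
# re-queued at the back until the network has clearly moved on to another block.
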
def checkData(share):
        data = share['data']
        data = data[:80]
        (prevBlock, height, bits) = MM.currentBlock
        sharePrevBlock = data[4:36]
        if sharePrevBlock != prevBlock:
                if sharePrevBlock == MM.lastBlock[0]:
                        raise RejectedShare('stale-prevblk')
                raise RejectedShare('bad-prevblk')

        if data[72:76] != bits:
                raise RejectedShare('bad-diffbits')

        # Note that we should accept miners reducing version to 1 if they don't understand 2 yet
        # FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
        if data[1:4] != b'\0\0\0' or data[0] > 2:
                raise RejectedShare('bad-version')

def buildStratumData(share, merkleroot):
        (prevBlock, height, bits) = MM.currentBlock

        data = b'\x02\0\0\0'
        data += prevBlock
        data += merkleroot
        data += share['ntime'][::-1]
        data += bits
        data += share['nonce'][::-1]

        share['data'] = data
        return data

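# Both functions above operate on the 80-byte block header in wire byte order:
# version (4 bytes, offset 0), previous block hash (32, offset 4), merkle root
# (32, offset 36), ntime (4, offset 68), nbits (4, offset 72) and nonce
# (4, offset 76).  buildStratumData() assembles that header for stratum shares,
# hard-coding block version 2, while checkData() validates the fields of
# headers submitted via getwork/GBT against the current template.
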
def IsJobValid(wli, wluser = None):
        if wluser not in workLog:
                return False
        if wli not in workLog[wluser]:
                return False
        (wld, issueT) = workLog[wluser][wli]
        if time() < issueT - 120:
                return False
        return True

def checkShare(share):
        shareTime = share['time'] = time()

        username = share['username']
        if 'data' in share:
                # getwork/GBT
                checkData(share)
                data = share['data']

                if username not in workLog:
                        raise RejectedShare('unknown-user')
                MWL = workLog[username]

                shareMerkleRoot = data[36:68]
                if 'blkdata' in share:
                        pl = share['blkdata']
                        (txncount, pl) = varlenDecode(pl)
                        cbtxn = bitcoin.txn.Txn(pl)
                        othertxndata = cbtxn.disassemble(retExtra=True)
                        coinbase = cbtxn.getCoinbase()
                        wliPos = coinbase[0] + 2
                        wliLen = coinbase[wliPos - 1]
                        wli = coinbase[wliPos:wliPos+wliLen]
                        mode = 'MC'
                        moden = 1
                else:
                        wli = shareMerkleRoot
                        mode = 'MRD'
                        moden = 0
                        coinbase = None
        else:
                # Stratum
                MWL = workLog[None]
                wli = share['jobid']
                buildStratumData(share, b'\0' * 32)
                mode = 'MC'
                moden = 1
                othertxndata = b''

        if wli not in MWL:
                raise RejectedShare('unknown-work')
        (wld, issueT) = MWL[wli]
        share[mode] = wld

        share['issuetime'] = issueT

        (workMerkleTree, workCoinbase) = wld[1:3]
        share['merkletree'] = workMerkleTree
        if 'jobid' in share:
                cbtxn = deepcopy(workMerkleTree.data[0])
                coinbase = workCoinbase + share['extranonce1'] + share['extranonce2']
                cbtxn.setCoinbase(coinbase)
                cbtxn.assemble()
                data = buildStratumData(share, workMerkleTree.withFirst(cbtxn))
                shareMerkleRoot = data[36:68]

        if data in DupeShareHACK:
                raise RejectedShare('duplicate')
        DupeShareHACK[data] = None

        blkhash = dblsha(data)
        if blkhash[28:] != b'\0\0\0\0':
                raise RejectedShare('H-not-zero')
        blkhashn = LEhash2int(blkhash)

        global networkTarget
        logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
        logfunc('BLKHASH: %64x' % (blkhashn,))
        logfunc(' TARGET: %64x' % (networkTarget,))

        # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
        txlist = workMerkleTree.data
        txlist = [deepcopy(txlist[0]),] + txlist[1:]
        cbtxn = txlist[0]
        cbtxn.setCoinbase(coinbase or workCoinbase)
        cbtxn.assemble()

        if blkhashn <= networkTarget:
                logfunc("Submitting upstream")
                RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree, share, wld) ) )
                if not moden:
                        payload = assembleBlock(data, txlist)
                else:
                        payload = share['data']
                        if len(othertxndata):
                                payload += share['blkdata']
                        else:
                                payload += assembleBlock(data, txlist)[80:]
                logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
                RBPs.append(payload)
                threading.Thread(target=blockSubmissionThread, args=(payload, blkhash, share)).start()
                bcnode.submitBlock(payload)
                if config.DelayLogForUpstream:
                        share['upstreamRejectReason'] = PendingUpstream
                else:
                        share['upstreamRejectReason'] = None
                        share['upstreamResult'] = True
                MM.updateBlock(blkhash)

        # Gotwork hack...
        if gotwork and blkhashn <= config.GotWorkTarget:
                try:
                        coinbaseMrkl = cbtxn.data
                        coinbaseMrkl += blkhash
                        steps = workMerkleTree._steps
                        coinbaseMrkl += pack('B', len(steps))
                        for step in steps:
                                coinbaseMrkl += step
                        coinbaseMrkl += b"\0\0\0\0"
                        info = {}
                        info['hash'] = b2a_hex(blkhash).decode('ascii')
                        info['header'] = b2a_hex(data).decode('ascii')
                        info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
                        thr = threading.Thread(target=submitGotwork, args=(info,))
                        thr.daemon = True
                        thr.start()
                except:
                        checkShare.logger.warning('Failed to build gotwork request')

        if 'target' in share:
                workTarget = share['target']
        elif len(wld) > 6:
                workTarget = wld[6]
        else:
                workTarget = None

        if workTarget is None:
                workTarget = config.ShareTarget
        if blkhashn > workTarget:
                raise RejectedShare('high-hash')
        share['target'] = workTarget
        share['_targethex'] = '%064x' % (workTarget,)

        shareTimestamp = unpack('<L', data[68:72])[0]
        if shareTime < issueT - 120:
                raise RejectedShare('stale-work')
        if shareTimestamp < shareTime - 300:
                raise RejectedShare('time-too-old')
        if shareTimestamp > shareTime + 7200:
                raise RejectedShare('time-too-new')

        if config.DynamicTargetting and username in userStatus:
                # NOTE: userStatus[username] can only be missing immediately after a restart
                status = userStatus[username]
                target = status[0] or config.ShareTarget
                if target == workTarget:
                        userStatus[username][2] += 1
                else:
                        userStatus[username][2] += float(target) / workTarget

        if moden:
                cbpre = workCoinbase
                cbpreLen = len(cbpre)
                if coinbase[:cbpreLen] != cbpre:
                        raise RejectedShare('bad-cb-prefix')

                # Filter out known "I support" flags, to prevent exploits
                for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
                        if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
                                raise RejectedShare('bad-cb-flag')

                if len(coinbase) > 100:
                        raise RejectedShare('bad-cb-length')

                if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
                        raise RejectedShare('bad-txnmrklroot')

                if len(othertxndata):
                        allowed = assembleBlock(data, txlist)[80:]
                        if allowed != share['blkdata']:
                                raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')

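# Share acceptance pipeline (summary of checkShare above): the share is matched
# to logged work, checked against the duplicate table, hashed, compared against
# the network target (triggering an upstream block submission when it
# qualifies) and against the per-user work target, sanity-checked for stale
# work and timestamps, and, for coinbase-carrying modes, checked for an
# unmodified coinbase prefix, no smuggled vote flags, a sane coinbase length, a
# matching merkle root and unmodified transaction data.
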
def logShare(share):
        if '_origdata' in share:
                share['solution'] = share['_origdata']
        else:
                share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
        for i in loggersShare:
                i.logShare(share)

def checkAuthentication(username, password):
        for i in authenticators:
                if i.checkAuthentication(username, password):
                        return True
        return False

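# Authentication is delegated to the modules listed in config.Authentication
# (loaded in the __main__ section below).  An authenticator object only needs a
# checkAuthentication(username, password) method returning a truthy value to
# accept a login; the first module that accepts wins.  The modules added with
# this commit are 'allowall' (which presumably accepts everyone) and
# 'simplefile'; their specific parameters live in the authentication package,
# not here.
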
def receiveShare(share):
        # TODO: username => userid
        try:
                checkShare(share)
        except RejectedShare as rej:
                share['rejectReason'] = str(rej)
                raise
        except BaseException as e:
                share['rejectReason'] = 'ERROR'
                raise
        finally:
                if share.get('upstreamRejectReason', None) is not PendingUpstream:
                        logShare(share)

def newBlockNotification():
        logging.getLogger('newBlockNotification').info('Received new block notification')
        MM.updateMerkleTree()
        # TODO: Force RESPOND TO LONGPOLLS?
        pass

def newBlockNotificationSIGNAL(signum, frame):
        # Use a new thread, in case the signal handler is called with locks held
        thr = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
        thr.daemon = True
        thr.start()

from signal import signal, SIGUSR1
signal(SIGUSR1, newBlockNotificationSIGNAL)


import os
import os.path
import pickle
import signal
import sys
from time import sleep
import traceback

if getattr(config, 'SaveStateFilename', None) is None:
        config.SaveStateFilename = 'eloipool.worklog'

def stopServers():
        logger = logging.getLogger('stopServers')

        if hasattr(stopServers, 'already'):
                logger.debug('Already tried to stop servers before')
                return
        stopServers.already = True

        logger.info('Stopping servers...')
        global bcnode, server
        servers = (bcnode, server, stratumsrv)
        for s in servers:
                s.keepgoing = False
        for s in servers:
                try:
                        s.wakeup()
                except:
                        logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
        i = 0
        while True:
                sl = []
                for s in servers:
                        if s.running:
                                sl.append(s.__class__.__name__)
                if not sl:
                        break
                i += 1
                if i >= 0x100:
                        logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
                        break
                sleep(0.01)

        for s in servers:
                for fd in s._fd.keys():
                        os.close(fd)

def stopLoggers():
        for i in loggersShare:
                if hasattr(i, 'stop'):
                        i.stop()

def saveState(SAVE_STATE_FILENAME, t = None):
        logger = logging.getLogger('saveState')

        # Then, save data needed to resume work
        logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
        i = 0
        while True:
                try:
                        with open(SAVE_STATE_FILENAME, 'wb') as f:
                                pickle.dump(t, f)
                                pickle.dump(DupeShareHACK, f)
                                pickle.dump(workLog, f)
                        break
                except:
                        i += 1
                        if i >= 0x10000:
                                logger.error('Failed to save work\n' + traceback.format_exc())
                                try:
                                        os.unlink(SAVE_STATE_FILENAME)
                                except:
                                        logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())

def exit():
        t = time()
        stopServers()
        stopLoggers()
        saveState(config.SaveStateFilename, t=t)
        logging.getLogger('exit').info('Goodbye...')
        os.kill(os.getpid(), signal.SIGTERM)
        sys.exit(0)

def restart():
        t = time()
        stopServers()
        stopLoggers()
        saveState(config.SaveStateFilename, t=t)
        logging.getLogger('restart').info('Restarting...')
        try:
                os.execv(sys.argv[0], sys.argv)
        except:
                logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())

def restoreState(SAVE_STATE_FILENAME):
        if not os.path.exists(SAVE_STATE_FILENAME):
                return

        global workLog, DupeShareHACK

        logger = logging.getLogger('restoreState')
        s = os.stat(SAVE_STATE_FILENAME)
        logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
        try:
                with open(SAVE_STATE_FILENAME, 'rb') as f:
                        t = pickle.load(f)
                        if type(t) == tuple:
                                if len(t) > 2:
                                        # Future formats, not supported here
                                        ver = t[3]
                                        # TODO

                                # Old format, from 2012-02-02 to 2012-02-03
                                workLog = t[0]
                                DupeShareHACK = t[1]
                                t = None
                        else:
                                if isinstance(t, dict):
                                        # Old format, from 2012-02-03 to 2012-02-03
                                        DupeShareHACK = t
                                        t = None
                                else:
                                        # Current format, from 2012-02-03 onward
                                        DupeShareHACK = pickle.load(f)

                                if t + 120 >= time():
                                        workLog = pickle.load(f)
                                else:
                                        logger.debug('Skipping restore of expired workLog')
        except:
                logger.error('Failed to restore state\n' + traceback.format_exc())
                return
        logger.info('State restored successfully')
        if t:
                logger.info('Total downtime: %g seconds' % (time() - t,))

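# Saved-state format, as written by saveState() above: three consecutive
# pickles in one file -- the shutdown timestamp, the DupeShareHACK duplicate
# table, and the work log.  restoreState() also understands the two short-lived
# 2012 formats, and only restores the work log when the pool was down for less
# than about two minutes, matching the 120-second work expiry used elsewhere.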

from jsonrpcserver import JSONRPCListener, JSONRPCServer
import interactivemode
from networkserver import NetworkListener
import threading
import sharelogging
import authentication
from stratumserver import StratumServer
import imp

if __name__ == "__main__":
        if not hasattr(config, 'ShareLogging'):
                config.ShareLogging = ()
        if hasattr(config, 'DbOptions'):
                logging.getLogger('backwardCompatibility').warning('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
                config.ShareLogging = list(config.ShareLogging)
                config.ShareLogging.append( {
                        'type': 'sql',
                        'engine': 'postgres',
                        'dbopts': config.DbOptions,
                        'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
                } )
        for i in config.ShareLogging:
                if not hasattr(i, 'keys'):
                        name, parameters = i
                        logging.getLogger('backwardCompatibility').warning('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
                        if name == 'postgres':
                                name = 'sql'
                                i = {
                                        'engine': 'postgres',
                                        'dbopts': parameters,
                                }
                        elif name == 'logfile':
                                i = {}
                                i['thropts'] = parameters
                                if 'filename' in parameters:
                                        i['filename'] = parameters['filename']
                                        i['thropts'] = dict(i['thropts'])
                                        del i['thropts']['filename']
                        else:
                                i = parameters
                        i['type'] = name

                name = i['type']
                parameters = i
                try:
                        fp, pathname, description = imp.find_module(name, sharelogging.__path__)
                        m = imp.load_module(name, fp, pathname, description)
                        lo = getattr(m, name)(**parameters)
                        loggersShare.append(lo)
                except:
                        logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name, sys.exc_info())

        if not hasattr(config, 'Authentication'):
                config.Authentication = ({'module': 'allowall'},)

        for i in config.Authentication:
                name = i['module']
                parameters = i
                try:
                        fp, pathname, description = imp.find_module(name, authentication.__path__)
                        m = imp.load_module(name, fp, pathname, description)
                        lo = getattr(m, name)(**parameters)
                        authenticators.append(lo)
                except:
                        logging.getLogger('authentication').error("Error setting up authentication module %s: %s", name, sys.exc_info())

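        # Example Authentication configuration (hedged -- parameter names other
        # than 'module' depend on the individual module in the authentication
        # package, and the whole dict is passed verbatim as keyword arguments to
        # that module's constructor):
        #
        #   Authentication = (
        #           {'module': 'allowall'},            # accept any username/password
        #           # {'module': 'simplefile', ...},   # file-backed credentials; see the module for its options
        #   )
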
        LSbc = []
        if not hasattr(config, 'BitcoinNodeAddresses'):
                config.BitcoinNodeAddresses = ()
        for a in config.BitcoinNodeAddresses:
                LSbc.append(NetworkListener(bcnode, a))

        if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
                BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)

        import jsonrpc_getblocktemplate
        import jsonrpc_getwork
        import jsonrpc_setworkaux

        server = JSONRPCServer()
        server.tls = threading.local()
        server.tls.wantClear = False
        if hasattr(config, 'JSONRPCAddress'):
                logging.getLogger('backwardCompatibility').warning('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
                if not hasattr(config, 'JSONRPCAddresses'):
                        config.JSONRPCAddresses = []
                config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
        LS = []
        for a in config.JSONRPCAddresses:
                LS.append(JSONRPCListener(server, a))
        if hasattr(config, 'SecretUser'):
                server.SecretUser = config.SecretUser
        server.aux = MM.CoinbaseAux
        server.getBlockHeader = getBlockHeader
        server.getBlockTemplate = getBlockTemplate
        server.receiveShare = receiveShare
        server.RaiseRedFlags = RaiseRedFlags
        server.ShareTarget = config.ShareTarget
        server.checkAuthentication = checkAuthentication

        if hasattr(config, 'TrustedForwarders'):
                server.TrustedForwarders = config.TrustedForwarders
        server.ServerName = config.ServerName

        stratumsrv = StratumServer()
        stratumsrv.getStratumJob = getStratumJob
        stratumsrv.getExistingStratumJob = getExistingStratumJob
        stratumsrv.receiveShare = receiveShare
        stratumsrv.getTarget = getTarget
        stratumsrv.defaultTarget = config.ShareTarget
        stratumsrv.IsJobValid = IsJobValid
        stratumsrv.checkAuthentication = checkAuthentication
        if not hasattr(config, 'StratumAddresses'):
                config.StratumAddresses = ()
        for a in config.StratumAddresses:
                NetworkListener(stratumsrv, a)

        MM.start()

        restoreState(config.SaveStateFilename)

        prune_thr = threading.Thread(target=WorkLogPruner, args=(workLog,))
        prune_thr.daemon = True
        prune_thr.start()

        bcnode_thr = threading.Thread(target=bcnode.serve_forever)
        bcnode_thr.daemon = True
        bcnode_thr.start()

        stratum_thr = threading.Thread(target=stratumsrv.serve_forever)
        stratum_thr.daemon = True
        stratum_thr.start()

        server.serve_forever()