Bugfix: authorization: Handle utf8 decoding outside of generic modules, safe if passw...
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2013  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 # Portions written by Peter Leurs <kinlo@triplemining.com>
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU Affero General Public License for more details.
15 #
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
19 import argparse
20 import importlib
21 argparser = argparse.ArgumentParser()
22 argparser.add_argument('-c', '--config', help='Config name to load from config_<ARG>.py')
23 args = argparser.parse_args()
24 configmod = 'config'
25 if args.config is not None:
26         configmod = 'config_%s' % (args.config,)
27 __import__(configmod)
28 config = importlib.import_module(configmod)
29
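# The config module is plain Python.  An illustrative minimal sketch (the attribute
# names are the ones read elsewhere in this file; the values here are hypothetical):
#
#   ServerName = 'My Eloipool'
#   UpstreamNetworkId = b'\xf9\xbe\xb4\xd9'      # p2p network magic (format assumed)
#   UpstreamBitcoindNode = ('127.0.0.1', 8333)   # local bitcoind for block relay
#   TrackerAddr = '<pool payout address>'
#   JSONRPCAddresses = (('', 8332),)
#   StratumAddresses = (('', 3333),)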
30 if not hasattr(config, 'ServerName'):
31         config.ServerName = 'Unnamed Eloipool'
32
33 if not hasattr(config, 'ShareTarget'):
34         config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
35
36
37 import logging
38 import logging.handlers
39
40 rootlogger = logging.getLogger(None)
41 logformat = getattr(config, 'LogFormat', '%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s')
42 logformatter = logging.Formatter(logformat)
43 if len(rootlogger.handlers) == 0:
44         logging.basicConfig(
45                 format=logformat,
46                 level=logging.DEBUG,
47         )
48         for infoOnly in (
49                 'checkShare',
50                 'getTarget',
51                 'JSONRPCHandler',
52                 'JSONRPCServer',
53                 'merkleMaker',
54                 'StratumServer',
55                 'Waker for JSONRPCServer',
56                 'Waker for StratumServer',
57                 'WorkLogPruner'
58         ):
59                 logging.getLogger(infoOnly).setLevel(logging.INFO)
60 if getattr(config, 'LogToSysLog', False):
61         sysloghandler = logging.handlers.SysLogHandler(address = '/dev/log')
62         rootlogger.addHandler(sysloghandler)
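# config.LogFile may be either a filename string (handled with logging.FileHandler)
# or a dict of TimedRotatingFileHandler keyword arguments.  Illustrative examples:
#   LogFile = 'eloipool.log'
#   LogFile = {'filename': 'eloipool.log', 'when': 'midnight', 'backupCount': 7}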
63 if hasattr(config, 'LogFile'):
64         if isinstance(config.LogFile, str):
65                 filehandler = logging.FileHandler(config.LogFile)
66         else:
67                 filehandler = logging.handlers.TimedRotatingFileHandler(**config.LogFile)
68         filehandler.setFormatter(logformatter)
69         rootlogger.addHandler(filehandler)
70
71 def RaiseRedFlags(reason):
72         logging.getLogger('redflag').critical(reason)
73         return reason
74
75
76 from bitcoin.node import BitcoinLink, BitcoinNode
77 bcnode = BitcoinNode(config.UpstreamNetworkId)
78 bcnode.userAgent += b'Eloipool:0.1/'
79 bcnode.newBlock = lambda blkhash: MM.updateMerkleTree()
80
81 import jsonrpc
82
83 try:
84         import jsonrpc.authproxy
85         jsonrpc.authproxy.USER_AGENT = 'Eloipool/0.1'
86 except:
87         pass
88
89
90 from bitcoin.script import BitcoinScript
91 from bitcoin.txn import Txn
92 from base58 import b58decode
93 from struct import pack
94 import subprocess
95 from time import time
96
97 def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
98         txn = Txn.new()
99         
100         if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
101                 coinbased = 0
102                 try:
103                         cmd = config.CoinbaserCmd
104                         cmd = cmd.replace('%d', str(coinbaseValue))
105                         p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
106                         nout = int(p.stdout.readline())
107                         for i in range(nout):
108                                 amount = int(p.stdout.readline())
109                                 addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
110                                 pkScript = BitcoinScript.toAddress(addr)
111                                 txn.addOutput(amount, pkScript)
112                                 coinbased += amount
113                 except:
114                         coinbased = coinbaseValue + 1
115                 if coinbased >= coinbaseValue:
116                         logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
117                         txn.outputs = []
118                 else:
119                         coinbaseValue -= coinbased
120         
121         pkScript = BitcoinScript.toAddress(config.TrackerAddr)
122         txn.addOutput(coinbaseValue, pkScript)
123         
124         # TODO
125         # TODO: red flag on dupe coinbase
126         return txn
127
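# Coinbaser protocol implied by makeCoinbaseTxn() above: CoinbaserCmd is run through
# the shell with every '%d' replaced by the total coinbase value; it must print the
# number of extra outputs on its first line, followed by an amount line and an
# address line for each output.  Illustrative output for two outputs (values and
# addresses hypothetical):
#   2
#   1000000
#   <address 1>
#   2500000
#   <address 2>
# Any error, or outputs summing to at least the full coinbase value, causes the
# coinbaser outputs to be discarded and the whole value to be paid to TrackerAddr.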
128
129 import jsonrpc_getwork
130 from util import Bits2Target
131
132 workLog = {}
133 userStatus = {}
134 networkTarget = None
135 DupeShareHACK = {}
136
137 server = None
138 stratumsrv = None
139 def updateBlocks():
140         server.wakeLongpoll()
141         stratumsrv.updateJob()
142
143 def blockChanged():
144         global MM, networkTarget, server
145         bits = MM.currentBlock[2]
146         if bits is None:
147                 networkTarget = None
148         else:
149                 networkTarget = Bits2Target(bits)
150         if MM.lastBlock != (None, None, None):
151                 global DupeShareHACK
152                 DupeShareHACK = {}
153                 jsonrpc_getwork._CheckForDupesHACK = {}
154                 workLog.clear()
155         server.wakeLongpoll(wantClear=True)
156         stratumsrv.updateJob(wantClear=True)
157
158
159 from time import sleep, time
160 import traceback
161
162 def _WorkLogPruner_I(wl):
163         now = time()
164         pruned = 0
165         for username in wl:
166                 userwork = wl[username]
167                 for wli in tuple(userwork.keys()):
168                         if now > userwork[wli][1] + 120:
169                                 del userwork[wli]
170                                 pruned += 1
171         WorkLogPruner.logger.debug('Pruned %d jobs' % (pruned,))
172
173 def WorkLogPruner(wl):
174         while True:
175                 try:
176                         sleep(60)
177                         _WorkLogPruner_I(wl)
178                 except:
179                         WorkLogPruner.logger.error(traceback.format_exc())
180 WorkLogPruner.logger = logging.getLogger('WorkLogPruner')
181
182
183 from merklemaker import merkleMaker
184 MM = merkleMaker()
185 MM.__dict__.update(config.__dict__)
186 MM.makeCoinbaseTxn = makeCoinbaseTxn
187 MM.onBlockChange = blockChanged
188 MM.onBlockUpdate = updateBlocks
189
190
191 from binascii import b2a_hex
192 from copy import deepcopy
193 from math import ceil, log
194 from merklemaker import MakeBlockHeader
195 from struct import pack, unpack
196 import threading
197 from time import time
198 from util import PendingUpstream, RejectedShare, bdiff1target, dblsha, LEhash2int, swap32, target2bdiff, target2pdiff
199 import jsonrpc
200 import traceback
201
202 gotwork = None
203 if hasattr(config, 'GotWorkURI'):
204         gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
205
206 if not hasattr(config, 'DelayLogForUpstream'):
207         config.DelayLogForUpstream = False
208
209 if not hasattr(config, 'DynamicTargetting'):
210         config.DynamicTargetting = 0
211 else:
212         if not hasattr(config, 'DynamicTargetWindow'):
213                 config.DynamicTargetWindow = 120
214         config.DynamicTargetGoal *= config.DynamicTargetWindow / 60
215
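# DynamicTargetGoal is apparently given in shares per minute; the rescaling above
# converts it to shares per DynamicTargetWindow so getTarget() can work in window units.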
216 def submitGotwork(info):
217         try:
218                 gotwork.gotwork(info)
219         except:
220                 checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
221
222 def clampTarget(target, DTMode):
223         # ShareTarget is the minimum
224         if target is None or target > config.ShareTarget:
225                 target = config.ShareTarget
226         
227         # Never target above the network, as we'd lose blocks
228         if target < networkTarget:
229                 target = networkTarget
230         
231         if DTMode == 2:
232                 # Ceil target to a power of two :)
233                 truebits = log(target, 2)
234                 if target <= 2**int(truebits):
235                         # Workaround for bug in Python's math.log function
236                         truebits = int(truebits)
237                 target = 2**ceil(truebits) - 1
238         elif DTMode == 3:
239                 # Round target to multiple of bdiff 1
240                 target = bdiff1target // int(round(target2bdiff(target)))
241         
242         # Return None for ShareTarget to save memory
243         if target == config.ShareTarget:
244                 return None
245         return target
246
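# Illustrative clampTarget() behaviour, assuming the input already lies between
# networkTarget and config.ShareTarget (the values below are hypothetical):
#   DTMode == 2:  3 * 2**198  ->  2**200 - 1   (ceiling to a power of two, minus one)
#                 2**199      ->  2**199 - 1   (exact powers rely on the int() workaround)
#   DTMode == 3:  the target is snapped so its bdiff difficulty is a whole number.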
247 def getTarget(username, now, DTMode = None, RequestedTarget = None):
248         if DTMode is None:
249                 DTMode = config.DynamicTargetting
250         if not DTMode:
251                 return None
252         if username in userStatus:
253                 status = userStatus[username]
254         else:
255                 # No record, use default target
256                 RequestedTarget = clampTarget(RequestedTarget, DTMode)
257                 userStatus[username] = [RequestedTarget, now, 0]
258                 return RequestedTarget
259         (targetIn, lastUpdate, work) = status
260         if work <= config.DynamicTargetGoal:
261                 if now < lastUpdate + config.DynamicTargetWindow and (targetIn is None or targetIn >= networkTarget):
262                         # No reason to change it just yet
263                         return clampTarget(targetIn, DTMode)
264                 if not work:
265                         # No shares received, reset to minimum
266                         if targetIn:
267                                 getTarget.logger.debug("No shares from %s, resetting to minimum target" % (repr(username),))
268                                 userStatus[username] = [None, now, 0]
269                         return clampTarget(None, DTMode)
270         
271         deltaSec = now - lastUpdate
272         target = targetIn or config.ShareTarget
273         target = int(target * config.DynamicTargetGoal * deltaSec / config.DynamicTargetWindow / work)
274         target = clampTarget(target, DTMode)
275         if target != targetIn:
276                 pfx = 'Retargetting %s' % (repr(username),)
277                 tin = targetIn or config.ShareTarget
278                 getTarget.logger.debug("%s from: %064x (pdiff %s)" % (pfx, tin, target2pdiff(tin)))
279                 tgt = target or config.ShareTarget
280                 getTarget.logger.debug("%s   to: %064x (pdiff %s)" % (pfx, tgt, target2pdiff(tgt)))
281         userStatus[username] = [target, now, 0]
282         return target
283 getTarget.logger = logging.getLogger('getTarget')
284
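# userStatus layout used by getTarget()/checkShare(): username -> [target, lastUpdate, work],
# where target is None while the user is at the default config.ShareTarget, lastUpdate is
# when the target was last set, and work counts shares since then (scaled when a share was
# issued at a different target than the user's current one).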
285 def TopTargets(n = 0x10):
286         tmp = list(k for k, v in userStatus.items() if v[0] is not None)
287         tmp.sort(key=lambda k: -userStatus[k][0])
288         tmp2 = {}
289         def t2d(t):
290                 if t not in tmp2:
291                         tmp2[t] = target2pdiff(t)
292                 return tmp2[t]
293         for k in tmp[-n:]:
294                 tgt = userStatus[k][0]
295                 print('%-34s %064x %3d' % (k, tgt, t2d(tgt)))
296
297 def RegisterWork(username, wli, wld, RequestedTarget = None):
298         now = time()
299         target = getTarget(username, now, RequestedTarget=RequestedTarget)
300         wld = tuple(wld) + (target,)
301         workLog.setdefault(username, {})[wli] = (wld, now)
302         return target or config.ShareTarget
303
304 def getBlockHeader(username):
305         MRD = MM.getMRD()
306         merkleRoot = MRD[0]
307         hdr = MakeBlockHeader(MRD)
308         workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
309         target = RegisterWork(username, merkleRoot, MRD)
310         return (hdr, workLog[username][merkleRoot], target)
311
312 def getBlockTemplate(username, p_magic = None, RequestedTarget = None):
313         if server.tls.wantClear:
314                 wantClear = True
315         elif p_magic and username not in workLog:
316                 wantClear = True
317                 p_magic[0] = True
318         else:
319                 wantClear = False
320         MC = MM.getMC(wantClear)
321         (dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
322         wliPos = coinbase[0] + 2
323         wliLen = coinbase[wliPos - 1]
324         wli = coinbase[wliPos:wliPos+wliLen]
325         target = RegisterWork(username, wli, MC, RequestedTarget=RequestedTarget)
326         return (MC, workLog[username][wli], target)
327
328 def getStratumJob(jobid, wantClear = False):
329         MC = MM.getMC(wantClear)
330         (dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
331         now = time()
332         workLog.setdefault(None, {})[jobid] = (MC, now)
333         return (MC, workLog[None][jobid])
334
335 def getExistingStratumJob(jobid):
336         wld = workLog[None][jobid]
337         return (wld[0], wld)
338
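# workLog layout: username -> { work identifier -> (work data, issue time) }; RegisterWork()
# also appends the per-user share target to the work data.  For getwork the identifier is
# the merkle root; for getblocktemplate it is the marker extracted from the coinbase;
# stratum jobs live under the None username, keyed by job id.  WorkLogPruner drops entries
# more than 120 seconds old.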
339 loggersShare = []
340 authenticators = []
341
342 RBDs = []
343 RBPs = []
344
345 from bitcoin.varlen import varlenEncode, varlenDecode
346 import bitcoin.txn
347 from merklemaker import assembleBlock
348
349 if not hasattr(config, 'BlockSubmissions'):
350         config.BlockSubmissions = None
351
352 RBFs = []
353 def blockSubmissionThread(payload, blkhash, share):
354         if config.BlockSubmissions is None:
355                 servers = list(a for b in MM.TemplateSources for a in b)
356         else:
357                 servers = list(config.BlockSubmissions)
358         
359         if hasattr(share['merkletree'], 'source_uri'):
360                 servers.insert(0, {
361                         'access': jsonrpc.ServiceProxy(share['merkletree'].source_uri),
362                         'name': share['merkletree'].source,
363                 })
364         elif not servers:
365                 servers = list(a for b in MM.TemplateSources for a in b)
366         
367         myblock = (blkhash, payload[4:36])
368         payload = b2a_hex(payload).decode('ascii')
369         nexterr = 0
370         tries = 0
371         success = False
372         while len(servers):
373                 tries += 1
374                 TS = servers.pop(0)
375                 UpstreamBitcoindJSONRPC = TS['access']
376                 try:
377                         # BIP 22 standard submitblock
378                         reason = UpstreamBitcoindJSONRPC.submitblock(payload)
379                 except BaseException as gbterr:
380                         gbterr_fmt = traceback.format_exc()
381                         try:
382                                 try:
383                                         # bitcoind 0.5/0.6 getmemorypool
384                                         reason = UpstreamBitcoindJSONRPC.getmemorypool(payload)
385                                 except:
386                                         # Old BIP 22 draft getmemorypool
387                                         reason = UpstreamBitcoindJSONRPC.getmemorypool(payload, {})
388                                 if reason is True:
389                                         reason = None
390                                 elif reason is False:
391                                         reason = 'rejected'
392                         except BaseException as gmperr:
393                                 now = time()
394                                 if now > nexterr:
395                                         # FIXME: This will show "Method not found" on pre-BIP22 servers
396                                         RaiseRedFlags(gbterr_fmt)
397                                         nexterr = now + 5
398                                 if MM.currentBlock[0] not in myblock and tries > len(servers):
399                                         RBFs.append( (('next block', MM.currentBlock, now, (gbterr, gmperr)), payload, blkhash, share) )
400                                         RaiseRedFlags('Giving up on submitting block to upstream \'%s\'' % (TS['name'],))
401                                         if share['upstreamRejectReason'] is PendingUpstream:
402                                                 share['upstreamRejectReason'] = 'GAVE UP'
403                                                 share['upstreamResult'] = False
404                                                 logShare(share)
405                                         return
406                                 
407                                 servers.append(TS)
408                                 continue
409                 
410                 # At this point, we have a reason back
411                 if reason:
412                         # FIXME: The returned value could be a list of multiple responses
413                         msg = 'Upstream \'%s\' block submission failed: %s' % (TS['name'], reason,)
414                         if success and reason in ('stale-prevblk', 'bad-prevblk', 'orphan', 'duplicate'):
415                                 # no big deal
416                                 blockSubmissionThread.logger.debug(msg)
417                         else:
418                                 RBFs.append( (('upstream reject', reason, time()), payload, blkhash, share) )
419                                 RaiseRedFlags(msg)
420                 else:
421                         blockSubmissionThread.logger.debug('Upstream \'%s\' accepted block' % (TS['name'],))
422                         success = True
423                 if share['upstreamRejectReason'] is PendingUpstream:
424                         share['upstreamRejectReason'] = reason
425                         share['upstreamResult'] = not reason
426                         logShare(share)
427 blockSubmissionThread.logger = logging.getLogger('blockSubmission')
428
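# Submission strategy used above: each upstream is tried with BIP 22 submitblock first,
# then bitcoind 0.5/0.6 getmemorypool(data), then the older draft getmemorypool(data, {}).
# Failing servers are pushed to the back of the queue and retried until the block has gone
# stale and every remaining server has been attempted, at which point the share is logged
# as 'GAVE UP' if its upstream result was still pending.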
429 def checkData(share):
430         data = share['data']
431         data = data[:80]
432         (prevBlock, height, bits) = MM.currentBlock
433         sharePrevBlock = data[4:36]
434         if sharePrevBlock != prevBlock:
435                 if sharePrevBlock == MM.lastBlock[0]:
436                         raise RejectedShare('stale-prevblk')
437                 raise RejectedShare('bad-prevblk')
438         
439         if data[72:76] != bits:
440                 raise RejectedShare('bad-diffbits')
441         
442         # Note that we should accept miners reducing version to 1 if they don't understand 2 yet
443         # FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
444         if data[1:4] != b'\0\0\0' or data[0] > 2:
445                 raise RejectedShare('bad-version')
446
447 def buildStratumData(share, merkleroot):
448         (prevBlock, height, bits) = MM.currentBlock
449         
450         data = b'\x02\0\0\0'
451         data += prevBlock
452         data += merkleroot
453         data += share['ntime'][::-1]
454         data += bits
455         data += share['nonce'][::-1]
456         
457         share['data'] = data
458         return data
459
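# 80-byte block header layout relied on by checkData()/buildStratumData()/checkShare():
#   bytes  0-3   version (little-endian)
#   bytes  4-35  previous block hash
#   bytes 36-67  merkle root
#   bytes 68-71  ntime
#   bytes 72-75  nbits
#   bytes 76-79  nonce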
460 def IsJobValid(wli, wluser = None):
461         if wluser not in workLog:
462                 return False
463         if wli not in workLog[wluser]:
464                 return False
465         (wld, issueT) = workLog[wluser][wli]
466         if time() < issueT - 120:
467                 return False
468         return True
469
470 def checkShare(share):
471         shareTime = share['time'] = time()
472         
473         username = share['username']
474         if 'data' in share:
475                 # getwork/GBT
476                 checkData(share)
477                 data = share['data']
478                 
479                 if username not in workLog:
480                         raise RejectedShare('unknown-user')
481                 MWL = workLog[username]
482                 
483                 shareMerkleRoot = data[36:68]
484                 if 'blkdata' in share:
485                         pl = share['blkdata']
486                         (txncount, pl) = varlenDecode(pl)
487                         cbtxn = bitcoin.txn.Txn(pl)
488                         othertxndata = cbtxn.disassemble(retExtra=True)
489                         coinbase = cbtxn.getCoinbase()
490                         wliPos = coinbase[0] + 2
491                         wliLen = coinbase[wliPos - 1]
492                         wli = coinbase[wliPos:wliPos+wliLen]
493                         mode = 'MC'
494                         moden = 1
495                 else:
496                         wli = shareMerkleRoot
497                         mode = 'MRD'
498                         moden = 0
499                         coinbase = None
500         else:
501                 # Stratum
502                 MWL = workLog[None]
503                 wli = share['jobid']
504                 buildStratumData(share, b'\0' * 32)
505                 mode = 'MC'
506                 moden = 1
507                 othertxndata = b''
508         
509         if wli not in MWL:
510                 raise RejectedShare('unknown-work')
511         (wld, issueT) = MWL[wli]
512         share[mode] = wld
513         
514         share['issuetime'] = issueT
515         
516         (workMerkleTree, workCoinbase) = wld[1:3]
517         share['merkletree'] = workMerkleTree
518         if 'jobid' in share:
519                 cbtxn = deepcopy(workMerkleTree.data[0])
520                 coinbase = workCoinbase + share['extranonce1'] + share['extranonce2']
521                 cbtxn.setCoinbase(coinbase)
522                 cbtxn.assemble()
523                 data = buildStratumData(share, workMerkleTree.withFirst(cbtxn))
524                 shareMerkleRoot = data[36:68]
525         
526         if data in DupeShareHACK:
527                 raise RejectedShare('duplicate')
528         DupeShareHACK[data] = None
529         
530         blkhash = dblsha(data)
531         if blkhash[28:] != b'\0\0\0\0':
532                 raise RejectedShare('H-not-zero')
533         blkhashn = LEhash2int(blkhash)
534         
535         global networkTarget
536         logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
537         logfunc('BLKHASH: %64x' % (blkhashn,))
538         logfunc(' TARGET: %64x' % (networkTarget,))
539         
540         # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
541         txlist = workMerkleTree.data
542         txlist = [deepcopy(txlist[0]),] + txlist[1:]
543         cbtxn = txlist[0]
544         cbtxn.setCoinbase(coinbase or workCoinbase)
545         cbtxn.assemble()
546         
547         if blkhashn <= networkTarget:
548                 logfunc("Submitting upstream")
549                 RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree, share, wld) ) )
550                 if not moden:
551                         payload = assembleBlock(data, txlist)
552                 else:
553                         payload = share['data']
554                         if len(othertxndata):
555                                 payload += share['blkdata']
556                         else:
557                                 payload += assembleBlock(data, txlist)[80:]
558                 logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
559                 RBPs.append(payload)
560                 threading.Thread(target=blockSubmissionThread, args=(payload, blkhash, share)).start()
561                 bcnode.submitBlock(payload)
562                 if config.DelayLogForUpstream:
563                         share['upstreamRejectReason'] = PendingUpstream
564                 else:
565                         share['upstreamRejectReason'] = None
566                         share['upstreamResult'] = True
567                 MM.updateBlock(blkhash)
568         
569         # Gotwork hack...
570         if gotwork and blkhashn <= config.GotWorkTarget:
571                 try:
572                         coinbaseMrkl = cbtxn.data
573                         coinbaseMrkl += blkhash
574                         steps = workMerkleTree._steps
575                         coinbaseMrkl += pack('B', len(steps))
576                         for step in steps:
577                                 coinbaseMrkl += step
578                         coinbaseMrkl += b"\0\0\0\0"
579                         info = {}
580                         info['hash'] = b2a_hex(blkhash).decode('ascii')
581                         info['header'] = b2a_hex(data).decode('ascii')
582                         info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
583                         thr = threading.Thread(target=submitGotwork, args=(info,))
584                         thr.daemon = True
585                         thr.start()
586                 except:
587                         checkShare.logger.warning('Failed to build gotwork request')
588         
589         if 'target' in share:
590                 workTarget = share['target']
591         elif len(wld) > 6:
592                 workTarget = wld[6]
593         else:
594                 workTarget = None
595         
596         if workTarget is None:
597                 workTarget = config.ShareTarget
598         if blkhashn > workTarget:
599                 raise RejectedShare('high-hash')
600         share['target'] = workTarget
601         share['_targethex'] = '%064x' % (workTarget,)
602         
603         shareTimestamp = unpack('<L', data[68:72])[0]
604         if shareTime < issueT - 120:
605                 raise RejectedShare('stale-work')
606         if shareTimestamp < shareTime - 300:
607                 raise RejectedShare('time-too-old')
608         if shareTimestamp > shareTime + 7200:
609                 raise RejectedShare('time-too-new')
610         
611         if config.DynamicTargetting and username in userStatus:
612                 # NOTE: userStatus[username] is only missing immediately after a restart
613                 status = userStatus[username]
614                 target = status[0] or config.ShareTarget
615                 if target == workTarget:
616                         userStatus[username][2] += 1
617                 else:
618                         userStatus[username][2] += float(target) / workTarget
619         
620         if moden:
621                 cbpre = workCoinbase
622                 cbpreLen = len(cbpre)
623                 if coinbase[:cbpreLen] != cbpre:
624                         raise RejectedShare('bad-cb-prefix')
625                 
626                 # Filter out known "I support" flags, to prevent exploits
627                 for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
628                         if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
629                                 raise RejectedShare('bad-cb-flag')
630                 
631                 if len(coinbase) > 100:
632                         raise RejectedShare('bad-cb-length')
633                 
634                 if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
635                         raise RejectedShare('bad-txnmrklroot')
636                 
637                 if len(othertxndata):
638                         allowed = assembleBlock(data, txlist)[80:]
639                         if allowed != share['blkdata']:
640                                 raise RejectedShare('bad-txns')
641 checkShare.logger = logging.getLogger('checkShare')
642
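# checkShare() rejection order, for reference: header checks (bad-prevblk/bad-diffbits/
# bad-version) on the getwork path, unknown-user/unknown-work, duplicate, H-not-zero,
# upstream submission if the hash beats the network target, high-hash against the
# per-user work target, stale-work and ntime sanity limits, and finally the coinbase,
# merkle-root and transaction checks for getblocktemplate/stratum shares.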
643 def logShare(share):
644         if '_origdata' in share:
645                 share['solution'] = share['_origdata']
646         else:
647                 share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
648         for i in loggersShare:
649                 i.logShare(share)
650
651 def checkAuthentication(username, password):
652         # HTTPServer uses bytes, and StratumServer uses str
653         if hasattr(username, 'decode'): username = username.decode('utf8')
654         if hasattr(password, 'decode'): password = password.decode('utf8')
655         
656         for i in authenticators:
657                 if i.checkAuthentication(username, password):
658                         return True
659         return False
660
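# Authentication modules are loaded from the authentication package (see __main__ below)
# and only need a checkAuthentication(username, password) method returning a truthy value;
# both arguments are guaranteed to be str here thanks to the decoding above.  A minimal
# sketch of such a module, using a hypothetical name and config keys (illustrative only):
#
#   class staticlist:
#       def __init__(self, **kwargs):
#           # kwargs is the config entry itself, including its 'module' key
#           self.users = dict(kwargs.get('users', {}))
#       def checkAuthentication(self, username, password):
#           return self.users.get(username) == password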
661 def receiveShare(share):
662         # TODO: username => userid
663         try:
664                 checkShare(share)
665         except RejectedShare as rej:
666                 share['rejectReason'] = str(rej)
667                 raise
668         except BaseException as e:
669                 share['rejectReason'] = 'ERROR'
670                 raise
671         finally:
672                 if share.get('upstreamRejectReason', None) is not PendingUpstream:
673                         logShare(share)
674
675 def newBlockNotification():
676         logging.getLogger('newBlockNotification').info('Received new block notification')
677         MM.updateMerkleTree()
678         # TODO: Force RESPOND TO LONGPOLLS?
679         pass
680
681 def newBlockNotificationSIGNAL(signum, frame):
682         # Use a new thread, in case the signal handler is called with locks held
683         thr = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
684         thr.daemon = True
685         thr.start()
686
687 from signal import signal, SIGUSR1
688 signal(SIGUSR1, newBlockNotificationSIGNAL)
689
690
691 import os
692 import os.path
693 import pickle
694 import signal
695 import sys
696 from time import sleep
697 import traceback
698
699 if getattr(config, 'SaveStateFilename', None) is None:
700         config.SaveStateFilename = 'eloipool.worklog'
701
702 def stopServers():
703         logger = logging.getLogger('stopServers')
704         
705         if hasattr(stopServers, 'already'):
706                 logger.debug('Already tried to stop servers before')
707                 return
708         stopServers.already = True
709         
710         logger.info('Stopping servers...')
711         global bcnode, server
712         servers = (bcnode, server, stratumsrv)
713         for s in servers:
714                 s.keepgoing = False
715         for s in servers:
716                 try:
717                         s.wakeup()
718                 except:
719                         logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
720         i = 0
721         while True:
722                 sl = []
723                 for s in servers:
724                         if s.running:
725                                 sl.append(s.__class__.__name__)
726                 if not sl:
727                         break
728                 i += 1
729                 if i >= 0x100:
730                         logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
731                         break
732                 sleep(0.01)
733         
734         for s in servers:
735                 for fd in s._fd.keys():
736                         os.close(fd)
737
738 def stopLoggers():
739         for i in loggersShare:
740                 if hasattr(i, 'stop'):
741                         i.stop()
742
743 def saveState(SAVE_STATE_FILENAME, t = None):
744         logger = logging.getLogger('saveState')
745         
746         # Then, save data needed to resume work
747         logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
748         i = 0
749         while True:
750                 try:
751                         with open(SAVE_STATE_FILENAME, 'wb') as f:
752                                 pickle.dump(t, f)
753                                 pickle.dump(DupeShareHACK, f)
754                                 pickle.dump(workLog, f)
755                         break
756                 except:
757                         i += 1
758                         if i >= 0x10000:
759                                 logger.error('Failed to save work\n' + traceback.format_exc())
760                                 try:
761                                         os.unlink(SAVE_STATE_FILENAME)
762                                 except:
763                                         logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
764
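# The state file is three pickles written back to back: the shutdown timestamp,
# DupeShareHACK, and workLog.  restoreState() below also accepts the two short-lived
# 2012-02 formats and skips workLog if the saved timestamp is over 120 seconds old.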
765 def exit():
766         t = time()
767         stopServers()
768         stopLoggers()
769         saveState(config.SaveStateFilename, t=t)
770         logging.getLogger('exit').info('Goodbye...')
771         os.kill(os.getpid(), signal.SIGTERM)
772         sys.exit(0)
773
774 def restart():
775         t = time()
776         stopServers()
777         stopLoggers()
778         saveState(config.SaveStateFilename, t=t)
779         logging.getLogger('restart').info('Restarting...')
780         try:
781                 os.execv(sys.argv[0], sys.argv)
782         except:
783                 logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
784
785 def restoreState(SAVE_STATE_FILENAME):
786         if not os.path.exists(SAVE_STATE_FILENAME):
787                 return
788         
789         global workLog, DupeShareHACK
790         
791         logger = logging.getLogger('restoreState')
792         s = os.stat(SAVE_STATE_FILENAME)
793         logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
794         try:
795                 with open(SAVE_STATE_FILENAME, 'rb') as f:
796                         t = pickle.load(f)
797                         if type(t) == tuple:
798                                 if len(t) > 2:
799                                         # Future formats, not supported here
800                                         ver = t[3]
801                                         # TODO
802                                 
803                                 # Old format, from 2012-02-02 to 2012-02-03
804                                 workLog = t[0]
805                                 DupeShareHACK = t[1]
806                                 t = None
807                         else:
808                                 if isinstance(t, dict):
809                                         # Old format, from 2012-02-03 to 2012-02-03
810                                         DupeShareHACK = t
811                                         t = None
812                                 else:
813                                         # Current format, from 2012-02-03 onward
814                                         DupeShareHACK = pickle.load(f)
815                                 
816                                 if t + 120 >= time():
817                                         workLog = pickle.load(f)
818                                 else:
819                                         logger.debug('Skipping restore of expired workLog')
820         except:
821                 logger.error('Failed to restore state\n' + traceback.format_exc())
822                 return
823         logger.info('State restored successfully')
824         if t:
825                 logger.info('Total downtime: %g seconds' % (time() - t,))
826
827
828 from jsonrpcserver import JSONRPCListener, JSONRPCServer
829 import interactivemode
830 from networkserver import NetworkListener
831 import threading
832 import sharelogging
833 import authentication
834 from stratumserver import StratumServer
835 import imp
836
837 if __name__ == "__main__":
838         if not hasattr(config, 'ShareLogging'):
839                 config.ShareLogging = ()
840         if hasattr(config, 'DbOptions'):
841                 logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
842                 config.ShareLogging = list(config.ShareLogging)
843                 config.ShareLogging.append( {
844                         'type': 'sql',
845                         'engine': 'postgres',
846                         'dbopts': config.DbOptions,
847                         'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
848                 } )
849         for i in config.ShareLogging:
850                 if not hasattr(i, 'keys'):
851                         name, parameters = i
852                         logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
853                         if name == 'postgres':
854                                 name = 'sql'
855                                 i = {
856                                         'engine': 'postgres',
857                                         'dbopts': parameters,
858                                 }
859                         elif name == 'logfile':
860                                 i = {}
861                                 i['thropts'] = parameters
862                                 if 'filename' in parameters:
863                                         i['filename'] = parameters['filename']
864                                         i['thropts'] = dict(i['thropts'])
865                                         del i['thropts']['filename']
866                         else:
867                                 i = parameters
868                         i['type'] = name
869                 
870                 name = i['type']
871                 parameters = i
872                 try:
873                         fp, pathname, description = imp.find_module(name, sharelogging.__path__)
874                         m = imp.load_module(name, fp, pathname, description)
875                         lo = getattr(m, name)(**parameters)
876                         loggersShare.append(lo)
877                 except:
878                         logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())
879         
880         if not hasattr(config, 'Authentication'):
881                 config.Authentication = ({'module': 'allowall'},)
882         
883         for i in config.Authentication:
884                 name = i['module']
885                 parameters = i
886                 try:
887                         fp, pathname, description = imp.find_module(name, authentication.__path__)
888                         m = imp.load_module(name, fp, pathname, description)
889                         lo = getattr(m, name)(**parameters)
890                         authenticators.append(lo)
891                 except:
892                         logging.getLogger('authentication').error("Error setting up authentication module %s: %s", name, sys.exc_info())
893         
894         LSbc = []
895         if not hasattr(config, 'BitcoinNodeAddresses'):
896                 config.BitcoinNodeAddresses = ()
897         for a in config.BitcoinNodeAddresses:
898                 LSbc.append(NetworkListener(bcnode, a))
899         
900         if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
901                 BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
902         
903         import jsonrpc_getblocktemplate
904         import jsonrpc_getwork
905         import jsonrpc_setworkaux
906         
907         server = JSONRPCServer()
908         server.tls = threading.local()
909         server.tls.wantClear = False
910         if hasattr(config, 'JSONRPCAddress'):
911                 logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
912                 if not hasattr(config, 'JSONRPCAddresses'):
913                         config.JSONRPCAddresses = []
914                 config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
915         LS = []
916         for a in config.JSONRPCAddresses:
917                 LS.append(JSONRPCListener(server, a))
918         if hasattr(config, 'SecretUser'):
919                 server.SecretUser = config.SecretUser
920         server.aux = MM.CoinbaseAux
921         server.getBlockHeader = getBlockHeader
922         server.getBlockTemplate = getBlockTemplate
923         server.receiveShare = receiveShare
924         server.RaiseRedFlags = RaiseRedFlags
925         server.ShareTarget = config.ShareTarget
926         server.checkAuthentication = checkAuthentication
927         
928         if hasattr(config, 'TrustedForwarders'):
929                 server.TrustedForwarders = config.TrustedForwarders
930         server.ServerName = config.ServerName
931         
932         stratumsrv = StratumServer()
933         stratumsrv.getStratumJob = getStratumJob
934         stratumsrv.getExistingStratumJob = getExistingStratumJob
935         stratumsrv.receiveShare = receiveShare
936         stratumsrv.getTarget = getTarget
937         stratumsrv.defaultTarget = config.ShareTarget
938         stratumsrv.IsJobValid = IsJobValid
939         stratumsrv.checkAuthentication = checkAuthentication
940         if not hasattr(config, 'StratumAddresses'):
941                 config.StratumAddresses = ()
942         for a in config.StratumAddresses:
943                 NetworkListener(stratumsrv, a)
944         
945         MM.start()
946         
947         restoreState(config.SaveStateFilename)
948         
949         prune_thr = threading.Thread(target=WorkLogPruner, args=(workLog,))
950         prune_thr.daemon = True
951         prune_thr.start()
952         
953         bcnode_thr = threading.Thread(target=bcnode.serve_forever)
954         bcnode_thr.daemon = True
955         bcnode_thr.start()
956         
957         stratum_thr = threading.Thread(target=stratumsrv.serve_forever)
958         stratum_thr.daemon = True
959         stratum_thr.start()
960         
961         server.serve_forever()