StratumHandler: Safety check against issuing the same SessionId to multiple clients
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2013  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 # Portions written by Peter Leurs <kinlo@triplemining.com>
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU Affero General Public License for more details.
15 #
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
import argparse
import importlib

# Parse the optional -c/--config switch and import the matching config
# module: config.py by default, config_<NAME>.py when -c NAME is given.
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', '--config', help='Config name to load from config_<ARG>.py')
args = argparser.parse_args()
configmod = 'config'
if args.config is not None:
	configmod = 'config_%s' % (args.config,)
# importlib.import_module both imports and returns the module; the old
# separate __import__ call was redundant
config = importlib.import_module(configmod)

# Defaults for settings the config module may omit
if not hasattr(config, 'ServerName'):
	config.ServerName = 'Unnamed Eloipool'

if not hasattr(config, 'ShareTarget'):
	config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
35
36
import logging
import logging.handlers

# Root logger setup: if the embedding environment has not already installed
# handlers, configure basicConfig at DEBUG level with the (configurable)
# tab-separated format, and cap the chattiest subsystems at INFO.
rootlogger = logging.getLogger(None)
logformat = getattr(config, 'LogFormat', '%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s')
logformatter = logging.Formatter(logformat)
if len(rootlogger.handlers) == 0:
	logging.basicConfig(
		format=logformat,
		level=logging.DEBUG,
	)
	# These loggers are too verbose at DEBUG for normal operation
	for infoOnly in (
		'checkShare',
		'getTarget',
		'JSONRPCHandler',
		'JSONRPCServer',
		'merkleMaker',
		'StratumServer',
		'Waker for JSONRPCServer',
		'Waker for StratumServer',
		'WorkLogPruner'
	):
		logging.getLogger(infoOnly).setLevel(logging.INFO)
# Optional syslog output (config.LogToSysLog = True)
if getattr(config, 'LogToSysLog', False):
	sysloghandler = logging.handlers.SysLogHandler(address = '/dev/log')
	rootlogger.addHandler(sysloghandler)
# Optional file output: config.LogFile may be a plain filename (str) or a
# dict of TimedRotatingFileHandler keyword arguments
if hasattr(config, 'LogFile'):
	if isinstance(config.LogFile, str):
		filehandler = logging.FileHandler(config.LogFile)
	else:
		filehandler = logging.handlers.TimedRotatingFileHandler(**config.LogFile)
	filehandler.setFormatter(logformatter)
	rootlogger.addHandler(filehandler)
70
def RaiseRedFlags(reason):
	"""Log *reason* as a critical 'redflag' event and return it unchanged.

	Returning the message lets callers embed the call directly in a raise
	or return expression.
	"""
	redflag = logging.getLogger('redflag')
	redflag.critical(reason)
	return reason
74
75
from bitcoin.node import BitcoinLink, BitcoinNode
# Peer connection to the Bitcoin P2P network; used to broadcast solved
# blocks and to hear about new blocks found by others.
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'
# On a new-block announcement, rebuild our merkle tree (MM is the
# merkleMaker instance assigned later at module level)
bcnode.newBlock = lambda blkhash: MM.updateMerkleTree()
80
import jsonrpc

try:
	# Identify ourselves in outgoing JSON-RPC requests when the authproxy
	# implementation is present; harmless to skip when it isn't.
	import jsonrpc.authproxy
	jsonrpc.authproxy.USER_AGENT = 'Eloipool/0.1'
except:
	pass
88
89
90 from bitcoin.script import BitcoinScript
91 from bitcoin.txn import Txn
92 from base58 import b58decode
93 from binascii import b2a_hex
94 from struct import pack
95 import subprocess
96 from time import time
97
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True, prevBlockHex = None):
	# Build the coinbase transaction paying out coinbaseValue (base units).
	# If a CoinbaserCmd is configured (and useCoinbaser is set), run it to
	# obtain extra payout outputs; whatever value remains goes to
	# config.TrackerAddr.
	txn = Txn.new()
	
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		try:
			# %d is replaced with the total value, %p with the previous
			# block hash (hex); the command is run through the shell
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			cmd = cmd.replace('%p', prevBlockHex or '""')
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			# Expected output: a count line, then (amount, address) line pairs
			nout = int(p.stdout.readline())
			for i in range(nout):
				amount = int(p.stdout.readline())
				addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
				pkScript = BitcoinScript.toAddress(addr)
				txn.addOutput(amount, pkScript)
				coinbased += amount
		except:
			# Any failure: force the sanity check below to trip, discarding
			# the coinbaser's outputs entirely
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			# Coinbaser tried to pay out at least the whole reward — treat
			# as failure and pay everything to the tracker address instead
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased
	
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)
	
	# TODO
	# TODO: red flag on dupe coinbase
	return txn
129
130
131 import jsonrpc_getwork
132 from util import Bits2Target
133
workLog = {}     # username -> { work identifier -> (work data, issue time) }
userStatus = {}  # username -> [target (None = default), last update time, work counted]
networkTarget = None  # current network block target; set by blockChanged()
DupeShareHACK = {}    # set-like dict of seen share headers; reset on new block
138
# Server instances are created by startup code later in the file
server = None
stratumsrv = None
def updateBlocks():
	# Push updated (same-prevblock) work out to all connected miners
	server.wakeLongpoll()
	stratumsrv.updateJob()
144
def blockChanged():
	# Called when the current best block changes: recompute the network
	# target and, unless this is the initial (None, None, None) state,
	# clear all per-block state before waking the miners with fresh work.
	global MM, networkTarget, server
	bits = MM.currentBlock[2]
	if bits is None:
		networkTarget = None
	else:
		networkTarget = Bits2Target(bits)
	if MM.lastBlock != (None, None, None):
		global DupeShareHACK
		# Shares for the old block can no longer collide with new ones
		DupeShareHACK = {}
		jsonrpc_getwork._CheckForDupesHACK = {}
		workLog.clear()
	server.wakeLongpoll(wantClear=True)
	stratumsrv.updateJob(wantClear=True)
159
160
161 from time import sleep, time
162 import traceback
163
164 def _WorkLogPruner_I(wl):
165         now = time()
166         pruned = 0
167         for username in wl:
168                 userwork = wl[username]
169                 for wli in tuple(userwork.keys()):
170                         if now > userwork[wli][1] + 120:
171                                 del userwork[wli]
172                                 pruned += 1
173         WorkLogPruner.logger.debug('Pruned %d jobs' % (pruned,))
174
175 def WorkLogPruner(wl):
176         while True:
177                 try:
178                         sleep(60)
179                         _WorkLogPruner_I(wl)
180                 except:
181                         WorkLogPruner.logger.error(traceback.format_exc())
182 WorkLogPruner.logger = logging.getLogger('WorkLogPruner')
183
184
from merklemaker import merkleMaker
# The merkle maker produces block templates; it reads its settings directly
# from the config module's namespace.
MM = merkleMaker()
MM.__dict__.update(config.__dict__)
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
191
192
193 from binascii import b2a_hex
194 from copy import deepcopy
195 from math import ceil, log
196 from merklemaker import MakeBlockHeader
197 from struct import pack, unpack
198 import threading
199 from time import time
200 from util import PendingUpstream, RejectedShare, bdiff1target, dblsha, LEhash2int, swap32, target2bdiff, target2pdiff
201 import jsonrpc
202 import traceback
203
# Optional JSON-RPC endpoint notified about shares ("gotwork" hack)
gotwork = None
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

# When True, logging of block-solving shares waits for the upstream verdict
if not hasattr(config, 'DelayLogForUpstream'):
	config.DelayLogForUpstream = False

if not hasattr(config, 'DynamicTargetting'):
	config.DynamicTargetting = 0
else:
	if not hasattr(config, 'DynamicTargetWindow'):
		config.DynamicTargetWindow = 120
	# Goal is configured per minute; scale it to the retarget window
	config.DynamicTargetGoal *= config.DynamicTargetWindow / 60
217
def submitGotwork(info):
	# Best-effort notification of a share to the configured gotwork service;
	# failures are logged and never propagate to the caller.
	try:
		gotwork.gotwork(info)
	except:
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
223
def clampTarget(target, DTMode):
	# Clamp a proposed share target into the allowed range and quantize it
	# per the dynamic-targetting mode:
	#   DTMode 2: round up to (a power of two) - 1
	#   DTMode 3: round to a whole multiple of bitcoin difficulty 1
	# Returns None when the result equals config.ShareTarget, to save memory
	# (most users sit at the default).
	
	# ShareTarget is the minimum
	if target is None or target > config.ShareTarget:
		target = config.ShareTarget
	
	# Never target above the network, as we'd lose blocks
	if target < networkTarget:
		target = networkTarget
	
	if DTMode == 2:
		# Ceil target to a power of two :)
		truebits = log(target, 2)
		if target <= 2**int(truebits):
			# Workaround for bug in Python's math.log function
			truebits = int(truebits)
		target = 2**ceil(truebits) - 1
	elif DTMode == 3:
		# Round target to multiple of bdiff 1.  Use floor division: under
		# Python 3, true division would produce a float target, which later
		# breaks integer formatting ('%064x') and exact comparisons.
		target = bdiff1target // int(round(target2bdiff(target)))
	
	# Return None for ShareTarget to save memory
	if target == config.ShareTarget:
		return None
	return target
248
def getTarget(username, now, DTMode = None, RequestedTarget = None):
	# Compute (and record in userStatus) the share target for a user at time
	# *now*.  Dynamic targetting rescales the target so each user produces
	# roughly DynamicTargetGoal shares per DynamicTargetWindow.
	# Returns None to mean "use config.ShareTarget".
	if DTMode is None:
		DTMode = config.DynamicTargetting
	if not DTMode:
		return None
	if username in userStatus:
		status = userStatus[username]
	else:
		# No record, use default target
		RequestedTarget = clampTarget(RequestedTarget, DTMode)
		userStatus[username] = [RequestedTarget, now, 0]
		return RequestedTarget
	(targetIn, lastUpdate, work) = status
	if work <= config.DynamicTargetGoal:
		if now < lastUpdate + config.DynamicTargetWindow and (targetIn is None or targetIn >= networkTarget):
			# No reason to change it just yet
			return clampTarget(targetIn, DTMode)
		if not work:
			# No shares received, reset to minimum
			if targetIn:
				getTarget.logger.debug("No shares from %s, resetting to minimum target" % (repr(username),))
				userStatus[username] = [None, now, 0]
			return clampTarget(None, DTMode)
	
	# Rescale the target in proportion to the observed share rate over the
	# elapsed window
	deltaSec = now - lastUpdate
	target = targetIn or config.ShareTarget
	target = int(target * config.DynamicTargetGoal * deltaSec / config.DynamicTargetWindow / work)
	target = clampTarget(target, DTMode)
	if target != targetIn:
		pfx = 'Retargetting %s' % (repr(username),)
		tin = targetIn or config.ShareTarget
		getTarget.logger.debug("%s from: %064x (pdiff %s)" % (pfx, tin, target2pdiff(tin)))
		tgt = target or config.ShareTarget
		getTarget.logger.debug("%s   to: %064x (pdiff %s)" % (pfx, tgt, target2pdiff(tgt)))
	# Record the new target and reset the work counter for the next window
	userStatus[username] = [target, now, 0]
	return target
getTarget.logger = logging.getLogger('getTarget')
286
def TopTargets(n = 0x10):
	# Console/debug helper: print up to n users with non-default targets,
	# ordered so the numerically lowest (hardest) targets print last,
	# with pdiff values.
	tmp = list(k for k, v in userStatus.items() if not v[0] is None)
	tmp.sort(key=lambda k: -userStatus[k][0])
	tmp2 = {}
	def t2d(t):
		# Memoized target -> pdiff conversion
		if t not in tmp2:
			tmp2[t] = target2pdiff(t)
		return tmp2[t]
	for k in tmp[-n:]:
		tgt = userStatus[k][0]
		print('%-34s %064x %3d' % (k, tgt, t2d(tgt)))
298
def RegisterWork(username, wli, wld, RequestedTarget = None):
	# Record issued work in workLog (keyed by username, then by work
	# identifier) with its assigned target appended, and return the actual
	# share target for the work.
	now = time()
	target = getTarget(username, now, RequestedTarget=RequestedTarget)
	wld = tuple(wld) + (target,)
	workLog.setdefault(username, {})[wli] = (wld, now)
	return target or config.ShareTarget
305
def getBlockHeader(username):
	# Issue getwork-style work: returns the 80-byte block header, the logged
	# work entry, and the share target it was issued at.
	MRD = MM.getMRD()
	merkleRoot = MRD[0]
	hdr = MakeBlockHeader(MRD)
	# RegisterWork stores the (MRD + target) tuple in workLog itself; the
	# extra pre-registration the old code did here was immediately
	# overwritten, so it is omitted.
	target = RegisterWork(username, merkleRoot, MRD)
	return (hdr, workLog[username][merkleRoot], target)
313
def getBlockTemplate(username, p_magic = None, RequestedTarget = None):
	# Issue GBT (BIP 22) work.  p_magic, when given, is a one-element list
	# used as an out-parameter: set true when a clear (fresh) template was
	# forced because this user is new.
	if server.tls.wantClear:
		wantClear = True
	elif p_magic and username not in workLog:
		wantClear = True
		p_magic[0] = True
	else:
		wantClear = False
	MC = MM.getMC(wantClear)
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
	# The work identifier is embedded in the coinbase scriptsig as a
	# length-prefixed field located via the first byte's push size
	wliPos = coinbase[0] + 2
	wliLen = coinbase[wliPos - 1]
	wli = coinbase[wliPos:wliPos+wliLen]
	target = RegisterWork(username, wli, MC, RequestedTarget=RequestedTarget)
	return (MC, workLog[username][wli], target)
329
def getStratumJob(jobid, wantClear = False):
	# Issue a new stratum job.  Stratum work is logged under the None user,
	# since it isn't tied to a username until shares come back.
	MC = MM.getMC(wantClear)
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
	now = time()
	workLog.setdefault(None, {})[jobid] = (MC, now)
	return (MC, workLog[None][jobid])

def getExistingStratumJob(jobid):
	# Look up a previously issued stratum job (KeyError if unknown)
	wld = workLog[None][jobid]
	return (wld[0], wld)
340
loggersShare = []    # share-logger objects; each provides .logShare(share)
authenticators = []  # auth backends; each provides .checkAuthentication(u, p)

RBDs = []  # raw block data for every block we attempt to submit (debugging)
RBPs = []  # raw block payloads actually submitted upstream (debugging)

from bitcoin.varlen import varlenEncode, varlenDecode
import bitcoin.txn
from merklemaker import assembleBlock

# BlockSubmissions: explicit list of upstream servers for block submission;
# None means "submit to all template sources"
if not hasattr(config, 'BlockSubmissions'):
	config.BlockSubmissions = None

RBFs = []  # records of failed/rejected block submissions (debugging)
def blockSubmissionThread(payload, blkhash, share):
	# Submit a solved block to the upstream bitcoind(s): try BIP 22
	# submitblock first, falling back to the getmemorypool variants, and
	# requeue failing servers until every one has given a verdict or the
	# network has moved on.
	if config.BlockSubmissions is None:
		servers = list(a for b in MM.TemplateSources for a in b)
	else:
		servers = list(config.BlockSubmissions)
	
	if hasattr(share['merkletree'], 'source_uri'):
		# Prefer submitting to the server the template originally came from
		servers.insert(0, {
			'access': jsonrpc.ServiceProxy(share['merkletree'].source_uri),
			'name': share['merkletree'].source,
		})
	elif not servers:
		servers = list(a for b in MM.TemplateSources for a in b)
	
	myblock = (blkhash, payload[4:36])
	payload = b2a_hex(payload).decode('ascii')
	nexterr = 0
	tries = 0
	success = False
	while len(servers):
		tries += 1
		TS = servers.pop(0)
		UpstreamBitcoindJSONRPC = TS['access']
		try:
			# BIP 22 standard submitblock
			reason = UpstreamBitcoindJSONRPC.submitblock(payload)
		except BaseException as gbterr:
			gbterr_fmt = traceback.format_exc()
			try:
				try:
					# bitcoind 0.5/0.6 getmemorypool
					reason = UpstreamBitcoindJSONRPC.getmemorypool(payload)
				except:
					# Old BIP 22 draft getmemorypool
					reason = UpstreamBitcoindJSONRPC.getmemorypool(payload, {})
				# Normalize the boolean getmemorypool result to
				# submitblock-style None/reason-string
				if reason is True:
					reason = None
				elif reason is False:
					reason = 'rejected'
			except BaseException as gmperr:
				now = time()
				# Rate-limit red flags to one every 5 seconds
				if now > nexterr:
					# FIXME: This will show "Method not found" on pre-BIP22 servers
					RaiseRedFlags(gbterr_fmt)
					nexterr = now + 5
				# Give up once the network has moved past this block and
				# every remaining server has been tried at least once
				if MM.currentBlock[0] not in myblock and tries > len(servers):
					RBFs.append( (('next block', MM.currentBlock, now, (gbterr, gmperr)), payload, blkhash, share) )
					RaiseRedFlags('Giving up on submitting block to upstream \'%s\'' % (TS['name'],))
					if share['upstreamRejectReason'] is PendingUpstream:
						share['upstreamRejectReason'] = 'GAVE UP'
						share['upstreamResult'] = False
						logShare(share)
					return
				
				# Requeue this server for another attempt
				servers.append(TS)
				continue
		
		# At this point, we have a reason back
		if reason:
			# FIXME: The returned value could be a list of multiple responses
			msg = 'Upstream \'%s\' block submission failed: %s' % (TS['name'], reason,)
			if success and reason in ('stale-prevblk', 'bad-prevblk', 'orphan', 'duplicate'):
				# no big deal - some other server already accepted it
				blockSubmissionThread.logger.debug(msg)
			else:
				RBFs.append( (('upstream reject', reason, time()), payload, blkhash, share) )
				RaiseRedFlags(msg)
		else:
			blockSubmissionThread.logger.debug('Upstream \'%s\' accepted block' % (TS['name'],))
			success = True
		# The first definitive verdict decides what gets logged for the share
		if share['upstreamRejectReason'] is PendingUpstream:
			share['upstreamRejectReason'] = reason
			share['upstreamResult'] = not reason
			logShare(share)
blockSubmissionThread.logger = logging.getLogger('blockSubmission')
430
def checkData(share):
	"""Sanity-check the block header of a getwork/GBT share.

	Raises RejectedShare on a wrong previous block, wrong difficulty bits,
	or an unacceptable block version.
	"""
	data = share['data'][:80]
	(prevBlock, height, bits) = MM.currentBlock
	
	sharePrevBlock = data[4:36]
	if sharePrevBlock != prevBlock:
		# Distinguish "one block behind" from "entirely wrong chain"
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')
	
	if data[72:76] != bits:
		raise RejectedShare('bad-diffbits')
	
	# Note that we should accept miners reducing version to 1 if they don't understand 2 yet
	# FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
	if data[0] > 2 or data[1:4] != b'\0\0\0':
		raise RejectedShare('bad-version')
448
def buildStratumData(share, merkleroot):
	"""Assemble an 80-byte block header for a stratum share.

	ntime and nonce from the miner are byte-reversed when inserted.  The
	result is stored in share['data'] and also returned.
	"""
	(prevBlock, height, bits) = MM.currentBlock
	
	parts = [
		b'\x02\0\0\0',  # block version 2
		prevBlock,
		merkleroot,
		share['ntime'][::-1],
		bits,
		share['nonce'][::-1],
	]
	data = b''.join(parts)
	share['data'] = data
	return data
461
def IsJobValid(wli, wluser = None):
	# True when work identifier *wli* is currently known for *wluser*
	# (wluser=None means stratum work).
	if wluser not in workLog:
		return False
	if wli not in workLog[wluser]:
		return False
	(wld, issueT) = workLog[wluser][wli]
	# NOTE(review): this inequality only rejects jobs issued more than 120s
	# in the *future*; expiry of old jobs appears to be handled by
	# WorkLogPruner instead — confirm this is the intended check
	if time() < issueT - 120:
		return False
	return True
471
def checkShare(share):
	# Validate a submitted share (getwork, GBT, or stratum), raising
	# RejectedShare on any problem, and submit upstream if it solves a
	# block.  Mutates *share* in place, adding bookkeeping fields.
	shareTime = share['time'] = time()
	
	username = share['username']
	if 'data' in share:
		# getwork/GBT
		checkData(share)
		data = share['data']
		
		if username not in workLog:
			raise RejectedShare('unknown-user')
		MWL = workLog[username]
		
		shareMerkleRoot = data[36:68]
		if 'blkdata' in share:
			# GBT: a full block was submitted; recover the work identifier
			# from the coinbase transaction's scriptsig
			pl = share['blkdata']
			(txncount, pl) = varlenDecode(pl)
			cbtxn = bitcoin.txn.Txn(pl)
			othertxndata = cbtxn.disassemble(retExtra=True)
			coinbase = cbtxn.getCoinbase()
			wliPos = coinbase[0] + 2
			wliLen = coinbase[wliPos - 1]
			wli = coinbase[wliPos:wliPos+wliLen]
			mode = 'MC'
			moden = 1
		else:
			# getwork: work is identified by the merkle root alone
			wli = shareMerkleRoot
			mode = 'MRD'
			moden = 0
			coinbase = None
	else:
		# Stratum
		MWL = workLog[None]
		wli = share['jobid']
		# Placeholder merkle root; the real one is filled in below once the
		# coinbase has been rebuilt with the miner's extranonces
		buildStratumData(share, b'\0' * 32)
		mode = 'MC'
		moden = 1
		othertxndata = b''
	
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld
	
	share['issuetime'] = issueT
	
	(workMerkleTree, workCoinbase) = wld[1:3]
	share['merkletree'] = workMerkleTree
	if 'jobid' in share:
		# Stratum: rebuild the coinbase with extranonce1+extranonce2 and
		# recompute the real header and merkle root
		cbtxn = deepcopy(workMerkleTree.data[0])
		coinbase = workCoinbase + share['extranonce1'] + share['extranonce2']
		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		data = buildStratumData(share, workMerkleTree.withFirst(cbtxn))
		shareMerkleRoot = data[36:68]
	
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
	blkhash = dblsha(data)
	if blkhash[28:] != b'\0\0\0\0':
		# Fails even the trivial minimum all shares must meet
		raise RejectedShare('H-not-zero')
	blkhashn = LEhash2int(blkhash)
	
	global networkTarget
	# Block-solving shares get logged at INFO; everything else at DEBUG
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
	
	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	txlist = workMerkleTree.data
	txlist = [deepcopy(txlist[0]),] + txlist[1:]
	cbtxn = txlist[0]
	cbtxn.setCoinbase(coinbase or workCoinbase)
	cbtxn.assemble()
	
	if blkhashn <= networkTarget:
		logfunc("Submitting upstream")
		RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree, share, wld) ) )
		if not moden:
			payload = assembleBlock(data, txlist)
		else:
			payload = share['data']
			if len(othertxndata):
				payload += share['blkdata']
			else:
				payload += assembleBlock(data, txlist)[80:]
		logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
		RBPs.append(payload)
		threading.Thread(target=blockSubmissionThread, args=(payload, blkhash, share)).start()
		bcnode.submitBlock(payload)
		if config.DelayLogForUpstream:
			share['upstreamRejectReason'] = PendingUpstream
		else:
			share['upstreamRejectReason'] = None
			share['upstreamResult'] = True
		MM.updateBlock(blkhash)
	
	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')
	
	# Determine the target this share was actually issued against
	if 'target' in share:
		workTarget = share['target']
	elif len(wld) > 6:
		workTarget = wld[6]
	else:
		workTarget = None
	
	if workTarget is None:
		workTarget = config.ShareTarget
	if blkhashn > workTarget:
		raise RejectedShare('high-hash')
	share['target'] = workTarget
	share['_targethex'] = '%064x' % (workTarget,)
	
	shareTimestamp = unpack('<L', data[68:72])[0]
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
	if config.DynamicTargetting and username in userStatus:
		# NOTE: userStatus[username] only doesn't exist across restarts
		status = userStatus[username]
		target = status[0] or config.ShareTarget
		if target == workTarget:
			userStatus[username][2] += 1
		else:
			# Credit proportionally when the share was done at a different
			# target than the user's current one
			userStatus[username][2] += float(target) / workTarget
	
	if moden:
		# Full-block modes: verify the coinbase is ours and untampered
		cbpre = workCoinbase
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')
		
		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
				raise RejectedShare('bad-cb-flag')
		
		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')
		
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')
		
		if len(othertxndata):
			allowed = assembleBlock(data, txlist)[80:]
			if allowed != share['blkdata']:
				raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
644
def logShare(share):
	"""Record a (possibly rejected) share with every configured share logger.

	share['solution'] is filled in first: the miner's original submission if
	available, otherwise the header data re-swapped to conventional hex.
	"""
	if '_origdata' in share:
		solution = share['_origdata']
	else:
		solution = b2a_hex(swap32(share['data'])).decode('utf8')
	share['solution'] = solution
	for sharelogger in loggersShare:
		sharelogger.logShare(share)
652
def checkAuthentication(username, password):
	"""Return True if any configured authenticator accepts the credentials."""
	# HTTPServer hands us bytes while StratumServer hands us str; normalize
	# anything decodable to str
	if hasattr(username, 'decode'):
		username = username.decode('utf8')
	if hasattr(password, 'decode'):
		password = password.decode('utf8')
	
	return any(auth.checkAuthentication(username, password) for auth in authenticators)
662
def receiveShare(share):
	# TODO: username => userid
	# Entry point for all submitted shares: run full validation, record the
	# reject reason on failure, and (unless the verdict is still pending on
	# the upstream submission) log the share exactly once.
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	except BaseException as e:
		share['rejectReason'] = 'ERROR'
		raise
	finally:
		if not share.get('upstreamRejectReason', None) is PendingUpstream:
			logShare(share)
676
def newBlockNotification():
	# Rebuild the merkle tree immediately instead of waiting for polling
	logging.getLogger('newBlockNotification').info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
	pass

def newBlockNotificationSIGNAL(signum, frame):
	# Use a new thread, in case the signal handler is called with locks held
	thr = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
	thr.daemon = True
	thr.start()

# SIGUSR1 can be sent (e.g. from a blocknotify script) to trigger a refresh
from signal import signal, SIGUSR1
signal(SIGUSR1, newBlockNotificationSIGNAL)
691
692
693 import os
694 import os.path
695 import pickle
696 import signal
697 import sys
698 from time import sleep
699 import traceback
700
# Default path for persisting resumable work state across restarts
if getattr(config, 'SaveStateFilename', None) is None:
	config.SaveStateFilename = 'eloipool.worklog'
703
def stopServers():
	# Shut down the bitcoin node, JSON-RPC server, and stratum server: flag
	# each to stop, wake them so they notice, wait (up to 0x100 * 10ms) for
	# their loops to exit, then close any file descriptors they still hold.
	# Idempotent: repeat calls are no-ops.
	logger = logging.getLogger('stopServers')
	
	if hasattr(stopServers, 'already'):
		logger.debug('Already tried to stop servers before')
		return
	stopServers.already = True
	
	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server, stratumsrv)
	for s in servers:
		s.keepgoing = False
	for s in servers:
		try:
			s.wakeup()
		except:
			logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
	i = 0
	while True:
		sl = []
		for s in servers:
			if s.running:
				sl.append(s.__class__.__name__)
		if not sl:
			break
		i += 1
		if i >= 0x100:
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
			break
		sleep(0.01)
	
	for s in servers:
		for fd in s._fd.keys():
			os.close(fd)
739
def stopLoggers():
	"""Give every share logger that implements stop() a chance to shut down."""
	for sharelogger in loggersShare:
		if hasattr(sharelogger, 'stop'):
			sharelogger.stop()
744
def saveState(SAVE_STATE_FILENAME, t = None):
	"""Persist resume state (timestamp t, DupeShareHACK, workLog) to disk.
	
	Retries the write up to 0x10000 times; on final failure the partial file
	is unlinked so a later restoreState() won't load corrupt data.
	"""
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except Exception:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except Exception:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
				# BUGFIX: without this break, a persistent write failure
				# looped (and logged/unlinked) forever
				break
766
def exit():
	# Orderly shutdown: stop the network servers and share loggers,
	# persist the current work state, then terminate the process.
	now = time()
	stopServers()
	stopLoggers()
	saveState(config.SaveStateFilename, t=now)
	logging.getLogger('exit').info('Goodbye...')
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)
775
def restart():
	"""Shut down cleanly (as in exit()) and re-exec the same command line.
	
	If the exec fails, the error is logged and the function returns,
	leaving the (already-stopped) process running.
	"""
	t = time()
	stopServers()
	stopLoggers()
	saveState(config.SaveStateFilename, t=t)
	logging.getLogger('restart').info('Restarting...')
	try:
		os.execv(sys.argv[0], sys.argv)
	except Exception:
		# was a bare except:, which would also swallow SystemExit/
		# KeyboardInterrupt raised while exec'ing
		logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
786
def restoreState(SAVE_STATE_FILENAME):
	"""Restore workLog and DupeShareHACK from a previous saveState() file.
	
	Understands three historical on-disk formats (see branch comments).
	A missing file is a no-op; any error during restore is logged and the
	restore abandoned.
	
	NOTE: pickle.load on the local state file only — never point this at
	untrusted data.
	"""
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if type(t) == tuple:
				if len(t) > 2:
					# Future formats, not supported here
					# NOTE(review): t[3] would IndexError for a 3-tuple;
					# dead code until such a format exists — confirm intent
					ver = t[3]
					# TODO
				
				# Old format, from 2012-02-02 to 2012-02-03
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					# Old format, from 2012-02-03 to 2012-02-03
					DupeShareHACK = t
					t = None
				else:
					# Current format, from 2012-02-03 onward
					DupeShareHACK = pickle.load(f)
				
				# BUGFIX: t is None for the dict-only format (no timestamp,
				# no workLog saved); the unguarded 't + 120' raised TypeError
				# and made that legacy format unrestorable
				if t is not None and t + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except Exception:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))
828
829
830 from jsonrpcserver import JSONRPCListener, JSONRPCServer
831 import interactivemode
832 from networkserver import NetworkListener
833 import threading
834 import sharelogging
835 import authentication
836 from stratumserver import StratumServer
837 import imp
838
if __name__ == "__main__":
	# --- Share logging setup ---
	# Normalise config.ShareLogging into a list of parameter dicts, honouring
	# two generations of deprecated configuration along the way, then load and
	# instantiate one logger object per entry into loggersShare.
	if not hasattr(config, 'ShareLogging'):
		config.ShareLogging = ()
	if hasattr(config, 'DbOptions'):
		# Deprecated DbOptions: translate into an equivalent 'sql' entry
		logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
		config.ShareLogging = list(config.ShareLogging)
		config.ShareLogging.append( {
			'type': 'sql',
			'engine': 'postgres',
			'dbopts': config.DbOptions,
			'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
		} )
	for i in config.ShareLogging:
		if not hasattr(i, 'keys'):
			# Not a dict: old short-term (name, parameters) tuple form
			name, parameters = i
			logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
			if name == 'postgres':
				name = 'sql'
				i = {
					'engine': 'postgres',
					'dbopts': parameters,
				}
			elif name == 'logfile':
				i = {}
				i['thropts'] = parameters
				if 'filename' in parameters:
					# filename is a top-level key; keep it out of thropts
					i['filename'] = parameters['filename']
					i['thropts'] = dict(i['thropts'])
					del i['thropts']['filename']
			else:
				i = parameters
			i['type'] = name
		
		name = i['type']
		parameters = i
		try:
			# Load sharelogging/<name>.py and instantiate its same-named class
			fp, pathname, description = imp.find_module(name, sharelogging.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			loggersShare.append(lo)
		except:
			logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())
	
	# --- Authentication setup ---
	# Each entry names a module in authentication/ providing a same-named
	# class; instances are collected into authenticators. Default: allow all.
	if not hasattr(config, 'Authentication'):
		config.Authentication = ({'module': 'allowall'},)
	
	for i in config.Authentication:
		name = i['module']
		parameters = i
		try:
			fp, pathname, description = imp.find_module(name, authentication.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			authenticators.append(lo)
		except:
			logging.getLogger('authentication').error("Error setting up authentication module %s: %s", name, sys.exc_info())
	
	# --- Bitcoin p2p node listeners and optional upstream link ---
	LSbc = []
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))
	
	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
	
	# Imported for their side effect of registering JSON-RPC method handlers
	import jsonrpc_getblocktemplate
	import jsonrpc_getwork
	import jsonrpc_setworkaux
	
	# --- JSON-RPC (getwork/GBT) server setup ---
	server = JSONRPCServer()
	server.tls = threading.local()
	server.tls.wantClear = False
	if hasattr(config, 'JSONRPCAddress'):
		# Deprecated singular form: prepend it to the addresses list
		logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
	LS = []
	for a in config.JSONRPCAddresses:
		LS.append(JSONRPCListener(server, a))
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	# Wire the server's callbacks to this module's work-management functions
	server.aux = MM.CoinbaseAux
	server.getBlockHeader = getBlockHeader
	server.getBlockTemplate = getBlockTemplate
	server.receiveShare = receiveShare
	server.RaiseRedFlags = RaiseRedFlags
	server.ShareTarget = config.ShareTarget
	server.checkAuthentication = checkAuthentication
	
	if hasattr(config, 'TrustedForwarders'):
		server.TrustedForwarders = config.TrustedForwarders
	server.ServerName = config.ServerName
	
	# --- Stratum server setup ---
	# Same callback-wiring pattern as the JSON-RPC server above
	stratumsrv = StratumServer()
	stratumsrv.getStratumJob = getStratumJob
	stratumsrv.getExistingStratumJob = getExistingStratumJob
	stratumsrv.receiveShare = receiveShare
	stratumsrv.RaiseRedFlags = RaiseRedFlags
	stratumsrv.getTarget = getTarget
	stratumsrv.defaultTarget = config.ShareTarget
	stratumsrv.IsJobValid = IsJobValid
	stratumsrv.checkAuthentication = checkAuthentication
	if not hasattr(config, 'StratumAddresses'):
		config.StratumAddresses = ()
	for a in config.StratumAddresses:
		NetworkListener(stratumsrv, a)
	
	MM.start()
	
	# Reload any work state persisted by a previous run before serving
	restoreState(config.SaveStateFilename)
	
	# Background threads are daemonised so they don't block interpreter exit;
	# the JSON-RPC server runs on the main thread below.
	prune_thr = threading.Thread(target=WorkLogPruner, args=(workLog,))
	prune_thr.daemon = True
	prune_thr.start()
	
	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()
	
	stratum_thr = threading.Thread(target=stratumsrv.serve_forever)
	stratum_thr.daemon = True
	stratum_thr.start()
	
	server.serve_forever()