#!/usr/bin/python3
# Eloipool - Python Bitcoin pool server
# Copyright (C) 2011-2013  Luke Dashjr <luke-jr+eloipool@utopios.org>
# Portions written by Peter Leurs <kinlo@triplemining.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import argparse
import importlib
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', '--config', help='Config name to load from config_<ARG>.py')
args = argparser.parse_args()
configmod = 'config'
if not args.config is None:
	configmod = 'config_%s' % (args.config,)
__import__(configmod)
config = importlib.import_module(configmod)

if not hasattr(config, 'ServerName'):
	config.ServerName = 'Unnamed Eloipool'

if not hasattr(config, 'ShareTarget'):
	config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff

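# The config module above must define at least the pool's payout address
# (TrackerAddr), the upstream p2p network magic (UpstreamNetworkId), and the
# listener addresses used further below. A minimal sketch with illustrative
# placeholder values only (everything else defined in config is also copied
# wholesale into merklemaker via MM.__dict__.update(config.__dict__) later in
# this file):
#
#   ServerName = 'My Eloipool'
#   TrackerAddr = '1YourPoolPayoutAddressHere'
#   UpstreamNetworkId = b'\xf9\xbe\xb4\xd9'  # Bitcoin mainnet p2p magic
#   JSONRPCAddresses = (('', 8337),)
#   StratumAddresses = (('', 3334),)
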
import logging
import logging.handlers

rootlogger = logging.getLogger(None)
logformat = getattr(config, 'LogFormat', '%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s')
logformatter = logging.Formatter(logformat)
if len(rootlogger.handlers) == 0:
	logging.basicConfig(
		format=logformat,
		level=logging.DEBUG,
	)
	for infoOnly in (
		'checkShare',
		'getTarget',
		'JSONRPCHandler',
		'JSONRPCServer',
		'merkleMaker',
		'StratumServer',
		'Waker for JSONRPCServer',
		'Waker for StratumServer',
		'WorkLogPruner'
	):
		logging.getLogger(infoOnly).setLevel(logging.INFO)
if getattr(config, 'LogToSysLog', False):
	sysloghandler = logging.handlers.SysLogHandler(address = '/dev/log')
	rootlogger.addHandler(sysloghandler)
if hasattr(config, 'LogFile'):
	if isinstance(config.LogFile, str):
		filehandler = logging.FileHandler(config.LogFile)
	else:
		filehandler = logging.handlers.TimedRotatingFileHandler(**config.LogFile)
	filehandler.setFormatter(logformatter)
	rootlogger.addHandler(filehandler)

def RaiseRedFlags(reason):
	logging.getLogger('redflag').critical(reason)
	return reason


from bitcoin.node import BitcoinLink, BitcoinNode
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'
bcnode.newBlock = lambda blkhash: MM.updateMerkleTree()

import jsonrpc

try:
	import jsonrpc.authproxy
	jsonrpc.authproxy.USER_AGENT = 'Eloipool/0.1'
except:
	pass


from bitcoin.script import BitcoinScript
from bitcoin.txn import Txn
from base58 import b58decode
from binascii import b2a_hex
from struct import pack
import subprocess
from time import time

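# Build the coinbase transaction for new work. If CoinbaserCmd is configured,
# it is run as a shell command (%d is replaced with the coinbase value, %p with
# the previous block hash) and is expected to print the number of extra outputs
# followed by amount/address line pairs; any failure, or allocating the full
# coinbase value, discards those outputs. The remainder always pays TrackerAddr.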
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True, prevBlockHex = None):
	txn = Txn.new()
	
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		try:
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			cmd = cmd.replace('%p', prevBlockHex or '""')
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			nout = int(p.stdout.readline())
			for i in range(nout):
				amount = int(p.stdout.readline())
				addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
				pkScript = BitcoinScript.toAddress(addr)
				txn.addOutput(amount, pkScript)
				coinbased += amount
		except:
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased
	
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)
	
	# TODO
	# TODO: red flag on dupe coinbase
	return txn


import jsonrpc_getwork
from util import Bits2Target

workLog = {}
userStatus = {}
networkTarget = None
DupeShareHACK = {}

server = None
stratumsrv = None
def updateBlocks():
	server.wakeLongpoll()
	stratumsrv.updateJob()

def blockChanged():
	global MM, networkTarget, server
	bits = MM.currentBlock[2]
	if bits is None:
		networkTarget = None
	else:
		networkTarget = Bits2Target(bits)
	if MM.lastBlock != (None, None, None):
		global DupeShareHACK
		DupeShareHACK = {}
		jsonrpc_getwork._CheckForDupesHACK = {}
		workLog.clear()
	server.wakeLongpoll(wantClear=True)
	stratumsrv.updateJob(wantClear=True)


from time import sleep, time
import traceback

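# Background pruner: once a minute, drop work-log entries more than 120 seconds
# old so stale jobs cannot be resubmitted and memory stays bounded.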
def _WorkLogPruner_I(wl):
	now = time()
	pruned = 0
	for username in wl:
		userwork = wl[username]
		for wli in tuple(userwork.keys()):
			if now > userwork[wli][1] + 120:
				del userwork[wli]
				pruned += 1
	WorkLogPruner.logger.debug('Pruned %d jobs' % (pruned,))

def WorkLogPruner(wl):
	while True:
		try:
			sleep(60)
			_WorkLogPruner_I(wl)
		except:
			WorkLogPruner.logger.error(traceback.format_exc())
WorkLogPruner.logger = logging.getLogger('WorkLogPruner')


from merklemaker import merkleMaker
MM = merkleMaker()
MM.__dict__.update(config.__dict__)
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks


from binascii import b2a_hex
from copy import deepcopy
from math import ceil, log
from merklemaker import MakeBlockHeader
from struct import pack, unpack
import threading
from time import time
from util import PendingUpstream, RejectedShare, bdiff1target, dblsha, LEhash2int, swap32, target2bdiff, target2pdiff
import jsonrpc
import traceback

gotwork = None
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

if not hasattr(config, 'DelayLogForUpstream'):
	config.DelayLogForUpstream = False

if not hasattr(config, 'DynamicTargetting'):
	config.DynamicTargetting = 0
else:
	if not hasattr(config, 'DynamicTargetWindow'):
		config.DynamicTargetWindow = 120
	config.DynamicTargetGoal *= config.DynamicTargetWindow / 60

def submitGotwork(info):
	try:
		gotwork.gotwork(info)
	except:
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())

if not hasattr(config, 'GotWorkTarget'):
	config.GotWorkTarget = 0

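# Clamp a proposed share target: never easier than config.ShareTarget and never
# harder than the network (or gotwork) target, so a valid block can never be
# rejected as a high-hash share. DTMode 2 rounds the target up to a power of
# two minus one; DTMode 3 rounds it so the corresponding bdiff difficulty is a
# whole number. Returns None when the result is exactly ShareTarget, to save
# memory in the work log.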
def clampTarget(target, DTMode):
	# ShareTarget is the minimum
	if target is None or target > config.ShareTarget:
		target = config.ShareTarget
	
	# Never target above upstream(s), as we'd lose blocks
	target = max(target, networkTarget, config.GotWorkTarget)
	
	if DTMode == 2:
		# Ceil target to a power of two :)
		truebits = log(target, 2)
		if target <= 2**int(truebits):
			# Workaround for bug in Python's math.log function
			truebits = int(truebits)
		target = 2**ceil(truebits) - 1
	elif DTMode == 3:
		# Round target to multiple of bdiff 1
		target = bdiff1target / int(round(target2bdiff(target)))
	
	# Return None for ShareTarget to save memory
	if target == config.ShareTarget:
		return None
	return target

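# Dynamic targetting: pick a per-user share target so that, on average, each
# user submits roughly DynamicTargetGoal shares per DynamicTargetWindow seconds.
# The per-user state in userStatus is [current target, last update time, work
# counted since then]; the target is rescaled from the observed rate once the
# window elapses (or the goal is exceeded), and reset if no shares arrived.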
def getTarget(username, now, DTMode = None, RequestedTarget = None):
	if DTMode is None:
		DTMode = config.DynamicTargetting
	if not DTMode:
		return None
	if username in userStatus:
		status = userStatus[username]
	else:
		# No record, use default target
		RequestedTarget = clampTarget(RequestedTarget, DTMode)
		userStatus[username] = [RequestedTarget, now, 0]
		return RequestedTarget
	(targetIn, lastUpdate, work) = status
	if work <= config.DynamicTargetGoal:
		if now < lastUpdate + config.DynamicTargetWindow and (targetIn is None or targetIn >= networkTarget):
			# No reason to change it just yet
			return clampTarget(targetIn, DTMode)
		if not work:
			# No shares received, reset to minimum
			if targetIn:
				getTarget.logger.debug("No shares from %s, resetting to minimum target" % (repr(username),))
				userStatus[username] = [None, now, 0]
			return clampTarget(None, DTMode)
	
	deltaSec = now - lastUpdate
	target = targetIn or config.ShareTarget
	target = int(target * config.DynamicTargetGoal * deltaSec / config.DynamicTargetWindow / work)
	target = clampTarget(target, DTMode)
	if target != targetIn:
		pfx = 'Retargetting %s' % (repr(username),)
		tin = targetIn or config.ShareTarget
		getTarget.logger.debug("%s from: %064x (pdiff %s)" % (pfx, tin, target2pdiff(tin)))
		tgt = target or config.ShareTarget
		getTarget.logger.debug("%s   to: %064x (pdiff %s)" % (pfx, tgt, target2pdiff(tgt)))
	userStatus[username] = [target, now, 0]
	return target
getTarget.logger = logging.getLogger('getTarget')

def TopTargets(n = 0x10):
	tmp = list(k for k, v in userStatus.items() if not v[0] is None)
	tmp.sort(key=lambda k: -userStatus[k][0])
	tmp2 = {}
	def t2d(t):
		if t not in tmp2:
			tmp2[t] = target2pdiff(t)
		return tmp2[t]
	for k in tmp[-n:]:
		tgt = userStatus[k][0]
		print('%-34s %064x %3d' % (k, tgt, t2d(tgt)))

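# Work issuance: every job handed out is recorded in workLog, keyed by username
# (None for stratum jobs, which are shared across users) and a per-work
# identifier (the merkle root for getwork, an identifier embedded in the
# coinbase for GBT, the job id for stratum), along with its issue time, so
# checkShare can match a submission back to the exact work that was issued.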
def RegisterWork(username, wli, wld, RequestedTarget = None):
	now = time()
	target = getTarget(username, now, RequestedTarget=RequestedTarget)
	wld = tuple(wld) + (target,)
	workLog.setdefault(username, {})[wli] = (wld, now)
	return target or config.ShareTarget

def getBlockHeader(username):
	MRD = MM.getMRD()
	merkleRoot = MRD[0]
	hdr = MakeBlockHeader(MRD)
	workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
	target = RegisterWork(username, merkleRoot, MRD)
	return (hdr, workLog[username][merkleRoot], target)

def getBlockTemplate(username, p_magic = None, RequestedTarget = None):
	if server.tls.wantClear:
		wantClear = True
	elif p_magic and username not in workLog:
		wantClear = True
		p_magic[0] = True
	else:
		wantClear = False
	MC = MM.getMC(wantClear)
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
	wliPos = coinbase[0] + 2
	wliLen = coinbase[wliPos - 1]
	wli = coinbase[wliPos:wliPos+wliLen]
	target = RegisterWork(username, wli, MC, RequestedTarget=RequestedTarget)
	return (MC, workLog[username][wli], target)

def getStratumJob(jobid, wantClear = False):
	MC = MM.getMC(wantClear)
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
	now = time()
	workLog.setdefault(None, {})[jobid] = (MC, now)
	return (MC, workLog[None][jobid])

def getExistingStratumJob(jobid):
	wld = workLog[None][jobid]
	return (wld[0], wld)

loggersShare = []
authenticators = []

RBDs = []
RBPs = []

from bitcoin.varlen import varlenEncode, varlenDecode
import bitcoin.txn
from merklemaker import assembleBlock

if not hasattr(config, 'BlockSubmissions'):
	config.BlockSubmissions = None

RBFs = []
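# Submit a solved block to each upstream in turn: the template's own source (if
# known) first, then config.BlockSubmissions or all template sources. BIP 22
# submitblock is tried first, falling back to the older getmemorypool
# submission forms; persistent failures are re-queued and retried until the
# network has moved past this block, at which point a red flag is raised and
# the share is logged as having given up upstream.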
def blockSubmissionThread(payload, blkhash, share):
	if config.BlockSubmissions is None:
		servers = list(a for b in MM.TemplateSources for a in b)
	else:
		servers = list(config.BlockSubmissions)
	
	if hasattr(share['merkletree'], 'source_uri'):
		servers.insert(0, {
			'access': jsonrpc.ServiceProxy(share['merkletree'].source_uri),
			'name': share['merkletree'].source,
		})
	elif not servers:
		servers = list(a for b in MM.TemplateSources for a in b)
	
	myblock = (blkhash, payload[4:36])
	payload = b2a_hex(payload).decode('ascii')
	nexterr = 0
	tries = 0
	success = False
	while len(servers):
		tries += 1
		TS = servers.pop(0)
		UpstreamBitcoindJSONRPC = TS['access']
		try:
			# BIP 22 standard submitblock
			reason = UpstreamBitcoindJSONRPC.submitblock(payload)
		except BaseException as gbterr:
			gbterr_fmt = traceback.format_exc()
			try:
				try:
					# bitcoind 0.5/0.6 getmemorypool
					reason = UpstreamBitcoindJSONRPC.getmemorypool(payload)
				except:
					# Old BIP 22 draft getmemorypool
					reason = UpstreamBitcoindJSONRPC.getmemorypool(payload, {})
				if reason is True:
					reason = None
				elif reason is False:
					reason = 'rejected'
			except BaseException as gmperr:
				now = time()
				if now > nexterr:
					# FIXME: This will show "Method not found" on pre-BIP22 servers
					RaiseRedFlags(gbterr_fmt)
					nexterr = now + 5
				if MM.currentBlock[0] not in myblock and tries > len(servers):
					RBFs.append( (('next block', MM.currentBlock, now, (gbterr, gmperr)), payload, blkhash, share) )
					RaiseRedFlags('Giving up on submitting block to upstream \'%s\'' % (TS['name'],))
					if share['upstreamRejectReason'] is PendingUpstream:
						share['upstreamRejectReason'] = 'GAVE UP'
						share['upstreamResult'] = False
						logShare(share)
					return
				
				servers.append(TS)
				continue
		
		# At this point, we have a reason back
		if reason:
			# FIXME: The returned value could be a list of multiple responses
			msg = 'Upstream \'%s\' block submission failed: %s' % (TS['name'], reason,)
			if success and reason in ('stale-prevblk', 'bad-prevblk', 'orphan', 'duplicate'):
				# no big deal
				blockSubmissionThread.logger.debug(msg)
			else:
				RBFs.append( (('upstream reject', reason, time()), payload, blkhash, share) )
				RaiseRedFlags(msg)
		else:
			blockSubmissionThread.logger.debug('Upstream \'%s\' accepted block' % (TS['name'],))
			success = True
		if share['upstreamRejectReason'] is PendingUpstream:
			share['upstreamRejectReason'] = reason
			share['upstreamResult'] = not reason
			logShare(share)
blockSubmissionThread.logger = logging.getLogger('blockSubmission')

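# Sanity-check a getwork/GBT header: the previous-block field must match the
# current block (a match against the prior block is rejected as stale-prevblk),
# nBits must match the current template, and the block version must be no
# higher than 2 (version-1 miners are still accepted for now).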
def checkData(share):
	data = share['data']
	data = data[:80]
	(prevBlock, height, bits) = MM.currentBlock
	sharePrevBlock = data[4:36]
	if sharePrevBlock != prevBlock:
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')
	
	if data[72:76] != bits:
		raise RejectedShare('bad-diffbits')
	
	# Note that we should accept miners reducing version to 1 if they don't understand 2 yet
	# FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
	if data[1:4] != b'\0\0\0' or data[0] > 2:
		raise RejectedShare('bad-version')

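# Assemble the 80-byte block header for a stratum submission: version 2, the
# current previous-block hash and nBits from the template, the given merkle
# root, and the miner-supplied ntime and nonce (byte-reversed from submission
# order).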
def buildStratumData(share, merkleroot):
	(prevBlock, height, bits) = MM.currentBlock
	
	data = b'\x02\0\0\0'
	data += prevBlock
	data += merkleroot
	data += share['ntime'][::-1]
	data += bits
	data += share['nonce'][::-1]
	
	share['data'] = data
	return data

def IsJobValid(wli, wluser = None):
	if wluser not in workLog:
		return False
	if wli not in workLog[wluser]:
		return False
	(wld, issueT) = workLog[wluser][wli]
	if time() < issueT - 120:
		return False
	return True

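# Core share validation. Distinguishes getwork/GBT submissions (raw 'data',
# optionally with 'blkdata' transactions) from stratum submissions (jobid plus
# extranonce and ntime/nonce, from which the header and coinbase are rebuilt),
# looks the work up in workLog, rejects duplicates, high hashes, stale work and
# out-of-range timestamps, verifies the coinbase and merkle root for template
# modes, and hands anything meeting the network target to blockSubmissionThread.
# Also feeds the dynamic-targetting counters in userStatus.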
def checkShare(share):
	shareTime = share['time'] = time()
	
	username = share['username']
	isstratum = False
	if 'data' in share:
		# getwork/GBT
		checkData(share)
		data = share['data']
		
		if username not in workLog:
			raise RejectedShare('unknown-user')
		MWL = workLog[username]
		
		shareMerkleRoot = data[36:68]
		if 'blkdata' in share:
			pl = share['blkdata']
			(txncount, pl) = varlenDecode(pl)
			cbtxn = bitcoin.txn.Txn(pl)
			othertxndata = cbtxn.disassemble(retExtra=True)
			coinbase = cbtxn.getCoinbase()
			wliPos = coinbase[0] + 2
			wliLen = coinbase[wliPos - 1]
			wli = coinbase[wliPos:wliPos+wliLen]
			mode = 'MC'
			moden = 1
		else:
			wli = shareMerkleRoot
			mode = 'MRD'
			moden = 0
			coinbase = None
	else:
		# Stratum
		isstratum = True
		wli = share['jobid']
		buildStratumData(share, b'\0' * 32)
		mode = 'MC'
		moden = 1
		othertxndata = b''
		if None not in workLog:
			# We haven't yet sent any stratum work for this block
			raise RejectedShare('unknown-work')
		MWL = workLog[None]
	
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld
	
	share['issuetime'] = issueT
	
	(workMerkleTree, workCoinbase) = wld[1:3]
	share['merkletree'] = workMerkleTree
	if 'jobid' in share:
		cbtxn = deepcopy(workMerkleTree.data[0])
		coinbase = workCoinbase + share['extranonce1'] + share['extranonce2']
		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		data = buildStratumData(share, workMerkleTree.withFirst(cbtxn))
		shareMerkleRoot = data[36:68]
	
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
	blkhash = dblsha(data)
	if blkhash[28:] != b'\0\0\0\0':
		raise RejectedShare('H-not-zero')
	blkhashn = LEhash2int(blkhash)
	
	global networkTarget
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
	
	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	txlist = workMerkleTree.data
	txlist = [deepcopy(txlist[0]),] + txlist[1:]
	cbtxn = txlist[0]
	cbtxn.setCoinbase(coinbase or workCoinbase)
	cbtxn.assemble()
	
	if blkhashn <= networkTarget:
		logfunc("Submitting upstream")
		RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree, share, wld) ) )
		if not moden:
			payload = assembleBlock(data, txlist)
		else:
			payload = share['data']
			if len(othertxndata):
				payload += share['blkdata']
			else:
				payload += assembleBlock(data, txlist)[80:]
		logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
		RBPs.append(payload)
		threading.Thread(target=blockSubmissionThread, args=(payload, blkhash, share)).start()
		bcnode.submitBlock(payload)
		if config.DelayLogForUpstream:
			share['upstreamRejectReason'] = PendingUpstream
		else:
			share['upstreamRejectReason'] = None
			share['upstreamResult'] = True
		MM.updateBlock(blkhash)
	
	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')
	
	if 'target' in share:
		workTarget = share['target']
	elif len(wld) > 6:
		workTarget = wld[6]
	else:
		workTarget = None
	
	if workTarget is None:
		workTarget = config.ShareTarget
	if blkhashn > workTarget:
		raise RejectedShare('high-hash')
	share['target'] = workTarget
	share['_targethex'] = '%064x' % (workTarget,)
	
	shareTimestamp = unpack('<L', data[68:72])[0]
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
	if moden:
		cbpre = workCoinbase
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')
		
		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
				raise RejectedShare('bad-cb-flag')
		
		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')
		
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')
		
		if len(othertxndata):
			allowed = assembleBlock(data, txlist)[80:]
			if allowed != share['blkdata']:
				raise RejectedShare('bad-txns')
	
	if config.DynamicTargetting and username in userStatus:
		# NOTE: userStatus[username] is only ever missing right after a restart
		status = userStatus[username]
		target = status[0] or config.ShareTarget
		if target == workTarget:
			userStatus[username][2] += 1
		else:
			userStatus[username][2] += float(target) / workTarget
		if isstratum and userStatus[username][2] > config.DynamicTargetGoal * 2:
			stratumsrv.quickDifficultyUpdate(username)
checkShare.logger = logging.getLogger('checkShare')

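# Record a processed share with every configured share logger, attaching the
# submitted solution (the original submission if preserved in '_origdata',
# otherwise the byte-swapped header hex).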
def logShare(share):
	if '_origdata' in share:
		share['solution'] = share['_origdata']
	else:
		share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
	for i in loggersShare:
		i.logShare(share)

def checkAuthentication(username, password):
	# HTTPServer uses bytes, and StratumServer uses str
	if hasattr(username, 'decode'): username = username.decode('utf8')
	if hasattr(password, 'decode'): password = password.decode('utf8')
	
	for i in authenticators:
		if i.checkAuthentication(username, password):
			return True
	return False

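# Entry point used by both servers for incoming shares: run checkShare, record
# the reject reason (if any) on the share, and log it, unless logging is being
# deferred until the upstream block submission result is known.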
def receiveShare(share):
	# TODO: username => userid
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	except BaseException as e:
		share['rejectReason'] = 'ERROR'
		raise
	finally:
		if not share.get('upstreamRejectReason', None) is PendingUpstream:
			logShare(share)

def newBlockNotification():
	logging.getLogger('newBlockNotification').info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
	pass

def newBlockNotificationSIGNAL(signum, frame):
	# Use a new thread, in case the signal handler is called with locks held
	thr = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
	thr.daemon = True
	thr.start()

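# SIGUSR1 forces an immediate template refresh; operators can send it (for
# example from a bitcoind -blocknotify script) so new blocks are picked up
# without waiting for the next poll cycle.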
from signal import signal, SIGUSR1
signal(SIGUSR1, newBlockNotificationSIGNAL)


import os
import os.path
import pickle
import signal
import sys
from time import sleep
import traceback

if getattr(config, 'SaveStateFilename', None) is None:
	config.SaveStateFilename = 'eloipool.worklog'

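# Shut down the p2p node, JSON-RPC server and stratum server: clear their
# keepgoing flags, wake them, wait up to a few seconds for their loops to
# finish, then close their remaining file descriptors.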
def stopServers():
	logger = logging.getLogger('stopServers')
	
	if hasattr(stopServers, 'already'):
		logger.debug('Already tried to stop servers before')
		return
	stopServers.already = True
	
	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server, stratumsrv)
	for s in servers:
		s.keepgoing = False
	for s in servers:
		try:
			s.wakeup()
		except:
			logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
	i = 0
	while True:
		sl = []
		for s in servers:
			if s.running:
				sl.append(s.__class__.__name__)
		if not sl:
			break
		i += 1
		if i >= 0x100:
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
			break
		sleep(0.01)
	
	for s in servers:
		for fd in s._fd.keys():
			os.close(fd)

def stopLoggers():
	for i in loggersShare:
		if hasattr(i, 'stop'):
			i.stop()

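# Persist the shutdown timestamp, the duplicate-share table and the work log to
# SaveStateFilename so a quick restart can resume without re-issuing or
# double-counting work; on repeated failure the file is removed rather than
# left half-written.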
def saveState(SAVE_STATE_FILENAME, t = None):
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())

def exit():
	t = time()
	stopServers()
	stopLoggers()
	saveState(config.SaveStateFilename, t=t)
	logging.getLogger('exit').info('Goodbye...')
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)

def restart():
	t = time()
	stopServers()
	stopLoggers()
	saveState(config.SaveStateFilename, t=t)
	logging.getLogger('restart').info('Restarting...')
	try:
		os.execv(sys.argv[0], sys.argv)
	except:
		logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())

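# Reload state saved by saveState, coping with the older 2012-era file formats;
# the work log is only restored if the state was written within the last 120
# seconds, since older jobs would have been pruned anyway.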
def restoreState(SAVE_STATE_FILENAME):
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if type(t) == tuple:
				if len(t) > 2:
					# Future formats, not supported here
					ver = t[3]
					# TODO
				
				# Old format, from 2012-02-02 to 2012-02-03
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					# Old format, from 2012-02-03 to 2012-02-03
					DupeShareHACK = t
					t = None
				else:
					# Current format, from 2012-02-03 onward
					DupeShareHACK = pickle.load(f)
				
				if t + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))


from jsonrpcserver import JSONRPCListener, JSONRPCServer
import interactivemode
from networkserver import NetworkListener
import threading
import sharelogging
import authentication
from stratumserver import StratumServer
import imp

if __name__ == "__main__":
	if not hasattr(config, 'ShareLogging'):
		config.ShareLogging = ()
	if hasattr(config, 'DbOptions'):
		logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
		config.ShareLogging = list(config.ShareLogging)
		config.ShareLogging.append( {
			'type': 'sql',
			'engine': 'postgres',
			'dbopts': config.DbOptions,
			'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
		} )
	for i in config.ShareLogging:
		if not hasattr(i, 'keys'):
			name, parameters = i
			logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
			if name == 'postgres':
				name = 'sql'
				i = {
					'engine': 'postgres',
					'dbopts': parameters,
				}
			elif name == 'logfile':
				i = {}
				i['thropts'] = parameters
				if 'filename' in parameters:
					i['filename'] = parameters['filename']
					i['thropts'] = dict(i['thropts'])
					del i['thropts']['filename']
			else:
				i = parameters
			i['type'] = name
		
		name = i['type']
		parameters = i
		try:
			fp, pathname, description = imp.find_module(name, sharelogging.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			loggersShare.append(lo)
		except:
			logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name, sys.exc_info())
	
	if not hasattr(config, 'Authentication'):
		config.Authentication = ({'module': 'allowall'},)
	
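	# Authentication modules are loaded the same way from authentication/
	# ('module' names both the file and the class); the default configuration
	# uses the 'allowall' module.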
	for i in config.Authentication:
		name = i['module']
		parameters = i
		try:
			fp, pathname, description = imp.find_module(name, authentication.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			authenticators.append(lo)
		except:
			logging.getLogger('authentication').error("Error setting up authentication module %s: %s", name, sys.exc_info())
	
	LSbc = []
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))
	
	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
	
	import jsonrpc_getblocktemplate
	import jsonrpc_getwork
	import jsonrpc_setworkaux
	
	server = JSONRPCServer()
	server.tls = threading.local()
	server.tls.wantClear = False
	if hasattr(config, 'JSONRPCAddress'):
		logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
	LS = []
	for a in config.JSONRPCAddresses:
		LS.append(JSONRPCListener(server, a))
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	server.aux = MM.CoinbaseAux
	server.getBlockHeader = getBlockHeader
	server.getBlockTemplate = getBlockTemplate
	server.receiveShare = receiveShare
	server.RaiseRedFlags = RaiseRedFlags
	server.ShareTarget = config.ShareTarget
	server.checkAuthentication = checkAuthentication
	
	if hasattr(config, 'TrustedForwarders'):
		server.TrustedForwarders = config.TrustedForwarders
	server.ServerName = config.ServerName
	
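	# The stratum server shares the same work-issuance, share-checking and
	# authentication callbacks as the JSON-RPC (getwork/GBT) server; it simply
	# speaks the stratum protocol on config.StratumAddresses.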
	stratumsrv = StratumServer()
	stratumsrv.getStratumJob = getStratumJob
	stratumsrv.getExistingStratumJob = getExistingStratumJob
	stratumsrv.receiveShare = receiveShare
	stratumsrv.RaiseRedFlags = RaiseRedFlags
	stratumsrv.getTarget = getTarget
	stratumsrv.defaultTarget = config.ShareTarget
	stratumsrv.IsJobValid = IsJobValid
	stratumsrv.checkAuthentication = checkAuthentication
	if not hasattr(config, 'StratumAddresses'):
		config.StratumAddresses = ()
	for a in config.StratumAddresses:
		NetworkListener(stratumsrv, a)
	
	MM.start()
	
	restoreState(config.SaveStateFilename)
	
	prune_thr = threading.Thread(target=WorkLogPruner, args=(workLog,))
	prune_thr.daemon = True
	prune_thr.start()
	
	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()
	
	stratum_thr = threading.Thread(target=stratumsrv.serve_forever)
	stratum_thr.daemon = True
	stratum_thr.start()
	
	server.serve_forever()