# Merge commit 'a71ece8'
# [bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2013  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 # Portions written by Peter Leurs <kinlo@triplemining.com>
5 #
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as
8 # published by the Free Software Foundation, either version 3 of the
9 # License, or (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU Affero General Public License for more details.
15 #
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18
import argparse
import importlib

# Parse the command line and load the pool configuration module.
# With no -c/--config argument the plain "config" module is used;
# otherwise "config_<name>" is imported instead.
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', '--config', help='Config name to load from config_<ARG>.py')
args = argparser.parse_args()
configmod = 'config'
if args.config is not None:
        configmod = 'config_%s' % (args.config,)
# importlib.import_module performs the import itself; the previous
# separate __import__ call was redundant.
config = importlib.import_module(configmod)

# Defaults for optional settings
if not hasattr(config, 'ServerName'):
        config.ServerName = 'Unnamed Eloipool'

if not hasattr(config, 'ShareTarget'):
        config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
35
36
37 import logging
38 import logging.handlers
39
# Root logger setup: if nothing has configured a handler yet, install a
# DEBUG-level basicConfig and quiet a set of chatty subsystem loggers
# down to INFO.
rootlogger = logging.getLogger(None)
logformat = getattr(config, 'LogFormat', '%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s')
logformatter = logging.Formatter(logformat)
if not rootlogger.handlers:
        logging.basicConfig(format=logformat, level=logging.DEBUG)
        _infoOnlyLoggers = (
                'checkShare',
                'getTarget',
                'JSONRPCHandler',
                'JSONRPCServer',
                'merkleMaker',
                'StratumServer',
                'Waker for JSONRPCServer',
                'Waker for StratumServer',
                'WorkLogPruner',
        )
        for _loggerName in _infoOnlyLoggers:
                logging.getLogger(_loggerName).setLevel(logging.INFO)
if getattr(config, 'LogToSysLog', False):
        # Optionally mirror all logging to the local syslog socket
        sysloghandler = logging.handlers.SysLogHandler(address = '/dev/log')
        rootlogger.addHandler(sysloghandler)
if hasattr(config, 'LogFile'):
        # LogFile may be a plain filename, or a dict of keyword arguments
        # for a TimedRotatingFileHandler
        if isinstance(config.LogFile, str):
                filehandler = logging.FileHandler(config.LogFile)
        else:
                filehandler = logging.handlers.TimedRotatingFileHandler(**config.LogFile)
        filehandler.setFormatter(logformatter)
        rootlogger.addHandler(filehandler)
70
def RaiseRedFlags(reason):
        """Log *reason* at CRITICAL severity on the 'redflag' logger and
        return it unchanged, so callers can log and raise in one step."""
        logging.getLogger('redflag').critical(reason)
        return reason
74
75
# P2P connection to the Bitcoin network, used below to submit solved
# blocks and to hear about new ones.
from bitcoin.node import BitcoinLink, BitcoinNode
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'
# On a new-block announcement, refresh our work template
# (MM is the merkleMaker instance created later in this file)
bcnode.newBlock = lambda blkhash: MM.updateMerkleTree()
80
import jsonrpc

# Identify ourselves to upstream JSON-RPC servers.  The authproxy
# submodule is optional (only some jsonrpc packages ship it), so a
# missing module is the only error we expect and ignore here.
try:
        import jsonrpc.authproxy
        jsonrpc.authproxy.USER_AGENT = 'Eloipool/0.1'
except ImportError:
        pass
88
89
90 from bitcoin.script import BitcoinScript
91 from bitcoin.txn import Txn
92 from base58 import b58decode
93 from binascii import b2a_hex
94 from struct import pack
95 import subprocess
96 from time import time
97
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True, prevBlockHex = None):
        """Build the coinbase transaction for a new work template.

        coinbaseValue: total reward (subsidy + fees) in satoshis.
        useCoinbaser: when True and config.CoinbaserCmd is set, run the
            external coinbaser command to split part of the reward into
            extra outputs; the remainder goes to config.TrackerAddr.
        prevBlockHex: previous block hash, substituted for %p in the
            coinbaser command line.

        Returns the (not yet assembled) Txn.  If the coinbaser fails or
        tries to spend more than coinbaseValue, its outputs are discarded
        and the full reward goes to the tracker address.
        """
        txn = Txn.new()
        
        if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
                coinbased = 0
                try:
                        cmd = config.CoinbaserCmd
                        cmd = cmd.replace('%d', str(coinbaseValue))
                        cmd = cmd.replace('%p', prevBlockHex or '""')
                        # NOTE: shell=True means CoinbaserCmd is trusted
                        # operator config; never build it from untrusted data.
                        # The with-block ensures the pipe is closed and the
                        # child reaped (the old code leaked both).
                        with subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE) as p:
                                # Protocol: first line = output count, then
                                # alternating amount / address lines
                                nout = int(p.stdout.readline())
                                for i in range(nout):
                                        amount = int(p.stdout.readline())
                                        addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
                                        pkScript = BitcoinScript.toAddress(addr)
                                        txn.addOutput(amount, pkScript)
                                        coinbased += amount
                except Exception:
                        # Log the failure (the old bare except hid it), then
                        # force the failure path below
                        logging.getLogger('makeCoinbaseTxn').debug('Coinbaser error:\n' + traceback.format_exc())
                        coinbased = coinbaseValue + 1
                if coinbased >= coinbaseValue:
                        logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
                        txn.outputs = []
                else:
                        coinbaseValue -= coinbased
        
        pkScript = BitcoinScript.toAddress(config.TrackerAddr)
        txn.addOutput(coinbaseValue, pkScript)
        
        # TODO: red flag on dupe coinbase
        return txn
129
130
131 import jsonrpc_getwork
132 from util import Bits2Target
133
# In-flight work, keyed by username then work-log identifier:
#   workLog[username][wli] = (wld, issue_time)
workLog = {}
# Per-user dynamic-target state: username -> [target, lastUpdate, work]
userStatus = {}
# Current network (block) target; None until the first template arrives
networkTarget = None
# Share dedupe table, reset on every block change
DupeShareHACK = {}

# Filled in by server setup code elsewhere in the file
server = None
stratumsrv = None
def updateBlocks():
        # Push updated work to both longpoll (getwork/GBT) and stratum clients
        server.wakeLongpoll()
        stratumsrv.updateJob()
144
def blockChanged():
        """React to the network block changing: recompute the network
        target, drop per-block share/work state, and push fresh (cleared)
        work out to every connected client."""
        global MM, networkTarget, server
        bits = MM.currentBlock[2]
        networkTarget = None if bits is None else Bits2Target(bits)
        if MM.lastBlock != (None, None, None):
                # A real previous block exists, so all outstanding work and
                # dedupe state is now stale
                global DupeShareHACK
                DupeShareHACK = {}
                jsonrpc_getwork._CheckForDupesHACK = {}
                workLog.clear()
        server.wakeLongpoll(wantClear=True)
        stratumsrv.updateJob(wantClear=True)
159
160
161 from time import sleep, time
162 import traceback
163
164 def _WorkLogPruner_I(wl):
165         now = time()
166         pruned = 0
167         for username in wl:
168                 userwork = wl[username]
169                 for wli in tuple(userwork.keys()):
170                         if now > userwork[wli][1] + 120:
171                                 del userwork[wli]
172                                 pruned += 1
173         WorkLogPruner.logger.debug('Pruned %d jobs' % (pruned,))
174
175 def WorkLogPruner(wl):
176         while True:
177                 try:
178                         sleep(60)
179                         _WorkLogPruner_I(wl)
180                 except:
181                         WorkLogPruner.logger.error(traceback.format_exc())
182 WorkLogPruner.logger = logging.getLogger('WorkLogPruner')
183
184
from merklemaker import merkleMaker
MM = merkleMaker()
# Copy every config attribute onto the merkleMaker instance wholesale;
# presumably merkleMaker reads the settings it understands — confirm
# against merklemaker.py before relying on specific names.
MM.__dict__.update(config.__dict__)
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
191
192
193 from binascii import b2a_hex
194 from copy import deepcopy
195 from math import ceil, log
196 from merklemaker import MakeBlockHeader
197 from struct import pack, unpack
198 import threading
199 from time import time
200 from util import PendingUpstream, RejectedShare, bdiff1target, dblsha, LEhash2int, swap32, target2bdiff, target2pdiff
201 import jsonrpc
202 import traceback
203
# Optional "gotwork" reporting service (see submitGotwork below)
gotwork = None
if hasattr(config, 'GotWorkURI'):
        gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

# When True, a share that solves a block is logged only after the
# upstream submission result is known (see blockSubmissionThread)
if not hasattr(config, 'DelayLogForUpstream'):
        config.DelayLogForUpstream = False

if not hasattr(config, 'DynamicTargetting'):
        config.DynamicTargetting = 0
else:
        if not hasattr(config, 'DynamicTargetWindow'):
                config.DynamicTargetWindow = 120
        # Scale the goal to shares-per-window (presumably configured as
        # shares-per-minute — the /60 suggests so; confirm in docs)
        config.DynamicTargetGoal *= config.DynamicTargetWindow / 60
217
def submitGotwork(info):
        """Best-effort submission of *info* to the configured gotwork
        service; failures are logged as warnings, never raised."""
        try:
                gotwork.gotwork(info)
        except Exception:
                # Narrowed from a bare except: so interpreter-exit
                # exceptions still propagate
                checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
223
# Hash-value threshold below which shares are also reported via gotwork
if not hasattr(config, 'GotWorkTarget'):
        config.GotWorkTarget = 0
226
def clampTarget(target, DTMode):
        """Clamp a proposed share target to sane bounds and apply the
        dynamic-targetting rounding mode.

        target: proposed target as an int, or None for the default.
        DTMode: 2 rounds up to (a power of two minus one); 3 rounds to a
            multiple of bdiff 1; other truthy modes leave it unrounded.

        Returns the clamped target, or None when it equals
        config.ShareTarget (None is stored instead to save memory).
        """
        # ShareTarget is the minimum difficulty (i.e. the maximum target)
        if target is None or target > config.ShareTarget:
                target = config.ShareTarget
        
        # Never target above upstream(s), as we'd lose blocks
        target = max(target, networkTarget, config.GotWorkTarget)
        
        if DTMode == 2:
                # Ceil target to a power of two :)
                truebits = log(target, 2)
                if target <= 2**int(truebits):
                        # Workaround for bug in Python's math.log function
                        truebits = int(truebits)
                target = 2**ceil(truebits) - 1
        elif DTMode == 3:
                # Round target to multiple of bdiff 1.
                # Floor division: '/' yields a float under Python 3, which
                # would break later int comparisons and %064x formatting.
                target = bdiff1target // int(round(target2bdiff(target)))
        
        # Return None for ShareTarget to save memory
        if target == config.ShareTarget:
                return None
        return target
250
def getTarget(username, now, DTMode = None, RequestedTarget = None):
        """Return the share target for *username* at time *now*, updating
        the per-user dynamic-targetting state in userStatus.

        Returns None when the user should use the default
        config.ShareTarget (saves memory; see clampTarget).
        """
        if DTMode is None:
                DTMode = config.DynamicTargetting
        if not DTMode:
                return None
        if username in userStatus:
                status = userStatus[username]
        else:
                # No record, use default target
                RequestedTarget = clampTarget(RequestedTarget, DTMode)
                userStatus[username] = [RequestedTarget, now, 0]
                return RequestedTarget
        (targetIn, lastUpdate, work) = status
        if work <= config.DynamicTargetGoal:
                if now < lastUpdate + config.DynamicTargetWindow and (targetIn is None or targetIn >= networkTarget):
                        # No reason to change it just yet
                        return clampTarget(targetIn, DTMode)
                if not work:
                        # No shares received, reset to minimum
                        if targetIn:
                                getTarget.logger.debug("No shares from %s, resetting to minimum target" % (repr(username),))
                                userStatus[username] = [None, now, 0]
                        return clampTarget(None, DTMode)
        
        # Rescale the target so the observed share rate approaches
        # DynamicTargetGoal shares per DynamicTargetWindow
        deltaSec = now - lastUpdate
        target = targetIn or config.ShareTarget
        target = int(target * config.DynamicTargetGoal * deltaSec / config.DynamicTargetWindow / work)
        target = clampTarget(target, DTMode)
        if target != targetIn:
                # Log the retarget, substituting ShareTarget for the None
                # placeholder so the hex values are meaningful
                pfx = 'Retargetting %s' % (repr(username),)
                tin = targetIn or config.ShareTarget
                getTarget.logger.debug("%s from: %064x (pdiff %s)" % (pfx, tin, target2pdiff(tin)))
                tgt = target or config.ShareTarget
                getTarget.logger.debug("%s   to: %064x (pdiff %s)" % (pfx, tgt, target2pdiff(tgt)))
        userStatus[username] = [target, now, 0]
        return target
getTarget.logger = logging.getLogger('getTarget')
288
def TopTargets(n = 0x10):
        """Print (to stdout) up to *n* users with the lowest non-default
        targets — i.e. the highest-difficulty workers — one per line as
        'username target-hex pdiff'.  Debug/console helper."""
        candidates = [k for k, v in userStatus.items() if v[0] is not None]
        # Sort descending by target, then take the tail: the n smallest
        # targets, highest first
        candidates.sort(key=lambda k: -userStatus[k][0])
        pdiffCache = {}
        def _pdiff(t):
                if t not in pdiffCache:
                        pdiffCache[t] = target2pdiff(t)
                return pdiffCache[t]
        for k in candidates[-n:]:
                tgt = userStatus[k][0]
                print('%-34s %064x %3d' % (k, tgt, _pdiff(tgt)))
300
def RegisterWork(username, wli, wld, RequestedTarget = None):
        """Record issued work in the work log under (username, wli), with
        the user's current share target appended to *wld*.  Returns the
        effective target (config.ShareTarget when per-user target is None)."""
        issued = time()
        target = getTarget(username, issued, RequestedTarget=RequestedTarget)
        entry = (tuple(wld) + (target,), issued)
        workLog.setdefault(username, {})[wli] = entry
        return target or config.ShareTarget
307
def getBlockHeader(username):
        """Issue getwork-style work for *username*.

        Returns (header bytes, work-log entry, share target) keyed by the
        merkle root of a fresh MRD from the merkle maker.
        """
        MRD = MM.getMRD()
        merkleRoot = MRD[0]
        hdr = MakeBlockHeader(MRD)
        # NOTE(review): this entry is immediately overwritten by
        # RegisterWork below (which stores MRD with the target appended);
        # the explicit store looks redundant — confirm before removing.
        workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
        target = RegisterWork(username, merkleRoot, MRD)
        return (hdr, workLog[username][merkleRoot], target)
315
def getBlockTemplate(username, p_magic = None, RequestedTarget = None):
        """Issue GBT-style work for *username*.

        p_magic: optional single-element list used as an out-parameter;
            p_magic[0] is set True when a clear template was forced for a
            first-time user.

        Returns (MC, work-log entry, share target).
        """
        if server.tls.wantClear:
                wantClear = True
        elif p_magic and username not in workLog:
                # First request from this user: force a clear template and
                # signal it back through p_magic
                wantClear = True
                p_magic[0] = True
        else:
                wantClear = False
        MC = MM.getMC(wantClear)
        (dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
        # The work-log identifier is a length-prefixed field embedded in
        # the coinbase script, located via the script's first length byte
        # (presumably placed there by merklemaker — confirm there)
        wliPos = coinbase[0] + 2
        wliLen = coinbase[wliPos - 1]
        wli = coinbase[wliPos:wliPos+wliLen]
        target = RegisterWork(username, wli, MC, RequestedTarget=RequestedTarget)
        return (MC, workLog[username][wli], target)
331
def getStratumJob(jobid, wantClear = False):
        """Create a new stratum job under *jobid*; stratum work is logged
        under the None pseudo-user.  Returns (MC, work-log entry)."""
        MC = MM.getMC(wantClear)
        (dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
        entry = (MC, time())
        workLog.setdefault(None, {})[jobid] = entry
        return (MC, entry)
338
def getExistingStratumJob(jobid):
        """Look up an already-issued stratum job by *jobid*; raises
        KeyError when unknown.  Returns (MC, work-log entry)."""
        entry = workLog[None][jobid]
        return (entry[0], entry)
342
# Share-logger and authenticator plugin instances, populated by setup
# code elsewhere in the file
loggersShare = []
authenticators = []

# Debug history of solved-block data and payloads (for post-mortems)
RBDs = []
RBPs = []

from bitcoin.varlen import varlenEncode, varlenDecode
import bitcoin.txn
from merklemaker import assembleBlock

# None means "submit solved blocks to every template source"
if not hasattr(config, 'BlockSubmissions'):
        config.BlockSubmissions = None
355
# History of failed/rejected block submissions (for post-mortems)
RBFs = []
def blockSubmissionThread(payload, blkhash, share):
        """Submit a solved block to upstream server(s), retrying and
        falling back from BIP 22 submitblock to the older getmemorypool
        submission form.  Runs in its own thread (see checkShare).

        payload: full serialized block bytes.
        blkhash: little-endian block hash bytes.
        share: the winning share dict; its upstreamRejectReason /
            upstreamResult fields are filled in when logging was deferred
            (config.DelayLogForUpstream).
        """
        if config.BlockSubmissions is None:
                servers = list(a for b in MM.TemplateSources for a in b)
        else:
                servers = list(config.BlockSubmissions)
        
        # Prefer the server the winning template actually came from
        if hasattr(share['merkletree'], 'source_uri'):
                servers.insert(0, {
                        'access': jsonrpc.ServiceProxy(share['merkletree'].source_uri),
                        'name': share['merkletree'].source,
                })
        elif not servers:
                servers = list(a for b in MM.TemplateSources for a in b)
        
        myblock = (blkhash, payload[4:36])
        payload = b2a_hex(payload).decode('ascii')
        nexterr = 0
        tries = 0
        success = False
        while len(servers):
                tries += 1
                TS = servers.pop(0)
                UpstreamBitcoindJSONRPC = TS['access']
                try:
                        # BIP 22 standard submitblock
                        reason = UpstreamBitcoindJSONRPC.submitblock(payload)
                except BaseException as gbterr:
                        gbterr_fmt = traceback.format_exc()
                        try:
                                try:
                                        # bitcoind 0.5/0.6 getmemorypool
                                        reason = UpstreamBitcoindJSONRPC.getmemorypool(payload)
                                except:
                                        # Old BIP 22 draft getmemorypool
                                        reason = UpstreamBitcoindJSONRPC.getmemorypool(payload, {})
                                # getmemorypool returns a bool; normalize to
                                # submitblock's None-or-reason convention
                                if reason is True:
                                        reason = None
                                elif reason is False:
                                        reason = 'rejected'
                        except BaseException as gmperr:
                                # Both submission forms failed on this server
                                now = time()
                                if now > nexterr:
                                        # FIXME: This will show "Method not found" on pre-BIP22 servers
                                        RaiseRedFlags(gbterr_fmt)
                                        # Rate-limit red flags to one per 5 seconds
                                        nexterr = now + 5
                                # Give up entirely once the chain has moved on
                                # and every server has been tried
                                if MM.currentBlock[0] not in myblock and tries > len(servers):
                                        RBFs.append( (('next block', MM.currentBlock, now, (gbterr, gmperr)), payload, blkhash, share) )
                                        RaiseRedFlags('Giving up on submitting block to upstream \'%s\'' % (TS['name'],))
                                        if share['upstreamRejectReason'] is PendingUpstream:
                                                share['upstreamRejectReason'] = 'GAVE UP'
                                                share['upstreamResult'] = False
                                                logShare(share)
                                        return
                                
                                # Requeue this server for another attempt
                                servers.append(TS)
                                continue
                
                # At this point, we have a reason back
                if reason:
                        # FIXME: The returned value could be a list of multiple responses
                        msg = 'Upstream \'%s\' block submission failed: %s' % (TS['name'], reason,)
                        if success and reason in ('stale-prevblk', 'bad-prevblk', 'orphan', 'duplicate'):
                                # no big deal - another server already accepted it
                                blockSubmissionThread.logger.debug(msg)
                        else:
                                RBFs.append( (('upstream reject', reason, time()), payload, blkhash, share) )
                                RaiseRedFlags(msg)
                else:
                        blockSubmissionThread.logger.debug('Upstream \'%s\' accepted block' % (TS['name'],))
                        success = True
                # First definitive answer resolves deferred share logging
                if share['upstreamRejectReason'] is PendingUpstream:
                        share['upstreamRejectReason'] = reason
                        share['upstreamResult'] = not reason
                        logShare(share)
blockSubmissionThread.logger = logging.getLogger('blockSubmission')
432
def checkData(share):
        """Validate the fixed header fields of a getwork/GBT share against
        the current block; raises RejectedShare on any mismatch."""
        data = share['data'][:80]
        (prevBlock, height, bits) = MM.currentBlock
        
        sharePrevBlock = data[4:36]
        if sharePrevBlock != prevBlock:
                # Distinguish "one block behind" from "wrong chain entirely"
                if sharePrevBlock == MM.lastBlock[0]:
                        raise RejectedShare('stale-prevblk')
                raise RejectedShare('bad-prevblk')
        
        if data[72:76] != bits:
                raise RejectedShare('bad-diffbits')
        
        # Note that we should accept miners reducing version to 1 if they don't understand 2 yet
        # FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
        if data[1:4] != b'\0\0\0' or data[0] > 2:
                raise RejectedShare('bad-version')
450
def buildStratumData(share, merkleroot):
        """Assemble the 80-byte block header for a stratum share, store it
        in share['data'], and return it."""
        (prevBlock, height, bits) = MM.currentBlock
        
        data = b''.join((
                b'\x02\0\0\0',            # block version 2
                prevBlock,
                merkleroot,
                share['ntime'][::-1],     # byte-reversed here
                bits,
                share['nonce'][::-1],     # byte-reversed here
        ))
        
        share['data'] = data
        return data
463
def IsJobValid(wli, wluser = None):
        """Return True when (wluser, wli) names a known work-log entry
        with a plausible issue time."""
        try:
                (wld, issueT) = workLog[wluser][wli]
        except KeyError:
                return False
        # NOTE(review): this only rejects entries issued >120s in the
        # future; it mirrors the stale-work comparison in checkShare, so
        # it is preserved as-is
        if time() < issueT - 120:
                return False
        return True
473
def checkShare(share):
        """Validate a submitted share (getwork, GBT, or stratum), submit
        it upstream when it solves a block, and update per-user stats.

        Raises RejectedShare with a short reason code on any failure.
        Mutates *share* in place: fills in time, issuetime, merkletree,
        data (stratum), target, _targethex, upstreamRejectReason, etc.
        """
        shareTime = share['time'] = time()
        
        username = share['username']
        if 'data' in share:
                # getwork/GBT
                checkData(share)
                data = share['data']
                
                if username not in workLog:
                        raise RejectedShare('unknown-user')
                MWL = workLog[username]
                
                shareMerkleRoot = data[36:68]
                if 'blkdata' in share:
                        # GBT: full block data supplied; extract the work-log
                        # identifier from the coinbase script (same layout as
                        # getBlockTemplate)
                        pl = share['blkdata']
                        (txncount, pl) = varlenDecode(pl)
                        cbtxn = bitcoin.txn.Txn(pl)
                        othertxndata = cbtxn.disassemble(retExtra=True)
                        coinbase = cbtxn.getCoinbase()
                        wliPos = coinbase[0] + 2
                        wliLen = coinbase[wliPos - 1]
                        wli = coinbase[wliPos:wliPos+wliLen]
                        mode = 'MC'
                        moden = 1
                else:
                        # getwork: the merkle root itself keys the work log
                        wli = shareMerkleRoot
                        mode = 'MRD'
                        moden = 0
                        coinbase = None
        else:
                # Stratum
                MWL = workLog[None]
                wli = share['jobid']
                # Placeholder merkle root; replaced below once the real
                # coinbase is reconstructed
                buildStratumData(share, b'\0' * 32)
                mode = 'MC'
                moden = 1
                othertxndata = b''
        
        if wli not in MWL:
                raise RejectedShare('unknown-work')
        (wld, issueT) = MWL[wli]
        share[mode] = wld
        
        share['issuetime'] = issueT
        
        (workMerkleTree, workCoinbase) = wld[1:3]
        share['merkletree'] = workMerkleTree
        if 'jobid' in share:
                # Stratum: rebuild the coinbase with the miner's extranonces
                # and recompute the real header/merkle root
                cbtxn = deepcopy(workMerkleTree.data[0])
                coinbase = workCoinbase + share['extranonce1'] + share['extranonce2']
                cbtxn.setCoinbase(coinbase)
                cbtxn.assemble()
                data = buildStratumData(share, workMerkleTree.withFirst(cbtxn))
                shareMerkleRoot = data[36:68]
        
        # Reject byte-identical resubmissions within the current block
        if data in DupeShareHACK:
                raise RejectedShare('duplicate')
        DupeShareHACK[data] = None
        
        blkhash = dblsha(data)
        # Cheapest sanity check: the top 32 bits of any valid share hash
        # must be zero
        if blkhash[28:] != b'\0\0\0\0':
                raise RejectedShare('H-not-zero')
        blkhashn = LEhash2int(blkhash)
        
        global networkTarget
        logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
        logfunc('BLKHASH: %64x' % (blkhashn,))
        logfunc(' TARGET: %64x' % (networkTarget,))
        
        # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
        txlist = workMerkleTree.data
        txlist = [deepcopy(txlist[0]),] + txlist[1:]
        cbtxn = txlist[0]
        cbtxn.setCoinbase(coinbase or workCoinbase)
        cbtxn.assemble()
        
        if blkhashn <= networkTarget:
                # This share solves a block: assemble the full payload and
                # submit it upstream (async thread) and via the p2p node
                logfunc("Submitting upstream")
                RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree, share, wld) ) )
                if not moden:
                        payload = assembleBlock(data, txlist)
                else:
                        payload = share['data']
                        if len(othertxndata):
                                payload += share['blkdata']
                        else:
                                payload += assembleBlock(data, txlist)[80:]
                logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
                RBPs.append(payload)
                threading.Thread(target=blockSubmissionThread, args=(payload, blkhash, share)).start()
                bcnode.submitBlock(payload)
                if config.DelayLogForUpstream:
                        # Defer share logging until the upstream result is in
                        share['upstreamRejectReason'] = PendingUpstream
                else:
                        share['upstreamRejectReason'] = None
                        share['upstreamResult'] = True
                MM.updateBlock(blkhash)
        
        # Gotwork hack...
        if gotwork and blkhashn <= config.GotWorkTarget:
                try:
                        # Build coinbase + merkle-branch blob for the gotwork
                        # reporting service
                        coinbaseMrkl = cbtxn.data
                        coinbaseMrkl += blkhash
                        steps = workMerkleTree._steps
                        coinbaseMrkl += pack('B', len(steps))
                        for step in steps:
                                coinbaseMrkl += step
                        coinbaseMrkl += b"\0\0\0\0"
                        info = {}
                        info['hash'] = b2a_hex(blkhash).decode('ascii')
                        info['header'] = b2a_hex(data).decode('ascii')
                        info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
                        thr = threading.Thread(target=submitGotwork, args=(info,))
                        thr.daemon = True
                        thr.start()
                except:
                        checkShare.logger.warning('Failed to build gotwork request')
        
        # Determine the target this share was issued against: explicit,
        # appended by RegisterWork (wld[6]), or the pool default
        if 'target' in share:
                workTarget = share['target']
        elif len(wld) > 6:
                workTarget = wld[6]
        else:
                workTarget = None
        
        if workTarget is None:
                workTarget = config.ShareTarget
        if blkhashn > workTarget:
                raise RejectedShare('high-hash')
        share['target'] = workTarget
        share['_targethex'] = '%064x' % (workTarget,)
        
        shareTimestamp = unpack('<L', data[68:72])[0]
        if shareTime < issueT - 120:
                raise RejectedShare('stale-work')
        if shareTimestamp < shareTime - 300:
                raise RejectedShare('time-too-old')
        if shareTimestamp > shareTime + 7200:
                raise RejectedShare('time-too-new')
        
        if config.DynamicTargetting and username in userStatus:
                # NOTE: userStatus[username] only doesn't exist across restarts
                # Credit work proportionally when the share was mined at a
                # different target than the user's current one
                status = userStatus[username]
                target = status[0] or config.ShareTarget
                if target == workTarget:
                        userStatus[username][2] += 1
                else:
                        userStatus[username][2] += float(target) / workTarget
        
        if moden:
                # Full-block modes: verify the coinbase and transaction data
                # actually match the issued work
                cbpre = workCoinbase
                cbpreLen = len(cbpre)
                if coinbase[:cbpreLen] != cbpre:
                        raise RejectedShare('bad-cb-prefix')
                
                # Filter out known "I support" flags, to prevent exploits
                for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
                        if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
                                raise RejectedShare('bad-cb-flag')
                
                if len(coinbase) > 100:
                        raise RejectedShare('bad-cb-length')
                
                if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
                        raise RejectedShare('bad-txnmrklroot')
                
                if len(othertxndata):
                        allowed = assembleBlock(data, txlist)[80:]
                        if allowed != share['blkdata']:
                                raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
646
def logShare(share):
        """Record *share* with every configured share logger, filling in
        its hex 'solution' field first."""
        if '_origdata' in share:
                solution = share['_origdata']
        else:
                solution = b2a_hex(swap32(share['data'])).decode('utf8')
        share['solution'] = solution
        for shareLogger in loggersShare:
                shareLogger.logShare(share)
654
def checkAuthentication(username, password):
        """Return True when any configured authenticator accepts the
        credentials, else False."""
        # HTTPServer uses bytes, and StratumServer uses str: normalize
        if hasattr(username, 'decode'): username = username.decode('utf8')
        if hasattr(password, 'decode'): password = password.decode('utf8')
        
        return any(
                auth.checkAuthentication(username, password)
                for auth in authenticators
        )
664
def receiveShare(share):
        """Validate an incoming share, tag it with a rejectReason on
        failure, and log it — unless logging is deferred while waiting on
        an upstream block-submission result."""
        # TODO: username => userid
        try:
                checkShare(share)
        except RejectedShare as rej:
                share['rejectReason'] = str(rej)
                raise
        except BaseException:
                share['rejectReason'] = 'ERROR'
                raise
        finally:
                if share.get('upstreamRejectReason', None) is not PendingUpstream:
                        logShare(share)
678
def newBlockNotification():
        """Handle an external new-block notification by refreshing the
        merkle tree / work template."""
        logging.getLogger('newBlockNotification').info('Received new block notification')
        MM.updateMerkleTree()
        # TODO: Force RESPOND TO LONGPOLLS?
684
def newBlockNotificationSIGNAL(signum, frame):
        """Signal handler: dispatch newBlockNotification on a daemon
        thread."""
        # Use a new thread, in case the signal handler is called with locks held
        thr = threading.Thread(
                target=newBlockNotification,
                name='newBlockNotification via signal %s' % (signum,),
        )
        thr.daemon = True
        thr.start()
690
from signal import signal, SIGUSR1
# SIGUSR1 triggers an immediate work refresh (e.g. from a blocknotify hook)
signal(SIGUSR1, newBlockNotificationSIGNAL)
693
694
695 import os
696 import os.path
697 import pickle
698 import signal
699 import sys
700 from time import sleep
701 import traceback
702
# File used to save/restore in-flight work state across restarts
if getattr(config, 'SaveStateFilename', None) is None:
        config.SaveStateFilename = 'eloipool.worklog'
705
def stopServers():
        """Ask every server (bitcoin node, JSON-RPC, stratum) to stop,
        wait briefly for them to wind down, then close their file
        descriptors.  Idempotent: repeat calls are no-ops."""
        logger = logging.getLogger('stopServers')
        
        if hasattr(stopServers, 'already'):
                logger.debug('Already tried to stop servers before')
                return
        stopServers.already = True
        
        logger.info('Stopping servers...')
        global bcnode, server
        servers = (bcnode, server, stratumsrv)
        for s in servers:
                s.keepgoing = False
        for s in servers:
                try:
                        s.wakeup()
                except:
                        logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
        # Poll for shutdown, up to 0x100 * 10ms (~2.56 s) total
        i = 0
        while True:
                sl = []
                for s in servers:
                        if s.running:
                                sl.append(s.__class__.__name__)
                if not sl:
                        break
                i += 1
                if i >= 0x100:
                        logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
                        break
                sleep(0.01)
        
        # Close any file descriptors the servers still hold
        for s in servers:
                for fd in s._fd.keys():
                        os.close(fd)
741
def stopLoggers():
        """Give every share logger that exposes a stop() method a chance
        to flush and close."""
        for shareLogger in loggersShare:
                if hasattr(shareLogger, 'stop'):
                        shareLogger.stop()
746
def saveState(SAVE_STATE_FILENAME, t = None):
	"""Persist resume data to SAVE_STATE_FILENAME.

	Pickles, in order: t (save timestamp, or None), DupeShareHACK, and
	workLog (both module globals).  Retries on any write failure; after
	0x10000 consecutive failures it logs the error, tries to remove the
	possibly-corrupt file so a later restore doesn't choke on it, and
	gives up.
	"""
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
				# BUGFIX: without this break, a persistent failure looped
				# forever, re-logging the error on every iteration.
				break
768
def exit():
	"""Cleanly stop everything, persist work state, and terminate the process."""
	shutdown_time = time()
	stopServers()
	stopLoggers()
	saveState(config.SaveStateFilename, t=shutdown_time)
	logging.getLogger('exit').info('Goodbye...')
	# SIGTERM first so any installed handlers run; sys.exit as the backstop.
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)
777
def restart():
	"""Cleanly stop everything, persist work state, and re-exec this process."""
	restart_time = time()
	stopServers()
	stopLoggers()
	saveState(config.SaveStateFilename, t=restart_time)
	logging.getLogger('restart').info('Restarting...')
	# execv replaces the current process image; on failure we just log and
	# fall through (the pool is already stopped at this point).
	try:
		os.execv(sys.argv[0], sys.argv)
	except:
		logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
788
def restoreState(SAVE_STATE_FILENAME):
	"""Restore DupeShareHACK (and, if fresh enough, workLog) from a state file.

	Understands three historical layouts written by saveState:
	  - tuple:  (workLog, DupeShareHACK[, version]), 2012-02-02..2012-02-03
	  - dict:   DupeShareHACK only, 2012-02-03
	  - current: pickle stream of t (timestamp), DupeShareHACK, workLog
	Does nothing if the file does not exist; logs and returns on any error.
	NOTE: pickle is only acceptable here because the file is produced
	locally by saveState — never point this at untrusted data.
	"""
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if type(t) == tuple:
				if len(t) > 2:
					# Future formats, not supported here
					# BUGFIX: the version element is t[2]; t[3] raised
					# IndexError on a 3-tuple and aborted the restore.
					ver = t[2]
					# TODO
				
				# Old format, from 2012-02-02 to 2012-02-03
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					# Old format, from 2012-02-03 to 2012-02-03
					DupeShareHACK = t
					t = None
				else:
					# Current format, from 2012-02-03 onward
					DupeShareHACK = pickle.load(f)
				
				# BUGFIX: t is None for the dict-only format; `None + 120`
				# raised TypeError and aborted the whole restore.
				# workLog is only restored while still fresh (under 2 minutes old).
				if t is not None and t + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))
830
831
832 from jsonrpcserver import JSONRPCListener, JSONRPCServer
833 import interactivemode
834 from networkserver import NetworkListener
835 import threading
836 import sharelogging
837 import authentication
838 from stratumserver import StratumServer
839 import imp
840
# Pool startup: translate the config module into live server objects, load
# share-logging and authentication plugins, restore saved state, and serve.
if __name__ == "__main__":
	# Share logging defaults to none when the config omits it.
	if not hasattr(config, 'ShareLogging'):
		config.ShareLogging = ()
	# Backward compatibility: fold the deprecated DbOptions variable into an
	# equivalent postgres 'sql' ShareLogging entry.
	if hasattr(config, 'DbOptions'):
		logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
		config.ShareLogging = list(config.ShareLogging)
		config.ShareLogging.append( {
			'type': 'sql',
			'engine': 'postgres',
			'dbopts': config.DbOptions,
			'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
		} )
	for i in config.ShareLogging:
		# Entries without a dict interface use the old (name, parameters)
		# tuple form; rewrite them into the current dict form.
		if not hasattr(i, 'keys'):
			name, parameters = i
			logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
			if name == 'postgres':
				name = 'sql'
				i = {
					'engine': 'postgres',
					'dbopts': parameters,
				}
			elif name == 'logfile':
				i = {}
				i['thropts'] = parameters
				# 'filename' is promoted to a top-level key; the remaining
				# parameters stay as thread options (copied before mutation).
				if 'filename' in parameters:
					i['filename'] = parameters['filename']
					i['thropts'] = dict(i['thropts'])
					del i['thropts']['filename']
			else:
				i = parameters
			i['type'] = name
		
		name = i['type']
		parameters = i
		try:
			# Plugin load: sharelogging/<name>.py must define a class <name>,
			# instantiated with the whole config dict as keyword arguments.
			fp, pathname, description = imp.find_module(name, sharelogging.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			loggersShare.append(lo)
		except:
			logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())
	
	# Authentication defaults to accepting every worker.
	if not hasattr(config, 'Authentication'):
		config.Authentication = ({'module': 'allowall'},)
	
	for i in config.Authentication:
		name = i['module']
		parameters = i
		try:
			# Same plugin pattern as share loggers, from the authentication package.
			fp, pathname, description = imp.find_module(name, authentication.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			authenticators.append(lo)
		except:
			logging.getLogger('authentication').error("Error setting up authentication module %s: %s", name, sys.exc_info())
	
	# Listening sockets for the bitcoin P2P node.
	LSbc = []
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))
	
	# Optional outgoing connection to an upstream bitcoind node.
	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
	
	# Importing these modules registers their JSON-RPC method handlers.
	import jsonrpc_getblocktemplate
	import jsonrpc_getwork
	import jsonrpc_setworkaux
	
	server = JSONRPCServer()
	server.tls = threading.local()
	server.tls.wantClear = False
	# Backward compatibility: promote the single JSONRPCAddress to the head
	# of the JSONRPCAddresses list.
	if hasattr(config, 'JSONRPCAddress'):
		logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
	LS = []
	for a in config.JSONRPCAddresses:
		LS.append(JSONRPCListener(server, a))
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	# Hand the JSON-RPC server its callbacks into the pool core.
	server.aux = MM.CoinbaseAux
	server.getBlockHeader = getBlockHeader
	server.getBlockTemplate = getBlockTemplate
	server.receiveShare = receiveShare
	server.RaiseRedFlags = RaiseRedFlags
	server.ShareTarget = config.ShareTarget
	server.checkAuthentication = checkAuthentication
	
	if hasattr(config, 'TrustedForwarders'):
		server.TrustedForwarders = config.TrustedForwarders
	server.ServerName = config.ServerName
	
	# The stratum server shares the same core callbacks.
	stratumsrv = StratumServer()
	stratumsrv.getStratumJob = getStratumJob
	stratumsrv.getExistingStratumJob = getExistingStratumJob
	stratumsrv.receiveShare = receiveShare
	stratumsrv.RaiseRedFlags = RaiseRedFlags
	stratumsrv.getTarget = getTarget
	stratumsrv.defaultTarget = config.ShareTarget
	stratumsrv.IsJobValid = IsJobValid
	stratumsrv.checkAuthentication = checkAuthentication
	if not hasattr(config, 'StratumAddresses'):
		config.StratumAddresses = ()
	for a in config.StratumAddresses:
		NetworkListener(stratumsrv, a)
	
	MM.start()
	
	# Resume DupeShareHACK/workLog from a previous run, if saved.
	restoreState(config.SaveStateFilename)
	
	# Background threads: work-log pruning, bitcoin node, and stratum; the
	# JSON-RPC server runs in the main thread.
	prune_thr = threading.Thread(target=WorkLogPruner, args=(workLog,))
	prune_thr.daemon = True
	prune_thr.start()
	
	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()
	
	stratum_thr = threading.Thread(target=stratumsrv.serve_forever)
	stratum_thr.daemon = True
	stratum_thr.start()
	
	server.serve_forever()