New config option SaveStateFilename to control what file is used for saved state
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2013  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import argparse
19 import importlib
20 argparser = argparse.ArgumentParser()
21 argparser.add_argument('-c', '--config', help='Config name to load from config_<ARG>.py')
22 args = argparser.parse_args()
23 configmod = 'config'
24 if not args.config is None:
25         configmod = 'config_%s' % (args.config,)
26 config = importlib.import_module(configmod)
27
28 if not hasattr(config, 'ServerName'):
29         config.ServerName = 'Unnamed Eloipool'
30
31 if not hasattr(config, 'ShareTarget'):
32         config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
33
34
35 import logging
36 import logging.handlers
37
38 rootlogger = logging.getLogger(None)
39 logformat = getattr(config, 'LogFormat', '%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s')
40 logformatter = logging.Formatter(logformat)
41 if len(rootlogger.handlers) == 0:
42         logging.basicConfig(
43                 format=logformat,
44                 level=logging.DEBUG,
45         )
46         for infoOnly in (
47                 'checkShare',
48                 'getTarget',
49                 'JSONRPCHandler',
50                 'JSONRPCServer',
51                 'merkleMaker',
52                 'StratumServer',
53                 'Waker for JSONRPCServer',
54                 'Waker for StratumServer',
55                 'WorkLogPruner'
56         ):
57                 logging.getLogger(infoOnly).setLevel(logging.INFO)
58 if getattr(config, 'LogToSysLog', False):
59         sysloghandler = logging.handlers.SysLogHandler(address = '/dev/log')
60         rootlogger.addHandler(sysloghandler)
61 if hasattr(config, 'LogFile'):
62         if isinstance(config.LogFile, str):
63                 filehandler = logging.FileHandler(config.LogFile)
64         else:
65                 filehandler = logging.handlers.TimedRotatingFileHandler(**config.LogFile)
66         filehandler.setFormatter(logformatter)
67         rootlogger.addHandler(filehandler)
68
69 def RaiseRedFlags(reason):
70         logging.getLogger('redflag').critical(reason)
71         return reason
72
73
74 from bitcoin.node import BitcoinLink, BitcoinNode
75 bcnode = BitcoinNode(config.UpstreamNetworkId)
76 bcnode.userAgent += b'Eloipool:0.1/'
77 bcnode.newBlock = lambda blkhash: MM.updateMerkleTree()
78
79 import jsonrpc
80
81 try:
82         import jsonrpc.authproxy
83         jsonrpc.authproxy.USER_AGENT = 'Eloipool/0.1'
84 except:
85         pass
86
87
88 from bitcoin.script import BitcoinScript
89 from bitcoin.txn import Txn
90 from base58 import b58decode
91 from struct import pack
92 import subprocess
93 from time import time
94
95 def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
96         txn = Txn.new()
97         
98         if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
99                 coinbased = 0
100                 try:
101                         cmd = config.CoinbaserCmd
102                         cmd = cmd.replace('%d', str(coinbaseValue))
103                         p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
104                         nout = int(p.stdout.readline())
105                         for i in range(nout):
106                                 amount = int(p.stdout.readline())
107                                 addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
108                                 pkScript = BitcoinScript.toAddress(addr)
109                                 txn.addOutput(amount, pkScript)
110                                 coinbased += amount
111                 except:
112                         coinbased = coinbaseValue + 1
113                 if coinbased >= coinbaseValue:
114                         logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
115                         txn.outputs = []
116                 else:
117                         coinbaseValue -= coinbased
118         
119         pkScript = BitcoinScript.toAddress(config.TrackerAddr)
120         txn.addOutput(coinbaseValue, pkScript)
121         
122         # TODO
123         # TODO: red flag on dupe coinbase
124         return txn
125
126
127 import jsonrpc_getwork
128 from util import Bits2Target
129
130 workLog = {}
131 userStatus = {}
132 networkTarget = None
133 DupeShareHACK = {}
134
135 server = None
136 stratumsrv = None
137 def updateBlocks():
138         server.wakeLongpoll()
139         stratumsrv.updateJob()
140
141 def blockChanged():
142         global MM, networkTarget, server
143         bits = MM.currentBlock[2]
144         if bits is None:
145                 networkTarget = None
146         else:
147                 networkTarget = Bits2Target(bits)
148         if MM.lastBlock != (None, None, None):
149                 global DupeShareHACK
150                 DupeShareHACK = {}
151                 jsonrpc_getwork._CheckForDupesHACK = {}
152                 workLog.clear()
153         server.wakeLongpoll(wantClear=True)
154         stratumsrv.updateJob(wantClear=True)
155
156
157 from time import sleep, time
158 import traceback
159
160 def _WorkLogPruner_I(wl):
161         now = time()
162         pruned = 0
163         for username in wl:
164                 userwork = wl[username]
165                 for wli in tuple(userwork.keys()):
166                         if now > userwork[wli][1] + 120:
167                                 del userwork[wli]
168                                 pruned += 1
169         WorkLogPruner.logger.debug('Pruned %d jobs' % (pruned,))
170
171 def WorkLogPruner(wl):
172         while True:
173                 try:
174                         sleep(60)
175                         _WorkLogPruner_I(wl)
176                 except:
177                         WorkLogPruner.logger.error(traceback.format_exc())
178 WorkLogPruner.logger = logging.getLogger('WorkLogPruner')
179
180
181 from merklemaker import merkleMaker
182 MM = merkleMaker()
183 MM.__dict__.update(config.__dict__)
184 MM.makeCoinbaseTxn = makeCoinbaseTxn
185 MM.onBlockChange = blockChanged
186 MM.onBlockUpdate = updateBlocks
187
188
189 from binascii import b2a_hex
190 from copy import deepcopy
191 from math import ceil, log
192 from merklemaker import MakeBlockHeader
193 from struct import pack, unpack
194 import threading
195 from time import time
196 from util import PendingUpstream, RejectedShare, bdiff1target, dblsha, LEhash2int, swap32, target2bdiff, target2pdiff
197 import jsonrpc
198 import traceback
199
200 gotwork = None
201 if hasattr(config, 'GotWorkURI'):
202         gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
203
204 if not hasattr(config, 'DelayLogForUpstream'):
205         config.DelayLogForUpstream = False
206
207 if not hasattr(config, 'DynamicTargetting'):
208         config.DynamicTargetting = 0
209 else:
210         if not hasattr(config, 'DynamicTargetWindow'):
211                 config.DynamicTargetWindow = 120
212         config.DynamicTargetGoal *= config.DynamicTargetWindow / 60
213
214 def submitGotwork(info):
215         try:
216                 gotwork.gotwork(info)
217         except:
218                 checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
219
220 def clampTarget(target, DTMode):
221         # ShareTarget is the minimum
222         if target is None or target > config.ShareTarget:
223                 target = config.ShareTarget
224         
225         # Never target above the network, as we'd lose blocks
226         if target < networkTarget:
227                 target = networkTarget
228         
229         if DTMode == 2:
230                 # Ceil target to a power of two :)
231                 truebits = log(target, 2)
232                 if target <= 2**int(truebits):
233                         # Workaround for bug in Python's math.log function
234                         truebits = int(truebits)
235                 target = 2**ceil(truebits) - 1
236         elif DTMode == 3:
237                 # Round target to multiple of bdiff 1
238                 target = bdiff1target / int(round(target2bdiff(target)))
239         
240         # Return None for ShareTarget to save memory
241         if target == config.ShareTarget:
242                 return None
243         return target
244
245 def getTarget(username, now, DTMode = None, RequestedTarget = None):
246         if DTMode is None:
247                 DTMode = config.DynamicTargetting
248         if not DTMode:
249                 return None
250         if username in userStatus:
251                 status = userStatus[username]
252         else:
253                 # No record, use default target
254                 RequestedTarget = clampTarget(RequestedTarget, DTMode)
255                 userStatus[username] = [RequestedTarget, now, 0]
256                 return RequestedTarget
257         (targetIn, lastUpdate, work) = status
258         if work <= config.DynamicTargetGoal:
259                 if now < lastUpdate + config.DynamicTargetWindow and (targetIn is None or targetIn >= networkTarget):
260                         # No reason to change it just yet
261                         return clampTarget(targetIn, DTMode)
262                 if not work:
263                         # No shares received, reset to minimum
264                         if targetIn:
265                                 getTarget.logger.debug("No shares from %s, resetting to minimum target" % (repr(username),))
266                                 userStatus[username] = [None, now, 0]
267                         return clampTarget(None, DTMode)
268         
269         deltaSec = now - lastUpdate
270         target = targetIn or config.ShareTarget
271         target = int(target * config.DynamicTargetGoal * deltaSec / config.DynamicTargetWindow / work)
272         target = clampTarget(target, DTMode)
273         if target != targetIn:
274                 pfx = 'Retargetting %s' % (repr(username),)
275                 tin = targetIn or config.ShareTarget
276                 getTarget.logger.debug("%s from: %064x (pdiff %s)" % (pfx, tin, target2pdiff(tin)))
277                 tgt = target or config.ShareTarget
278                 getTarget.logger.debug("%s   to: %064x (pdiff %s)" % (pfx, tgt, target2pdiff(tgt)))
279         userStatus[username] = [target, now, 0]
280         return target
281 getTarget.logger = logging.getLogger('getTarget')
282
283 def TopTargets(n = 0x10):
284         tmp = list(k for k, v in userStatus.items() if not v[0] is None)
285         tmp.sort(key=lambda k: -userStatus[k][0])
286         tmp2 = {}
287         def t2d(t):
288                 if t not in tmp2:
289                         tmp2[t] = target2pdiff(t)
290                 return tmp2[t]
291         for k in tmp[-n:]:
292                 tgt = userStatus[k][0]
293                 print('%-34s %064x %3d' % (k, tgt, t2d(tgt)))
294
295 def RegisterWork(username, wli, wld, RequestedTarget = None):
296         now = time()
297         target = getTarget(username, now, RequestedTarget=RequestedTarget)
298         wld = tuple(wld) + (target,)
299         workLog.setdefault(username, {})[wli] = (wld, now)
300         return target or config.ShareTarget
301
302 def getBlockHeader(username):
303         MRD = MM.getMRD()
304         merkleRoot = MRD[0]
305         hdr = MakeBlockHeader(MRD)
306         workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
307         target = RegisterWork(username, merkleRoot, MRD)
308         return (hdr, workLog[username][merkleRoot], target)
309
310 def getBlockTemplate(username, p_magic = None, RequestedTarget = None):
311         if server.tls.wantClear:
312                 wantClear = True
313         elif p_magic and username not in workLog:
314                 wantClear = True
315                 p_magic[0] = True
316         else:
317                 wantClear = False
318         MC = MM.getMC(wantClear)
319         (dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
320         wliPos = coinbase[0] + 2
321         wliLen = coinbase[wliPos - 1]
322         wli = coinbase[wliPos:wliPos+wliLen]
323         target = RegisterWork(username, wli, MC, RequestedTarget=RequestedTarget)
324         return (MC, workLog[username][wli], target)
325
326 def getStratumJob(jobid, wantClear = False):
327         MC = MM.getMC(wantClear)
328         (dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
329         now = time()
330         workLog.setdefault(None, {})[jobid] = (MC, now)
331         return (MC, workLog[None][jobid])
332
333 def getExistingStratumJob(jobid):
334         wld = workLog[None][jobid]
335         return (wld[0], wld)
336
337 loggersShare = []
338
339 RBDs = []
340 RBPs = []
341
342 from bitcoin.varlen import varlenEncode, varlenDecode
343 import bitcoin.txn
344 from merklemaker import assembleBlock
345
346 if not hasattr(config, 'BlockSubmissions'):
347         config.BlockSubmissions = None
348
349 RBFs = []
350 def blockSubmissionThread(payload, blkhash, share):
351         if config.BlockSubmissions is None:
352                 servers = list(a for b in MM.TemplateSources for a in b)
353         else:
354                 servers = list(config.BlockSubmissions)
355         
356         if hasattr(share['merkletree'], 'source_uri'):
357                 servers.insert(0, {
358                         'access': jsonrpc.ServiceProxy(share['merkletree'].source_uri),
359                         'name': share['merkletree'].source,
360                 })
361         elif not servers:
362                 servers = list(a for b in MM.TemplateSources for a in b)
363         
364         myblock = (blkhash, payload[4:36])
365         payload = b2a_hex(payload).decode('ascii')
366         nexterr = 0
367         tries = 0
368         success = False
369         while len(servers):
370                 tries += 1
371                 TS = servers.pop(0)
372                 UpstreamBitcoindJSONRPC = TS['access']
373                 try:
374                         # BIP 22 standard submitblock
375                         reason = UpstreamBitcoindJSONRPC.submitblock(payload)
376                 except BaseException as gbterr:
377                         gbterr_fmt = traceback.format_exc()
378                         try:
379                                 try:
380                                         # bitcoind 0.5/0.6 getmemorypool
381                                         reason = UpstreamBitcoindJSONRPC.getmemorypool(payload)
382                                 except:
383                                         # Old BIP 22 draft getmemorypool
384                                         reason = UpstreamBitcoindJSONRPC.getmemorypool(payload, {})
385                                 if reason is True:
386                                         reason = None
387                                 elif reason is False:
388                                         reason = 'rejected'
389                         except BaseException as gmperr:
390                                 now = time()
391                                 if now > nexterr:
392                                         # FIXME: This will show "Method not found" on pre-BIP22 servers
393                                         RaiseRedFlags(gbterr_fmt)
394                                         nexterr = now + 5
395                                 if MM.currentBlock[0] not in myblock and tries > len(servers):
396                                         RBFs.append( (('next block', MM.currentBlock, now, (gbterr, gmperr)), payload, blkhash, share) )
397                                         RaiseRedFlags('Giving up on submitting block to upstream \'%s\'' % (TS['name'],))
398                                         if share['upstreamRejectReason'] is PendingUpstream:
399                                                 share['upstreamRejectReason'] = 'GAVE UP'
400                                                 share['upstreamResult'] = False
401                                                 logShare(share)
402                                         return
403                                 
404                                 servers.append(TS)
405                                 continue
406                 
407                 # At this point, we have a reason back
408                 if reason:
409                         # FIXME: The returned value could be a list of multiple responses
410                         msg = 'Upstream \'%s\' block submission failed: %s' % (TS['name'], reason,)
411                         if success and reason in ('stale-prevblk', 'bad-prevblk', 'orphan', 'duplicate'):
412                                 # no big deal
413                                 blockSubmissionThread.logger.debug(msg)
414                         else:
415                                 RBFs.append( (('upstream reject', reason, time()), payload, blkhash, share) )
416                                 RaiseRedFlags(msg)
417                 else:
418                         blockSubmissionThread.logger.debug('Upstream \'%s\' accepted block' % (TS['name'],))
419                         success = True
420                 if share['upstreamRejectReason'] is PendingUpstream:
421                         share['upstreamRejectReason'] = reason
422                         share['upstreamResult'] = not reason
423                         logShare(share)
424 blockSubmissionThread.logger = logging.getLogger('blockSubmission')
425
426 def checkData(share):
427         data = share['data']
428         data = data[:80]
429         (prevBlock, height, bits) = MM.currentBlock
430         sharePrevBlock = data[4:36]
431         if sharePrevBlock != prevBlock:
432                 if sharePrevBlock == MM.lastBlock[0]:
433                         raise RejectedShare('stale-prevblk')
434                 raise RejectedShare('bad-prevblk')
435         
436         if data[72:76] != bits:
437                 raise RejectedShare('bad-diffbits')
438         
439         # Note that we should accept miners reducing version to 1 if they don't understand 2 yet
440         # FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
441         if data[1:4] != b'\0\0\0' or data[0] > 2:
442                 raise RejectedShare('bad-version')
443
444 def buildStratumData(share, merkleroot):
445         (prevBlock, height, bits) = MM.currentBlock
446         
447         data = b'\x02\0\0\0'
448         data += prevBlock
449         data += merkleroot
450         data += share['ntime'][::-1]
451         data += bits
452         data += share['nonce'][::-1]
453         
454         share['data'] = data
455         return data
456
457 def IsJobValid(wli, wluser = None):
458         if wluser not in workLog:
459                 return False
460         if wli not in workLog[wluser]:
461                 return False
462         (wld, issueT) = workLog[wluser][wli]
463         if time() < issueT - 120:
464                 return False
465         return True
466
467 def checkShare(share):
468         shareTime = share['time'] = time()
469         
470         username = share['username']
471         if 'data' in share:
472                 # getwork/GBT
473                 checkData(share)
474                 data = share['data']
475                 
476                 if username not in workLog:
477                         raise RejectedShare('unknown-user')
478                 MWL = workLog[username]
479                 
480                 shareMerkleRoot = data[36:68]
481                 if 'blkdata' in share:
482                         pl = share['blkdata']
483                         (txncount, pl) = varlenDecode(pl)
484                         cbtxn = bitcoin.txn.Txn(pl)
485                         othertxndata = cbtxn.disassemble(retExtra=True)
486                         coinbase = cbtxn.getCoinbase()
487                         wliPos = coinbase[0] + 2
488                         wliLen = coinbase[wliPos - 1]
489                         wli = coinbase[wliPos:wliPos+wliLen]
490                         mode = 'MC'
491                         moden = 1
492                 else:
493                         wli = shareMerkleRoot
494                         mode = 'MRD'
495                         moden = 0
496                         coinbase = None
497         else:
498                 # Stratum
499                 MWL = workLog[None]
500                 wli = share['jobid']
501                 buildStratumData(share, b'\0' * 32)
502                 mode = 'MC'
503                 moden = 1
504                 othertxndata = b''
505         
506         if wli not in MWL:
507                 raise RejectedShare('unknown-work')
508         (wld, issueT) = MWL[wli]
509         share[mode] = wld
510         
511         share['issuetime'] = issueT
512         
513         (workMerkleTree, workCoinbase) = wld[1:3]
514         share['merkletree'] = workMerkleTree
515         if 'jobid' in share:
516                 cbtxn = deepcopy(workMerkleTree.data[0])
517                 coinbase = workCoinbase + share['extranonce1'] + share['extranonce2']
518                 cbtxn.setCoinbase(coinbase)
519                 cbtxn.assemble()
520                 data = buildStratumData(share, workMerkleTree.withFirst(cbtxn))
521                 shareMerkleRoot = data[36:68]
522         
523         if data in DupeShareHACK:
524                 raise RejectedShare('duplicate')
525         DupeShareHACK[data] = None
526         
527         blkhash = dblsha(data)
528         if blkhash[28:] != b'\0\0\0\0':
529                 raise RejectedShare('H-not-zero')
530         blkhashn = LEhash2int(blkhash)
531         
532         global networkTarget
533         logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
534         logfunc('BLKHASH: %64x' % (blkhashn,))
535         logfunc(' TARGET: %64x' % (networkTarget,))
536         
537         # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
538         txlist = workMerkleTree.data
539         txlist = [deepcopy(txlist[0]),] + txlist[1:]
540         cbtxn = txlist[0]
541         cbtxn.setCoinbase(coinbase or workCoinbase)
542         cbtxn.assemble()
543         
544         if blkhashn <= networkTarget:
545                 logfunc("Submitting upstream")
546                 RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree, share, wld) ) )
547                 if not moden:
548                         payload = assembleBlock(data, txlist)
549                 else:
550                         payload = share['data']
551                         if len(othertxndata):
552                                 payload += share['blkdata']
553                         else:
554                                 payload += assembleBlock(data, txlist)[80:]
555                 logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
556                 RBPs.append(payload)
557                 threading.Thread(target=blockSubmissionThread, args=(payload, blkhash, share)).start()
558                 bcnode.submitBlock(payload)
559                 if config.DelayLogForUpstream:
560                         share['upstreamRejectReason'] = PendingUpstream
561                 else:
562                         share['upstreamRejectReason'] = None
563                         share['upstreamResult'] = True
564                 MM.updateBlock(blkhash)
565         
566         # Gotwork hack...
567         if gotwork and blkhashn <= config.GotWorkTarget:
568                 try:
569                         coinbaseMrkl = cbtxn.data
570                         coinbaseMrkl += blkhash
571                         steps = workMerkleTree._steps
572                         coinbaseMrkl += pack('B', len(steps))
573                         for step in steps:
574                                 coinbaseMrkl += step
575                         coinbaseMrkl += b"\0\0\0\0"
576                         info = {}
577                         info['hash'] = b2a_hex(blkhash).decode('ascii')
578                         info['header'] = b2a_hex(data).decode('ascii')
579                         info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
580                         thr = threading.Thread(target=submitGotwork, args=(info,))
581                         thr.daemon = True
582                         thr.start()
583                 except:
584                         checkShare.logger.warning('Failed to build gotwork request')
585         
586         if 'target' in share:
587                 workTarget = share['target']
588         elif len(wld) > 6:
589                 workTarget = wld[6]
590         else:
591                 workTarget = None
592         
593         if workTarget is None:
594                 workTarget = config.ShareTarget
595         if blkhashn > workTarget:
596                 raise RejectedShare('high-hash')
597         share['target'] = workTarget
598         share['_targethex'] = '%064x' % (workTarget,)
599         
600         shareTimestamp = unpack('<L', data[68:72])[0]
601         if shareTime < issueT - 120:
602                 raise RejectedShare('stale-work')
603         if shareTimestamp < shareTime - 300:
604                 raise RejectedShare('time-too-old')
605         if shareTimestamp > shareTime + 7200:
606                 raise RejectedShare('time-too-new')
607         
608         if config.DynamicTargetting and username in userStatus:
609                 # NOTE: userStatus[username] only doesn't exist across restarts
610                 status = userStatus[username]
611                 target = status[0] or config.ShareTarget
612                 if target == workTarget:
613                         userStatus[username][2] += 1
614                 else:
615                         userStatus[username][2] += float(target) / workTarget
616         
617         if moden:
618                 cbpre = workCoinbase
619                 cbpreLen = len(cbpre)
620                 if coinbase[:cbpreLen] != cbpre:
621                         raise RejectedShare('bad-cb-prefix')
622                 
623                 # Filter out known "I support" flags, to prevent exploits
624                 for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
625                         if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
626                                 raise RejectedShare('bad-cb-flag')
627                 
628                 if len(coinbase) > 100:
629                         raise RejectedShare('bad-cb-length')
630                 
631                 if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
632                         raise RejectedShare('bad-txnmrklroot')
633                 
634                 if len(othertxndata):
635                         allowed = assembleBlock(data, txlist)[80:]
636                         if allowed != share['blkdata']:
637                                 raise RejectedShare('bad-txns')
638 checkShare.logger = logging.getLogger('checkShare')
639
640 def logShare(share):
641         if '_origdata' in share:
642                 share['solution'] = share['_origdata']
643         else:
644                 share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
645         for i in loggersShare:
646                 i.logShare(share)
647
648 def receiveShare(share):
649         # TODO: username => userid
650         try:
651                 checkShare(share)
652         except RejectedShare as rej:
653                 share['rejectReason'] = str(rej)
654                 raise
655         except BaseException as e:
656                 share['rejectReason'] = 'ERROR'
657                 raise
658         finally:
659                 if not share.get('upstreamRejectReason', None) is PendingUpstream:
660                         logShare(share)
661
662 def newBlockNotification():
663         logging.getLogger('newBlockNotification').info('Received new block notification')
664         MM.updateMerkleTree()
665         # TODO: Force RESPOND TO LONGPOLLS?
666         pass
667
668 def newBlockNotificationSIGNAL(signum, frame):
669         # Use a new thread, in case the signal handler is called with locks held
670         thr = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
671         thr.daemon = True
672         thr.start()
673
674 from signal import signal, SIGUSR1
675 signal(SIGUSR1, newBlockNotificationSIGNAL)
676
677
678 import os
679 import os.path
680 import pickle
681 import signal
682 import sys
683 from time import sleep
684 import traceback
685
686 if getattr(config, 'SaveStateFilename', None) is None:
687         config.SaveStateFilename = 'eloipool.worklog'
688
689 def stopServers():
690         logger = logging.getLogger('stopServers')
691         
692         if hasattr(stopServers, 'already'):
693                 logger.debug('Already tried to stop servers before')
694                 return
695         stopServers.already = True
696         
697         logger.info('Stopping servers...')
698         global bcnode, server
699         servers = (bcnode, server, stratumsrv)
700         for s in servers:
701                 s.keepgoing = False
702         for s in servers:
703                 try:
704                         s.wakeup()
705                 except:
706                         logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
707         i = 0
708         while True:
709                 sl = []
710                 for s in servers:
711                         if s.running:
712                                 sl.append(s.__class__.__name__)
713                 if not sl:
714                         break
715                 i += 1
716                 if i >= 0x100:
717                         logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
718                         break
719                 sleep(0.01)
720         
721         for s in servers:
722                 for fd in s._fd.keys():
723                         os.close(fd)
724
725 def stopLoggers():
726         for i in loggersShare:
727                 if hasattr(i, 'stop'):
728                         i.stop()
729
730 def saveState(SAVE_STATE_FILENAME, t = None):
731         logger = logging.getLogger('saveState')
732         
733         # Then, save data needed to resume work
734         logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
735         i = 0
736         while True:
737                 try:
738                         with open(SAVE_STATE_FILENAME, 'wb') as f:
739                                 pickle.dump(t, f)
740                                 pickle.dump(DupeShareHACK, f)
741                                 pickle.dump(workLog, f)
742                         break
743                 except:
744                         i += 1
745                         if i >= 0x10000:
746                                 logger.error('Failed to save work\n' + traceback.format_exc())
747                                 try:
748                                         os.unlink(SAVE_STATE_FILENAME)
749                                 except:
750                                         logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
751
752 def exit():
753         t = time()
754         stopServers()
755         stopLoggers()
756         saveState(config.SaveStateFilename, t=t)
757         logging.getLogger('exit').info('Goodbye...')
758         os.kill(os.getpid(), signal.SIGTERM)
759         sys.exit(0)
760
761 def restart():
762         t = time()
763         stopServers()
764         stopLoggers()
765         saveState(config.SaveStateFilename, t=t)
766         logging.getLogger('restart').info('Restarting...')
767         try:
768                 os.execv(sys.argv[0], sys.argv)
769         except:
770                 logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
771
772 def restoreState(SAVE_STATE_FILENAME):
773         if not os.path.exists(SAVE_STATE_FILENAME):
774                 return
775         
776         global workLog, DupeShareHACK
777         
778         logger = logging.getLogger('restoreState')
779         s = os.stat(SAVE_STATE_FILENAME)
780         logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
781         try:
782                 with open(SAVE_STATE_FILENAME, 'rb') as f:
783                         t = pickle.load(f)
784                         if type(t) == tuple:
785                                 if len(t) > 2:
786                                         # Future formats, not supported here
787                                         ver = t[3]
788                                         # TODO
789                                 
790                                 # Old format, from 2012-02-02 to 2012-02-03
791                                 workLog = t[0]
792                                 DupeShareHACK = t[1]
793                                 t = None
794                         else:
795                                 if isinstance(t, dict):
796                                         # Old format, from 2012-02-03 to 2012-02-03
797                                         DupeShareHACK = t
798                                         t = None
799                                 else:
800                                         # Current format, from 2012-02-03 onward
801                                         DupeShareHACK = pickle.load(f)
802                                 
803                                 if t + 120 >= time():
804                                         workLog = pickle.load(f)
805                                 else:
806                                         logger.debug('Skipping restore of expired workLog')
807         except:
808                 logger.error('Failed to restore state\n' + traceback.format_exc())
809                 return
810         logger.info('State restored successfully')
811         if t:
812                 logger.info('Total downtime: %g seconds' % (time() - t,))
813
814
815 from jsonrpcserver import JSONRPCListener, JSONRPCServer
816 import interactivemode
817 from networkserver import NetworkListener
818 import threading
819 import sharelogging
820 from stratumserver import StratumServer
821 import imp
822
823 if __name__ == "__main__":
824         if not hasattr(config, 'ShareLogging'):
825                 config.ShareLogging = ()
826         if hasattr(config, 'DbOptions'):
827                 logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
828                 config.ShareLogging = list(config.ShareLogging)
829                 config.ShareLogging.append( {
830                         'type': 'sql',
831                         'engine': 'postgres',
832                         'dbopts': config.DbOptions,
833                         'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
834                 } )
835         for i in config.ShareLogging:
836                 if not hasattr(i, 'keys'):
837                         name, parameters = i
838                         logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
839                         if name == 'postgres':
840                                 name = 'sql'
841                                 i = {
842                                         'engine': 'postgres',
843                                         'dbopts': parameters,
844                                 }
845                         elif name == 'logfile':
846                                 i = {}
847                                 i['thropts'] = parameters
848                                 if 'filename' in parameters:
849                                         i['filename'] = parameters['filename']
850                                         i['thropts'] = dict(i['thropts'])
851                                         del i['thropts']['filename']
852                         else:
853                                 i = parameters
854                         i['type'] = name
855                 
856                 name = i['type']
857                 parameters = i
858                 try:
859                         fp, pathname, description = imp.find_module(name, sharelogging.__path__)
860                         m = imp.load_module(name, fp, pathname, description)
861                         lo = getattr(m, name)(**parameters)
862                         loggersShare.append(lo)
863                 except:
864                         logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())
865
866         LSbc = []
867         if not hasattr(config, 'BitcoinNodeAddresses'):
868                 config.BitcoinNodeAddresses = ()
869         for a in config.BitcoinNodeAddresses:
870                 LSbc.append(NetworkListener(bcnode, a))
871         
872         if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
873                 BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
874         
875         import jsonrpc_getblocktemplate
876         import jsonrpc_getwork
877         import jsonrpc_setworkaux
878         
879         server = JSONRPCServer()
880         server.tls = threading.local()
881         server.tls.wantClear = False
882         if hasattr(config, 'JSONRPCAddress'):
883                 logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
884                 if not hasattr(config, 'JSONRPCAddresses'):
885                         config.JSONRPCAddresses = []
886                 config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
887         LS = []
888         for a in config.JSONRPCAddresses:
889                 LS.append(JSONRPCListener(server, a))
890         if hasattr(config, 'SecretUser'):
891                 server.SecretUser = config.SecretUser
892         server.aux = MM.CoinbaseAux
893         server.getBlockHeader = getBlockHeader
894         server.getBlockTemplate = getBlockTemplate
895         server.receiveShare = receiveShare
896         server.RaiseRedFlags = RaiseRedFlags
897         server.ShareTarget = config.ShareTarget
898         
899         if hasattr(config, 'TrustedForwarders'):
900                 server.TrustedForwarders = config.TrustedForwarders
901         server.ServerName = config.ServerName
902         
903         stratumsrv = StratumServer()
904         stratumsrv.getStratumJob = getStratumJob
905         stratumsrv.getExistingStratumJob = getExistingStratumJob
906         stratumsrv.receiveShare = receiveShare
907         stratumsrv.getTarget = getTarget
908         stratumsrv.defaultTarget = config.ShareTarget
909         stratumsrv.IsJobValid = IsJobValid
910         if not hasattr(config, 'StratumAddresses'):
911                 config.StratumAddresses = ()
912         for a in config.StratumAddresses:
913                 NetworkListener(stratumsrv, a)
914         
915         MM.start()
916         
917         restoreState(config.SaveStateFilename)
918         
919         prune_thr = threading.Thread(target=WorkLogPruner, args=(workLog,))
920         prune_thr.daemon = True
921         prune_thr.start()
922         
923         bcnode_thr = threading.Thread(target=bcnode.serve_forever)
924         bcnode_thr.daemon = True
925         bcnode_thr.start()
926         
927         stratum_thr = threading.Thread(target=stratumsrv.serve_forever)
928         stratum_thr.daemon = True
929         stratum_thr.start()
930         
931         server.serve_forever()