Add command-line option --config (-c) for specifying a config to load rather than...
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2013  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import argparse
19 import importlib
# Parse the command line and load the pool configuration module.
# -c/--config NAME loads config_NAME.py instead of the default config.py.
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', '--config', help='Config name to load from config_<ARG>.py')
args = argparser.parse_args()
configmod = 'config'
if args.config is not None:
	configmod = 'config_%s' % (args.config,)
config = importlib.import_module(configmod)
27
# Fill in defaults for optional config settings the operator left unset.
config.ServerName = getattr(config, 'ServerName', 'Unnamed Eloipool')
config.ShareTarget = getattr(config, 'ShareTarget', 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff)
33
34
35 import logging
36 import logging.handlers
37
# Configure process-wide logging from the config module.
rootlogger = logging.getLogger(None)
logformat = getattr(config, 'LogFormat', '%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s')
logformatter = logging.Formatter(logformat)
# Only install the default stderr handler when nothing has configured logging yet.
if len(rootlogger.handlers) == 0:
	logging.basicConfig(
		format=logformat,
		level=logging.DEBUG,
	)
	# These loggers are too chatty at DEBUG; cap them at INFO by default.
	for infoOnly in (
		'checkShare',
		'getTarget',
		'JSONRPCHandler',
		'JSONRPCServer',
		'merkleMaker',
		'StratumServer',
		'Waker for JSONRPCServer',
		'Waker for StratumServer',
		'WorkLogPruner'
	):
		logging.getLogger(infoOnly).setLevel(logging.INFO)
if getattr(config, 'LogToSysLog', False):
	sysloghandler = logging.handlers.SysLogHandler(address = '/dev/log')
	rootlogger.addHandler(sysloghandler)
if hasattr(config, 'LogFile'):
	# LogFile is either a plain filename or a dict of TimedRotatingFileHandler kwargs.
	if isinstance(config.LogFile, str):
		filehandler = logging.FileHandler(config.LogFile)
	else:
		filehandler = logging.handlers.TimedRotatingFileHandler(**config.LogFile)
	filehandler.setFormatter(logformatter)
	rootlogger.addHandler(filehandler)
68
def RaiseRedFlags(reason):
	"""Log *reason* at CRITICAL on the 'redflag' logger and return it unchanged."""
	redflag = logging.getLogger('redflag')
	redflag.critical(reason)
	return reason
72
73
from bitcoin.node import BitcoinLink, BitcoinNode
# P2P link to the Bitcoin network: used to announce solved blocks directly
# (bcnode.submitBlock in checkShare) and to hear about new blocks.
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'
# Refresh the merkle tree when the network announces a new block.
# NOTE(review): MM is assigned later during module init; assumed the node
# does not fire this callback before then — confirm startup ordering.
bcnode.newBlock = lambda blkhash: MM.updateMerkleTree()
78
79 import jsonrpc
80
# Identify ourselves to upstream bitcoind JSON-RPC servers when the jsonrpc
# implementation in use provides an authproxy module.
try:
	import jsonrpc.authproxy
	jsonrpc.authproxy.USER_AGENT = 'Eloipool/0.1'
except ImportError:
	# Alternative jsonrpc implementations may lack authproxy; best-effort only.
	pass
86
87
88 from bitcoin.script import BitcoinScript
89 from bitcoin.txn import Txn
90 from base58 import b58decode
91 from struct import pack
92 import subprocess
93 from time import time
94
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
	"""Build the coinbase transaction paying out *coinbaseValue* base units.

	If config.CoinbaserCmd is set (and useCoinbaser is true), it is run via
	the shell with '%d' replaced by the reward; its stdout must be a line
	with an output count, then amount/address line pairs.  Whatever it does
	not claim goes to config.TrackerAddr.  If the coinbaser fails or claims
	the full reward (or more), its outputs are discarded entirely.
	"""
	txn = Txn.new()
	
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		try:
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			try:
				nout = int(p.stdout.readline())
				for i in range(nout):
					amount = int(p.stdout.readline())
					addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
					pkScript = BitcoinScript.toAddress(addr)
					txn.addOutput(amount, pkScript)
					coinbased += amount
			finally:
				# Don't leak the pipe fd even when parsing blows up.
				p.stdout.close()
		except Exception:
			# Record why the coinbaser failed, then force the error path
			# below rather than mining with a partial output set.
			logging.getLogger('makeCoinbaseTxn').debug(traceback.format_exc())
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased
	
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)
	
	# TODO: red flag on dupe coinbase
	return txn
125
126
127 import jsonrpc_getwork
128 from util import Bits2Target
129
# Shared in-memory pool state:
workLog = {}  # username -> {work-id -> (work-data, issue-time)}; see RegisterWork
userStatus = {}  # username -> [target, last-retarget-time, work tally]; see getTarget
networkTarget = None  # int target of the current network block; set in blockChanged
DupeShareHACK = {}  # dict-used-as-set of share data already seen; reset on block change
134
# The JSON-RPC (getwork/GBT) and stratum server instances; assigned to the
# live servers outside this section before mining traffic arrives.
server = None
stratumsrv = None
def updateBlocks():
	# Push refreshed work out to every connected miner.
	server.wakeLongpoll()
	stratumsrv.updateJob()
140
def blockChanged():
	"""React to a best-block change: retarget and flush all stale work."""
	global MM, networkTarget, server, DupeShareHACK
	bits = MM.currentBlock[2]
	networkTarget = None if bits is None else Bits2Target(bits)
	# Unless this is the very first block seen, all outstanding work is now
	# stale: drop it along with the duplicate-share caches.
	if MM.lastBlock != (None, None, None):
		DupeShareHACK = {}
		jsonrpc_getwork._CheckForDupesHACK = {}
		workLog.clear()
	server.wakeLongpoll(wantClear=True)
	stratumsrv.updateJob(wantClear=True)
155
156
157 from time import sleep, time
158 import traceback
159
def _WorkLogPruner_I(wl):
	"""One pruning pass: drop work-log entries issued more than 120s ago."""
	cutoff = time() - 120
	pruned = 0
	for userwork in wl.values():
		stale = [wli for wli in tuple(userwork.keys()) if userwork[wli][1] < cutoff]
		for wli in stale:
			del userwork[wli]
		pruned += len(stale)
	WorkLogPruner.logger.debug('Pruned %d jobs' % (pruned,))
170
def WorkLogPruner(wl):
	"""Pruner thread main loop: expire old entries of *wl* every 60 seconds.

	Runs forever; errors are logged so they cannot kill the thread.
	"""
	while True:
		try:
			sleep(60)
			_WorkLogPruner_I(wl)
		except Exception:
			# Narrowed from a bare except so SystemExit/KeyboardInterrupt
			# can still unwind the thread.
			WorkLogPruner.logger.error(traceback.format_exc())
WorkLogPruner.logger = logging.getLogger('WorkLogPruner')
179
180
181 from merklemaker import merkleMaker
MM = merkleMaker()
# merkleMaker picks up its settings as instance attributes; mirror every
# config attribute onto it wholesale, then wire up the pool callbacks.
MM.__dict__.update(config.__dict__)
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
187
188
189 from binascii import b2a_hex
190 from copy import deepcopy
191 from math import ceil, log
192 from merklemaker import MakeBlockHeader
193 from struct import pack, unpack
194 import threading
195 from time import time
196 from util import PendingUpstream, RejectedShare, bdiff1target, dblsha, LEhash2int, swap32, target2bdiff, target2pdiff
197 import jsonrpc
198 import traceback
199
gotwork = None
if hasattr(config, 'GotWorkURI'):
	# Optional JSON-RPC endpoint that receives near-block proofs
	# (see the "gotwork hack" in checkShare).
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

if not hasattr(config, 'DelayLogForUpstream'):
	config.DelayLogForUpstream = False

if not hasattr(config, 'DynamicTargetting'):
	config.DynamicTargetting = 0
else:
	if not hasattr(config, 'DynamicTargetWindow'):
		config.DynamicTargetWindow = 120
	# Scale the configured goal (presumably shares per minute — confirm)
	# up to the full DynamicTargetWindow.
	config.DynamicTargetGoal *= config.DynamicTargetWindow / 60
213
def submitGotwork(info):
	"""Best-effort submission of *info* to the configured gotwork service."""
	try:
		gotwork.gotwork(info)
	except Exception:
		# Gotwork is advisory only; log the failure and carry on.
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
219
def clampTarget(target, DTMode):
	"""Clamp a requested share target according to dynamic-target mode *DTMode*.

	Bounds: never easier than config.ShareTarget, never harder than the
	network target.  DTMode 2 rounds up to a power of two; DTMode 3 rounds
	to a multiple of bdiff 1.  Returns None when the result equals
	config.ShareTarget (to save memory in userStatus).
	"""
	# ShareTarget is the minimum
	if target is None or target > config.ShareTarget:
		target = config.ShareTarget
	
	# Never target above the network, as we'd lose blocks
	# NOTE(review): assumes networkTarget is set (non-None) once shares flow.
	if target < networkTarget:
		target = networkTarget
	
	if DTMode == 2:
		# Ceil target to a power of two :)
		truebits = log(target, 2)
		if target <= 2**int(truebits):
			# Workaround for bug in Python's math.log function
			truebits = int(truebits)
		target = 2**ceil(truebits) - 1
	elif DTMode == 3:
		# Round target to multiple of bdiff 1.
		# Integer division: true division would yield a float target, which
		# breaks the '%064x' formatting of targets later on.
		target = bdiff1target // int(round(target2bdiff(target)))
	
	# Return None for ShareTarget to save memory
	if target == config.ShareTarget:
		return None
	return target
244
def getTarget(username, now, DTMode = None, RequestedTarget = None):
	"""Return the dynamic share target for *username*, or None for the default.

	Tracks per-user share tallies in userStatus and rescales the target so
	each user produces roughly DynamicTargetGoal work per
	DynamicTargetWindow seconds.  None means "use config.ShareTarget".
	"""
	if DTMode is None:
		DTMode = config.DynamicTargetting
	if not DTMode:
		return None
	if username in userStatus:
		status = userStatus[username]
	else:
		# No record, use default target
		RequestedTarget = clampTarget(RequestedTarget, DTMode)
		userStatus[username] = [RequestedTarget, now, 0]
		return RequestedTarget
	(targetIn, lastUpdate, work) = status
	if work <= config.DynamicTargetGoal:
		if now < lastUpdate + config.DynamicTargetWindow and (targetIn is None or targetIn >= networkTarget):
			# No reason to change it just yet
			return clampTarget(targetIn, DTMode)
		if not work:
			# No shares received, reset to minimum
			if targetIn:
				getTarget.logger.debug("No shares from %s, resetting to minimum target" % (repr(username),))
				userStatus[username] = [None, now, 0]
			return clampTarget(None, DTMode)
	
	# Rescale the target in proportion to the observed work rate vs. the goal.
	deltaSec = now - lastUpdate
	target = targetIn or config.ShareTarget
	target = int(target * config.DynamicTargetGoal * deltaSec / config.DynamicTargetWindow / work)
	target = clampTarget(target, DTMode)
	if target != targetIn:
		pfx = 'Retargetting %s' % (repr(username),)
		tin = targetIn or config.ShareTarget
		getTarget.logger.debug("%s from: %064x (pdiff %s)" % (pfx, tin, target2pdiff(tin)))
		tgt = target or config.ShareTarget
		getTarget.logger.debug("%s   to: %064x (pdiff %s)" % (pfx, tgt, target2pdiff(tgt)))
	# Record the new target and restart the tally for the next window.
	userStatus[username] = [target, now, 0]
	return target
getTarget.logger = logging.getLogger('getTarget')
282
def TopTargets(n = 0x10):
	"""Console helper: print the *n* users whose targets are lowest (hardest)."""
	users = [k for k, v in userStatus.items() if v[0] is not None]
	users.sort(key=lambda k: userStatus[k][0], reverse=True)
	pdiff_cache = {}
	def t2d(t):
		# Memoize target->pdiff conversions for repeated targets.
		if t not in pdiff_cache:
			pdiff_cache[t] = target2pdiff(t)
		return pdiff_cache[t]
	for k in users[-n:]:
		tgt = userStatus[k][0]
		print('%-34s %064x %3d' % (k, tgt, t2d(tgt)))
294
def RegisterWork(username, wli, wld, RequestedTarget = None):
	"""Record issued work for *username* under id *wli*; return its share target."""
	now = time()
	target = getTarget(username, now, RequestedTarget=RequestedTarget)
	entry = tuple(wld) + (target,)
	workLog.setdefault(username, {})[wli] = (entry, now)
	return target or config.ShareTarget
301
def getBlockHeader(username):
	"""Issue getwork-style work; returns (header bytes, worklog entry, target).

	RegisterWork records the worklog entry keyed by merkle root.  (An older
	direct workLog store here was immediately overwritten by RegisterWork
	and has been removed as dead code.)
	"""
	MRD = MM.getMRD()
	merkleRoot = MRD[0]
	hdr = MakeBlockHeader(MRD)
	target = RegisterWork(username, merkleRoot, MRD)
	return (hdr, workLog[username][merkleRoot], target)
309
def getBlockTemplate(username, p_magic = None, RequestedTarget = None):
	"""Issue GBT work; returns (MC, worklog entry, target).

	p_magic, when given, is a one-element list; p_magic[0] is set True when
	this turns into a fresh (cleared) template because the user has no
	outstanding work.
	"""
	if server.tls.wantClear:
		wantClear = True
	elif p_magic and username not in workLog:
		wantClear = True
		p_magic[0] = True
	else:
		wantClear = False
	MC = MM.getMC(wantClear)
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
	# The work identifier is embedded in the coinbase scriptsig; extract it
	# so submitted shares can be matched back to this template.
	wliPos = coinbase[0] + 2
	wliLen = coinbase[wliPos - 1]
	wli = coinbase[wliPos:wliPos+wliLen]
	target = RegisterWork(username, wli, MC, RequestedTarget=RequestedTarget)
	return (MC, workLog[username][wli], target)
325
def getStratumJob(jobid, wantClear = False):
	"""Create and log a new stratum job; returns (MC, worklog entry).

	Stratum work is shared by all miners, so it is logged under the
	username None.  (A previous unused unpack of MC[:5] was removed.)
	"""
	MC = MM.getMC(wantClear)
	entry = (MC, time())
	workLog.setdefault(None, {})[jobid] = entry
	return (MC, entry)
332
def getExistingStratumJob(jobid):
	"""Look up an already-issued stratum job; returns (MC, worklog entry)."""
	entry = workLog[None][jobid]
	return (entry[0], entry)
336
# Share loggers (objects with a logShare method) that receive every share.
loggersShare = []

# Debug retention of real-block data and payloads we have tried to submit.
RBDs = []
RBPs = []
341
342 from bitcoin.varlen import varlenEncode, varlenDecode
343 import bitcoin.txn
344 from merklemaker import assembleBlock
345
if not hasattr(config, 'BlockSubmissions'):
	# None means "submit to all template sources" (see blockSubmissionThread).
	config.BlockSubmissions = None

RBFs = []  # debug record of failed/aborted block submissions
def blockSubmissionThread(payload, blkhash, share):
	"""Submit a solved block upstream, failing over across servers.

	payload: the fully serialized block; blkhash: its hash; share: the
	share dict that produced it (updated with the upstream result when
	config.DelayLogForUpstream is in effect).
	"""
	if config.BlockSubmissions is None:
		servers = list(a for b in MM.TemplateSources for a in b)
	else:
		servers = list(config.BlockSubmissions)
	
	# Prefer submitting back to the server that provided this work's template.
	if hasattr(share['merkletree'], 'source_uri'):
		servers.insert(0, {
			'access': jsonrpc.ServiceProxy(share['merkletree'].source_uri),
			'name': share['merkletree'].source,
		})
	elif not servers:
		servers = list(a for b in MM.TemplateSources for a in b)
	
	myblock = (blkhash, payload[4:36])
	payload = b2a_hex(payload).decode('ascii')
	nexterr = 0
	tries = 0
	success = False
	while len(servers):
		tries += 1
		TS = servers.pop(0)
		UpstreamBitcoindJSONRPC = TS['access']
		try:
			# BIP 22 standard submitblock
			reason = UpstreamBitcoindJSONRPC.submitblock(payload)
		except BaseException as gbterr:
			gbterr_fmt = traceback.format_exc()
			try:
				try:
					# bitcoind 0.5/0.6 getmemorypool
					reason = UpstreamBitcoindJSONRPC.getmemorypool(payload)
				except:
					# Old BIP 22 draft getmemorypool
					reason = UpstreamBitcoindJSONRPC.getmemorypool(payload, {})
				if reason is True:
					reason = None
				elif reason is False:
					reason = 'rejected'
			except BaseException as gmperr:
				now = time()
				# Rate-limit the red-flag spam to one every 5 seconds.
				if now > nexterr:
					# FIXME: This will show "Method not found" on pre-BIP22 servers
					RaiseRedFlags(gbterr_fmt)
					nexterr = now + 5
				# Give up entirely only once the chain tip has moved past our
				# block and every server has been attempted.
				if MM.currentBlock[0] not in myblock and tries > len(servers):
					RBFs.append( (('next block', MM.currentBlock, now, (gbterr, gmperr)), payload, blkhash, share) )
					RaiseRedFlags('Giving up on submitting block to upstream \'%s\'' % (TS['name'],))
					if share['upstreamRejectReason'] is PendingUpstream:
						share['upstreamRejectReason'] = 'GAVE UP'
						share['upstreamResult'] = False
						logShare(share)
					return
				
				# Requeue this server at the back and try the next one.
				servers.append(TS)
				continue
		
		# At this point, we have a reason back
		if reason:
			# FIXME: The returned value could be a list of multiple responses
			msg = 'Upstream \'%s\' block submission failed: %s' % (TS['name'], reason,)
			if success and reason in ('stale-prevblk', 'bad-prevblk', 'orphan', 'duplicate'):
				# no big deal — some other upstream already accepted the block
				blockSubmissionThread.logger.debug(msg)
			else:
				RBFs.append( (('upstream reject', reason, time()), payload, blkhash, share) )
				RaiseRedFlags(msg)
		else:
			blockSubmissionThread.logger.debug('Upstream \'%s\' accepted block' % (TS['name'],))
			success = True
		if share['upstreamRejectReason'] is PendingUpstream:
			share['upstreamRejectReason'] = reason
			share['upstreamResult'] = not reason
			logShare(share)
blockSubmissionThread.logger = logging.getLogger('blockSubmission')
425
def checkData(share):
	"""Sanity-check the 80-byte header of a getwork/GBT share.

	Raises RejectedShare for a wrong previous block, difficulty bits, or
	block version.
	"""
	hdr = share['data'][:80]
	(prevBlock, height, bits) = MM.currentBlock
	
	sharePrevBlock = hdr[4:36]
	if sharePrevBlock != prevBlock:
		# Work built on the immediately previous block is merely stale;
		# anything else is for an unknown chain tip.
		reason = 'stale-prevblk' if sharePrevBlock == MM.lastBlock[0] else 'bad-prevblk'
		raise RejectedShare(reason)
	
	if hdr[72:76] != bits:
		raise RejectedShare('bad-diffbits')
	
	# Note that we should accept miners reducing version to 1 if they don't understand 2 yet
	# FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
	if hdr[1:4] != b'\0\0\0' or hdr[0] > 2:
		raise RejectedShare('bad-version')
443
def buildStratumData(share, merkleroot):
	"""Assemble the 80-byte block header for a stratum share.

	Stores the header in share['data'] and returns it.
	"""
	(prevBlock, height, bits) = MM.currentBlock
	
	data = b''.join((
		b'\x02\0\0\0',          # block version 2
		prevBlock,
		merkleroot,
		share['ntime'][::-1],   # byte-reversed
		bits,
		share['nonce'][::-1],   # byte-reversed
	))
	
	share['data'] = data
	return data
456
def IsJobValid(wli, wluser = None):
	"""Return True when work id *wli* is on record for *wluser* and passes the issue-time check."""
	try:
		(wld, issueT) = workLog[wluser][wli]
	except KeyError:
		return False
	return time() >= issueT - 120
466
def checkShare(share):
	"""Fully validate a submitted share (getwork, GBT, or stratum) in place.

	Raises RejectedShare with a short reason string on any failure.  When
	the hash meets the network target, kicks off upstream block submission.
	Mutates *share* with bookkeeping fields ('time', 'data', 'target',
	'issuetime', 'merkletree', upstream result fields, ...).
	"""
	shareTime = share['time'] = time()
	
	username = share['username']
	if 'data' in share:
		# getwork/GBT
		checkData(share)
		data = share['data']
		
		if username not in workLog:
			raise RejectedShare('unknown-user')
		MWL = workLog[username]
		
		shareMerkleRoot = data[36:68]
		if 'blkdata' in share:
			# GBT: full transaction data supplied; dig the work identifier
			# out of the coinbase scriptsig.
			pl = share['blkdata']
			(txncount, pl) = varlenDecode(pl)
			cbtxn = bitcoin.txn.Txn(pl)
			othertxndata = cbtxn.disassemble(retExtra=True)
			coinbase = cbtxn.getCoinbase()
			wliPos = coinbase[0] + 2
			wliLen = coinbase[wliPos - 1]
			wli = coinbase[wliPos:wliPos+wliLen]
			mode = 'MC'
			moden = 1
		else:
			wli = shareMerkleRoot
			mode = 'MRD'
			moden = 0
			coinbase = None
	else:
		# Stratum
		MWL = workLog[None]
		wli = share['jobid']
		# Placeholder merkle root; the real one is rebuilt below.
		buildStratumData(share, b'\0' * 32)
		mode = 'MC'
		moden = 1
		othertxndata = b''
	
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld
	
	share['issuetime'] = issueT
	
	(workMerkleTree, workCoinbase) = wld[1:3]
	share['merkletree'] = workMerkleTree
	if 'jobid' in share:
		# Stratum: rebuild the actual coinbase (and thus the merkle root)
		# from the miner-chosen extranonce values.
		cbtxn = deepcopy(workMerkleTree.data[0])
		coinbase = workCoinbase + share['extranonce1'] + share['extranonce2']
		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		data = buildStratumData(share, workMerkleTree.withFirst(cbtxn))
		shareMerkleRoot = data[36:68]
	
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
	blkhash = dblsha(data)
	# Even the easiest accepted share must zero the last four hash bytes.
	if blkhash[28:] != b'\0\0\0\0':
		raise RejectedShare('H-not-zero')
	blkhashn = LEhash2int(blkhash)
	
	global networkTarget
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
	
	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	txlist = workMerkleTree.data
	txlist = [deepcopy(txlist[0]),] + txlist[1:]
	cbtxn = txlist[0]
	cbtxn.setCoinbase(coinbase or workCoinbase)
	cbtxn.assemble()
	
	if blkhashn <= networkTarget:
		logfunc("Submitting upstream")
		RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree, share, wld) ) )
		if not moden:
			payload = assembleBlock(data, txlist)
		else:
			payload = share['data']
			if len(othertxndata):
				payload += share['blkdata']
			else:
				payload += assembleBlock(data, txlist)[80:]
		logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
		RBPs.append(payload)
		# Submit via JSON-RPC (threaded) and directly to the P2P network.
		threading.Thread(target=blockSubmissionThread, args=(payload, blkhash, share)).start()
		bcnode.submitBlock(payload)
		if config.DelayLogForUpstream:
			share['upstreamRejectReason'] = PendingUpstream
		else:
			share['upstreamRejectReason'] = None
			share['upstreamResult'] = True
		MM.updateBlock(blkhash)
	
	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')
	
	# The share target: explicit on the share, recorded with the work, or default.
	if 'target' in share:
		workTarget = share['target']
	elif len(wld) > 6:
		workTarget = wld[6]
	else:
		workTarget = None
	
	if workTarget is None:
		workTarget = config.ShareTarget
	if blkhashn > workTarget:
		raise RejectedShare('high-hash')
	share['target'] = workTarget
	share['_targethex'] = '%064x' % (workTarget,)
	
	shareTimestamp = unpack('<L', data[68:72])[0]
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
	if config.DynamicTargetting and username in userStatus:
		# NOTE: userStatus[username] only doesn't exist across restarts
		status = userStatus[username]
		target = status[0] or config.ShareTarget
		if target == workTarget:
			userStatus[username][2] += 1
		else:
			# Credit work at a different target proportionally.
			userStatus[username][2] += float(target) / workTarget
	
	if moden:
		cbpre = workCoinbase
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')
		
		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
				raise RejectedShare('bad-cb-flag')
		
		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')
		
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')
		
		if len(othertxndata):
			allowed = assembleBlock(data, txlist)[80:]
			if allowed != share['blkdata']:
				raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
639
def logShare(share):
	"""Fill in share['solution'] and hand the share to every registered logger."""
	if '_origdata' in share:
		solution = share['_origdata']
	else:
		solution = b2a_hex(swap32(share['data'])).decode('utf8')
	share['solution'] = solution
	for sharelogger in loggersShare:
		sharelogger.logShare(share)
647
def receiveShare(share):
	"""Validate and log an incoming share.

	RejectedShare (and any unexpected error) is recorded on the share as
	'rejectReason' and re-raised.  Logging is skipped only while an
	upstream block submission is still pending; blockSubmissionThread logs
	the share once the result arrives.
	"""
	# TODO: username => userid
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	except BaseException:
		share['rejectReason'] = 'ERROR'
		raise
	finally:
		if share.get('upstreamRejectReason', None) is not PendingUpstream:
			logShare(share)
661
def newBlockNotification():
	"""Handle an external new-block notification by refreshing the merkle tree."""
	logging.getLogger('newBlockNotification').info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
667
def newBlockNotificationSIGNAL(signum, frame):
	"""Signal handler: run newBlockNotification on its own daemon thread.

	A separate thread is used in case the signal handler is called with
	locks held.
	"""
	thr = threading.Thread(
		target=newBlockNotification,
		name='newBlockNotification via signal %s' % (signum,),
	)
	thr.daemon = True
	thr.start()
673
from signal import signal, SIGUSR1
# SIGUSR1 (e.g. from an external block notifier) forces a merkle-tree refresh.
signal(SIGUSR1, newBlockNotificationSIGNAL)
676
677
678 import os
679 import os.path
680 import pickle
681 import signal
682 import sys
683 from time import sleep
684 import traceback
685
# Worklog + dupe-cache state is pickled here on exit/restart and reloaded on start.
SAVE_STATE_FILENAME = 'eloipool.worklog'
687
def stopServers():
	"""Stop the bitcoin node and both mining servers, then close their fds.

	Idempotent: repeated calls only log.  Server threads get up to
	0x100 * 10ms (~2.56s) to wind down before we give up waiting.
	"""
	logger = logging.getLogger('stopServers')
	
	if hasattr(stopServers, 'already'):
		logger.debug('Already tried to stop servers before')
		return
	stopServers.already = True
	
	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server, stratumsrv)
	# Ask every server loop to exit, then wake each one so it notices.
	for s in servers:
		s.keepgoing = False
	for s in servers:
		try:
			s.wakeup()
		except:
			logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
	# Poll until every server loop reports stopped, or the timeout hits.
	i = 0
	while True:
		sl = []
		for s in servers:
			if s.running:
				sl.append(s.__class__.__name__)
		if not sl:
			break
		i += 1
		if i >= 0x100:
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
			break
		sleep(0.01)
	
	for s in servers:
		for fd in s._fd.keys():
			os.close(fd)
723
def stopLoggers():
	"""Give each share logger that supports stop() a chance to shut down."""
	for sharelogger in loggersShare:
		if hasattr(sharelogger, 'stop'):
			sharelogger.stop()
728
def saveState(t = None):
	"""Pickle resume state (timestamp, dupe cache, worklog) to SAVE_STATE_FILENAME.

	Retries on failure; after 0x10000 consecutive failures it gives up,
	removes any partial file, and returns.
	"""
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except Exception:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except Exception:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
				# Bug fix: without this break the loop kept retrying (and
				# re-logging) forever after giving up.
				break
750
def exit():
        """Perform a clean shutdown: stop servers and loggers, save state,
        then terminate the process (SIGTERM to self, followed by sys.exit)."""
        shutdown_time = time()
        stopServers()
        stopLoggers()
        saveState(shutdown_time)
        exitlogger = logging.getLogger('exit')
        exitlogger.info('Goodbye...')
        # Deliver SIGTERM to our own process, then exit the interpreter.
        os.kill(os.getpid(), signal.SIGTERM)
        sys.exit(0)
759
def restart():
        """Save state and replace the current process with a fresh copy of
        itself via os.execv (state is restored on startup by restoreState)."""
        restart_time = time()
        stopServers()
        stopLoggers()
        saveState(restart_time)
        restartlogger = logging.getLogger('restart')
        restartlogger.info('Restarting...')
        try:
                # Re-exec with the same argv; on success this never returns.
                os.execv(sys.argv[0], sys.argv)
        except:
                restartlogger.error('Failed to exec\n' + traceback.format_exc())
770
def restoreState():
        """Restore pool state previously written by saveState, if present.

        Recognizes three on-disk formats (the first pickle distinguishes them):
        - tuple: oldest format (2012-02-02..03), (workLog, DupeShareHACK, ...)
        - dict: old format (2012-02-03), just DupeShareHACK
        - anything else: current format, the shutdown timestamp t, followed by
          pickles of DupeShareHACK and workLog.
        In the current format, workLog is only restored if saved under 120
        seconds ago; otherwise the stale work is discarded.
        """
        # No state file means a fresh start; nothing to do.
        if not os.path.exists(SAVE_STATE_FILENAME):
                return
        
        global workLog, DupeShareHACK
        
        logger = logging.getLogger('restoreState')
        s = os.stat(SAVE_STATE_FILENAME)
        logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
        try:
                with open(SAVE_STATE_FILENAME, 'rb') as f:
                        # First pickle: either a legacy container or the shutdown time.
                        t = pickle.load(f)
                        if type(t) == tuple:
                                if len(t) > 2:
                                        # Future formats, not supported here
                                        # NOTE(review): t[3] raises IndexError for a 3-tuple;
                                        # this future-format branch is an unfinished stub.
                                        ver = t[3]
                                        # TODO
                                
                                # Old format, from 2012-02-02 to 2012-02-03
                                workLog = t[0]
                                DupeShareHACK = t[1]
                                t = None
                        else:
                                if isinstance(t, dict):
                                        # Old format, from 2012-02-03 to 2012-02-03
                                        DupeShareHACK = t
                                        t = None
                                else:
                                        # Current format, from 2012-02-03 onward
                                        DupeShareHACK = pickle.load(f)
                                
                                # Only resume the work log if the save is recent (<120s old).
                                if t + 120 >= time():
                                        workLog = pickle.load(f)
                                else:
                                        logger.debug('Skipping restore of expired workLog')
        except:
                logger.error('Failed to restore state\n' + traceback.format_exc())
                return
        logger.info('State restored successfully')
        # t is only non-None for the current format; report downtime from it.
        if t:
                logger.info('Total downtime: %g seconds' % (time() - t,))
812
813
814 from jsonrpcserver import JSONRPCListener, JSONRPCServer
815 import interactivemode
816 from networkserver import NetworkListener
817 import threading
818 import sharelogging
819 from stratumserver import StratumServer
820 import imp
821
if __name__ == "__main__":
        # --- Share logging setup (with backward compatibility shims) ---
        if not hasattr(config, 'ShareLogging'):
                config.ShareLogging = ()
        # Deprecated DbOptions var: translate it into an equivalent 'sql'
        # ShareLogging entry targeting postgres.
        if hasattr(config, 'DbOptions'):
                logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
                config.ShareLogging = list(config.ShareLogging)
                config.ShareLogging.append( {
                        'type': 'sql',
                        'engine': 'postgres',
                        'dbopts': config.DbOptions,
                        'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
                } )
        for i in config.ShareLogging:
                # Entries without dict-like 'keys' are the legacy (name, parameters)
                # tuple form; rewrite them into the current dict form.
                if not hasattr(i, 'keys'):
                        name, parameters = i
                        logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
                        if name == 'postgres':
                                name = 'sql'
                                i = {
                                        'engine': 'postgres',
                                        'dbopts': parameters,
                                }
                        elif name == 'logfile':
                                i = {}
                                i['thropts'] = parameters
                                # 'filename' moves to the top level; copy thropts before
                                # deleting so the caller's dict isn't mutated.
                                if 'filename' in parameters:
                                        i['filename'] = parameters['filename']
                                        i['thropts'] = dict(i['thropts'])
                                        del i['thropts']['filename']
                        else:
                                i = parameters
                        i['type'] = name
                
                name = i['type']
                parameters = i
                try:
                        # Dynamically load sharelogging/<name>.py and instantiate its
                        # class of the same name with the config parameters.
                        fp, pathname, description = imp.find_module(name, sharelogging.__path__)
                        m = imp.load_module(name, fp, pathname, description)
                        lo = getattr(m, name)(**parameters)
                        loggersShare.append(lo)
                except:
                        logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())

        # --- Bitcoin node listeners / upstream link ---
        LSbc = []
        if not hasattr(config, 'BitcoinNodeAddresses'):
                config.BitcoinNodeAddresses = ()
        for a in config.BitcoinNodeAddresses:
                LSbc.append(NetworkListener(bcnode, a))
        
        if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
                BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
        
        # Imported for their side effects: each registers its JSON-RPC handlers.
        import jsonrpc_getblocktemplate
        import jsonrpc_getwork
        import jsonrpc_setworkaux
        
        # --- JSON-RPC (getwork/GBT) server wiring ---
        server = JSONRPCServer()
        server.tls = threading.local()
        server.tls.wantClear = False
        # Deprecated single JSONRPCAddress: prepend it to the addresses list.
        if hasattr(config, 'JSONRPCAddress'):
                logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
                if not hasattr(config, 'JSONRPCAddresses'):
                        config.JSONRPCAddresses = []
                config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
        LS = []
        for a in config.JSONRPCAddresses:
                LS.append(JSONRPCListener(server, a))
        if hasattr(config, 'SecretUser'):
                server.SecretUser = config.SecretUser
        # Hook the server up to the pool's work/share plumbing.
        server.aux = MM.CoinbaseAux
        server.getBlockHeader = getBlockHeader
        server.getBlockTemplate = getBlockTemplate
        server.receiveShare = receiveShare
        server.RaiseRedFlags = RaiseRedFlags
        server.ShareTarget = config.ShareTarget
        
        if hasattr(config, 'TrustedForwarders'):
                server.TrustedForwarders = config.TrustedForwarders
        server.ServerName = config.ServerName
        
        # --- Stratum server wiring ---
        stratumsrv = StratumServer()
        stratumsrv.getStratumJob = getStratumJob
        stratumsrv.getExistingStratumJob = getExistingStratumJob
        stratumsrv.receiveShare = receiveShare
        stratumsrv.getTarget = getTarget
        stratumsrv.defaultTarget = config.ShareTarget
        stratumsrv.IsJobValid = IsJobValid
        if not hasattr(config, 'StratumAddresses'):
                config.StratumAddresses = ()
        for a in config.StratumAddresses:
                NetworkListener(stratumsrv, a)
        
        MM.start()
        
        # Resume saved work state (if any) before serving requests.
        restoreState()
        
        # Background threads are daemons so a main-thread exit ends the process.
        prune_thr = threading.Thread(target=WorkLogPruner, args=(workLog,))
        prune_thr.daemon = True
        prune_thr.start()
        
        bcnode_thr = threading.Thread(target=bcnode.serve_forever)
        bcnode_thr.daemon = True
        bcnode_thr.start()
        
        stratum_thr = threading.Thread(target=stratumsrv.serve_forever)
        stratum_thr.daemon = True
        stratum_thr.start()
        
        # JSON-RPC server runs on the main thread and blocks here.
        server.serve_forever()