Merge branch 'arg_config'
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2013  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import argparse
19 import importlib
# Parse the command line and import the requested configuration module.
# Without -c/--config the default 'config' module is used; with it,
# settings come from config_<name>.py instead.
argparser = argparse.ArgumentParser()
argparser.add_argument('-c', '--config', help='Config name to load from config_<ARG>.py')
args = argparser.parse_args()
configmod = 'config'
if args.config is not None:
	configmod = 'config_%s' % (args.config,)
# importlib.import_module both loads and returns the module, so the
# previous separate __import__ call was redundant.
config = importlib.import_module(configmod)
28
# Fill in safe defaults for optional config settings.
if not hasattr(config, 'ServerName'):
	config.ServerName = 'Unnamed Eloipool'

if not hasattr(config, 'ShareTarget'):
	# Default minimum share difficulty target (pdiff-1 style maximum)
	config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
34
35
36 import logging
37 import logging.handlers
38
# Root logger setup: if no handler was configured yet, log everything to
# stderr at DEBUG, but quiet the chattiest subsystems down to INFO.
rootlogger = logging.getLogger(None)
logformat = getattr(config, 'LogFormat', '%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s')
logformatter = logging.Formatter(logformat)
if len(rootlogger.handlers) == 0:
	logging.basicConfig(
		format=logformat,
		level=logging.DEBUG,
	)
	for infoOnly in (
		'checkShare',
		'getTarget',
		'JSONRPCHandler',
		'JSONRPCServer',
		'merkleMaker',
		'StratumServer',
		'Waker for JSONRPCServer',
		'Waker for StratumServer',
		'WorkLogPruner'
	):
		logging.getLogger(infoOnly).setLevel(logging.INFO)
# Optional extra log destinations configured by the config module
if getattr(config, 'LogToSysLog', False):
	sysloghandler = logging.handlers.SysLogHandler(address = '/dev/log')
	rootlogger.addHandler(sysloghandler)
if hasattr(config, 'LogFile'):
	if isinstance(config.LogFile, str):
		filehandler = logging.FileHandler(config.LogFile)
	else:
		# Non-string LogFile: keyword arguments for TimedRotatingFileHandler
		filehandler = logging.handlers.TimedRotatingFileHandler(**config.LogFile)
	filehandler.setFormatter(logformatter)
	rootlogger.addHandler(filehandler)
69
def RaiseRedFlags(reason):
	"""Log *reason* at CRITICAL severity and return it unchanged.

	Returning the message lets callers write e.g.
	``raise SomeError(RaiseRedFlags(msg))`` in a single expression.
	"""
	redflag = logging.getLogger('redflag')
	redflag.critical(reason)
	return reason
73
74
75 from bitcoin.node import BitcoinLink, BitcoinNode
# P2P node used to submit our own blocks and to hear about new ones.
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'
# Any new network block triggers a merkle-tree rebuild (MM is set up below)
bcnode.newBlock = lambda blkhash: MM.updateMerkleTree()
79
80 import jsonrpc
81
# Best effort: tag outgoing JSON-RPC requests with our user-agent.
# Narrowed from a bare except so that real failures (KeyboardInterrupt,
# SystemExit) are not swallowed; a missing or reorganized authproxy
# module is still tolerated silently.
try:
	import jsonrpc.authproxy
	jsonrpc.authproxy.USER_AGENT = 'Eloipool/0.1'
except (ImportError, AttributeError):
	pass
87
88
89 from bitcoin.script import BitcoinScript
90 from bitcoin.txn import Txn
91 from base58 import b58decode
92 from struct import pack
93 import subprocess
94 from time import time
95
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
	"""Build the coinbase transaction paying out coinbaseValue.
	
	If config.CoinbaserCmd is set (and useCoinbaser is true), the command is
	run with '%d' substituted by the total value; its stdout must contain a
	line with the number of extra outputs, followed by (amount, address)
	line pairs. Whatever the coinbaser does not claim goes to
	config.TrackerAddr. If the coinbaser fails or tries to overspend, its
	outputs are discarded entirely.
	"""
	txn = Txn.new()
	
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		try:
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			nout = int(p.stdout.readline())
			for i in range(nout):
				amount = int(p.stdout.readline())
				addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
				pkScript = BitcoinScript.toAddress(addr)
				txn.addOutput(amount, pkScript)
				coinbased += amount
		except Exception:
			# Narrowed from a bare except; sentinel value forces the
			# overspend check below to treat this as a failure.
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			# Coinbaser claimed at least everything (or failed) — ignore it
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased
	
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)
	
	# TODO
	# TODO: red flag on dupe coinbase
	return txn
126
127
128 import jsonrpc_getwork
129 from util import Bits2Target
130
# Global mutable state shared across server threads
workLog = {}         # username -> {work-id -> (work-data, issue-time)}
userStatus = {}      # username -> [target, last-retarget-time, work-counter]
networkTarget = None # current network block target as an int (None until known)
DupeShareHACK = {}   # set-like dict of every share 'data' seen this block

server = None        # getwork/GBT JSON-RPC server, assigned at startup
stratumsrv = None    # stratum server, assigned at startup
def updateBlocks():
	# Push refreshed work out to all connected miners (longpoll + stratum)
	server.wakeLongpoll()
	stratumsrv.updateJob()
141
def blockChanged():
	# Called when the chain tip changes: refresh the cached network target
	# and, unless this is the very first block seen, clear per-block state
	# (duplicate-share table and outstanding work) before waking miners.
	global MM, networkTarget, server
	bits = MM.currentBlock[2]
	if bits is None:
		networkTarget = None
	else:
		networkTarget = Bits2Target(bits)
	if MM.lastBlock != (None, None, None):
		global DupeShareHACK
		DupeShareHACK = {}
		jsonrpc_getwork._CheckForDupesHACK = {}
		workLog.clear()
	server.wakeLongpoll(wantClear=True)
	stratumsrv.updateJob(wantClear=True)
156
157
158 from time import sleep, time
159 import traceback
160
def _WorkLogPruner_I(wl):
	"""Drop work entries older than two minutes from every user's log."""
	cutoff = time() - 120
	pruned = 0
	for userwork in wl.values():
		# Snapshot the keys so we can delete while scanning
		for wli in tuple(userwork.keys()):
			if userwork[wli][1] < cutoff:
				del userwork[wli]
				pruned += 1
	WorkLogPruner.logger.debug('Pruned %d jobs' % (pruned,))
171
def WorkLogPruner(wl):
	# Thread main loop: prune the work log once a minute, forever.
	# All exceptions are logged and swallowed so the pruner never dies.
	while True:
		try:
			sleep(60)
			_WorkLogPruner_I(wl)
		except:
			WorkLogPruner.logger.error(traceback.format_exc())
WorkLogPruner.logger = logging.getLogger('WorkLogPruner')
180
181
182 from merklemaker import merkleMaker
# The merkle maker tracks upstream block templates and produces our work.
# It takes its settings directly from the config module's namespace.
MM = merkleMaker()
MM.__dict__.update(config.__dict__)
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
188
189
190 from binascii import b2a_hex
191 from copy import deepcopy
192 from math import ceil, log
193 from merklemaker import MakeBlockHeader
194 from struct import pack, unpack
195 import threading
196 from time import time
197 from util import PendingUpstream, RejectedShare, bdiff1target, dblsha, LEhash2int, swap32, target2bdiff, target2pdiff
198 import jsonrpc
199 import traceback
200
# Optional 'gotwork' reporting service; see submitGotwork/checkShare below
gotwork = None
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

if not hasattr(config, 'DelayLogForUpstream'):
	config.DelayLogForUpstream = False

# DynamicTargetting: 0 disables per-user retargetting. When enabled, the
# configured goal (shares per minute) is rescaled to shares per window.
if not hasattr(config, 'DynamicTargetting'):
	config.DynamicTargetting = 0
else:
	if not hasattr(config, 'DynamicTargetWindow'):
		config.DynamicTargetWindow = 120
	config.DynamicTargetGoal *= config.DynamicTargetWindow / 60
214
def submitGotwork(info):
	"""Report solved-block info to the configured gotwork service.
	
	Best effort: failures are logged but never propagate (this runs in a
	fire-and-forget daemon thread). Narrowed from a bare except so process
	interrupts are not swallowed.
	"""
	try:
		gotwork.gotwork(info)
	except Exception:
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
220
def clampTarget(target, DTMode):
	"""Clamp a proposed share target per the dynamic-targetting mode.
	
	Keeps the target between the network target (hard upper difficulty)
	and config.ShareTarget (minimum difficulty). Returns None when the
	result equals config.ShareTarget, since None means "default target"
	everywhere else (saves memory in userStatus).
	"""
	# ShareTarget is the minimum
	if target is None or target > config.ShareTarget:
		target = config.ShareTarget
	
	# Never target above the network, as we'd lose blocks
	if target < networkTarget:
		target = networkTarget
	
	if DTMode == 2:
		# Ceil target to a power of two :)
		truebits = log(target, 2)
		if target <= 2**int(truebits):
			# Workaround for bug in Python's math.log function
			truebits = int(truebits)
		target = 2**ceil(truebits) - 1
	elif DTMode == 3:
		# Round target to multiple of bdiff 1
		# FIX: must be floor division — true division produced a float
		# target, which breaks the '%064x' formatting in getTarget and
		# integer comparisons downstream.
		target = bdiff1target // int(round(target2bdiff(target)))
	
	# Return None for ShareTarget to save memory
	if target == config.ShareTarget:
		return None
	return target
245
def getTarget(username, now, DTMode = None, RequestedTarget = None):
	"""Return the current share target for username, retargetting if due.
	
	Returns None to mean "use config.ShareTarget". Maintains
	userStatus[username] = [target, last-update-time, work-counter]
	as a side effect.
	"""
	if DTMode is None:
		DTMode = config.DynamicTargetting
	if not DTMode:
		return None
	if username in userStatus:
		status = userStatus[username]
	else:
		# No record, use default target
		RequestedTarget = clampTarget(RequestedTarget, DTMode)
		userStatus[username] = [RequestedTarget, now, 0]
		return RequestedTarget
	(targetIn, lastUpdate, work) = status
	if work <= config.DynamicTargetGoal:
		if now < lastUpdate + config.DynamicTargetWindow and (targetIn is None or targetIn >= networkTarget):
			# No reason to change it just yet
			return clampTarget(targetIn, DTMode)
		if not work:
			# No shares received, reset to minimum
			if targetIn:
				getTarget.logger.debug("No shares from %s, resetting to minimum target" % (repr(username),))
				userStatus[username] = [None, now, 0]
			return clampTarget(None, DTMode)
	
	# Rescale the target so the user's observed share rate hits the goal
	deltaSec = now - lastUpdate
	target = targetIn or config.ShareTarget
	target = int(target * config.DynamicTargetGoal * deltaSec / config.DynamicTargetWindow / work)
	target = clampTarget(target, DTMode)
	if target != targetIn:
		pfx = 'Retargetting %s' % (repr(username),)
		tin = targetIn or config.ShareTarget
		getTarget.logger.debug("%s from: %064x (pdiff %s)" % (pfx, tin, target2pdiff(tin)))
		tgt = target or config.ShareTarget
		getTarget.logger.debug("%s   to: %064x (pdiff %s)" % (pfx, tgt, target2pdiff(tgt)))
	userStatus[username] = [target, now, 0]
	return target
getTarget.logger = logging.getLogger('getTarget')
283
def TopTargets(n = 0x10):
	"""Print (to stdout) up to n users with the highest (easiest) targets."""
	ranked = sorted(
		(username for username, status in userStatus.items() if status[0] is not None),
		key=lambda username: -userStatus[username][0],
	)
	pdiffcache = {}
	for username in ranked[-n:]:
		tgt = userStatus[username][0]
		if tgt not in pdiffcache:
			pdiffcache[tgt] = target2pdiff(tgt)
		print('%-34s %064x %3d' % (username, tgt, pdiffcache[tgt]))
295
def RegisterWork(username, wli, wld, RequestedTarget = None):
	# Record issued work (with its target appended) in the work log and
	# return the effective share target for the response.
	now = time()
	target = getTarget(username, now, RequestedTarget=RequestedTarget)
	wld = tuple(wld) + (target,)
	workLog.setdefault(username, {})[wli] = (wld, now)
	return target or config.ShareTarget
302
def getBlockHeader(username):
	"""Issue getwork-style (MRD) work: returns (header, worklog entry, target).
	
	The redundant pre-registration write to workLog was removed:
	RegisterWork immediately overwrote that entry with the target-extended
	tuple anyway, so it was a dead store.
	"""
	MRD = MM.getMRD()
	merkleRoot = MRD[0]
	hdr = MakeBlockHeader(MRD)
	target = RegisterWork(username, merkleRoot, MRD)
	return (hdr, workLog[username][merkleRoot], target)
310
def getBlockTemplate(username, p_magic = None, RequestedTarget = None):
	# GBT work issuance. p_magic, when given, is a one-element list used as
	# an out-parameter: set to True when a clear was forced for a new user.
	if server.tls.wantClear:
		wantClear = True
	elif p_magic and username not in workLog:
		wantClear = True
		p_magic[0] = True
	else:
		wantClear = False
	MC = MM.getMC(wantClear)
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
	# The work identifier is embedded inside the coinbase scriptsig:
	# skip the height push (length byte at coinbase[0]) plus a length byte
	wliPos = coinbase[0] + 2
	wliLen = coinbase[wliPos - 1]
	wli = coinbase[wliPos:wliPos+wliLen]
	target = RegisterWork(username, wli, MC, RequestedTarget=RequestedTarget)
	return (MC, workLog[username][wli], target)
326
def getStratumJob(jobid, wantClear = False):
	# Stratum jobs are shared by all users, so they live under username None
	MC = MM.getMC(wantClear)
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
	now = time()
	workLog.setdefault(None, {})[jobid] = (MC, now)
	return (MC, workLog[None][jobid])
333
def getExistingStratumJob(jobid):
	"""Look up an already-issued stratum job by its id.
	
	Raises KeyError if the job is unknown (e.g. pruned or pre-restart).
	"""
	entry = workLog[None][jobid]
	(MC, issueT) = entry
	return (MC, entry)
337
# Share loggers (populated at startup); each receives logShare() calls
loggersShare = []

# Debug records of real block solutions and the payloads we submitted
RBDs = []
RBPs = []
342
343 from bitcoin.varlen import varlenEncode, varlenDecode
344 import bitcoin.txn
345 from merklemaker import assembleBlock
346
# None => submit solved blocks to every template source we pull work from
if not hasattr(config, 'BlockSubmissions'):
	config.BlockSubmissions = None

# Debug record of failed/rejected block submissions
RBFs = []
def blockSubmissionThread(payload, blkhash, share):
	# Submit a solved block to upstream bitcoinds, trying each configured
	# server in turn until one answers (or we give up). Runs in its own
	# thread, one per solved block.
	if config.BlockSubmissions is None:
		servers = list(a for b in MM.TemplateSources for a in b)
	else:
		servers = list(config.BlockSubmissions)
	
	# Prefer the server the block template originally came from
	if hasattr(share['merkletree'], 'source_uri'):
		servers.insert(0, {
			'access': jsonrpc.ServiceProxy(share['merkletree'].source_uri),
			'name': share['merkletree'].source,
		})
	elif not servers:
		servers = list(a for b in MM.TemplateSources for a in b)
	
	myblock = (blkhash, payload[4:36])
	payload = b2a_hex(payload).decode('ascii')
	nexterr = 0
	tries = 0
	success = False
	while len(servers):
		tries += 1
		TS = servers.pop(0)
		UpstreamBitcoindJSONRPC = TS['access']
		try:
			# BIP 22 standard submitblock
			reason = UpstreamBitcoindJSONRPC.submitblock(payload)
		except BaseException as gbterr:
			gbterr_fmt = traceback.format_exc()
			try:
				try:
					# bitcoind 0.5/0.6 getmemorypool
					reason = UpstreamBitcoindJSONRPC.getmemorypool(payload)
				except:
					# Old BIP 22 draft getmemorypool
					reason = UpstreamBitcoindJSONRPC.getmemorypool(payload, {})
				if reason is True:
					reason = None
				elif reason is False:
					reason = 'rejected'
			except BaseException as gmperr:
				now = time()
				# Rate-limit red-flag logging to once per 5 seconds
				if now > nexterr:
					# FIXME: This will show "Method not found" on pre-BIP22 servers
					RaiseRedFlags(gbterr_fmt)
					nexterr = now + 5
				# Give up only once the chain has moved past this block AND
				# every remaining server has been tried at least once
				if MM.currentBlock[0] not in myblock and tries > len(servers):
					RBFs.append( (('next block', MM.currentBlock, now, (gbterr, gmperr)), payload, blkhash, share) )
					RaiseRedFlags('Giving up on submitting block to upstream \'%s\'' % (TS['name'],))
					if share['upstreamRejectReason'] is PendingUpstream:
						share['upstreamRejectReason'] = 'GAVE UP'
						share['upstreamResult'] = False
						logShare(share)
					return
				
				# Requeue this server at the back and move on to the next
				servers.append(TS)
				continue
		
		# At this point, we have a reason back
		if reason:
			# FIXME: The returned value could be a list of multiple responses
			msg = 'Upstream \'%s\' block submission failed: %s' % (TS['name'], reason,)
			if success and reason in ('stale-prevblk', 'bad-prevblk', 'orphan', 'duplicate'):
				# no big deal
				blockSubmissionThread.logger.debug(msg)
			else:
				RBFs.append( (('upstream reject', reason, time()), payload, blkhash, share) )
				RaiseRedFlags(msg)
		else:
			blockSubmissionThread.logger.debug('Upstream \'%s\' accepted block' % (TS['name'],))
			success = True
		if share['upstreamRejectReason'] is PendingUpstream:
			share['upstreamRejectReason'] = reason
			share['upstreamResult'] = not reason
			logShare(share)
blockSubmissionThread.logger = logging.getLogger('blockSubmission')
426
def checkData(share):
	# Sanity-check the 80-byte header of a getwork/GBT share against the
	# current block state; raises RejectedShare on any mismatch.
	data = share['data']
	data = data[:80]
	(prevBlock, height, bits) = MM.currentBlock
	sharePrevBlock = data[4:36]
	if sharePrevBlock != prevBlock:
		# Work built on the previous tip is merely stale, not malformed
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')
	
	if data[72:76] != bits:
		raise RejectedShare('bad-diffbits')
	
	# Note that we should accept miners reducing version to 1 if they don't understand 2 yet
	# FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
	if data[1:4] != b'\0\0\0' or data[0] > 2:
		raise RejectedShare('bad-version')
444
def buildStratumData(share, merkleroot):
	"""Assemble the 80-byte block header for a stratum share.
	
	Stores the result in share['data'] and returns it. ntime and nonce
	arrive big-endian from the miner and are byte-reversed here.
	"""
	(prevBlock, height, bits) = MM.currentBlock
	
	header = b''.join((
		b'\x02\0\0\0',         # block version 2
		prevBlock,
		merkleroot,
		share['ntime'][::-1],
		bits,
		share['nonce'][::-1],
	))
	
	share['data'] = header
	return header
457
def IsJobValid(wli, wluser = None):
	"""Return True if (wluser, wli) identifies known, plausible work.
	
	Mirrors checkShare's stale-work rule: only rejects entries whose issue
	time is implausibly far in the future (expiry of old entries is handled
	by the WorkLogPruner instead).
	"""
	try:
		(wld, issueT) = workLog[wluser][wli]
	except KeyError:
		return False
	return time() >= issueT - 120
467
def checkShare(share):
	"""Fully validate a submitted share (getwork/GBT or stratum form).
	
	Raises RejectedShare on any problem. On success, fills in share fields
	(target, merkletree, issuetime, ...) and — when the hash meets the
	network target — submits the solved block upstream.
	"""
	shareTime = share['time'] = time()
	
	username = share['username']
	if 'data' in share:
		# getwork/GBT
		checkData(share)
		data = share['data']
		
		if username not in workLog:
			raise RejectedShare('unknown-user')
		MWL = workLog[username]
		
		shareMerkleRoot = data[36:68]
		if 'blkdata' in share:
			# GBT: full block data supplied; dig the work id out of the
			# coinbase scriptsig (same layout as getBlockTemplate wrote)
			pl = share['blkdata']
			(txncount, pl) = varlenDecode(pl)
			cbtxn = bitcoin.txn.Txn(pl)
			othertxndata = cbtxn.disassemble(retExtra=True)
			coinbase = cbtxn.getCoinbase()
			wliPos = coinbase[0] + 2
			wliLen = coinbase[wliPos - 1]
			wli = coinbase[wliPos:wliPos+wliLen]
			mode = 'MC'
			moden = 1
		else:
			# getwork: the merkle root itself identifies the work
			wli = shareMerkleRoot
			mode = 'MRD'
			moden = 0
			coinbase = None
	else:
		# Stratum
		MWL = workLog[None]
		wli = share['jobid']
		buildStratumData(share, b'\0' * 32)
		mode = 'MC'
		moden = 1
		othertxndata = b''
	
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld
	
	share['issuetime'] = issueT
	
	(workMerkleTree, workCoinbase) = wld[1:3]
	share['merkletree'] = workMerkleTree
	if 'jobid' in share:
		# Stratum: rebuild the real merkle root from the extranonces
		cbtxn = deepcopy(workMerkleTree.data[0])
		coinbase = workCoinbase + share['extranonce1'] + share['extranonce2']
		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		data = buildStratumData(share, workMerkleTree.withFirst(cbtxn))
		shareMerkleRoot = data[36:68]
	
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
	blkhash = dblsha(data)
	# Trivial proof-of-work sanity check: top 32 bits must be zero
	if blkhash[28:] != b'\0\0\0\0':
		raise RejectedShare('H-not-zero')
	blkhashn = LEhash2int(blkhash)
	
	global networkTarget
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
	
	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	txlist = workMerkleTree.data
	txlist = [deepcopy(txlist[0]),] + txlist[1:]
	cbtxn = txlist[0]
	cbtxn.setCoinbase(coinbase or workCoinbase)
	cbtxn.assemble()
	
	if blkhashn <= networkTarget:
		# This share solves a block — assemble and submit it upstream
		logfunc("Submitting upstream")
		RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree, share, wld) ) )
		if not moden:
			payload = assembleBlock(data, txlist)
		else:
			payload = share['data']
			if len(othertxndata):
				payload += share['blkdata']
			else:
				payload += assembleBlock(data, txlist)[80:]
		logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
		RBPs.append(payload)
		threading.Thread(target=blockSubmissionThread, args=(payload, blkhash, share)).start()
		bcnode.submitBlock(payload)
		if config.DelayLogForUpstream:
			share['upstreamRejectReason'] = PendingUpstream
		else:
			share['upstreamRejectReason'] = None
			share['upstreamResult'] = True
		MM.updateBlock(blkhash)
	
	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')
	
	# Determine which target this share was issued against
	if 'target' in share:
		workTarget = share['target']
	elif len(wld) > 6:
		workTarget = wld[6]
	else:
		workTarget = None
	
	if workTarget is None:
		workTarget = config.ShareTarget
	if blkhashn > workTarget:
		raise RejectedShare('high-hash')
	share['target'] = workTarget
	share['_targethex'] = '%064x' % (workTarget,)
	
	shareTimestamp = unpack('<L', data[68:72])[0]
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
	if config.DynamicTargetting and username in userStatus:
		# NOTE: userStatus[username] only doesn't exist across restarts
		status = userStatus[username]
		target = status[0] or config.ShareTarget
		if target == workTarget:
			userStatus[username][2] += 1
		else:
			# Credit work proportionally to its relative difficulty
			userStatus[username][2] += float(target) / workTarget
	
	if moden:
		cbpre = workCoinbase
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')
		
		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
				raise RejectedShare('bad-cb-flag')
		
		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')
		
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')
		
		if len(othertxndata):
			allowed = assembleBlock(data, txlist)[80:]
			if allowed != share['blkdata']:
				raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
640
def logShare(share):
	"""Record the share's solution in hex, then hand it to every logger."""
	try:
		# Prefer the exact bytes the miner originally submitted
		share['solution'] = share['_origdata']
	except KeyError:
		share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
	for sharelogger in loggersShare:
		sharelogger.logShare(share)
648
def receiveShare(share):
	"""Validate a submitted share, recording any reject reason on it.
	
	Re-raises whatever checkShare raised. Logging the share is skipped
	only while a block submission is still pending upstream (the
	submission thread calls logShare later in that case).
	"""
	# TODO: username => userid
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	except BaseException as e:
		share['rejectReason'] = 'ERROR'
		raise
	finally:
		if share.get('upstreamRejectReason', None) is not PendingUpstream:
			logShare(share)
662
def newBlockNotification():
	"""Handle an external new-block notification by refreshing our work."""
	logging.getLogger('newBlockNotification').info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
668
def newBlockNotificationSIGNAL(signum, frame):
	# Use a new thread, in case the signal handler is called with locks held
	notifier = threading.Thread(
		target=newBlockNotification,
		name='newBlockNotification via signal %s' % (signum,),
	)
	notifier.daemon = True
	notifier.start()
674
from signal import signal, SIGUSR1
# SIGUSR1 => treat as a new-block notification (sent externally, e.g. by a
# blocknotify hook — TODO confirm the expected sender)
signal(SIGUSR1, newBlockNotificationSIGNAL)
677
678
679 import os
680 import os.path
681 import pickle
682 import signal
683 import sys
684 from time import sleep
685 import traceback
686
# File used to persist the dupe-share table and work log across restarts
if getattr(config, 'SaveStateFilename', None) is None:
	config.SaveStateFilename = 'eloipool.worklog'
689
def stopServers():
	# Ask every server (bitcoin node, JSON-RPC, stratum) to stop, wait up to
	# ~2.56 seconds for them to wind down, then force-close their fds.
	# Idempotent: repeated calls are no-ops.
	logger = logging.getLogger('stopServers')
	
	if hasattr(stopServers, 'already'):
		logger.debug('Already tried to stop servers before')
		return
	stopServers.already = True
	
	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server, stratumsrv)
	for s in servers:
		s.keepgoing = False
	for s in servers:
		try:
			s.wakeup()
		except:
			logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
	i = 0
	while True:
		sl = []
		for s in servers:
			if s.running:
				sl.append(s.__class__.__name__)
		if not sl:
			break
		i += 1
		if i >= 0x100:
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
			break
		sleep(0.01)
	
	for s in servers:
		for fd in s._fd.keys():
			os.close(fd)
725
def stopLoggers():
	"""Give each share logger a chance to flush/close via stop(), if it has one."""
	for sharelogger in loggersShare:
		if hasattr(sharelogger, 'stop'):
			sharelogger.stop()
730
def saveState(SAVE_STATE_FILENAME, t = None):
	"""Persist resume state (timestamp, dupe table, work log) via pickle.
	
	Retries on failure; after 0x10000 failed attempts it gives up, removes
	any partial file, and returns. (Previously the loop never broke after
	giving up, so a persistent failure logged and retried forever in a
	busy loop.) Bare excepts narrowed to Exception.
	"""
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except Exception:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except Exception:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
				break
752
def exit():
	"""Cleanly shut the pool down: stop servers and share loggers, save
	resumable state, then terminate the process.
	
	Note: intentionally shadows the builtin exit(); kept for callers.
	"""
	shutdown_started = time()
	stopServers()
	stopLoggers()
	saveState(config.SaveStateFilename, t=shutdown_started)
	logging.getLogger('exit').info('Goodbye...')
	# Signal ourselves first, then hard-exit in case SIGTERM is handled.
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)
761
def restart():
	"""Stop servers and loggers, save resumable state, then re-exec this
	process with its original argv (in-place restart)."""
	restart_started = time()
	stopServers()
	stopLoggers()
	saveState(config.SaveStateFilename, t=restart_started)
	restart_logger = logging.getLogger('restart')
	restart_logger.info('Restarting...')
	try:
		# Replace the current process image; only returns on failure.
		os.execv(sys.argv[0], sys.argv)
	except:
		restart_logger.error('Failed to exec\n' + traceback.format_exc())
772
def restoreState(SAVE_STATE_FILENAME):
	"""Restore pool state previously written by saveState.
	
	Supports three on-disk layouts:
	- pre-2012-02-03 tuple: (workLog, DupeShareHACK[, ...])
	- 2012-02-03 dict: DupeShareHACK only (no timestamp, no workLog)
	- current: pickled timestamp, then DupeShareHACK, then workLog
	The workLog is only restored if the saved timestamp is under 120
	seconds old; otherwise the work is considered expired.
	
	NOTE(security): pickle.load is unsafe on untrusted data — only load
	state files this pool wrote itself.
	"""
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if type(t) == tuple:
				if len(t) > 2:
					# Future formats, not supported here
					# bug fix: t[3] raised IndexError on a 3-tuple; take the
					# last element as the version marker (slot unconfirmed)
					ver = t[-1]
					# TODO
				
				# Old format, from 2012-02-02 to 2012-02-03
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					# Old format, from 2012-02-03 to 2012-02-03
					DupeShareHACK = t
					t = None
				else:
					# Current format, from 2012-02-03 onward
					DupeShareHACK = pickle.load(f)
				
				# bug fix: the old dict format sets t=None, which made
				# "t + 120" raise TypeError and abort the whole restore
				if t is not None and t + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))
814
815
816 from jsonrpcserver import JSONRPCListener, JSONRPCServer
817 import interactivemode
818 from networkserver import NetworkListener
819 import threading
820 import sharelogging
821 from stratumserver import StratumServer
822 import imp
823
if __name__ == "__main__":
	# Build the share loggers from config, accepting several deprecated
	# configuration spellings for backward compatibility.
	if not hasattr(config, 'ShareLogging'):
		config.ShareLogging = ()
	if hasattr(config, 'DbOptions'):
		# Deprecated DbOptions: translate into an equivalent 'sql' entry.
		logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
		config.ShareLogging = list(config.ShareLogging)
		config.ShareLogging.append( {
			'type': 'sql',
			'engine': 'postgres',
			'dbopts': config.DbOptions,
			'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
		} )
	for i in config.ShareLogging:
		if not hasattr(i, 'keys'):
			# Old-style (name, parameters) tuple entry; normalize to a dict.
			name, parameters = i
			logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
			if name == 'postgres':
				name = 'sql'
				i = {
					'engine': 'postgres',
					'dbopts': parameters,
				}
			elif name == 'logfile':
				i = {}
				i['thropts'] = parameters
				if 'filename' in parameters:
					# 'filename' becomes a direct argument; copy thropts
					# before deleting so the caller's dict is untouched.
					i['filename'] = parameters['filename']
					i['thropts'] = dict(i['thropts'])
					del i['thropts']['filename']
			else:
				i = parameters
			i['type'] = name
		
		name = i['type']
		parameters = i
		try:
			# Import sharelogging/<name>.py and instantiate its class of
			# the same name, passing the config dict as keyword arguments.
			fp, pathname, description = imp.find_module(name, sharelogging.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			loggersShare.append(lo)
		except:
			logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())

	# Listeners for inbound bitcoin node connections (none by default).
	LSbc = []
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))
	
	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
	
	# Imported for side effects: each module registers JSON-RPC method
	# handlers on the server.
	import jsonrpc_getblocktemplate
	import jsonrpc_getwork
	import jsonrpc_setworkaux
	
	# JSON-RPC (getwork/GBT) server setup.
	server = JSONRPCServer()
	server.tls = threading.local()
	server.tls.wantClear = False
	if hasattr(config, 'JSONRPCAddress'):
		# Deprecated singular form; fold it into the JSONRPCAddresses list.
		logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
	LS = []
	for a in config.JSONRPCAddresses:
		LS.append(JSONRPCListener(server, a))
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	# Wire the pool's work-management callbacks into the JSON-RPC server.
	server.aux = MM.CoinbaseAux
	server.getBlockHeader = getBlockHeader
	server.getBlockTemplate = getBlockTemplate
	server.receiveShare = receiveShare
	server.RaiseRedFlags = RaiseRedFlags
	server.ShareTarget = config.ShareTarget
	
	if hasattr(config, 'TrustedForwarders'):
		server.TrustedForwarders = config.TrustedForwarders
	server.ServerName = config.ServerName
	
	# Stratum server setup, sharing the same work/share callbacks.
	stratumsrv = StratumServer()
	stratumsrv.getStratumJob = getStratumJob
	stratumsrv.getExistingStratumJob = getExistingStratumJob
	stratumsrv.receiveShare = receiveShare
	stratumsrv.getTarget = getTarget
	stratumsrv.defaultTarget = config.ShareTarget
	stratumsrv.IsJobValid = IsJobValid
	if not hasattr(config, 'StratumAddresses'):
		config.StratumAddresses = ()
	for a in config.StratumAddresses:
		NetworkListener(stratumsrv, a)
	
	MM.start()
	
	# Resume any previously saved work state before serving.
	restoreState(config.SaveStateFilename)
	
	# Background daemon threads: workLog pruning, bitcoin node, stratum;
	# the JSON-RPC server runs on the main thread.
	prune_thr = threading.Thread(target=WorkLogPruner, args=(workLog,))
	prune_thr.daemon = True
	prune_thr.start()
	
	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()
	
	stratum_thr = threading.Thread(target=stratumsrv.serve_forever)
	stratum_thr.daemon = True
	stratum_thr.start()
	
	server.serve_forever()