Merge commit 'fde36f4'
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
import config

# Backfill optional configuration values with defaults so the rest of the
# code can assume they exist.
if not hasattr(config, 'ServerName'):
        config.ServerName = 'Unnamed Eloipool'

# Easiest (largest) target a share may be accepted at, as a 256-bit integer.
if not hasattr(config, 'ShareTarget'):
        config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
25
26
import logging

# Install a default logging configuration only if the embedding environment
# hasn't configured one already.
if len(logging.root.handlers) == 0:
        logging.basicConfig(
                format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
                level=logging.DEBUG,
        )
        # These loggers are too noisy at DEBUG; cap them at INFO.
        for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer', 'StratumServer', 'Waker for StratumServer', 'WorkLogPruner'):
                logging.getLogger(infoOnly).setLevel(logging.INFO)
36
def RaiseRedFlags(reason):
        """Log *reason* as a critical 'redflag' event and return it unchanged.

        Returning the reason lets callers both flag and re-use the message in
        a single expression.
        """
        flagLogger = logging.getLogger('redflag')
        flagLogger.critical(reason)
        return reason
40
41
from bitcoin.node import BitcoinLink, BitcoinNode
# Peer-to-peer bitcoin node connection, used to announce blocks we solve.
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'
45
import jsonrpc

# Identify ourselves in outgoing JSON-RPC requests when the installed
# jsonrpc implementation supports it; older implementations have no
# authproxy module, which is fine to ignore.  (Only ImportError is expected
# here — a bare except would hide real bugs.)
try:
        import jsonrpc.authproxy
        jsonrpc.authproxy.USER_AGENT = 'Eloipool/0.1'
except ImportError:
        pass
53
54
55 from bitcoin.script import BitcoinScript
56 from bitcoin.txn import Txn
57 from base58 import b58decode
58 from struct import pack
59 import subprocess
60 from time import time
61
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
        """Build the coinbase transaction paying out *coinbaseValue*.

        If config.CoinbaserCmd is set (and useCoinbaser), run it to obtain a
        list of extra payout outputs; whatever value it doesn't claim goes to
        config.TrackerAddr.  If the coinbaser fails or over-commits, all of
        its outputs are discarded and the full value goes to the tracker
        address.  Returns the assembled Txn (not yet finalized).
        """
        txn = Txn.new()
        
        if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
                coinbased = 0
                try:
                        cmd = config.CoinbaserCmd
                        cmd = cmd.replace('%d', str(coinbaseValue))
                        # NOTE: shell=True with an operator-supplied command
                        # string; this is trusted configuration, not user input.
                        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
                        # Protocol: first line is the output count, then pairs
                        # of (amount, address) lines.
                        nout = int(p.stdout.readline())
                        for i in range(nout):
                                amount = int(p.stdout.readline())
                                addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
                                pkScript = BitcoinScript.toAddress(addr)
                                txn.addOutput(amount, pkScript)
                                coinbased += amount
                except Exception:
                        # Force the over-commit check below to trip, discarding
                        # any partial outputs.  (Narrowed from a bare except so
                        # SystemExit/KeyboardInterrupt still propagate.)
                        coinbased = coinbaseValue + 1
                if coinbased >= coinbaseValue:
                        logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
                        txn.outputs = []
                else:
                        coinbaseValue -= coinbased
        
        pkScript = BitcoinScript.toAddress(config.TrackerAddr)
        txn.addOutput(coinbaseValue, pkScript)
        
        # TODO
        # TODO: red flag on dupe coinbase
        return txn
92
93
import jsonrpc_getwork
from util import Bits2Target

# Shared mutable state, accessed from multiple server threads:
workLog = {}        # username -> {worklog id -> (work data, issue time)}; stratum work under key None
userStatus = {}     # username -> [target, last update time, work counter] (see getTarget)
networkTarget = None    # current network block target as an int (set in blockChanged)
DupeShareHACK = {}      # header bytes of every seen share, for duplicate rejection

server = None       # JSONRPCServer instance, assigned at startup
stratumsrv = None   # StratumServer instance, assigned at startup
def updateBlocks():
        """Notify both frontends that upstream work has been updated."""
        server.wakeLongpoll()
        stratumsrv.updateJob()
107
def blockChanged():
        """Handle a change of best block: refresh the cached network target
        and, when a real previous block exists, wipe all per-block share and
        work state before waking both frontends with a clear."""
        global MM, networkTarget, server
        bits = MM.currentBlock[2]
        if bits is None:
                networkTarget = None
        else:
                networkTarget = Bits2Target(bits)
        if MM.lastBlock != (None, None, None):
                # Old work and shares can no longer be valid; discard them
                global DupeShareHACK
                DupeShareHACK = {}
                jsonrpc_getwork._CheckForDupesHACK = {}
                workLog.clear()
        server.wakeLongpoll(wantClear=True)
        stratumsrv.updateJob(wantClear=True)
122
123
124 from time import sleep, time
125 import traceback
126
def _WorkLogPruner_I(wl):
        """One pruning pass: delete worklog entries issued over 120s ago.

        Server threads insert into *wl* (and its per-user dicts)
        concurrently, so iterate over snapshots of the key sets rather than
        live dict views to avoid 'dict changed size during iteration'.
        """
        now = time()
        pruned = 0
        for username in tuple(wl):
                userwork = wl[username]
                for wli in tuple(userwork.keys()):
                        # Entry layout is (work data, issue time)
                        if now > userwork[wli][1] + 120:
                                del userwork[wli]
                                pruned += 1
        WorkLogPruner.logger.debug('Pruned %d jobs' % (pruned,))
137
def WorkLogPruner(wl):
        """Thread main loop: prune the worklog *wl* once a minute, forever.

        Errors are logged and the loop continues.  Narrowed from a bare
        except to Exception so SystemExit/KeyboardInterrupt can still
        terminate the thread.
        """
        while True:
                try:
                        sleep(60)
                        _WorkLogPruner_I(wl)
                except Exception:
                        WorkLogPruner.logger.error(traceback.format_exc())
WorkLogPruner.logger = logging.getLogger('WorkLogPruner')
146
147
from merklemaker import merkleMaker
# The merkle maker polls upstream for templates and builds merkle trees;
# it drives this module via the callbacks assigned below.
MM = merkleMaker()
MM.__dict__.update(config.__dict__)  # adopt every config setting as an attribute
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
154
155
156 from binascii import b2a_hex
157 from copy import deepcopy
158 from math import ceil, log
159 from merklemaker import MakeBlockHeader
160 from struct import pack, unpack
161 import threading
162 from time import time
163 from util import PendingUpstream, RejectedShare, bdiff1target, dblsha, LEhash2int, swap32, target2bdiff, target2pdiff
164 import jsonrpc
165 import traceback
166
# Optional JSON-RPC service that receives "gotwork" notifications for
# near-block shares (see the gotwork hack in checkShare).
gotwork = None
if hasattr(config, 'GotWorkURI'):
        gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

if not hasattr(config, 'DelayLogForUpstream'):
        config.DelayLogForUpstream = False

if not hasattr(config, 'DynamicTargetting'):
        config.DynamicTargetting = 0
else:
        if not hasattr(config, 'DynamicTargetWindow'):
                config.DynamicTargetWindow = 120
        # Scale the goal from shares-per-minute to shares-per-window
        config.DynamicTargetGoal *= config.DynamicTargetWindow / 60
180
def submitGotwork(info):
        """Best-effort: forward a near-block share *info* dict to the
        configured gotwork service; failures are logged, never raised."""
        try:
                gotwork.gotwork(info)
        except Exception:
                # Narrowed from a bare except; the failure is still logged
                # with a full traceback.
                checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
186
def clampTarget(target, DTMode):
        """Clamp *target* between config.ShareTarget (the easiest allowed)
        and the current network target (the hardest sensible), then round it
        according to the dynamic-target mode.

        DTMode 2 rounds up to one less than a power of two; DTMode 3 rounds
        to a whole bdiff multiple.  Returns None when the result is exactly
        config.ShareTarget, to save memory in the worklog.
        """
        # ShareTarget is the minimum
        if target is None or target > config.ShareTarget:
                target = config.ShareTarget
        
        # Never target above the network, as we'd lose blocks
        if target < networkTarget:
                target = networkTarget
        
        if DTMode == 2:
                # Ceil target to a power of two :)
                truebits = log(target, 2)
                target = 2**ceil(truebits) - 1
        elif DTMode == 3:
                # Round target to multiple of bdiff 1.
                # Bugfix: use floor division — true division ('/') yields a
                # float in Python 3, which breaks '%064x' target formatting.
                target = bdiff1target // int(round(target2bdiff(target)))
        
        # Return None for ShareTarget to save memory
        if target == config.ShareTarget:
                return None
        return target
208
def getTarget(username, now, DTMode = None):
        """Return the share target for *username* at time *now* under the
        configured dynamic-target mode, updating userStatus as a side
        effect.  Returns None to mean the default config.ShareTarget (see
        clampTarget)."""
        if DTMode is None:
                DTMode = config.DynamicTargetting
        if not DTMode:
                return None
        if username in userStatus:
                status = userStatus[username]
        else:
                # No record, use default target
                userStatus[username] = [None, now, 0]
                return clampTarget(None, DTMode)
        # status layout: [current target (None = default), last update time, work counter]
        (targetIn, lastUpdate, work) = status
        if work <= config.DynamicTargetGoal:
                if now < lastUpdate + config.DynamicTargetWindow and (targetIn is None or targetIn >= networkTarget):
                        # No reason to change it just yet
                        return clampTarget(targetIn, DTMode)
                if not work:
                        # No shares received, reset to minimum
                        if targetIn:
                                getTarget.logger.debug("No shares from %s, resetting to minimum target" % (repr(username),))
                                userStatus[username] = [None, now, 0]
                        return clampTarget(None, DTMode)
        
        # Rescale the target by observed share rate vs the goal rate.
        # work > 0 here: the zero case always returns in the branch above.
        deltaSec = now - lastUpdate
        target = targetIn or config.ShareTarget
        target = int(target * config.DynamicTargetGoal * deltaSec / config.DynamicTargetWindow / work)
        target = clampTarget(target, DTMode)
        if target != targetIn:
                pfx = 'Retargetting %s' % (repr(username),)
                tin = targetIn or config.ShareTarget
                getTarget.logger.debug("%s from: %064x (pdiff %s)" % (pfx, tin, target2pdiff(tin)))
                tgt = target or config.ShareTarget
                getTarget.logger.debug("%s   to: %064x (pdiff %s)" % (pfx, tgt, target2pdiff(tgt)))
        userStatus[username] = [target, now, 0]
        return target
getTarget.logger = logging.getLogger('getTarget')
245
def RegisterWork(username, wli, wld):
        """Record issued work *wld* under (username, wli) with the target
        assigned to this user, and return that target (the concrete
        config.ShareTarget rather than None)."""
        now = time()
        target = getTarget(username, now)
        userwork = workLog.setdefault(username, {})
        userwork[wli] = (tuple(wld) + (target,), now)
        return target or config.ShareTarget
252
def getBlockHeader(username):
        """Issue getwork-style work for *username*.

        Returns (block header bytes, worklog entry, share target).  The
        worklog entry is keyed by the merkle root.
        """
        MRD = MM.getMRD()
        merkleRoot = MRD[0]
        hdr = MakeBlockHeader(MRD)
        # RegisterWork records the worklog entry (with the assigned target
        # appended); the direct workLog write previously done here was
        # redundant, being immediately overwritten by RegisterWork.
        target = RegisterWork(username, merkleRoot, MRD)
        return (hdr, workLog[username][merkleRoot], target)
260
def getBlockTemplate(username, p_magic = None):
        """Issue GBT work for *username*.

        *p_magic*, if given, is a one-element list used as an out-parameter:
        p_magic[0] is set True when a clear was forced because the user was
        not yet in the worklog.  Returns (MC, worklog entry, share target).
        """
        if server.tls.wantClear:
                wantClear = True
        elif p_magic and username not in workLog:
                wantClear = True
                p_magic[0] = True
        else:
                wantClear = False
        MC = MM.getMC(wantClear)
        (dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
        # The worklog id is embedded in the coinbase scriptsig: its length
        # byte sits just past the height push, followed by the id itself.
        wliPos = coinbase[0] + 2
        wliLen = coinbase[wliPos - 1]
        wli = coinbase[wliPos:wliPos+wliLen]
        target = RegisterWork(username, wli, MC)
        return (MC, workLog[username][wli], target)
276
def getStratumJob(jobid, wantClear = False):
        """Fetch fresh work for a stratum job and record it in the shared
        (None-keyed) worklog under *jobid*.  Returns (MC, worklog entry)."""
        MC = MM.getMC(wantClear)
        (dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
        jobs = workLog.setdefault(None, {})
        jobs[jobid] = (MC, time())
        return (MC, jobs[jobid])
283
def getExistingStratumJob(jobid):
        """Look up a previously issued stratum job by *jobid*.

        Returns (work data, worklog entry); raises KeyError when unknown.
        """
        entry = workLog[None][jobid]
        return (entry[0], entry)
287
# Share loggers (configured at startup); each gets every processed share.
loggersShare = []

RBDs = []  # "real block data" records, one per block-solving share
RBPs = []  # raw block payloads submitted upstream

from bitcoin.varlen import varlenEncode, varlenDecode
import bitcoin.txn
from merklemaker import assembleBlock

# None means "submit to every template source" (see blockSubmissionThread)
if not hasattr(config, 'BlockSubmissions'):
        config.BlockSubmissions = None

RBFs = []  # records of failed or rejected block submissions
def blockSubmissionThread(payload, blkhash, share):
        """Submit a solved block (raw *payload*, hash *blkhash*) upstream.

        Tries each configured server in turn — the template's own source
        first, when known — falling back from BIP 22 submitblock to the two
        getmemorypool variants, requeueing servers that error out.  Once a
        definitive answer (or give-up) is reached, the share's upstream
        result is recorded and logged if it was pending.
        """
        if config.BlockSubmissions is None:
                servers = list(a for b in MM.TemplateSources for a in b)
        else:
                servers = list(config.BlockSubmissions)
        
        # Prefer the upstream this work's template actually came from
        if hasattr(share['merkletree'], 'source_uri'):
                servers.insert(0, {
                        'access': jsonrpc.ServiceProxy(share['merkletree'].source_uri),
                        'name': share['merkletree'].source,
                })
        elif not servers:
                servers = list(a for b in MM.TemplateSources for a in b)
        
        myblock = (blkhash, payload[4:36])
        payload = b2a_hex(payload).decode('ascii')
        nexterr = 0
        tries = 0
        success = False
        while len(servers):
                tries += 1
                TS = servers.pop(0)
                UpstreamBitcoindJSONRPC = TS['access']
                try:
                        # BIP 22 standard submitblock
                        reason = UpstreamBitcoindJSONRPC.submitblock(payload)
                except BaseException as gbterr:
                        gbterr_fmt = traceback.format_exc()
                        try:
                                try:
                                        # bitcoind 0.5/0.6 getmemorypool
                                        reason = UpstreamBitcoindJSONRPC.getmemorypool(payload)
                                except:
                                        # Old BIP 22 draft getmemorypool
                                        reason = UpstreamBitcoindJSONRPC.getmemorypool(payload, {})
                                if reason is True:
                                        reason = None
                                elif reason is False:
                                        reason = 'rejected'
                        except BaseException as gmperr:
                                now = time()
                                if now > nexterr:
                                        # FIXME: This will show "Method not found" on pre-BIP22 servers
                                        RaiseRedFlags(gbterr_fmt)
                                        nexterr = now + 5
                                if MM.currentBlock[0] not in myblock and tries > len(servers):
                                        RBFs.append( (('next block', MM.currentBlock, now, (gbterr, gmperr)), payload, blkhash, share) )
                                        RaiseRedFlags('Giving up on submitting block to upstream \'%s\'' % (TS['name'],))
                                        if share['upstreamRejectReason'] is PendingUpstream:
                                                share['upstreamRejectReason'] = 'GAVE UP'
                                                share['upstreamResult'] = False
                                                logShare(share)
                                        return
                                
                                # Bugfix: requeue the whole server record (TS),
                                # not the bare JSON-RPC proxy — the next
                                # pop(0) reads TS['access'] and TS['name']
                                servers.append(TS)
                                continue
                
                # At this point, we have a reason back
                if reason:
                        # FIXME: The returned value could be a list of multiple responses
                        msg = 'Upstream \'%s\' block submission failed: %s' % (TS['name'], reason,)
                        if success and reason in ('stale-prevblk', 'bad-prevblk', 'orphan', 'duplicate'):
                                # no big deal
                                blockSubmissionThread.logger.debug(msg)
                        else:
                                RBFs.append( (('upstream reject', reason, time()), payload, blkhash, share) )
                                RaiseRedFlags(msg)
                else:
                        blockSubmissionThread.logger.debug('Upstream \'%s\' accepted block' % (TS['name'],))
                        success = True
                if share['upstreamRejectReason'] is PendingUpstream:
                        share['upstreamRejectReason'] = reason
                        share['upstreamResult'] = not reason
                        logShare(share)
blockSubmissionThread.logger = logging.getLogger('blockSubmission')
376
def checkData(share):
        """Validate the fixed header fields of a getwork/GBT share against
        the current block, raising RejectedShare on any mismatch."""
        hdr = share['data'][:80]
        (prevBlock, height, bits) = MM.currentBlock
        
        sharePrevBlock = hdr[4:36]
        if sharePrevBlock != prevBlock:
                # Distinguish work for the just-replaced block from garbage
                if sharePrevBlock == MM.lastBlock[0]:
                        raise RejectedShare('stale-prevblk')
                raise RejectedShare('bad-prevblk')
        
        if hdr[72:76] != bits:
                raise RejectedShare('bad-diffbits')
        
        # Note that we should accept miners reducing version to 1 if they don't understand 2 yet
        # FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
        if hdr[1:4] != b'\0\0\0' or hdr[0] > 2:
                raise RejectedShare('bad-version')
394
def buildStratumData(share, merkleroot):
        """Assemble the 80-byte block header for a stratum share from the
        current block, *merkleroot*, and the share's ntime/nonce fields;
        store it in share['data'] and return it."""
        (prevBlock, height, bits) = MM.currentBlock
        
        header = b''.join((
                b'\x02\0\0\0',          # version 2, little-endian
                prevBlock,
                merkleroot,
                share['ntime'][::-1],   # submitted big-endian; header wants LE
                bits,
                share['nonce'][::-1],
        ))
        
        share['data'] = header
        return header
407
def IsJobValid(wli, wluser = None):
        """Return True when (wluser, wli) names a known worklog entry whose
        issue time is plausible."""
        try:
                (wld, issueT) = workLog[wluser][wli]
        except KeyError:
                return False
        # Only reject entries issued implausibly far (>120s) in the future;
        # presumably a clock-skew guard — same window checkShare uses
        return time() >= issueT - 120
417
def checkShare(share):
        """Fully validate a submitted *share* (getwork, GBT, or stratum).

        Raises RejectedShare for any invalid share.  Side effects: records
        the share in DupeShareHACK, submits the block upstream when the hash
        meets the network target, feeds the gotwork hack, and updates the
        dynamic-target accounting in userStatus.
        """
        shareTime = share['time'] = time()
        
        username = share['username']
        if 'data' in share:
                # getwork/GBT
                checkData(share)
                data = share['data']
                
                if username not in workLog:
                        raise RejectedShare('unknown-user')
                MWL = workLog[username]
                
                shareMerkleRoot = data[36:68]
                if 'blkdata' in share:
                        # GBT: full block body provided; recover the worklog
                        # id (wli) from the coinbase scriptsig
                        pl = share['blkdata']
                        (txncount, pl) = varlenDecode(pl)
                        cbtxn = bitcoin.txn.Txn(pl)
                        othertxndata = cbtxn.disassemble(retExtra=True)
                        coinbase = cbtxn.getCoinbase()
                        wliPos = coinbase[0] + 2
                        wliLen = coinbase[wliPos - 1]
                        wli = coinbase[wliPos:wliPos+wliLen]
                        mode = 'MC'
                        moden = 1
                else:
                        # getwork: the merkle root itself keys the worklog
                        wli = shareMerkleRoot
                        mode = 'MRD'
                        moden = 0
                        coinbase = None
        else:
                # Stratum
                MWL = workLog[None]
                wli = share['jobid']
                # Placeholder merkle root; replaced below once the coinbase
                # is rebuilt from the extranonces
                buildStratumData(share, b'\0' * 32)
                mode = 'MC'
                moden = 1
                othertxndata = b''
        
        if wli not in MWL:
                raise RejectedShare('unknown-work')
        (wld, issueT) = MWL[wli]
        share[mode] = wld
        
        share['issuetime'] = issueT
        
        (workMerkleTree, workCoinbase) = wld[1:3]
        share['merkletree'] = workMerkleTree
        if 'jobid' in share:
                # Stratum: rebuild the coinbase with the miner's extranonces
                # and compute the real merkle root / header
                cbtxn = deepcopy(workMerkleTree.data[0])
                coinbase = workCoinbase + share['extranonce1'] + share['extranonce2']
                cbtxn.setCoinbase(coinbase)
                cbtxn.assemble()
                data = buildStratumData(share, workMerkleTree.withFirst(cbtxn))
                shareMerkleRoot = data[36:68]
        
        if data in DupeShareHACK:
                raise RejectedShare('duplicate')
        DupeShareHACK[data] = None
        
        blkhash = dblsha(data)
        if blkhash[28:] != b'\0\0\0\0':
                raise RejectedShare('H-not-zero')
        blkhashn = LEhash2int(blkhash)
        
        global networkTarget
        logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
        logfunc('BLKHASH: %64x' % (blkhashn,))
        logfunc(' TARGET: %64x' % (networkTarget,))
        
        # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
        txlist = workMerkleTree.data
        txlist = [deepcopy(txlist[0]),] + txlist[1:]
        cbtxn = txlist[0]
        cbtxn.setCoinbase(coinbase or workCoinbase)
        cbtxn.assemble()
        
        if blkhashn <= networkTarget:
                # This share solves a block: assemble and submit it upstream
                logfunc("Submitting upstream")
                RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree, share, wld) ) )
                if not moden:
                        payload = assembleBlock(data, txlist)
                else:
                        payload = share['data']
                        if len(othertxndata):
                                payload += share['blkdata']
                        else:
                                payload += assembleBlock(data, txlist)[80:]
                logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
                RBPs.append(payload)
                threading.Thread(target=blockSubmissionThread, args=(payload, blkhash, share)).start()
                bcnode.submitBlock(payload)
                if config.DelayLogForUpstream:
                        share['upstreamRejectReason'] = PendingUpstream
                else:
                        share['upstreamRejectReason'] = None
                        share['upstreamResult'] = True
                MM.updateBlock(blkhash)
        
        # Gotwork hack...
        if gotwork and blkhashn <= config.GotWorkTarget:
                try:
                        coinbaseMrkl = cbtxn.data
                        coinbaseMrkl += blkhash
                        steps = workMerkleTree._steps
                        coinbaseMrkl += pack('B', len(steps))
                        for step in steps:
                                coinbaseMrkl += step
                        coinbaseMrkl += b"\0\0\0\0"
                        info = {}
                        info['hash'] = b2a_hex(blkhash).decode('ascii')
                        info['header'] = b2a_hex(data).decode('ascii')
                        info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
                        thr = threading.Thread(target=submitGotwork, args=(info,))
                        thr.daemon = True
                        thr.start()
                except:
                        checkShare.logger.warning('Failed to build gotwork request')
        
        # Determine the target this share must meet: explicit, per-work
        # (appended by RegisterWork at index 6), or the default
        if 'target' in share:
                workTarget = share['target']
        elif len(wld) > 6:
                workTarget = wld[6]
        else:
                workTarget = None
        
        if workTarget is None:
                workTarget = config.ShareTarget
        if blkhashn > workTarget:
                raise RejectedShare('high-hash')
        share['target'] = workTarget
        share['_targethex'] = '%064x' % (workTarget,)
        
        shareTimestamp = unpack('<L', data[68:72])[0]
        if shareTime < issueT - 120:
                raise RejectedShare('stale-work')
        if shareTimestamp < shareTime - 300:
                raise RejectedShare('time-too-old')
        if shareTimestamp > shareTime + 7200:
                raise RejectedShare('time-too-new')
        
        if config.DynamicTargetting and username in userStatus:
                # NOTE: userStatus[username] only doesn't exist across restarts
                status = userStatus[username]
                target = status[0] or config.ShareTarget
                if target == workTarget:
                        userStatus[username][2] += 1
                else:
                        # Credit work proportional to the target actually met
                        userStatus[username][2] += float(target) / workTarget
        
        if moden:
                # Coinbase sanity checks for shares that supplied one
                cbpre = workCoinbase
                cbpreLen = len(cbpre)
                if coinbase[:cbpreLen] != cbpre:
                        raise RejectedShare('bad-cb-prefix')
                
                # Filter out known "I support" flags, to prevent exploits
                for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
                        if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
                                raise RejectedShare('bad-cb-flag')
                
                if len(coinbase) > 100:
                        raise RejectedShare('bad-cb-length')
                
                if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
                        raise RejectedShare('bad-txnmrklroot')
                
                if len(othertxndata):
                        allowed = assembleBlock(data, txlist)[80:]
                        if allowed != share['blkdata']:
                                raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
590
def logShare(share):
        """Fill in share['solution'] and hand the share to every configured
        share logger."""
        solution = share['_origdata'] if '_origdata' in share else \
                b2a_hex(swap32(share['data'])).decode('utf8')
        share['solution'] = solution
        for shareLogger in loggersShare:
                shareLogger.logShare(share)
598
def receiveShare(share):
        """Process one submitted share: validate it via checkShare, then log
        it.  RejectedShare is re-raised (after recording the reason) so the
        server can report the rejection; logging is deferred while the
        result still awaits an upstream block submission (PendingUpstream).
        """
        # TODO: username => userid
        try:
                checkShare(share)
        except RejectedShare as rej:
                share['rejectReason'] = str(rej)
                raise
        finally:
                # Idiom fix: 'is not' rather than 'not ... is'
                if share.get('upstreamRejectReason', None) is not PendingUpstream:
                        logShare(share)
609
def newBlockNotification():
        """React to an external hint that a new network block exists by
        refreshing the merkle tree."""
        logging.getLogger('newBlockNotification').info('Received new block notification')
        MM.updateMerkleTree()
        # TODO: Force RESPOND TO LONGPOLLS?
615
def newBlockNotificationSIGNAL(signum, frame):
        """Signal handler for new-block notifications.

        Runs the actual work on a fresh daemon thread, since the handler may
        be invoked while locks are held.
        """
        worker = threading.Thread(
                target=newBlockNotification,
                name='newBlockNotification via signal %s' % (signum,),
        )
        worker.daemon = True
        worker.start()
621
from signal import signal, SIGUSR1
# SIGUSR1 triggers a merkle-tree refresh.  NOTE: the later `import signal`
# rebinds the name `signal` to the module, so this registration must stay
# above that import.
signal(SIGUSR1, newBlockNotificationSIGNAL)
624
625
626 import os
627 import os.path
628 import pickle
629 import signal
630 import sys
631 from time import sleep
632 import traceback
633
# On-disk file used to persist resume state (timestamp, DupeShareHACK,
# workLog) across restarts; written by saveState, read by restoreState
SAVE_STATE_FILENAME = 'eloipool.worklog'
635
def stopServers():
        """Shut down the p2p node and both frontend servers: clear their
        keepgoing flags, wake them, busy-wait (up to 0x100 * 10ms ≈ 2.5s)
        for their loops to exit, then close any remaining descriptors.
        Idempotent: repeat calls are no-ops."""
        logger = logging.getLogger('stopServers')
        
        if hasattr(stopServers, 'already'):
                logger.debug('Already tried to stop servers before')
                return
        stopServers.already = True
        
        logger.info('Stopping servers...')
        global bcnode, server
        servers = (bcnode, server, stratumsrv)
        for s in servers:
                s.keepgoing = False
        for s in servers:
                try:
                        s.wakeup()
                except:
                        logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
        i = 0
        while True:
                # Collect the names of servers still running, for diagnostics
                sl = []
                for s in servers:
                        if s.running:
                                sl.append(s.__class__.__name__)
                if not sl:
                        break
                i += 1
                if i >= 0x100:
                        logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
                        break
                sleep(0.01)
        
        for s in servers:
                # Close any descriptors the servers still hold open
                for fd in s._fd.keys():
                        os.close(fd)
671
def stopLoggers():
        """Stop every share logger that supports being stopped."""
        for shareLogger in loggersShare:
                stop = getattr(shareLogger, 'stop', None)
                if stop is not None:
                        stop()
676
def saveState(t = None):
        """Pickle resume state (timestamp *t*, DupeShareHACK, workLog) to
        SAVE_STATE_FILENAME, retrying on failure.  After 0x10000 failed
        attempts, give up: log the error, remove any partial file, and stop.
        """
        logger = logging.getLogger('saveState')
        
        # Then, save data needed to resume work
        logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
        i = 0
        while True:
                try:
                        with open(SAVE_STATE_FILENAME, 'wb') as f:
                                pickle.dump(t, f)
                                pickle.dump(DupeShareHACK, f)
                                pickle.dump(workLog, f)
                        break
                except Exception:
                        i += 1
                        if i >= 0x10000:
                                logger.error('Failed to save work\n' + traceback.format_exc())
                                try:
                                        os.unlink(SAVE_STATE_FILENAME)
                                except Exception:
                                        logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
                                # Bugfix: without this break, a persistent
                                # failure would retry (and log) forever
                                break
698
def exit():
        """Shut everything down cleanly, persist state, and terminate.

        (Intentionally shadows the builtin: this is the pool's shutdown
        entry point, invoked e.g. from interactive mode.)
        """
        stopTime = time()
        stopServers()
        stopLoggers()
        saveState(stopTime)
        logging.getLogger('exit').info('Goodbye...')
        os.kill(os.getpid(), signal.SIGTERM)
        sys.exit(0)
707
def restart():
        """Shut everything down cleanly, persist state, and re-exec this
        process in place; on exec failure, log and return (the process keeps
        running in its stopped state)."""
        t = time()
        stopServers()
        stopLoggers()
        saveState(t)
        logging.getLogger('restart').info('Restarting...')
        try:
                os.execv(sys.argv[0], sys.argv)
        except Exception:
                # Narrowed from a bare except; the failure is still logged
                logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
718
def restoreState():
        """Restore saved state from SAVE_STATE_FILENAME, if present.

        DupeShareHACK is always restored; workLog only when the save is less
        than 120 seconds old.  Understands the legacy 2012-02-02/03 formats
        as well as the current one.  Any failure is logged and restore is
        abandoned.
        """
        if not os.path.exists(SAVE_STATE_FILENAME):
                return
        
        global workLog, DupeShareHACK
        
        logger = logging.getLogger('restoreState')
        s = os.stat(SAVE_STATE_FILENAME)
        logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
        try:
                with open(SAVE_STATE_FILENAME, 'rb') as f:
                        t = pickle.load(f)
                        if type(t) == tuple:
                                if len(t) > 2:
                                        # Future formats, not supported here
                                        # NOTE(review): t[3] would IndexError on a
                                        # 3-tuple; looks like it should be t[2] —
                                        # confirm before future formats exist
                                        ver = t[3]
                                        # TODO
                                
                                # Old format, from 2012-02-02 to 2012-02-03
                                workLog = t[0]
                                DupeShareHACK = t[1]
                                t = None
                        else:
                                if isinstance(t, dict):
                                        # Old format, from 2012-02-03 to 2012-02-03
                                        DupeShareHACK = t
                                        t = None
                                else:
                                        # Current format, from 2012-02-03 onward
                                        DupeShareHACK = pickle.load(f)
                                
                                # Bugfix: t is None for the old dict format;
                                # the unguarded `t + 120` raised TypeError and
                                # aborted the entire restore
                                if t is not None and t + 120 >= time():
                                        workLog = pickle.load(f)
                                else:
                                        logger.debug('Skipping restore of expired workLog')
        except Exception:
                logger.error('Failed to restore state\n' + traceback.format_exc())
                return
        logger.info('State restored successfully')
        if t:
                logger.info('Total downtime: %g seconds' % (time() - t,))
760
761
762 from jsonrpcserver import JSONRPCListener, JSONRPCServer
763 import interactivemode
764 from networkserver import NetworkListener
765 import threading
766 import sharelogging
767 from stratumserver import StratumServer
768 import imp
769
if __name__ == "__main__":
	# --- Share logging: normalize legacy config forms, then instantiate loggers ---
	if not hasattr(config, 'ShareLogging'):
		config.ShareLogging = ()
	if hasattr(config, 'DbOptions'):
		# Legacy DbOptions variable: translate into an equivalent 'sql' entry
		# FIX: Logger.warn is a deprecated alias; use warning()
		logging.getLogger('backwardCompatibility').warning('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
		config.ShareLogging = list(config.ShareLogging)
		config.ShareLogging.append( {
			'type': 'sql',
			'engine': 'postgres',
			'dbopts': config.DbOptions,
			'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
		} )
	for i in config.ShareLogging:
		if not hasattr(i, 'keys'):
			# Legacy (name, parameters) tuple form; rewrite into the dict form
			name, parameters = i
			logging.getLogger('backwardCompatibility').warning('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
			if name == 'postgres':
				name = 'sql'
				i = {
					'engine': 'postgres',
					'dbopts': parameters,
				}
			elif name == 'logfile':
				i = {}
				i['thropts'] = parameters
				if 'filename' in parameters:
					# 'filename' is a direct constructor argument, not a thread option
					i['filename'] = parameters['filename']
					i['thropts'] = dict(i['thropts'])
					del i['thropts']['filename']
			else:
				i = parameters
			i['type'] = name
		
		name = i['type']
		parameters = i
		try:
			# Import sharelogging.<name> and instantiate its same-named class
			# NOTE(review): the 'imp' module is deprecated in favour of importlib
			fp, pathname, description = imp.find_module(name, sharelogging.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			loggersShare.append(lo)
		except Exception:
			# BUGFIX: was a bare "except:"; keep KeyboardInterrupt/SystemExit fatal
			logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())

	# --- Bitcoin p2p listeners and optional upstream node link ---
	LSbc = []
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))
	
	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
	
	# JSON-RPC method modules register themselves on import
	import jsonrpc_getblocktemplate
	import jsonrpc_getwork
	import jsonrpc_setworkaux
	
	# --- JSON-RPC (getwork/GBT) server setup ---
	server = JSONRPCServer()
	server.tls = threading.local()
	server.tls.wantClear = False
	if hasattr(config, 'JSONRPCAddress'):
		# Legacy single-address variable: prepend it to the addresses list
		logging.getLogger('backwardCompatibility').warning('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
	LS = []
	for a in config.JSONRPCAddresses:
		LS.append(JSONRPCListener(server, a))
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	server.aux = MM.CoinbaseAux
	server.getBlockHeader = getBlockHeader
	server.getBlockTemplate = getBlockTemplate
	server.receiveShare = receiveShare
	server.RaiseRedFlags = RaiseRedFlags
	server.ShareTarget = config.ShareTarget
	
	if hasattr(config, 'TrustedForwarders'):
		server.TrustedForwarders = config.TrustedForwarders
	server.ServerName = config.ServerName
	
	# --- Stratum server setup ---
	stratumsrv = StratumServer()
	stratumsrv.getStratumJob = getStratumJob
	stratumsrv.getExistingStratumJob = getExistingStratumJob
	stratumsrv.receiveShare = receiveShare
	stratumsrv.getTarget = getTarget
	stratumsrv.defaultTarget = config.ShareTarget
	stratumsrv.IsJobValid = IsJobValid
	if not hasattr(config, 'StratumAddresses'):
		config.StratumAddresses = ()
	for a in config.StratumAddresses:
		NetworkListener(stratumsrv, a)
	
	MM.start()
	
	restoreState()
	
	# Background threads are daemonized so exit()/restart() are not blocked on them
	prune_thr = threading.Thread(target=WorkLogPruner, args=(workLog,))
	prune_thr.daemon = True
	prune_thr.start()
	
	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()
	
	stratum_thr = threading.Thread(target=stratumsrv.serve_forever)
	stratum_thr.daemon = True
	stratum_thr.start()
	
	# The JSON-RPC server runs on the main thread
	server.serve_forever()