UniqueSessionIdManager: Support for delaying releases of session ids, and picking...
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import config
19
# Fill in defaults for optional config settings.
if not hasattr(config, 'ServerName'):
	config.ServerName = 'Unnamed Eloipool'

if not hasattr(config, 'ShareTarget'):
	# Default (easiest allowed) share target, i.e. the pool's minimum difficulty.
	config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
25
26
27 import logging
28 import logging.handlers
29
# --- Logging setup ---
rootlogger = logging.getLogger(None)
logformat = getattr(config, 'LogFormat', '%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s')
logformatter = logging.Formatter(logformat)
if len(rootlogger.handlers) == 0:
	# Nothing configured the root logger yet: default to DEBUG on stderr...
	logging.basicConfig(
		format=logformat,
		level=logging.DEBUG,
	)
	# ...but quiet the chattiest subsystems down to INFO.
	for infoOnly in (
		'checkShare',
		'getTarget',
		'JSONRPCHandler',
		'JSONRPCServer',
		'merkleMaker',
		'StratumServer',
		'Waker for JSONRPCServer',
		'Waker for StratumServer',
		'WorkLogPruner'
	):
		logging.getLogger(infoOnly).setLevel(logging.INFO)
if getattr(config, 'LogToSysLog', False):
	sysloghandler = logging.handlers.SysLogHandler(address = '/dev/log')
	rootlogger.addHandler(sysloghandler)
if hasattr(config, 'LogFile'):
	# LogFile may be a plain filename, or a dict of TimedRotatingFileHandler kwargs.
	if isinstance(config.LogFile, str):
		filehandler = logging.FileHandler(config.LogFile)
	else:
		filehandler = logging.handlers.TimedRotatingFileHandler(**config.LogFile)
	filehandler.setFormatter(logformatter)
	rootlogger.addHandler(filehandler)
60
def RaiseRedFlags(reason):
	"""Log *reason* at CRITICAL on the 'redflag' logger and return it unchanged.

	Returning the argument lets callers log and reuse the value in one step.
	"""
	redflag = logging.getLogger('redflag')
	redflag.critical(reason)
	return reason
64
65
from bitcoin.node import BitcoinLink, BitcoinNode
# P2P node used to announce solved blocks directly on the Bitcoin network.
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'
69
import jsonrpc

# Tag the outgoing JSON-RPC user agent; older jsonrpc libraries may lack
# the authproxy submodule, in which case this is skipped silently.
try:
	import jsonrpc.authproxy
	jsonrpc.authproxy.USER_AGENT = 'Eloipool/0.1'
except:
	pass
77
78
79 from bitcoin.script import BitcoinScript
80 from bitcoin.txn import Txn
81 from base58 import b58decode
82 from struct import pack
83 import subprocess
84 from time import time
85
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
	"""Build the coinbase transaction paying out *coinbaseValue*.

	If config.CoinbaserCmd is set (and useCoinbaser is true), run it as a
	shell command with '%d' replaced by the coinbase value; its stdout is
	expected to be a count line followed by (amount, address) line pairs,
	each becoming an extra output.  Whatever the coinbaser does not claim —
	or everything, if it fails or over-commits — goes to config.TrackerAddr.
	"""
	txn = Txn.new()
	
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		try:
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			nout = int(p.stdout.readline())
			for i in range(nout):
				amount = int(p.stdout.readline())
				addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
				pkScript = BitcoinScript.toAddress(addr)
				txn.addOutput(amount, pkScript)
				coinbased += amount
		except Exception:
			# FIX: was a bare "except:", which also swallowed
			# KeyboardInterrupt/SystemExit.  Any coinbaser failure trips the
			# over-commit sentinel below so all outputs are discarded.
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			# Coinbaser claimed more than the block pays out; ignore it entirely.
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased
	
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)
	
	# TODO
	# TODO: red flag on dupe coinbase
	return txn
116
117
import jsonrpc_getwork
from util import Bits2Target

# Module-wide mutable state:
workLog = {}        # username -> { work-id -> (work-data, issue-time) }; stratum uses username None
userStatus = {}     # username -> [target-or-None, last-update-time, work-counter]
networkTarget = None  # current network target as an int, derived from block bits
DupeShareHACK = {}  # set-like dict of seen share headers; cleared on block change

server = None       # getwork/GBT JSON-RPC server, assigned at startup
stratumsrv = None   # stratum server, assigned at startup
def updateBlocks():
	# Push updated work to all miners: wake longpollers and send a new stratum job.
	server.wakeLongpoll()
	stratumsrv.updateJob()
131
def blockChanged():
	# Callback from merkleMaker when the current best block changes:
	# recompute the network target and invalidate all outstanding work.
	global MM, networkTarget, server
	bits = MM.currentBlock[2]
	if bits is None:
		networkTarget = None
	else:
		networkTarget = Bits2Target(bits)
	if MM.lastBlock != (None, None, None):
		# A real previous block existed, so old work/dupe records are now
		# meaningless — drop them wholesale.
		global DupeShareHACK
		DupeShareHACK = {}
		jsonrpc_getwork._CheckForDupesHACK = {}
		workLog.clear()
	server.wakeLongpoll(wantClear=True)
	stratumsrv.updateJob(wantClear=True)
146
147
148 from time import sleep, time
149 import traceback
150
def _WorkLogPruner_I(wl):
	"""Drop work entries issued more than two minutes ago from every user's log."""
	now = time()
	pruned = 0
	for userwork in wl.values():
		# Collect expired ids first so we don't mutate while iterating.
		expired = [wli for wli in userwork if now > userwork[wli][1] + 120]
		for wli in expired:
			del userwork[wli]
		pruned += len(expired)
	WorkLogPruner.logger.debug('Pruned %d jobs' % (pruned,))
161
def WorkLogPruner(wl):
	# Daemon loop: prune expired entries from the work log once a minute.
	while True:
		try:
			sleep(60)
			_WorkLogPruner_I(wl)
		except:
			# Keep the pruner alive no matter what; just log the failure.
			WorkLogPruner.logger.error(traceback.format_exc())
WorkLogPruner.logger = logging.getLogger('WorkLogPruner')
170
171
from merklemaker import merkleMaker
MM = merkleMaker()
# NOTE: copies *every* config attribute onto the merkleMaker instance.
MM.__dict__.update(config.__dict__)
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
178
179
180 from binascii import b2a_hex
181 from copy import deepcopy
182 from math import ceil, log
183 from merklemaker import MakeBlockHeader
184 from struct import pack, unpack
185 import threading
186 from time import time
187 from util import PendingUpstream, RejectedShare, bdiff1target, dblsha, LEhash2int, swap32, target2bdiff, target2pdiff
188 import jsonrpc
189 import traceback
190
gotwork = None
if hasattr(config, 'GotWorkURI'):
	# Optional JSON-RPC service notified of near-block shares (see submitGotwork).
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

if not hasattr(config, 'DelayLogForUpstream'):
	config.DelayLogForUpstream = False

if not hasattr(config, 'DynamicTargetting'):
	config.DynamicTargetting = 0
else:
	if not hasattr(config, 'DynamicTargetWindow'):
		config.DynamicTargetWindow = 120
	# Scale the goal from shares-per-minute to shares-per-window.
	config.DynamicTargetGoal *= config.DynamicTargetWindow / 60
204
def submitGotwork(info):
	# Best-effort notification to the configured gotwork service; failures
	# are only logged, never propagated.
	try:
		gotwork.gotwork(info)
	except:
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
210
def clampTarget(target, DTMode):
	"""Clamp a dynamic share target into [networkTarget, config.ShareTarget]
	and quantize it according to DTMode (2 = power of two, 3 = bdiff multiple).

	Returns None when the result equals config.ShareTarget (saves memory).
	"""
	# ShareTarget is the minimum
	if target is None or target > config.ShareTarget:
		target = config.ShareTarget
	
	# Never target above the network, as we'd lose blocks
	if target < networkTarget:
		target = networkTarget
	
	if DTMode == 2:
		# Ceil target to a power of two :)
		truebits = log(target, 2)
		target = 2**ceil(truebits) - 1
	elif DTMode == 3:
		# Round target to multiple of bdiff 1.
		# FIX: use floor division — true division ('/') produced a float,
		# losing precision on 256-bit targets and breaking the later
		# '%064x' formatting of the share target.
		target = bdiff1target // int(round(target2bdiff(target)))
	
	# Return None for ShareTarget to save memory
	if target == config.ShareTarget:
		return None
	return target
232
def getTarget(username, now, DTMode = None):
	# Compute the per-user share target under dynamic targetting.
	# Returns None when the user should use the default config.ShareTarget.
	if DTMode is None:
		DTMode = config.DynamicTargetting
	if not DTMode:
		return None
	if username in userStatus:
		status = userStatus[username]
	else:
		# No record, use default target
		userStatus[username] = [None, now, 0]
		return clampTarget(None, DTMode)
	# status is [current-target-or-None, last-update-time, work-counter]
	(targetIn, lastUpdate, work) = status
	if work <= config.DynamicTargetGoal:
		if now < lastUpdate + config.DynamicTargetWindow and (targetIn is None or targetIn >= networkTarget):
			# No reason to change it just yet
			return clampTarget(targetIn, DTMode)
		if not work:
			# No shares received, reset to minimum
			if targetIn:
				getTarget.logger.debug("No shares from %s, resetting to minimum target" % (repr(username),))
				userStatus[username] = [None, now, 0]
			return clampTarget(None, DTMode)
	
	# Rescale the target so the observed share rate approaches the goal
	# (higher work than the goal => lower/harder target, and vice versa).
	deltaSec = now - lastUpdate
	target = targetIn or config.ShareTarget
	target = int(target * config.DynamicTargetGoal * deltaSec / config.DynamicTargetWindow / work)
	target = clampTarget(target, DTMode)
	if target != targetIn:
		pfx = 'Retargetting %s' % (repr(username),)
		tin = targetIn or config.ShareTarget
		getTarget.logger.debug("%s from: %064x (pdiff %s)" % (pfx, tin, target2pdiff(tin)))
		tgt = target or config.ShareTarget
		getTarget.logger.debug("%s   to: %064x (pdiff %s)" % (pfx, tgt, target2pdiff(tgt)))
	userStatus[username] = [target, now, 0]
	return target
getTarget.logger = logging.getLogger('getTarget')
269
def TopTargets(n = 0x10):
	"""Print the n users with the lowest explicit targets (highest difficulty)."""
	withTarget = sorted(
		(k for k, v in userStatus.items() if v[0] is not None),
		key = lambda k: -userStatus[k][0],
	)
	pdiffCache = {}
	def pdiffOf(t):
		# Cache target->pdiff conversions; users often share a target.
		if t not in pdiffCache:
			pdiffCache[t] = target2pdiff(t)
		return pdiffCache[t]
	for username in withTarget[-n:]:
		tgt = userStatus[username][0]
		print('%34s %064x %3d' % (username, tgt, pdiffOf(tgt)))
281
def RegisterWork(username, wli, wld):
	"""Record issued work under (username, wli) and return its share target."""
	issued = time()
	tgt = getTarget(username, issued)
	entry = tuple(wld) + (tgt,)
	workLog.setdefault(username, {})[wli] = (entry, issued)
	# tgt is None when the default target applies.
	return tgt or config.ShareTarget
288
def getBlockHeader(username):
	"""Issue getwork-style work: returns (header, worklog entry, share target).

	The work is keyed by its merkle root in the work log.
	"""
	MRD = MM.getMRD()
	merkleRoot = MRD[0]
	hdr = MakeBlockHeader(MRD)
	# FIX: removed a direct workLog[...] assignment here — RegisterWork
	# immediately overwrote it with the same data plus the target, so the
	# extra write (and its time() call) was dead code.
	target = RegisterWork(username, merkleRoot, MRD)
	return (hdr, workLog[username][merkleRoot], target)
296
def getBlockTemplate(username, p_magic = None):
	# Issue GBT work.  p_magic, when given, is an out-parameter list:
	# p_magic[0] is set True if a fresh template was forced for a new user.
	if server.tls.wantClear:
		wantClear = True
	elif p_magic and username not in workLog:
		wantClear = True
		p_magic[0] = True
	else:
		wantClear = False
	MC = MM.getMC(wantClear)
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
	# The work identifier is a length-prefixed blob embedded in the coinbase
	# scriptsig, located just past the first data push (length coinbase[0]).
	wliPos = coinbase[0] + 2
	wliLen = coinbase[wliPos - 1]
	wli = coinbase[wliPos:wliPos+wliLen]
	target = RegisterWork(username, wli, MC)
	return (MC, workLog[username][wli], target)
312
def getStratumJob(jobid, wantClear = False):
	"""Fetch merkle contents for a stratum job and log it under the shared
	(None) user, keyed by jobid.  Returns (MC, worklog entry)."""
	MC = MM.getMC(wantClear)
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
	issued = time()
	joblog = workLog.setdefault(None, {})
	joblog[jobid] = (MC, issued)
	return (MC, joblog[jobid])
319
def getExistingStratumJob(jobid):
	"""Look up previously-issued stratum work by job id (KeyError if unknown)."""
	entry = workLog[None][jobid]
	return (entry[0], entry)
323
loggersShare = []   # share-logger objects; each receives .logShare(share) calls

RBDs = []  # deepcopied records of every solved block (data, txns, share, ...)
RBPs = []  # serialized payloads of every solved block

from bitcoin.varlen import varlenEncode, varlenDecode
import bitcoin.txn
from merklemaker import assembleBlock

if not hasattr(config, 'BlockSubmissions'):
	config.BlockSubmissions = None

RBFs = []  # records of failed/gave-up block submissions
def blockSubmissionThread(payload, blkhash, share):
	"""Submit a solved block to each upstream bitcoind until one accepts.

	Tries BIP 22 submitblock first, then falls back to the two historical
	getmemorypool forms.  Servers that error out are requeued; gives up only
	when the network has moved past this block.  Updates the share's
	upstreamRejectReason/upstreamResult (and logs it) when logging was
	deferred pending the upstream verdict.
	"""
	if config.BlockSubmissions is None:
		servers = list(a for b in MM.TemplateSources for a in b)
	else:
		servers = list(config.BlockSubmissions)
	
	# Prefer submitting back to the server the template came from.
	if hasattr(share['merkletree'], 'source_uri'):
		servers.insert(0, {
			'access': jsonrpc.ServiceProxy(share['merkletree'].source_uri),
			'name': share['merkletree'].source,
		})
	elif not servers:
		servers = list(a for b in MM.TemplateSources for a in b)
	
	# myblock lets us detect whether the current chain tip is this block
	# (or still the one we built on) below.
	myblock = (blkhash, payload[4:36])
	payload = b2a_hex(payload).decode('ascii')
	nexterr = 0
	tries = 0
	success = False
	while len(servers):
		tries += 1
		TS = servers.pop(0)
		UpstreamBitcoindJSONRPC = TS['access']
		try:
			# BIP 22 standard submitblock
			reason = UpstreamBitcoindJSONRPC.submitblock(payload)
		except BaseException as gbterr:
			gbterr_fmt = traceback.format_exc()
			try:
				try:
					# bitcoind 0.5/0.6 getmemorypool
					reason = UpstreamBitcoindJSONRPC.getmemorypool(payload)
				except:
					# Old BIP 22 draft getmemorypool
					reason = UpstreamBitcoindJSONRPC.getmemorypool(payload, {})
				if reason is True:
					reason = None
				elif reason is False:
					reason = 'rejected'
			except BaseException as gmperr:
				now = time()
				if now > nexterr:
					# FIXME: This will show "Method not found" on pre-BIP22 servers
					RaiseRedFlags(gbterr_fmt)
					nexterr = now + 5
				if MM.currentBlock[0] not in myblock and tries > len(servers):
					# The network has moved on and every server has failed at
					# least once: give up on this block entirely.
					RBFs.append( (('next block', MM.currentBlock, now, (gbterr, gmperr)), payload, blkhash, share) )
					RaiseRedFlags('Giving up on submitting block to upstream \'%s\'' % (TS['name'],))
					if share['upstreamRejectReason'] is PendingUpstream:
						share['upstreamRejectReason'] = 'GAVE UP'
						share['upstreamResult'] = False
						logShare(share)
					return
				
				# FIX: requeue the server *descriptor* (dict with 'access' /
				# 'name'), not the bare JSON-RPC proxy — appending the proxy
				# made the next iteration crash on TS['access'].
				servers.append(TS)
				continue
		
		# At this point, we have a reason back
		if reason:
			# FIXME: The returned value could be a list of multiple responses
			msg = 'Upstream \'%s\' block submission failed: %s' % (TS['name'], reason,)
			if success and reason in ('stale-prevblk', 'bad-prevblk', 'orphan', 'duplicate'):
				# no big deal
				blockSubmissionThread.logger.debug(msg)
			else:
				RBFs.append( (('upstream reject', reason, time()), payload, blkhash, share) )
				RaiseRedFlags(msg)
		else:
			blockSubmissionThread.logger.debug('Upstream \'%s\' accepted block' % (TS['name'],))
			success = True
		if share['upstreamRejectReason'] is PendingUpstream:
			share['upstreamRejectReason'] = reason
			share['upstreamResult'] = not reason
			logShare(share)
blockSubmissionThread.logger = logging.getLogger('blockSubmission')
412
def checkData(share):
	"""Validate the fixed header fields of a getwork/GBT share; raises RejectedShare."""
	hdr = share['data'][:80]
	(prevBlock, height, bits) = MM.currentBlock
	
	hdrPrev = hdr[4:36]
	if hdrPrev != prevBlock:
		# Work built on the block before last is merely stale; anything else is bogus.
		if hdrPrev == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')
	
	if hdr[72:76] != bits:
		raise RejectedShare('bad-diffbits')
	
	# Note that we should accept miners reducing version to 1 if they don't understand 2 yet
	# FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
	if hdr[1:4] != b'\0\0\0' or hdr[0] > 2:
		raise RejectedShare('bad-version')
430
def buildStratumData(share, merkleroot):
	"""Assemble the 80-byte block header for a stratum share, store it in
	share['data'], and return it."""
	(prevBlock, height, bits) = MM.currentBlock
	
	hdr = b''.join((
		b'\x02\0\0\0',          # block version 2
		prevBlock,
		merkleroot,
		share['ntime'][::-1],   # byte-reversed relative to the submitted value
		bits,
		share['nonce'][::-1],   # likewise byte-reversed
	))
	share['data'] = hdr
	return hdr
443
def IsJobValid(wli, wluser = None):
	"""Return True when (wluser, wli) identifies known, acceptable work."""
	userwork = workLog.get(wluser)
	if userwork is None:
		return False
	entry = userwork.get(wli)
	if entry is None:
		return False
	(wld, issueT) = entry
	# NOTE(review): this only rejects work issued more than 120s in the
	# *future*; checkShare applies the same comparison — confirm intent.
	if time() < issueT - 120:
		return False
	return True
453
def checkShare(share):
	"""Validate a submitted share (getwork, GBT, or stratum) and process it.

	Raises RejectedShare on any validation failure.  Side effects: records
	the header in DupeShareHACK, may submit a solved block upstream, may
	notify the gotwork service, and updates dynamic-target statistics.
	"""
	shareTime = share['time'] = time()
	
	username = share['username']
	if 'data' in share:
		# getwork/GBT
		checkData(share)
		data = share['data']
		
		if username not in workLog:
			raise RejectedShare('unknown-user')
		MWL = workLog[username]
		
		shareMerkleRoot = data[36:68]
		if 'blkdata' in share:
			# GBT submission: transaction data follows the header; pull the
			# work identifier back out of the coinbase scriptsig.
			pl = share['blkdata']
			(txncount, pl) = varlenDecode(pl)
			cbtxn = bitcoin.txn.Txn(pl)
			othertxndata = cbtxn.disassemble(retExtra=True)
			coinbase = cbtxn.getCoinbase()
			wliPos = coinbase[0] + 2
			wliLen = coinbase[wliPos - 1]
			wli = coinbase[wliPos:wliPos+wliLen]
			mode = 'MC'
			moden = 1
		else:
			# Plain getwork: the merkle root itself identifies the work.
			wli = shareMerkleRoot
			mode = 'MRD'
			moden = 0
			coinbase = None
	else:
		# Stratum: work is tracked under the shared None user, by job id.
		MWL = workLog[None]
		wli = share['jobid']
		buildStratumData(share, b'\0' * 32)
		mode = 'MC'
		moden = 1
		othertxndata = b''
	
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld
	
	share['issuetime'] = issueT
	
	(workMerkleTree, workCoinbase) = wld[1:3]
	share['merkletree'] = workMerkleTree
	if 'jobid' in share:
		# Stratum: rebuild the real coinbase (extranonces appended) and
		# recompute the header with the true merkle root.
		cbtxn = deepcopy(workMerkleTree.data[0])
		coinbase = workCoinbase + share['extranonce1'] + share['extranonce2']
		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		data = buildStratumData(share, workMerkleTree.withFirst(cbtxn))
		shareMerkleRoot = data[36:68]
	
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
	blkhash = dblsha(data)
	if blkhash[28:] != b'\0\0\0\0':
		# The final 4 bytes of the double-SHA must be zero (trivial minimum
		# difficulty; these are the most-significant bits once decoded LE).
		raise RejectedShare('H-not-zero')
	blkhashn = LEhash2int(blkhash)
	
	global networkTarget
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
	
	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	txlist = workMerkleTree.data
	txlist = [deepcopy(txlist[0]),] + txlist[1:]
	cbtxn = txlist[0]
	cbtxn.setCoinbase(coinbase or workCoinbase)
	cbtxn.assemble()
	
	if blkhashn <= networkTarget:
		# Block solution!  Assemble the full block and submit it upstream.
		logfunc("Submitting upstream")
		RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree, share, wld) ) )
		if not moden:
			payload = assembleBlock(data, txlist)
		else:
			payload = share['data']
			if len(othertxndata):
				payload += share['blkdata']
			else:
				payload += assembleBlock(data, txlist)[80:]
		logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
		RBPs.append(payload)
		threading.Thread(target=blockSubmissionThread, args=(payload, blkhash, share)).start()
		bcnode.submitBlock(payload)
		if config.DelayLogForUpstream:
			share['upstreamRejectReason'] = PendingUpstream
		else:
			share['upstreamRejectReason'] = None
			share['upstreamResult'] = True
		MM.updateBlock(blkhash)
	
	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')
	
	# Determine the target this share must meet: per-share, per-work, or default.
	if 'target' in share:
		workTarget = share['target']
	elif len(wld) > 6:
		workTarget = wld[6]
	else:
		workTarget = None
	
	if workTarget is None:
		workTarget = config.ShareTarget
	if blkhashn > workTarget:
		raise RejectedShare('high-hash')
	share['target'] = workTarget
	share['_targethex'] = '%064x' % (workTarget,)
	
	shareTimestamp = unpack('<L', data[68:72])[0]
	# NOTE(review): this only rejects shares received >120s *before* the
	# work's issue time; expiry of old work is handled by WorkLogPruner.
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
	# Credit the share toward the user's dynamic-target work counter, scaled
	# when it was found at a different target than the one currently set.
	if config.DynamicTargetting and username in userStatus:
		# NOTE: userStatus[username] only doesn't exist across restarts
		status = userStatus[username]
		target = status[0] or config.ShareTarget
		if target == workTarget:
			userStatus[username][2] += 1
		else:
			userStatus[username][2] += float(target) / workTarget
	
	if moden:
		# Full-block modes: verify the coinbase and transaction data too.
		cbpre = workCoinbase
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')
		
		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
				raise RejectedShare('bad-cb-flag')
		
		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')
		
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')
		
		if len(othertxndata):
			allowed = assembleBlock(data, txlist)[80:]
			if allowed != share['blkdata']:
				raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
626
def logShare(share):
	"""Attach a hex 'solution' field and fan the share out to every share logger."""
	share['solution'] = share['_origdata'] if '_origdata' in share \
		else b2a_hex(swap32(share['data'])).decode('utf8')
	for sharelogger in loggersShare:
		sharelogger.logShare(share)
634
def receiveShare(share):
	# TODO: username => userid
	# Entry point for a submitted share: validate it, record any reject
	# reason, and log it — unless logging is deferred until the upstream
	# block-submission result is known (PendingUpstream).
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	finally:
		if not share.get('upstreamRejectReason', None) is PendingUpstream:
			logShare(share)
645
def newBlockNotification():
	"""Refresh the merkle tree in response to an external new-block notification."""
	logging.getLogger('newBlockNotification').info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
651
def newBlockNotificationSIGNAL(signum, frame):
	# Use a new thread, in case the signal handler is called with locks held
	worker = threading.Thread(
		target = newBlockNotification,
		name = 'newBlockNotification via signal %s' % (signum,),
	)
	worker.daemon = True
	worker.start()
657
from signal import signal, SIGUSR1
# SIGUSR1 triggers an immediate merkle-tree refresh (new-block notification).
signal(SIGUSR1, newBlockNotificationSIGNAL)


import os
import os.path
import pickle
import signal  # NOTE: rebinds the name 'signal' (the function above) to the module
import sys
from time import sleep
import traceback

SAVE_STATE_FILENAME = 'eloipool.worklog'
671
def stopServers():
	# Shut down the P2P node and both work servers, then close their fds.
	# Idempotent: a second call is a no-op.
	logger = logging.getLogger('stopServers')
	
	if hasattr(stopServers, 'already'):
		logger.debug('Already tried to stop servers before')
		return
	stopServers.already = True
	
	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server, stratumsrv)
	for s in servers:
		s.keepgoing = False
	for s in servers:
		try:
			s.wakeup()
		except:
			logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
	i = 0
	# Wait (up to 0x100 * 10ms ≈ 2.56s) for all server loops to wind down.
	while True:
		sl = []
		for s in servers:
			if s.running:
				sl.append(s.__class__.__name__)
		if not sl:
			break
		i += 1
		if i >= 0x100:
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
			break
		sleep(0.01)
	
	for s in servers:
		for fd in s._fd.keys():
			os.close(fd)
707
def stopLoggers():
	"""Give every share logger that provides a stop() hook a chance to shut down."""
	for sharelogger in loggersShare:
		stopfunc = getattr(sharelogger, 'stop', None)
		if stopfunc is not None:
			stopfunc()
712
def saveState(t = None):
	"""Persist resume state (timestamp, DupeShareHACK, workLog) to disk.

	Retries on failure; after 0x10000 consecutive failures it gives up and
	removes any partial state file so a later restore doesn't load garbage.
	"""
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
				# FIX: without this break, a persistent write failure spun
				# forever, re-logging the same error on every iteration.
				break
734
def exit():
	# Graceful shutdown: stop servers and loggers, save state, then die.
	# (NOTE: shadows the builtin exit() at module level.)
	t = time()
	stopServers()
	stopLoggers()
	saveState(t)
	logging.getLogger('exit').info('Goodbye...')
	# Send ourselves SIGTERM, then terminate this thread.
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)
743
def restart():
	# Save state and re-exec this process in place with the same argv.
	t = time()
	stopServers()
	stopLoggers()
	saveState(t)
	logging.getLogger('restart').info('Restarting...')
	try:
		os.execv(sys.argv[0], sys.argv)
	except:
		logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
754
def restoreState():
	# Restore workLog and DupeShareHACK saved by saveState, if a state file
	# exists.  Understands three historical on-disk layouts (tuple, bare
	# dict, and the current sequential-pickle format); logs and returns on
	# any load error.
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if type(t) == tuple:
				if len(t) > 2:
					# Future formats, not supported here
					# BUGFIX: the version tag of a >2-element tuple is its
					# third element; t[3] raised IndexError on a 3-tuple.
					ver = t[2]
					# TODO
				
				# Old format, from 2012-02-02 to 2012-02-03
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					# Old format, from 2012-02-03 to 2012-02-03
					DupeShareHACK = t
					t = None
				else:
					# Current format, from 2012-02-03 onward
					DupeShareHACK = pickle.load(f)
				
				# BUGFIX: t may be None here (the dict-format branch above,
				# or a state saved with saveState's default); the former
				# unconditional `t + 120` raised TypeError and aborted the
				# whole restore.  Only a workLog younger than 120s is worth
				# resuming.
				if t is not None and t + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))
796
797
798 from jsonrpcserver import JSONRPCListener, JSONRPCServer
799 import interactivemode
800 from networkserver import NetworkListener
801 import threading
802 import sharelogging
803 from stratumserver import StratumServer
804 import imp
805
if __name__ == "__main__":
	# --- Share logging setup ---
	# Normalise config.ShareLogging into a list of parameter dicts,
	# honouring several deprecated configuration shapes, then instantiate
	# one logger module per entry.
	if not hasattr(config, 'ShareLogging'):
		config.ShareLogging = ()
	if hasattr(config, 'DbOptions'):
		# Deprecated DbOptions variable: translate it into an equivalent
		# 'sql'/postgres ShareLogging entry appended to the list.
		logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
		config.ShareLogging = list(config.ShareLogging)
		config.ShareLogging.append( {
			'type': 'sql',
			'engine': 'postgres',
			'dbopts': config.DbOptions,
			'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
		} )
	for i in config.ShareLogging:
		if not hasattr(i, 'keys'):
			# Deprecated (name, parameters) tuple form; convert to a dict.
			name, parameters = i
			logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
			if name == 'postgres':
				# 'postgres' was renamed to the generic 'sql' engine.
				name = 'sql'
				i = {
					'engine': 'postgres',
					'dbopts': parameters,
				}
			elif name == 'logfile':
				# 'filename' is promoted to a top-level key; the remaining
				# parameters are passed through as thread options.
				i = {}
				i['thropts'] = parameters
				if 'filename' in parameters:
					i['filename'] = parameters['filename']
					i['thropts'] = dict(i['thropts'])
					del i['thropts']['filename']
			else:
				i = parameters
			i['type'] = name
		
		name = i['type']
		parameters = i
		try:
			# Dynamically load sharelogging/<name>.py and instantiate the
			# class of the same name, with the config dict as kwargs.
			fp, pathname, description = imp.find_module(name, sharelogging.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			loggersShare.append(lo)
		except:
			# Best-effort: a misconfigured share logger must not stop the pool.
			logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())

	# --- Bitcoin node connectivity ---
	# Listen for incoming bitcoin P2P connections on each configured address.
	LSbc = []
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))
	
	# Optional outgoing link to an upstream bitcoind node.
	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
	
	# Importing these modules registers their JSON-RPC methods (side effects).
	import jsonrpc_getblocktemplate
	import jsonrpc_getwork
	import jsonrpc_setworkaux
	
	# --- JSON-RPC (getwork/GBT) server setup ---
	server = JSONRPCServer()
	server.tls = threading.local()
	server.tls.wantClear = False
	if hasattr(config, 'JSONRPCAddress'):
		# Deprecated single-address variable; prepend it to the list form.
		logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
	LS = []
	for a in config.JSONRPCAddresses:
		LS.append(JSONRPCListener(server, a))
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	# Wire the pool's work/share callbacks into the JSON-RPC server.
	server.aux = MM.CoinbaseAux
	server.getBlockHeader = getBlockHeader
	server.getBlockTemplate = getBlockTemplate
	server.receiveShare = receiveShare
	server.RaiseRedFlags = RaiseRedFlags
	server.ShareTarget = config.ShareTarget
	
	if hasattr(config, 'TrustedForwarders'):
		server.TrustedForwarders = config.TrustedForwarders
	server.ServerName = config.ServerName
	
	# --- Stratum server setup ---
	stratumsrv = StratumServer()
	stratumsrv.getStratumJob = getStratumJob
	stratumsrv.getExistingStratumJob = getExistingStratumJob
	stratumsrv.receiveShare = receiveShare
	stratumsrv.getTarget = getTarget
	stratumsrv.defaultTarget = config.ShareTarget
	stratumsrv.IsJobValid = IsJobValid
	if not hasattr(config, 'StratumAddresses'):
		config.StratumAddresses = ()
	for a in config.StratumAddresses:
		NetworkListener(stratumsrv, a)
	
	# Start the merkle maker, then restore any saved work state before
	# accepting connections.
	MM.start()
	
	restoreState()
	
	# Background threads (daemonized so they don't block process exit):
	# workLog pruner, bitcoin node loop, and stratum server loop.
	prune_thr = threading.Thread(target=WorkLogPruner, args=(workLog,))
	prune_thr.daemon = True
	prune_thr.start()
	
	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()
	
	stratum_thr = threading.Thread(target=stratumsrv.serve_forever)
	stratum_thr.daemon = True
	stratum_thr.start()
	
	# The JSON-RPC server runs on the main thread.
	server.serve_forever()