# Trigger merkletree update when block inv received from bitcoin p2p
# [bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2013  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
import config

# Fill in sane defaults for optional settings so a minimal config module works.
if not hasattr(config, 'ServerName'):
	config.ServerName = 'Unnamed Eloipool'

if not hasattr(config, 'ShareTarget'):
	# Default minimum-difficulty share target (easiest target a share may use)
	config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
26
import logging
import logging.handlers

# Root-logger setup: only install our own basicConfig if nothing (e.g. an
# embedding application) has configured handlers already.
rootlogger = logging.getLogger(None)
logformat = getattr(config, 'LogFormat', '%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s')
logformatter = logging.Formatter(logformat)
if len(rootlogger.handlers) == 0:
	logging.basicConfig(
		format=logformat,
		level=logging.DEBUG,
	)
	# These subsystems are too chatty at DEBUG; cap them at INFO by default
	for infoOnly in (
		'checkShare',
		'getTarget',
		'JSONRPCHandler',
		'JSONRPCServer',
		'merkleMaker',
		'StratumServer',
		'Waker for JSONRPCServer',
		'Waker for StratumServer',
		'WorkLogPruner'
	):
		logging.getLogger(infoOnly).setLevel(logging.INFO)
if getattr(config, 'LogToSysLog', False):
	sysloghandler = logging.handlers.SysLogHandler(address = '/dev/log')
	rootlogger.addHandler(sysloghandler)
if hasattr(config, 'LogFile'):
	# LogFile may be a plain filename (str) or a dict of keyword arguments
	# for TimedRotatingFileHandler
	if isinstance(config.LogFile, str):
		filehandler = logging.FileHandler(config.LogFile)
	else:
		filehandler = logging.handlers.TimedRotatingFileHandler(**config.LogFile)
	filehandler.setFormatter(logformatter)
	rootlogger.addHandler(filehandler)
60
def RaiseRedFlags(reason):
	"""Log *reason* at CRITICAL severity and hand it back unchanged.

	Returning the message lets call sites log and raise in one expression,
	e.g. ``raise SomeError(RaiseRedFlags(msg))``.
	"""
	redflag_logger = logging.getLogger('redflag')
	redflag_logger.critical(reason)
	return reason
64
65
from bitcoin.node import BitcoinLink, BitcoinNode
# P2P connection to the Bitcoin network, used to hear about new blocks and
# to broadcast blocks we solve.
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'
# When a block inv arrives via p2p, rebuild work immediately.  MM is the
# merkleMaker instance created further down; the lambda defers the lookup
# until the callback actually fires.
bcnode.newBlock = lambda blkhash: MM.updateMerkleTree()
70
import jsonrpc

try:
	# Identify ourselves to upstream JSON-RPC servers; older jsonrpc
	# packages may lack the authproxy module, in which case keep defaults.
	import jsonrpc.authproxy
	jsonrpc.authproxy.USER_AGENT = 'Eloipool/0.1'
except:
	pass
78
79
80 from bitcoin.script import BitcoinScript
81 from bitcoin.txn import Txn
82 from base58 import b58decode
83 from struct import pack
84 import subprocess
85 from time import time
86
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
	"""Build the coinbase transaction paying out coinbaseValue.

	If config.CoinbaserCmd is set (and useCoinbaser is true), run it with
	'%d' replaced by the total value; its stdout must supply a line with
	the number of outputs, then alternating amount/address lines.  Whatever
	the coinbaser does not claim is paid to config.TrackerAddr.  If the
	coinbaser errors or tries to claim the full value (or more), all of its
	outputs are discarded and everything goes to TrackerAddr.
	"""
	txn = Txn.new()
	
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		try:
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			nout = int(p.stdout.readline())
			for i in range(nout):
				amount = int(p.stdout.readline())
				addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
				pkScript = BitcoinScript.toAddress(addr)
				txn.addOutput(amount, pkScript)
				coinbased += amount
		except:
			# Sentinel: force the >= check below to reject the outputs
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased
	
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)
	
	# TODO
	# TODO: red flag on dupe coinbase
	return txn
117
118
119 import jsonrpc_getwork
120 from util import Bits2Target
121
workLog = {}        # username -> { workid -> (work data tuple, issue time) }
userStatus = {}     # username -> [target (None=ShareTarget), lastUpdate, workCredited]
networkTarget = None    # current network target, set in blockChanged()
DupeShareHACK = {}      # header bytes of every accepted share, for dupe rejection

# Network servers; expected to be assigned during startup (not in this chunk)
server = None
stratumsrv = None
def updateBlocks():
	"""Push updated (non-block-changing) work out to connected miners.

	Wakes longpolling getwork/GBT clients and issues a fresh stratum job.
	"""
	server.wakeLongpoll()
	stratumsrv.updateJob()
132
def blockChanged():
	"""React to a new best block.

	Recomputes the cached network target and, unless this is the very first
	block seen, flushes all share/dupe tracking (shares built on the old
	block are now stale), then pushes clearing work to all miners.
	"""
	global MM, networkTarget, server
	bits = MM.currentBlock[2]
	if bits is None:
		networkTarget = None
	else:
		networkTarget = Bits2Target(bits)
	if MM.lastBlock != (None, None, None):
		# Only clear state once we have actually moved off a known block
		global DupeShareHACK
		DupeShareHACK = {}
		jsonrpc_getwork._CheckForDupesHACK = {}
		workLog.clear()
	server.wakeLongpoll(wantClear=True)
	stratumsrv.updateJob(wantClear=True)
147
148
149 from time import sleep, time
150 import traceback
151
152 def _WorkLogPruner_I(wl):
153         now = time()
154         pruned = 0
155         for username in wl:
156                 userwork = wl[username]
157                 for wli in tuple(userwork.keys()):
158                         if now > userwork[wli][1] + 120:
159                                 del userwork[wli]
160                                 pruned += 1
161         WorkLogPruner.logger.debug('Pruned %d jobs' % (pruned,))
162
163 def WorkLogPruner(wl):
164         while True:
165                 try:
166                         sleep(60)
167                         _WorkLogPruner_I(wl)
168                 except:
169                         WorkLogPruner.logger.error(traceback.format_exc())
170 WorkLogPruner.logger = logging.getLogger('WorkLogPruner')
171
172
from merklemaker import merkleMaker
# The merkleMaker builds block templates from upstream sources.
MM = merkleMaker()
# Copy every config setting onto the instance, then hook in our callbacks.
MM.__dict__.update(config.__dict__)
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
179
180
181 from binascii import b2a_hex
182 from copy import deepcopy
183 from math import ceil, log
184 from merklemaker import MakeBlockHeader
185 from struct import pack, unpack
186 import threading
187 from time import time
188 from util import PendingUpstream, RejectedShare, bdiff1target, dblsha, LEhash2int, swap32, target2bdiff, target2pdiff
189 import jsonrpc
190 import traceback
191
# Optional JSON-RPC endpoint notified of near-block shares (see checkShare)
gotwork = None
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

if not hasattr(config, 'DelayLogForUpstream'):
	config.DelayLogForUpstream = False

if not hasattr(config, 'DynamicTargetting'):
	config.DynamicTargetting = 0
else:
	if not hasattr(config, 'DynamicTargetWindow'):
		config.DynamicTargetWindow = 120
	# Scale the per-minute share goal to the length of the retarget window
	config.DynamicTargetGoal *= config.DynamicTargetWindow / 60
205
def submitGotwork(info):
	"""Best-effort notification of a share to the configured gotwork service.

	Failures are logged but never propagated.
	"""
	try:
		gotwork.gotwork(info)
	except:
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
211
def clampTarget(target, DTMode):
	"""Clamp and round a proposed share target per the dynamic-target mode.

	The result is bounded between the network target (hardest allowed) and
	config.ShareTarget (easiest allowed).  DTMode 2 rounds the target up to
	a power of two minus one; DTMode 3 rounds it to a multiple of bdiff 1.
	Returns None to mean exactly config.ShareTarget (saves memory).
	"""
	# ShareTarget is the minimum
	if target is None or target > config.ShareTarget:
		target = config.ShareTarget
	
	# Never target above the network, as we'd lose blocks
	if target < networkTarget:
		target = networkTarget
	
	if DTMode == 2:
		# Ceil target to a power of two :)
		truebits = log(target, 2)
		if target <= 2**int(truebits):
			# Workaround for bug in Python's math.log function
			truebits = int(truebits)
		target = 2**ceil(truebits) - 1
	elif DTMode == 3:
		# Round target to multiple of bdiff 1
		# BUGFIX: use floor division; true division ('/') yields a float,
		# which later breaks integer formatting such as '%064x' % target
		target = bdiff1target // int(round(target2bdiff(target)))
	
	# Return None for ShareTarget to save memory
	if target == config.ShareTarget:
		return None
	return target
236
def getTarget(username, now, DTMode = None, RequestedTarget = None):
	"""Return the share target for username at time now (dynamic targetting).

	Adjusts each user's target so they produce roughly DynamicTargetGoal
	shares per DynamicTargetWindow seconds.  Returns None to mean
	config.ShareTarget.  Resets the user's work counter whenever a new
	target is stored.
	"""
	if DTMode is None:
		DTMode = config.DynamicTargetting
	if not DTMode:
		return None
	if username in userStatus:
		status = userStatus[username]
	else:
		# No record, use default target
		RequestedTarget = clampTarget(RequestedTarget, DTMode)
		userStatus[username] = [RequestedTarget, now, 0]
		return RequestedTarget
	(targetIn, lastUpdate, work) = status
	if work <= config.DynamicTargetGoal:
		if now < lastUpdate + config.DynamicTargetWindow and (targetIn is None or targetIn >= networkTarget):
			# No reason to change it just yet
			return clampTarget(targetIn, DTMode)
		if not work:
			# No shares received, reset to minimum
			if targetIn:
				getTarget.logger.debug("No shares from %s, resetting to minimum target" % (repr(username),))
				userStatus[username] = [None, now, 0]
			return clampTarget(None, DTMode)
	
	# Retarget: scale by observed share rate over the elapsed window
	deltaSec = now - lastUpdate
	target = targetIn or config.ShareTarget
	target = int(target * config.DynamicTargetGoal * deltaSec / config.DynamicTargetWindow / work)
	target = clampTarget(target, DTMode)
	if target != targetIn:
		pfx = 'Retargetting %s' % (repr(username),)
		tin = targetIn or config.ShareTarget
		getTarget.logger.debug("%s from: %064x (pdiff %s)" % (pfx, tin, target2pdiff(tin)))
		tgt = target or config.ShareTarget
		getTarget.logger.debug("%s   to: %064x (pdiff %s)" % (pfx, tgt, target2pdiff(tgt)))
	userStatus[username] = [target, now, 0]
	return target
getTarget.logger = logging.getLogger('getTarget')
274
def TopTargets(n = 0x10):
	"""Debug helper: print the n users with the lowest (hardest) targets.

	Output columns: username, 64-hex-digit target, pdiff.  Users whose
	target is None (i.e. the default ShareTarget) are excluded.
	"""
	tmp = list(k for k, v in userStatus.items() if not v[0] is None)
	# Sorted descending by target; the tail of the list is the n hardest
	tmp.sort(key=lambda k: -userStatus[k][0])
	tmp2 = {}
	def t2d(t):
		# Memoize target -> pdiff conversions for the printout
		if t not in tmp2:
			tmp2[t] = target2pdiff(t)
		return tmp2[t]
	for k in tmp[-n:]:
		tgt = userStatus[k][0]
		print('%-34s %064x %3d' % (k, tgt, t2d(tgt)))
286
def RegisterWork(username, wli, wld, RequestedTarget = None):
	"""Record issued work in the work log and return its share target.

	The work data tuple is extended with the user's (possibly dynamic)
	target before being stored under (username, wli).  The returned target
	is always concrete: None from getTarget maps to config.ShareTarget.
	"""
	now = time()
	target = getTarget(username, now, RequestedTarget=RequestedTarget)
	wld = tuple(wld) + (target,)
	workLog.setdefault(username, {})[wli] = (wld, now)
	return target or config.ShareTarget
293
def getBlockHeader(username):
	"""Issue getwork-style work for username.

	Returns (header bytes, work-log entry, share target).  The work is
	logged under its merkle root by RegisterWork, which appends the user's
	target to the MRD tuple.  (A redundant direct workLog insert that
	RegisterWork immediately overwrote has been removed.)
	"""
	MRD = MM.getMRD()
	merkleRoot = MRD[0]
	hdr = MakeBlockHeader(MRD)
	target = RegisterWork(username, merkleRoot, MRD)
	return (hdr, workLog[username][merkleRoot], target)
301
def getBlockTemplate(username, p_magic = None, RequestedTarget = None):
	"""Issue GBT work for username.

	Returns (merkle-container, work-log entry, share target).  p_magic, if
	given, is a single-element list whose first slot is set True when a
	cleared template was forced for a first-time user.
	"""
	if server.tls.wantClear:
		wantClear = True
	elif p_magic and username not in workLog:
		# First request from this user: force a fresh template and signal
		# the caller through p_magic
		wantClear = True
		p_magic[0] = True
	else:
		wantClear = False
	MC = MM.getMC(wantClear)
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
	# The work identifier is embedded in the coinbase scriptsig: one byte
	# past the height push is its length, followed by the id itself
	wliPos = coinbase[0] + 2
	wliLen = coinbase[wliPos - 1]
	wli = coinbase[wliPos:wliPos+wliLen]
	target = RegisterWork(username, wli, MC, RequestedTarget=RequestedTarget)
	return (MC, workLog[username][wli], target)
317
def getStratumJob(jobid, wantClear = False):
	"""Issue a stratum job and log it under the reserved username None.

	Returns (merkle-container, work-log entry).  Unlike getwork/GBT work,
	stratum jobs bypass RegisterWork (no per-user target is appended).
	"""
	MC = MM.getMC(wantClear)
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC[:5]
	now = time()
	workLog.setdefault(None, {})[jobid] = (MC, now)
	return (MC, workLog[None][jobid])
324
def getExistingStratumJob(jobid):
	"""Look up a previously issued stratum job by job id.

	Returns (work data, work-log entry); raises KeyError if the job was
	never issued or has been pruned.
	"""
	entry = workLog[None][jobid]
	return (entry[0], entry)
328
loggersShare = []  # share-logging backends; each must provide logShare()

# Forensic records kept in memory for every block-worthy share:
RBDs = []  # deep copies of the raw share data and work tuples
RBPs = []  # assembled block payloads

from bitcoin.varlen import varlenEncode, varlenDecode
import bitcoin.txn
from merklemaker import assembleBlock

if not hasattr(config, 'BlockSubmissions'):
	# None means: submit solved blocks to every template source
	config.BlockSubmissions = None
340
RBFs = []  # forensic records of failed or abandoned block submissions
def blockSubmissionThread(payload, blkhash, share):
	"""Submit an assembled block to upstream server(s), with fallbacks.

	Tries BIP 22 submitblock first, then the bitcoind 0.5/0.6 getmemorypool
	form, then the old draft getmemorypool form.  Servers that error are
	rotated to the back of the list and retried until the network has moved
	to another block and every server has been tried.  If the share's
	upstream result is still pending, it is filled in and logged here.
	"""
	if config.BlockSubmissions is None:
		servers = list(a for b in MM.TemplateSources for a in b)
	else:
		servers = list(config.BlockSubmissions)
	
	if hasattr(share['merkletree'], 'source_uri'):
		# Prefer the server that provided this work's template
		servers.insert(0, {
			'access': jsonrpc.ServiceProxy(share['merkletree'].source_uri),
			'name': share['merkletree'].source,
		})
	elif not servers:
		servers = list(a for b in MM.TemplateSources for a in b)
	
	myblock = (blkhash, payload[4:36])
	payload = b2a_hex(payload).decode('ascii')
	nexterr = 0
	tries = 0
	success = False
	while len(servers):
		tries += 1
		TS = servers.pop(0)
		UpstreamBitcoindJSONRPC = TS['access']
		try:
			# BIP 22 standard submitblock
			reason = UpstreamBitcoindJSONRPC.submitblock(payload)
		except BaseException as gbterr:
			gbterr_fmt = traceback.format_exc()
			try:
				try:
					# bitcoind 0.5/0.6 getmemorypool
					reason = UpstreamBitcoindJSONRPC.getmemorypool(payload)
				except:
					# Old BIP 22 draft getmemorypool
					reason = UpstreamBitcoindJSONRPC.getmemorypool(payload, {})
				if reason is True:
					reason = None
				elif reason is False:
					reason = 'rejected'
			except BaseException as gmperr:
				now = time()
				if now > nexterr:
					# FIXME: This will show "Method not found" on pre-BIP22 servers
					RaiseRedFlags(gbterr_fmt)
					# Rate-limit the red flag to once per 5 seconds
					nexterr = now + 5
				if MM.currentBlock[0] not in myblock and tries > len(servers):
					# The chain moved on and every server failed at least
					# once: abandon this submission entirely
					RBFs.append( (('next block', MM.currentBlock, now, (gbterr, gmperr)), payload, blkhash, share) )
					RaiseRedFlags('Giving up on submitting block to upstream \'%s\'' % (TS['name'],))
					if share['upstreamRejectReason'] is PendingUpstream:
						share['upstreamRejectReason'] = 'GAVE UP'
						share['upstreamResult'] = False
						logShare(share)
					return
				
				servers.append(TS)
				continue
		
		# At this point, we have a reason back
		if reason:
			# FIXME: The returned value could be a list of multiple responses
			msg = 'Upstream \'%s\' block submission failed: %s' % (TS['name'], reason,)
			if success and reason in ('stale-prevblk', 'bad-prevblk', 'orphan', 'duplicate'):
				# no big deal
				blockSubmissionThread.logger.debug(msg)
			else:
				RBFs.append( (('upstream reject', reason, time()), payload, blkhash, share) )
				RaiseRedFlags(msg)
		else:
			blockSubmissionThread.logger.debug('Upstream \'%s\' accepted block' % (TS['name'],))
			success = True
		if share['upstreamRejectReason'] is PendingUpstream:
			share['upstreamRejectReason'] = reason
			share['upstreamResult'] = not reason
			logShare(share)
blockSubmissionThread.logger = logging.getLogger('blockSubmission')
417
def checkData(share):
	"""Sanity-check the fixed header fields of a getwork/GBT share.

	Verifies prevblock hash, difficulty bits, and block version against the
	current template; raises RejectedShare on any mismatch.
	"""
	header = share['data'][:80]
	(prevBlock, height, bits) = MM.currentBlock
	
	sharePrevBlock = header[4:36]
	if sharePrevBlock != prevBlock:
		# Give a friendlier rejection for work built on the previous block
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')
	
	if header[72:76] != bits:
		raise RejectedShare('bad-diffbits')
	
	# Note that we should accept miners reducing version to 1 if they don't understand 2 yet
	# FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
	if header[1:4] != b'\0\0\0' or header[0] > 2:
		raise RejectedShare('bad-version')
435
def buildStratumData(share, merkleroot):
	"""Assemble the 80-byte block header for a stratum share.

	ntime and nonce arrive big-endian from the client and are reversed
	here.  The header is stored in share['data'] and also returned.
	"""
	(prevBlock, height, bits) = MM.currentBlock
	
	header = b''.join((
		b'\x02\0\0\0',           # block version 2
		prevBlock,
		merkleroot,
		share['ntime'][::-1],
		bits,
		share['nonce'][::-1],
	))
	
	share['data'] = header
	return header
448
def IsJobValid(wli, wluser = None):
	"""Return True if work id wli is still present in wluser's work log.

	NOTE(review): the time test below only fails when the job appears to
	have been issued more than 120s in the *future*; expiry of old work
	seems to rely on WorkLogPruner removing entries instead -- confirm
	whether this condition was meant to be ``time() > issueT + 120``.
	"""
	if wluser not in workLog:
		return False
	if wli not in workLog[wluser]:
		return False
	(wld, issueT) = workLog[wluser][wli]
	if time() < issueT - 120:
		return False
	return True
458
def checkShare(share):
	"""Validate a submitted share and submit upstream if it solves a block.

	Handles all three work modes: getwork (MRD), GBT (MC with blkdata), and
	stratum (MC via jobid).  Mutates share in place with bookkeeping fields
	(time, issuetime, merkletree, target, data, upstream result...).
	Raises RejectedShare with a reason string for any invalid share.
	"""
	shareTime = share['time'] = time()
	
	username = share['username']
	if 'data' in share:
		# getwork/GBT
		checkData(share)
		data = share['data']
		
		if username not in workLog:
			raise RejectedShare('unknown-user')
		MWL = workLog[username]
		
		shareMerkleRoot = data[36:68]
		if 'blkdata' in share:
			# GBT: transaction data follows the header; the work id is
			# embedded in the coinbase scriptsig
			pl = share['blkdata']
			(txncount, pl) = varlenDecode(pl)
			cbtxn = bitcoin.txn.Txn(pl)
			othertxndata = cbtxn.disassemble(retExtra=True)
			coinbase = cbtxn.getCoinbase()
			wliPos = coinbase[0] + 2
			wliLen = coinbase[wliPos - 1]
			wli = coinbase[wliPos:wliPos+wliLen]
			mode = 'MC'
			moden = 1
		else:
			# getwork: work is keyed directly by merkle root
			wli = shareMerkleRoot
			mode = 'MRD'
			moden = 0
			coinbase = None
	else:
		# Stratum; header is rebuilt with the real merkle root below
		MWL = workLog[None]
		wli = share['jobid']
		buildStratumData(share, b'\0' * 32)
		mode = 'MC'
		moden = 1
		othertxndata = b''
	
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld
	
	share['issuetime'] = issueT
	
	(workMerkleTree, workCoinbase) = wld[1:3]
	share['merkletree'] = workMerkleTree
	if 'jobid' in share:
		# Stratum: rebuild the coinbase with the client's extranonces and
		# recompute the actual merkle root
		cbtxn = deepcopy(workMerkleTree.data[0])
		coinbase = workCoinbase + share['extranonce1'] + share['extranonce2']
		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		data = buildStratumData(share, workMerkleTree.withFirst(cbtxn))
		shareMerkleRoot = data[36:68]
	
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
	blkhash = dblsha(data)
	# Cheap preliminary check: top 32 bits of the hash must be zero
	if blkhash[28:] != b'\0\0\0\0':
		raise RejectedShare('H-not-zero')
	blkhashn = LEhash2int(blkhash)
	
	global networkTarget
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
	
	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	txlist = workMerkleTree.data
	txlist = [deepcopy(txlist[0]),] + txlist[1:]
	cbtxn = txlist[0]
	cbtxn.setCoinbase(coinbase or workCoinbase)
	cbtxn.assemble()
	
	if blkhashn <= networkTarget:
		# This share solves a block: assemble and submit it
		logfunc("Submitting upstream")
		RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree, share, wld) ) )
		if not moden:
			payload = assembleBlock(data, txlist)
		else:
			payload = share['data']
			if len(othertxndata):
				payload += share['blkdata']
			else:
				payload += assembleBlock(data, txlist)[80:]
		logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
		RBPs.append(payload)
		threading.Thread(target=blockSubmissionThread, args=(payload, blkhash, share)).start()
		bcnode.submitBlock(payload)
		if config.DelayLogForUpstream:
			# Defer share logging until the submission thread has a result
			share['upstreamRejectReason'] = PendingUpstream
		else:
			share['upstreamRejectReason'] = None
			share['upstreamResult'] = True
		MM.updateBlock(blkhash)
	
	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')
	
	# Resolve the share target: explicit, appended by RegisterWork, or default
	if 'target' in share:
		workTarget = share['target']
	elif len(wld) > 6:
		workTarget = wld[6]
	else:
		workTarget = None
	
	if workTarget is None:
		workTarget = config.ShareTarget
	if blkhashn > workTarget:
		raise RejectedShare('high-hash')
	share['target'] = workTarget
	share['_targethex'] = '%064x' % (workTarget,)
	
	shareTimestamp = unpack('<L', data[68:72])[0]
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
	if config.DynamicTargetting and username in userStatus:
		# NOTE: userStatus[username] only doesn't exist across restarts
		status = userStatus[username]
		target = status[0] or config.ShareTarget
		if target == workTarget:
			userStatus[username][2] += 1
		else:
			# Credit proportionally when the share was mined at a target
			# other than the one currently assigned
			userStatus[username][2] += float(target) / workTarget
	
	if moden:
		cbpre = workCoinbase
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')
		
		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
				raise RejectedShare('bad-cb-flag')
		
		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')
		
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')
		
		if len(othertxndata):
			allowed = assembleBlock(data, txlist)[80:]
			if allowed != share['blkdata']:
				raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
631
def logShare(share):
	"""Fill in share['solution'] and fan the share out to every logger.

	The solution is the original submitted data when available, otherwise
	the byte-swapped hex of the assembled header.
	"""
	if '_origdata' in share:
		solution = share['_origdata']
	else:
		solution = b2a_hex(swap32(share['data'])).decode('utf8')
	share['solution'] = solution
	for backend in loggersShare:
		backend.logShare(share)
639
def receiveShare(share):
	# TODO: username => userid
	"""Entry point for a submitted share: validate, annotate, and log it.

	RejectedShare (and any other error) is re-raised after recording the
	rejection reason; logging is skipped while the upstream submission
	result is still pending (it is logged later by the submitter).
	"""
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	except BaseException:
		share['rejectReason'] = 'ERROR'
		raise
	finally:
		if share.get('upstreamRejectReason', None) is not PendingUpstream:
			logShare(share)
653
def newBlockNotification():
	"""Handle an external new-block notification by rebuilding work."""
	notify_logger = logging.getLogger('newBlockNotification')
	notify_logger.info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
659
def newBlockNotificationSIGNAL(signum, frame):
	"""Signal handler: run newBlockNotification on a fresh daemon thread."""
	# Use a new thread, in case the signal handler is called with locks held
	thr = threading.Thread(
		target=newBlockNotification,
		name='newBlockNotification via signal %s' % (signum,),
	)
	thr.daemon = True
	thr.start()
665
from signal import signal, SIGUSR1
# SIGUSR1 (e.g. from bitcoind's -blocknotify) triggers a work rebuild
signal(SIGUSR1, newBlockNotificationSIGNAL)
668
669
670 import os
671 import os.path
672 import pickle
673 import signal
674 import sys
675 from time import sleep
676 import traceback
677
# File used to persist the work log and dupe-share table across restarts
SAVE_STATE_FILENAME = 'eloipool.worklog'
679
def stopServers():
	"""Shut down all network servers (p2p node, JSON-RPC, stratum).

	Idempotent: a second call returns immediately.  Flags each server to
	stop, wakes it, waits (bounded) for its loop to exit, then closes all
	of its file descriptors.
	"""
	logger = logging.getLogger('stopServers')
	
	if hasattr(stopServers, 'already'):
		logger.debug('Already tried to stop servers before')
		return
	stopServers.already = True
	
	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server, stratumsrv)
	for s in servers:
		s.keepgoing = False
	for s in servers:
		try:
			s.wakeup()
		except:
			logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
	i = 0
	while True:
		sl = []
		for s in servers:
			if s.running:
				sl.append(s.__class__.__name__)
		if not sl:
			break
		i += 1
		if i >= 0x100:
			# ~2.5s total (0x100 iterations of 10ms); don't hang shutdown
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
			break
		sleep(0.01)
	
	for s in servers:
		for fd in s._fd.keys():
			os.close(fd)
715
def stopLoggers():
	"""Give each share logger a chance to flush/close, if it supports stop()."""
	for backend in loggersShare:
		stop = getattr(backend, 'stop', None)
		if stop is not None:
			stop()
720
def saveState(t = None):
	"""Persist resume data (timestamp, dupe table, work log) to disk.

	Retries on failure; after 0x10000 consecutive failures it logs the
	error, removes any partial state file, and gives up.
	"""
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
				# BUGFIX: stop retrying once we have given up; the original
				# loop had no exit here and would spin forever on a
				# persistent write failure
				break
742
def exit():
	"""Cleanly shut down: stop servers and loggers, save state, then terminate."""
	when = time()
	stopServers()
	stopLoggers()
	saveState(when)
	logging.getLogger('exit').info('Goodbye...')
	# Signal ourselves so any remaining threads die, then exit the interpreter
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)
751
def restart():
	"""Cleanly shut down and save state, then replace this process with a fresh copy of itself."""
	when = time()
	stopServers()
	stopLoggers()
	saveState(when)
	logging.getLogger('restart').info('Restarting...')
	try:
		# exec over the current process with the same argv; does not return on success
		os.execv(sys.argv[0], sys.argv)
	except:
		logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
762
def restoreState():
	"""Restore saved pool state written by saveState() from SAVE_STATE_FILENAME.
	
	Understands three historical formats:
	- tuple: (workLog, DupeShareHACK) from 2012-02-02..03
	- dict: bare DupeShareHACK from 2012-02-03
	- current: timestamp pickle, then DupeShareHACK, then workLog
	The workLog is only restored when the saved timestamp is less than
	120 seconds old. NOTE: the pickle is trusted local state; do not
	point SAVE_STATE_FILENAME at untrusted data.
	"""
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if type(t) == tuple:
				if len(t) > 2:
					# Future formats, not supported here
					# BUGFIX: was t[3], which raises IndexError on a 3-tuple
					# despite the len(t) > 2 guard
					ver = t[2]
					# TODO
				
				# Old format, from 2012-02-02 to 2012-02-03
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					# Old format, from 2012-02-03 to 2012-02-03
					DupeShareHACK = t
					t = None
				else:
					# Current format, from 2012-02-03 onward
					DupeShareHACK = pickle.load(f)
				
				# BUGFIX: guard against t being None (old dict format), which
				# previously raised TypeError here and aborted the restore
				if t is not None and t + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))
804
805
806 from jsonrpcserver import JSONRPCListener, JSONRPCServer
807 import interactivemode
808 from networkserver import NetworkListener
809 import threading
810 import sharelogging
811 from stratumserver import StratumServer
812 import imp
813
if __name__ == "__main__":
	# --- Share logging setup ---
	# Default to no share logging when the config omits it.
	if not hasattr(config, 'ShareLogging'):
		config.ShareLogging = ()
	# Backward compatibility: translate the deprecated DbOptions variable
	# into an equivalent 'sql'/'postgres' ShareLogging entry.
	if hasattr(config, 'DbOptions'):
		logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
		config.ShareLogging = list(config.ShareLogging)
		config.ShareLogging.append( {
			'type': 'sql',
			'engine': 'postgres',
			'dbopts': config.DbOptions,
			'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
		} )
	for i in config.ShareLogging:
		# Old-style entries are (name, parameters) tuples rather than dicts;
		# convert them to the current dict form before use.
		if not hasattr(i, 'keys'):
			name, parameters = i
			logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
			if name == 'postgres':
				name = 'sql'
				i = {
					'engine': 'postgres',
					'dbopts': parameters,
				}
			elif name == 'logfile':
				i = {}
				i['thropts'] = parameters
				# 'filename' is passed directly; the rest stay as thread options
				# (copy thropts first so the caller's dict is not mutated).
				if 'filename' in parameters:
					i['filename'] = parameters['filename']
					i['thropts'] = dict(i['thropts'])
					del i['thropts']['filename']
			else:
				i = parameters
			i['type'] = name
		
		# Dynamically load the share logger module named by 'type' from the
		# sharelogging package and instantiate its same-named class.
		name = i['type']
		parameters = i
		try:
			fp, pathname, description = imp.find_module(name, sharelogging.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			loggersShare.append(lo)
		except:
			logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())

	# --- Bitcoin p2p node: listeners and optional upstream link ---
	LSbc = []
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))
	
	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
	
	# Importing these modules registers their JSON-RPC method handlers
	# (imported for side effects only).
	import jsonrpc_getblocktemplate
	import jsonrpc_getwork
	import jsonrpc_setworkaux
	
	# --- JSON-RPC (getwork/GBT) server setup ---
	server = JSONRPCServer()
	server.tls = threading.local()
	server.tls.wantClear = False
	# Backward compatibility: fold the single deprecated JSONRPCAddress into
	# the JSONRPCAddresses list (at the front).
	if hasattr(config, 'JSONRPCAddress'):
		logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
	LS = []
	for a in config.JSONRPCAddresses:
		LS.append(JSONRPCListener(server, a))
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	# Wire the server to the pool's work-generation and share-handling hooks.
	server.aux = MM.CoinbaseAux
	server.getBlockHeader = getBlockHeader
	server.getBlockTemplate = getBlockTemplate
	server.receiveShare = receiveShare
	server.RaiseRedFlags = RaiseRedFlags
	server.ShareTarget = config.ShareTarget
	
	if hasattr(config, 'TrustedForwarders'):
		server.TrustedForwarders = config.TrustedForwarders
	server.ServerName = config.ServerName
	
	# --- Stratum server setup (same hooks, stratum protocol) ---
	stratumsrv = StratumServer()
	stratumsrv.getStratumJob = getStratumJob
	stratumsrv.getExistingStratumJob = getExistingStratumJob
	stratumsrv.receiveShare = receiveShare
	stratumsrv.getTarget = getTarget
	stratumsrv.defaultTarget = config.ShareTarget
	stratumsrv.IsJobValid = IsJobValid
	if not hasattr(config, 'StratumAddresses'):
		config.StratumAddresses = ()
	for a in config.StratumAddresses:
		NetworkListener(stratumsrv, a)
	
	# Start the merkle maker, then restore any saved state before serving.
	MM.start()
	
	restoreState()
	
	# Background daemon threads: workLog pruning, bitcoin p2p node, stratum.
	prune_thr = threading.Thread(target=WorkLogPruner, args=(workLog,))
	prune_thr.daemon = True
	prune_thr.start()
	
	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()
	
	stratum_thr = threading.Thread(target=stratumsrv.serve_forever)
	stratum_thr.daemon = True
	stratum_thr.start()
	
	# The JSON-RPC server runs on the main thread and blocks until shutdown.
	server.serve_forever()