Merge branch 'bugfix_proxy_sharetarget'
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
import config

# Fill in optional configuration attributes with defaults so the rest of the
# pool can assume they exist.
if not hasattr(config, 'ServerName'):
	config.ServerName = 'Unnamed Eloipool'

# Default share target: 32 leading zero bits (classic getwork difficulty 1).
if not hasattr(config, 'ShareTarget'):
	config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
25
26
import logging

# Only configure logging if nothing else (e.g. an embedding application) has
# installed handlers already.
if len(logging.root.handlers) == 0:
	logging.basicConfig(
		format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
		level=logging.DEBUG,
	)
	# These loggers are far too chatty at DEBUG during normal operation.
	for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
		logging.getLogger(infoOnly).setLevel(logging.INFO)
36
def RaiseRedFlags(reason):
	"""Log *reason* as a critical "red flag" event and return it unchanged.

	Returning the reason lets callers use this inline, e.g.
	``raise Exception(RaiseRedFlags('...'))``.
	"""
	redflag = logging.getLogger('redflag')
	redflag.critical(reason)
	return reason
40
41
from bitcoin.node import BitcoinLink, BitcoinNode
# P2P node used to announce solved blocks directly on the Bitcoin network.
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
# JSON-RPC proxy to the upstream bitcoind (getmemorypool / block submission).
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
48
49
50 from bitcoin.script import BitcoinScript
51 from bitcoin.txn import Txn
52 from base58 import b58decode
53 from struct import pack
54 import subprocess
55 from time import time
56
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
	"""Build the coinbase transaction paying out coinbaseValue (satoshis).

	If config.CoinbaserCmd is set (and useCoinbaser is true), run it to split
	part of the reward among extra outputs; whatever remains is paid to
	config.TrackerAddr.  The returned Txn still needs its coinbase script set
	and assemble() called by the caller.
	"""
	txn = Txn.new()
	
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		try:
			# Coinbaser protocol: the command receives the total value via a
			# %d substitution and prints the number of outputs, followed by
			# (amount, address) line pairs.
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			nout = int(p.stdout.readline())
			for i in range(nout):
				amount = int(p.stdout.readline())
				addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
				pkScript = BitcoinScript.toAddress(addr)
				pkScript = BitcoinScript.toAddress(addr)
				txn.addOutput(amount, pkScript)
				coinbased += amount
		except:
			# Any failure forces the >= check below to treat the coinbaser
			# output as invalid (deliberate sentinel value).
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			# Coinbaser failed or tried to spend the whole reward (or more);
			# discard its outputs and pay everything to the tracker address.
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased
	
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)
	
	# TODO
	# TODO: red flag on dupe coinbase
	return txn
87
88
import jsonrpc_getwork
from util import Bits2Target

# Work issued to miners, keyed by username, then by work identifier
# (merkle root for getwork, coinbase-script prefix for getmemorypool).
workLog = {}
# Integer target derived from the current block's nBits (set in blockChanged).
networkTarget = None
# Header data already seen this block, used to reject duplicate shares.
DupeShareHACK = {}

# The JSONRPCServer instance; constructed in __main__ below.
server = None
def updateBlocks():
	# Wake any longpolling miners so they re-request work for the new data.
	server.wakeLongpoll()
99
def blockChanged():
	# Called by the merkleMaker when the previous block changes: reset all
	# per-block state and push new work out to the miners.
	global DupeShareHACK
	DupeShareHACK = {}
	jsonrpc_getwork._CheckForDupesHACK = {}
	global MM, networkTarget, server
	networkTarget = Bits2Target(MM.currentBlock[1])
	# Outstanding work is unusable against the new block, so drop it all.
	workLog.clear()
	updateBlocks()
108
109
from merklemaker import merkleMaker
# The merkleMaker keeps block templates fresh from upstream getmemorypool data
# and fires the callbacks wired up below.
MM = merkleMaker()
MM.__dict__.update(config.__dict__)
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
118
119
120 from binascii import b2a_hex
121 from copy import deepcopy
122 from struct import pack, unpack
123 import threading
124 from time import time
125 from util import RejectedShare, dblsha, hash2int, swap32
126 import jsonrpc
127 import traceback
128
# Optional JSON-RPC sink notified of low-hash proofs-of-work (see checkShare).
gotwork = None
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
132
def submitGotwork(info):
	# Best-effort delivery of a gotwork notification; failures are only
	# logged, never propagated (this runs in a throwaway daemon thread).
	try:
		gotwork.gotwork(info)
	except:
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
138
def getBlockHeader(username):
	"""Issue getwork-style work: an 80-byte block header for *username*.

	The merkle root doubles as the work identifier used to find this entry
	again in checkShare.  Returns (header bytes, workLog entry).
	"""
	MRD = MM.getMRD()
	(merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
	timestamp = pack('<L', int(time()))
	# Version-1 header; b'iolE' is filler for the 4-byte nonce field, which
	# the miner overwrites while searching.
	hdr = b'\1\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
	workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
	return (hdr, workLog[username][merkleRoot])
146
def getBlockTemplate(username):
	"""Issue getmemorypool-style work (a full block template) for *username*.

	The work identifier is embedded at the start of the coinbase script as a
	length-prefixed blob, so checkShare can recover it from a submitted block.
	"""
	MC = MM.getMC()
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC
	wliLen = coinbase[0]
	wli = coinbase[1:wliLen+1]
	workLog.setdefault(username, {})[wli] = (MC, time())
	return MC
154
# Share-logger callbacks, populated from config.ShareLogging in __main__.
loggersShare = []

# Debug records of real (network-valid) blocks: raw components and payloads.
RBDs = []
RBPs = []
159
160 from bitcoin.varlen import varlenEncode, varlenDecode
161 import bitcoin.txn
def assembleBlock(blkhdr, txlist):
	"""Serialize a complete block for submission.

	Layout: 80-byte header, varint transaction count, then each
	transaction's raw data in order.
	"""
	parts = [blkhdr, varlenEncode(len(txlist))]
	parts.extend(tx.data for tx in txlist)
	return b''.join(parts)
168
def blockSubmissionThread(payload):
	"""Submit an assembled block to the upstream bitcoind.

	Retries forever on RPC failure (losing a solved block is far worse than a
	late submission); raises a red flag if the upstream explicitly rejects it.
	Runs in its own thread so a slow upstream never blocks share processing.
	"""
	while True:
		try:
			rv = UpstreamBitcoindJSONRPC.getmemorypool(b2a_hex(payload).decode('ascii'))
			break
		except:
			# Log the failure and back off briefly instead of silently
			# busy-spinning the CPU while the upstream node is unreachable.
			logging.getLogger('blockSubmission').warning('Failed to submit block upstream, retrying...\n' + traceback.format_exc())
			sleep(1)
	if not rv:
		RaiseRedFlags('Upstream rejected block!')
178
# Hex string form of the share target, precomputed for share log entries.
_STA = '%064x' % (config.ShareTarget,)
def checkShare(share):
	"""Validate a submitted share, raising RejectedShare on any problem.

	Side effects: submits the block upstream (JSON-RPC and P2P) when it meets
	the network target, fires gotwork notifications, and annotates *share*
	with metadata consumed by the share loggers.
	"""
	shareTime = share['time'] = time()
	
	data = share['data']
	data = data[:80]  # only the block header is validated here
	(prevBlock, bits) = MM.currentBlock
	sharePrevBlock = data[4:36]
	if sharePrevBlock != prevBlock:
		# Work built on the immediately previous block is merely stale;
		# anything else is outright wrong.
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')
	
	# TODO: use userid
	username = share['username']
	if username not in workLog:
		raise RejectedShare('unknown-user')
	
	if data[72:76] != bits:
		raise RejectedShare('bad-diffbits')
	if data[:4] != b'\1\0\0\0':
		raise RejectedShare('bad-version')
	
	shareMerkleRoot = data[36:68]
	if 'blkdata' in share:
		# getmemorypool-style submission: the full block follows the header.
		# Recover the work identifier from the coinbase script prefix
		# (mirrors how getBlockTemplate embedded it).
		pl = share['blkdata']
		(txncount, pl) = varlenDecode(pl)
		cbtxn = bitcoin.txn.Txn(pl)
		cbtxn.disassemble(retExtra=True)
		coinbase = cbtxn.getCoinbase()
		wliLen = coinbase[0]
		wli = coinbase[1:wliLen+1]
		mode = 'MC'
		moden = 1
	else:
		# getwork-style submission: the merkle root is the work identifier.
		wli = shareMerkleRoot
		mode = 'MRD'
		moden = 0
	
	MWL = workLog[username]
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld
	
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
	blkhash = dblsha(data)
	if blkhash[28:] != b'\0\0\0\0':
		# Even the weakest acceptable share must have 32 high zero bits.
		raise RejectedShare('H-not-zero')
	blkhashn = hash2int(blkhash)
	
	global networkTarget
	# Block-solving shares get logged at INFO; ordinary shares at DEBUG.
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
	
	workMerkleTree = wld[1]
	workCoinbase = wld[2]
	
	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	txlist = workMerkleTree.data
	txlist = [deepcopy(txlist[0]),] + txlist[1:]
	cbtxn = txlist[0]
	cbtxn.setCoinbase(workCoinbase)
	cbtxn.assemble()
	
	if blkhashn <= networkTarget:
		# This share solves a real block: submit it upstream via JSON-RPC
		# (in a separate thread) and the P2P node, keeping debug copies.
		logfunc("Submitting upstream")
		RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree) ) )
		if not moden:
			payload = assembleBlock(data, txlist)
		else:
			payload = share['data'] + share['blkdata']
		logfunc('Real block payload: %s' % (payload,))
		RBPs.append(payload)
		threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
		bcnode.submitBlock(payload)
		share['upstreamResult'] = True
		MM.updateBlock(blkhash)
	
	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		# Build the coinbase + merkle-branch proof and post it off-thread.
		try:
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')
	
	if blkhashn > config.ShareTarget:
		raise RejectedShare('high-hash')
	share['target'] = config.ShareTarget
	share['_targethex'] = _STA
	
	# Time sanity checks: work must be recent, header timestamp plausible.
	shareTimestamp = unpack('<L', data[68:72])[0]
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
	if moden:
		# Full-block submissions: verify the miner changed nothing beyond
		# the permitted coinbase extension.
		cbpre = cbtxn.getCoinbase()
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')
		
		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
				raise RejectedShare('bad-cb-flag')
		
		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')
		
		# Recompute the merkle root with the submitted coinbase and make
		# sure the rest of the block matches the issued template exactly.
		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')
		
		allowed = assembleBlock(data, txlist)
		if allowed != share['data'] + share['blkdata']:
			raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
318
def receiveShare(share):
	"""Entry point for submitted shares: validate, then always log the result.

	RejectedShare is re-raised to the caller after recording the reason, so
	the server can report the rejection to the miner.
	"""
	# TODO: username => userid
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	finally:
		# Share loggers run whether the share was accepted or rejected.
		if '_origdata' in share:
			share['solution'] = share['_origdata']
		else:
			share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
		for i in loggersShare:
			i(share)
333
def newBlockNotification():
	"""Handle an external new-block notification (usually via SIGUSR1).

	Refreshes the merkle tree so stale work stops being handed out.
	"""
	log = logging.getLogger('newBlockNotification')
	log.info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
339
def newBlockNotificationSIGNAL(signum, frame):
	"""Signal-handler wrapper around newBlockNotification.

	Signal handlers can run with arbitrary locks held, so the real work is
	deferred to a fresh daemon thread.
	"""
	worker = threading.Thread(
		target=newBlockNotification,
		name='newBlockNotification via signal %s' % (signum,),
	)
	worker.daemon = True
	worker.start()
345
from signal import signal, SIGUSR1
# SIGUSR1 forces a merkle-tree refresh (e.g. from a bitcoind blocknotify hook).
signal(SIGUSR1, newBlockNotificationSIGNAL)
348
349
350 import os
351 import os.path
352 import pickle
353 import signal
354 import sys
355 from time import sleep
356 import traceback
357
# File used to persist the duplicate-share table and work log across restarts.
SAVE_STATE_FILENAME = 'eloipool.worklog'
359
def stopServers():
	"""Shut down the P2P node and JSON-RPC server, then close their sockets.

	Idempotent: repeated calls after the first are no-ops.
	"""
	logger = logging.getLogger('stopServers')
	
	if hasattr(stopServers, 'already'):
		logger.debug('Already tried to stop servers before')
		return
	stopServers.already = True
	
	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server)
	# Ask each server loop to exit, then wake them so they notice promptly.
	for s in servers:
		s.keepgoing = False
	for s in servers:
		try:
			s.wakeup()
		except:
			logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
	# Poll for the server loops to wind down; give up after 0x100 * 10ms
	# (~2.5 seconds) so shutdown can never hang indefinitely.
	i = 0
	while True:
		sl = []
		for s in servers:
			if s.running:
				sl.append(s.__class__.__name__)
		if not sl:
			break
		i += 1
		if i >= 0x100:
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
			break
		sleep(0.01)
	
	# Close whatever file descriptors the servers still hold open.
	for s in servers:
		for fd in s._fd.keys():
			os.close(fd)
395
def saveState(t = None):
	"""Persist resume data (timestamp, DupeShareHACK, workLog) to disk.

	*t* is the shutdown timestamp; restoreState uses it to decide whether the
	saved workLog is still fresh enough to reuse.
	"""
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				# Remove any partial file so restoreState doesn't load garbage.
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
				# FIX: give up once the retry budget is exhausted; previously
				# there was no break here, so a persistent write failure made
				# this loop (and re-log, and re-unlink) forever.
				break
def exit():
	"""Cleanly stop servers, persist state, and terminate the process."""
	t = time()
	stopServers()
	saveState(t)
	logging.getLogger('exit').info('Goodbye...')
	# NOTE(review): SIGTERM is sent before sys.exit, presumably to take down
	# any remaining non-daemon threads — confirm against networkserver.
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)
425
def restart():
	"""Persist state, then re-exec this process in place with the same argv."""
	t = time()
	stopServers()
	saveState(t)
	logging.getLogger('restart').info('Restarting...')
	try:
		# Replaces the current process image; only returns on failure
		# (e.g. sys.argv[0] not directly executable).
		os.execv(sys.argv[0], sys.argv)
	except:
		logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
435
def restoreState():
	"""Restore DupeShareHACK and (if fresh enough) workLog from saveState's file.

	Understands both obsolete on-disk formats from early February 2012 as
	well as the current one (timestamp, dupe table, work log).  Any failure
	is logged and restore is abandoned; the pool starts with empty state.
	"""
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if type(t) == tuple:
				if len(t) > 2:
					# Future formats, not supported here
					# FIX: len(t) > 2 only guarantees index 2 exists; the old
					# t[3] raised IndexError on a 3-tuple, aborting restore.
					ver = t[2]
					# TODO
				
				# Old format, from 2012-02-02 to 2012-02-03
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					# Old format, from 2012-02-03 to 2012-02-03
					DupeShareHACK = t
					t = None
				else:
					# Current format, from 2012-02-03 onward
					DupeShareHACK = pickle.load(f)
				
				# FIX: t is None for the old dict format (which saved no
				# workLog); the unguarded `t + 120` used to raise TypeError
				# here and spuriously log a restore failure.
				if t is not None and t + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))
477
478
479 from jsonrpcserver import JSONRPCListener, JSONRPCServer
480 import interactivemode
481 from networkserver import NetworkListener
482 import threading
483 import sharelogging
484 import imp
485
486 if __name__ == "__main__":
487         if not hasattr(config, 'ShareLogging'):
488                 config.ShareLogging = ()
489         if hasattr(config, 'DbOptions'):
490                 logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
491                 config.ShareLogging = list(config.ShareLogging)
492                 config.ShareLogging.append( {
493                         'type': 'sql',
494                         'engine': 'postgres',
495                         'dbopts': config.DbOptions,
496                         'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
497                 } )
498         for i in config.ShareLogging:
499                 if not hasattr(i, 'keys'):
500                         name, parameters = i
501                         logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
502                         if name == 'postgres':
503                                 name = 'sql'
504                                 i = {
505                                         'engine': 'postgres',
506                                         'dbopts': parameters,
507                                 }
508                         elif name == 'logfile':
509                                 i = {}
510                                 i['thropts'] = parameters
511                                 if 'filename' in parameters:
512                                         i['filename'] = parameters['filename']
513                                         i['thropts'] = dict(i['thropts'])
514                                         del i['thropts']['filename']
515                         else:
516                                 i = parameters
517                         i['type'] = name
518                 
519                 name = i['type']
520                 parameters = i
521                 try:
522                         fp, pathname, description = imp.find_module(name, sharelogging.__path__)
523                         m = imp.load_module(name, fp, pathname, description)
524                         lo = getattr(m, name)(**parameters)
525                         loggersShare.append(lo.logShare)
526                 except:
527                         logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())
528
529         LSbc = []
530         if not hasattr(config, 'BitcoinNodeAddresses'):
531                 config.BitcoinNodeAddresses = ()
532         for a in config.BitcoinNodeAddresses:
533                 LSbc.append(NetworkListener(bcnode, a))
534         
535         if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
536                 BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
537         
538         import jsonrpc_getmemorypool
539         import jsonrpc_getwork
540         import jsonrpc_setworkaux
541         
542         server = JSONRPCServer()
543         if hasattr(config, 'JSONRPCAddress'):
544                 logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
545                 if not hasattr(config, 'JSONRPCAddresses'):
546                         config.JSONRPCAddresses = []
547                 config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
548         LS = []
549         for a in config.JSONRPCAddresses:
550                 LS.append(JSONRPCListener(server, a))
551         if hasattr(config, 'SecretUser'):
552                 server.SecretUser = config.SecretUser
553         server.aux = MM.CoinbaseAux
554         server.getBlockHeader = getBlockHeader
555         server.getBlockTemplate = getBlockTemplate
556         server.receiveShare = receiveShare
557         server.RaiseRedFlags = RaiseRedFlags
558         server.ShareTarget = config.ShareTarget
559         
560         if hasattr(config, 'TrustedForwarders'):
561                 server.TrustedForwarders = config.TrustedForwarders
562         server.ServerName = config.ServerName
563         
564         MM.start()
565         
566         restoreState()
567         
568         bcnode_thr = threading.Thread(target=bcnode.serve_forever)
569         bcnode_thr.daemon = True
570         bcnode_thr.start()
571         
572         server.serve_forever()