Merge branch 'bugfix_short_cb'
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import config
19
20 if not hasattr(config, 'ServerName'):
21         config.ServerName = 'Unnamed Eloipool'
22
23
24 import logging
25
26 if len(logging.root.handlers) == 0:
27         logging.basicConfig(
28                 format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
29                 level=logging.DEBUG,
30         )
31         for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
32                 logging.getLogger(infoOnly).setLevel(logging.INFO)
33
34 def RaiseRedFlags(reason):
35         logging.getLogger('redflag').critical(reason)
36         return reason
37
38
39 from bitcoin.node import BitcoinLink, BitcoinNode
40 bcnode = BitcoinNode(config.UpstreamNetworkId)
41 bcnode.userAgent += b'Eloipool:0.1/'
42
43 import jsonrpc
44 UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
45
46
47 from bitcoin.script import BitcoinScript
48 from bitcoin.txn import Txn
49 from base58 import b58decode
50 from struct import pack
51 import subprocess
52 from time import time
53
54 def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
55         txn = Txn.new()
56         
57         if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
58                 coinbased = 0
59                 try:
60                         cmd = config.CoinbaserCmd
61                         cmd = cmd.replace('%d', str(coinbaseValue))
62                         p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
63                         nout = int(p.stdout.readline())
64                         for i in range(nout):
65                                 amount = int(p.stdout.readline())
66                                 addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
67                                 pkScript = BitcoinScript.toAddress(addr)
68                                 txn.addOutput(amount, pkScript)
69                                 coinbased += amount
70                 except:
71                         coinbased = coinbaseValue + 1
72                 if coinbased >= coinbaseValue:
73                         logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
74                         txn.outputs = []
75                 else:
76                         coinbaseValue -= coinbased
77         
78         pkScript = BitcoinScript.toAddress(config.TrackerAddr)
79         txn.addOutput(coinbaseValue, pkScript)
80         
81         # TODO
82         # TODO: red flag on dupe coinbase
83         return txn
84
85
86 import jsonrpc_getwork
87 from util import Bits2Target
88
89 workLog = {}
90 networkTarget = None
91 DupeShareHACK = {}
92
93 server = None
94 def updateBlocks():
95         server.wakeLongpoll()
96
97 def blockChanged():
98         global DupeShareHACK
99         DupeShareHACK = {}
100         jsonrpc_getwork._CheckForDupesHACK = {}
101         global MM, networkTarget, server
102         networkTarget = Bits2Target(MM.currentBlock[1])
103         workLog.clear()
104         updateBlocks()
105
106
107 from merklemaker import merkleMaker
108 MM = merkleMaker()
109 MM.__dict__.update(config.__dict__)
110 MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
111 MM.clearCoinbaseTxn.assemble()
112 MM.makeCoinbaseTxn = makeCoinbaseTxn
113 MM.onBlockChange = blockChanged
114 MM.onBlockUpdate = updateBlocks
115
116
117 from binascii import b2a_hex
118 from copy import deepcopy
119 from struct import pack, unpack
120 import threading
121 from time import time
122 from util import RejectedShare, dblsha, hash2int, swap32
123 import jsonrpc
124 import traceback
125
126 gotwork = None
127 if hasattr(config, 'GotWorkURI'):
128         gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
129
130 def submitGotwork(info):
131         try:
132                 gotwork.gotwork(info)
133         except:
134                 checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
135
136 def getBlockHeader(username):
137         MRD = MM.getMRD()
138         (merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
139         timestamp = pack('<L', int(time()))
140         hdr = b'\1\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
141         workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
142         return (hdr, workLog[username][merkleRoot])
143
144 def getBlockTemplate(username):
145         MC = MM.getMC()
146         (dummy, merkleTree, coinbase, prevBlock, bits) = MC
147         wliLen = coinbase[0]
148         wli = coinbase[1:wliLen+1]
149         workLog.setdefault(username, {})[wli] = (MC, time())
150         return MC
151
152 loggersShare = []
153
154 RBDs = []
155 RBPs = []
156
157 from bitcoin.varlen import varlenEncode, varlenDecode
158 import bitcoin.txn
159 def assembleBlock(blkhdr, txlist):
160         payload = blkhdr
161         payload += varlenEncode(len(txlist))
162         for tx in txlist:
163                 payload += tx.data
164         return payload
165
166 def blockSubmissionThread(payload):
167         while True:
168                 try:
169                         rv = UpstreamBitcoindJSONRPC.getmemorypool(b2a_hex(payload).decode('ascii'))
170                         break
171                 except:
172                         pass
173         if not rv:
174                 RaiseRedFlags('Upstream rejected block!')
175
176 def checkShare(share):
177         shareTime = share['time'] = time()
178         
179         data = share['data']
180         data = data[:80]
181         (prevBlock, bits) = MM.currentBlock
182         sharePrevBlock = data[4:36]
183         if sharePrevBlock != prevBlock:
184                 if sharePrevBlock == MM.lastBlock[0]:
185                         raise RejectedShare('stale-prevblk')
186                 raise RejectedShare('bad-prevblk')
187         
188         # TODO: use userid
189         username = share['username']
190         if username not in workLog:
191                 raise RejectedShare('unknown-user')
192         
193         if data[72:76] != bits:
194                 raise RejectedShare('bad-diffbits')
195         if data[:4] != b'\1\0\0\0':
196                 raise RejectedShare('bad-version')
197         
198         shareMerkleRoot = data[36:68]
199         if 'blkdata' in share:
200                 pl = share['blkdata']
201                 (txncount, pl) = varlenDecode(pl)
202                 cbtxn = bitcoin.txn.Txn(pl)
203                 cbtxn.disassemble(retExtra=True)
204                 coinbase = cbtxn.getCoinbase()
205                 wliLen = coinbase[0]
206                 wli = coinbase[1:wliLen+1]
207                 mode = 'MC'
208                 moden = 1
209         else:
210                 wli = shareMerkleRoot
211                 mode = 'MRD'
212                 moden = 0
213         
214         MWL = workLog[username]
215         if wli not in MWL:
216                 raise RejectedShare('unknown-work')
217         (wld, issueT) = MWL[wli]
218         share[mode] = wld
219         
220         if data in DupeShareHACK:
221                 raise RejectedShare('duplicate')
222         DupeShareHACK[data] = None
223         
224         blkhash = dblsha(data)
225         if blkhash[28:] != b'\0\0\0\0':
226                 raise RejectedShare('H-not-zero')
227         blkhashn = hash2int(blkhash)
228         
229         global networkTarget
230         logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
231         logfunc('BLKHASH: %64x' % (blkhashn,))
232         logfunc(' TARGET: %64x' % (networkTarget,))
233         
234         workMerkleTree = wld[1]
235         workCoinbase = wld[2]
236         
237         # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
238         txlist = workMerkleTree.data
239         txlist = [deepcopy(txlist[0]),] + txlist[1:]
240         cbtxn = txlist[0]
241         cbtxn.setCoinbase(workCoinbase)
242         cbtxn.assemble()
243         
244         if blkhashn <= networkTarget:
245                 logfunc("Submitting upstream")
246                 RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree) ) )
247                 if not moden:
248                         payload = assembleBlock(data, txlist)
249                 else:
250                         payload = share['data'] + share['blkdata']
251                 logfunc('Real block payload: %s' % (payload,))
252                 RBPs.append(payload)
253                 threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
254                 bcnode.submitBlock(payload)
255                 share['upstreamResult'] = True
256                 MM.updateBlock(blkhash)
257         
258         # Gotwork hack...
259         if gotwork and blkhashn <= config.GotWorkTarget:
260                 try:
261                         coinbaseMrkl = cbtxn.data
262                         coinbaseMrkl += blkhash
263                         steps = workMerkleTree._steps
264                         coinbaseMrkl += pack('B', len(steps))
265                         for step in steps:
266                                 coinbaseMrkl += step
267                         coinbaseMrkl += b"\0\0\0\0"
268                         info = {}
269                         info['hash'] = b2a_hex(blkhash).decode('ascii')
270                         info['header'] = b2a_hex(data).decode('ascii')
271                         info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
272                         thr = threading.Thread(target=submitGotwork, args=(info,))
273                         thr.daemon = True
274                         thr.start()
275                 except:
276                         checkShare.logger.warning('Failed to build gotwork request')
277         
278         shareTimestamp = unpack('<L', data[68:72])[0]
279         if shareTime < issueT - 120:
280                 raise RejectedShare('stale-work')
281         if shareTimestamp < shareTime - 300:
282                 raise RejectedShare('time-too-old')
283         if shareTimestamp > shareTime + 7200:
284                 raise RejectedShare('time-too-new')
285         
286         if moden:
287                 cbpre = cbtxn.getCoinbase()
288                 cbpreLen = len(cbpre)
289                 if coinbase[:cbpreLen] != cbpre:
290                         raise RejectedShare('bad-cb-prefix')
291                 
292                 # Filter out known "I support" flags, to prevent exploits
293                 for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
294                         if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
295                                 raise RejectedShare('bad-cb-flag')
296                 
297                 if len(coinbase) > 100:
298                         raise RejectedShare('bad-cb-length')
299                 
300                 cbtxn.setCoinbase(coinbase)
301                 cbtxn.assemble()
302                 if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
303                         raise RejectedShare('bad-txnmrklroot')
304                 
305                 allowed = assembleBlock(data, txlist)
306                 if allowed != share['data'] + share['blkdata']:
307                         raise RejectedShare('bad-txns')
308 checkShare.logger = logging.getLogger('checkShare')
309
310 def receiveShare(share):
311         # TODO: username => userid
312         try:
313                 checkShare(share)
314         except RejectedShare as rej:
315                 share['rejectReason'] = str(rej)
316                 raise
317         finally:
318                 if '_origdata' in share:
319                         share['solution'] = share['_origdata']
320                 else:
321                         share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
322                 for i in loggersShare:
323                         i(share)
324
325 def newBlockNotification():
326         logging.getLogger('newBlockNotification').info('Received new block notification')
327         MM.updateMerkleTree()
328         # TODO: Force RESPOND TO LONGPOLLS?
329         pass
330
331 def newBlockNotificationSIGNAL(signum, frame):
332         # Use a new thread, in case the signal handler is called with locks held
333         thr = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
334         thr.daemon = True
335         thr.start()
336
337 from signal import signal, SIGUSR1
338 signal(SIGUSR1, newBlockNotificationSIGNAL)
339
340
341 import os
342 import os.path
343 import pickle
344 import signal
345 import sys
346 from time import sleep
347 import traceback
348
349 SAVE_STATE_FILENAME = 'eloipool.worklog'
350
351 def stopServers():
352         logger = logging.getLogger('stopServers')
353         
354         if hasattr(stopServers, 'already'):
355                 logger.debug('Already tried to stop servers before')
356                 return
357         stopServers.already = True
358         
359         logger.info('Stopping servers...')
360         global bcnode, server
361         servers = (bcnode, server)
362         for s in servers:
363                 s.keepgoing = False
364         for s in servers:
365                 try:
366                         s.wakeup()
367                 except:
368                         logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
369         i = 0
370         while True:
371                 sl = []
372                 for s in servers:
373                         if s.running:
374                                 sl.append(s.__class__.__name__)
375                 if not sl:
376                         break
377                 i += 1
378                 if i >= 0x100:
379                         logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
380                         break
381                 sleep(0.01)
382         
383         for s in servers:
384                 for fd in s._fd.keys():
385                         os.close(fd)
386
387 def saveState(t = None):
388         logger = logging.getLogger('saveState')
389         
390         # Then, save data needed to resume work
391         logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
392         i = 0
393         while True:
394                 try:
395                         with open(SAVE_STATE_FILENAME, 'wb') as f:
396                                 pickle.dump(t, f)
397                                 pickle.dump(DupeShareHACK, f)
398                                 pickle.dump(workLog, f)
399                         break
400                 except:
401                         i += 1
402                         if i >= 0x10000:
403                                 logger.error('Failed to save work\n' + traceback.format_exc())
404                                 try:
405                                         os.unlink(SAVE_STATE_FILENAME)
406                                 except:
407                                         logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
408
409 def exit():
410         t = time()
411         stopServers()
412         saveState(t)
413         logging.getLogger('exit').info('Goodbye...')
414         os.kill(os.getpid(), signal.SIGTERM)
415         sys.exit(0)
416
417 def restart():
418         t = time()
419         stopServers()
420         saveState(t)
421         logging.getLogger('restart').info('Restarting...')
422         try:
423                 os.execv(sys.argv[0], sys.argv)
424         except:
425                 logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
426
427 def restoreState():
428         if not os.path.exists(SAVE_STATE_FILENAME):
429                 return
430         
431         global workLog, DupeShareHACK
432         
433         logger = logging.getLogger('restoreState')
434         s = os.stat(SAVE_STATE_FILENAME)
435         logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
436         try:
437                 with open(SAVE_STATE_FILENAME, 'rb') as f:
438                         t = pickle.load(f)
439                         if type(t) == tuple:
440                                 if len(t) > 2:
441                                         # Future formats, not supported here
442                                         ver = t[3]
443                                         # TODO
444                                 
445                                 # Old format, from 2012-02-02 to 2012-02-03
446                                 workLog = t[0]
447                                 DupeShareHACK = t[1]
448                                 t = None
449                         else:
450                                 if isinstance(t, dict):
451                                         # Old format, from 2012-02-03 to 2012-02-03
452                                         DupeShareHACK = t
453                                         t = None
454                                 else:
455                                         # Current format, from 2012-02-03 onward
456                                         DupeShareHACK = pickle.load(f)
457                                 
458                                 if t + 120 >= time():
459                                         workLog = pickle.load(f)
460                                 else:
461                                         logger.debug('Skipping restore of expired workLog')
462         except:
463                 logger.error('Failed to restore state\n' + traceback.format_exc())
464                 return
465         logger.info('State restored successfully')
466         if t:
467                 logger.info('Total downtime: %g seconds' % (time() - t,))
468
469
470 from jsonrpcserver import JSONRPCListener, JSONRPCServer
471 import interactivemode
472 from networkserver import NetworkListener
473 import threading
474 import sharelogging
475 import imp
476
477 if __name__ == "__main__":
478         if not hasattr(config, 'ShareLogging'):
479                 config.ShareLogging = ()
480         if hasattr(config, 'DbOptions'):
481                 logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
482                 config.ShareLogging = list(config.ShareLogging)
483                 config.ShareLogging.append( {
484                         'type': 'sql',
485                         'engine': 'postgres',
486                         'dbopts': config.DbOptions,
487                         'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
488                 } )
489         for i in config.ShareLogging:
490                 if not hasattr(i, 'keys'):
491                         name, parameters = i
492                         logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
493                         if name == 'postgres':
494                                 name = 'sql'
495                                 i = {
496                                         'engine': 'postgres',
497                                         'dbopts': parameters,
498                                 }
499                         elif name == 'logfile':
500                                 i = {}
501                                 i['thropts'] = parameters
502                                 if 'filename' in parameters:
503                                         i['filename'] = parameters['filename']
504                                         i['thropts'] = dict(i['thropts'])
505                                         del i['thropts']['filename']
506                         else:
507                                 i = parameters
508                         i['type'] = name
509                 
510                 name = i['type']
511                 parameters = i
512                 try:
513                         fp, pathname, description = imp.find_module(name, sharelogging.__path__)
514                         m = imp.load_module(name, fp, pathname, description)
515                         lo = getattr(m, name)(**parameters)
516                         loggersShare.append(lo.logShare)
517                 except:
518                         logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())
519
520         LSbc = []
521         if not hasattr(config, 'BitcoinNodeAddresses'):
522                 config.BitcoinNodeAddresses = ()
523         for a in config.BitcoinNodeAddresses:
524                 LSbc.append(NetworkListener(bcnode, a))
525         
526         if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
527                 BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
528         
529         import jsonrpc_getmemorypool
530         import jsonrpc_getwork
531         import jsonrpc_setworkaux
532         
533         server = JSONRPCServer()
534         if hasattr(config, 'JSONRPCAddress'):
535                 logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
536                 if not hasattr(config, 'JSONRPCAddresses'):
537                         config.JSONRPCAddresses = []
538                 config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
539         LS = []
540         for a in config.JSONRPCAddresses:
541                 LS.append(JSONRPCListener(server, a))
542         if hasattr(config, 'SecretUser'):
543                 server.SecretUser = config.SecretUser
544         server.aux = MM.CoinbaseAux
545         server.getBlockHeader = getBlockHeader
546         server.getBlockTemplate = getBlockTemplate
547         server.receiveShare = receiveShare
548         server.RaiseRedFlags = RaiseRedFlags
549         
550         if hasattr(config, 'TrustedForwarders'):
551                 server.TrustedForwarders = config.TrustedForwarders
552         server.ServerName = config.ServerName
553         
554         MM.start()
555         
556         restoreState()
557         
558         bcnode_thr = threading.Thread(target=bcnode.serve_forever)
559         bcnode_thr.daemon = True
560         bcnode_thr.start()
561         
562         server.serve_forever()