RaiseRedFlags if upstream rejects a block
[bitcoin:eloipool.git] / eloipool.py
#!/usr/bin/python3
# Eloipool - Python Bitcoin pool server
# Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import config

if not hasattr(config, 'ServerName'):
        config.ServerName = 'Unnamed Eloipool'


import logging

if len(logging.root.handlers) == 0:
        logging.basicConfig(
                format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
                level=logging.DEBUG,
        )
        for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
                logging.getLogger(infoOnly).setLevel(logging.INFO)

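# RaiseRedFlags: log a problem loudly on the dedicated 'redflag' logger; used below
# (for example when the upstream bitcoind rejects a block we submitted) so operators notice.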
def RaiseRedFlags(reason):
        logging.getLogger('redflag').critical(reason)
        return reason


from bitcoin.node import BitcoinLink, BitcoinNode
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)


from bitcoin.script import BitcoinScript
from bitcoin.txn import Txn
from base58 import b58decode
from struct import pack
import subprocess
from time import time

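# Build the coinbase transaction for new work. If config.CoinbaserCmd is set, the
# command is run with '%d' replaced by the coinbase value; its stdout is expected to
# give the number of extra outputs on the first line, then an amount (in satoshis)
# and an address on alternating lines. On any error, coinbased is forced above
# coinbaseValue so all coinbaser outputs are discarded and the full reward goes to
# config.TrackerAddr.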
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
        txn = Txn.new()

        if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
                coinbased = 0
                try:
                        cmd = config.CoinbaserCmd
                        cmd = cmd.replace('%d', str(coinbaseValue))
                        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
                        nout = int(p.stdout.readline())
                        for i in range(nout):
                                amount = int(p.stdout.readline())
                                addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
                                pkScript = BitcoinScript.toAddress(addr)
                                txn.addOutput(amount, pkScript)
                                coinbased += amount
                except:
                        coinbased = coinbaseValue + 1
                if coinbased >= coinbaseValue:
                        logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
                        txn.outputs = []
                else:
                        coinbaseValue -= coinbased

        pkScript = BitcoinScript.toAddress(config.TrackerAddr)
        txn.addOutput(coinbaseValue, pkScript)

        # TODO
        # TODO: red flag on dupe coinbase
        return txn


import jsonrpc_getwork
from util import Bits2Target

workLog = {}
networkTarget = None
DupeShareHACK = {}

server = None
def updateBlocks():
        if server:
                server.wakeLongpoll()

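# Called by merkleMaker when the best block changes: reset duplicate tracking,
# recompute networkTarget from the new nBits, drop all outstanding work, and wake
# any longpoll clients.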
def blockChanged():
        global DupeShareHACK
        DupeShareHACK = {}
        jsonrpc_getwork._CheckForDupesHACK = {}
        global MM, networkTarget, server
        networkTarget = Bits2Target(MM.currentBlock[1])
        workLog.clear()
        updateBlocks()


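# Wire up the merkle-tree maker: it takes its settings from config, uses
# makeCoinbaseTxn to build coinbases, and calls back into blockChanged/updateBlocks
# as the chain advances.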
from merklemaker import merkleMaker
MM = merkleMaker()
MM.__dict__.update(config.__dict__)
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
MM.start()


from binascii import b2a_hex
from copy import deepcopy
from struct import pack, unpack
import threading
from time import time
from util import RejectedShare, dblsha, hash2int, swap32
import jsonrpc
import traceback

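# Optional "gotwork" hook: if config.GotWorkURI is set, shares that meet
# config.GotWorkTarget are reported to that JSON-RPC service from a background thread.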
gotwork = None
if hasattr(config, 'GotWorkURI'):
        gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

def submitGotwork(info):
        try:
                gotwork.gotwork(info)
        except:
                checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())

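# getwork path: hand out an 80-byte block header (version 1, previous block, merkle
# root, current time, nBits, and a placeholder nonce) and remember the issued work
# in workLog keyed by merkle root so the returned share can be matched up later.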
def getBlockHeader(username):
        MRD = MM.getMRD()
        (merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
        timestamp = pack('<L', int(time()))
        hdr = b'\1\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
        workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
        return (hdr, workLog[username][merkleRoot])

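# getmemorypool path: record the issued work keyed by the length-prefixed work-log
# identifier embedded at the start of the coinbase data.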
def getBlockTemplate(username):
        MC = MM.getMC()
        (dummy, merkleTree, coinbase, prevBlock, bits) = MC
        wliLen = coinbase[0]
        wli = coinbase[1:wliLen+1]
        workLog.setdefault(username, {})[wli] = (MC, time())
        return MC

loggersShare = []

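# RBDs/RBPs (apparently "real block data" / "real block payloads") keep an in-memory
# record of every block we attempt to submit upstream, for later inspection.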
RBDs = []
RBPs = []

from bitcoin.varlen import varlenEncode, varlenDecode
import bitcoin.txn
def assembleBlock(blkhdr, txlist):
        payload = blkhdr
        payload += varlenEncode(len(txlist))
        for tx in txlist:
                payload += tx.data
        return payload

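# Submit a solved block to the upstream bitcoind, retrying the RPC call until it goes
# through; a false result means upstream rejected the block, which raises a red flag.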
def blockSubmissionThread(payload):
        while True:
                try:
                        rv = UpstreamBitcoindJSONRPC.getmemorypool(b2a_hex(payload).decode('ascii'))
                        break
                except:
                        pass
        if not rv:
                RaiseRedFlags('Upstream rejected block!')

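# Validate a submitted share: check previous block, user, nBits and version; identify
# the work it was issued from (full block data for getmemorypool shares, merkle root
# for getwork shares); reject duplicates and hashes whose high 32 bits are nonzero;
# submit upstream if the hash meets the network target; then enforce timestamp limits
# and, for block-data shares, the coinbase and transaction-list constraints.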
def checkShare(share):
        shareTime = share['time'] = time()

        data = share['data']
        data = data[:80]
        (prevBlock, bits) = MM.currentBlock
        sharePrevBlock = data[4:36]
        if sharePrevBlock != prevBlock:
                if sharePrevBlock == MM.lastBlock[0]:
                        raise RejectedShare('stale-prevblk')
                raise RejectedShare('bad-prevblk')

        # TODO: use userid
        username = share['username']
        if username not in workLog:
                raise RejectedShare('unknown-user')

        if data[72:76] != bits:
                raise RejectedShare('bad-diffbits')
        if data[:4] != b'\1\0\0\0':
                raise RejectedShare('bad-version')

        shareMerkleRoot = data[36:68]
        if 'blkdata' in share:
                pl = share['blkdata']
                (txncount, pl) = varlenDecode(pl)
                cbtxn = bitcoin.txn.Txn(pl)
                cbtxn.disassemble(retExtra=True)
                coinbase = cbtxn.getCoinbase()
                wliLen = coinbase[0]
                wli = coinbase[1:wliLen+1]
                mode = 'MC'
                moden = 1
        else:
                wli = shareMerkleRoot
                mode = 'MRD'
                moden = 0

        MWL = workLog[username]
        if wli not in MWL:
                raise RejectedShare('unknown-work')
        (wld, issueT) = MWL[wli]
        share[mode] = wld

        if data in DupeShareHACK:
                raise RejectedShare('duplicate')
        DupeShareHACK[data] = None

        blkhash = dblsha(data)
        if blkhash[28:] != b'\0\0\0\0':
                raise RejectedShare('H-not-zero')
        blkhashn = hash2int(blkhash)

        global networkTarget
        logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
        logfunc('BLKHASH: %64x' % (blkhashn,))
        logfunc(' TARGET: %64x' % (networkTarget,))

        workMerkleTree = wld[1]
        workCoinbase = wld[2]

        # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
        txlist = workMerkleTree.data
        txlist = [deepcopy(txlist[0]),] + txlist[1:]
        cbtxn = txlist[0]
        cbtxn.setCoinbase(workCoinbase)
        cbtxn.assemble()

        if blkhashn <= networkTarget:
                logfunc("Submitting upstream")
                if not moden:
                        RBDs.append( deepcopy( (data, txlist) ) )
                        payload = assembleBlock(data, txlist)
                else:
                        RBDs.append( deepcopy( (data, txlist, share['blkdata']) ) )
                        payload = share['data'] + share['blkdata']
                logfunc('Real block payload: %s' % (payload,))
                RBPs.append(payload)
                threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
                bcnode.submitBlock(payload)
                share['upstreamResult'] = True
                MM.updateBlock(blkhash)

        # Gotwork hack...
        if gotwork and blkhashn <= config.GotWorkTarget:
                try:
                        coinbaseMrkl = cbtxn.data
                        coinbaseMrkl += blkhash
                        steps = workMerkleTree._steps
                        coinbaseMrkl += pack('B', len(steps))
                        for step in steps:
                                coinbaseMrkl += step
                        coinbaseMrkl += b"\0\0\0\0"
                        info = {}
                        info['hash'] = b2a_hex(blkhash).decode('ascii')
                        info['header'] = b2a_hex(data).decode('ascii')
                        info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
                        thr = threading.Thread(target=submitGotwork, args=(info,))
                        thr.daemon = True
                        thr.start()
                except:
                        checkShare.logger.warning('Failed to build gotwork request')

        shareTimestamp = unpack('<L', data[68:72])[0]
        if shareTime < issueT - 120:
                raise RejectedShare('stale-work')
        if shareTimestamp < shareTime - 300:
                raise RejectedShare('time-too-old')
        if shareTimestamp > shareTime + 7200:
                raise RejectedShare('time-too-new')

        if moden:
                cbpre = cbtxn.getCoinbase()
                cbpreLen = len(cbpre)
                if coinbase[:cbpreLen] != cbpre:
                        raise RejectedShare('bad-cb-prefix')

                # Filter out known "I support" flags, to prevent exploits
                for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
                        if coinbase.find(ff) > cbpreLen - len(ff):
                                raise RejectedShare('bad-cb-flag')

                if len(coinbase) > 100:
                        raise RejectedShare('bad-cb-length')

                cbtxn.setCoinbase(coinbase)
                cbtxn.assemble()
                if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
                        raise RejectedShare('bad-txnmrklroot')

                allowed = assembleBlock(data, txlist)
                if allowed != share['data'] + share['blkdata']:
                        raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')

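# Run checkShare, record any reject reason on the share, and always hand the share
# (with its hex 'solution') to every configured share logger in loggersShare.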
def receiveShare(share):
        # TODO: username => userid
        try:
                checkShare(share)
        except RejectedShare as rej:
                share['rejectReason'] = str(rej)
                raise
        finally:
                if '_origdata' in share:
                        share['solution'] = share['_origdata']
                else:
                        share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
                for i in loggersShare:
                        i(share)

def newBlockNotification():
        logging.getLogger('newBlockNotification').info('Received new block notification')
        MM.updateMerkleTree()
        # TODO: Force RESPOND TO LONGPOLLS?
        pass

def newBlockNotificationSIGNAL(signum, frame):
        # Use a new thread, in case the signal handler is called with locks held
        thr = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
        thr.daemon = True
        thr.start()

from signal import signal, SIGUSR1
signal(SIGUSR1, newBlockNotificationSIGNAL)


import os
import os.path
import pickle
import signal
import sys
from time import sleep
import traceback

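# Shutdown and persistence: on exit or restart the servers are stopped, then the
# current time, DupeShareHACK and workLog are pickled to SAVE_STATE_FILENAME so a
# quick restart can resume outstanding work.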
SAVE_STATE_FILENAME = 'eloipool.worklog'

def stopServers():
        logger = logging.getLogger('stopServers')

        logger.info('Stopping servers...')
        global bcnode, server
        servers = (bcnode, server)
        for s in servers:
                s.keepgoing = False
        for s in servers:
                s.wakeup()
        i = 0
        while True:
                sl = []
                for s in servers:
                        if s.running:
                                sl.append(s.__class__.__name__)
                if not sl:
                        break
                i += 1
                if i >= 0x100:
                        logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
                        break
                sleep(0.01)

        for s in servers:
                for fd in s._fd.keys():
                        os.close(fd)

def saveState(t = None):
        logger = logging.getLogger('saveState')

        # Then, save data needed to resume work
        logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
        i = 0
        while True:
                try:
                        with open(SAVE_STATE_FILENAME, 'wb') as f:
                                pickle.dump(t, f)
                                pickle.dump(DupeShareHACK, f)
                                pickle.dump(workLog, f)
                        break
                except:
                        i += 1
                        if i >= 0x10000:
                                logger.error('Failed to save work\n' + traceback.format_exc())
                                try:
                                        os.unlink(SAVE_STATE_FILENAME)
                                except:
                                        logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
                                # Give up rather than retrying forever
                                break

def exit():
        t = time()
        stopServers()
        saveState(t)
        logging.getLogger('exit').info('Goodbye...')
        os.kill(os.getpid(), signal.SIGTERM)
        sys.exit(0)

def restart():
        t = time()
        stopServers()
        saveState(t)
        logging.getLogger('restart').info('Restarting...')
        try:
                os.execv(sys.argv[0], sys.argv)
        except:
                logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())

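# Restore saved state, handling the three on-disk layouts this file has used: a
# (workLog, DupeShareHACK) tuple, a bare DupeShareHACK dict, and the current format
# of separately pickled timestamp, DupeShareHACK and workLog. The workLog is only
# restored if the save is less than two minutes old.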
def restoreState():
        if not os.path.exists(SAVE_STATE_FILENAME):
                return

        global workLog, DupeShareHACK

        logger = logging.getLogger('restoreState')
        s = os.stat(SAVE_STATE_FILENAME)
        logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
        try:
                with open(SAVE_STATE_FILENAME, 'rb') as f:
                        t = pickle.load(f)
                        if type(t) == tuple:
                                if len(t) > 2:
                                        # Future formats, not supported here
                                        ver = t[3]
                                        # TODO

                                # Old format, from 2012-02-02 to 2012-02-03
                                workLog = t[0]
                                DupeShareHACK = t[1]
                                t = None
                        else:
                                if isinstance(t, dict):
                                        # Old format, from 2012-02-03 to 2012-02-03
                                        DupeShareHACK = t
                                        t = None
                                else:
                                        # Current format, from 2012-02-03 onward
                                        DupeShareHACK = pickle.load(f)

                                if t and t + 120 >= time():
                                        workLog = pickle.load(f)
                                else:
                                        logger.debug('Skipping restore of expired workLog')
        except:
                logger.error('Failed to restore state\n' + traceback.format_exc())
                return
        logger.info('State restored successfully')
        if t:
                logger.info('Total downtime: %g seconds' % (time() - t,))


from jsonrpcserver import JSONRPCListener, JSONRPCServer
import interactivemode
from networkserver import NetworkListener
import threading
import sharelogging
import imp

if __name__ == "__main__":
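        # Share logging setup: normalize deprecated configuration (DbOptions and the old
        # (name, parameters) tuples) into the current list-of-dicts ShareLogging format,
        # then load each logger module by its 'type' name from the sharelogging package.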
        if not hasattr(config, 'ShareLogging'):
                config.ShareLogging = ()
        if hasattr(config, 'DbOptions'):
                logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
                config.ShareLogging = list(config.ShareLogging)
                config.ShareLogging.append( {
                        'type': 'sql',
                        'engine': 'postgres',
                        'dbopts': config.DbOptions,
                        'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
                } )
        for i in config.ShareLogging:
                if not hasattr(i, 'keys'):
                        name, parameters = i
                        logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
                        if name == 'postgres':
                                name = 'sql'
                                i = {
                                        'engine': 'postgres',
                                        'dbopts': parameters,
                                }
                        elif name == 'logfile':
                                i = {}
                                i['thropts'] = parameters
                                if 'filename' in parameters:
                                        i['filename'] = parameters['filename']
                                        i['thropts'] = dict(i['thropts'])
                                        del i['thropts']['filename']
                        else:
                                i = parameters
                        i['type'] = name

                name = i['type']
                parameters = i
                try:
                        fp, pathname, description = imp.find_module(name, sharelogging.__path__)
                        m = imp.load_module(name, fp, pathname, description)
                        lo = getattr(m, name)(**parameters)
                        loggersShare.append(lo.logShare)
                except:
                        logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name, sys.exc_info())

        LSbc = []
        if not hasattr(config, 'BitcoinNodeAddresses'):
                config.BitcoinNodeAddresses = ()
        for a in config.BitcoinNodeAddresses:
                LSbc.append(NetworkListener(bcnode, a))

        if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
                BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)

        import jsonrpc_getmemorypool
        import jsonrpc_getwork
        import jsonrpc_setworkaux

        server = JSONRPCServer()
        if hasattr(config, 'JSONRPCAddress'):
                logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
                if not hasattr(config, 'JSONRPCAddresses'):
                        config.JSONRPCAddresses = []
                config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
        LS = []
        for a in config.JSONRPCAddresses:
                LS.append(JSONRPCListener(server, a))
        if hasattr(config, 'SecretUser'):
                server.SecretUser = config.SecretUser
        server.aux = MM.CoinbaseAux
        server.getBlockHeader = getBlockHeader
        server.getBlockTemplate = getBlockTemplate
        server.receiveShare = receiveShare
        server.RaiseRedFlags = RaiseRedFlags

        server.TrustedForwarders = ()
        if hasattr(config, 'TrustedForwarders'):
                server.TrustedForwarders = config.TrustedForwarders
        server.ServerName = config.ServerName

        restoreState()

        bcnode_thr = threading.Thread(target=bcnode.serve_forever)
        bcnode_thr.daemon = True
        bcnode_thr.start()

        server.serve_forever()