Propose new blocks with upstream server when it supports BIP 23 Block Proposal
[bitcoin:eloipool.git] / eloipool.py
#!/usr/bin/python3
# Eloipool - Python Bitcoin pool server
# Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import config

if not hasattr(config, 'ServerName'):
	config.ServerName = 'Unnamed Eloipool'

if not hasattr(config, 'ShareTarget'):
	config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff


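# Illustrative config.py sketch (not part of this file). The attribute names
# are the ones this module reads from its 'config' import; every value below
# is a placeholder and the exact formats are assumptions, not documentation.
#
#   ServerName = 'My Eloipool'
#   UpstreamNetworkId = b'\xf9\xbe\xb4\xd9'  # assumed: raw network magic bytes
#   UpstreamURI = 'http://user:pass@localhost:8332'
#   TrackerAddr = '1ExamplePoolPayoutAddress'
#   ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff
#   JSONRPCAddresses = (('', 8337),)  # assumed: (host, port) pairs for the work server
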
import logging

if len(logging.root.handlers) == 0:
	logging.basicConfig(
		format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
		level=logging.DEBUG,
	)
	for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
		logging.getLogger(infoOnly).setLevel(logging.INFO)

def RaiseRedFlags(reason):
	logging.getLogger('redflag').critical(reason)
	return reason


from bitcoin.node import BitcoinLink, BitcoinNode
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

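# JSON-RPC proxy for the upstream bitcoind at config.UpstreamURI. In this
# file it is used by blockSubmissionThread to hand solved blocks upstream
# via the getmemorypool call.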
import jsonrpc
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)


from bitcoin.script import BitcoinScript
from bitcoin.txn import Txn
from base58 import b58decode
from struct import pack
import subprocess
from time import time

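# Build the generation (coinbase) transaction for new work. If CoinbaserCmd is
# configured, it is run with %d replaced by the total coinbase value and is
# expected to print a count N followed by N (amount, address) line pairs on
# stdout; those outputs are added first and whatever remains of the subsidy is
# paid to config.TrackerAddr. Any coinbaser failure falls back to paying the
# full value to TrackerAddr.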
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
	txn = Txn.new()

	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		try:
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			nout = int(p.stdout.readline())
			for i in range(nout):
				amount = int(p.stdout.readline())
				addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
				pkScript = BitcoinScript.toAddress(addr)
				txn.addOutput(amount, pkScript)
				coinbased += amount
		except:
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased

	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)

	# TODO
	# TODO: red flag on dupe coinbase
	return txn


import jsonrpc_getwork
from util import Bits2Target

workLog = {}
networkTarget = None
DupeShareHACK = {}

server = None
def updateBlocks():
	server.wakeLongpoll()

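# Called by merkleMaker when the previous-block hash changes: reset the
# per-block duplicate-share and work logs, recompute the network target from
# the new nbits, and wake any longpolling getwork/getmemorypool clients.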
def blockChanged():
	global DupeShareHACK
	DupeShareHACK = {}
	jsonrpc_getwork._CheckForDupesHACK = {}
	global MM, networkTarget, server
	bits = MM.currentBlock[2]
	if bits is None:
		networkTarget = None
	else:
		networkTarget = Bits2Target(bits)
	workLog.clear()
	updateBlocks()


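# merkleMaker (see merklemaker.py) maintains the current block template and
# merkle tree; it is configured from the same config module via
# __dict__.update and drives the callbacks above. The fixed 50 BTC
# clear-coinbase transaction below is a placeholder, as the FIXME notes.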
from merklemaker import merkleMaker
MM = merkleMaker()
MM.__dict__.update(config.__dict__)
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks


from binascii import b2a_hex
from copy import deepcopy
from merklemaker import MakeBlockHeader
from struct import pack, unpack
import threading
from time import time
from util import RejectedShare, dblsha, hash2int, swap32
import jsonrpc
import traceback

gotwork = None
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

def submitGotwork(info):
	try:
		gotwork.gotwork(info)
	except:
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())

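# Work generation entry points. getBlockHeader serves getwork-style requests:
# it pulls merkle-root data (MRD) from merkleMaker, builds the 80-byte block
# header, and records the MRD in workLog keyed by merkle root.
# getBlockTemplate serves getmemorypool-style requests and records the MC data
# keyed by an identifier taken from the first bytes of the coinbase data.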
def getBlockHeader(username):
	MRD = MM.getMRD()
	merkleRoot = MRD[0]
	hdr = MakeBlockHeader(MRD)
	workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
	return (hdr, workLog[username][merkleRoot])

def getBlockTemplate(username):
	MC = MM.getMC()
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC
	wliLen = coinbase[0]
	wli = coinbase[1:wliLen+1]
	workLog.setdefault(username, {})[wli] = (MC, time())
	return MC

loggersShare = []

RBDs = []
RBPs = []

from bitcoin.varlen import varlenEncode, varlenDecode
import bitcoin.txn
from merklemaker import assembleBlock

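# Submit a solved block to the upstream bitcoind, retrying until the RPC call
# itself goes through. The hex-encoded block is passed as the argument to
# getmemorypool; a falsy result means the upstream rejected the block, which
# raises a red flag.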
def blockSubmissionThread(payload):
	while True:
		try:
			rv = UpstreamBitcoindJSONRPC.getmemorypool(b2a_hex(payload).decode('ascii'))
			break
		except:
			pass
	if not rv:
		RaiseRedFlags('Upstream rejected block!')

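# checkShare validates one submitted share against the work we issued:
#  - the header must build on the current previous block and carry the
#    expected nbits and a known version;
#  - the share must reference work we actually handed out (workLog) and must
#    not be a duplicate;
#  - the hash is checked against the network target (solved blocks are
#    submitted upstream and to the local bitcoin node), the optional gotwork
#    target, and finally the pool's share target;
#  - timestamps are bounded, and getmemorypool-mode shares additionally have
#    their coinbase, merkle root and transaction list verified.
# _STA caches the share target as a 64-digit hex string.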
_STA = '%064x' % (config.ShareTarget,)
def checkShare(share):
	shareTime = share['time'] = time()

	data = share['data']
	data = data[:80]
	(prevBlock, height, bits) = MM.currentBlock
	sharePrevBlock = data[4:36]
	if sharePrevBlock != prevBlock:
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')

	# TODO: use userid
	username = share['username']
	if username not in workLog:
		raise RejectedShare('unknown-user')

	if data[72:76] != bits:
		raise RejectedShare('bad-diffbits')

	# Note that we should accept miners reducing version to 1 if they don't understand 2 yet
	# FIXME: When the supermajority is upgraded to version 2, stop accepting 1!
	if data[1:4] != b'\0\0\0' or data[0] > 2:
		raise RejectedShare('bad-version')

	shareMerkleRoot = data[36:68]
	if 'blkdata' in share:
		pl = share['blkdata']
		(txncount, pl) = varlenDecode(pl)
		cbtxn = bitcoin.txn.Txn(pl)
		cbtxn.disassemble(retExtra=True)
		coinbase = cbtxn.getCoinbase()
		wliLen = coinbase[0]
		wli = coinbase[1:wliLen+1]
		mode = 'MC'
		moden = 1
	else:
		wli = shareMerkleRoot
		mode = 'MRD'
		moden = 0

	MWL = workLog[username]
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld

	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None

	blkhash = dblsha(data)
	if blkhash[28:] != b'\0\0\0\0':
		raise RejectedShare('H-not-zero')
	blkhashn = hash2int(blkhash)

	global networkTarget
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))

	workMerkleTree = wld[1]
	workCoinbase = wld[2]

	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	txlist = workMerkleTree.data
	txlist = [deepcopy(txlist[0]),] + txlist[1:]
	cbtxn = txlist[0]
	cbtxn.setCoinbase(workCoinbase)
	cbtxn.assemble()

	if blkhashn <= networkTarget:
		logfunc("Submitting upstream")
		RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree) ) )
		if not moden:
			payload = assembleBlock(data, txlist)
		else:
			payload = share['data'] + share['blkdata']
		logfunc('Real block payload: %s' % (b2a_hex(payload).decode('utf8'),))
		RBPs.append(payload)
		threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
		bcnode.submitBlock(payload)
		share['upstreamResult'] = True
		MM.updateBlock(blkhash)

	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')

	if blkhashn > config.ShareTarget:
		raise RejectedShare('high-hash')
	share['target'] = config.ShareTarget
	share['_targethex'] = _STA

	shareTimestamp = unpack('<L', data[68:72])[0]
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')

	if moden:
		cbpre = cbtxn.getCoinbase()
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')

		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
				raise RejectedShare('bad-cb-flag')

		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')

		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')

		allowed = assembleBlock(data, txlist)
		if allowed != share['data'] + share['blkdata']:
			raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')

def receiveShare(share):
	# TODO: username => userid
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	finally:
		if '_origdata' in share:
			share['solution'] = share['_origdata']
		else:
			share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
		for i in loggersShare:
			i(share)

def newBlockNotification():
	logging.getLogger('newBlockNotification').info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
	pass

def newBlockNotificationSIGNAL(signum, frame):
	# Use a new thread, in case the signal handler is called with locks held
	thr = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
	thr.daemon = True
	thr.start()

from signal import signal, SIGUSR1
signal(SIGUSR1, newBlockNotificationSIGNAL)


import os
import os.path
import pickle
import signal
import sys
from time import sleep
import traceback

SAVE_STATE_FILENAME = 'eloipool.worklog'

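# Graceful shutdown: ask both the bitcoin P2P node and the JSON-RPC work
# server to stop, wake them so they notice, wait up to 256 polling rounds
# (about 2.5 seconds) for their loops to exit, then close their sockets.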
def stopServers():
	logger = logging.getLogger('stopServers')

	if hasattr(stopServers, 'already'):
		logger.debug('Already tried to stop servers before')
		return
	stopServers.already = True

	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server)
	for s in servers:
		s.keepgoing = False
	for s in servers:
		try:
			s.wakeup()
		except:
			logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
	i = 0
	while True:
		sl = []
		for s in servers:
			if s.running:
				sl.append(s.__class__.__name__)
		if not sl:
			break
		i += 1
		if i >= 0x100:
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
			break
		sleep(0.01)

	for s in servers:
		for fd in s._fd.keys():
			os.close(fd)

def saveState(t = None):
	logger = logging.getLogger('saveState')

	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())

def exit():
	t = time()
	stopServers()
	saveState(t)
	logging.getLogger('exit').info('Goodbye...')
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)

def restart():
	t = time()
	stopServers()
	saveState(t)
	logging.getLogger('restart').info('Restarting...')
	try:
		os.execv(sys.argv[0], sys.argv)
	except:
		logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())

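# Load any previously saved work state. Three on-disk layouts are handled:
# the oldest saved (workLog, DupeShareHACK) as a single tuple, a slightly
# later one pickled only DupeShareHACK as a dict, and the current layout
# pickles the shutdown time, then DupeShareHACK, then workLog. workLog is
# only restored if the save is less than two minutes old.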
def restoreState():
	if not os.path.exists(SAVE_STATE_FILENAME):
		return

	global workLog, DupeShareHACK

	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if type(t) == tuple:
				if len(t) > 2:
					# Future formats, not supported here
					ver = t[3]
					# TODO

				# Old format, from 2012-02-02 to 2012-02-03
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					# Old format, from 2012-02-03 to 2012-02-03
					DupeShareHACK = t
					t = None
				else:
					# Current format, from 2012-02-03 onward
					DupeShareHACK = pickle.load(f)

				if t + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))


from jsonrpcserver import JSONRPCListener, JSONRPCServer
import interactivemode
from networkserver import NetworkListener
import threading
import sharelogging
import imp

if __name__ == "__main__":
	if not hasattr(config, 'ShareLogging'):
		config.ShareLogging = ()
	if hasattr(config, 'DbOptions'):
		logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
		config.ShareLogging = list(config.ShareLogging)
		config.ShareLogging.append( {
			'type': 'sql',
			'engine': 'postgres',
			'dbopts': config.DbOptions,
			'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
		} )
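	# Illustrative ShareLogging entry (placeholder values; the full set of
	# accepted keys comes from the per-module loggers under sharelogging/, so
	# treat this as an assumption rather than a reference):
	#   config.ShareLogging = (
	#       {'type': 'logfile', 'filename': '/var/log/eloipool-shares.log'},
	#   )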
	for i in config.ShareLogging:
		if not hasattr(i, 'keys'):
			name, parameters = i
			logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
			if name == 'postgres':
				name = 'sql'
				i = {
					'engine': 'postgres',
					'dbopts': parameters,
				}
			elif name == 'logfile':
				i = {}
				i['thropts'] = parameters
				if 'filename' in parameters:
					i['filename'] = parameters['filename']
					i['thropts'] = dict(i['thropts'])
					del i['thropts']['filename']
			else:
				i = parameters
			i['type'] = name

		name = i['type']
		parameters = i
		try:
			fp, pathname, description = imp.find_module(name, sharelogging.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			loggersShare.append(lo.logShare)
		except:
			logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())

	LSbc = []
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))

	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)

	import jsonrpc_getmemorypool
	import jsonrpc_getwork
	import jsonrpc_setworkaux

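	# Wire up the JSON-RPC work server. The jsonrpc_* modules imported above
	# provide the getwork/getmemorypool/setworkaux handlers (assumed from
	# their names); the attributes set below are the callbacks and settings
	# those handlers use.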
	server = JSONRPCServer()
	if hasattr(config, 'JSONRPCAddress'):
		logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
	LS = []
	for a in config.JSONRPCAddresses:
		LS.append(JSONRPCListener(server, a))
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	server.aux = MM.CoinbaseAux
	server.getBlockHeader = getBlockHeader
	server.getBlockTemplate = getBlockTemplate
	server.receiveShare = receiveShare
	server.RaiseRedFlags = RaiseRedFlags
	server.ShareTarget = config.ShareTarget

	if hasattr(config, 'TrustedForwarders'):
		server.TrustedForwarders = config.TrustedForwarders
	server.ServerName = config.ServerName

	MM.start()

	restoreState()

	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()

	server.serve_forever()