Merge branch 'master' into coinbase_height
[bitcoin:eloipool.git] / eloipool.py
#!/usr/bin/python3
# Eloipool - Python Bitcoin pool server
# Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

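# Pool settings are read from config.py; optional settings that are missing
# there are given defaults below.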
import config

if not hasattr(config, 'ServerName'):
        config.ServerName = 'Unnamed Eloipool'

# Default share target: a share's hash must have at least 32 leading zero bits
# (roughly difficulty 1) to be accepted.
if not hasattr(config, 'ShareTarget'):
        config.ShareTarget = 0x00000000ffffffffffffffffffffffffffffffffffffffffffffffffffffffff


import logging

if len(logging.root.handlers) == 0:
        logging.basicConfig(
                format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
                level=logging.DEBUG,
        )
        for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
                logging.getLogger(infoOnly).setLevel(logging.INFO)

def RaiseRedFlags(reason):
        logging.getLogger('redflag').critical(reason)
        return reason


from bitcoin.node import BitcoinLink, BitcoinNode
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)


from bitcoin.script import BitcoinScript
from bitcoin.txn import Txn
from base58 import b58decode
from struct import pack
import subprocess
from time import time

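# makeCoinbaseTxn builds the generation (coinbase) transaction for a new block
# template.  When config.CoinbaserCmd is set, the command is run with '%d'
# replaced by the total coinbase value and is expected to print the number of
# extra outputs on its first line, followed by an amount line and an address
# line for each output.  Whatever value is left over (or everything, if the
# coinbaser fails or overspends) is paid to config.TrackerAddr.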
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
        txn = Txn.new()
        
        if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
                coinbased = 0
                try:
                        cmd = config.CoinbaserCmd
                        cmd = cmd.replace('%d', str(coinbaseValue))
                        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
                        nout = int(p.stdout.readline())
                        for i in range(nout):
                                amount = int(p.stdout.readline())
                                addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
                                pkScript = BitcoinScript.toAddress(addr)
                                txn.addOutput(amount, pkScript)
                                coinbased += amount
                except:
                        coinbased = coinbaseValue + 1
                if coinbased >= coinbaseValue:
                        logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
                        txn.outputs = []
                else:
                        coinbaseValue -= coinbased
        
        pkScript = BitcoinScript.toAddress(config.TrackerAddr)
        txn.addOutput(coinbaseValue, pkScript)
        
        # TODO
        # TODO: red flag on dupe coinbase
        return txn


import jsonrpc_getwork
from util import Bits2Target

workLog = {}        # per-user record of issued work: username -> { work id -> (work, issue time) }
networkTarget = None
DupeShareHACK = {}  # header data of every share seen this block, for duplicate detection

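# Longpoll / block-change plumbing: updateBlocks wakes any long-polling
# getwork/getmemorypool clients; blockChanged additionally resets per-block
# state (duplicate-share tracking, the work log, and the network target)
# whenever the best block changes.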
server = None
def updateBlocks():
        server.wakeLongpoll()

def blockChanged():
        global DupeShareHACK
        DupeShareHACK = {}
        jsonrpc_getwork._CheckForDupesHACK = {}
        global MM, networkTarget, server
        bits = MM.currentBlock[2]
        if bits is None:
                networkTarget = None
        else:
                networkTarget = Bits2Target(bits)
        workLog.clear()
        updateBlocks()


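# The merkle maker (MM) maintains the pool's current work: it tracks the
# current block, builds coinbase transactions and merkle trees, and fires the
# callbacks registered below when the block changes or the work is updated.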
from merklemaker import merkleMaker
MM = merkleMaker()
MM.__dict__.update(config.__dict__)
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks


from binascii import b2a_hex
from copy import deepcopy
from struct import pack, unpack
import threading
from time import time
from util import RejectedShare, dblsha, hash2int, swap32
import jsonrpc
import traceback

gotwork = None
if hasattr(config, 'GotWorkURI'):
        gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

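# submitGotwork posts metadata about a sufficiently good share to the JSON-RPC
# service named by config.GotWorkURI (see the "gotwork hack" in checkShare);
# failures are logged but never affect share acceptance.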
def submitGotwork(info):
        try:
                gotwork.gotwork(info)
        except:
                checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())

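# Work issuance.  getBlockHeader serves legacy getwork requests: it returns the
# 80-byte block header (with a placeholder nonce) and records the work under
# its merkle root.  getBlockTemplate serves getmemorypool requests and records
# the work under the length-prefixed identifier at the start of the coinbase
# script.  Both key into workLog[username] so incoming shares can be matched
# back to the work that was handed out.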
def getBlockHeader(username):
        MRD = MM.getMRD()
        (merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
        timestamp = pack('<L', int(time()))
        hdr = b'\1\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
        workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
        return (hdr, workLog[username][merkleRoot])

def getBlockTemplate(username):
        MC = MM.getMC()
        (dummy, merkleTree, coinbase, prevBlock, bits) = MC
        wliLen = coinbase[0]
        wli = coinbase[1:wliLen+1]
        workLog.setdefault(username, {})[wli] = (MC, time())
        return MC

loggersShare = []

RBDs = []  # "real block data": a record of every block the pool finds
RBPs = []  # "real block payloads": the serialized blocks as submitted upstream

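# Block assembly and upstream submission.  assembleBlock serializes a full
# block (header, transaction count, then each transaction).  Found blocks are
# submitted to the upstream bitcoind from a separate thread, retrying until
# the RPC call goes through.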
from bitcoin.varlen import varlenEncode, varlenDecode
import bitcoin.txn
def assembleBlock(blkhdr, txlist):
        payload = blkhdr
        payload += varlenEncode(len(txlist))
        for tx in txlist:
                payload += tx.data
        return payload

def blockSubmissionThread(payload):
        while True:
                try:
                        rv = UpstreamBitcoindJSONRPC.getmemorypool(b2a_hex(payload).decode('ascii'))
                        break
                except:
                        pass
        if not rv:
                RaiseRedFlags('Upstream rejected block!')

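# Share validation.  checkShare rejects stale or malformed work, enforces the
# duplicate-share and share-target rules, and, when a share also meets the
# network target, assembles the full block and submits it upstream.  Shares
# from getwork clients ('MRD' mode) are matched by merkle root; shares from
# getmemorypool clients ('MC' mode) carry the transaction data and are matched
# by the identifier in their coinbase script.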
_STA = '%064x' % (config.ShareTarget,)
def checkShare(share):
        shareTime = share['time'] = time()
        
        data = share['data']
        data = data[:80]
        (prevBlock, height, bits) = MM.currentBlock
        sharePrevBlock = data[4:36]
        if sharePrevBlock != prevBlock:
                if sharePrevBlock == MM.lastBlock[0]:
                        raise RejectedShare('stale-prevblk')
                raise RejectedShare('bad-prevblk')
        
        # TODO: use userid
        username = share['username']
        if username not in workLog:
                raise RejectedShare('unknown-user')
        
        if data[72:76] != bits:
                raise RejectedShare('bad-diffbits')
        if data[:4] != b'\1\0\0\0':
                raise RejectedShare('bad-version')
        
        shareMerkleRoot = data[36:68]
        if 'blkdata' in share:
                pl = share['blkdata']
                (txncount, pl) = varlenDecode(pl)
                cbtxn = bitcoin.txn.Txn(pl)
                cbtxn.disassemble(retExtra=True)
                coinbase = cbtxn.getCoinbase()
                wliLen = coinbase[0]
                wli = coinbase[1:wliLen+1]
                mode = 'MC'
                moden = 1
        else:
                wli = shareMerkleRoot
                mode = 'MRD'
                moden = 0
        
        MWL = workLog[username]
        if wli not in MWL:
                raise RejectedShare('unknown-work')
        (wld, issueT) = MWL[wli]
        share[mode] = wld
        
        if data in DupeShareHACK:
                raise RejectedShare('duplicate')
        DupeShareHACK[data] = None
        
        blkhash = dblsha(data)
        if blkhash[28:] != b'\0\0\0\0':
                raise RejectedShare('H-not-zero')
        blkhashn = hash2int(blkhash)
        
        global networkTarget
        logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
        logfunc('BLKHASH: %64x' % (blkhashn,))
        logfunc(' TARGET: %64x' % (networkTarget,))
        
        workMerkleTree = wld[1]
        workCoinbase = wld[2]
        
        # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
        txlist = workMerkleTree.data
        txlist = [deepcopy(txlist[0]),] + txlist[1:]
        cbtxn = txlist[0]
        cbtxn.setCoinbase(workCoinbase)
        cbtxn.assemble()
        
        if blkhashn <= networkTarget:
                logfunc("Submitting upstream")
                RBDs.append( deepcopy( (data, txlist, share.get('blkdata', None), workMerkleTree) ) )
                if not moden:
                        payload = assembleBlock(data, txlist)
                else:
                        payload = share['data'] + share['blkdata']
                logfunc('Real block payload: %s' % (payload,))
                RBPs.append(payload)
                threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
                bcnode.submitBlock(payload)
                share['upstreamResult'] = True
                MM.updateBlock(blkhash)
        
        # Gotwork hack...
        if gotwork and blkhashn <= config.GotWorkTarget:
                try:
                        coinbaseMrkl = cbtxn.data
                        coinbaseMrkl += blkhash
                        steps = workMerkleTree._steps
                        coinbaseMrkl += pack('B', len(steps))
                        for step in steps:
                                coinbaseMrkl += step
                        coinbaseMrkl += b"\0\0\0\0"
                        info = {}
                        info['hash'] = b2a_hex(blkhash).decode('ascii')
                        info['header'] = b2a_hex(data).decode('ascii')
                        info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
                        thr = threading.Thread(target=submitGotwork, args=(info,))
                        thr.daemon = True
                        thr.start()
                except:
                        checkShare.logger.warning('Failed to build gotwork request')
        
        if blkhashn > config.ShareTarget:
                raise RejectedShare('high-hash')
        share['target'] = config.ShareTarget
        share['_targethex'] = _STA
        
        shareTimestamp = unpack('<L', data[68:72])[0]
        if shareTime < issueT - 120:
                raise RejectedShare('stale-work')
        if shareTimestamp < shareTime - 300:
                raise RejectedShare('time-too-old')
        if shareTimestamp > shareTime + 7200:
                raise RejectedShare('time-too-new')
        
        if moden:
                cbpre = cbtxn.getCoinbase()
                cbpreLen = len(cbpre)
                if coinbase[:cbpreLen] != cbpre:
                        raise RejectedShare('bad-cb-prefix')
                
                # Filter out known "I support" flags, to prevent exploits
                for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
                        if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
                                raise RejectedShare('bad-cb-flag')
                
                if len(coinbase) > 100:
                        raise RejectedShare('bad-cb-length')
                
                cbtxn.setCoinbase(coinbase)
                cbtxn.assemble()
                if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
                        raise RejectedShare('bad-txnmrklroot')
                
                allowed = assembleBlock(data, txlist)
                if allowed != share['data'] + share['blkdata']:
                        raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')

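# receiveShare wraps checkShare: it records the reject reason on rejected
# shares, fills in the 'solution' field, and hands every share (accepted or
# not) to the configured share loggers.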
def receiveShare(share):
        # TODO: username => userid
        try:
                checkShare(share)
        except RejectedShare as rej:
                share['rejectReason'] = str(rej)
                raise
        finally:
                if '_origdata' in share:
                        share['solution'] = share['_origdata']
                else:
                        share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
                for i in loggersShare:
                        i(share)

def newBlockNotification():
        logging.getLogger('newBlockNotification').info('Received new block notification')
        MM.updateMerkleTree()
        # TODO: Force RESPOND TO LONGPOLLS?
        pass

def newBlockNotificationSIGNAL(signum, frame):
        # Use a new thread, in case the signal handler is called with locks held
        thr = threading.Thread(target=newBlockNotification, name='newBlockNotification via signal %s' % (signum,))
        thr.daemon = True
        thr.start()

from signal import signal, SIGUSR1
signal(SIGUSR1, newBlockNotificationSIGNAL)


import os
import os.path
import pickle
import signal
import sys
from time import sleep
import traceback

SAVE_STATE_FILENAME = 'eloipool.worklog'

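# Shutdown and state persistence.  stopServers asks the bitcoin node and the
# JSON-RPC server to stop and waits briefly for them to finish; saveState
# pickles the shutdown time, the duplicate-share table, and the work log to
# SAVE_STATE_FILENAME so restoreState can pick them back up after a restart,
# as long as the saved work is less than two minutes old.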
def stopServers():
        logger = logging.getLogger('stopServers')
        
        if hasattr(stopServers, 'already'):
                logger.debug('Already tried to stop servers before')
                return
        stopServers.already = True
        
        logger.info('Stopping servers...')
        global bcnode, server
        servers = (bcnode, server)
        for s in servers:
                s.keepgoing = False
        for s in servers:
                try:
                        s.wakeup()
                except:
                        logger.error('Failed to stop server %s\n%s' % (s, traceback.format_exc()))
        i = 0
        while True:
                sl = []
                for s in servers:
                        if s.running:
                                sl.append(s.__class__.__name__)
                if not sl:
                        break
                i += 1
                if i >= 0x100:
                        logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
                        break
                sleep(0.01)
        
        for s in servers:
                for fd in s._fd.keys():
                        os.close(fd)

def saveState(t = None):
        logger = logging.getLogger('saveState')
        
        # Then, save data needed to resume work
        logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
        i = 0
        while True:
                try:
                        with open(SAVE_STATE_FILENAME, 'wb') as f:
                                pickle.dump(t, f)
                                pickle.dump(DupeShareHACK, f)
                                pickle.dump(workLog, f)
                        break
                except:
                        i += 1
                        if i >= 0x10000:
                                logger.error('Failed to save work\n' + traceback.format_exc())
                                try:
                                        os.unlink(SAVE_STATE_FILENAME)
                                except:
                                        logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
                                # Give up rather than retrying forever
                                break

def exit():
        t = time()
        stopServers()
        saveState(t)
        logging.getLogger('exit').info('Goodbye...')
        os.kill(os.getpid(), signal.SIGTERM)
        sys.exit(0)

def restart():
        t = time()
        stopServers()
        saveState(t)
        logging.getLogger('restart').info('Restarting...')
        try:
                os.execv(sys.argv[0], sys.argv)
        except:
                logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())

def restoreState():
        if not os.path.exists(SAVE_STATE_FILENAME):
                return
        
        global workLog, DupeShareHACK
        
        logger = logging.getLogger('restoreState')
        s = os.stat(SAVE_STATE_FILENAME)
        logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
        try:
                with open(SAVE_STATE_FILENAME, 'rb') as f:
                        t = pickle.load(f)
                        if type(t) == tuple:
                                if len(t) > 2:
                                        # Future formats, not supported here
                                        ver = t[3]
                                        # TODO
                                
                                # Old format, from 2012-02-02 to 2012-02-03
                                workLog = t[0]
                                DupeShareHACK = t[1]
                                t = None
                        else:
                                if isinstance(t, dict):
                                        # Old format, from 2012-02-03 to 2012-02-03
                                        DupeShareHACK = t
                                        t = None
                                else:
                                        # Current format, from 2012-02-03 onward
                                        DupeShareHACK = pickle.load(f)
                                
                                # Old formats leave t as None; they carry no timestamp, so skip the workLog
                                if t and t + 120 >= time():
                                        workLog = pickle.load(f)
                                else:
                                        logger.debug('Skipping restore of expired workLog')
        except:
                logger.error('Failed to restore state\n' + traceback.format_exc())
                return
        logger.info('State restored successfully')
        if t:
                logger.info('Total downtime: %g seconds' % (time() - t,))


from jsonrpcserver import JSONRPCListener, JSONRPCServer
import interactivemode
from networkserver import NetworkListener
import threading
import sharelogging
import imp

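# Startup.  The main block instantiates the configured share loggers (with
# backward-compatibility shims for older config formats), opens the Bitcoin
# node and JSON-RPC listeners, wires the work/share callbacks into the
# JSON-RPC server, starts the merkle maker, restores any saved state, and
# then serves requests until shutdown.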
if __name__ == "__main__":
        if not hasattr(config, 'ShareLogging'):
                config.ShareLogging = ()
        if hasattr(config, 'DbOptions'):
                logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
                config.ShareLogging = list(config.ShareLogging)
                config.ShareLogging.append( {
                        'type': 'sql',
                        'engine': 'postgres',
                        'dbopts': config.DbOptions,
                        'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
                } )
        for i in config.ShareLogging:
                if not hasattr(i, 'keys'):
                        name, parameters = i
                        logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
                        if name == 'postgres':
                                name = 'sql'
                                i = {
                                        'engine': 'postgres',
                                        'dbopts': parameters,
                                }
                        elif name == 'logfile':
                                i = {}
                                i['thropts'] = parameters
                                if 'filename' in parameters:
                                        i['filename'] = parameters['filename']
                                        i['thropts'] = dict(i['thropts'])
                                        del i['thropts']['filename']
                        else:
                                i = parameters
                        i['type'] = name
                
                name = i['type']
                parameters = i
                try:
                        fp, pathname, description = imp.find_module(name, sharelogging.__path__)
                        m = imp.load_module(name, fp, pathname, description)
                        lo = getattr(m, name)(**parameters)
                        loggersShare.append(lo.logShare)
                except:
                        logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name, sys.exc_info())

        LSbc = []
        if not hasattr(config, 'BitcoinNodeAddresses'):
                config.BitcoinNodeAddresses = ()
        for a in config.BitcoinNodeAddresses:
                LSbc.append(NetworkListener(bcnode, a))
        
        if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
                BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
        
        import jsonrpc_getmemorypool
        import jsonrpc_getwork
        import jsonrpc_setworkaux
        
        server = JSONRPCServer()
        if hasattr(config, 'JSONRPCAddress'):
                logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
                if not hasattr(config, 'JSONRPCAddresses'):
                        config.JSONRPCAddresses = []
                config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
        LS = []
        for a in config.JSONRPCAddresses:
                LS.append(JSONRPCListener(server, a))
        if hasattr(config, 'SecretUser'):
                server.SecretUser = config.SecretUser
        server.aux = MM.CoinbaseAux
        server.getBlockHeader = getBlockHeader
        server.getBlockTemplate = getBlockTemplate
        server.receiveShare = receiveShare
        server.RaiseRedFlags = RaiseRedFlags
        server.ShareTarget = config.ShareTarget
        
        if hasattr(config, 'TrustedForwarders'):
                server.TrustedForwarders = config.TrustedForwarders
        server.ServerName = config.ServerName
        
        MM.start()
        
        restoreState()
        
        bcnode_thr = threading.Thread(target=bcnode.serve_forever)
        bcnode_thr.daemon = True
        bcnode_thr.start()
        
        server.serve_forever()