Bugfix: Move TrustedForwarders initialization to NetworkServer constructor
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import config
19
20
21 import logging
22
# Default logging setup: only configure the root logger if the embedding
# environment has not already installed handlers of its own.
if len(logging.root.handlers) == 0:
        logging.basicConfig(
                format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
                level=logging.DEBUG,
        )
        # These loggers are too chatty at DEBUG; cap them at INFO by default.
        for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
                logging.getLogger(infoOnly).setLevel(logging.INFO)
30
def RaiseRedFlags(reason):
        """Log *reason* at CRITICAL severity on the 'redflag' logger and pass it through unchanged."""
        redflag = logging.getLogger('redflag')
        redflag.critical(reason)
        return reason
34
35
from bitcoin.node import BitcoinLink, BitcoinNode
# P2P node used to relay solved blocks (see checkShare) and talk to peers.
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
# JSON-RPC proxy to the upstream bitcoind (getmemorypool / block submission).
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
42
43
44 from bitcoin.script import BitcoinScript
45 from bitcoin.txn import Txn
46 from base58 import b58decode
47 from struct import pack
48 import subprocess
49 from time import time
50
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
        """Build the coinbase transaction paying out coinbaseValue (satoshis).

        If a CoinbaserCmd is configured (and useCoinbaser is true), it is run to
        split part of the reward across extra outputs; whatever remains (or the
        full value, on coinbaser failure) goes to config.TrackerAddr.
        """
        txn = Txn.new()
        
        if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
                coinbased = 0
                try:
                        cmd = config.CoinbaserCmd
                        cmd = cmd.replace('%d', str(coinbaseValue))
                        # NOTE(review): shell=True executes the operator-configured
                        # command line; the config value is trusted, but never
                        # interpolate untrusted data into CoinbaserCmd.
                        # NOTE(review): the child is never wait()ed on here, so it can
                        # linger as a zombie until GC — confirm whether reaping matters.
                        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
                        # Protocol: first line is the output count, then (amount,
                        # address) line pairs.
                        nout = int(p.stdout.readline())
                        for i in range(nout):
                                amount = int(p.stdout.readline())
                                addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
                                pkScript = BitcoinScript.toAddress(addr)
                                txn.addOutput(amount, pkScript)
                                coinbased += amount
                except Exception:
                        # BUGFIX: was a bare `except:`, which also swallowed
                        # SystemExit/KeyboardInterrupt. Any failure forces the
                        # error path below by overshooting the budget.
                        coinbased = coinbaseValue + 1
                if coinbased >= coinbaseValue:
                        # Coinbaser asked for more than the reward (or failed):
                        # discard its outputs and pay everything to the tracker.
                        logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
                        txn.outputs = []
                else:
                        coinbaseValue -= coinbased
        
        pkScript = BitcoinScript.toAddress(config.TrackerAddr)
        txn.addOutput(coinbaseValue, pkScript)
        
        # TODO
        # TODO: red flag on dupe coinbase
        return txn
81
82
83 import jsonrpc_getwork
84 from util import Bits2Target
85
# Work issued to miners, keyed by username then by work identifier.
workLog = {}
# Current network (block) target as an int; set on every block change.
networkTarget = None
# Header data of already-seen shares, used to reject duplicates within a block.
DupeShareHACK = {}
# JSON-RPC server instance; assigned in the __main__ section below.
server = None
def updateBlocks():
        """Wake any long-polling clients so they fetch fresh work (no-op before the server exists)."""
        srv = server
        if srv:
                srv.wakeLongpoll()
94
def blockChanged():
        """Reset per-block state after a new best block: dupe caches, target, work log."""
        global MM, networkTarget, server, DupeShareHACK
        DupeShareHACK = {}
        jsonrpc_getwork._CheckForDupesHACK = {}
        # currentBlock is (prevBlock, bits); derive the full target from bits.
        networkTarget = Bits2Target(MM.currentBlock[1])
        workLog.clear()
        updateBlocks()
103
104
from merklemaker import merkleMaker
# The merkleMaker thread keeps fresh merkle trees / block templates available.
MM = merkleMaker()
MM.__dict__.update(config.__dict__)  # copy every config setting onto the instance
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
# Callbacks into this module when the chain tip changes / template updates.
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
MM.start()
114
115
116 from binascii import b2a_hex
117 from copy import deepcopy
118 from struct import pack, unpack
119 from time import time
120 from util import RejectedShare, dblsha, hash2int, swap32
121 import jsonrpc
122 import threading
123 import traceback
124
# Optional "gotwork" share-notification endpoint; None when unconfigured.
gotwork = None
if hasattr(config, 'GotWorkURI'):
        gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
128
def submitGotwork(info):
        """Best-effort submission of share *info* to the configured gotwork service.

        Failures are logged and swallowed: gotwork reporting must never take
        down share processing.
        """
        try:
                gotwork.gotwork(info)
        except Exception:
                # BUGFIX: narrowed from a bare `except:` so SystemExit and
                # KeyboardInterrupt still propagate out of the reporting thread.
                checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
134
def getBlockHeader(username):
        """Issue getwork-style work: return an 80-byte header and its work-log entry.

        The issued work is recorded under *username* keyed by merkle root so a
        returning share can be matched back to it in checkShare().
        """
        mrd = MM.getMRD()
        merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk = mrd
        # version(4) + prevBlock(32) + merkleRoot(32) + time(4) + bits(4) + nonce(4);
        # b'iolE' ('Eloi' reversed) fills the nonce field for the miner to overwrite.
        hdr = b'\1\0\0\0' + prevBlock + merkleRoot + pack('<L', int(time())) + bits + b'iolE'
        entry = (mrd, time())
        workLog.setdefault(username, {})[merkleRoot] = entry
        return (hdr, entry)
142
def getBlockTemplate(username):
        """Issue getmemorypool-style work and record it in the work log.

        The work identifier is a length-prefixed tag embedded at the start of
        the coinbase script; shares are matched back by that tag.
        """
        mc = MM.getMC()
        dummy, merkleTree, coinbase, prevBlock, bits = mc
        tagLen = coinbase[0]
        workId = coinbase[1:1 + tagLen]
        workLog.setdefault(username, {})[workId] = (mc, time())
        return mc
150
# Share-logger callables; each is invoked with every processed share (see receiveShare).
loggersShare = []

# Debugging records of real (solved) blocks: raw block data and submitted payloads.
RBDs = []
RBPs = []
155
156 from bitcoin.varlen import varlenEncode, varlenDecode
157 import bitcoin.txn
def assembleBlock(blkhdr, txlist):
        """Serialize a full block: header, transaction-count varint, then each raw txn."""
        parts = [blkhdr, varlenEncode(len(txlist))]
        parts.extend(tx.data for tx in txlist)
        return b''.join(parts)
164
def blockSubmissionThread(payload):
        """Submit a solved block upstream, retrying until bitcoind accepts the call.

        Runs on its own thread (started from checkShare) so share processing is
        never blocked; losing a solved block is worse than retrying hard.
        """
        # Hoist the loop-invariant hex encoding out of the retry loop.
        hexPayload = b2a_hex(payload).decode('ascii')
        while True:
                try:
                        UpstreamBitcoindJSONRPC.getmemorypool(hexPayload)
                        break
                except Exception:
                        # BUGFIX: narrowed from a bare `except:` so SystemExit and
                        # KeyboardInterrupt can still unwind this thread; any RPC
                        # failure is retried immediately.
                        pass
172
def checkShare(share):
        """Validate a submitted share, raising RejectedShare on any problem.

        Side effects: records the header in DupeShareHACK, attaches the matched
        work to the share dict, and — when the share solves a block — submits it
        upstream over JSON-RPC and the P2P node, and notifies gotwork.
        """
        shareTime = share['time'] = time()
        
        data = share['data']
        data = data[:80]  # only the 80-byte block header is validated here
        (prevBlock, bits) = MM.currentBlock
        sharePrevBlock = data[4:36]
        if sharePrevBlock != prevBlock:
                # Work built on the immediately previous block is merely stale;
                # anything else is outright invalid.
                if sharePrevBlock == MM.lastBlock[0]:
                        raise RejectedShare('stale-prevblk')
                raise RejectedShare('bad-prevblk')
        
        # TODO: use userid
        username = share['username']
        if username not in workLog:
                raise RejectedShare('unknown-user')
        
        if data[72:76] != bits:
                raise RejectedShare('bad-diffbits')
        if data[:4] != b'\1\0\0\0':
                raise RejectedShare('bad-version')
        
        shareMerkleRoot = data[36:68]
        if 'blkdata' in share:
                # getmemorypool-style share: full transaction data attached; the
                # work is identified by the tag embedded at the start of the
                # coinbase script (see getBlockTemplate).
                pl = share['blkdata']
                (txncount, pl) = varlenDecode(pl)
                cbtxn = bitcoin.txn.Txn(pl)
                cbtxn.disassemble(retExtra=True)
                coinbase = cbtxn.getCoinbase()
                wliLen = coinbase[0]
                wli = coinbase[1:wliLen+1]
                mode = 'MC'
                moden = 1
        else:
                # getwork-style share: the merkle root itself identifies the work.
                wli = shareMerkleRoot
                mode = 'MRD'
                moden = 0
        
        MWL = workLog[username]
        if wli not in MWL:
                raise RejectedShare('unknown-work')
        (wld, issueT) = MWL[wli]
        share[mode] = wld
        
        if data in DupeShareHACK:
                raise RejectedShare('duplicate')
        DupeShareHACK[data] = None
        
        blkhash = dblsha(data)
        if blkhash[28:] != b'\0\0\0\0':
                # Minimum share difficulty: the final 4 bytes of the double-SHA
                # hash must be zero.
                raise RejectedShare('H-not-zero')
        blkhashn = hash2int(blkhash)
        
        global networkTarget
        # Block-solving shares are logged at INFO, ordinary shares at DEBUG.
        logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
        logfunc('BLKHASH: %64x' % (blkhashn,))
        logfunc(' TARGET: %64x' % (networkTarget,))
        
        workMerkleTree = wld[1]
        workCoinbase = wld[2]
        
        # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
        # Deep-copy only the coinbase txn so the shared template is not mutated.
        txlist = workMerkleTree.data
        txlist = [deepcopy(txlist[0]),] + txlist[1:]
        cbtxn = txlist[0]
        cbtxn.setCoinbase(workCoinbase)
        cbtxn.assemble()
        
        if blkhashn <= networkTarget:
                # The share solves a block: submit upstream via JSON-RPC (on a
                # dedicated retry thread) AND via the P2P node.
                logfunc("Submitting upstream")
                if not moden:
                        RBDs.append( deepcopy( (data, txlist) ) )
                        payload = assembleBlock(data, txlist)
                else:
                        RBDs.append( deepcopy( (data, txlist, share['blkdata']) ) )
                        payload = share['data'] + share['blkdata']
                logfunc('Real block payload: %s' % (payload,))
                RBPs.append(payload)
                threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
                bcnode.submitBlock(payload)
                share['upstreamResult'] = True
                MM.updateBlock(blkhash)
        
        # Gotwork hack...
        if gotwork and blkhashn <= config.GotWorkTarget:
                try:
                        # Build the coinbase + merkle-branch blob the gotwork API expects.
                        coinbaseMrkl = cbtxn.data
                        coinbaseMrkl += blkhash
                        steps = workMerkleTree._steps
                        coinbaseMrkl += pack('B', len(steps))
                        for step in steps:
                                coinbaseMrkl += step
                        coinbaseMrkl += b"\0\0\0\0"
                        info = {}
                        info['hash'] = b2a_hex(blkhash).decode('ascii')
                        info['header'] = b2a_hex(data).decode('ascii')
                        info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
                        thr = threading.Thread(target=submitGotwork, args=(info,))
                        thr.daemon = True
                        thr.start()
                except:
                        checkShare.logger.warning('Failed to build gotwork request')
        
        # Timestamp sanity checks come AFTER block submission, so a solved
        # block is never lost to clock skew.
        shareTimestamp = unpack('<L', data[68:72])[0]
        if shareTime < issueT - 120:
                raise RejectedShare('stale-work')
        if shareTimestamp < shareTime - 300:
                raise RejectedShare('time-too-old')
        if shareTimestamp > shareTime + 7200:
                raise RejectedShare('time-too-new')
        
        if moden:
                # Full-block (MC) shares: verify the miner kept our coinbase
                # prefix, added no forbidden flags, and left the txn set intact.
                cbpre = cbtxn.getCoinbase()
                cbpreLen = len(cbpre)
                if coinbase[:cbpreLen] != cbpre:
                        raise RejectedShare('bad-cb-prefix')
                
                # Filter out known "I support" flags, to prevent exploits
                for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
                        if coinbase.find(ff) > cbpreLen - len(ff):
                                raise RejectedShare('bad-cb-flag')
                
                if len(coinbase) > 100:
                        raise RejectedShare('bad-cb-length')
                
                # Recompute the merkle root with the miner's coinbase and compare.
                cbtxn.setCoinbase(coinbase)
                cbtxn.assemble()
                if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
                        raise RejectedShare('bad-txnmrklroot')
                
                allowed = assembleBlock(data, txlist)
                if allowed != share['data'] + share['blkdata']:
                        raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
307
def receiveShare(share):
        """Process one submitted share: validate it, then log it unconditionally.

        Re-raises RejectedShare after recording the reject reason; the share is
        handed to every configured share logger whether accepted or rejected.
        """
        # TODO: username => userid
        try:
                checkShare(share)
        except RejectedShare as rej:
                share['rejectReason'] = str(rej)
                raise
        finally:
                if '_origdata' in share:
                        solution = share['_origdata']
                else:
                        solution = b2a_hex(swap32(share['data'])).decode('utf8')
                share['solution'] = solution
                for logShare in loggersShare:
                        logShare(share)
322
def newBlockNotification(signum, frame):
        """SIGUSR1 handler: refresh the merkle tree when a new block is announced."""
        log = logging.getLogger('newBlockNotification')
        log.info('Received new block notification')
        MM.updateMerkleTree()
        # TODO: Force RESPOND TO LONGPOLLS?
328
from signal import signal, SIGUSR1
# NOTE: the later `import signal` rebinds the name 'signal' to the module,
# so this handler registration must stay before that import.
signal(SIGUSR1, newBlockNotification)
331
332
333 import os
334 import os.path
335 import pickle
336 import signal
337 import sys
338 from time import sleep
339 import traceback
340
# File used to persist DupeShareHACK and workLog across restarts.
SAVE_STATE_FILENAME = 'eloipool.worklog'
342
def stopServers():
        """Shut down the P2P node and JSON-RPC server, then close their descriptors.

        Polls up to 0x100 times (10ms apart) for the serve loops to exit before
        giving up with an error.
        """
        logger = logging.getLogger('stopServers')
        
        logger.info('Stopping servers...')
        global bcnode, server
        servers = (bcnode, server)
        # Ask every server loop to exit, then wake each so it notices promptly.
        for srv in servers:
                srv.keepgoing = False
        for srv in servers:
                srv.wakeup()
        tries = 0
        while True:
                stillRunning = [srv.__class__.__name__ for srv in servers if srv.running]
                if not stillRunning:
                        break
                tries += 1
                if tries >= 0x100:
                        logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(stillRunning)))
                        break
                sleep(0.01)
        
        # Release any file descriptors the servers still hold.
        for srv in servers:
                for fd in srv._fd.keys():
                        os.close(fd)
370
def saveState(t = None):
        """Persist resume state (shutdown time, DupeShareHACK, workLog) to disk.

        Retries the write up to 0x10000 times; on final failure, removes any
        partial state file so a later restoreState() doesn't load garbage.
        """
        logger = logging.getLogger('saveState')
        
        # Then, save data needed to resume work
        logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
        i = 0
        while True:
                try:
                        with open(SAVE_STATE_FILENAME, 'wb') as f:
                                pickle.dump(t, f)
                                pickle.dump(DupeShareHACK, f)
                                pickle.dump(workLog, f)
                        break
                except Exception:
                        i += 1
                        if i >= 0x10000:
                                logger.error('Failed to save work\n' + traceback.format_exc())
                                try:
                                        os.unlink(SAVE_STATE_FILENAME)
                                except Exception:
                                        logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
                                # BUGFIX: without this break the loop retried forever
                                # after giving up, hanging shutdown/restart.
                                break
392
def exit():
        """Clean shutdown: stop servers, save resume state, then terminate the process."""
        shutdownTime = time()
        stopServers()
        saveState(shutdownTime)
        logging.getLogger('exit').info('Goodbye...')
        # SIGTERM first so any remaining threads/handlers see a termination signal.
        os.kill(os.getpid(), signal.SIGTERM)
        sys.exit(0)
400
def restart():
        """Stop servers, save resume state, and re-exec this process in place."""
        restartTime = time()
        stopServers()
        saveState(restartTime)
        logging.getLogger('restart').info('Restarting...')
        try:
                os.execv(sys.argv[0], sys.argv)
        except:
                logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
410
def restoreState():
        """Restore DupeShareHACK and workLog saved by saveState(), if a state file exists.

        Handles three historical on-disk formats: a (workLog, DupeShareHACK)
        tuple, a bare DupeShareHACK dict, and the current sequential layout of
        timestamp, DupeShareHACK, workLog.
        """
        if not os.path.exists(SAVE_STATE_FILENAME):
                return
        
        global workLog, DupeShareHACK
        
        logger = logging.getLogger('restoreState')
        s = os.stat(SAVE_STATE_FILENAME)
        logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
        try:
                with open(SAVE_STATE_FILENAME, 'rb') as f:
                        t = pickle.load(f)
                        if type(t) == tuple:
                                # Oldest format: one tuple of (workLog, DupeShareHACK).
                                workLog = t[0]
                                DupeShareHACK = t[1]
                                t = None
                        else:
                                if isinstance(t, dict):
                                        # Old format: first pickle is DupeShareHACK itself.
                                        DupeShareHACK = t
                                        t = None
                                else:
                                        # Current format: first pickle is the shutdown timestamp.
                                        DupeShareHACK = pickle.load(f)
                                
                                # workLog is only useful shortly after shutdown; skip if stale.
                                if s.st_mtime + 120 >= time():
                                        workLog = pickle.load(f)
                                else:
                                        logger.debug('Skipping restore of expired workLog')
        except:
                logger.error('Failed to restore state\n' + traceback.format_exc())
                return
        logger.info('State restored successfully')
        if t:
                logger.info('Total downtime: %g seconds' % (time() - t,))
444
445
446 from jsonrpcserver import JSONRPCListener, JSONRPCServer
447 import interactivemode
448 from networkserver import NetworkListener
449 import threading
450 import sharelogging
451 import imp
452
if __name__ == "__main__":
        # --- Share logging configuration (with backward-compatibility shims) ---
        if not hasattr(config, 'ShareLogging'):
                config.ShareLogging = ()
        if hasattr(config, 'DbOptions'):
                # Deprecated DbOptions: translate into an equivalent 'sql' share logger.
                logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
                config.ShareLogging = list(config.ShareLogging)
                config.ShareLogging.append( {
                        'type': 'sql',
                        'engine': 'postgres',
                        'dbopts': config.DbOptions,
                        'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
                } )
        for i in config.ShareLogging:
                if not hasattr(i, 'keys'):
                        # Old (name, parameters) tuple form: convert to the dict form.
                        name, parameters = i
                        logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
                        if name == 'postgres':
                                name = 'sql'
                                i = {
                                        'engine': 'postgres',
                                        'dbopts': parameters,
                                }
                        elif name == 'logfile':
                                # 'filename' moves to the top level; the rest stay thread opts.
                                i = {}
                                i['thropts'] = parameters
                                if 'filename' in parameters:
                                        i['filename'] = parameters['filename']
                                        i['thropts'] = dict(i['thropts'])
                                        del i['thropts']['filename']
                        else:
                                i = parameters
                        i['type'] = name
                
                name = i['type']
                parameters = i
                try:
                        # Dynamically load sharelogging.<name> and register its logShare callback.
                        fp, pathname, description = imp.find_module(name, sharelogging.__path__)
                        m = imp.load_module(name, fp, pathname, description)
                        lo = getattr(m, name)(**parameters)
                        loggersShare.append(lo.logShare)
                except:
                        logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())

        # --- Bitcoin P2P listeners and optional upstream node link ---
        LSbc = []
        if not hasattr(config, 'BitcoinNodeAddresses'):
                config.BitcoinNodeAddresses = ()
        for a in config.BitcoinNodeAddresses:
                LSbc.append(NetworkListener(bcnode, a))
        
        if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
                BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
        
        # Importing these modules registers their JSON-RPC method handlers.
        import jsonrpc_getmemorypool
        import jsonrpc_getwork
        import jsonrpc_setworkaux
        
        # --- JSON-RPC (getwork) server setup ---
        server = JSONRPCServer()
        if hasattr(config, 'JSONRPCAddress'):
                # Deprecated single-address setting: fold into JSONRPCAddresses.
                logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
                if not hasattr(config, 'JSONRPCAddresses'):
                        config.JSONRPCAddresses = []
                config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
        LS = []
        for a in config.JSONRPCAddresses:
                LS.append(JSONRPCListener(server, a))
        if hasattr(config, 'SecretUser'):
                server.SecretUser = config.SecretUser
        # Wire the server's work/share entry points to this module's functions.
        server.aux = MM.CoinbaseAux
        server.getBlockHeader = getBlockHeader
        server.getBlockTemplate = getBlockTemplate
        server.receiveShare = receiveShare
        server.RaiseRedFlags = RaiseRedFlags
        
        if hasattr(config, 'TrustedForwarders'):
                server.TrustedForwarders = config.TrustedForwarders
        
        restoreState()
        
        # Run the bitcoin P2P node on a background (daemon) thread; the
        # JSON-RPC server takes over the main thread.
        bcnode_thr = threading.Thread(target=bcnode.serve_forever)
        bcnode_thr.daemon = True
        bcnode_thr.start()
        
        server.serve_forever()