Plan on removal of deprecated config variables in a year
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import config
19
20
21 import logging
22
# Default everything to DEBUG, then quiet the chattiest subsystems to INFO.
logging.basicConfig(level=logging.DEBUG)
_infoOnlyLoggers = ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer')
for _name in _infoOnlyLoggers:
	logging.getLogger(_name).setLevel(logging.INFO)
26
def RaiseRedFlags(reason):
	"""Log *reason* at critical severity on the 'redflag' logger.

	Returns the reason unchanged, so callers can log and raise/return in one step.
	"""
	redflag = logging.getLogger('redflag')
	redflag.critical(reason)
	return reason
30
31
from bitcoin.node import BitcoinLink, BitcoinNode
# P2P-protocol node handle; used later to announce solved blocks (submitBlock).
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
# JSON-RPC proxy to the upstream bitcoind; used for getmemorypool block submission.
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
38
39
40 from bitcoin.script import BitcoinScript
41 from bitcoin.txn import Txn
42 from base58 import b58decode
43 from struct import pack
44 import subprocess
45 from time import time
46
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
	"""Build the coinbase transaction paying out coinbaseValue (base units).

	If useCoinbaser is true and config.CoinbaserCmd is set, that shell command
	is run with '%d' replaced by the total value; its stdout is parsed as a
	count N followed by N (amount, address) line pairs, each becoming an
	output. Whatever the coinbaser does not claim -- or all of it, if the
	coinbaser fails or over-claims -- goes to config.TrackerAddr.
	"""
	txn = Txn.new()
	
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		try:
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			# NOTE: shell=True runs an operator-supplied command string; the
			# config is trusted, but never feed untrusted data into CoinbaserCmd.
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			nout = int(p.stdout.readline())
			for i in range(nout):
				amount = int(p.stdout.readline())
				addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
				pkScript = BitcoinScript.toAddress(addr)
				txn.addOutput(amount, pkScript)
				coinbased += amount
		except Exception:
			# (was a bare except:, which also swallowed SystemExit/KeyboardInterrupt)
			# Force the over-claim check below to trip, discarding all outputs.
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			# Coinbaser errored or claimed the whole reward (or more): throw
			# away its outputs and pay everything to the tracker address.
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased
	
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)
	
	# TODO
	# TODO: red flag on dupe coinbase
	return txn
77
78
79 import jsonrpc_getwork
80 from util import Bits2Target
81
# Outstanding issued work, per user: username -> {work id -> (work, issue time)}
workLog = {}
# Current network difficulty target as an integer; set by blockChanged()
networkTarget = None
# Header data of shares already accepted this block; used to reject duplicates
DupeShareHACK = {}
85
# JSON-RPC server instance; assigned at startup in __main__
server = None
def updateBlocks():
	"""Wake pending longpoll connections, if the server is up yet."""
	if not server:
		return
	server.wakeLongpoll()
90
def blockChanged():
	# Called by merkleMaker when the best block changes: reset per-block
	# duplicate-share tracking and outstanding work, recompute the network
	# target from the new block's nBits, then wake longpollers.
	global DupeShareHACK
	DupeShareHACK = {}
	jsonrpc_getwork._CheckForDupesHACK = {}
	global MM, networkTarget, server
	networkTarget = Bits2Target(MM.currentBlock[1])
	workLog.clear()
	updateBlocks()
99
100
from merklemaker import merkleMaker
MM = merkleMaker()
# merkleMaker picks up its settings directly from the config module's namespace
MM.__dict__.update(config.__dict__)
# Pre-built fallback coinbase txn (hard-coded 50 BTC subsidy)  # FIXME
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
MM.start()
110
111
112 from binascii import b2a_hex
113 from copy import deepcopy
114 from struct import pack, unpack
115 from time import time
116 from util import RejectedShare, dblsha, hash2int, swap32
117 import jsonrpc
118 import threading
119 import traceback
120
# Optional 'gotwork' reporting service (enabled via config.GotWorkURI)
gotwork = None
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
124
def submitGotwork(info):
	"""Best-effort submission of *info* to the configured gotwork service.

	Failures are logged and swallowed; gotwork reporting is never fatal.
	"""
	try:
		gotwork.gotwork(info)
	except Exception:
		# (was a bare except:, which also swallowed SystemExit/KeyboardInterrupt)
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
130
def getBlockHeader(username):
	"""Issue an 80-byte getwork block header to *username*, recording the work.

	The issued work is keyed by merkle root in workLog so the matching share
	can be validated later.
	"""
	MRD = MM.getMRD()
	merkleRoot = MRD[0]
	prevBlock = MRD[3]
	bits = MRD[4]
	timestamp = pack('<L', int(time()))
	hdr = b'\1\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
	userWork = workLog.setdefault(username, {})
	userWork[merkleRoot] = (MRD, time())
	return hdr
138
def getBlockTemplate(username):
	"""Issue getmemorypool-style work to *username*, recording it in workLog.

	The work identifier is embedded in the coinbase: the first byte is its
	length, followed by the identifier bytes themselves.
	"""
	MC = MM.getMC()
	coinbase = MC[2]
	idLen = coinbase[0]
	workId = coinbase[1:idLen + 1]
	userWork = workLog.setdefault(username, {})
	userWork[workId] = (MC, time())
	return MC
146
# Callables invoked with every processed share (populated from config.ShareLogging)
loggersShare = []

# Debug records of real (network-target) block solutions and their payloads
RBDs = []
RBPs = []
151
152 from bitcoin.varlen import varlenEncode, varlenDecode
153 import bitcoin.txn
def assembleBlock(blkhdr, txlist):
	"""Serialize a full block: header, varint txn count, then each raw txn."""
	parts = [blkhdr, varlenEncode(len(txlist))]
	parts.extend(tx.data for tx in txlist)
	return b''.join(parts)
160
def blockSubmissionThread(payload):
	"""Submit a solved block upstream via JSON-RPC, retrying until it succeeds.

	Runs in its own thread. Retries indefinitely: losing a solved block is far
	worse than a lingering thread. Previously this was `except: pass`, which
	busy-spun a whole core silently on a persistent upstream failure; now each
	failure is logged and followed by a short back-off.
	"""
	hexpayload = b2a_hex(payload).decode('ascii')  # hoisted out of the retry loop
	while True:
		try:
			UpstreamBitcoindJSONRPC.getmemorypool(hexpayload)
			break
		except Exception:
			logging.getLogger('blockSubmission').warning('Upstream block submission failed; retrying\n' + traceback.format_exc())
			sleep(1)
168
def checkShare(share):
	# Validate a submitted share in-place, raising RejectedShare on any
	# problem. Side effects: records the share for duplicate detection,
	# annotates the share dict ('time', 'MC'/'MRD', 'upstreamResult'), and
	# submits real block solutions upstream.
	shareTime = share['time'] = time()
	
	data = share['data']
	data = data[:80]  # only the block header is checked here
	(prevBlock, bits) = MM.currentBlock
	sharePrevBlock = data[4:36]
	if sharePrevBlock != prevBlock:
		# Work from the immediately previous block is merely stale, not bogus
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')
	
	# TODO: use userid
	username = share['username']
	if username not in workLog:
		raise RejectedShare('unknown-user')
	
	if data[72:76] != bits:
		raise RejectedShare('bad-diffbits')
	if data[:4] != b'\1\0\0\0':
		raise RejectedShare('bad-version')
	
	shareMerkleRoot = data[36:68]
	if 'blkdata' in share:
		# getmemorypool-style share: the work id is embedded in the coinbase
		# (length-prefixed, see getBlockTemplate)
		pl = share['blkdata']
		(txncount, pl) = varlenDecode(pl)
		cbtxn = bitcoin.txn.Txn(pl)
		cbtxn.disassemble(retExtra=True)
		coinbase = cbtxn.getCoinbase()
		wliLen = coinbase[0]
		wli = coinbase[1:wliLen+1]
		mode = 'MC'
		moden = 1
	else:
		# getwork-style share: the merkle root itself identifies the work
		wli = shareMerkleRoot
		mode = 'MRD'
		moden = 0
	
	MWL = workLog[username]
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld
	
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
	blkhash = dblsha(data)
	# Minimal proof-of-work check: top 4 hash bytes must be zero
	if blkhash[28:] != b'\0\0\0\0':
		raise RejectedShare('H-not-zero')
	blkhashn = hash2int(blkhash)
	
	global networkTarget
	# Shares that solve a real block are logged at INFO, the rest at DEBUG
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
	
	workMerkleTree = wld[1]
	workCoinbase = wld[2]
	
	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	txlist = workMerkleTree.data
	cbtxn = txlist[0]
	cbtxn.setCoinbase(workCoinbase)
	cbtxn.assemble()
	
	if blkhashn <= networkTarget:
		# This share meets the full network target: it is a block solution.
		logfunc("Submitting upstream")
		if not moden:
			RBDs.append( deepcopy( (data, txlist) ) )
			payload = assembleBlock(data, txlist)
		else:
			# MC mode: the miner supplied the full block body
			RBDs.append( deepcopy( (data, txlist, share['blkdata']) ) )
			payload = share['data'] + share['blkdata']
		logfunc('Real block payload: %s' % (payload,))
		RBPs.append(payload)
		# Submit both over JSON-RPC (in a thread, it retries) and via P2P
		threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
		bcnode.submitBlock(payload)
		share['upstreamResult'] = True
		MM.updateBlock(blkhash)
	
	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			# Build coinbase txn + block hash + merkle steps for the gotwork service
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			# NOTE(review): bare except also catches SystemExit/KeyboardInterrupt
			checkShare.logger.warning('Failed to build gotwork request')
	
	# Timestamp sanity: work must not be stale, and the share's own timestamp
	# must be within (-300s, +7200s) of our clock
	shareTimestamp = unpack('<L', data[68:72])[0]
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
	if moden:
		# Full verification for getmemorypool shares: the miner's coinbase must
		# extend the one we issued, avoid known flag strings, stay short, and
		# the whole block must reassemble to exactly the submitted bytes.
		cbpre = cbtxn.getCoinbase()
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')
		
		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > cbpreLen - len(ff):
				raise RejectedShare('bad-cb-flag')
		
		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')
		
		# Rebuild the coinbase with the miner's extra data and recheck the
		# merkle root (deepcopy so the shared work template stays untouched)
		cbtxn = deepcopy(cbtxn)
		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')
		
		txlist = [cbtxn,] + txlist[1:]
		allowed = assembleBlock(data, txlist)
		if allowed != share['data'] + share['blkdata']:
			raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
304
def receiveShare(share):
	# TODO: username => userid
	# Entry point for incoming shares: validate via checkShare(), record the
	# reject reason on the share dict, and -- accepted or not -- run every
	# configured share logger before the exception (if any) propagates.
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	finally:
		# 'solution' is what the share loggers persist: the original submitted
		# data when available, otherwise the byte-swapped header as hex
		if '_origdata' in share:
			share['solution'] = share['_origdata']
		else:
			share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
		for i in loggersShare:
			i(share)
319
def newBlockNotification(signum, frame):
	"""Signal handler: an external process told us a new block has appeared."""
	logger = logging.getLogger('newBlockNotification')
	logger.info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
325
from signal import signal, SIGUSR1
# SIGUSR1 triggers an immediate merkle-tree refresh (e.g. from a blocknotify hook)
signal(SIGUSR1, newBlockNotification)
328
329
330 import os
331 import os.path
332 import pickle
333 import signal
334 import sys
335 from time import sleep
336 import traceback
337
338 SAVE_STATE_FILENAME = 'eloipool.worklog'
339
def stopServers():
	# Shut down the P2P node and the JSON-RPC server: ask each loop to stop,
	# wake it, poll (bounded) until both report not running, then close all
	# their file descriptors.
	logger = logging.getLogger('stopServers')
	
	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server)
	for s in servers:
		s.keepgoing = False
	for s in servers:
		s.wakeup()
	i = 0
	while True:
		sl = []
		for s in servers:
			if s.running:
				sl.append(s.__class__.__name__)
		if not sl:
			break
		i += 1
		# Give up after 0x100 polls of 10ms (~2.5 seconds)
		if i >= 0x100:
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
			break
		sleep(0.01)
	
	for s in servers:
		# NOTE(review): assumes server._fd maps fd -> handler — confirm in networkserver
		for fd in s._fd.keys():
			os.close(fd)
367
def saveState(t = None):
	"""Pickle (shutdown time, DupeShareHACK, workLog) to SAVE_STATE_FILENAME.

	Retries on failure; after 0x10000 consecutive failures it logs the error,
	removes any partial state file (so a later resume will not load garbage),
	and gives up.
	"""
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except Exception:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except OSError:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
				# BUGFIX: the original had no break here, so a persistent write
				# failure made this loop spin (logging and unlinking) forever.
				break
389
def exit():
	"""Graceful shutdown: stop servers, persist state, then terminate."""
	shutdownTime = time()
	stopServers()
	saveState(shutdownTime)
	logging.getLogger('exit').info('Goodbye...')
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)
397
def restart():
	"""Persist state, then replace this process with a fresh copy of itself."""
	t = time()
	stopServers()
	saveState(t)
	logging.getLogger('restart').info('Restarting...')
	try:
		os.execv(sys.argv[0], sys.argv)
	except Exception:
		# (was a bare except:, which also swallowed SystemExit/KeyboardInterrupt)
		logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
407
def restoreState():
	# Load the state written by saveState(), tolerating older on-disk layouts:
	#   newest: pickle stream of (shutdown time, DupeShareHACK, workLog)
	#   older:  a (workLog, DupeShareHACK) tuple as the first object
	#   oldest: a bare DupeShareHACK dict as the first object
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if type(t) == tuple:
				# Legacy layout: (workLog, DupeShareHACK) tuple
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					# Oldest layout: first object is DupeShareHACK itself
					DupeShareHACK = t
					t = None
				else:
					# Current layout: t is the shutdown timestamp
					DupeShareHACK = pickle.load(f)
				
				# Only restore workLog if the save is fresh enough (2 minutes)
				# to still match work outstanding at miners
				if s.st_mtime + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))
441
442
443 from jsonrpcserver import JSONRPCListener, JSONRPCServer
444 import interactivemode
445 from networkserver import NetworkListener
446 import threading
447 import sharelogging
448 import imp
449
if __name__ == "__main__":
	# --- Share logging setup (with backward-compatibility shims) ---
	if not hasattr(config, 'ShareLogging'):
		config.ShareLogging = ()
	if hasattr(config, 'DbOptions'):
		# Deprecated DbOptions: translate into an equivalent 'sql' ShareLogging entry
		logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
		config.ShareLogging = list(config.ShareLogging)
		config.ShareLogging.append( {
			'type': 'sql',
			'engine': 'postgres',
			'dbopts': config.DbOptions,
			'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
		} )
	for i in config.ShareLogging:
		if not hasattr(i, 'keys'):
			# Deprecated (name, parameters) tuple form: convert to a dict entry
			name, parameters = i
			logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
			if name == 'postgres':
				name = 'sql'
				i = {
					'engine': 'postgres',
					'dbopts': parameters,
				}
			elif name == 'logfile':
				i = {}
				i['thropts'] = parameters
				if 'filename' in parameters:
					# 'filename' is a direct argument; the rest are thread options
					i['filename'] = parameters['filename']
					i['thropts'] = dict(i['thropts'])
					del i['thropts']['filename']
			else:
				i = parameters
			i['type'] = name
		
		name = i['type']
		parameters = i
		try:
			# Load sharelogging/<name>.py and instantiate its same-named class
			# with the config parameters; collect its logShare callback
			fp, pathname, description = imp.find_module(name, sharelogging.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			loggersShare.append(lo.logShare)
		except:
			logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())

	# --- Bitcoin P2P listeners and upstream node link ---
	LSbc = []
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))
	
	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
	
	# NOTE(review): these imports appear to register their RPC methods as a
	# side effect -- confirm in the jsonrpc_* modules
	import jsonrpc_getmemorypool
	import jsonrpc_getwork
	import jsonrpc_setworkaux
	
	# --- Miner-facing JSON-RPC server setup ---
	server = JSONRPCServer()
	if hasattr(config, 'JSONRPCAddress'):
		# Deprecated single-address form: fold into the JSONRPCAddresses list
		logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
	LS = []
	for a in config.JSONRPCAddresses:
		LS.append(JSONRPCListener(server, a))
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	server.aux = MM.CoinbaseAux
	server.getBlockHeader = getBlockHeader
	server.getBlockTemplate = getBlockTemplate
	server.receiveShare = receiveShare
	server.RaiseRedFlags = RaiseRedFlags
	
	restoreState()
	
	# Run the P2P node loop in a daemon thread; JSON-RPC serves on the main thread
	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()
	
	server.serve_forever()
528         
529         server.serve_forever()