(Debug) logger name consistency
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import config
19
20
21 import logging
22
# Root logger runs at DEBUG; the loggers named below are capped at INFO.
logging.basicConfig(level=logging.DEBUG)
for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
        logging.getLogger(infoOnly).setLevel(logging.INFO)
26
def RaiseRedFlags(reason):
        """Log reason at CRITICAL level on the 'redflag' logger and return it unchanged."""
        redflagLogger = logging.getLogger('redflag')
        redflagLogger.critical(reason)
        return reason
30
31
from bitcoin.node import BitcoinLink, BitcoinNode
# Node object built from config.UpstreamNetworkId; used further down for
# submitBlock() and serve_forever().
bcnode = BitcoinNode(config.UpstreamNetworkId)
# Append this pool's tag to the node's user-agent bytes.
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
# JSON-RPC proxy for the server at config.UpstreamURI.
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
38
39
40 from bitcoin.script import BitcoinScript
41 from bitcoin.txn import Txn
42 from base58 import b58decode
43 from struct import pack
44 import subprocess
45 from time import time
46
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
        """Build the coinbase transaction that pays out coinbaseValue.

        When useCoinbaser is true and config.CoinbaserCmd is set, that command
        is run through the shell with '%d' replaced by coinbaseValue.  Its
        stdout is read as: one line with the number of outputs, then for each
        output a line with the amount followed by a line with the address.
        Whatever remains after those outputs is paid to config.TrackerAddr.

        If the coinbaser raises, or hands out coinbaseValue or more, all of its
        outputs are dropped and the full coinbaseValue goes to
        config.TrackerAddr.

        The transaction is returned without assemble() having been called.
        """
        txn = Txn.new()

        if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
                logger = logging.getLogger('makeCoinbaseTxn')
                coinbased = 0
                try:
                        cmd = config.CoinbaserCmd
                        cmd = cmd.replace('%d', str(coinbaseValue))
                        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
                        try:
                                nout = int(p.stdout.readline())
                                for i in range(nout):
                                        amount = int(p.stdout.readline())
                                        addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
                                        pkScript = BitcoinScript.toAddress(addr)
                                        txn.addOutput(amount, pkScript)
                                        coinbased += amount
                        finally:
                                # Release the pipe and reap the child whether or not parsing succeeded
                                p.stdout.close()
                                p.wait()
                except Exception:
                        # Record why the coinbaser failed instead of discarding the cause
                        logger.error('Coinbaser raised an exception', exc_info=True)
                        # Force the failure branch below
                        coinbased = coinbaseValue + 1
                if coinbased >= coinbaseValue:
                        logger.error('Coinbaser failed!')
                        # Drop any outputs added before the failure was detected
                        txn.outputs = []
                else:
                        coinbaseValue -= coinbased

        pkScript = BitcoinScript.toAddress(config.TrackerAddr)
        txn.addOutput(coinbaseValue, pkScript)

        # TODO
        # TODO: red flag on dupe coinbase
        return txn
77
78
79 import jsonrpc_getwork
80 from util import Bits2Target
81
# Issued work, per user: workLog[username][work id] = (work data, issue time)
workLog = {}
# Integer target for the current block; set by blockChanged()
networkTarget = None
# Share headers already seen since the last block change (only the keys matter)
DupeShareHACK = {}

# JSON-RPC server instance; assigned in the __main__ section
server = None
def updateBlocks():
        """Wake the JSON-RPC server's longpolls, if a server has been created."""
        if not server:
                return
        server.wakeLongpoll()
90
def blockChanged():
        """Reset per-block state after the current block changes.

        Clears both duplicate-share tables and the work log, recomputes
        networkTarget from the bits of MM.currentBlock, then calls
        updateBlocks().
        """
        global DupeShareHACK, networkTarget
        DupeShareHACK = {}
        jsonrpc_getwork._CheckForDupesHACK = {}
        currentBits = MM.currentBlock[1]
        networkTarget = Bits2Target(currentBits)
        workLog.clear()
        updateBlocks()
99
100
from merklemaker import merkleMaker
MM = merkleMaker()
# Copy every attribute of the config module onto the merkleMaker instance
MM.__dict__.update(config.__dict__)
# Coinbase built without the coinbaser, for a hard-coded value of 5000000000
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
# Callbacks into this module
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
MM.start()
110
111
112 from binascii import b2a_hex
113 from copy import deepcopy
114 from struct import pack, unpack
115 from time import time
116 from util import RejectedShare, dblsha, hash2int, swap32
117 import jsonrpc
118 import threading
119 import traceback
120
# Optional JSON-RPC proxy for config.GotWorkURI; stays None when that setting is absent
gotwork = None
if hasattr(config, 'GotWorkURI'):
        gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
124
def submitGotwork(info):
        """Send info to the gotwork service; a failure is logged as a warning, never raised."""
        try:
                gotwork.gotwork(info)
        except:
                failure = traceback.format_exc()
                checkShare.logger.warning('Failed to submit gotwork\n' + failure)
130
def getBlockHeader(username):
        """Build a block header for username and record the work it came from.

        The work is stored in workLog[username] under its merkle root, together
        with the time of issue.  Returns the header as bytes.
        """
        mrd = MM.getMRD()
        merkleRoot, _merkleTree, _coinbase, prevBlock, bits, _rollPrevBlk = mrd
        timestamp = pack('<L', int(time()))
        # version, previous block, merkle root, timestamp, bits, then a fixed 4-byte tail
        header = b''.join((b'\1\0\0\0', prevBlock, merkleRoot, timestamp, bits, b'iolE'))
        userWork = workLog.setdefault(username, {})
        userWork[merkleRoot] = (mrd, time())
        return header
138
def getBlockTemplate(username):
        """Fetch a template from the merkle maker and record it for username.

        The first byte of the coinbase is a length; the bytes it covers are
        used as the key under which the template and its issue time are stored
        in workLog[username].  Returns the template tuple from MM.getMC().
        """
        template = MM.getMC()
        _dummy, _merkleTree, coinbase, _prevBlock, _bits = template
        idLength = coinbase[0]
        workId = coinbase[1:idLength + 1]
        userWork = workLog.setdefault(username, {})
        userWork[workId] = (template, time())
        return template
146
# Callables invoked with every received share (see receiveShare)
loggersShare = []

# Records kept for shares that met the network target: deep copies of the
# header and transaction data, and the raw payloads that were submitted
RBDs = []
RBPs = []
151
152 from bitcoin.varlen import varlenEncode, varlenDecode
153 import bitcoin.txn
def assembleBlock(blkhdr, txlist):
        """Return the serialised block: header, transaction count, then each transaction.

        blkhdr is the block header; txlist is a sequence of objects whose
        .data attribute holds the serialised transaction.  The count is encoded
        with varlenEncode().  blkhdr itself is never modified.
        """
        pieces = [varlenEncode(len(txlist))]
        pieces.extend(tx.data for tx in txlist)
        # One join instead of repeated += keeps this linear in the block size
        return blkhdr + b''.join(pieces)
160
def blockSubmissionThread(payload):
        """Submit a solved block upstream, retrying until a call succeeds.

        payload is the raw block; it is hex-encoded once and passed to
        UpstreamBitcoindJSONRPC.getmemorypool().  A failed call is retried
        immediately and without limit.  Failures are logged with their
        traceback at most once every five seconds, so a persistent outage is
        visible without flooding the log.
        """
        logger = logging.getLogger('blockSubmissionThread')
        # Encode outside the loop: the payload does not change between attempts
        hexPayload = b2a_hex(payload).decode('ascii')
        nextLogT = 0
        while True:
                try:
                        UpstreamBitcoindJSONRPC.getmemorypool(hexPayload)
                        break
                except Exception:
                        now = time()
                        if now >= nextLogT:
                                logger.error('Upstream block submission failed; retrying', exc_info=True)
                                nextLogT = now + 5
168
def checkShare(share):
        """Validate a submitted share and submit it upstream if it solves a block.

        share is a dict.  Keys read: 'data' (block header bytes, only the first
        80 are used), 'username', and optionally 'blkdata' (the data following
        the header).  When 'blkdata' is present the share is handled in "MC"
        mode, otherwise in "MRD" mode.

        Keys written: 'MC' or 'MRD' (the logged work the share was matched to),
        'time' (when the check ran) and, for a share that meets the network
        target, 'upstreamResult'.

        Raises RejectedShare with a short reason string when a check fails.
        Returns None when every check passes.
        """
        data = share['data']
        data = data[:80]
        (prevBlock, bits) = MM.currentBlock
        sharePrevBlock = data[4:36]
        if sharePrevBlock != prevBlock:
                if sharePrevBlock == MM.lastBlock[0]:
                        raise RejectedShare('stale-prevblk')
                raise RejectedShare('bad-prevblk')

        # TODO: use userid
        username = share['username']
        if username not in workLog:
                raise RejectedShare('unknown-user')

        if data[72:76] != bits:
                raise RejectedShare('bad-diffbits')
        if data[:4] != b'\1\0\0\0':
                raise RejectedShare('bad-version')

        shareMerkleRoot = data[36:68]
        if 'blkdata' in share:
                # MC mode: the work is identified by the length-prefixed bytes
                # at the start of the submitted coinbase
                pl = share['blkdata']
                (txncount, pl) = varlenDecode(pl)
                cbtxn = bitcoin.txn.Txn(pl)
                cbtxn.disassemble(retExtra=True)
                coinbase = cbtxn.getCoinbase()
                wliLen = coinbase[0]
                wli = coinbase[1:wliLen+1]
                mode = 'MC'
                moden = 1
        else:
                # MRD mode: the work is identified by the merkle root in the header
                wli = shareMerkleRoot
                mode = 'MRD'
                moden = 0

        MWL = workLog[username]
        if wli not in MWL:
                raise RejectedShare('unknown-work')
        (wld, issueT) = MWL[wli]
        share[mode] = wld

        if data in DupeShareHACK:
                raise RejectedShare('duplicate')
        DupeShareHACK[data] = None

        shareTime = share['time'] = time()

        blkhash = dblsha(data)
        # The last four bytes of the hash must be zero
        if blkhash[28:] != b'\0\0\0\0':
                raise RejectedShare('H-not-zero')
        blkhashn = hash2int(blkhash)

        global networkTarget
        # Log at INFO when the share meets the network target, otherwise at DEBUG
        logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
        logfunc('BLKHASH: %64x' % (blkhashn,))
        logfunc(' TARGET: %64x' % (networkTarget,))

        workMerkleTree = wld[1]
        workCoinbase = wld[2]

        # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
        txlist = workMerkleTree.data
        cbtxn = txlist[0]
        cbtxn.setCoinbase(workCoinbase)
        cbtxn.assemble()

        if blkhashn <= networkTarget:
                # Submit before the remaining checks run
                logfunc("Submitting upstream")
                if not moden:
                        RBDs.append( deepcopy( (data, txlist) ) )
                        payload = assembleBlock(data, txlist)
                else:
                        RBDs.append( deepcopy( (data, txlist, share['blkdata']) ) )
                        payload = share['data'] + share['blkdata']
                logfunc('Real block payload: %s' % (payload,))
                RBPs.append(payload)
                threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
                bcnode.submitBlock(payload)
                share['upstreamResult'] = True
                MM.updateBlock(blkhash)

        # Gotwork hack...
        if gotwork and blkhashn <= config.GotWorkTarget:
                try:
                        coinbaseMrkl = cbtxn.data
                        coinbaseMrkl += blkhash
                        steps = workMerkleTree._steps
                        coinbaseMrkl += pack('B', len(steps))
                        for step in steps:
                                coinbaseMrkl += step
                        coinbaseMrkl += b"\0\0\0\0"
                        info = {}
                        info['hash'] = b2a_hex(blkhash).decode('ascii')
                        info['header'] = b2a_hex(data).decode('ascii')
                        info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
                        thr = threading.Thread(target=submitGotwork, args=(info,))
                        thr.daemon = True
                        thr.start()
                except:
                        checkShare.logger.warning('Failed to build gotwork request')

        shareTimestamp = unpack('<L', data[68:72])[0]
        if shareTime < issueT - 120:
                raise RejectedShare('stale-work')
        if shareTimestamp < shareTime - 300:
                raise RejectedShare('time-too-old')
        if shareTimestamp > shareTime + 7200:
                raise RejectedShare('time-too-new')

        if moden:
                # The submitted coinbase must start with the coinbase of the issued work
                cbpre = cbtxn.getCoinbase()
                cbpreLen = len(cbpre)
                if coinbase[:cbpreLen] != cbpre:
                        raise RejectedShare('bad-cb-prefix')

                # Filter out known "I support" flags, to prevent exploits
                for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
                        # find() returns -1 when ff is absent.  Clamp the
                        # threshold at -1 so that a prefix shorter than the
                        # flag cannot turn "not found" into a rejection.
                        if coinbase.find(ff) > max(-1, cbpreLen - len(ff)):
                                raise RejectedShare('bad-cb-flag')

                if len(coinbase) > 100:
                        raise RejectedShare('bad-cb-length')

                # Work on a copy so the logged work's coinbase transaction is left alone
                cbtxn = deepcopy(cbtxn)
                cbtxn.setCoinbase(coinbase)
                cbtxn.assemble()
                if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
                        raise RejectedShare('bad-txnmrklroot')

                # The submitted block must be exactly what this pool would assemble
                txlist = [cbtxn,] + txlist[1:]
                allowed = assembleBlock(data, txlist)
                if allowed != share['data'] + share['blkdata']:
                        raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
304
def receiveShare(share):
        """Check a share, then pass it to every share logger, accepted or not.

        A RejectedShare from checkShare() is recorded in share['rejectReason']
        and re-raised.  In every case share['solution'] is filled in before the
        callables in loggersShare are invoked with the share.
        """
        # TODO: username => userid
        try:
                checkShare(share)
        except RejectedShare as rejection:
                share['rejectReason'] = str(rejection)
                raise
        finally:
                share['solution'] = (
                        share['_origdata']
                        if '_origdata' in share
                        else b2a_hex(swap32(share['data'])).decode('utf8')
                )
                for logShare in loggersShare:
                        logShare(share)
319
def newBlockNotification(signum, frame):
        """Signal handler: log the notification and have the merkle maker update its tree."""
        notifyLogger = logging.getLogger('newBlockNotification')
        notifyLogger.info('Received new block notification')
        MM.updateMerkleTree()
        # TODO: Force RESPOND TO LONGPOLLS?
325
# SIGUSR1 triggers newBlockNotification.
# NOTE: the name `signal` is rebound to the module by the `import signal` further down.
from signal import signal, SIGUSR1
signal(SIGUSR1, newBlockNotification)
328
329
330 import os
331 import os.path
332 import pickle
333 import signal
334 import sys
335 from time import sleep
336 import traceback
337
# File written by saveState() and read back by restoreState()
SAVE_STATE_FILENAME = 'eloipool.worklog'
339
def stopServers():
        """Ask bcnode and the JSON-RPC server to stop, wait briefly, then close their fds.

        Both servers get keepgoing cleared and are woken up.  Their running
        flags are then polled every 0.01 seconds; after 0x100 polls with a
        server still running an error is logged and the wait is abandoned.
        Finally every fd in each server's _fd is closed.
        """
        logger = logging.getLogger('stopServers')

        logger.info('Stopping servers...')
        servers = (bcnode, server)
        for srv in servers:
                srv.keepgoing = False
        for srv in servers:
                srv.wakeup()

        polls = 0
        while True:
                stillRunning = [srv.__class__.__name__ for srv in servers if srv.running]
                if not stillRunning:
                        break
                polls += 1
                if polls >= 0x100:
                        names = ', '.join(stillRunning)
                        logger.error('Servers taking too long to stop (%s), giving up' % (names))
                        break
                sleep(0.01)

        for srv in servers:
                for fd in srv._fd.keys():
                        os.close(fd)
367
def saveState(t = None):
        """Pickle the resumable state into SAVE_STATE_FILENAME.

        Three objects are written in order: t (passed by exit() and restart()
        as the time taken just before stopping the servers), DupeShareHACK and workLog.  A
        failed attempt is retried up to 0x10000 times.  After that the error is
        logged, the possibly partial file is removed and the function returns
        without having saved anything.
        """
        logger = logging.getLogger('saveState')

        # Then, save data needed to resume work
        logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
        i = 0
        while True:
                try:
                        with open(SAVE_STATE_FILENAME, 'wb') as f:
                                pickle.dump(t, f)
                                pickle.dump(DupeShareHACK, f)
                                pickle.dump(workLog, f)
                        break
                except Exception:
                        i += 1
                        if i >= 0x10000:
                                logger.error('Failed to save work\n' + traceback.format_exc())
                                try:
                                        os.unlink(SAVE_STATE_FILENAME)
                                except Exception:
                                        logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
                                # Give up here; without this the loop would retry and log forever
                                return
389
def exit():
        """Stop the servers, save state, then terminate this process."""
        shutdownT = time()
        stopServers()
        saveState(shutdownT)
        logging.getLogger('exit').info('Goodbye...')
        ownPid = os.getpid()
        os.kill(ownPid, signal.SIGTERM)
        sys.exit(0)
397
def restart():
        """Stop the servers, save state, then re-exec this program with the same arguments.

        A failure of the exec is logged with its traceback and not re-raised.
        """
        shutdownT = time()
        stopServers()
        saveState(shutdownT)
        restartLogger = logging.getLogger('restart')
        restartLogger.info('Restarting...')
        try:
                os.execv(sys.argv[0], sys.argv)
        except:
                restartLogger.error('Failed to exec\n' + traceback.format_exc())
407
def restoreState():
        """Load workLog and DupeShareHACK back from SAVE_STATE_FILENAME, if it exists.

        Three layouts are understood, told apart by the first pickled object:
        a tuple (workLog, DupeShareHACK); a dict, which is DupeShareHACK
        followed by workLog; or anything else, which is the save time followed
        by DupeShareHACK and workLog.  In the last two layouts workLog is only
        loaded when the file was modified within the last 120 seconds.

        Any failure is logged and leaves whatever had been restored so far.
        """
        if not os.path.exists(SAVE_STATE_FILENAME):
                return

        global workLog, DupeShareHACK

        logger = logging.getLogger('restoreState')
        fileInfo = os.stat(SAVE_STATE_FILENAME)
        logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, fileInfo.st_size))
        savedAt = None
        try:
                with open(SAVE_STATE_FILENAME, 'rb') as f:
                        first = pickle.load(f)
                        if type(first) is tuple:
                                workLog = first[0]
                                DupeShareHACK = first[1]
                        else:
                                if isinstance(first, dict):
                                        DupeShareHACK = first
                                else:
                                        # First object is the save time
                                        savedAt = first
                                        DupeShareHACK = pickle.load(f)

                                stillFresh = fileInfo.st_mtime + 120 >= time()
                                if stillFresh:
                                        workLog = pickle.load(f)
                                else:
                                        logger.debug('Skipping restore of expired workLog')
        except:
                logger.error('Failed to restore state\n' + traceback.format_exc())
                return
        logger.info('State restored successfully')
        if savedAt:
                logger.info('Total downtime: %g seconds' % (time() - savedAt,))
441
442
443 from jsonrpcserver import JSONRPCListener, JSONRPCServer
444 import interactivemode
445 from networkserver import NetworkListener
446 import threading
447 import sharelogging
448 import imp
449
# Entry point: set up share loggers, network listeners and the JSON-RPC
# server, restore saved state, then serve until stopped.
if __name__ == "__main__":
        # Share loggers: each entry is (module name under sharelogging, kwargs).
        # The class instantiated has the same name as its module.
        if not hasattr(config, 'ShareLogging'):
                config.ShareLogging = ()
        if hasattr(config, 'DbOptions'):
                # DbOptions adds a 'postgres' share logger
                config.ShareLogging = list(config.ShareLogging)
                config.ShareLogging.append( ('postgres', config.DbOptions) )
        for i in config.ShareLogging:
                name, parameters = i
                try:
                        fp, pathname, description = imp.find_module(name, sharelogging.__path__)
                        m = imp.load_module(name, fp, pathname, description)
                        lo = getattr(m, name)(**parameters)
                        loggersShare.append(lo.logShare)
                except:
                        # A broken share logger is reported but does not stop startup.
                        # Logger.warn is deprecated; warning() is the supported spelling.
                        logging.getLogger('sharelogging').warning("Error setting up share logger %s: %s", name,  sys.exc_info())

        LSbc = []
        if not hasattr(config, 'BitcoinNodeAddresses'):
                config.BitcoinNodeAddresses = ()
        for a in config.BitcoinNodeAddresses:
                LSbc.append(NetworkListener(bcnode, a))

        if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
                BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)

        import jsonrpc_getmemorypool
        import jsonrpc_getwork
        import jsonrpc_setworkaux

        server = JSONRPCServer()
        # Default to no addresses so that a config with neither JSONRPCAddress
        # nor JSONRPCAddresses does not fail with AttributeError below
        if not hasattr(config, 'JSONRPCAddresses'):
                config.JSONRPCAddresses = []
        if hasattr(config, 'JSONRPCAddress'):
                # The single-address setting goes first in the list
                config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
        LS = []
        for a in config.JSONRPCAddresses:
                LS.append(JSONRPCListener(server, a))
        if hasattr(config, 'SecretUser'):
                server.SecretUser = config.SecretUser
        # Hooks the JSON-RPC server calls back into this module
        server.aux = MM.CoinbaseAux
        server.getBlockHeader = getBlockHeader
        server.getBlockTemplate = getBlockTemplate
        server.receiveShare = receiveShare
        server.RaiseRedFlags = RaiseRedFlags

        restoreState()

        # bcnode runs in a daemon thread; the JSON-RPC server runs in the main thread
        bcnode_thr = threading.Thread(target=bcnode.serve_forever)
        bcnode_thr.daemon = True
        bcnode_thr.start()

        server.serve_forever()