Note: Only initialize logging if it hasn't already been configured.
Source: [bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import config
19
20
21 import logging
22
# Only configure logging if no handlers exist yet, so an embedding
# application (or interactive session) that already set up logging
# keeps its own configuration.
if len(logging.root.handlers) == 0:
	logging.basicConfig(
		format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
		level=logging.DEBUG,
	)
	# These loggers are too chatty at DEBUG; cap them at INFO by default.
	for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
		logging.getLogger(infoOnly).setLevel(logging.INFO)
30
def RaiseRedFlags(reason):
	"""Record *reason* as a critical 'redflag' event.

	Returns the reason unchanged so callers can raise or return it inline.
	"""
	redflagLogger = logging.getLogger('redflag')
	redflagLogger.critical(reason)
	return reason
34
35
# Peer-to-peer Bitcoin node, used for submitting solved blocks and
# (optionally, see __main__) listening for connections from other nodes.
from bitcoin.node import BitcoinLink, BitcoinNode
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

# JSON-RPC proxy to the upstream bitcoind (getmemorypool / block submission)
import jsonrpc
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
42
43
44 from bitcoin.script import BitcoinScript
45 from bitcoin.txn import Txn
46 from base58 import b58decode
47 from struct import pack
48 import subprocess
49 from time import time
50
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
	"""Build the coinbase transaction paying out coinbaseValue (base units).

	If useCoinbaser is true and config.CoinbaserCmd is set, that shell
	command is run with '%d' replaced by the total value; its stdout is a
	count line followed by amount/address line pairs, each becoming an
	output.  Whatever remains — or the full value, if the coinbaser fails
	or claims the whole reward — goes to config.TrackerAddr.

	The returned Txn still needs a coinbase script set and assemble()
	called before use.
	"""
	txn = Txn.new()
	
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		try:
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			# NOTE: shell=True on an operator-supplied command; config is
			# trusted, and '%d' substitution is the only templating done.
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			try:
				nout = int(p.stdout.readline())
				for i in range(nout):
					amount = int(p.stdout.readline())
					addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
					pkScript = BitcoinScript.toAddress(addr)
					txn.addOutput(amount, pkScript)
					coinbased += amount
			finally:
				# Close the pipe and reap the child so repeated calls
				# don't accumulate zombie processes (was leaked before).
				p.stdout.close()
				p.wait()
		except Exception:
			# Force the over-claim check below to trip, discarding any
			# partial outputs the coinbaser may have produced.
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased
	
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)
	
	# TODO
	# TODO: red flag on dupe coinbase
	return txn
81
82
83 import jsonrpc_getwork
84 from util import Bits2Target
85
# Outstanding issued work: {username: {work-identifier: (work, issue-time)}}
workLog = {}
# Current network target as an integer; set by blockChanged()
networkTarget = None
# Header data of already-seen shares, for duplicate rejection; reset per block
DupeShareHACK = {}
89
# The JSONRPC server instance; assigned in __main__ once created
server = None
def updateBlocks():
	# Wake longpoll connections so miners request fresh work
	if server:
		server.wakeLongpoll()
94
def blockChanged():
	"""Reset per-block state when the best block changes.

	Clears duplicate-share tracking, recomputes the network target from
	the new block's nBits, drops all outstanding work, and wakes longpolls.
	"""
	global DupeShareHACK
	DupeShareHACK = {}
	jsonrpc_getwork._CheckForDupesHACK = {}
	global MM, networkTarget, server
	# MM.currentBlock is (prevBlockHash, bits); expand bits to a full target
	networkTarget = Bits2Target(MM.currentBlock[1])
	workLog.clear()
	updateBlocks()
103
104
from merklemaker import merkleMaker
MM = merkleMaker()
# merkleMaker reads its settings as attributes, so copy the whole config
# module namespace onto the instance.
MM.__dict__.update(config.__dict__)
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
MM.start()
114
115
116 from binascii import b2a_hex
117 from copy import deepcopy
118 from struct import pack, unpack
119 from time import time
120 from util import RejectedShare, dblsha, hash2int, swap32
121 import jsonrpc
122 import threading
123 import traceback
124
# Optional JSON-RPC endpoint notified of shares meeting config.GotWorkTarget
gotwork = None
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
128
def submitGotwork(info):
	"""Best-effort push of share *info* to the configured gotwork service.

	Runs in a daemon thread; failures are logged and otherwise ignored.
	(Was a bare `except:`, which would also swallow SystemExit /
	KeyboardInterrupt; narrowed to Exception.)
	"""
	try:
		gotwork.gotwork(info)
	except Exception:
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
134
def getBlockHeader(username):
	"""Issue getwork-style work: an 80-byte block header for *username*.

	The issued work is recorded in workLog keyed by merkle root so the
	matching share submission can be validated later in checkShare().
	"""
	MRD = MM.getMRD()
	(merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
	timestamp = pack('<L', int(time()))
	# Version-1 header; b'iolE' fills the nonce field, which the miner
	# overwrites while searching.
	hdr = b'\1\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
	workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
	return hdr
142
def getBlockTemplate(username):
	"""Issue memorypool-style work (full template) for *username*.

	The work-log identifier is a length-prefixed blob at the start of the
	coinbase script (first byte = length, then that many bytes), matching
	how checkShare() later extracts it from a submitted coinbase.
	"""
	MC = MM.getMC()
	(dummy, merkleTree, coinbase, prevBlock, bits) = MC
	wliLen = coinbase[0]
	wli = coinbase[1:wliLen+1]
	workLog.setdefault(username, {})[wli] = (MC, time())
	return MC
150
# Callables invoked with every processed share; populated in __main__
# from config.ShareLogging
loggersShare = []

# Debug records of real (network-difficulty) blocks found: structured
# copies and raw payloads respectively
RBDs = []
RBPs = []
155
156 from bitcoin.varlen import varlenEncode, varlenDecode
157 import bitcoin.txn
def assembleBlock(blkhdr, txlist):
	"""Serialize a complete block: header, txn-count varint, then each txn."""
	parts = [blkhdr, varlenEncode(len(txlist))]
	parts.extend(tx.data for tx in txlist)
	return b''.join(parts)
164
def blockSubmissionThread(payload):
	"""Submit a solved block upstream, retrying until it succeeds.

	Runs in its own thread.  The previous bare `except: pass` loop
	busy-spun silently on persistent errors and re-encoded the payload
	every attempt; failures are now logged and each retry is preceded by
	a short delay.
	"""
	hexpayload = b2a_hex(payload).decode('ascii')
	while True:
		try:
			UpstreamBitcoindJSONRPC.getmemorypool(hexpayload)
			break
		except Exception:
			logging.getLogger('blockSubmission').warning('Failed to submit block upstream; retrying\n' + traceback.format_exc())
			sleep(0.1)
172
def checkShare(share):
	"""Validate a submitted share, raising RejectedShare on any problem.

	Side effects: records the header in DupeShareHACK, appends to
	RBDs/RBPs and submits the block upstream when it meets the network
	target, and (when configured) notifies the gotwork service.  Mutates
	*share* in place ('time', 'MRD'/'MC', 'upstreamResult').
	"""
	shareTime = share['time'] = time()
	
	data = share['data']
	data = data[:80]  # block header only
	(prevBlock, bits) = MM.currentBlock
	sharePrevBlock = data[4:36]
	if sharePrevBlock != prevBlock:
		# Work built on the immediately previous block is merely stale;
		# anything else is outright wrong.
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')
	
	# TODO: use userid
	username = share['username']
	if username not in workLog:
		raise RejectedShare('unknown-user')
	
	if data[72:76] != bits:
		raise RejectedShare('bad-diffbits')
	if data[:4] != b'\1\0\0\0':
		raise RejectedShare('bad-version')
	
	shareMerkleRoot = data[36:68]
	if 'blkdata' in share:
		# memorypool-style share: header plus transaction data.  The
		# work-log id is the length-prefixed blob at the front of the
		# coinbase script (mirrors getBlockTemplate).
		pl = share['blkdata']
		(txncount, pl) = varlenDecode(pl)
		cbtxn = bitcoin.txn.Txn(pl)
		cbtxn.disassemble(retExtra=True)
		coinbase = cbtxn.getCoinbase()
		wliLen = coinbase[0]
		wli = coinbase[1:wliLen+1]
		mode = 'MC'
		moden = 1
	else:
		# getwork-style share: identified by its merkle root alone
		wli = shareMerkleRoot
		mode = 'MRD'
		moden = 0
	
	MWL = workLog[username]
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld
	
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
	blkhash = dblsha(data)
	# Final four bytes of the double-SHA must be zero — presumably the
	# minimum (difficulty-1) share check; TODO confirm against hash2int's
	# byte order.
	if blkhash[28:] != b'\0\0\0\0':
		raise RejectedShare('H-not-zero')
	blkhashn = hash2int(blkhash)
	
	global networkTarget
	# Shares that solve a real block log at INFO; ordinary shares at DEBUG
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
	
	workMerkleTree = wld[1]
	workCoinbase = wld[2]
	
	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	txlist = workMerkleTree.data
	cbtxn = txlist[0]
	cbtxn.setCoinbase(workCoinbase)
	cbtxn.assemble()
	
	if blkhashn <= networkTarget:
		logfunc("Submitting upstream")
		if not moden:
			RBDs.append( deepcopy( (data, txlist) ) )
			payload = assembleBlock(data, txlist)
		else:
			RBDs.append( deepcopy( (data, txlist, share['blkdata']) ) )
			payload = share['data'] + share['blkdata']
		logfunc('Real block payload: %s' % (payload,))
		RBPs.append(payload)
		# Submit via JSON-RPC (in a retry thread) and via the p2p node
		threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
		bcnode.submitBlock(payload)
		share['upstreamResult'] = True
		MM.updateBlock(blkhash)
	
	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			# Fire-and-forget; submitGotwork logs its own failures
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')
	
	# Timestamp sanity checks (done late so a real block is still submitted)
	shareTimestamp = unpack('<L', data[68:72])[0]
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
	if moden:
		# Full validation of submitted transaction data against issued work
		cbpre = cbtxn.getCoinbase()
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')
		
		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > cbpreLen - len(ff):
				raise RejectedShare('bad-cb-flag')
		
		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')
		
		# Rebuild the block with the miner's coinbase; the merkle root
		# and the full serialization must match exactly what was sent.
		cbtxn = deepcopy(cbtxn)
		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')
		
		txlist = [cbtxn,] + txlist[1:]
		allowed = assembleBlock(data, txlist)
		if allowed != share['data'] + share['blkdata']:
			raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
308
def receiveShare(share):
	"""Validate an incoming share and feed it to the share loggers.

	Rejected shares get a 'rejectReason' key and the RejectedShare
	propagates to the caller; every share, accepted or not, has a
	'solution' hex string set and is passed to each configured logger.
	"""
	# TODO: username => userid
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	finally:
		if '_origdata' in share:
			solution = share['_origdata']
		else:
			solution = b2a_hex(swap32(share['data'])).decode('utf8')
		share['solution'] = solution
		for logShare in loggersShare:
			logShare(share)
323
def newBlockNotification(signum, frame):
	"""Signal handler: a new network block was announced externally.

	Triggers an immediate merkle tree refresh so stale work is replaced.
	"""
	log = logging.getLogger('newBlockNotification')
	log.info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
329
# Allow external tools (e.g. bitcoind -blocknotify) to prod us with SIGUSR1
from signal import signal, SIGUSR1
signal(SIGUSR1, newBlockNotification)
332
333
334 import os
335 import os.path
336 import pickle
337 import signal
338 import sys
339 from time import sleep
340 import traceback
341
# Pickled state (shutdown time, DupeShareHACK, workLog) across restarts
SAVE_STATE_FILENAME = 'eloipool.worklog'
343
def stopServers():
	"""Stop the bitcoin node and JSONRPC server, then close their sockets.

	Signals each server to stop and wakes it, then polls (up to 0x100
	iterations of 10 ms) for the serve loops to finish before closing
	the underlying file descriptors.
	"""
	logger = logging.getLogger('stopServers')
	
	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server)
	for srv in servers:
		srv.keepgoing = False
	for srv in servers:
		srv.wakeup()
	
	attempts = 0
	while True:
		stillRunning = [srv.__class__.__name__ for srv in servers if srv.running]
		if not stillRunning:
			break
		attempts += 1
		if attempts >= 0x100:
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(stillRunning)))
			break
		sleep(0.01)
	
	for srv in servers:
		for fd in srv._fd.keys():
			os.close(fd)
371
def saveState(t = None):
	"""Pickle shutdown time, DupeShareHACK, and workLog to disk.

	Retries on failure; after 0x10000 failed attempts it gives up and
	removes any partial state file so restoreState won't load corrupt
	data.

	t: shutdown timestamp (restoreState uses it to report downtime)
	"""
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except Exception:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except Exception:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
				# BUGFIX: without this break the loop retried (and logged
				# the error) forever once the give-up threshold was hit.
				break
393
def exit():
	"""Graceful shutdown: stop servers, persist state, and terminate.

	Note: intentionally shadows the builtin exit() for interactive use.
	"""
	t = time()
	stopServers()
	saveState(t)
	logging.getLogger('exit').info('Goodbye...')
	# SIGTERM before sys.exit — presumably to take down remaining
	# non-daemon threads as well; TODO confirm
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)
401
def restart():
	"""Graceful restart: stop servers, persist state, and re-exec ourselves.

	If exec fails the error is logged and the (now server-less) process
	keeps running so the operator can intervene.  (Was a bare `except:`,
	which would also swallow SystemExit/KeyboardInterrupt; narrowed.)
	"""
	t = time()
	stopServers()
	saveState(t)
	logging.getLogger('restart').info('Restarting...')
	try:
		os.execv(sys.argv[0], sys.argv)
	except Exception:
		logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
411
def restoreState():
	"""Load previously saved state from SAVE_STATE_FILENAME, if present.

	Handles older file layouts: a single pickled tuple is
	(workLog, DupeShareHACK); a single pickled dict is DupeShareHACK
	alone; otherwise the stream is time, DupeShareHACK, workLog.  The
	workLog is only restored when the file is under two minutes old,
	since issued work goes stale quickly.  Any error aborts the restore.
	"""
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		# NOTE: pickle on a local file we wrote ourselves (saveState);
		# must never be pointed at untrusted data.
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if type(t) == tuple:
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					DupeShareHACK = t
					t = None
				else:
					DupeShareHACK = pickle.load(f)
				
				if s.st_mtime + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except:
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))
445
446
447 from jsonrpcserver import JSONRPCListener, JSONRPCServer
448 import interactivemode
449 from networkserver import NetworkListener
450 import threading
451 import sharelogging
452 import imp
453
if __name__ == "__main__":
	# --- Share logging setup (with backward-compatibility shims) ---
	if not hasattr(config, 'ShareLogging'):
		config.ShareLogging = ()
	if hasattr(config, 'DbOptions'):
		# Deprecated DbOptions: translate into an equivalent sql share logger
		logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
		config.ShareLogging = list(config.ShareLogging)
		config.ShareLogging.append( {
			'type': 'sql',
			'engine': 'postgres',
			'dbopts': config.DbOptions,
			'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
		} )
	for i in config.ShareLogging:
		if not hasattr(i, 'keys'):
			# Old-style (name, parameters) tuple; convert to a dict config
			name, parameters = i
			logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
			if name == 'postgres':
				name = 'sql'
				i = {
					'engine': 'postgres',
					'dbopts': parameters,
				}
			elif name == 'logfile':
				# 'filename' is promoted to a top-level key; the rest stay
				# as thread options
				i = {}
				i['thropts'] = parameters
				if 'filename' in parameters:
					i['filename'] = parameters['filename']
					i['thropts'] = dict(i['thropts'])
					del i['thropts']['filename']
			else:
				i = parameters
			i['type'] = name
		
		name = i['type']
		parameters = i
		try:
			# Each share logger lives in the sharelogging package, in a
			# module named after its type, exposing a same-named class.
			fp, pathname, description = imp.find_module(name, sharelogging.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			loggersShare.append(lo.logShare)
		except:
			logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())

	# --- Bitcoin p2p listeners and upstream node link ---
	LSbc = []
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))
	
	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
	
	# Importing these registers their RPC method handlers
	import jsonrpc_getmemorypool
	import jsonrpc_getwork
	import jsonrpc_setworkaux
	
	# --- JSONRPC (getwork) server setup ---
	server = JSONRPCServer()
	if hasattr(config, 'JSONRPCAddress'):
		# Deprecated single-address config: prepend to the addresses list
		logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
	LS = []
	for a in config.JSONRPCAddresses:
		LS.append(JSONRPCListener(server, a))
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	server.aux = MM.CoinbaseAux
	server.getBlockHeader = getBlockHeader
	server.getBlockTemplate = getBlockTemplate
	server.receiveShare = receiveShare
	server.RaiseRedFlags = RaiseRedFlags
	
	restoreState()
	
	# The bitcoin node serves in a background daemon thread; the JSONRPC
	# server runs in the main thread.
	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()
	
	server.serve_forever()