Include date/time in logging output
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import config
19
20
21 import logging
22
# Root logger: timestamped tab-separated lines, DEBUG by default.
logging.basicConfig(
	format='%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s',
	level=logging.DEBUG,
)
# The chattiest subsystems are capped at INFO to keep the log usable.
for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
	logging.getLogger(infoOnly).setLevel(logging.INFO)
29
def RaiseRedFlags(reason):
	'''Record a serious problem at CRITICAL level on the 'redflag' logger.
	
	Returns the reason unchanged so callers can reuse it (e.g. as an
	exception message).
	'''
	flagLogger = logging.getLogger('redflag')
	flagLogger.critical(reason)
	return reason
33
34
# Peer-to-peer Bitcoin node, used to submit solved blocks directly over
# the p2p protocol (see checkShare's bcnode.submitBlock call).
from bitcoin.node import BitcoinLink, BitcoinNode
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

# JSONRPC proxy to the upstream bitcoind (used for getmemorypool calls).
import jsonrpc
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
41
42
43 from bitcoin.script import BitcoinScript
44 from bitcoin.txn import Txn
45 from base58 import b58decode
46 from struct import pack
47 import subprocess
48 from time import time
49
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
	'''Build the coinbase transaction paying out coinbaseValue (base units).
	
	If useCoinbaser is true and config.CoinbaserCmd is set, that external
	command is run (with %d replaced by coinbaseValue) to allocate part of
	the reward to extra outputs; its stdout is expected to be a count line
	followed by (amount, address) line pairs. Whatever remains — or the
	whole value, if the coinbaser fails or over-allocates — is paid to
	config.TrackerAddr.
	'''
	txn = Txn.new()
	
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		try:
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			# NOTE(review): shell=True with config-supplied command; the
			# config file is trusted, but keep it out of untrusted hands.
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			nout = int(p.stdout.readline())
			for i in range(nout):
				amount = int(p.stdout.readline())
				addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
				pkScript = BitcoinScript.toAddress(addr)
				txn.addOutput(amount, pkScript)
				coinbased += amount
		except Exception as e:
			# Narrowed from a bare except (which also swallowed
			# KeyboardInterrupt/SystemExit) and now logged instead of
			# silently discarded. Setting coinbased past coinbaseValue
			# forces the failure branch below.
			logging.getLogger('makeCoinbaseTxn').debug('Coinbaser error: %s' % (e,))
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			# Coinbaser failed or tried to spend more than the reward:
			# discard its outputs and pay everything to the tracker.
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			txn.outputs = []
		else:
			coinbaseValue -= coinbased
	
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	txn.addOutput(coinbaseValue, pkScript)
	
	# TODO
	# TODO: red flag on dupe coinbase
	return txn
80
81
import jsonrpc_getwork
from util import Bits2Target

# username -> { work-identifier -> (work-data, issue-time) }; populated by
# getBlockHeader/getBlockTemplate, consumed by checkShare.
workLog = {}
# Current network target as an integer; refreshed in blockChanged().
networkTarget = None
# Share header data seen for the current block, used for duplicate rejection.
DupeShareHACK = {}

# JSONRPCServer instance; created in the __main__ section below.
server = None
def updateBlocks():
	'''Wake the JSONRPC server's longpoll machinery, if the server exists yet.'''
	if not server:
		return
	server.wakeLongpoll()
93
def blockChanged():
	'''Reset per-block state after the best block changes.
	
	Clears the duplicate-share tables and the work log (their entries can
	only match the previous block), recomputes the network target from the
	new block's nBits, and wakes any longpoll clients.
	'''
	global DupeShareHACK, MM, networkTarget, server
	DupeShareHACK = {}
	jsonrpc_getwork._CheckForDupesHACK = {}
	networkTarget = Bits2Target(MM.currentBlock[1])
	workLog.clear()
	updateBlocks()
102
103
from merklemaker import merkleMaker
# The merkle maker tracks upstream block templates and builds work.
MM = merkleMaker()
# It takes its settings directly from the config module's namespace.
MM.__dict__.update(config.__dict__)
# Pre-assembled tracker-only coinbase txn with a hard-coded 5000000000
# base-unit value.  # FIXME
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
# Hook our callbacks so new blocks/updates reset state and wake longpolls.
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
MM.start()
113
114
115 from binascii import b2a_hex
116 from copy import deepcopy
117 from struct import pack, unpack
118 from time import time
119 from util import RejectedShare, dblsha, hash2int, swap32
120 import jsonrpc
121 import threading
122 import traceback
123
# Optional "gotwork" reporting: when config.GotWorkURI is set, qualifying
# shares are forwarded to this JSONRPC service from checkShare.
gotwork = None
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
127
def submitGotwork(info):
	'''Best-effort push of share info to the configured gotwork service.
	
	Run from a daemon thread (see checkShare); failures are logged as a
	warning and otherwise ignored.
	'''
	try:
		gotwork.gotwork(info)
	except Exception:
		# Narrowed from a bare except so SystemExit/KeyboardInterrupt still
		# propagate; a failed submission is only worth a warning.
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
133
def getBlockHeader(username):
	'''Issue getwork-style work: an 80-byte block header for username.
	
	The issued work is remembered in workLog keyed by merkle root, so a
	returned share can later be matched to what was handed out.
	'''
	MRD = MM.getMRD()
	merkleRoot = MRD[0]
	prevBlock = MRD[3]
	bits = MRD[4]
	timestamp = pack('<L', int(time()))
	# version 1 | prev block | merkle root | time | nBits | placeholder nonce
	hdr = b'\1\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
	workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
	return hdr
141
def getBlockTemplate(username):
	'''Issue getmemorypool-style work and log it for later share matching.
	
	The work-log key is a short identifier embedded length-prefixed at the
	start of the coinbase scriptsig (not the merkle root, which the miner
	will change).
	'''
	MC = MM.getMC()
	coinbase = MC[2]
	idLen = coinbase[0]
	workId = coinbase[1:1 + idLen]
	workLog.setdefault(username, {})[workId] = (MC, time())
	return MC
149
# Callables invoked with each processed share; populated from
# config.ShareLogging in the __main__ section.
loggersShare = []

# Debug retention of real-block data and payloads submitted upstream.
RBDs = []
RBPs = []
154
155 from bitcoin.varlen import varlenEncode, varlenDecode
156 import bitcoin.txn
def assembleBlock(blkhdr, txlist):
	'''Serialize a full block: header, txn-count varint, then each txn's data.'''
	parts = [blkhdr, varlenEncode(len(txlist))]
	parts.extend(tx.data for tx in txlist)
	return b''.join(parts)
163
def blockSubmissionThread(payload):
	'''Persistently submit a solved block upstream until bitcoind accepts it.
	
	Retries forever on error. Previously failures were silently swallowed
	in a bare except with no delay, busy-spinning the thread if the
	upstream was down; now each failure is logged and followed by a short
	sleep.
	'''
	while True:
		try:
			UpstreamBitcoindJSONRPC.getmemorypool(b2a_hex(payload).decode('ascii'))
			break
		except Exception:
			checkShare.logger.warning('Block submission failed, retrying\n' + traceback.format_exc())
			sleep(0.1)
171
def checkShare(share):
	'''Validate a submitted share, record it, and submit real blocks upstream.
	
	Raises RejectedShare (with a short reason string) for any invalid
	share; returns None on acceptance. Mutates the share dict in place:
	adds 'time', the matched work (under 'MRD' or 'MC'), and possibly
	'upstreamResult'.
	'''
	shareTime = share['time'] = time()
	
	data = share['data']
	# Only the 80-byte block header portion is validated directly.
	data = data[:80]
	(prevBlock, bits) = MM.currentBlock
	sharePrevBlock = data[4:36]
	if sharePrevBlock != prevBlock:
		# Work built on the immediately previous block is merely stale;
		# anything else is outright wrong.
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')
	
	# TODO: use userid
	username = share['username']
	if username not in workLog:
		raise RejectedShare('unknown-user')
	
	if data[72:76] != bits:
		raise RejectedShare('bad-diffbits')
	if data[:4] != b'\1\0\0\0':
		raise RejectedShare('bad-version')
	
	shareMerkleRoot = data[36:68]
	if 'blkdata' in share:
		# getmemorypool-style share: full block body follows the header.
		# The work-log identifier lives length-prefixed at the start of
		# the coinbase scriptsig (see getBlockTemplate).
		pl = share['blkdata']
		(txncount, pl) = varlenDecode(pl)
		cbtxn = bitcoin.txn.Txn(pl)
		cbtxn.disassemble(retExtra=True)
		coinbase = cbtxn.getCoinbase()
		wliLen = coinbase[0]
		wli = coinbase[1:wliLen+1]
		mode = 'MC'
		moden = 1
	else:
		# getwork-style share: identified by its merkle root alone.
		wli = shareMerkleRoot
		mode = 'MRD'
		moden = 0
	
	MWL = workLog[username]
	if wli not in MWL:
		raise RejectedShare('unknown-work')
	(wld, issueT) = MWL[wli]
	share[mode] = wld
	
	# Duplicate detection is keyed on the exact 80-byte header.
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
	blkhash = dblsha(data)
	# Minimum share difficulty: the last four hash bytes must be zero.
	if blkhash[28:] != b'\0\0\0\0':
		raise RejectedShare('H-not-zero')
	blkhashn = hash2int(blkhash)
	
	global networkTarget
	# Hashes that meet the network target are interesting enough for INFO.
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
	
	workMerkleTree = wld[1]
	workCoinbase = wld[2]
	
	# NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
	txlist = workMerkleTree.data
	cbtxn = txlist[0]
	cbtxn.setCoinbase(workCoinbase)
	cbtxn.assemble()
	
	if blkhashn <= networkTarget:
		# This share solves an actual block: submit it upstream both via
		# JSONRPC (retried in a thread) and over the p2p node.
		logfunc("Submitting upstream")
		if not moden:
			RBDs.append( deepcopy( (data, txlist) ) )
			payload = assembleBlock(data, txlist)
		else:
			RBDs.append( deepcopy( (data, txlist, share['blkdata']) ) )
			payload = share['data'] + share['blkdata']
		logfunc('Real block payload: %s' % (payload,))
		RBPs.append(payload)
		threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
		bcnode.submitBlock(payload)
		share['upstreamResult'] = True
		MM.updateBlock(blkhash)
	
	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			coinbaseMrkl = cbtxn.data
			coinbaseMrkl += blkhash
			steps = workMerkleTree._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			# Submit from a daemon thread so a slow service can't block us.
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')
	
	# Timestamp sanity checks against the work's issue time and our clock.
	shareTimestamp = unpack('<L', data[68:72])[0]
	if shareTime < issueT - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
	if moden:
		# Full-block shares: verify the miner only appended to the issued
		# coinbase, check length and flag abuse, and confirm the block body
		# actually matches the merkle root in the header.
		cbpre = cbtxn.getCoinbase()
		cbpreLen = len(cbpre)
		if coinbase[:cbpreLen] != cbpre:
			raise RejectedShare('bad-cb-prefix')
		
		# Filter out known "I support" flags, to prevent exploits
		for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
			if coinbase.find(ff) > cbpreLen - len(ff):
				raise RejectedShare('bad-cb-flag')
		
		if len(coinbase) > 100:
			raise RejectedShare('bad-cb-length')
		
		cbtxn = deepcopy(cbtxn)
		cbtxn.setCoinbase(coinbase)
		cbtxn.assemble()
		if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
			raise RejectedShare('bad-txnmrklroot')
		
		txlist = [cbtxn,] + txlist[1:]
		allowed = assembleBlock(data, txlist)
		if allowed != share['data'] + share['blkdata']:
			raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')
307
def receiveShare(share):
	'''Process one incoming share: validate it, then feed it to share loggers.
	
	A RejectedShare from checkShare is recorded on the share as
	'rejectReason' and re-raised. The share loggers always run (via
	finally), with 'solution' filled in either from the original submitted
	data or a hex rendering of the header.
	'''
	# TODO: username => userid
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		raise
	finally:
		try:
			share['solution'] = share['_origdata']
		except KeyError:
			share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
		for shareLogger in loggersShare:
			shareLogger(share)
322
def newBlockNotification(signum, frame):
	'''SIGUSR1 handler: a new block was announced, so refresh the merkle tree.'''
	logging.getLogger('newBlockNotification').info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
328
# Let external tools (e.g. a bitcoind blocknotify hook) signal new blocks.
from signal import signal, SIGUSR1
signal(SIGUSR1, newBlockNotification)
331
332
333 import os
334 import os.path
335 import pickle
336 import signal
337 import sys
338 from time import sleep
339 import traceback
340
341 SAVE_STATE_FILENAME = 'eloipool.worklog'
342
def stopServers():
	'''Shut down the p2p node and JSONRPC server, then close their sockets.'''
	logger = logging.getLogger('stopServers')
	
	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server)
	# First ask each server to stop, then wake each so it notices.
	for s in servers:
		s.keepgoing = False
	for s in servers:
		s.wakeup()
	# Poll until all report stopped, giving up after 0x100 short sleeps.
	attempts = 0
	while True:
		stillRunning = [s.__class__.__name__ for s in servers if s.running]
		if not stillRunning:
			break
		attempts += 1
		if attempts >= 0x100:
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(stillRunning)))
			break
		sleep(0.01)
	
	# Forcibly close any file descriptors the servers still hold.
	for s in servers:
		for fd in s._fd.keys():
			os.close(fd)
370
def saveState(t = None):
	'''Persist runtime state (shutdown time, dupe table, work log) to disk.
	
	t is the shutdown timestamp (or None); it is pickled first so
	restoreState() can report total downtime on the next start. Write
	failures are retried up to 0x10000 times before giving up.
	'''
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump(t, f)
				pickle.dump(DupeShareHACK, f)
				pickle.dump(workLog, f)
			break
		except Exception:
			i += 1
			if i >= 0x10000:
				logger.error('Failed to save work\n' + traceback.format_exc())
				try:
					os.unlink(SAVE_STATE_FILENAME)
				except Exception:
					logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
				# BUGFIX: previously the loop never exited after giving up,
				# retrying (and re-logging the error) forever. Stop here.
				break
392
def exit():
	'''Clean shutdown: stop servers, persist state, then kill this process.
	
	Note: intentionally shadows the builtin exit(); kept for interactive
	console use.
	'''
	shutdownTime = time()
	stopServers()
	saveState(shutdownTime)
	logging.getLogger('exit').info('Goodbye...')
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)
400
def restart():
	'''Stop servers, persist state, and re-exec this process in place.'''
	shutdownTime = time()
	stopServers()
	saveState(shutdownTime)
	restartLogger = logging.getLogger('restart')
	restartLogger.info('Restarting...')
	try:
		os.execv(sys.argv[0], sys.argv)
	except Exception:
		# Narrowed from a bare except; an exec failure (e.g. OSError) is
		# logged so the operator knows the restart did not happen.
		restartLogger.error('Failed to exec\n' + traceback.format_exc())
410
def restoreState():
	'''Load state saved by saveState(), tolerating older save formats.
	
	Restores the workLog and DupeShareHACK globals. The workLog restore is
	skipped when the save file is older than two minutes, since the work
	it describes would be stale anyway.
	'''
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
	s = os.stat(SAVE_STATE_FILENAME)
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			t = pickle.load(f)
			if isinstance(t, tuple):
				# Oldest format: a single (workLog, DupeShareHACK) tuple.
				workLog = t[0]
				DupeShareHACK = t[1]
				t = None
			else:
				if isinstance(t, dict):
					# Old format: DupeShareHACK first, no timestamp.
					DupeShareHACK = t
					t = None
				else:
					# Current format: timestamp, then DupeShareHACK.
					DupeShareHACK = pickle.load(f)
				
				if s.st_mtime + 120 >= time():
					workLog = pickle.load(f)
				else:
					logger.debug('Skipping restore of expired workLog')
	except Exception:
		# Narrowed from a bare except: unpickling can fail in many ways,
		# but SystemExit/KeyboardInterrupt should still propagate.
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
	if t:
		logger.info('Total downtime: %g seconds' % (time() - t,))
444
445
446 from jsonrpcserver import JSONRPCListener, JSONRPCServer
447 import interactivemode
448 from networkserver import NetworkListener
449 import threading
450 import sharelogging
451 import imp
452
if __name__ == "__main__":
	# --- Share logging setup (several layers of config back-compat) ---
	if not hasattr(config, 'ShareLogging'):
		config.ShareLogging = ()
	if hasattr(config, 'DbOptions'):
		# Legacy DbOptions variable: translate into an equivalent
		# sql/postgres ShareLogging entry.
		logging.getLogger('backwardCompatibility').warn('DbOptions configuration variable is deprecated; upgrade to ShareLogging var before 2013-03-05')
		config.ShareLogging = list(config.ShareLogging)
		config.ShareLogging.append( {
			'type': 'sql',
			'engine': 'postgres',
			'dbopts': config.DbOptions,
			'statement': "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values ({Q(remoteHost)}, {username}, {YN(not(rejectReason))}, {YN(upstreamResult)}, {rejectReason}, decode({solution}, 'hex'))",
		} )
	for i in config.ShareLogging:
		if not hasattr(i, 'keys'):
			# Legacy (name, parameters) tuple form: convert to a dict entry.
			name, parameters = i
			logging.getLogger('backwardCompatibility').warn('Using short-term backward compatibility for ShareLogging[\'%s\']; be sure to update config before 2012-04-04' % (name,))
			if name == 'postgres':
				name = 'sql'
				i = {
					'engine': 'postgres',
					'dbopts': parameters,
				}
			elif name == 'logfile':
				i = {}
				i['thropts'] = parameters
				if 'filename' in parameters:
					# 'filename' is a direct argument; keep the rest as
					# thread options (copied so the original isn't mutated).
					i['filename'] = parameters['filename']
					i['thropts'] = dict(i['thropts'])
					del i['thropts']['filename']
			else:
				i = parameters
			i['type'] = name
		
		# Load sharelogging/<type>.py and instantiate the class of the same
		# name with the whole entry as keyword arguments.
		name = i['type']
		parameters = i
		try:
			fp, pathname, description = imp.find_module(name, sharelogging.__path__)
			m = imp.load_module(name, fp, pathname, description)
			lo = getattr(m, name)(**parameters)
			loggersShare.append(lo.logShare)
		except:
			logging.getLogger('sharelogging').error("Error setting up share logger %s: %s", name,  sys.exc_info())

	# --- Bitcoin p2p listeners and optional upstream p2p link ---
	LSbc = []
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))
	
	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
	
	# NOTE(review): these imports appear to register JSONRPC method
	# handlers as a side effect — confirm against those modules.
	import jsonrpc_getmemorypool
	import jsonrpc_getwork
	import jsonrpc_setworkaux
	
	# --- JSONRPC server setup ---
	server = JSONRPCServer()
	if hasattr(config, 'JSONRPCAddress'):
		# Legacy single-address variable: fold into the addresses list.
		logging.getLogger('backwardCompatibility').warn('JSONRPCAddress configuration variable is deprecated; upgrade to JSONRPCAddresses list before 2013-03-05')
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
	LS = []
	for a in config.JSONRPCAddresses:
		LS.append(JSONRPCListener(server, a))
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	# Wire the server to this module's work/share machinery.
	server.aux = MM.CoinbaseAux
	server.getBlockHeader = getBlockHeader
	server.getBlockTemplate = getBlockTemplate
	server.receiveShare = receiveShare
	server.RaiseRedFlags = RaiseRedFlags
	
	restoreState()
	
	# The p2p node runs in a background daemon thread; the JSONRPC server
	# takes over the main thread.
	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()
	
	server.serve_forever()