Made the logging of shares modular and added file logging
eloipool.py (bitcoin:eloipool.git)

#!/usr/bin/python3
# Eloipool - Python Bitcoin pool server
# Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import config


import logging

logging.basicConfig(level=logging.DEBUG)
for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker', 'Waker for JSONRPCServer', 'JSONRPCServer'):
        logging.getLogger(infoOnly).setLevel(logging.INFO)

def RaiseRedFlags(reason):
        logging.getLogger('redflag').critical(reason)
        return reason


from bitcoin.node import BitcoinLink, BitcoinNode
bcnode = BitcoinNode(config.UpstreamNetworkId)
bcnode.userAgent += b'Eloipool:0.1/'

import jsonrpc
UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)


from bitcoin.script import BitcoinScript
from bitcoin.txn import Txn
from base58 import b58decode
from struct import pack
import subprocess
from time import time

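# makeCoinbaseTxn() builds the coinbase transaction for new work.  When a
# CoinbaserCmd is configured, the command is run with '%d' replaced by the
# total coinbase value and is expected to print on stdout the number of extra
# outputs, followed by an amount line and an address line for each one.  On
# any error, or if the coinbaser claims the entire value or more, those
# outputs are discarded and everything is paid to config.TrackerAddr instead.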
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
        txn = Txn.new()

        if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
                coinbased = 0
                try:
                        cmd = config.CoinbaserCmd
                        cmd = cmd.replace('%d', str(coinbaseValue))
                        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
                        nout = int(p.stdout.readline())
                        for i in range(nout):
                                amount = int(p.stdout.readline())
                                addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
                                pkScript = BitcoinScript.toAddress(addr)
                                txn.addOutput(amount, pkScript)
                                coinbased += amount
                except:
                        coinbased = coinbaseValue + 1
                if coinbased >= coinbaseValue:
                        logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
                        txn.outputs = []
                else:
                        coinbaseValue -= coinbased

        pkScript = BitcoinScript.toAddress(config.TrackerAddr)
        txn.addOutput(coinbaseValue, pkScript)

        # TODO
        # TODO: red flag on dupe coinbase
        return txn


import jsonrpc_getwork
from util import Bits2Target

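# Shared state:
#  - workLog maps each username to a dict of outstanding work, keyed by a
#    work-log identifier and holding (work data, issue time) tuples;
#  - networkTarget is the current network target derived from the block bits;
#  - DupeShareHACK remembers every share header seen for the current block so
#    duplicates can be rejected.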
workLog = {}
networkTarget = None
DupeShareHACK = {}

server = None
def updateBlocks():
        if server:
                server.wakeLongpoll()

def blockChanged():
        global DupeShareHACK
        DupeShareHACK = {}
        jsonrpc_getwork._CheckForDupesHACK = {}
        global MM, networkTarget, server
        networkTarget = Bits2Target(MM.currentBlock[1])
        workLog.clear()
        updateBlocks()


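# merkleMaker runs in its own thread (MM.start() below) and keeps the pool's
# current work fresh, calling back into blockChanged() and updateBlocks()
# above when the network block or the work changes; see merklemaker.py.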
from merklemaker import merkleMaker
MM = merkleMaker()
MM.__dict__.update(config.__dict__)
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
MM.start()


from binascii import b2a_hex
from copy import deepcopy
from struct import pack, unpack
from time import time
from util import RejectedShare, dblsha, hash2int, swap32
import jsonrpc
import threading
import traceback

gotwork = None
if hasattr(config, 'GotWorkURI'):
        gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)

def submitGotwork(info):
        try:
                gotwork.gotwork(info)
        except:
                checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())

db = None
if hasattr(config, 'DbOptions'):
        import psycopg2
        db = psycopg2.connect(**config.DbOptions)

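# Work issuance: getBlockHeader() serves getwork-style requests and remembers
# the issued work in workLog keyed by merkle root; getBlockTemplate() serves
# getmemorypool-style requests and keys the entry by a short identifier
# embedded at the start of the coinbase scriptsig (its first byte gives the
# identifier's length).  checkShare() later uses the same key to find the work
# a share claims to solve.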
def getBlockHeader(username):
        MRD = MM.getMRD()
        (merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
        timestamp = pack('<L', int(time()))
        hdr = b'\1\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
        workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
        return hdr

def getBlockTemplate(username):
        MC = MM.getMC()
        (dummy, merkleTree, coinbase, prevBlock, bits) = MC
        wliLen = coinbase[0]
        wli = coinbase[1:wliLen+1]
        workLog.setdefault(username, {})[wli] = (MC, time())
        return MC

loggersShare = []
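# loggersShare is filled in at startup (see __main__ below) from the modules
# named in config.shareLogging; each entry is a callable that receives the
# share dict after every submission, accepted or rejected alike.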

RBDs = []
RBPs = []

from bitcoin.varlen import varlenEncode, varlenDecode
import bitcoin.txn
def assembleBlock(blkhdr, txlist):
        payload = blkhdr
        payload += varlenEncode(len(txlist))
        for tx in txlist:
                payload += tx.data
        return payload

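# blockSubmissionThread() pushes a solved block to the upstream bitcoind over
# JSON-RPC, retrying without delay until the call succeeds; checkShare() below
# also relays the block over the p2p connection with bcnode.submitBlock().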
def blockSubmissionThread(payload):
        while True:
                try:
                        UpstreamBitcoindJSONRPC.getmemorypool(b2a_hex(payload).decode('ascii'))
                        break
                except:
                        pass

def checkShare(share):
        data = share['data']
        data = data[:80]
        (prevBlock, bits) = MM.currentBlock
        sharePrevBlock = data[4:36]
        if sharePrevBlock != prevBlock:
                if sharePrevBlock == MM.lastBlock[0]:
                        raise RejectedShare('stale-prevblk')
                raise RejectedShare('bad-prevblk')

        # TODO: use userid
        username = share['username']
        if username not in workLog:
                raise RejectedShare('unknown-user')

        if data[72:76] != bits:
                raise RejectedShare('bad-diffbits')
        if data[:4] != b'\1\0\0\0':
                raise RejectedShare('bad-version')

        shareMerkleRoot = data[36:68]
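        # A share arrives in one of two forms: getmemorypool-style submissions
        # carry the full block data in 'blkdata', and the work-log key (wli) is
        # recovered from the length-prefixed identifier at the start of the
        # coinbase scriptsig; plain getwork-style submissions are keyed by the
        # merkle root itself.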
        if 'blkdata' in share:
                pl = share['blkdata']
                (txncount, pl) = varlenDecode(pl)
                cbtxn = bitcoin.txn.Txn(pl)
                cbtxn.disassemble(retExtra=True)
                coinbase = cbtxn.getCoinbase()
                wliLen = coinbase[0]
                wli = coinbase[1:wliLen+1]
                mode = 'MC'
                moden = 1
        else:
                wli = shareMerkleRoot
                mode = 'MRD'
                moden = 0

        MWL = workLog[username]
        if wli not in MWL:
                raise RejectedShare('unknown-work')
        (wld, issueT) = MWL[wli]
        share[mode] = wld

        if data in DupeShareHACK:
                raise RejectedShare('duplicate')
        DupeShareHACK[data] = None

        shareTime = share['time'] = time()

        blkhash = dblsha(data)
        if blkhash[28:] != b'\0\0\0\0':
                raise RejectedShare('H-not-zero')
        blkhashn = hash2int(blkhash)

        global networkTarget
        logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
        logfunc('BLKHASH: %64x' % (blkhashn,))
        logfunc(' TARGET: %64x' % (networkTarget,))

        workMerkleTree = wld[1]
        workCoinbase = wld[2]

        # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
        txlist = workMerkleTree.data
        cbtxn = txlist[0]
        cbtxn.setCoinbase(workCoinbase)
        cbtxn.assemble()

        if blkhashn <= networkTarget:
                logfunc("Submitting upstream")
                if not moden:
                        RBDs.append( deepcopy( (data, txlist) ) )
                        payload = assembleBlock(data, txlist)
                else:
                        RBDs.append( deepcopy( (data, txlist, share['blkdata']) ) )
                        payload = share['data'] + share['blkdata']
                logfunc('Real block payload: %s' % (payload,))
                RBPs.append(payload)
                threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
                bcnode.submitBlock(payload)
                share['upstreamResult'] = True
                MM.updateBlock(blkhash)

        # Gotwork hack...
        if gotwork and blkhashn <= config.GotWorkTarget:
                try:
                        coinbaseMrkl = cbtxn.data
                        coinbaseMrkl += blkhash
                        steps = workMerkleTree._steps
                        coinbaseMrkl += pack('B', len(steps))
                        for step in steps:
                                coinbaseMrkl += step
                        coinbaseMrkl += b"\0\0\0\0"
                        info = {}
                        info['hash'] = b2a_hex(blkhash).decode('ascii')
                        info['header'] = b2a_hex(data).decode('ascii')
                        info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
                        thr = threading.Thread(target=submitGotwork, args=(info,))
                        thr.daemon = True
                        thr.start()
                except:
                        checkShare.logger.warning('Failed to build gotwork request')

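        # Timing sanity checks: the header timestamp must be no more than 300
        # seconds behind, nor more than 7200 seconds ahead of, the time the
        # share was received; shares are also checked against the recorded
        # issue time of the work ('stale-work').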
        shareTimestamp = unpack('<L', data[68:72])[0]
        if shareTime < issueT - 120:
                raise RejectedShare('stale-work')
        if shareTimestamp < shareTime - 300:
                raise RejectedShare('time-too-old')
        if shareTimestamp > shareTime + 7200:
                raise RejectedShare('time-too-new')

        if moden:
                cbpre = cbtxn.getCoinbase()
                cbpreLen = len(cbpre)
                if coinbase[:cbpreLen] != cbpre:
                        raise RejectedShare('bad-cb-prefix')

                # Filter out known "I support" flags, to prevent exploits
                for ff in (b'/P2SH/', b'NOP2SH', b'p2sh/CHV', b'p2sh/NOCHV'):
                        if coinbase.find(ff) > cbpreLen - len(ff):
                                raise RejectedShare('bad-cb-flag')

                if len(coinbase) > 100:
                        raise RejectedShare('bad-cb-length')

                cbtxn = deepcopy(cbtxn)
                cbtxn.setCoinbase(coinbase)
                cbtxn.assemble()
                if shareMerkleRoot != workMerkleTree.withFirst(cbtxn):
                        raise RejectedShare('bad-txnmrklroot')

                txlist = [cbtxn,] + txlist[1:]
                allowed = assembleBlock(data, txlist)
                if allowed != share['data'] + share['blkdata']:
                        raise RejectedShare('bad-txns')
checkShare.logger = logging.getLogger('checkShare')

def receiveShare(share):
        # TODO: username => userid
        try:
                checkShare(share)
        except RejectedShare as rej:
                share['rejectReason'] = str(rej)
                raise
        finally:
                if '_origdata' in share:
                        share['solution'] = share['_origdata']
                else:
                        share['solution'] = b2a_hex(swap32(share['data'])).decode('utf8')
                for i in loggersShare:
                        i(share)

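# Sending SIGUSR1 to the pool process (for example, from a bitcoind
# -blocknotify script) prompts an immediate merkle-tree refresh.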
def newBlockNotification(signum, frame):
        logging.getLogger('newBlockNotification').info('Received new block notification')
        MM.updateMerkleTree()
        # TODO: Force RESPOND TO LONGPOLLS?
        pass

from signal import signal, SIGUSR1
signal(SIGUSR1, newBlockNotification)


import os
import os.path
import pickle
import signal
import sys
from time import sleep
import traceback

SAVE_STATE_FILENAME = 'eloipool.worklog'

def stopServers():
        logger = logging.getLogger('stopServers')

        logger.info('Stopping servers...')
        global bcnode, server
        servers = (bcnode, server)
        for s in servers:
                s.keepgoing = False
        for s in servers:
                s.wakeup()
        i = 0
        while True:
                sl = []
                for s in servers:
                        if s.running:
                                sl.append(s.__class__.__name__)
                if not sl:
                        break
                i += 1
                if i >= 0x100:
                        logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
                        break
                sleep(0.01)

        for s in servers:
                for fd in s._fd.keys():
                        os.close(fd)

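# saveState() pickles, in order, the shutdown timestamp, DupeShareHACK and
# workLog into SAVE_STATE_FILENAME; restoreState() below also accepts a couple
# of older layouts of the same file, and skips workLog entirely if the file is
# more than two minutes old.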
def saveState(t = None):
        logger = logging.getLogger('saveState')

        # Then, save data needed to resume work
        logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
        i = 0
        while True:
                try:
                        with open(SAVE_STATE_FILENAME, 'wb') as f:
                                pickle.dump(t, f)
                                pickle.dump(DupeShareHACK, f)
                                pickle.dump(workLog, f)
                        break
                except:
                        i += 1
                        if i >= 0x10000:
                                logger.error('Failed to save work\n' + traceback.format_exc())
                                try:
                                        os.unlink(SAVE_STATE_FILENAME)
                                except:
                                        logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())

def exit():
        t = time()
        stopServers()
        saveState(t)
        logging.getLogger('exit').info('Goodbye...')
        os.kill(os.getpid(), signal.SIGTERM)
        sys.exit(0)

def restart():
        t = time()
        stopServers()
        saveState(t)
        logging.getLogger('restart').info('Restarting...')
        try:
                os.execv(sys.argv[0], sys.argv)
        except:
                logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())

def restoreState():
        if not os.path.exists(SAVE_STATE_FILENAME):
                return

        global workLog, DupeShareHACK

        logger = logging.getLogger('restoreState')
        s = os.stat(SAVE_STATE_FILENAME)
        logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, s.st_size))
        try:
                with open(SAVE_STATE_FILENAME, 'rb') as f:
                        t = pickle.load(f)
                        if type(t) == tuple:
                                workLog = t[0]
                                DupeShareHACK = t[1]
                                t = None
                        else:
                                if isinstance(t, dict):
                                        DupeShareHACK = t
                                        t = None
                                else:
                                        DupeShareHACK = pickle.load(f)

                                if s.st_mtime + 120 >= time():
                                        workLog = pickle.load(f)
                                else:
                                        logger.debug('Skipping restore of expired workLog')
        except:
                logger.error('Failed to restore state\n' + traceback.format_exc())
                return
        logger.info('State restored successfully')
        if t:
                logger.info('Total downtime: %g seconds' % (time() - t,))


from jsonrpcserver import JSONRPCListener, JSONRPCServer
import interactivemode
from networkserver import NetworkListener
import threading
import sharelogging
import imp

if __name__ == "__main__":
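        # config.shareLogging is expected to be an iterable of (name, parameters)
        # pairs, where name is a module in the sharelogging package and parameters
        # is passed to that module's setup().  A hypothetical example (module and
        # option names are illustrative, not defined by this file):
        #
        #     shareLogging = [
        #         ('logtofile', {'filename': 'share.log'}),
        #     ]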
        for i in config.shareLogging:
                name, parameters = i
                try:
                        fp, pathname, description = imp.find_module(name, sharelogging.__path__)
                        m = imp.load_module(name, fp, pathname, description)
                        m.setup(parameters)
                        loggersShare.append(m.logShare)
                except:
                        logging.getLogger('logging').warning("error setting up logger %s: %s", name, sys.exc_info())

        LSbc = []
        if not hasattr(config, 'BitcoinNodeAddresses'):
                config.BitcoinNodeAddresses = ()
        for a in config.BitcoinNodeAddresses:
                LSbc.append(NetworkListener(bcnode, a))

        if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
                BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)

        import jsonrpc_getmemorypool
        import jsonrpc_getwork
        import jsonrpc_setworkaux

        server = JSONRPCServer()
        if hasattr(config, 'JSONRPCAddress'):
                if not hasattr(config, 'JSONRPCAddresses'):
                        config.JSONRPCAddresses = []
                config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
        LS = []
        for a in config.JSONRPCAddresses:
                LS.append(JSONRPCListener(server, a))
        if hasattr(config, 'SecretUser'):
                server.SecretUser = config.SecretUser
        server.aux = MM.CoinbaseAux
        server.getBlockHeader = getBlockHeader
        server.getBlockTemplate = getBlockTemplate
        server.receiveShare = receiveShare
        server.RaiseRedFlags = RaiseRedFlags

        restoreState()

        bcnode_thr = threading.Thread(target=bcnode.serve_forever)
        bcnode_thr.daemon = True
        bcnode_thr.start()

        server.serve_forever()