Bugfix: Postgres only tolerates string solution, not bytes
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import config
19
20
21 import logging
22
23 logging.basicConfig(level=logging.DEBUG)
24 for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker'):
25         logging.getLogger(infoOnly).setLevel(logging.INFO)
26
27 def RaiseRedFlags(reason):
28         logging.getLogger('redflag').critical(reason)
29         return reason
30
31
32 from bitcoin.node import BitcoinLink, BitcoinNode
33 bcnode = BitcoinNode(config.UpstreamNetworkId)
34 bcnode.userAgent += b'Eloipool:0.1/'
35
36 import jsonrpc
37 UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
38
39
40 from bitcoin.script import BitcoinScript
41 from bitcoin.txn import Txn
42 from base58 import b58decode
43 from struct import pack
44 import subprocess
45 from time import time
46
47 def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
48         txn = Txn.new()
49         
50         if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
51                 coinbased = 0
52                 try:
53                         cmd = config.CoinbaserCmd
54                         cmd = cmd.replace('%d', str(coinbaseValue))
55                         p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
56                         nout = int(p.stdout.readline())
57                         for i in range(nout):
58                                 amount = int(p.stdout.readline())
59                                 addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
60                                 pkScript = BitcoinScript.toAddress(addr)
61                                 txn.addOutput(amount, pkScript)
62                                 coinbased += amount
63                 except:
64                         coinbased = coinbaseValue + 1
65                 if coinbased >= coinbaseValue:
66                         logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
67                         txn.outputs = []
68                 else:
69                         coinbaseValue -= coinbased
70         
71         pkScript = BitcoinScript.toAddress(config.TrackerAddr)
72         txn.addOutput(coinbaseValue, pkScript)
73         
74         # TODO
75         # TODO: red flag on dupe coinbase
76         return txn
77
78
79 from util import Bits2Target
80
81 workLog = {}
82 networkTarget = None
83 DupeShareHACK = {}
84
85 server = None
86 def updateBlocks():
87         if server:
88                 server.wakeLongpoll()
89
90 def blockChanged():
91         global DupeShareHACK
92         DupeShareHACK = {}
93         global MM, networkTarget, server
94         networkTarget = Bits2Target(MM.currentBlock[1])
95         workLog.clear()
96         updateBlocks()
97
98
99 from merklemaker import merkleMaker
100 MM = merkleMaker()
101 MM.__dict__.update(config.__dict__)
102 MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
103 MM.clearCoinbaseTxn.assemble()
104 MM.makeCoinbaseTxn = makeCoinbaseTxn
105 MM.onBlockChange = blockChanged
106 MM.onBlockUpdate = updateBlocks
107 MM.start()
108
109
110 from binascii import b2a_hex
111 from copy import deepcopy
112 from struct import pack, unpack
113 from time import time
114 from util import RejectedShare, dblsha, hash2int, swap32
115 import jsonrpc
116 import threading
117 import traceback
118
119 gotwork = None
120 if hasattr(config, 'GotWorkURI'):
121         gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
122
123 def submitGotwork(info):
124         try:
125                 gotwork.gotwork(info)
126         except:
127                 checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
128
129 db = None
130 if hasattr(config, 'DbOptions'):
131         import psycopg2
132         db = psycopg2.connect(**config.DbOptions)
133
134 def getBlockHeader(username):
135         MRD = MM.getMRD()
136         (merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
137         timestamp = pack('<L', int(time()))
138         hdr = b'\1\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
139         workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
140         return hdr
141
142 def getBlockTemplate(username):
143         MC = MM.getMC()
144         (dummy, merkleTree, coinbase, prevBlock, bits) = MC
145         workLog.setdefault(username, {})[coinbase] = (MC, time())
146         return MC
147
148 def YN(b):
149         if b is None:
150                 return None
151         return 'Y' if b else 'N'
152
153 def logShare(share):
154         if db is None:
155                 return
156         dbc = db.cursor()
157         rem_host = share.get('remoteHost', '?')
158         username = share['username']
159         reason = share.get('rejectReason', None)
160         upstreamResult = share.get('upstreamResult', None)
161         if '_origdata' in share:
162                 solution = share['_origdata']
163         else:
164                 solution = b2a_hex(swap32(share['data'])).decode('utf8')
165         #solution = b2a_hex(solution).decode('utf8')
166         stmt = "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values (%s, %s, %s, %s, %s, decode(%s, 'hex'))"
167         params = (rem_host, username, YN(not reason), YN(upstreamResult), reason, solution)
168         dbc.execute(stmt, params)
169         db.commit()
170
171 RBDs = []
172 RBPs = []
173
174 from bitcoin.varlen import varlenEncode, varlenDecode
175 import bitcoin.txn
176 def assembleBlock(blkhdr, txlist):
177         payload = blkhdr
178         payload += varlenEncode(len(txlist))
179         for tx in txlist:
180                 payload += tx.data
181         return payload
182
183 def blockSubmissionThread(payload):
184         while True:
185                 try:
186                         UpstreamBitcoindJSONRPC.getmemorypool(b2a_hex(payload).decode('ascii'))
187                         break
188                 except:
189                         pass
190
191 def checkShare(share):
192         data = share['data']
193         data = data[:80]
194         (prevBlock, bits) = MM.currentBlock
195         sharePrevBlock = data[4:36]
196         if sharePrevBlock != prevBlock:
197                 if sharePrevBlock == MM.lastBlock[0]:
198                         raise RejectedShare('stale-prevblk')
199                 raise RejectedShare('bad-prevblk')
200         
201         # TODO: use userid
202         username = share['username']
203         if username not in workLog:
204                 raise RejectedShare('unknown-user')
205         
206         if data[72:76] != bits:
207                 raise RejectedShare('bad-diffbits')
208         if data[:4] != b'\1\0\0\0':
209                 raise RejectedShare('bad-version')
210         
211         shareMerkleRoot = data[36:68]
212         if 'blkdata' in share:
213                 pl = share['blkdata']
214                 (txncount, pl) = varlenDecode(pl)
215                 cbtxn = bitcoin.txn.Txn(pl)
216                 cbtxn.disassemble(retExtra=True)
217                 wli = cbtxn.getCoinbase()
218                 mode = 'MC'
219                 moden = 1
220         else:
221                 wli = shareMerkleRoot
222                 mode = 'MRD'
223                 moden = 0
224         
225         MWL = workLog[username]
226         if wli not in MWL:
227                 raise RejectedShare('unknown-work')
228         (wld, t) = MWL[wli]
229         share[mode] = wld
230         
231         if data in DupeShareHACK:
232                 raise RejectedShare('duplicate')
233         DupeShareHACK[data] = None
234         
235         shareTimestamp = unpack('<L', data[68:72])[0]
236         shareTime = share['time'] = time()
237         if shareTime < t - 120:
238                 raise RejectedShare('stale-work')
239         if shareTimestamp < shareTime - 300:
240                 raise RejectedShare('time-too-old')
241         if shareTimestamp > shareTime + 7200:
242                 raise RejectedShare('time-too-new')
243         
244         blkhash = dblsha(data)
245         if blkhash[28:] != b'\0\0\0\0':
246                 raise RejectedShare('H-not-zero')
247         blkhashn = hash2int(blkhash)
248         
249         global networkTarget
250         logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
251         logfunc('BLKHASH: %64x' % (blkhashn,))
252         logfunc(' TARGET: %64x' % (networkTarget,))
253         
254         workMerkleTree = wld[1]
255         workCoinbase = wld[2]
256         
257         # NOTE: this isn't actually needed for MC mode, but we're abusing it for a trivial share check...
258         txlist = workMerkleTree.data
259         cbtxn = txlist[0]
260         cbtxn.setCoinbase(workCoinbase)
261         cbtxn.assemble()
262         
263         if blkhashn <= networkTarget:
264                 logfunc("Submitting upstream")
265                 if not moden:
266                         RBDs.append( deepcopy( (data, txlist) ) )
267                         payload = assembleBlock(data, txlist)
268                 else:
269                         RBDs.append( deepcopy( (data, txlist, share['blkdata']) ) )
270                         payload = share['data'] + share['blkdata']
271                 logfunc('Real block payload: %s' % (payload,))
272                 RBPs.append(payload)
273                 threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
274                 bcnode.submitBlock(payload)
275                 share['upstreamResult'] = True
276                 MM.updateBlock(blkhash)
277         
278         # Gotwork hack...
279         if gotwork and blkhashn <= config.GotWorkTarget:
280                 try:
281                         coinbaseMrkl = cbtxn.data
282                         coinbaseMrkl += blkhash
283                         steps = workMerkleTree._steps
284                         coinbaseMrkl += pack('B', len(steps))
285                         for step in steps:
286                                 coinbaseMrkl += step
287                         coinbaseMrkl += b"\0\0\0\0"
288                         info = {}
289                         info['hash'] = b2a_hex(blkhash).decode('ascii')
290                         info['header'] = b2a_hex(data).decode('ascii')
291                         info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
292                         thr = threading.Thread(target=submitGotwork, args=(info,))
293                         thr.daemon = True
294                         thr.start()
295                 except:
296                         checkShare.logger.warning('Failed to build gotwork request')
297         
298         if moden:
299                 if shareMerkleRoot != workMerkleTree.merkleRoot():
300                         raise RejectedShare('bad-txnmrklroot')
301                 allowed = assembleBlock(data, txlist)
302                 if allowed != share['data'] + share['blkdata']:
303                         raise RejectedShare('bad-txns')
304         
305         logShare(share)
306 checkShare.logger = logging.getLogger('checkShare')
307
308 def receiveShare(share):
309         # TODO: username => userid
310         try:
311                 checkShare(share)
312         except RejectedShare as rej:
313                 share['rejectReason'] = str(rej)
314                 logShare(share)
315                 raise
316         # TODO
317
318 def newBlockNotification(signum, frame):
319         logging.getLogger('newBlockNotification').info('Received new block notification')
320         MM.updateMerkleTree()
321         # TODO: Force RESPOND TO LONGPOLLS?
322         pass
323
324 from signal import signal, SIGUSR1
325 signal(SIGUSR1, newBlockNotification)
326
327
328 import os
329 import os.path
330 import pickle
331 import signal
332 import sys
333 from time import sleep
334 import traceback
335
336 SAVE_STATE_FILENAME = 'eloipool.worklog'
337
338 def stopServers():
339         logger = logging.getLogger('stopServers')
340         
341         logger.info('Stopping servers...')
342         global bcnode, server
343         servers = (bcnode, server)
344         for s in servers:
345                 s.keepgoing = False
346         for s in servers:
347                 s.wakeup()
348         i = 0
349         while True:
350                 sl = []
351                 for s in servers:
352                         if s.running:
353                                 sl.append(s.__class__.__name__)
354                 if not sl:
355                         break
356                 i += 1
357                 if i >= 0x100:
358                         logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
359                         break
360                 sleep(0.01)
361         
362         for s in servers:
363                 for fd in s._fd.keys():
364                         os.close(fd)
365
366 def saveState():
367         logger = logging.getLogger('saveState')
368         
369         # Then, save data needed to resume work
370         logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
371         i = 0
372         while True:
373                 try:
374                         with open(SAVE_STATE_FILENAME, 'wb') as f:
375                                 pickle.dump( (workLog, DupeShareHACK), f )
376                         break
377                 except:
378                         i += 1
379                         if i >= 0x10000:
380                                 logger.error('Failed to save work\n' + traceback.format_exc())
381                                 try:
382                                         os.unlink(SAVE_STATE_FILENAME)
383                                 except:
384                                         logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
385
386 def exit():
387         stopServers()
388         saveState()
389         logging.getLogger('exit').info('Goodbye...')
390         os.kill(os.getpid(), signal.SIGTERM)
391         sys.exit(0)
392
393 def restart():
394         stopServers()
395         saveState()
396         logging.getLogger('restart').info('Restarting...')
397         try:
398                 os.execv(sys.argv[0], sys.argv)
399         except:
400                 logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
401
402 def restoreState():
403         if not os.path.exists(SAVE_STATE_FILENAME):
404                 return
405         
406         global workLog, DupeShareHACK
407         
408         logger = logging.getLogger('restoreState')
409         logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, os.stat(SAVE_STATE_FILENAME).st_size))
410         try:
411                 with open(SAVE_STATE_FILENAME, 'rb') as f:
412                         data = pickle.load(f)
413                         workLog = data[0]
414                         DupeShareHACK = data[1]
415         except:
416                 logger.error('Failed to restore state\n' + traceback.format_exc())
417                 return
418         logger.info('State restored successfully')
419
420
421 from jsonrpcserver import JSONRPCListener, JSONRPCServer
422 import interactivemode
423 from networkserver import NetworkListener
424 import threading
425
426 if __name__ == "__main__":
427         LSbc = []
428         if not hasattr(config, 'BitcoinNodeAddresses'):
429                 config.BitcoinNodeAddresses = ()
430         for a in config.BitcoinNodeAddresses:
431                 LSbc.append(NetworkListener(bcnode, a))
432         
433         if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
434                 BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
435         
436         server = JSONRPCServer()
437         if hasattr(config, 'JSONRPCAddress'):
438                 if not hasattr(config, 'JSONRPCAddresses'):
439                         config.JSONRPCAddresses = []
440                 config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
441         LS = []
442         for a in config.JSONRPCAddresses:
443                 LS.append(JSONRPCListener(server, a))
444         if hasattr(config, 'SecretUser'):
445                 server.SecretUser = config.SecretUser
446         server.aux = MM.CoinbaseAux
447         server.getBlockHeader = getBlockHeader
448         server.getBlockTemplate = getBlockTemplate
449         server.receiveShare = receiveShare
450         server.RaiseRedFlags = RaiseRedFlags
451         
452         restoreState()
453         
454         bcnode_thr = threading.Thread(target=bcnode.serve_forever)
455         bcnode_thr.daemon = True
456         bcnode_thr.start()
457         
458         server.serve_forever()