Merge branch 'checkShare_vars' into serve_getmemorypool
[bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import config
19
20
21 import logging
22
23 logging.basicConfig(level=logging.DEBUG)
24 for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker'):
25         logging.getLogger(infoOnly).setLevel(logging.INFO)
26
27 def RaiseRedFlags(reason):
28         logging.getLogger('redflag').critical(reason)
29         return reason
30
31
32 from bitcoin.node import BitcoinLink, BitcoinNode
33 bcnode = BitcoinNode(config.UpstreamNetworkId)
34 bcnode.userAgent += b'Eloipool:0.1/'
35
36 import jsonrpc
37 UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
38
39
40 from bitcoin.script import BitcoinScript
41 from bitcoin.txn import Txn
42 from base58 import b58decode
43 from struct import pack
44 import subprocess
45 from time import time
46
47 def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
48         txn = Txn.new()
49         
50         if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
51                 coinbased = 0
52                 try:
53                         cmd = config.CoinbaserCmd
54                         cmd = cmd.replace('%d', str(coinbaseValue))
55                         p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
56                         nout = int(p.stdout.readline())
57                         for i in range(nout):
58                                 amount = int(p.stdout.readline())
59                                 addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
60                                 pkScript = BitcoinScript.toAddress(addr)
61                                 txn.addOutput(amount, pkScript)
62                                 coinbased += amount
63                 except:
64                         coinbased = coinbaseValue + 1
65                 if coinbased >= coinbaseValue:
66                         logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
67                         txn.outputs = []
68                 else:
69                         coinbaseValue -= coinbased
70         
71         pkScript = BitcoinScript.toAddress(config.TrackerAddr)
72         txn.addOutput(coinbaseValue, pkScript)
73         
74         # TODO
75         # TODO: red flag on dupe coinbase
76         return txn
77
78
79 from util import Bits2Target
80
81 workLog = {}
82 networkTarget = None
83 DupeShareHACK = {}
84
85 server = None
86 def updateBlocks():
87         if server:
88                 server.wakeLongpoll()
89
90 def blockChanged():
91         global DupeShareHACK
92         DupeShareHACK = {}
93         global MM, networkTarget, server
94         networkTarget = Bits2Target(MM.currentBlock[1])
95         workLog.clear()
96         updateBlocks()
97
98
99 from merklemaker import merkleMaker
100 MM = merkleMaker()
101 MM.__dict__.update(config.__dict__)
102 MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
103 MM.clearCoinbaseTxn.assemble()
104 MM.makeCoinbaseTxn = makeCoinbaseTxn
105 MM.onBlockChange = blockChanged
106 MM.onBlockUpdate = updateBlocks
107 MM.start()
108
109
110 from binascii import b2a_hex
111 from copy import deepcopy
112 from struct import pack, unpack
113 from time import time
114 from util import RejectedShare, dblsha, hash2int
115 import jsonrpc
116 import threading
117 import traceback
118
119 gotwork = None
120 if hasattr(config, 'GotWorkURI'):
121         gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
122
123 def submitGotwork(info):
124         try:
125                 gotwork.gotwork(info)
126         except:
127                 checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
128
129 db = None
130 if hasattr(config, 'DbOptions'):
131         import psycopg2
132         db = psycopg2.connect(**config.DbOptions)
133
134 def getBlockHeader(username):
135         MRD = MM.getMRD()
136         (merkleRoot, merkleTree, coinbase, prevBlock, bits, rollPrevBlk) = MRD
137         timestamp = pack('<L', int(time()))
138         hdr = b'\1\0\0\0' + prevBlock + merkleRoot + timestamp + bits + b'iolE'
139         workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
140         return hdr
141
142 def getBlockTemplate(username):
143         MC = MM.getMC()
144         (dummy, merkleTree, coinbase, prevBlock, bits) = MC
145         workLog.setdefault(username, {})[coinbase] = (MC, time())
146         return MC
147
148 def YN(b):
149         if b is None:
150                 return None
151         return 'Y' if b else 'N'
152
153 def logShare(share):
154         if db is None:
155                 return
156         dbc = db.cursor()
157         rem_host = share.get('remoteHost', '?')
158         username = share['username']
159         reason = share.get('rejectReason', None)
160         upstreamResult = share.get('upstreamResult', None)
161         solution = share['_origdata']
162         #solution = b2a_hex(solution).decode('utf8')
163         stmt = "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values (%s, %s, %s, %s, %s, decode(%s, 'hex'))"
164         params = (rem_host, username, YN(not reason), YN(upstreamResult), reason, solution)
165         dbc.execute(stmt, params)
166         db.commit()
167
168 RBDs = []
169 RBPs = []
170
171 from bitcoin.varlen import varlenEncode
172 def assembleBlock(blkhdr, txlist):
173         payload = blkhdr
174         payload += varlenEncode(len(txlist))
175         for tx in txlist:
176                 payload += tx.data
177         return payload
178
179 def blockSubmissionThread(payload):
180         while True:
181                 try:
182                         UpstreamBitcoindJSONRPC.getmemorypool(b2a_hex(payload).decode('ascii'))
183                         break
184                 except:
185                         pass
186
187 def checkShare(share):
188         data = share['data']
189         data = data[:80]
190         (prevBlock, bits) = MM.currentBlock
191         sharePrevBlock = data[4:36]
192         if sharePrevBlock != prevBlock:
193                 if sharePrevBlock == MM.lastBlock[0]:
194                         raise RejectedShare('stale-prevblk')
195                 raise RejectedShare('bad-prevblk')
196         
197         shareMerkleRoot = data[36:68]
198         # TODO: use userid
199         username = share['username']
200         if username not in workLog:
201                 raise RejectedShare('unknown-user')
202         
203         if data[72:76] != bits:
204                 raise RejectedShare('bad-diffbits')
205         if data[:4] != b'\1\0\0\0':
206                 raise RejectedShare('bad-version')
207         
208         MWL = workLog[username]
209         if shareMerkleRoot not in MWL:
210                 raise RejectedShare('unknown-work')
211         (MRD, t) = MWL[shareMerkleRoot]
212         share['MRD'] = MRD
213         
214         if data in DupeShareHACK:
215                 raise RejectedShare('duplicate')
216         DupeShareHACK[data] = None
217         
218         shareTimestamp = unpack('<L', data[68:72])[0]
219         shareTime = share['time'] = time()
220         if shareTime < t - 120:
221                 raise RejectedShare('stale-work')
222         if shareTimestamp < shareTime - 300:
223                 raise RejectedShare('time-too-old')
224         if shareTimestamp > shareTime + 7200:
225                 raise RejectedShare('time-too-new')
226         
227         blkhash = dblsha(data)
228         if blkhash[28:] != b'\0\0\0\0':
229                 raise RejectedShare('H-not-zero')
230         blkhashn = hash2int(blkhash)
231         
232         global networkTarget
233         logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
234         logfunc('BLKHASH: %64x' % (blkhashn,))
235         logfunc(' TARGET: %64x' % (networkTarget,))
236         
237         workMerkleTree = MRD[1]
238         workCoinbase = MRD[2]
239         
240         txlist = workMerkleTree.data
241         cbtxn = txlist[0]
242         cbtxn.setCoinbase(workCoinbase)
243         cbtxn.assemble()
244         
245         if blkhashn <= networkTarget:
246                 logfunc("Submitting upstream")
247                 RBDs.append( deepcopy( (data, txlist) ) )
248                 payload = assembleBlock(data, txlist)
249                 logfunc('Real block payload: %s' % (payload,))
250                 RBPs.append(payload)
251                 threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
252                 bcnode.submitBlock(payload)
253                 share['upstreamResult'] = True
254                 MM.updateBlock(blkhash)
255         
256         # Gotwork hack...
257         if gotwork and blkhashn <= config.GotWorkTarget:
258                 try:
259                         coinbaseMrkl = cbtxn.data
260                         coinbaseMrkl += blkhash
261                         steps = workMerkleTree._steps
262                         coinbaseMrkl += pack('B', len(steps))
263                         for step in steps:
264                                 coinbaseMrkl += step
265                         coinbaseMrkl += b"\0\0\0\0"
266                         info = {}
267                         info['hash'] = b2a_hex(blkhash).decode('ascii')
268                         info['header'] = b2a_hex(data).decode('ascii')
269                         info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
270                         thr = threading.Thread(target=submitGotwork, args=(info,))
271                         thr.daemon = True
272                         thr.start()
273                 except:
274                         checkShare.logger.warning('Failed to build gotwork request')
275         
276         logShare(share)
277 checkShare.logger = logging.getLogger('checkShare')
278
279 def receiveShare(share):
280         # TODO: username => userid
281         try:
282                 checkShare(share)
283         except RejectedShare as rej:
284                 share['rejectReason'] = str(rej)
285                 logShare(share)
286                 raise
287         # TODO
288
289 def newBlockNotification(signum, frame):
290         logging.getLogger('newBlockNotification').info('Received new block notification')
291         MM.updateMerkleTree()
292         # TODO: Force RESPOND TO LONGPOLLS?
293         pass
294
295 from signal import signal, SIGUSR1
296 signal(SIGUSR1, newBlockNotification)
297
298
299 import os
300 import os.path
301 import pickle
302 import signal
303 import sys
304 from time import sleep
305 import traceback
306
307 SAVE_STATE_FILENAME = 'eloipool.worklog'
308
309 def stopServers():
310         logger = logging.getLogger('stopServers')
311         
312         logger.info('Stopping servers...')
313         global bcnode, server
314         servers = (bcnode, server)
315         for s in servers:
316                 s.keepgoing = False
317         for s in servers:
318                 s.wakeup()
319         i = 0
320         while True:
321                 sl = []
322                 for s in servers:
323                         if s.running:
324                                 sl.append(s.__class__.__name__)
325                 if not sl:
326                         break
327                 i += 1
328                 if i >= 0x100:
329                         logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
330                         break
331                 sleep(0.01)
332         
333         for s in servers:
334                 for fd in s._fd.keys():
335                         os.close(fd)
336
337 def saveState():
338         logger = logging.getLogger('saveState')
339         
340         # Then, save data needed to resume work
341         logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
342         i = 0
343         while True:
344                 try:
345                         with open(SAVE_STATE_FILENAME, 'wb') as f:
346                                 pickle.dump( (workLog, DupeShareHACK), f )
347                         break
348                 except:
349                         i += 1
350                         if i >= 0x10000:
351                                 logger.error('Failed to save work\n' + traceback.format_exc())
352                                 try:
353                                         os.unlink(SAVE_STATE_FILENAME)
354                                 except:
355                                         logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
356
357 def exit():
358         stopServers()
359         saveState()
360         logging.getLogger('exit').info('Goodbye...')
361         os.kill(os.getpid(), signal.SIGTERM)
362         sys.exit(0)
363
364 def restart():
365         stopServers()
366         saveState()
367         logging.getLogger('restart').info('Restarting...')
368         try:
369                 os.execv(sys.argv[0], sys.argv)
370         except:
371                 logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
372
373 def restoreState():
374         if not os.path.exists(SAVE_STATE_FILENAME):
375                 return
376         
377         global workLog, DupeShareHACK
378         
379         logger = logging.getLogger('restoreState')
380         logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, os.stat(SAVE_STATE_FILENAME).st_size))
381         try:
382                 with open(SAVE_STATE_FILENAME, 'rb') as f:
383                         data = pickle.load(f)
384                         workLog = data[0]
385                         DupeShareHACK = data[1]
386         except:
387                 logger.error('Failed to restore state\n' + traceback.format_exc())
388                 return
389         logger.info('State restored successfully')
390
391
392 from jsonrpcserver import JSONRPCListener, JSONRPCServer
393 import interactivemode
394 from networkserver import NetworkListener
395 import threading
396
397 if __name__ == "__main__":
398         LSbc = []
399         if not hasattr(config, 'BitcoinNodeAddresses'):
400                 config.BitcoinNodeAddresses = ()
401         for a in config.BitcoinNodeAddresses:
402                 LSbc.append(NetworkListener(bcnode, a))
403         
404         if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
405                 BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
406         
407         server = JSONRPCServer()
408         if hasattr(config, 'JSONRPCAddress'):
409                 if not hasattr(config, 'JSONRPCAddresses'):
410                         config.JSONRPCAddresses = []
411                 config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
412         LS = []
413         for a in config.JSONRPCAddresses:
414                 LS.append(JSONRPCListener(server, a))
415         if hasattr(config, 'SecretUser'):
416                 server.SecretUser = config.SecretUser
417         server.aux = MM.CoinbaseAux
418         server.getBlockHeader = getBlockHeader
419         server.getBlockTemplate = getBlockTemplate
420         server.receiveShare = receiveShare
421         server.RaiseRedFlags = RaiseRedFlags
422         
423         restoreState()
424         
425         bcnode_thr = threading.Thread(target=bcnode.serve_forever)
426         bcnode_thr.daemon = True
427         bcnode_thr.start()
428         
429         server.serve_forever()