# Merge branch 'bugfix_bitcoin_txn_disasm' into serve_getmemorypool
# [bitcoin:eloipool.git] / eloipool.py
1 #!/usr/bin/python3
2 # Eloipool - Python Bitcoin pool server
3 # Copyright (C) 2011-2012  Luke Dashjr <luke-jr+eloipool@utopios.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU Affero General Public License as
7 # published by the Free Software Foundation, either version 3 of the
8 # License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU Affero General Public License for more details.
14 #
15 # You should have received a copy of the GNU Affero General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
import config


import logging

# Default everything to DEBUG, then demote the chattiest subsystems to INFO
# so per-share / per-request noise stays out of normal logs.
logging.basicConfig(level=logging.DEBUG)
for infoOnly in ('checkShare', 'JSONRPCHandler', 'merkleMaker'):
	logging.getLogger(infoOnly).setLevel(logging.INFO)
def RaiseRedFlags(reason):
	# Record *reason* at CRITICAL on the dedicated 'redflag' channel and hand
	# it back, so callers can use this inline (e.g. raise RaiseRedFlags(...)).
	redflag = logging.getLogger('redflag')
	redflag.critical(reason)
	return reason
30
31
32 from bitcoin.node import BitcoinLink, BitcoinNode
33 bcnode = BitcoinNode(config.UpstreamNetworkId)
34 bcnode.userAgent += b'Eloipool:0.1/'
35
36 import jsonrpc
37 UpstreamBitcoindJSONRPC = jsonrpc.ServiceProxy(config.UpstreamURI)
38
39
40 from bitcoin.script import BitcoinScript
41 from bitcoin.txn import Txn
42 from base58 import b58decode
43 from struct import pack
44 import subprocess
45 from time import time
46
def makeCoinbaseTxn(coinbaseValue, useCoinbaser = True):
	"""Build the coinbase transaction paying out *coinbaseValue* (base units).

	When *useCoinbaser* is true and config.CoinbaserCmd is set, that command
	is run (with '%d' replaced by coinbaseValue) and its stdout is read as:
	a count N, then N pairs of (amount line, address line), each becoming an
	output.  Whatever the coinbaser does not claim is paid to
	config.TrackerAddr.  If the coinbaser fails, or claims the entire value
	or more, its outputs are discarded and the full value goes to the
	tracker address.
	"""
	t = Txn.new()
	
	if useCoinbaser and hasattr(config, 'CoinbaserCmd') and config.CoinbaserCmd:
		coinbased = 0
		try:
			cmd = config.CoinbaserCmd
			cmd = cmd.replace('%d', str(coinbaseValue))
			# NOTE: shell=True runs a config-supplied command string; config
			# is operator-trusted, so this is not an injection surface.
			p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
			try:
				nout = int(p.stdout.readline())
				for i in range(nout):
					amount = int(p.stdout.readline())
					addr = p.stdout.readline().rstrip(b'\n').decode('utf8')
					pkScript = BitcoinScript.toAddress(addr)
					t.addOutput(amount, pkScript)
					coinbased += amount
			finally:
				# BUGFIX: close the pipe and reap the child; the original
				# never waited, leaking a zombie process per invocation.
				p.stdout.close()
				p.wait()
		except Exception:
			# Force the over-claim path below so partial outputs are discarded.
			coinbased = coinbaseValue + 1
		if coinbased >= coinbaseValue:
			logging.getLogger('makeCoinbaseTxn').error('Coinbaser failed!')
			t.outputs = []
		else:
			coinbaseValue -= coinbased
	
	pkScript = BitcoinScript.toAddress(config.TrackerAddr)
	t.addOutput(coinbaseValue, pkScript)
	
	# TODO
	# TODO: red flag on dupe coinbase
	return t
77
78
from util import Bits2Target

# Issued work, per user: username -> {work key -> (work, issue time)}.
workLog = {}
# Integer form of the current network difficulty target; set in blockChanged().
networkTarget = None
# Raw share data already seen this block (dict used as a set) for dupe rejection.
DupeShareHACK = {}
84
# The JSONRPC server instance; assigned in __main__, None until then.
server = None

def updateBlocks():
	# Nudge long-polling miners when fresh work is available.  Nothing to do
	# before the server exists.
	if not server:
		return
	server.wakeLongpoll()
89
def blockChanged():
	# A new best block invalidates everything derived from the old one:
	# duplicate-share tracking, all outstanding issued work, and the cached
	# network target; then wake any long-polling clients.
	global DupeShareHACK, MM, networkTarget, server
	DupeShareHACK = {}
	workLog.clear()
	networkTarget = Bits2Target(MM.currentBlock[1])
	updateBlocks()
97
98
from merklemaker import merkleMaker
# Background thread that keeps merkle trees of pending transactions fresh.
MM = merkleMaker()
# Bulk-copy all config settings onto the merkle maker.
MM.__dict__.update(config.__dict__)
# Pre-assembled fallback coinbase (no coinbaser, hard-coded 50 BTC subsidy).
MM.clearCoinbaseTxn = makeCoinbaseTxn(5000000000, False)  # FIXME
MM.clearCoinbaseTxn.assemble()
MM.makeCoinbaseTxn = makeCoinbaseTxn
MM.onBlockChange = blockChanged
MM.onBlockUpdate = updateBlocks
MM.start()
108
109
from binascii import b2a_hex
from copy import deepcopy
from struct import pack, unpack
from time import time
from util import RejectedShare, dblsha, hash2int
import jsonrpc
import threading
import traceback

# Optional JSON-RPC endpoint notified of shares meeting config.GotWorkTarget.
gotwork = None
if hasattr(config, 'GotWorkURI'):
	gotwork = jsonrpc.ServiceProxy(config.GotWorkURI)
122
def submitGotwork(info):
	# Best-effort notification of the gotwork endpoint; never let a failure
	# here propagate into the caller (runs in a daemon thread).
	try:
		gotwork.gotwork(info)
	except Exception:
		# BUGFIX: was a bare 'except:', which would also swallow
		# SystemExit/KeyboardInterrupt; Exception keeps the best-effort
		# behaviour without hiding interpreter shutdown.
		checkShare.logger.warning('Failed to submit gotwork\n' + traceback.format_exc())
128
# Optional PostgreSQL share logging; enabled only when config.DbOptions exists.
db = None
if hasattr(config, 'DbOptions'):
	import psycopg2
	db = psycopg2.connect(**config.DbOptions)
133
def getBlockHeader(username):
	# Issue one unit of getwork-style work to *username*: an 80-byte block
	# header whose final 4 bytes are the placeholder nonce b'iolE'.
	MRD = MM.getMRD()
	merkleRoot = MRD[0]
	prevBlock = MRD[3]
	bits = MRD[4]
	timestamp = pack('<L', int(time()))
	hdr = b''.join((b'\1\0\0\0', prevBlock, merkleRoot, timestamp, bits, b'iolE'))
	# Remember exactly what was handed out so checkShare can validate it later.
	workLog.setdefault(username, {})[merkleRoot] = (MRD, time())
	return hdr
141
def getBlockTemplate(username):
	# Issue getmemorypool-style work to *username*; logged under the coinbase
	# (rather than the merkle root, which the miner will build itself).
	MC = MM.getMC()
	coinbase = MC[2]
	userWork = workLog.setdefault(username, {})
	userWork[coinbase] = (MC, time())
	return MC
147
def YN(b):
	# Render a tristate flag for the shares table: None stays None (SQL NULL),
	# any truthy value becomes 'Y', any falsy value becomes 'N'.
	return None if b is None else ('Y' if b else 'N')
152
def logShare(share):
	# Persist one share (accepted or rejected) to the 'shares' table; no-op
	# when database logging is not configured.
	if db is None:
		return
	reason = share.get('rejectReason', None)
	#solution = b2a_hex(solution).decode('utf8')
	stmt = "insert into shares (rem_host, username, our_result, upstream_result, reason, solution) values (%s, %s, %s, %s, %s, decode(%s, 'hex'))"
	params = (
		share.get('remoteHost', '?'),
		share['username'],
		YN(not reason),        # our_result: 'Y' unless we rejected it
		YN(share.get('upstreamResult', None)),
		reason,
		share['_origdata'],    # hex text; decoded to bytea server-side
	)
	cursor = db.cursor()
	cursor.execute(stmt, params)
	db.commit()
167
# Debug history of real (network-target) block solutions: raw (header, txlist)
# tuples and the assembled payloads actually submitted.
RBDs = []
RBPs = []

from bitcoin.varlen import varlenEncode
def assembleBlock(blkhdr, txlist):
	# Serialize a complete block: 80-byte header, transaction count as a
	# Bitcoin varint, then each raw transaction back to back.
	parts = [blkhdr, varlenEncode(len(txlist))]
	parts.extend(tx.data for tx in txlist)
	return b''.join(parts)
178
def blockSubmissionThread(payload):
	"""Submit a solved block to the upstream bitcoind, retrying until the RPC
	call succeeds.  Runs in its own thread.

	BUGFIX: the original retried in a tight loop with a bare 'except: pass',
	silently busy-spinning at full CPU and hammering bitcoind on persistent
	errors.  Failures are now logged and retries paced.
	"""
	# Hex-encode once; the payload never changes between attempts.
	hexpayload = b2a_hex(payload).decode('ascii')
	while True:
		try:
			UpstreamBitcoindJSONRPC.getmemorypool(hexpayload)
			break
		except Exception:
			logging.getLogger('blockSubmission').warning(
				'Upstream block submission failed; retrying\n' + traceback.format_exc())
			sleep(0.1)
186
def checkShare(share):
	# Validate one submitted share against the current block and the work we
	# issued; raises RejectedShare (with a short reason code) on any failure.
	# Side effects: records the share in DupeShareHACK, may submit a solved
	# block upstream, may notify the gotwork endpoint, and logs via logShare()
	# on success (rejections are logged by receiveShare()).
	data = share['data']
	data = data[:80]  # only the 80-byte block header is validated here
	(prevBlock, bits) = MM.currentBlock
	sharePrevBlock = data[4:36]
	if sharePrevBlock != prevBlock:
		# Work built on the just-replaced block is merely stale; anything
		# else is outright bogus.
		if sharePrevBlock == MM.lastBlock[0]:
			raise RejectedShare('stale-prevblk')
		raise RejectedShare('bad-prevblk')
	
	shareMerkleRoot = data[36:68]
	# TODO: use userid
	username = share['username']
	if username not in workLog:
		raise RejectedShare('unknown-user')
	
	if data[72:76] != bits:
		raise RejectedShare('bad-diffbits')
	if data[:4] != b'\1\0\0\0':
		raise RejectedShare('bad-version')
	
	# Find the exact work unit we issued to this user for this merkle root.
	MWL = workLog[username]
	if shareMerkleRoot not in MWL:
		raise RejectedShare('unknown-work')
	(MRD, t) = MWL[shareMerkleRoot]
	share['MRD'] = MRD
	
	# Duplicate detection: DupeShareHACK is a dict used as a set of raw headers.
	if data in DupeShareHACK:
		raise RejectedShare('duplicate')
	DupeShareHACK[data] = None
	
	shareTimestamp = unpack('<L', data[68:72])[0]
	shareTime = share['time'] = time()
	# NOTE(review): this only rejects when 'now' is >120s BEFORE the work's
	# issue time t; it does not bound how old the work itself is.  Looks
	# inverted for a staleness check — confirm intent.
	if shareTime < t - 120:
		raise RejectedShare('stale-work')
	if shareTimestamp < shareTime - 300:
		raise RejectedShare('time-too-old')
	if shareTimestamp > shareTime + 7200:
		raise RejectedShare('time-too-new')
	
	blkhash = dblsha(data)
	# Share-level proof of work: the last 4 bytes of the double-SHA must be zero.
	if blkhash[28:] != b'\0\0\0\0':
		raise RejectedShare('H-not-zero')
	blkhashn = hash2int(blkhash)
	
	global networkTarget
	# Log at INFO only when the share actually meets the network target.
	logfunc = getattr(checkShare.logger, 'info' if blkhashn <= networkTarget else 'debug')
	logfunc('BLKHASH: %64x' % (blkhashn,))
	logfunc(' TARGET: %64x' % (networkTarget,))
	
	# Splice the issued coinbase into the first transaction and reserialize it.
	txlist = MRD[1].data
	t = txlist[0]  # NB: rebinds t (was the work-issue timestamp above)
	t.setCoinbase(MRD[2])
	t.assemble()
	
	if blkhashn <= networkTarget:
		# This share solves a real block: keep a debug copy, then submit via
		# both JSON-RPC (background thread) and the P2P node.
		logfunc("Submitting upstream")
		RBDs.append( deepcopy( (data, txlist) ) )
		payload = assembleBlock(data, txlist)
		logfunc('Real block payload: %s' % (payload,))
		RBPs.append(payload)
		threading.Thread(target=blockSubmissionThread, args=(payload,)).start()
		bcnode.submitBlock(payload)
		share['upstreamResult'] = True
		MM.updateBlock(blkhash)
	
	# Gotwork hack...
	if gotwork and blkhashn <= config.GotWorkTarget:
		try:
			# Serialized coinbase txn + block hash + merkle-step count and
			# steps + 4 zero bytes — presumably the format the gotwork
			# endpoint expects; verify against its consumer.
			coinbaseMrkl = t.data
			coinbaseMrkl += blkhash
			steps = MRD[1]._steps
			coinbaseMrkl += pack('B', len(steps))
			for step in steps:
				coinbaseMrkl += step
			coinbaseMrkl += b"\0\0\0\0"
			info = {}
			info['hash'] = b2a_hex(blkhash).decode('ascii')
			info['header'] = b2a_hex(data).decode('ascii')
			info['coinbaseMrkl'] = b2a_hex(coinbaseMrkl).decode('ascii')
			thr = threading.Thread(target=submitGotwork, args=(info,))
			thr.daemon = True
			thr.start()
		except:
			checkShare.logger.warning('Failed to build gotwork request')
	
	logShare(share)
checkShare.logger = logging.getLogger('checkShare')
275
def receiveShare(share):
	# Server-facing entry point for each submitted share: validate it, and on
	# rejection record the reason and log it before re-raising to the client.
	# TODO: username => userid
	try:
		checkShare(share)
	except RejectedShare as rej:
		share['rejectReason'] = str(rej)
		logShare(share)
		raise
	# TODO
285
def newBlockNotification(signum, frame):
	# SIGUSR1 handler: an external notifier (e.g. bitcoind's -blocknotify)
	# reports a new network block, so refresh the merkle tree immediately.
	logging.getLogger('newBlockNotification').info('Received new block notification')
	MM.updateMerkleTree()
	# TODO: Force RESPOND TO LONGPOLLS?
	pass

from signal import signal, SIGUSR1
# NOTE(review): a later 'import signal' rebinds the name 'signal' to the
# module; this registration runs before that, so the callable still works here.
signal(SIGUSR1, newBlockNotification)
294
295
296 import os
297 import os.path
298 import pickle
299 import signal
300 import sys
301 from time import sleep
302 import traceback
303
# File used to persist workLog/DupeShareHACK across restarts (see saveState/restoreState).
SAVE_STATE_FILENAME = 'eloipool.worklog'
305
def stopServers():
	# Ask both servers (bitcoin P2P node + JSONRPC) to stop, wait for their
	# loops to exit, then close every file descriptor they still hold.
	logger = logging.getLogger('stopServers')
	
	logger.info('Stopping servers...')
	global bcnode, server
	servers = (bcnode, server)
	# Flag both to stop first, then wake both, so neither blocks the other.
	for s in servers:
		s.keepgoing = False
	for s in servers:
		s.wakeup()
	i = 0
	while True:
		sl = []
		for s in servers:
			if s.running:
				sl.append(s.__class__.__name__)
		if not sl:
			break
		i += 1
		if i >= 0x100:
			# 0x100 iterations x 10ms sleep ~= 2.5s maximum wait
			logger.error('Servers taking too long to stop (%s), giving up' % (', '.join(sl)))
			break
		sleep(0.01)
	
	# Close remaining sockets/fds even if a server failed to stop cleanly.
	for s in servers:
		for fd in s._fd.keys():
			os.close(fd)
def saveState():
	"""Persist workLog and DupeShareHACK to SAVE_STATE_FILENAME via pickle.

	Transient write failures are retried; after 0x10000 consecutive failures
	we give up, remove any partial state file, and return.
	"""
	logger = logging.getLogger('saveState')
	
	# Then, save data needed to resume work
	logger.info('Saving work state to \'%s\'...' % (SAVE_STATE_FILENAME,))
	i = 0
	while True:
		try:
			with open(SAVE_STATE_FILENAME, 'wb') as f:
				pickle.dump( (workLog, DupeShareHACK), f )
			break
		except Exception:
			i += 1
			if i < 0x10000:
				continue
			logger.error('Failed to save work\n' + traceback.format_exc())
			try:
				# Don't leave a truncated/corrupt state file behind.
				os.unlink(SAVE_STATE_FILENAME)
			except OSError:
				logger.error(('Failed to unlink \'%s\'; resume may have trouble\n' % (SAVE_STATE_FILENAME,)) + traceback.format_exc())
			# BUGFIX: the original never broke out after giving up, so it
			# retried (and re-logged the error path) forever.
			break
def exit():
	# Graceful shutdown: stop listeners, persist work state, then terminate.
	# NOTE: shadows the builtin exit(); presumably invoked by name from
	# interactive mode — do not rename.
	stopServers()
	saveState()
	logging.getLogger('exit').info('Goodbye...')
	# NOTE(review): SIGTERM to our own pid before sys.exit(0) — presumably to
	# take down non-daemon threads that sys.exit alone would not; confirm.
	os.kill(os.getpid(), signal.SIGTERM)
	sys.exit(0)
360
def restart():
	"""Stop servers, save work state, and re-exec this process in place."""
	stopServers()
	saveState()
	logging.getLogger('restart').info('Restarting...')
	try:
		# NOTE: requires sys.argv[0] to be a real executable path; execv does
		# not search PATH.
		os.execv(sys.argv[0], sys.argv)
	except Exception:
		# BUGFIX: was a bare 'except:', which would also swallow SystemExit/
		# KeyboardInterrupt raised before the exec takes effect.
		logging.getLogger('restart').error('Failed to exec\n' + traceback.format_exc())
369
def restoreState():
	"""Load workLog and DupeShareHACK previously written by saveState().

	Silently returns if no state file exists; logs and returns (leaving the
	current in-memory state untouched) if the file cannot be read.
	"""
	if not os.path.exists(SAVE_STATE_FILENAME):
		return
	
	global workLog, DupeShareHACK
	
	logger = logging.getLogger('restoreState')
	logger.info('Restoring saved state from \'%s\' (%d bytes)' % (SAVE_STATE_FILENAME, os.stat(SAVE_STATE_FILENAME).st_size))
	try:
		with open(SAVE_STATE_FILENAME, 'rb') as f:
			# SECURITY: pickle.load executes arbitrary code if the file is
			# tampered with; acceptable here because the file is local and
			# written only by saveState().
			data = pickle.load(f)
			workLog = data[0]
			DupeShareHACK = data[1]
	except Exception:
		# BUGFIX: was a bare 'except:' (also caught SystemExit/KeyboardInterrupt).
		logger.error('Failed to restore state\n' + traceback.format_exc())
		return
	logger.info('State restored successfully')
387
388
389 from jsonrpcserver import JSONRPCListener, JSONRPCServer
390 import interactivemode
391 from networkserver import NetworkListener
392 import threading
393
if __name__ == "__main__":
	# --- Bitcoin P2P node listeners ---
	LSbc = []
	if not hasattr(config, 'BitcoinNodeAddresses'):
		config.BitcoinNodeAddresses = ()
	for a in config.BitcoinNodeAddresses:
		LSbc.append(NetworkListener(bcnode, a))
	
	# Optional outbound P2P connection to the upstream bitcoind itself.
	if hasattr(config, 'UpstreamBitcoindNode') and config.UpstreamBitcoindNode:
		BitcoinLink(bcnode, dest=config.UpstreamBitcoindNode)
	
	# --- Miner-facing JSONRPC server ---
	server = JSONRPCServer()
	# Back-compat: fold the old singular JSONRPCAddress into JSONRPCAddresses,
	# keeping it first.
	if hasattr(config, 'JSONRPCAddress'):
		if not hasattr(config, 'JSONRPCAddresses'):
			config.JSONRPCAddresses = []
		config.JSONRPCAddresses.insert(0, config.JSONRPCAddress)
	LS = []
	for a in config.JSONRPCAddresses:
		LS.append(JSONRPCListener(server, a))
	if hasattr(config, 'SecretUser'):
		server.SecretUser = config.SecretUser
	# Wire the server to this module's work-issue / share-check entry points.
	server.aux = MM.CoinbaseAux
	server.getBlockHeader = getBlockHeader
	server.getBlockTemplate = getBlockTemplate
	server.receiveShare = receiveShare
	server.RaiseRedFlags = RaiseRedFlags
	
	# Reload any work state saved by a previous run before accepting shares.
	restoreState()
	
	# Bitcoin node runs in a background daemon thread...
	bcnode_thr = threading.Thread(target=bcnode.serve_forever)
	bcnode_thr.daemon = True
	bcnode_thr.start()
	
	# ...while the JSONRPC server occupies the main thread (blocks forever).
	server.serve_forever()