Phi: nicer scroll animation for METAR widget
[fg:fgdata.git] / Nasal / mp_broadcast.nas
1 ###############################################################################
2 ##
3 ##  A message based information broadcast for the multiplayer network.
4 ##
5 ##  Copyright (C) 2008 - 2013  Anders Gidenstam  (anders(at)gidenstam.org)
6 ##  This file is licensed under the GPL license version 2 or later.
7 ##
8 ###############################################################################
9
10 ###############################################################################
11 # Event broadcast channel using a MP enabled string property.
12 # Events from users in multiplayer.ignore are ignored.
13 #
14 # EventChannel.new(mpp_path)
15 #   Create a new event broadcast channel. Any MP user with the same
16 #   primitive will receive all messages sent to the channel from the point
17 #   she/he joined (barring severe MP packet loss).
18 #   NOTE: Message delivery is not guaranteed.
19 #     mpp_path - MP property path                        : string
20 #
21 # EventChannel.register(event_hash, handler)
22 #   Register a handler for the event identified by the hash event_hash.
23 #     event_hash - hash value for the event : a unique 4 character string
24 #     handler    - a handler function for the event : func (sender, msg)
25 #
26 # EventChannel.deregister(event_hash)
27 #   Deregister the handler for the event identified by the hash event_hash.
28 #     event_hash - hash value for the event : a unique 4 character string
29 #
30 # EventChannel.send(event_hash, msg)
31 #   Sends the event event_hash with the message msg to the channel.
32 #     event_hash - hash value for the event : a unique 4 character string
33 #     msg        - text string with Binary data encoded data : string
34 #
35 # EventChannel.die()
36 #   Destroy this EventChannel instance.
37 #
38 var EventChannel = {};
39 EventChannel.new = func (mpp_path) {
40   var obj = BroadcastChannel.new(mpp_path,
41                                  func (n, msg) { obj._process(n, msg) });
42   # Save send from being overriden.
43   obj.parent_send = obj.send;
44   # Put EventChannel methods before BroadcastChannel methods.
45   obj.parents = [EventChannel] ~ obj.parents;
46   obj.events  = {};
47   return obj;
48 }
49 EventChannel.register = func (event_hash,
50                               handler) {
51   me.events[event_hash] = handler;
52 }
53 EventChannel.deregister = func (event_hash) {
54   delete(me.events, event_hash);
55 }
56 EventChannel.send = func (event_hash,
57                           msg) {
58   me.parent_send(event_hash ~ msg);
59 }
60 ############################################################
61 # Internals.
62 EventChannel._process = func (n, msg) {
63   var event_hash = Binary.readHash(msg);
64   if (contains(me.events, event_hash)) {
65     me.events[event_hash](n, substr(msg, Binary.sizeOf["Hash"]));
66   }
67 }
68
69 ###############################################################################
70 # Broadcast primitive using a MP enabled string property.
71 # Broadcasts from users in multiplayer.ignore are ignored.
72 #
73 # BroadcastChannel.new(mpp_path, process)
74 #   Create a new broadcast primitive. Any MP user with the same
75 #   primitive will receive all messages sent to the channel from the point
76 #   she/he joined (barring severe MP packet loss).
77 #   NOTE: Message delivery is not guaranteed.
78 #     mpp_path - MP property path                        : string
79 #     process  - handler called when receiving a message : func (n, msg)
80 #                n is the base node of the senders property tree
81 #                (i.e. /ai/models/multiplay[x])
82 #     send_to_self - if 1 locally sent messages are      : int {0,1}
83 #                    delivered just like remote messages.
84 #                    If 0 locally sent messages are not delivered
85 #                    to the local receiver.
86 #     accept_predicate - function to select which        : func (p)
87 #                        multiplayers to listen to.
88 #                        p is the multiplayer entry node.
89 #                        The default is to accept any multiplayer.
90 #     on_disconnect - function to be called when an      : func (p)
91 #                     accepted MP user leaves.
92 #     enable_send   - Set to 0 to disable sending.
93 #
94 # BroadcastChannel.send(msg)
95 #   Sends the message msg to the channel.
96 #     msg - text string with Binary data encoded data : string
97 #
98 # BroadcastChannel.die()
99 #   Destroy this BroadcastChannel instance.
100 #
101 var BroadcastChannel = {};
102 BroadcastChannel.new = func (mpp_path, process,
103                              send_to_self = 0,
104                              accept_predicate = nil,
105                              on_disconnect = nil,
106                              enable_send=1) {
107   var obj = { parents      : [BroadcastChannel],
108               mpp_path     : mpp_path,
109               send_node    : enable_send ? props.globals.getNode(mpp_path, 1) 
110                                          : nil,
111               process_msg  : process,
112               send_to_self : send_to_self,
113               accept_predicate :
114                 (accept_predicate != nil) ? accept_predicate
115                                           : func (p) { return 1; },
116               on_disconnect : (on_disconnect != nil) ? on_disconnect
117                                                      : func (p) { return; },
118               # Internal state.
119               started      : 0,    # External state: started/stopped.
120               running      : 0,    # Internal state: running or not.
121               send_buf     : [],
122               peers        : {},
123               loopid       : 0,
124               last_time    : 0.0,  # For join handling.
125               last_send    : 0.0   # For the send queue 
126             };
127   if (enable_send and (obj.send_node == nil)) {
128     printlog("warn",
129              "BroadcastChannel invalid send node.");
130     return nil;
131   }
132   setlistener(obj.ONLINE_pp, func {
133     obj.set_state();
134   });
135   obj.start();
136
137   return obj;
138 }
139 BroadcastChannel.send = func (msg) {
140   if (!me.running or me.send_node == nil)
141     return;
142
143   var t = getprop("/sim/time/elapsed-sec");
144   if (((t - me.last_send) > me.SEND_TIME) and (size(me.send_buf) == 0)) {
145     me.send_node.setValue(msg);
146     me.last_send = t;
147     if (me.send_to_self) me.process_msg(props.globals, msg);
148   } else {
149     append(me.send_buf, msg);
150   }
151 }
152 BroadcastChannel.die = func {
153   me.loopid += 1;
154   me.started = 0;
155   me.running = 0;
156   #print("BroadcastChannel[" ~ me.mpp_path ~ "] ...  destroyed.");
157 }
158 BroadcastChannel.start = func {
159   #print("mp_broadcast.nas: starting channel " ~ me.mpp_path ~ ".");
160   me.started = 1;
161   me.set_state();
162 }
163 BroadcastChannel.stop = func {
164   #print("mp_broadcast.nas: stopping channel " ~ me.mpp_path ~ ".");
165   me.started = 0;
166   me.set_state();
167 }
168
169 ############################################################
170 # Internals.
171 BroadcastChannel.ONLINE_pp = "/sim/multiplay/online";
172 BroadcastChannel.PERIOD    = 1.3; 
173 BroadcastChannel.SEND_TIME = 0.6;
174 BroadcastChannel.set_state = func {
175   if (me.started and getprop(me.ONLINE_pp)) {
176     if (me.running) return;
177     #print("mp_broadcast.nas: activating channel " ~ me.mpp_path ~ ".");
178     me.running = 1;
179     me._loop_(me.loopid += 1);
180   } else {
181     #print("mp_broadcast.nas: deactivating channel " ~ me.mpp_path ~ ".");
182     me.running = 0;
183     me.loopid += 1;
184   }
185 }
186 BroadcastChannel.update = func {
187   var t = getprop("/sim/time/elapsed-sec");
188   var process_msg = me.process_msg;
189
190   # Handled join/leave. This is done more seldom.
191   if ((t - me.last_time) > me.PERIOD) {
192     var mpplayers =
193       props.globals.getNode("/ai/models").getChildren("multiplayer");
194     foreach (var pilot; mpplayers) {
195       var valid = pilot.getChild("valid");
196       if ((valid != nil) and valid.getValue() and
197           !contains(multiplayer.ignore,
198                     pilot.getChild("callsign").getValue())) {
199         if ((me.peers[pilot.getIndex()] == nil) and
200             me.accept_predicate(pilot)) {
201           me.peers[pilot.getIndex()] =
202             MessageChannel.
203             new(pilot.getNode(me.mpp_path),
204                 MessageChannel.new_message_handler(process_msg, pilot));
205         }
206       } else {
207         if (contains(me.peers, pilot.getIndex())) {
208           delete(me.peers, pilot.getIndex());
209           me.on_disconnect(pilot);
210         }
211       }
212     }
213     me.last_time = t;
214   }
215   # Process new messages.
216   foreach (var w; keys(me.peers)) {
217     if (me.peers[w] != nil) me.peers[w].update();
218   }
219   # Check send buffer.
220   if (me.send_node == nil) return;
221
222   if ((t - me.last_send) > me.SEND_TIME) {
223     if (size(me.send_buf) > 0) {
224       me.send_node.setValue(me.send_buf[0]);
225       if (me.send_to_self) me.process_msg(props.globals, me.send_buf[0]);
226       me.send_buf = subvec(me.send_buf, 1);
227       me.last_send = t;
228     } else {
229       # Nothing new to send. Reset the send property to save bandwidth.
230       me.send_node.setValue("");
231     }
232   }
233 }
234 BroadcastChannel._loop_ = func (id) {
235   me.running or return;
236   id == me.loopid or return;
237
238   #print("mp_broadcast.nas: " ~ me.mpp_path ~ ":" ~ id ~ ".");
239   me.update();
240   settimer(func { me._loop_(id); }, 0, 1);
241 }
242 ######################################################################
243
244 ###############################################################################
245 # Lamport clock. Useful for creating a total order for events or messages.
246 # The users' callsigns are used to break ties.
247 #
248 # LamportClock.new()
249 #   Creates a new lamport clock for this user.
250 #
251 # LamportClock.merge(sender, sender_timestamp)
252 #   Merges the timestamp from the sender with the local clock.
253 #     sender           : base node of the senders property tree
254 #     sender_timestamp : the timestamp received from the sender.
255 #   Returns 1 if the local clock was advanced; 0 otherwise.
256 #
257 # LamportClock.advance()
258 #   Advances the local clock one tick.
259 #
260 # LamportClock.timestamp()
261 #   Returns an encoded 4 character long timestamp from the local clock.
262 #
263 var LamportClock = {
264   # LamportClock.new()
265   #   Creates a new lamport clock for this user.
266   new : func {
267     var obj = {
268       parents  : [LamportClock],
269       callsign : getprop("/sim/multiplay/callsign"),
270       time     : 0
271     };
272     return obj;
273   },
274   merge : func (sender, sender_timestamp) {
275     var sender_time = Binary.decodeInt28(sender_timestamp);
276     if (sender_time > me.time) {
277       me.time = sender_time;
278       return 1;
279     } elsif ((sender_time == me.time) and
280              (cmp(sender.getNode("callsign").getValue(), me.callsign) > 0)) {
281       return 1;
282     } else {
283       # The received timestamp is old and should be ignored.
284       return 0;
285     }
286   },
287   advance : func {
288     me.time += 1;
289   },
290   timestamp : func {
291     return Binary.encodeInt28(me.time);
292   }
293 };
294
295
296 ###############################################################################
297 # Some routines for encoding/decoding values into/from a string. 
298 # NOTE: MP is picky about what it sends in a string propery.
299 #       Encode 7 bits as a printable 8 bit character.
300 var Binary = {};
301 Binary.TWOTO27 =  134217728;
302 Binary.TWOTO28 =  268435456;
303 Binary.TWOTO31 = 2147483648;
304 Binary.TWOTO32 = 4294967296;
305 Binary.sizeOf = {};
306 ############################################################
307 Binary.sizeOf["int"] = 5;
308 Binary.encodeInt = func (int) {
309   var bf = bits.buf(5);
310   if (int < 0) int += Binary.TWOTO32;
311   var r = int;
312   for (var i = 0; i < 5; i += 1) {
313     var c = math.mod(r, 128);
314     bf[4-i] = c + `A`;
315     r = (r - c)/128;
316   }
317   return bf;
318 }
319 ############################################################
320 Binary.decodeInt = func (str) {
321   var v = 0;
322   var b = 1;
323   for (var i = 0; i < 5; i += 1) {
324     v += (str[4-i] - `A`) * b;
325     b *= 128;
326   }
327   if (v / Binary.TWOTO31 >= 1) v -= Binary.TWOTO32;
328   return int(v);
329 }
330 ############################################################
331 # NOTE: This encodes a 7 bit byte.
332 Binary.sizeOf["byte"] = 1;
333 Binary.encodeByte = func (int) {
334   var bf = bits.buf(1);
335   if (int < 0) int += 128;
336   bf[0] = math.mod(int, 128) + `A`;
337   return bf;
338 }
339 ############################################################
340 Binary.decodeByte = func (str) {
341   var v = str[0] - `A`;
342   if (v / 64 >= 1) v -= 128;
343   return int(v);
344 }
345 ############################################################
346 # NOTE: This encodes a 28 bit integer.
347 Binary.sizeOf["int28"] = 4;
348 Binary.encodeInt28 = func (int) {
349   var bf = bits.buf(4);
350   if (int < 0) int += Binary.TWOTO32;
351   var r = int;
352   for (var i = 0; i < 4; i += 1) {
353     var c = math.mod(r, 128);
354     bf[3-i] = c + `A`;
355     r = (r - c)/128;
356   }
357   return bf;
358 }
359 ############################################################
360 Binary.decodeInt28 = func (str) {
361   var v = 0;
362   var b = 1;
363   for (var i = 0; i < 4; i += 1) {
364     v += (str[3-i] - `A`) * b;
365     b *= 128;
366   }
367   if (v / Binary.TWOTO27 >= 1) v -= Binary.TWOTO28;
368   return int(v);
369 }
370 ############################################################
371 # NOTE: This can neither handle huge values nor really tiny.
372 Binary.sizeOf["double"] = 2*Binary.sizeOf["int"];
373 Binary.encodeDouble = func (d) {
374   return Binary.encodeInt(int(d)) ~
375          Binary.encodeInt((d - int(d)) * Binary.TWOTO31);
376 }
377 ############################################################
378 Binary.decodeDouble = func (str) {
379   return Binary.decodeInt(substr(str, 0)) +
380          Binary.decodeInt(substr(str, 5)) / Binary.TWOTO31;
381 }
382 ############################################################
383 # Encodes a geo.Coord object.
384 Binary.sizeOf["Coord"] = 3*Binary.sizeOf["double"];
385 Binary.encodeCoord = func (coord) {
386   return Binary.encodeDouble(coord.lat()) ~
387          Binary.encodeDouble(coord.lon()) ~
388          Binary.encodeDouble(coord.alt());
389 }
390 ############################################################
391 # Decodes an encoded geo.Coord object.
392 Binary.decodeCoord = func (str) {
393   var coord = geo.aircraft_position();
394   coord.set_latlon(Binary.decodeDouble(substr(str, 0)),
395                    Binary.decodeDouble(substr(str, 10)),
396                    Binary.decodeDouble(substr(str, 20)));
397   return coord;
398 }
399 ############################################################
400 # Encodes a string as a hash value.
401 Binary.sizeOf["Hash"] = 4;
402 Binary.stringHash = func (str) {
403   var hash = 0;
404   for(var i=0; i<size(str); i+=1) {
405       hash += math.mod(32*hash + str[i], Binary.TWOTO28-3);
406   }
407   return substr(Binary.encodeInt(hash), 1, 4);
408 }
409 ############################################################
410 # Decodes an encoded geo.Coord object.
411 Binary.readHash = func (str) {
412   return substr(str, 0, Binary.sizeOf["Hash"]);
413 }
414 ############################################################
415 Binary.sizeOf["LamportTS"] = 4;
416 ######################################################################
417
418 ###############################################################################
419 # Detects incomming messages encoded in a string property.
420 #   n       - MP source : property node
421 #   process - action    : func (v)
422 # NOTE: This is a low level component.
423 #       The same object is seldom used for both sending and receiving.
424 var MessageChannel = {};
425 MessageChannel.new = func (n = nil, process = nil) {
426   var obj = { parents     : [MessageChannel],
427               node        : n, 
428               process_msg : process,
429               old         : "" };
430   return obj;
431 }
432 MessageChannel.update = func {
433   if (me.node == nil) return;
434
435   var msg = me.node.getValue();
436   if (!streq(typeof(msg), "scalar")) return;
437
438   if ((me.process_msg != nil) and
439       !streq(msg, "") and
440       !streq(msg, me.old)) {
441     me.process_msg(msg);
442     me.old = msg;
443   }
444 }
445 MessageChannel.send = func (msg) {
446   me.node.setValue(msg);
447 }
448 MessageChannel.new_message_handler = func (handler, arg1) {
449   var local_arg1 = arg1; # Disconnect from future changes to arg1.
450   return func (msg) { handler(local_arg1, msg) };
451 };