From b795c6184e5e40f89bca319e13e64d559dc355b5 Mon Sep 17 00:00:00 2001 From: Bob Farrell Date: Fri, 2 May 2025 19:57:49 +0100 Subject: [PATCH] WIP --- src/jetzig/channels/Channel.zig | 81 ++++- src/jetzig/http/Server.zig | 3 + src/jetzig/middleware/channels/channels.js | 332 +++++++++++---------- src/jetzig/websockets/Websocket.zig | 35 +-- 4 files changed, 272 insertions(+), 179 deletions(-) diff --git a/src/jetzig/channels/Channel.zig b/src/jetzig/channels/Channel.zig index 503126e..e989141 100644 --- a/src/jetzig/channels/Channel.zig +++ b/src/jetzig/channels/Channel.zig @@ -4,6 +4,13 @@ const httpz = @import("httpz"); const jetzig = @import("../../jetzig.zig"); +pub const Env = struct { + store: *jetzig.kv.Store.GeneralStore, + cache: *jetzig.kv.Store.CacheStore, + job_queue: *jetzig.kv.Store.JobQueueStore, + logger: jetzig.loggers.Logger, +}; + pub fn RoutedChannel(Routes: type) type { return struct { const Channel = @This(); @@ -12,6 +19,32 @@ pub fn RoutedChannel(Routes: type) type { websocket: *jetzig.websockets.RoutedWebsocket(Routes), state: *jetzig.data.Value, data: *jetzig.data.Data, + _connections: *std.StringHashMap(Connection), + env: Env, + + const Connection = struct { + object: *jetzig.data.Value, + data: *jetzig.Data, + }; + + pub fn init(allocator: std.mem.Allocator, websocket: *jetzig.websockets.Websocket) !Channel { + const connections = try allocator.create(std.StringHashMap(Connection)); + connections.* = std.StringHashMap(Connection).init(allocator); + + return .{ + .allocator = allocator, + .websocket = websocket, + .state = try websocket.getState(), + .data = websocket.data, + ._connections = connections, + .env = .{ + .store = websocket.store, + .cache = websocket.cache, + .job_queue = websocket.job_queue, + .logger = websocket.logger, + }, + }; + } pub fn publish(channel: Channel, data: anytype) !void { var stack_fallback = std.heap.stackFallback(4096, channel.allocator); @@ -23,7 +56,7 @@ pub fn RoutedChannel(Routes: type) type { const writer = write_buffer.writer(); try std.json.stringify(data, .{}, writer); try write_buffer.flush(); - channel.websocket.logger.DEBUG( + channel.env.logger.DEBUG( "Published Channel message for `{s}`", .{channel.websocket.route.path}, ) catch {}; @@ -45,12 +78,48 @@ pub fn RoutedChannel(Routes: type) type { try writer.writeAll("__jetzig_event__:"); try std.json.stringify(.{ .method = method, .params = args }, .{}, writer); try write_buffer.flush(); - channel.websocket.logger.DEBUG( + channel.env.logger.DEBUG( "Invoked Javascript function `{s}` for `{s}`", .{ @tagName(method), channel.websocket.route.path }, ) catch {}; } + pub fn connect(channel: Channel, comptime scope: []const u8) !*jetzig.data.Value { + if (channel._connections.get(scope)) |cached| { + return cached.object; + } + + if (channel.websocket.session_id.len != 32) return error.JetzigInvalidSessionIdLength; + + const connections = channel.get("_connections") orelse try channel.put("_connections", .array); + const connection_id = for (connections.items(.array)) |connection| { + if (connection.getT(.string, "scope")) |connection_scope| { + if (std.mem.eql(u8, connection_scope, scope)) { + break connection.getT(.string, "id") orelse return error.JetzigInvalidChannelState; + } + } + } else blk: { + const id = try channel.allocator.alloc(u8, 32); + _ = jetzig.util.generateRandomString(id); + try connections.append(.{ .id = id, .scope = scope }); + break :blk id; + }; + + var buf: [32 + ":".len + 32]u8 = undefined; + const connection_key = try std.fmt.bufPrint(&buf, "{s}:{s}", .{ channel.websocket.session_id, connection_id }); + return try channel.websocket.channels.get(channel.data, connection_key) orelse blk: { + const data = try channel.allocator.create(jetzig.Data); + data.* = jetzig.Data.init(channel.allocator); + const object = try data.root(.object); + const duped_connection_key = try channel.allocator.dupe(u8, connection_key); + try channel.websocket.channels.put(duped_connection_key, object); + try channel._connections.put(scope, .{ .data = data, .object = object }); + const connections_state = channel.get("_connections_state") orelse try channel.put("_connections_state", .object); + try connections_state.put(scope, object); + break :blk object; + }; + } + pub fn getT( channel: Channel, comptime T: jetzig.data.Data.ValueType, @@ -76,7 +145,13 @@ pub fn RoutedChannel(Routes: type) type { } pub fn sync(channel: Channel) !void { - try channel.websocket.syncState(channel); + try channel.websocket.syncState(channel.data, "__root__"); + var it = channel._connections.iterator(); + while (it.next()) |entry| { + const data = entry.value_ptr.*.data; + const scope = entry.key_ptr.*; + try channel.websocket.syncState(data, scope); + } } }; } diff --git a/src/jetzig/http/Server.zig b/src/jetzig/http/Server.zig index b1d04c4..c79fd94 100644 --- a/src/jetzig/http/Server.zig +++ b/src/jetzig/http/Server.zig @@ -225,6 +225,9 @@ pub fn RoutedServer(Routes: type) type { .route = route, .session_id = session_id, .channels = self.channels, + .store = self.store, + .cache = self.cache, + .job_queue = self.job_queue, .logger = self.logger, }, ); diff --git a/src/jetzig/middleware/channels/channels.js b/src/jetzig/middleware/channels/channels.js index 7492f60..14395da 100644 --- a/src/jetzig/middleware/channels/channels.js +++ b/src/jetzig/middleware/channels/channels.js @@ -2,175 +2,189 @@ window.jetzig = window.jetzig ? window.jetzig : {} jetzig = window.jetzig; (() => { + const state_tag = "__jetzig_channel_state__:"; + const actions_tag = "__jetzig_actions__:"; + const event_tag = "__jetzig_event__:"; + const transform = (value, state, element) => { - const id = element.getAttribute('jetzig-id'); - const transformer = id && jetzig.channel.transformers[id]; - if (transformer) { - return transformer(value, state, element); + const id = element.getAttribute('jetzig-id'); + const transformer = id && jetzig.channel.transformers[id]; + if (transformer) { + return transformer(value, state, element); + } else { + return value === undefined || value == null ? '' : `${value}` + } + }; + + const reduceState = (ref, state) => { + if (!ref.startsWith('$.')) throw new Error(`Unexpected ref format: ${ref}`); + const args = ref.split('.'); + args.shift(); + const isNumeric = (string) => [...string].every(char => '0123456789'.includes(char)); + const isObject = (object) => object && typeof object === 'object'; + return args.reduce((acc, arg) => { + if (isNumeric(arg)) { + if (acc && Array.isArray(acc) && acc.length > arg) return acc[parseInt(arg)]; + return null; } else { - return value === undefined || value == null ? '' : `${value}` + if (acc && isObject(acc)) return acc[arg]; + return null; } + }, state); + }; + + const handleState = (event, channel) => { + // TODO: Handle different scopes and update elements based on scope. + const detagged = event.data.slice(state_tag.length); + const scope = detagged.split(':', 1)[0]; + const state = JSON.parse(detagged.slice(scope.length + 1)); + Object.entries(channel.elementMap).forEach(([ref, elements]) => { + const value = reduceState(ref, state); + elements.forEach(element => element.innerHTML = transform(value, state, element)); + }); + channel.stateChangedCallbacks.forEach((callback) => { + callback(state); + }); + }; + + const handleEvent = (event, channel) => { + const data = JSON.parse(event.data.slice(event_tag.length)); + if (Object.hasOwn(channel.invokeCallbacks, data.method)) { + channel.invokeCallbacks[data.method].forEach(callback => { + callback(data); + }); + } + }; + + const handleAction = (event, channel) => { + const data = JSON.parse(event.data.slice(actions_tag.length)); + data.actions.forEach(action => { + channel.action_specs[action.name] = { + callback: (...params) => { + if (action.params.length != params.length) { + throw new Error(`Invalid params for action '${action.name}'. Expected ${action.params.length} params, found ${params.length}`); + } + [...action.params].forEach((param, index) => { + if (param.type !== typeof params[index]) { + const err = `Incorrect argument type for argument ${index} in '${action.name}'. Expected: ${param.type}, found ${typeof params[index]}`; + switch (param.type) { + case "string": + params[index] = `${params[index]}`; + break; + case "integer": + try { params[index] = parseInt(params[index]) } catch { + throw new Error(err); + }; + break; + case "float": + try { params[index] = parseFloat(params[index]) } catch { + throw new Error(err); + }; + case "boolean": + params[index] = ["true", "y", "1", "yes", "t"].includes(params[index]); + break; + default: + throw new Error(err); + } + } + }); + + channel.websocket.send(`_invoke:${action.name}:${JSON.stringify(params)}`); + }, + spec: { ...action }, + }; + channel.actions[action.name] = channel.action_specs[action.name].callback; + }); + document.querySelectorAll('[jetzig-click]').forEach(element => { + const ref = element.getAttribute('jetzig-click'); + const action = channel.action_specs[ref]; + if (action) { + element.addEventListener('click', () => { + const args = []; + action.spec.params.forEach(param => { + const arg = element.dataset[param.name]; + if (arg === undefined) { + throw new Error(`Expected 'data-${param.name}' attribute for '${action.name}' click handler.`); + } else { + args.push(element.dataset[param.name]); + } + }); + action.callback(...args); + }); + } else { + throw new Error(`Unknown click handler: '${ref}'`); + } + }); }; jetzig.channel = { - websocket: null, - actions: {}, - action_specs: {}, - stateChangedCallbacks: [], - messageCallbacks: [], - invokeCallbacks: {}, - elementMap: {}, - transformers: {}, - onStateChanged: function(callback) { this.stateChangedCallbacks.push(callback); }, - onMessage: function(callback) { this.messageCallbacks.push(callback); }, - init: function(host, path) { - this.websocket = new WebSocket(`ws://${host}${path}`); - this.websocket.addEventListener("message", (event) => { - const state_tag = "__jetzig_channel_state__:"; - const actions_tag = "__jetzig_actions__:"; - const event_tag = "__jetzig_event__:"; - - if (event.data.startsWith(state_tag)) { - const state = JSON.parse(event.data.slice(state_tag.length)); - Object.entries(this.elementMap).forEach(([ref, elements]) => { - const value = reduceState(ref, state); - elements.forEach(element => element.innerHTML = transform(value, state, element)); - }); - this.stateChangedCallbacks.forEach((callback) => { - callback(state); - }); - } else if (event.data.startsWith(event_tag)) { - const data = JSON.parse(event.data.slice(event_tag.length)); - if (Object.hasOwn(this.invokeCallbacks, data.method)) { - this.invokeCallbacks[data.method].forEach(callback => { - callback(data); - }); - } - } else if (event.data.startsWith(actions_tag)) { - const data = JSON.parse(event.data.slice(actions_tag.length)); - data.actions.forEach(action => { - this.action_specs[action.name] = { - callback: (...params) => { - if (action.params.length != params.length) { - throw new Error(`Invalid params for action '${action.name}'. Expected ${action.params.length} params, found ${params.length}`); - } - [...action.params].forEach((param, index) => { - if (param.type !== typeof params[index]) { - const err = `Incorrect argument type for argument ${index} in '${action.name}'. Expected: ${param.type}, found ${typeof params[index]}`; - switch (param.type) { - case "string": - params[index] = `${params[index]}`; - break; - case "integer": - try { params[index] = parseInt(params[index]) } catch { - throw new Error(err); - }; - break; - case "float": - try { params[index] = parseFloat(params[index]) } catch { - throw new Error(err); - }; - case "boolean": - params[index] = ["true", "y", "1", "yes", "t"].includes(params[index]); - break; - default: - throw new Error(err); - } - } - }); - - this.websocket.send(`_invoke:${action.name}:${JSON.stringify(params)}`); - }, - spec: { ...action }, - }; - this.actions[action.name] = this.action_specs[action.name].callback; - }); - document.querySelectorAll('[jetzig-click]').forEach(element => { - const ref = element.getAttribute('jetzig-click'); - const action = this.action_specs[ref]; - if (action) { - element.addEventListener('click', () => { - const args = []; - action.spec.params.forEach(param => { - const arg = element.dataset[param.name]; - if (arg === undefined) { - throw new Error(`Expected 'data-${param.name}' attribute for '${action.name}' click handler.`); - } else { - args.push(element.dataset[param.name]); - } - }); - action.callback(...args); - }); - } else { - throw new Error(`Unknown click handler: '${ref}'`); - } - }); - } else { - const data = JSON.parse(event.data); - this.messageCallbacks.forEach((callback) => { - callback(data); - }); - } - + websocket: null, + actions: {}, + action_specs: {}, + stateChangedCallbacks: [], + messageCallbacks: [], + invokeCallbacks: {}, + elementMap: {}, + transformers: {}, + onStateChanged: function(callback) { this.stateChangedCallbacks.push(callback); }, + onMessage: function(callback) { this.messageCallbacks.push(callback); }, + init: function(host, path) { + this.websocket = new WebSocket(`ws://${host}${path}`); + this.websocket.addEventListener("message", (event) => { + if (event.data.startsWith(state_tag)) { + handleState(event, this); + } else if (event.data.startsWith(event_tag)) { + handleEvent(event, this); + } else if (event.data.startsWith(actions_tag)) { + handleAction(event, this); + } else { + const data = JSON.parse(event.data); + this.messageCallbacks.forEach((callback) => { + callback(data); }); + } + }); - const reduceState = (ref, state) => { - if (!ref.startsWith('$.')) throw new Error(`Unexpected ref format: ${ref}`); - const args = ref.split('.'); - args.shift(); - const isNumeric = (string) => [...string].every(char => '0123456789'.includes(char)); - const isObject = (object) => object && typeof object === 'object'; - return args.reduce((acc, arg) => { - if (isNumeric(arg)) { - if (acc && Array.isArray(acc) && acc.length > arg) return acc[parseInt(arg)]; - return null; - } else { - if (acc && isObject(acc)) return acc[arg]; - return null; - } - }, state); - }; + document.querySelectorAll('[jetzig-connect]').forEach(element => { + const ref = element.getAttribute('jetzig-connect'); + if (!this.elementMap[ref]) this.elementMap[ref] = []; + const id = `jetzig-${crypto.randomUUID()}`; + element.setAttribute('jetzig-id', id); + this.elementMap[ref].push(element); + const transformer = element.getAttribute('jetzig-transform'); + if (transformer) { + this.transformers[id] = new Function("value", "$", "element", `return ${transformer};`); + } + }); - document.querySelectorAll('[jetzig-connect]').forEach(element => { - const ref = element.getAttribute('jetzig-connect'); - if (!this.elementMap[ref]) this.elementMap[ref] = []; - const id = `jetzig-${crypto.randomUUID()}`; - element.setAttribute('jetzig-id', id); - this.elementMap[ref].push(element); - const transformer = element.getAttribute('jetzig-transform'); - if (transformer) { - this.transformers[id] = new Function("value", "$", "element", `return ${transformer};`); - } + const styled_elements = document.querySelectorAll('[jetzig-style]'); + this.onStateChanged(state => { + styled_elements.forEach(element => { + const func = new Function("$", `return ${element.getAttribute('jetzig-style')};`) + const styles = func(state); + Object.entries(styles).forEach(([key, value]) => { + element.style.setProperty(key, value); }); + }); + }); - const styled_elements = document.querySelectorAll('[jetzig-style]'); - this.onStateChanged(state => { - styled_elements.forEach(element => { - const func = new Function("$", `return ${element.getAttribute('jetzig-style')};`) - const styles = func(state); - Object.entries(styles).forEach(([key, value]) => { - element.style.setProperty(key, value); - }); - }); - }); - - // this.websocket.addEventListener("open", (event) => { - // // TODO - // this.publish("websockets", {}); - // }); - }, - receive: function(ref, callback) { - if (Object.hasOwn(this.invokeCallbacks, ref)) { - this.invokeCallbacks[ref].push(callback); - } else { - this.invokeCallbacks[ref] = [callback]; - } - }, - publish: function(data) { - if (this.websocket) { - const json = JSON.stringify(data); - this.websocket.send(json); - } - }, + // this.websocket.addEventListener("open", (event) => { + // // TODO + // this.publish("websockets", {}); + // }); + }, + receive: function(ref, callback) { + if (Object.hasOwn(this.invokeCallbacks, ref)) { + this.invokeCallbacks[ref].push(callback); + } else { + this.invokeCallbacks[ref] = [callback]; + } + }, + publish: function(data) { + if (this.websocket) { + const json = JSON.stringify(data); + this.websocket.send(json); + } + }, }; })(); diff --git a/src/jetzig/websockets/Websocket.zig b/src/jetzig/websockets/Websocket.zig index 3978f28..712ec80 100644 --- a/src/jetzig/websockets/Websocket.zig +++ b/src/jetzig/websockets/Websocket.zig @@ -9,6 +9,9 @@ pub const Context = struct { route: jetzig.channels.Route, session_id: []const u8, channels: *jetzig.kv.Store.ChannelStore, + store: *jetzig.kv.Store.GeneralStore, + cache: *jetzig.kv.Store.CacheStore, + job_queue: *jetzig.kv.Store.JobQueueStore, logger: jetzig.loggers.Logger, }; @@ -17,6 +20,9 @@ pub fn RoutedWebsocket(Routes: type) type { allocator: std.mem.Allocator, connection: *httpz.websocket.Conn, channels: *jetzig.kv.Store.ChannelStore, + store: *jetzig.kv.Store.GeneralStore, + cache: *jetzig.kv.Store.CacheStore, + job_queue: *jetzig.kv.Store.JobQueueStore, route: jetzig.channels.Route, data: *jetzig.Data, session_id: []const u8, @@ -35,6 +41,9 @@ pub fn RoutedWebsocket(Routes: type) type { .route = context.route, .session_id = context.session_id, .channels = context.channels, + .store = context.store, + .cache = context.cache, + .job_queue = context.job_queue, .logger = context.logger, .data = data, }; @@ -56,22 +65,12 @@ pub fn RoutedWebsocket(Routes: type) type { const func = websocket.route.openConnectionFn orelse return; - const channel = jetzig.channels.Channel{ - .allocator = websocket.allocator, - .websocket = websocket, - .state = try websocket.getState(), - .data = websocket.data, - }; + const channel = try jetzig.channels.RoutedChannel(Routes).init(websocket.allocator, websocket); try func(channel); } pub fn clientMessage(websocket: *Websocket, allocator: std.mem.Allocator, data: []const u8) !void { - const channel = jetzig.channels.RoutedChannel(Routes){ - .allocator = allocator, - .websocket = websocket, - .state = try websocket.getState(), - .data = websocket.data, - }; + const channel = try jetzig.channels.RoutedChannel(Routes).init(allocator, websocket); if (websocket.invoke(channel, data)) |maybe_action| { if (maybe_action) |action| { @@ -94,18 +93,20 @@ pub fn RoutedWebsocket(Routes: type) type { websocket.logger.DEBUG("Routed Channel message for `{s}`", .{websocket.route.path}) catch {}; } - pub fn syncState(websocket: *Websocket, channel: jetzig.channels.RoutedChannel(Routes)) !void { - var stack_fallback = std.heap.stackFallback(4096, channel.allocator); + pub fn syncState(websocket: *Websocket, data: *jetzig.Data, scope: []const u8) !void { + const value = data.value orelse return; + + var stack_fallback = std.heap.stackFallback(4096, websocket.allocator); const allocator = stack_fallback.get(); - var write_buffer = channel.websocket.connection.writeBuffer(allocator, .text); + var write_buffer = websocket.connection.writeBuffer(allocator, .text); defer write_buffer.deinit(); const writer = write_buffer.writer(); // TODO: Make this really fast. - try websocket.channels.put(websocket.session_id, channel.state); - try writer.print("__jetzig_channel_state__:{s}", .{try websocket.data.toJson()}); + try websocket.channels.put(websocket.session_id, value); + try writer.print("__jetzig_channel_state__:{s}:{s}", .{ scope, try data.toJson() }); try write_buffer.flush(); websocket.logger.DEBUG("Synchronized Channel state for `{s}`", .{websocket.route.path}) catch {};