This commit is contained in:
Bob Farrell 2025-05-02 19:57:49 +01:00
parent e647b0057a
commit b795c6184e
4 changed files with 272 additions and 179 deletions

View File

@ -4,6 +4,13 @@ const httpz = @import("httpz");
const jetzig = @import("../../jetzig.zig");
pub const Env = struct {
store: *jetzig.kv.Store.GeneralStore,
cache: *jetzig.kv.Store.CacheStore,
job_queue: *jetzig.kv.Store.JobQueueStore,
logger: jetzig.loggers.Logger,
};
pub fn RoutedChannel(Routes: type) type {
return struct {
const Channel = @This();
@ -12,6 +19,32 @@ pub fn RoutedChannel(Routes: type) type {
websocket: *jetzig.websockets.RoutedWebsocket(Routes),
state: *jetzig.data.Value,
data: *jetzig.data.Data,
_connections: *std.StringHashMap(Connection),
env: Env,
const Connection = struct {
object: *jetzig.data.Value,
data: *jetzig.Data,
};
pub fn init(allocator: std.mem.Allocator, websocket: *jetzig.websockets.Websocket) !Channel {
const connections = try allocator.create(std.StringHashMap(Connection));
connections.* = std.StringHashMap(Connection).init(allocator);
return .{
.allocator = allocator,
.websocket = websocket,
.state = try websocket.getState(),
.data = websocket.data,
._connections = connections,
.env = .{
.store = websocket.store,
.cache = websocket.cache,
.job_queue = websocket.job_queue,
.logger = websocket.logger,
},
};
}
pub fn publish(channel: Channel, data: anytype) !void {
var stack_fallback = std.heap.stackFallback(4096, channel.allocator);
@ -23,7 +56,7 @@ pub fn RoutedChannel(Routes: type) type {
const writer = write_buffer.writer();
try std.json.stringify(data, .{}, writer);
try write_buffer.flush();
channel.websocket.logger.DEBUG(
channel.env.logger.DEBUG(
"Published Channel message for `{s}`",
.{channel.websocket.route.path},
) catch {};
@ -45,12 +78,48 @@ pub fn RoutedChannel(Routes: type) type {
try writer.writeAll("__jetzig_event__:");
try std.json.stringify(.{ .method = method, .params = args }, .{}, writer);
try write_buffer.flush();
channel.websocket.logger.DEBUG(
channel.env.logger.DEBUG(
"Invoked Javascript function `{s}` for `{s}`",
.{ @tagName(method), channel.websocket.route.path },
) catch {};
}
pub fn connect(channel: Channel, comptime scope: []const u8) !*jetzig.data.Value {
if (channel._connections.get(scope)) |cached| {
return cached.object;
}
if (channel.websocket.session_id.len != 32) return error.JetzigInvalidSessionIdLength;
const connections = channel.get("_connections") orelse try channel.put("_connections", .array);
const connection_id = for (connections.items(.array)) |connection| {
if (connection.getT(.string, "scope")) |connection_scope| {
if (std.mem.eql(u8, connection_scope, scope)) {
break connection.getT(.string, "id") orelse return error.JetzigInvalidChannelState;
}
}
} else blk: {
const id = try channel.allocator.alloc(u8, 32);
_ = jetzig.util.generateRandomString(id);
try connections.append(.{ .id = id, .scope = scope });
break :blk id;
};
var buf: [32 + ":".len + 32]u8 = undefined;
const connection_key = try std.fmt.bufPrint(&buf, "{s}:{s}", .{ channel.websocket.session_id, connection_id });
return try channel.websocket.channels.get(channel.data, connection_key) orelse blk: {
const data = try channel.allocator.create(jetzig.Data);
data.* = jetzig.Data.init(channel.allocator);
const object = try data.root(.object);
const duped_connection_key = try channel.allocator.dupe(u8, connection_key);
try channel.websocket.channels.put(duped_connection_key, object);
try channel._connections.put(scope, .{ .data = data, .object = object });
const connections_state = channel.get("_connections_state") orelse try channel.put("_connections_state", .object);
try connections_state.put(scope, object);
break :blk object;
};
}
pub fn getT(
channel: Channel,
comptime T: jetzig.data.Data.ValueType,
@ -76,7 +145,13 @@ pub fn RoutedChannel(Routes: type) type {
}
pub fn sync(channel: Channel) !void {
try channel.websocket.syncState(channel);
try channel.websocket.syncState(channel.data, "__root__");
var it = channel._connections.iterator();
while (it.next()) |entry| {
const data = entry.value_ptr.*.data;
const scope = entry.key_ptr.*;
try channel.websocket.syncState(data, scope);
}
}
};
}

View File

@ -225,6 +225,9 @@ pub fn RoutedServer(Routes: type) type {
.route = route,
.session_id = session_id,
.channels = self.channels,
.store = self.store,
.cache = self.cache,
.job_queue = self.job_queue,
.logger = self.logger,
},
);

View File

@ -2,6 +2,10 @@ window.jetzig = window.jetzig ? window.jetzig : {}
jetzig = window.jetzig;
(() => {
const state_tag = "__jetzig_channel_state__:";
const actions_tag = "__jetzig_actions__:";
const event_tag = "__jetzig_event__:";
const transform = (value, state, element) => {
const id = element.getAttribute('jetzig-id');
const transformer = id && jetzig.channel.transformers[id];
@ -12,44 +16,50 @@ jetzig = window.jetzig;
}
};
jetzig.channel = {
websocket: null,
actions: {},
action_specs: {},
stateChangedCallbacks: [],
messageCallbacks: [],
invokeCallbacks: {},
elementMap: {},
transformers: {},
onStateChanged: function(callback) { this.stateChangedCallbacks.push(callback); },
onMessage: function(callback) { this.messageCallbacks.push(callback); },
init: function(host, path) {
this.websocket = new WebSocket(`ws://${host}${path}`);
this.websocket.addEventListener("message", (event) => {
const state_tag = "__jetzig_channel_state__:";
const actions_tag = "__jetzig_actions__:";
const event_tag = "__jetzig_event__:";
const reduceState = (ref, state) => {
if (!ref.startsWith('$.')) throw new Error(`Unexpected ref format: ${ref}`);
const args = ref.split('.');
args.shift();
const isNumeric = (string) => [...string].every(char => '0123456789'.includes(char));
const isObject = (object) => object && typeof object === 'object';
return args.reduce((acc, arg) => {
if (isNumeric(arg)) {
if (acc && Array.isArray(acc) && acc.length > arg) return acc[parseInt(arg)];
return null;
} else {
if (acc && isObject(acc)) return acc[arg];
return null;
}
}, state);
};
if (event.data.startsWith(state_tag)) {
const state = JSON.parse(event.data.slice(state_tag.length));
Object.entries(this.elementMap).forEach(([ref, elements]) => {
const handleState = (event, channel) => {
// TODO: Handle different scopes and update elements based on scope.
const detagged = event.data.slice(state_tag.length);
const scope = detagged.split(':', 1)[0];
const state = JSON.parse(detagged.slice(scope.length + 1));
Object.entries(channel.elementMap).forEach(([ref, elements]) => {
const value = reduceState(ref, state);
elements.forEach(element => element.innerHTML = transform(value, state, element));
});
this.stateChangedCallbacks.forEach((callback) => {
channel.stateChangedCallbacks.forEach((callback) => {
callback(state);
});
} else if (event.data.startsWith(event_tag)) {
};
const handleEvent = (event, channel) => {
const data = JSON.parse(event.data.slice(event_tag.length));
if (Object.hasOwn(this.invokeCallbacks, data.method)) {
this.invokeCallbacks[data.method].forEach(callback => {
if (Object.hasOwn(channel.invokeCallbacks, data.method)) {
channel.invokeCallbacks[data.method].forEach(callback => {
callback(data);
});
}
} else if (event.data.startsWith(actions_tag)) {
};
const handleAction = (event, channel) => {
const data = JSON.parse(event.data.slice(actions_tag.length));
data.actions.forEach(action => {
this.action_specs[action.name] = {
channel.action_specs[action.name] = {
callback: (...params) => {
if (action.params.length != params.length) {
throw new Error(`Invalid params for action '${action.name}'. Expected ${action.params.length} params, found ${params.length}`);
@ -79,15 +89,15 @@ jetzig = window.jetzig;
}
});
this.websocket.send(`_invoke:${action.name}:${JSON.stringify(params)}`);
channel.websocket.send(`_invoke:${action.name}:${JSON.stringify(params)}`);
},
spec: { ...action },
};
this.actions[action.name] = this.action_specs[action.name].callback;
channel.actions[action.name] = channel.action_specs[action.name].callback;
});
document.querySelectorAll('[jetzig-click]').forEach(element => {
const ref = element.getAttribute('jetzig-click');
const action = this.action_specs[ref];
const action = channel.action_specs[ref];
if (action) {
element.addEventListener('click', () => {
const args = [];
@ -105,32 +115,36 @@ jetzig = window.jetzig;
throw new Error(`Unknown click handler: '${ref}'`);
}
});
};
jetzig.channel = {
websocket: null,
actions: {},
action_specs: {},
stateChangedCallbacks: [],
messageCallbacks: [],
invokeCallbacks: {},
elementMap: {},
transformers: {},
onStateChanged: function(callback) { this.stateChangedCallbacks.push(callback); },
onMessage: function(callback) { this.messageCallbacks.push(callback); },
init: function(host, path) {
this.websocket = new WebSocket(`ws://${host}${path}`);
this.websocket.addEventListener("message", (event) => {
if (event.data.startsWith(state_tag)) {
handleState(event, this);
} else if (event.data.startsWith(event_tag)) {
handleEvent(event, this);
} else if (event.data.startsWith(actions_tag)) {
handleAction(event, this);
} else {
const data = JSON.parse(event.data);
this.messageCallbacks.forEach((callback) => {
callback(data);
});
}
});
const reduceState = (ref, state) => {
if (!ref.startsWith('$.')) throw new Error(`Unexpected ref format: ${ref}`);
const args = ref.split('.');
args.shift();
const isNumeric = (string) => [...string].every(char => '0123456789'.includes(char));
const isObject = (object) => object && typeof object === 'object';
return args.reduce((acc, arg) => {
if (isNumeric(arg)) {
if (acc && Array.isArray(acc) && acc.length > arg) return acc[parseInt(arg)];
return null;
} else {
if (acc && isObject(acc)) return acc[arg];
return null;
}
}, state);
};
document.querySelectorAll('[jetzig-connect]').forEach(element => {
const ref = element.getAttribute('jetzig-connect');
if (!this.elementMap[ref]) this.elementMap[ref] = [];

View File

@ -9,6 +9,9 @@ pub const Context = struct {
route: jetzig.channels.Route,
session_id: []const u8,
channels: *jetzig.kv.Store.ChannelStore,
store: *jetzig.kv.Store.GeneralStore,
cache: *jetzig.kv.Store.CacheStore,
job_queue: *jetzig.kv.Store.JobQueueStore,
logger: jetzig.loggers.Logger,
};
@ -17,6 +20,9 @@ pub fn RoutedWebsocket(Routes: type) type {
allocator: std.mem.Allocator,
connection: *httpz.websocket.Conn,
channels: *jetzig.kv.Store.ChannelStore,
store: *jetzig.kv.Store.GeneralStore,
cache: *jetzig.kv.Store.CacheStore,
job_queue: *jetzig.kv.Store.JobQueueStore,
route: jetzig.channels.Route,
data: *jetzig.Data,
session_id: []const u8,
@ -35,6 +41,9 @@ pub fn RoutedWebsocket(Routes: type) type {
.route = context.route,
.session_id = context.session_id,
.channels = context.channels,
.store = context.store,
.cache = context.cache,
.job_queue = context.job_queue,
.logger = context.logger,
.data = data,
};
@ -56,22 +65,12 @@ pub fn RoutedWebsocket(Routes: type) type {
const func = websocket.route.openConnectionFn orelse return;
const channel = jetzig.channels.Channel{
.allocator = websocket.allocator,
.websocket = websocket,
.state = try websocket.getState(),
.data = websocket.data,
};
const channel = try jetzig.channels.RoutedChannel(Routes).init(websocket.allocator, websocket);
try func(channel);
}
pub fn clientMessage(websocket: *Websocket, allocator: std.mem.Allocator, data: []const u8) !void {
const channel = jetzig.channels.RoutedChannel(Routes){
.allocator = allocator,
.websocket = websocket,
.state = try websocket.getState(),
.data = websocket.data,
};
const channel = try jetzig.channels.RoutedChannel(Routes).init(allocator, websocket);
if (websocket.invoke(channel, data)) |maybe_action| {
if (maybe_action) |action| {
@ -94,18 +93,20 @@ pub fn RoutedWebsocket(Routes: type) type {
websocket.logger.DEBUG("Routed Channel message for `{s}`", .{websocket.route.path}) catch {};
}
pub fn syncState(websocket: *Websocket, channel: jetzig.channels.RoutedChannel(Routes)) !void {
var stack_fallback = std.heap.stackFallback(4096, channel.allocator);
pub fn syncState(websocket: *Websocket, data: *jetzig.Data, scope: []const u8) !void {
const value = data.value orelse return;
var stack_fallback = std.heap.stackFallback(4096, websocket.allocator);
const allocator = stack_fallback.get();
var write_buffer = channel.websocket.connection.writeBuffer(allocator, .text);
var write_buffer = websocket.connection.writeBuffer(allocator, .text);
defer write_buffer.deinit();
const writer = write_buffer.writer();
// TODO: Make this really fast.
try websocket.channels.put(websocket.session_id, channel.state);
try writer.print("__jetzig_channel_state__:{s}", .{try websocket.data.toJson()});
try websocket.channels.put(websocket.session_id, value);
try writer.print("__jetzig_channel_state__:{s}:{s}", .{ scope, try data.toJson() });
try write_buffer.flush();
websocket.logger.DEBUG("Synchronized Channel state for `{s}`", .{websocket.route.path}) catch {};