This commit is contained in:
Bob Farrell 2025-04-21 19:32:35 +01:00
parent cd5a00d85f
commit 9847efdf4a
12 changed files with 202 additions and 29 deletions

View File

@ -36,8 +36,32 @@ pub fn delete(id: []const u8, request: *jetzig.Request, data: *jetzig.Data) !jet
}
pub fn receiveMessage(message: jetzig.channels.Message) !void {
std.debug.print("payload: {s}\n", .{message.payload});
try message.channel.publish("hello");
const data = try message.data();
if (data.getT(.string, "toggle")) |toggle| {
if (message.channel.get("cells")) |cells| {
const is_taken = cells.getT(.boolean, toggle);
if (is_taken == null or is_taken.? == false) {
try cells.put(toggle, true);
}
} else {
var cells = try message.channel.put("cells", .object);
for (1..10) |cell| {
var buf: [1]u8 = undefined;
const key = try std.fmt.bufPrint(&buf, "{d}", .{cell});
try cells.put(key, std.mem.eql(u8, key, toggle));
}
}
try message.channel.sync();
} else {
var cells = try message.channel.put("cells", .object);
for (1..10) |cell| {
var buf: [1]u8 = undefined;
const key = try std.fmt.bufPrint(&buf, "{d}", .{cell});
try cells.put(key, false);
}
try message.channel.sync();
}
// try message.channel.publish("hello");
}
test "index" {

View File

@ -1,17 +1,78 @@
<script>
const channel = {
websocket: null,
callbacks: [],
onStateChanged: function(callback) { this.callbacks.push(callback); },
publish: function(path, data) {
if (this.websocket) {
const json = JSON.stringify(data);
this.websocket.send(`${path}:${json}`);
}
},
};
</script>
@if (context.request) |request|
@if (request.headers.get("host")) |host|
<script>
const websocket = new WebSocket('ws://{{host}}');
console.log(websocket);
websocket.addEventListener("message", (event) => {
console.log(event.data);
});
websocket.addEventListener("open", (event) => {
websocket.send("websockets:hello jetzig websocket");
channel.websocket = new WebSocket('ws://{{host}}');
channel.websocket.addEventListener("message", (event) => {
const state = JSON.parse(event.data);
channel.callbacks.forEach((callback) => {
callback(state);
});
});
@// channel.websocket.addEventListener("open", (event) => {
@// // TODO
@// channel.publish("websockets", {});
@// });
</script>
@end
@end
<style>
#tic-tac-toe-grid td {
min-width: 5rem;
width: 5rem;
height: 5rem;
border: 1px dotted black;
font-size: 3rem;
font-family: monospace;
}
</style>
<table id="tic-tac-toe-grid">
<tbody>
<tr>
<td id="tic-tac-toe-cell-1" data-cell="1"></td>
<td id="tic-tac-toe-cell-2" data-cell="2"></td>
<td id="tic-tac-toe-cell-3" data-cell="3"></td>
</tr>
<tr>
<td id="tic-tac-toe-cell-4" data-cell="4"></td>
<td id="tic-tac-toe-cell-5" data-cell="5"></td>
<td id="tic-tac-toe-cell-6" data-cell="6"></td>
</tr>
<tr>
<td id="tic-tac-toe-cell-7" data-cell="7"></td>
<td id="tic-tac-toe-cell-8" data-cell="8"></td>
<td id="tic-tac-toe-cell-9" data-cell="9"></td>
</tr>
</tbody>
</table>
<script>
channel.onStateChanged(state => {
console.log(state);
Object.entries(state.cells).forEach(([cell, toggle]) => {
const element = document.querySelector(`#tic-tac-toe-cell-${cell}`);
element.innerHTML = toggle ? "&#9992;" : "&#129422;"
});
});
document.querySelectorAll("#tic-tac-toe-grid td").forEach(element => {
element.addEventListener("click", () => {
channel.publish("websockets", { toggle: element.dataset.cell });
});
});
</script>

View File

@ -396,7 +396,6 @@ fn writeChannelRoutes(self: *Routes, writer: anytype) !void {
const view_name = try route.viewName();
defer self.allocator.free(view_name);
std.debug.print("{s}: {s}\n", .{ route.name, route.view_name });
try writer.print(
\\.{{ "{s}", jetzig.channels.Route{{ .receiveMessageFn = @import("{s}").receiveMessage }} }},
\\

View File

@ -65,6 +65,9 @@ pub fn start(self: *const App, routes_module: type, options: AppOptions) !void {
var cache = try jetzig.kv.Store.CacheStore.init(self.allocator, self.env.logger, .cache);
defer cache.deinit();
var channels = try jetzig.kv.Store.CacheStore.init(self.allocator, self.env.logger, .channels);
defer channels.deinit();
var repo = try jetzig.database.repo(self.allocator, self);
defer repo.deinit();
@ -102,6 +105,7 @@ pub fn start(self: *const App, routes_module: type, options: AppOptions) !void {
&store,
&job_queue,
&cache,
&channels,
&repo,
options.global,
);

View File

@ -2,10 +2,33 @@ const std = @import("std");
const httpz = @import("httpz");
const jetzig = @import("../../jetzig.zig");
const Channel = @This();
connection: *httpz.websocket.Conn,
websocket: *jetzig.http.Websocket,
state: *jetzig.data.Value,
pub fn publish(self: Channel, data: []const u8) !void {
try self.connection.write(data);
}
pub fn getT(
self: Channel,
comptime T: jetzig.data.Data.ValueType,
key: []const u8,
) @TypeOf(self.state.getT(T, key)) {
return self.state.getT(T, key);
}
pub fn get(self: Channel, key: []const u8) ?*jetzig.data.Value {
return self.state.get(key);
}
pub fn put(self: Channel, key: []const u8, value: anytype) @TypeOf(self.state.put(key, value)) {
return try self.state.put(key, value);
}
pub fn sync(self: Channel) !void {
try self.websocket.syncState(self);
}

View File

@ -1,32 +1,48 @@
const std = @import("std");
const jetzig = @import("../../jetzig.zig");
const Channel = @import("Channel.zig");
const Message = @This();
data: []const u8,
allocator: std.mem.Allocator,
raw_data: []const u8,
channel_name: ?[]const u8,
payload: []const u8,
channel: Channel,
pub fn init(channel: Channel, data: []const u8) Message {
const channel_name = parseChannelName(data);
const payload = parsePayload(data, channel_name);
return .{ .data = data, .channel = channel, .channel_name = channel_name, .payload = payload };
pub fn init(allocator: std.mem.Allocator, channel: Channel, raw_data: []const u8) Message {
const channel_name = parseChannelName(raw_data);
const payload = parsePayload(raw_data, channel_name);
return .{
.allocator = allocator,
.raw_data = raw_data,
.channel = channel,
.channel_name = channel_name,
.payload = payload,
};
}
fn parseChannelName(data: []const u8) ?[]const u8 {
return if (std.mem.indexOfScalar(u8, data, ':')) |index|
if (index > 1) data[0..index] else null
pub fn data(message: Message) !*jetzig.data.Value {
var d = try message.allocator.create(jetzig.data.Data);
d.* = jetzig.data.Data.init(message.allocator);
try d.fromJson(message.payload);
return d.value.?;
}
fn parseChannelName(raw_data: []const u8) ?[]const u8 {
return if (std.mem.indexOfScalar(u8, raw_data, ':')) |index|
if (index > 1) raw_data[0..index] else null
else
null;
}
fn parsePayload(data: []const u8, maybe_channel_name: ?[]const u8) []const u8 {
fn parsePayload(raw_data: []const u8, maybe_channel_name: ?[]const u8) []const u8 {
return if (maybe_channel_name) |channel_name|
data[channel_name.len + 1 ..]
raw_data[channel_name.len + 1 ..]
else
data;
raw_data;
}
test "message with channel and payload" {

View File

@ -145,6 +145,19 @@ pub const cache: kv.Store.Options = .{
// },
};
/// Channels. Identical to `store` options, but allows using different
/// backends (e.g. `.memory` for key-value store, `.file` for cache.
/// Channel state data is stored here.
pub const channels: kv.Store.Options = .{
.backend = .memory,
// .backend = .file,
// .file_options = .{
// .path = "/path/to/jetkv-channels.db",
// .truncate = false, // Set to `true` to clear the store on each server launch.
// .address_space_size = jetzig.jetkv.JetKV.addressSpace(4096),
// },
};
/// SMTP configuration for Jetzig Mail.
pub const smtp: mail.SMTPConfig = .{
.port = 25,

View File

@ -19,6 +19,7 @@ initialized: bool = false,
store: *jetzig.kv.Store.GeneralStore,
job_queue: *jetzig.kv.Store.JobQueueStore,
cache: *jetzig.kv.Store.CacheStore,
channels: *jetzig.kv.Store.ChannelStore,
repo: *jetzig.database.Repo,
global: *anyopaque,
decoded_static_route_params: []const *jetzig.data.Value = &.{},
@ -38,6 +39,7 @@ pub fn init(
store: *jetzig.kv.Store.GeneralStore,
job_queue: *jetzig.kv.Store.JobQueueStore,
cache: *jetzig.kv.Store.CacheStore,
channels: *jetzig.kv.Store.ChannelStore,
repo: *jetzig.database.Repo,
global: *anyopaque,
) Server {
@ -54,6 +56,7 @@ pub fn init(
.store = store,
.job_queue = job_queue,
.cache = cache,
.channels = channels,
.repo = repo,
.global = global,
};

View File

@ -14,18 +14,28 @@ const Websocket = @This();
allocator: std.mem.Allocator,
connection: *httpz.websocket.Conn,
server: *const jetzig.http.Server,
data: *jetzig.Data,
id: [32]u8 = undefined,
pub fn init(connection: *httpz.websocket.Conn, context: Context) !Websocket {
return .{
var websocket = Websocket{
.allocator = context.allocator,
.connection = connection,
.server = context.server,
.data = try context.allocator.create(jetzig.Data),
};
websocket.data.* = jetzig.Data.init(context.allocator);
_ = jetzig.util.generateRandomString(&websocket.id);
return websocket;
}
pub fn clientMessage(self: *Websocket, data: []const u8) !void {
const channel = jetzig.channels.Channel{ .connection = self.connection };
const message = jetzig.channels.Message.init(channel, data);
const channel = jetzig.channels.Channel{
.websocket = self,
.state = try self.getState(),
};
const message = jetzig.channels.Message.init(self.allocator, channel, data);
if (message.channel_name) |target_channel_name| {
if (self.server.matchChannelRoute(target_channel_name)) |route| {
@ -33,3 +43,17 @@ pub fn clientMessage(self: *Websocket, data: []const u8) !void {
} else try self.server.logger.WARN("Unrecognized channel: {s}", .{target_channel_name});
} else try self.server.logger.WARN("Invalid channel message format.", .{});
}
pub fn syncState(self: *Websocket, channel: jetzig.channels.Channel) !void {
// TODO: Make this really fast.
try self.server.channels.put(&self.id, channel.state);
try self.connection.write(try self.data.toJson());
}
fn getState(self: *Websocket) !*jetzig.data.Value {
return try self.server.channels.get(self.data, &self.id) orelse blk: {
const root = try self.data.root(.object);
try self.server.channels.put(&self.id, root);
break :blk try self.server.channels.get(self.data, &self.id) orelse error.JetzigInvalidChannel;
};
}

View File

@ -22,6 +22,9 @@ pub const Store = struct {
/// Store ephemeral data.
pub const CacheStore = @import("kv/Store.zig").Store(config.get(Store.Options, "cache"));
/// Store channel data.
pub const ChannelStore = @import("kv/Store.zig").Store(config.get(Store.Options, "channels"));
/// Background job storage.
pub const JobQueueStore = @import("kv/Store.zig").Store(config.get(Store.Options, "job_queue"));

View File

@ -53,7 +53,7 @@ fn jetKVOptions(options: KVOptions) jetzig.jetkv.Options {
}
/// Role a given store fills. Used in log outputs.
pub const Role = enum { jobs, cache, general, custom };
pub const Role = enum { jobs, cache, general, channels, custom };
pub fn Store(comptime options: KVOptions) type {
return struct {

View File

@ -11,6 +11,7 @@ routes: []const jetzig.views.Route,
arena: *std.heap.ArenaAllocator,
store: *MemoryStore,
cache: *MemoryStore,
channels: *MemoryStore,
job_queue: *MemoryStore,
multipart_boundary: ?[]const u8 = null,
logger: jetzig.loggers.Logger,
@ -63,6 +64,7 @@ pub fn init(allocator: std.mem.Allocator, routes_module: type) !App {
.routes = &routes_module.routes,
.store = try createStore(arena.allocator(), logger, .general),
.cache = try createStore(arena.allocator(), logger, .cache),
.channels = try createStore(arena.allocator(), logger, .channels),
.job_queue = try createStore(arena.allocator(), logger, .jobs),
.logger = logger,
.server = .{ .logger = logger },
@ -154,6 +156,7 @@ pub fn request(
.mime_map = jetzig.testing.mime_map,
.store = self.store,
.cache = self.cache,
.channels = self.channels,
.job_queue = self.job_queue,
.global = undefined,
.repo = self.repo,