diff --git a/demo/src/app/views/websockets.zig b/demo/src/app/views/websockets.zig index d74b753..4194833 100644 --- a/demo/src/app/views/websockets.zig +++ b/demo/src/app/views/websockets.zig @@ -36,8 +36,32 @@ pub fn delete(id: []const u8, request: *jetzig.Request, data: *jetzig.Data) !jet } pub fn receiveMessage(message: jetzig.channels.Message) !void { - std.debug.print("payload: {s}\n", .{message.payload}); - try message.channel.publish("hello"); + const data = try message.data(); + if (data.getT(.string, "toggle")) |toggle| { + if (message.channel.get("cells")) |cells| { + const is_taken = cells.getT(.boolean, toggle); + if (is_taken == null or is_taken.? == false) { + try cells.put(toggle, true); + } + } else { + var cells = try message.channel.put("cells", .object); + for (1..10) |cell| { + var buf: [1]u8 = undefined; + const key = try std.fmt.bufPrint(&buf, "{d}", .{cell}); + try cells.put(key, std.mem.eql(u8, key, toggle)); + } + } + try message.channel.sync(); + } else { + var cells = try message.channel.put("cells", .object); + for (1..10) |cell| { + var buf: [1]u8 = undefined; + const key = try std.fmt.bufPrint(&buf, "{d}", .{cell}); + try cells.put(key, false); + } + try message.channel.sync(); + } + // try message.channel.publish("hello"); } test "index" { diff --git a/demo/src/app/views/websockets/index.zmpl b/demo/src/app/views/websockets/index.zmpl index 8a1dc4d..17f920c 100644 --- a/demo/src/app/views/websockets/index.zmpl +++ b/demo/src/app/views/websockets/index.zmpl @@ -1,17 +1,78 @@ + + @if (context.request) |request| @if (request.headers.get("host")) |host| @end @end + + + + + + + + + + + + + + + + + + + + + +
+ + diff --git a/src/Routes.zig b/src/Routes.zig index 9635f73..243e64e 100644 --- a/src/Routes.zig +++ b/src/Routes.zig @@ -396,7 +396,6 @@ fn writeChannelRoutes(self: *Routes, writer: anytype) !void { const view_name = try route.viewName(); defer self.allocator.free(view_name); - std.debug.print("{s}: {s}\n", .{ route.name, route.view_name }); try writer.print( \\.{{ "{s}", jetzig.channels.Route{{ .receiveMessageFn = @import("{s}").receiveMessage }} }}, \\ diff --git a/src/jetzig/App.zig b/src/jetzig/App.zig index 597bb2d..7ba4244 100644 --- a/src/jetzig/App.zig +++ b/src/jetzig/App.zig @@ -65,6 +65,9 @@ pub fn start(self: *const App, routes_module: type, options: AppOptions) !void { var cache = try jetzig.kv.Store.CacheStore.init(self.allocator, self.env.logger, .cache); defer cache.deinit(); + var channels = try jetzig.kv.Store.CacheStore.init(self.allocator, self.env.logger, .channels); + defer channels.deinit(); + var repo = try jetzig.database.repo(self.allocator, self); defer repo.deinit(); @@ -102,6 +105,7 @@ pub fn start(self: *const App, routes_module: type, options: AppOptions) !void { &store, &job_queue, &cache, + &channels, &repo, options.global, ); diff --git a/src/jetzig/channels/Channel.zig b/src/jetzig/channels/Channel.zig index 4e1053e..6b35f0f 100644 --- a/src/jetzig/channels/Channel.zig +++ b/src/jetzig/channels/Channel.zig @@ -2,10 +2,33 @@ const std = @import("std"); const httpz = @import("httpz"); +const jetzig = @import("../../jetzig.zig"); + const Channel = @This(); -connection: *httpz.websocket.Conn, +websocket: *jetzig.http.Websocket, +state: *jetzig.data.Value, pub fn publish(self: Channel, data: []const u8) !void { try self.connection.write(data); } + +pub fn getT( + self: Channel, + comptime T: jetzig.data.Data.ValueType, + key: []const u8, +) @TypeOf(self.state.getT(T, key)) { + return self.state.getT(T, key); +} + +pub fn get(self: Channel, key: []const u8) ?*jetzig.data.Value { + return self.state.get(key); +} + +pub fn put(self: Channel, key: []const u8, value: anytype) @TypeOf(self.state.put(key, value)) { + return try self.state.put(key, value); +} + +pub fn sync(self: Channel) !void { + try self.websocket.syncState(self); +} diff --git a/src/jetzig/channels/Message.zig b/src/jetzig/channels/Message.zig index b1cb269..d729c84 100644 --- a/src/jetzig/channels/Message.zig +++ b/src/jetzig/channels/Message.zig @@ -1,32 +1,48 @@ const std = @import("std"); +const jetzig = @import("../../jetzig.zig"); + const Channel = @import("Channel.zig"); const Message = @This(); -data: []const u8, +allocator: std.mem.Allocator, +raw_data: []const u8, channel_name: ?[]const u8, payload: []const u8, channel: Channel, -pub fn init(channel: Channel, data: []const u8) Message { - const channel_name = parseChannelName(data); - const payload = parsePayload(data, channel_name); - return .{ .data = data, .channel = channel, .channel_name = channel_name, .payload = payload }; +pub fn init(allocator: std.mem.Allocator, channel: Channel, raw_data: []const u8) Message { + const channel_name = parseChannelName(raw_data); + const payload = parsePayload(raw_data, channel_name); + return .{ + .allocator = allocator, + .raw_data = raw_data, + .channel = channel, + .channel_name = channel_name, + .payload = payload, + }; } -fn parseChannelName(data: []const u8) ?[]const u8 { - return if (std.mem.indexOfScalar(u8, data, ':')) |index| - if (index > 1) data[0..index] else null +pub fn data(message: Message) !*jetzig.data.Value { + var d = try message.allocator.create(jetzig.data.Data); + d.* = jetzig.data.Data.init(message.allocator); + try d.fromJson(message.payload); + return d.value.?; +} + +fn parseChannelName(raw_data: []const u8) ?[]const u8 { + return if (std.mem.indexOfScalar(u8, raw_data, ':')) |index| + if (index > 1) raw_data[0..index] else null else null; } -fn parsePayload(data: []const u8, maybe_channel_name: ?[]const u8) []const u8 { +fn parsePayload(raw_data: []const u8, maybe_channel_name: ?[]const u8) []const u8 { return if (maybe_channel_name) |channel_name| - data[channel_name.len + 1 ..] + raw_data[channel_name.len + 1 ..] else - data; + raw_data; } test "message with channel and payload" { diff --git a/src/jetzig/config.zig b/src/jetzig/config.zig index da82dbc..7dd77fb 100644 --- a/src/jetzig/config.zig +++ b/src/jetzig/config.zig @@ -145,6 +145,19 @@ pub const cache: kv.Store.Options = .{ // }, }; +/// Channels. Identical to `store` options, but allows using different +/// backends (e.g. `.memory` for key-value store, `.file` for cache. +/// Channel state data is stored here. +pub const channels: kv.Store.Options = .{ + .backend = .memory, + // .backend = .file, + // .file_options = .{ + // .path = "/path/to/jetkv-channels.db", + // .truncate = false, // Set to `true` to clear the store on each server launch. + // .address_space_size = jetzig.jetkv.JetKV.addressSpace(4096), + // }, +}; + /// SMTP configuration for Jetzig Mail. pub const smtp: mail.SMTPConfig = .{ .port = 25, diff --git a/src/jetzig/http/Server.zig b/src/jetzig/http/Server.zig index ff09bc9..c47f45b 100644 --- a/src/jetzig/http/Server.zig +++ b/src/jetzig/http/Server.zig @@ -19,6 +19,7 @@ initialized: bool = false, store: *jetzig.kv.Store.GeneralStore, job_queue: *jetzig.kv.Store.JobQueueStore, cache: *jetzig.kv.Store.CacheStore, +channels: *jetzig.kv.Store.ChannelStore, repo: *jetzig.database.Repo, global: *anyopaque, decoded_static_route_params: []const *jetzig.data.Value = &.{}, @@ -38,6 +39,7 @@ pub fn init( store: *jetzig.kv.Store.GeneralStore, job_queue: *jetzig.kv.Store.JobQueueStore, cache: *jetzig.kv.Store.CacheStore, + channels: *jetzig.kv.Store.ChannelStore, repo: *jetzig.database.Repo, global: *anyopaque, ) Server { @@ -54,6 +56,7 @@ pub fn init( .store = store, .job_queue = job_queue, .cache = cache, + .channels = channels, .repo = repo, .global = global, }; diff --git a/src/jetzig/http/Websocket.zig b/src/jetzig/http/Websocket.zig index 0a88e42..2f17478 100644 --- a/src/jetzig/http/Websocket.zig +++ b/src/jetzig/http/Websocket.zig @@ -14,18 +14,28 @@ const Websocket = @This(); allocator: std.mem.Allocator, connection: *httpz.websocket.Conn, server: *const jetzig.http.Server, +data: *jetzig.Data, +id: [32]u8 = undefined, pub fn init(connection: *httpz.websocket.Conn, context: Context) !Websocket { - return .{ + var websocket = Websocket{ .allocator = context.allocator, .connection = connection, .server = context.server, + .data = try context.allocator.create(jetzig.Data), }; + websocket.data.* = jetzig.Data.init(context.allocator); + _ = jetzig.util.generateRandomString(&websocket.id); + + return websocket; } pub fn clientMessage(self: *Websocket, data: []const u8) !void { - const channel = jetzig.channels.Channel{ .connection = self.connection }; - const message = jetzig.channels.Message.init(channel, data); + const channel = jetzig.channels.Channel{ + .websocket = self, + .state = try self.getState(), + }; + const message = jetzig.channels.Message.init(self.allocator, channel, data); if (message.channel_name) |target_channel_name| { if (self.server.matchChannelRoute(target_channel_name)) |route| { @@ -33,3 +43,17 @@ pub fn clientMessage(self: *Websocket, data: []const u8) !void { } else try self.server.logger.WARN("Unrecognized channel: {s}", .{target_channel_name}); } else try self.server.logger.WARN("Invalid channel message format.", .{}); } + +pub fn syncState(self: *Websocket, channel: jetzig.channels.Channel) !void { + // TODO: Make this really fast. + try self.server.channels.put(&self.id, channel.state); + try self.connection.write(try self.data.toJson()); +} + +fn getState(self: *Websocket) !*jetzig.data.Value { + return try self.server.channels.get(self.data, &self.id) orelse blk: { + const root = try self.data.root(.object); + try self.server.channels.put(&self.id, root); + break :blk try self.server.channels.get(self.data, &self.id) orelse error.JetzigInvalidChannel; + }; +} diff --git a/src/jetzig/kv.zig b/src/jetzig/kv.zig index c10078e..88d8930 100644 --- a/src/jetzig/kv.zig +++ b/src/jetzig/kv.zig @@ -22,6 +22,9 @@ pub const Store = struct { /// Store ephemeral data. pub const CacheStore = @import("kv/Store.zig").Store(config.get(Store.Options, "cache")); + /// Store channel data. + pub const ChannelStore = @import("kv/Store.zig").Store(config.get(Store.Options, "channels")); + /// Background job storage. pub const JobQueueStore = @import("kv/Store.zig").Store(config.get(Store.Options, "job_queue")); diff --git a/src/jetzig/kv/Store.zig b/src/jetzig/kv/Store.zig index 3a46cbe..34721bf 100644 --- a/src/jetzig/kv/Store.zig +++ b/src/jetzig/kv/Store.zig @@ -53,7 +53,7 @@ fn jetKVOptions(options: KVOptions) jetzig.jetkv.Options { } /// Role a given store fills. Used in log outputs. -pub const Role = enum { jobs, cache, general, custom }; +pub const Role = enum { jobs, cache, general, channels, custom }; pub fn Store(comptime options: KVOptions) type { return struct { diff --git a/src/jetzig/testing/App.zig b/src/jetzig/testing/App.zig index 98c5408..2617fcc 100644 --- a/src/jetzig/testing/App.zig +++ b/src/jetzig/testing/App.zig @@ -11,6 +11,7 @@ routes: []const jetzig.views.Route, arena: *std.heap.ArenaAllocator, store: *MemoryStore, cache: *MemoryStore, +channels: *MemoryStore, job_queue: *MemoryStore, multipart_boundary: ?[]const u8 = null, logger: jetzig.loggers.Logger, @@ -63,6 +64,7 @@ pub fn init(allocator: std.mem.Allocator, routes_module: type) !App { .routes = &routes_module.routes, .store = try createStore(arena.allocator(), logger, .general), .cache = try createStore(arena.allocator(), logger, .cache), + .channels = try createStore(arena.allocator(), logger, .channels), .job_queue = try createStore(arena.allocator(), logger, .jobs), .logger = logger, .server = .{ .logger = logger }, @@ -154,6 +156,7 @@ pub fn request( .mime_map = jetzig.testing.mime_map, .store = self.store, .cache = self.cache, + .channels = self.channels, .job_queue = self.job_queue, .global = undefined, .repo = self.repo,