diff --git a/demo/src/app/views/websockets.zig b/demo/src/app/views/websockets.zig index 414c068..d74b753 100644 --- a/demo/src/app/views/websockets.zig +++ b/demo/src/app/views/websockets.zig @@ -35,6 +35,10 @@ pub fn delete(id: []const u8, request: *jetzig.Request, data: *jetzig.Data) !jet return request.render(.ok); } +pub fn receiveMessage(message: jetzig.channels.Message) !void { + std.debug.print("payload: {s}\n", .{message.payload}); + try message.channel.publish("hello"); +} test "index" { var app = try jetzig.testing.app(std.testing.allocator, @import("routes")); diff --git a/demo/src/app/views/websockets/index.zmpl b/demo/src/app/views/websockets/index.zmpl index e00e9ec..8a1dc4d 100644 --- a/demo/src/app/views/websockets/index.zmpl +++ b/demo/src/app/views/websockets/index.zmpl @@ -10,7 +10,7 @@ }); websocket.addEventListener("open", (event) => { - websocket.send("hello jetzig websocket"); + websocket.send("websockets:hello jetzig websocket"); }); @end diff --git a/src/Routes.zig b/src/Routes.zig index f615cc3..9635f73 100644 --- a/src/Routes.zig +++ b/src/Routes.zig @@ -11,9 +11,12 @@ mailers_path: []const u8, buffer: std.ArrayList(u8), dynamic_routes: std.ArrayList(Function), static_routes: std.ArrayList(Function), +channel_routes: std.ArrayList(Function), module_paths: std.ArrayList([]const u8), data: *jetzig.data.Data, +const receive_message = "receiveMessage"; + const Routes = @This(); const Function = struct { @@ -120,6 +123,7 @@ pub fn init( .buffer = std.ArrayList(u8).init(allocator), .static_routes = std.ArrayList(Function).init(allocator), .dynamic_routes = std.ArrayList(Function).init(allocator), + .channel_routes = std.ArrayList(Function).init(allocator), .module_paths = std.ArrayList([]const u8).init(allocator), .data = data, }; @@ -130,6 +134,7 @@ pub fn deinit(self: *Routes) void { self.buffer.deinit(); self.static_routes.deinit(); self.dynamic_routes.deinit(); + self.channel_routes.deinit(); } /// Generates the complete route set for the application @@ -137,6 +142,7 @@ pub fn generateRoutes(self: *Routes) ![]const u8 { const writer = self.buffer.writer(); try writer.writeAll( + \\const std = @import("std"); \\const jetzig = @import("jetzig"); \\ \\pub const routes = [_]jetzig.Route{ @@ -148,6 +154,15 @@ pub fn generateRoutes(self: *Routes) ![]const u8 { \\ ); + try writer.writeAll( + \\pub const channel_routes = std.StaticStringMap(jetzig.channels.Route).initComptime(.{ + \\ + ); + try self.writeChannelRoutes(writer); + try writer.writeAll( + \\}); + \\ + ); try writer.writeAll( \\ \\pub const mailers = [_]jetzig.MailerDefinition{ @@ -254,6 +269,10 @@ fn writeRoutes(self: *Routes, writer: anytype) !void { for (view_routes.dynamic) |view_route| { try self.dynamic_routes.append(view_route); } + + for (view_routes.channel) |view_route| { + try self.channel_routes.append(view_route); + } } std.sort.pdq(Function, self.static_routes.items, {}, Function.lessThanFn); @@ -367,8 +386,24 @@ fn writeRoute(self: *Routes, writer: std.ArrayList(u8).Writer, route: Function) const RouteSet = struct { dynamic: []Function, static: []Function, + channel: []Function, }; +fn writeChannelRoutes(self: *Routes, writer: anytype) !void { + for (self.channel_routes.items) |route| { + const module_path = try self.relativePathFrom(.root, route.path, .posix); + defer self.allocator.free(module_path); + const view_name = try route.viewName(); + defer self.allocator.free(view_name); + + std.debug.print("{s}: {s}\n", .{ route.name, route.view_name }); + try writer.print( + \\.{{ "{s}", jetzig.channels.Route{{ .receiveMessageFn = @import("{s}").receiveMessage }} }}, + \\ + , .{ view_name, module_path }); + } +} + fn generateRoutesForView(self: *Routes, dir: std.fs.Dir, path: []const u8) !RouteSet { const stat = try dir.statFile(path); const source = try dir.readFileAllocOptions( @@ -385,30 +420,41 @@ fn generateRoutesForView(self: *Routes, dir: std.fs.Dir, path: []const u8) !Rout var static_routes = std.ArrayList(Function).init(self.allocator); var dynamic_routes = std.ArrayList(Function).init(self.allocator); + var channel_routes = std.ArrayList(Function).init(self.allocator); + var static_params: ?*jetzig.data.Value = null; for (self.ast.nodes.items(.tag), 0..) |tag, index| { switch (tag) { .fn_proto_multi, .fn_proto_one, .fn_proto_simple => |function_tag| { - var function = try self.parseFunction(function_tag, @enumFromInt(index), path, source); - if (function) |*capture| { - if (capture.args.len == 0) { + var maybe_function = try self.parseFunction( + function_tag, + @enumFromInt(index), + path, + source, + ); + if (maybe_function) |*function| { + if (std.mem.eql(u8, function.name, receive_message)) { + try channel_routes.append(function.*); + } + + if (!std.mem.eql(u8, function.name, receive_message) and function.args.len == 0) { std.debug.print( "Expected at least 1 argument for view function `{s}` in `{s}`", - .{ capture.name, path }, + .{ function.name, path }, ); return error.JetzigMissingViewArgument; } - for (capture.args, 0..) |arg, arg_index| { + for (function.args, 0..) |arg, arg_index| { if (std.mem.eql(u8, try arg.typeBasename(), "StaticRequest")) { - capture.static = jetzig.build_options.build_static; - capture.legacy = arg_index + 1 < capture.args.len; - try static_routes.append(capture.*); + function.static = jetzig.build_options.build_static; + function.legacy = arg_index + 1 < function.args.len; + try static_routes.append(function.*); } else if (std.mem.eql(u8, try arg.typeBasename(), "Request")) { - capture.static = false; - capture.legacy = arg_index + 1 < capture.args.len; - try dynamic_routes.append(capture.*); + function.static = false; + function.legacy = arg_index + 1 < function.args.len; + try dynamic_routes.append(function.*); } } } @@ -447,6 +493,7 @@ fn generateRoutesForView(self: *Routes, dir: std.fs.Dir, path: []const u8) !Rout return .{ .dynamic = dynamic_routes.items, .static = static_routes.items, + .channel = channel_routes.items, }; } @@ -618,6 +665,10 @@ fn parseFunction( var it = fn_proto.iterate(&self.ast); while (it.next()) |arg| { + // We don't need to resolve args for `receiveMessage` as it only has one form (it was + // added after the removal of the `data` arg from view functions). + if (std.mem.eql(u8, receive_message, function_name)) continue; + if (arg.name_token) |arg_token| { const arg_name = self.ast.tokenSlice(arg_token); const node = self.ast.nodes.get(@intFromEnum(arg.type_expr.?)); @@ -645,7 +696,7 @@ fn parseFunction( fn parseTypeExpr(self: *Routes, node: std.zig.Ast.Node) ![]const u8 { switch (node.tag) { // Currently all expected params are pointers, keeping this here in case that changes in future: - .identifier => {}, + .identifier => return self.ast.tokenSlice(@as(u32, @intCast(node.main_token))), .ptr_type_aligned => { var buf = std.ArrayList([]const u8).init(self.allocator); defer buf.deinit(); @@ -663,6 +714,14 @@ fn parseTypeExpr(self: *Routes, node: std.zig.Ast.Node) ![]const u8 { else => {}, } + // TODO: Output source line + std.log.err( + "Unexpected token type `{s}` in `{s}`", + .{ + @tagName(node.tag), + self.ast.tokenSlice(@as(u32, @intCast(node.main_token))), + }, + ); return error.JetzigAstParserError; } @@ -671,7 +730,7 @@ fn isActionFunctionName(name: []const u8) bool { if (std.mem.eql(u8, field.name, name)) return true; } - return false; + return std.mem.eql(u8, receive_message, name); } inline fn chompExtension(path: []const u8) []const u8 { diff --git a/src/jetzig.zig b/src/jetzig.zig index fcdf434..5267b94 100644 --- a/src/jetzig.zig +++ b/src/jetzig.zig @@ -25,6 +25,7 @@ pub const auth = @import("jetzig/auth.zig"); pub const callbacks = @import("jetzig/callbacks.zig"); pub const debug = @import("jetzig/debug.zig"); pub const TemplateContext = @import("jetzig/TemplateContext.zig"); +pub const channels = @import("jetzig/channels.zig"); pub const DateTime = jetcommon.types.DateTime; pub const Time = jetcommon.types.Time; diff --git a/src/jetzig/App.zig b/src/jetzig/App.zig index 7ec3d95..597bb2d 100644 --- a/src/jetzig/App.zig +++ b/src/jetzig/App.zig @@ -34,7 +34,11 @@ pub fn start(self: *const App, routes_module: type, options: AppOptions) !void { defer mime_map.deinit(); try mime_map.build(); - const routes = try createRoutes(self.allocator, if (@hasDecl(routes_module, "routes")) &routes_module.routes else &.{}); + const routes = try createRoutes(self.allocator, if (@hasDecl(routes_module, "routes")) + &routes_module.routes + else + &.{}); + defer { for (routes) |var_route| { var_route.deinitParams(); @@ -47,6 +51,11 @@ pub fn start(self: *const App, routes_module: type, options: AppOptions) !void { self.allocator.free(custom_route.template); }; + const channel_routes = if (@hasDecl(routes_module, "channel_routes")) + routes_module.channel_routes + else + std.StaticStringMap(jetzig.channels.Route).initComptime(.{}); + var store = try jetzig.kv.Store.GeneralStore.init(self.allocator, self.env.logger, .general); defer store.deinit(); @@ -85,6 +94,7 @@ pub fn start(self: *const App, routes_module: type, options: AppOptions) !void { self.allocator, self.env, routes, + channel_routes, self.custom_routes.items, if (@hasDecl(routes_module, "jobs")) &routes_module.jobs else &.{}, if (@hasDecl(routes_module, "jobs")) &routes_module.mailers else &.{}, diff --git a/src/jetzig/channels.zig b/src/jetzig/channels.zig new file mode 100644 index 0000000..ceeac5f --- /dev/null +++ b/src/jetzig/channels.zig @@ -0,0 +1,3 @@ +pub const Channel = @import("channels/Channel.zig"); +pub const Message = @import("channels/Message.zig"); +pub const Route = @import("channels/Route.zig"); diff --git a/src/jetzig/channels/Channel.zig b/src/jetzig/channels/Channel.zig new file mode 100644 index 0000000..4e1053e --- /dev/null +++ b/src/jetzig/channels/Channel.zig @@ -0,0 +1,11 @@ +const std = @import("std"); + +const httpz = @import("httpz"); + +const Channel = @This(); + +connection: *httpz.websocket.Conn, + +pub fn publish(self: Channel, data: []const u8) !void { + try self.connection.write(data); +} diff --git a/src/jetzig/channels/Message.zig b/src/jetzig/channels/Message.zig new file mode 100644 index 0000000..b1cb269 --- /dev/null +++ b/src/jetzig/channels/Message.zig @@ -0,0 +1,42 @@ +const std = @import("std"); + +const Channel = @import("Channel.zig"); + +const Message = @This(); + +data: []const u8, +channel_name: ?[]const u8, +payload: []const u8, +channel: Channel, + +pub fn init(channel: Channel, data: []const u8) Message { + const channel_name = parseChannelName(data); + const payload = parsePayload(data, channel_name); + return .{ .data = data, .channel = channel, .channel_name = channel_name, .payload = payload }; +} + +fn parseChannelName(data: []const u8) ?[]const u8 { + return if (std.mem.indexOfScalar(u8, data, ':')) |index| + if (index > 1) data[0..index] else null + else + null; +} + +fn parsePayload(data: []const u8, maybe_channel_name: ?[]const u8) []const u8 { + return if (maybe_channel_name) |channel_name| + data[channel_name.len + 1 ..] + else + data; +} + +test "message with channel and payload" { + const message = Message.init("foo:bar"); + try std.testing.expectEqualStrings(message.channel_name.?, "foo"); + try std.testing.expectEqualStrings(message.payload, "bar"); +} + +test "message with payload only" { + const message = Message.init("bar"); + try std.testing.expectEqual(message.channel_name, null); + try std.testing.expectEqualStrings(message.payload, "bar"); +} diff --git a/src/jetzig/channels/Route.zig b/src/jetzig/channels/Route.zig new file mode 100644 index 0000000..64269ab --- /dev/null +++ b/src/jetzig/channels/Route.zig @@ -0,0 +1,9 @@ +const jetzig = @import("../../jetzig.zig"); + +const Route = @This(); + +receiveMessageFn: *const fn (jetzig.channels.Message) anyerror!void, + +pub fn receiveMessage(route: Route, message: jetzig.channels.Message) !void { + try route.receiveMessageFn(message); +} diff --git a/src/jetzig/http/Server.zig b/src/jetzig/http/Server.zig index f783983..ff09bc9 100644 --- a/src/jetzig/http/Server.zig +++ b/src/jetzig/http/Server.zig @@ -10,6 +10,7 @@ allocator: std.mem.Allocator, logger: jetzig.loggers.Logger, env: jetzig.Environment, routes: []const *const jetzig.views.Route, +channel_routes: std.StaticStringMap(jetzig.channels.Route), custom_routes: []const jetzig.views.Route, job_definitions: []const jetzig.JobDefinition, mailer_definitions: []const jetzig.MailerDefinition, @@ -29,6 +30,7 @@ pub fn init( allocator: std.mem.Allocator, env: jetzig.Environment, routes: []const *const jetzig.views.Route, + channel_routes: std.StaticStringMap(jetzig.channels.Route), custom_routes: []const jetzig.views.Route, job_definitions: []const jetzig.JobDefinition, mailer_definitions: []const jetzig.MailerDefinition, @@ -44,6 +46,7 @@ pub fn init( .logger = env.logger, .env = env, .routes = routes, + .channel_routes = channel_routes, .custom_routes = custom_routes, .job_definitions = job_definitions, .mailer_definitions = mailer_definitions, @@ -176,14 +179,20 @@ pub fn processNextRequest( try self.logger.logRequest(&request); } +/// Attempt to match a channel name to a view with a Channel implementation. +pub fn matchChannelRoute(self: *const Server, channel_name: []const u8) ?jetzig.channels.Route { + return self.channel_routes.get(channel_name); +} + fn upgradeWebsocket(self: *const Server, httpz_request: *httpz.Request, httpz_response: *httpz.Response) !bool { return try httpz.upgradeWebsocket( jetzig.http.Websocket, httpz_request, httpz_response, - jetzig.http.Websocket.Context{ .allocator = self.allocator }, + jetzig.http.Websocket.Context{ .allocator = self.allocator, .server = self }, ); } + fn maybeMiddlewareRender(request: *jetzig.http.Request, response: *const jetzig.http.Response) !bool { if (request.middleware_rendered) |_| { // Request processing ends when a middleware renders or redirects. diff --git a/src/jetzig/http/Websocket.zig b/src/jetzig/http/Websocket.zig index fb815d1..0a88e42 100644 --- a/src/jetzig/http/Websocket.zig +++ b/src/jetzig/http/Websocket.zig @@ -1,24 +1,35 @@ const std = @import("std"); +const jetzig = @import("../../jetzig.zig"); + const httpz = @import("httpz"); pub const Context = struct { allocator: std.mem.Allocator, + server: *const jetzig.http.Server, }; const Websocket = @This(); -connection: *httpz.websocket.Conn, allocator: std.mem.Allocator, +connection: *httpz.websocket.Conn, +server: *const jetzig.http.Server, pub fn init(connection: *httpz.websocket.Conn, context: Context) !Websocket { return .{ - .connection = connection, .allocator = context.allocator, + .connection = connection, + .server = context.server, }; } pub fn clientMessage(self: *Websocket, data: []const u8) !void { - const message = try std.mem.concat(self.allocator, u8, &.{ "Hello from Jetzig websocket. Your message was: ", data }); - try self.connection.write(message); + const channel = jetzig.channels.Channel{ .connection = self.connection }; + const message = jetzig.channels.Message.init(channel, data); + + if (message.channel_name) |target_channel_name| { + if (self.server.matchChannelRoute(target_channel_name)) |route| { + try route.receiveMessage(message); + } else try self.server.logger.WARN("Unrecognized channel: {s}", .{target_channel_name}); + } else try self.server.logger.WARN("Invalid channel message format.", .{}); } diff --git a/src/tests.zig b/src/tests.zig index 2746893..07ff09c 100644 --- a/src/tests.zig +++ b/src/tests.zig @@ -11,5 +11,7 @@ test { _ = @import("jetzig/http/Path.zig"); _ = @import("jetzig/jobs/Job.zig"); _ = @import("jetzig/mail/Mail.zig"); + _ = @import("jetzig/channels/Channel.zig"); + _ = @import("jetzig/channels/Message.zig"); _ = @import("jetzig/loggers/LogQueue.zig"); }