// Command bsky2tg mirrors the authenticated account's Bluesky posts into a
// Telegram channel. It either follows the Jetstream firehose continuously or,
// when -post is given, handles a single post and exits.
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"image/jpeg"
	"io"
	"log"
	"log/slog"
	"net/http"
	"net/url"
	"os"
	"regexp"
	"strconv"
	"strings"
	"time"

	vidio "git.zio.sh/astra/Vidio"
	"git.zio.sh/astra/bsky2tg/bsky"
	tgbotapi "github.com/OvyFlash/telegram-bot-api"
	"github.com/bluesky-social/jetstream/pkg/client"
	"github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
	"github.com/bluesky-social/jetstream/pkg/models"
)

const (
	// serverAddr = "wss://stream.zio.blue/subscribe"
	serverAddr = "wss://jetstream2.us-west.bsky.network/subscribe"

	// postLinkHTML renders "@handle" as an HTML link to a post. Its verbs are,
	// in order: repo DID, record key, handle.
	// NOTE(review): the link host is assumed to be the bsky.app web client - confirm.
	postLinkHTML = `<a href="https://bsky.app/profile/%s/post/%s">@%s</a>`

	// footerFormat is the attribution line appended to every message:
	// a butterfly (U+1F98B) followed by a link to the mirrored post.
	// NOTE(review): glyph choice (butterfly here, em dash below) is assumed - confirm.
	footerFormat = "\U0001F98B " + postLinkHTML

	// postFormat is: text, an em dash (U+2014) separator line, attribution.
	// Verbs: text, DID, rkey, handle.
	postFormat = "%s\n\u2014\n" + footerFormat

	// quotePostFormat adds a right-arrow (U+27A1 U+FE0F) line linking the
	// quoted post. Verbs: text, quoted DID, quoted rkey, quoted handle,
	// own DID, own rkey, own handle.
	quotePostFormat = "%s\n\u27a1\ufe0f " + postLinkHTML + "\n\u2014\n" + footerFormat

	// embedURL is the page used as the link preview for quoted posts.
	embedURL = "https://fxbsky.app/profile/%s/post/%s"

	// recordWithMedia is the embed type of a quote post that also carries media.
	recordWithMedia = "app.bsky.embed.recordWithMedia"

	// reconnectDelay is the pause between Jetstream reconnect attempts so a
	// persistent failure does not turn into a busy loop.
	reconnectDelay = time.Second
)

// postURLPattern captures the actor (handle or DID) and record key from a
// post URL of the form https://<host>/profile/<actor>/post/<rkey>.
var postURLPattern = regexp.MustCompile(`^https:\/\/.*?\/profile\/(.*?)\/post\/(.*?)$`)

// handler bundles the Telegram and Bluesky clients used to mirror posts.
type handler struct {
	seenSeqs map[int64]struct{} // initialised in main; not read anywhere in this file
	tg       *tgbotapi.BotAPI
	bsky     *bsky.BSky
}

var (
	post     = flag.String("post", "", "URL to a BlueSky post")
	delete   = flag.Bool("delete", false, "true/false to delete post")
	oldPosts = flag.Float64("oldposttime", 1, "Ignore posts if createdAt more than this many hours ago")
)

// main authenticates against Bluesky and Telegram using the BSKY_HANDLE,
// BSKY_PASSWORD, TG_TOKEN, TG_CHANNEL_ID and optional TG_API_ENDPOINT
// environment variables, then runs either the one-shot -post mode or the
// streaming mode.
func main() {
	flag.Parse()

	handle := os.Getenv("BSKY_HANDLE")
	password := os.Getenv("BSKY_PASSWORD")

	bskyClient := bsky.NewBSky()
	if err := bskyClient.Auth([]string{handle, password}); err != nil {
		log.Fatal(err, ". please set BSKY_HANDLE and BSKY_PASSWORD env variables")
	}

	h := &handler{
		seenSeqs: make(map[int64]struct{}),
		bsky:     bskyClient,
	}

	endpoint := "https://api.telegram.org/bot%s/%s"
	if custom := os.Getenv("TG_API_ENDPOINT"); custom != "" {
		endpoint = custom
	}

	bot, err := tgbotapi.NewBotAPIWithAPIEndpoint(os.Getenv("TG_TOKEN"), endpoint)
	if err != nil {
		log.Fatalf("failed to create Telegram bot: %v", err)
	}
	h.tg = bot

	if os.Getenv("TG_CHANNEL_ID") == "" {
		log.Fatal("TG_CHANNEL_ID is not set")
	}

	if *post != "" {
		h.runSinglePost(*post)
		return
	}

	h.listen(context.Background())
}

// runSinglePost implements the -post mode: it sends the post at postURL to the
// channel, or, when -delete is set, removes its previously mirrored messages.
// Only posts belonging to the authenticated account are accepted.
func (h *handler) runSinglePost(postURL string) {
	match := postURLPattern.FindStringSubmatch(postURL)
	if match == nil {
		log.Fatalf("Unable to parse post URL %q", postURL)
	}
	actor, rkey := match[1], match[2]

	did := actor
	if !strings.HasPrefix(actor, "did:") {
		// A failed lookup leaves did empty, which the ownership check below rejects.
		did, _ = h.bsky.ResolveHandle(actor)
	}
	if did != h.bsky.Bluesky.Cfg.DID {
		log.Fatal("Unable to send posts from other accounts")
	}

	// An empty error string means a Telegram record exists for this post.
	tgpost, tgposterr := h.bsky.Bluesky.GetTelegramData(rkey)

	if *delete {
		if tgposterr != "" {
			log.Printf("Unable to find post %s on PDS", rkey)
			return
		}
		log.Printf("Found post %s in channel %d, deleting", rkey, tgpost.ChannelID)
		h.deleteMirror(tgpost.ChannelID, tgpost.MessageID, rkey, actor)
		return
	}

	if tgposterr == "" {
		log.Printf("Post %s already sent to channel %d, exiting", rkey, tgpost.ChannelID)
		return
	}

	postJSON := h.bsky.Bluesky.FetchPost(did, rkey)
	record, err := json.Marshal(postJSON.Record)
	if err != nil {
		log.Fatalf("failed to encode post record: %v", err)
	}
	_, fetchedRKey, ok := splitATURI(postJSON.URI)
	if !ok {
		log.Fatalf("fetched post has unexpected URI %q", postJSON.URI)
	}

	// Wrap the fetched post in a synthetic "create" event so it goes through
	// the same path as posts arriving from the stream.
	err = h.ProcessPost(&models.Event{
		Did:    postJSON.Author.Did,
		TimeUS: postJSON.Record.CreatedAt.Unix(),
		Kind:   "",
		Commit: &models.Commit{
			CID:        postJSON.Cid,
			Operation:  "create",
			RKey:       fetchedRKey,
			Collection: "app.bsky.feed.post",
			Record:     record,
		},
	})
	if err != nil {
		log.Fatalf("failed to process post %s: %v", rkey, err)
	}
}

// listen subscribes to Jetstream for the authenticated account's posts and
// dispatches every event to HandleEvent. It reconnects on failure and returns
// only when ConnectAndRead returns without an error.
func (h *handler) listen(ctx context.Context) {
	slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelDebug.Level(),
	})))
	logger := slog.Default()

	config := client.DefaultClientConfig()
	config.WebsocketURL = serverAddr
	config.WantedCollections = []string{"app.bsky.feed.post"}
	config.WantedDids = []string{h.bsky.Bluesky.Cfg.DID}
	config.Compress = true

	scheduler := sequential.NewScheduler("jetstream_localdev", logger, h.HandleEvent)

	c, err := client.NewClient(config, logger, scheduler)
	if err != nil {
		log.Fatalf("failed to create client: %v", err)
	}

	cursor := time.Now().UnixMicro()
	restartCount := 0
	for {
		err := c.ConnectAndRead(ctx, &cursor)
		if err == nil {
			return
		}
		log.Printf("failed to connect: %v. retrying", err)

		if restartCount >= 3 {
			// Resuming from the persisted cursor keeps failing: give up on
			// replaying and start again from the present.
			log.Print("restart count limit hit, setting cursor to now\n")
			restartCount = 0
			cursor = time.Now().UnixMicro()
		} else {
			restartCount++
			cursor = int64(h.bsky.Bluesky.Cfg.Cursor)
		}
		time.Sleep(reconnectDelay)
	}
}

// HandleEvent is the Jetstream scheduler callback. Create and update commits
// are mirrored via ProcessPost; delete commits remove the mirrored Telegram
// messages. Events without a commit are ignored. It always returns nil;
// processing failures are logged.
func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error {
	if event.Commit == nil {
		return nil
	}

	switch event.Commit.Operation {
	case models.CommitOperationCreate, models.CommitOperationUpdate:
		h.advanceCursor(event.TimeUS)
		if err := h.ProcessPost(event); err != nil {
			log.Printf("Failed to process post %s: %s\n", event.Commit.RKey, err)
		}
	case models.CommitOperationDelete:
		h.advanceCursor(event.TimeUS)
		if r, e := h.bsky.Bluesky.GetTelegramData(event.Commit.RKey); e == "" {
			h.deleteMirror(r.ChannelID, r.MessageID, event.Commit.RKey, event.Did)
		}
	}

	return nil
}

// advanceCursor stores the position just past timeUS in the session config and
// persists it, so a reconnect does not replay the same event.
func (h *handler) advanceCursor(timeUS int64) {
	h.bsky.Bluesky.Cfg.Cursor = timeUS + 1 // +1 to not show same post
	bsky.PersistAuthSession(h.bsky.Bluesky.Cfg)
}

// deleteMirror asks Telegram to delete messageIDs from channelID and then
// deletes the Telegram record stored for rkey in repo.
func (h *handler) deleteMirror(channelID int64, messageIDs []int, rkey, repo string) {
	// The result of the delete request is intentionally ignored: removal is
	// best-effort and the stored record is dropped either way.
	h.tg.Send(tgbotapi.NewDeleteMessages(channelID, messageIDs))
	h.bsky.Bluesky.DeleteRecord([]string{rkey, repo, bsky.PostCollection})
}

// handleVideo downloads the video blob referenced by media and returns it as a
// Telegram video with dimensions, duration and, when a frame can be decoded,
// a JPEG thumbnail.
//
// The blob is staged in a temporary file named after media.URI so its
// metadata can be read, then loaded into memory. The returned media therefore
// does not depend on the file, which is removed before this function returns.
func (h *handler) handleVideo(media bsky.ParsedEmbeds) (tgbotapi.InputMedia, error) {
	blobURL := buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI)
	log.Printf("Fetching video: %s\n", blobURL)

	resp, err := http.Get(blobURL)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch video: %w", err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("failed to fetch video: unexpected status %s", resp.Status)
	}

	filename := media.URI + ".mp4"
	f, err := os.Create(filename)
	if err != nil {
		return nil, fmt.Errorf("failed to create video file: %w", err)
	}
	defer os.Remove(filename)

	// Close before inspecting the file so everything is flushed to disk.
	_, copyErr := io.Copy(f, resp.Body)
	closeErr := f.Close()
	if copyErr != nil {
		return nil, fmt.Errorf("failed to write video: %w", copyErr)
	}
	if closeErr != nil {
		return nil, fmt.Errorf("failed to write video: %w", closeErr)
	}

	metadata, err := getVideoMetadata(filename)
	if err != nil {
		return nil, fmt.Errorf("failed to read video metadata: %w", err)
	}

	data, err := os.ReadFile(filename)
	if err != nil {
		return nil, fmt.Errorf("failed to read video: %w", err)
	}

	mediaAdd := tgbotapi.NewInputMediaVideo(tgbotapi.FileBytes{Name: "video.mp4", Bytes: data})
	mediaAdd.SupportsStreaming = true
	mediaAdd.Height = metadata.Height()
	mediaAdd.Width = metadata.Width()
	mediaAdd.Duration = int(metadata.Duration())

	// The thumbnail is optional: on any failure the video is sent without one.
	frames, err := metadata.ReadFrames(0)
	switch {
	case err != nil:
		log.Printf("Failed to read video thumbnail frame: %s\n", err)
	case len(frames) == 0:
		log.Print("Failed to read video thumbnail frame: no frames returned")
	default:
		var buf bytes.Buffer
		if err := jpeg.Encode(&buf, frames[0], &jpeg.Options{Quality: 90}); err != nil {
			log.Printf("Failed to encode video thumbnail: %s\n", err)
		} else {
			mediaAdd.Thumb = tgbotapi.FileBytes{Name: "thumb.jpg", Bytes: buf.Bytes()}
		}
	}

	return &mediaAdd, nil
}

// ProcessPost mirrors the post carried by event into the Telegram channel
// named by TG_CHANNEL_ID and stores the resulting message IDs via
// CommitTelegramResponse.
//
// Posts older than -oldposttime hours, posts whose text starts with "@", and
// replies are skipped. Posts with media are sent as a media group (or, if a
// Telegram record already exists for the post, only the caption of the first
// stored message is edited); a quote post carrying an external link and posts
// without media are sent as text messages. It returns an error when the
// channel ID is invalid or a Telegram request fails.
func (h *handler) ProcessPost(event *models.Event) error {
	// NOTE(review): the second result of ParsePost is ignored because its type
	// is not visible here - confirm whether a failed parse must be handled.
	ps, _ := h.bsky.ParsePost(event.Commit.Record)
	po := ps.GetEmbeds()

	if time.Since(ps.CreatedAt).Hours() > *oldPosts || strings.HasPrefix(ps.Text, "@") || ps.IsReply() {
		return nil
	}

	cid, err := strconv.ParseInt(os.Getenv("TG_CHANNEL_ID"), 10, 64)
	if err != nil {
		return fmt.Errorf("invalid TG_CHANNEL_ID: %w", err)
	}

	// An empty error string means the post has been mirrored before.
	telegramRecord, telegramRecordErr := h.bsky.Bluesky.GetTelegramData(event.Commit.RKey)
	isEditedPost := telegramRecordErr == ""

	aliases := h.bsky.Bluesky.FetchAliases()
	facets := ps.ProcessFacets(aliases)

	ownHandle, handleErr := h.bsky.GetHandleFromDID(h.bsky.Bluesky.Cfg.DID)
	if handleErr != nil {
		ownHandle = h.bsky.Bluesky.Cfg.Handle
	}

	// Locate the quoted post, preferring the nested record reference.
	var quotedURI string
	if ps.IsQuotePost() && ps.Embed.Record != nil {
		if ps.Embed.Record.Record != nil && ps.Embed.Record.Record.URI != "" {
			quotedURI = ps.Embed.Record.Record.URI
		} else if ps.Embed.Record.URI != "" {
			quotedURI = ps.Embed.Record.URI
		}
	}
	quotedDID, quotedRKey, hasQuote := splitATURI(quotedURI)

	var captionText string
	switch {
	case hasQuote:
		// A failed lookup leaves the quoted handle empty; the link still works.
		quotedHandle, _ := h.bsky.GetHandleFromDID(quotedDID)
		captionText = fmt.Sprintf(quotePostFormat,
			facets,
			quotedDID, quotedRKey, quotedHandle,
			event.Did, event.Commit.RKey, ownHandle)
	case facets != "":
		captionText = fmt.Sprintf(postFormat, facets, h.bsky.Bluesky.Cfg.DID, event.Commit.RKey, ownHandle)
	default:
		captionText = fmt.Sprintf(footerFormat, h.bsky.Bluesky.Cfg.DID, event.Commit.RKey, ownHandle)
	}

	// A bare "record" embed is a quote without media of its own.
	hasActualMedia, hasExternalEmbed := false, false
	for _, embed := range po {
		if embed.Type != "record" {
			hasActualMedia = true
		}
		if embed.Type == "external" {
			hasExternalEmbed = true
		}
	}

	if !hasActualMedia {
		preview := tgbotapi.LinkPreviewOptions{IsDisabled: true}
		if ps.IsQuotePost() {
			var previewURI string
			if hasQuote {
				previewURI = fmt.Sprintf(embedURL, quotedDID, quotedRKey)
			}
			preview = tgbotapi.LinkPreviewOptions{
				IsDisabled:       false,
				URL:              previewURI,
				PreferSmallMedia: true,
				PreferLargeMedia: false,
				ShowAboveText:    true,
			}
		}
		return h.sendText(cid, captionText, preview, event)
	}

	isRecordWithMedia := ps.Embed.Type == recordWithMedia

	var mediaGroup []tgbotapi.InputMedia
	switch {
	case isRecordWithMedia && hasExternalEmbed && ps.Embed.Media != nil && ps.Embed.Media.External != nil:
		// Send as text message with webpage preview
		return h.sendText(cid, captionText, tgbotapi.LinkPreviewOptions{
			IsDisabled:       false,
			URL:              ps.Embed.Media.External.URI,
			PreferLargeMedia: true,
			ShowAboveText:    true,
		}, event)

	case !isRecordWithMedia && po[0].Type == "external":
		// An external embed on a plain post is sent as a video by URL.
		tenorGif := tgbotapi.NewInputMediaVideo(tgbotapi.FileURL(po[0].URI))
		tenorGif.Caption = captionText
		tenorGif.ParseMode = tgbotapi.ModeHTML
		mediaGroup = append(mediaGroup, &tenorGif)

	default:
		// Images and videos; any other embed type is skipped. The caption is
		// attached to the first item of the group only.
		for _, media := range po {
			switch media.Type {
			case "image":
				photo := tgbotapi.NewInputMediaPhoto(tgbotapi.FileURL(
					buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI)))
				if len(mediaGroup) == 0 {
					photo.Caption = captionText
					photo.ParseMode = tgbotapi.ModeHTML
				}
				mediaGroup = append(mediaGroup, &photo)
			case "video":
				video, err := h.handleVideo(media)
				if err != nil {
					log.Printf("Failed to handle video: %s\n", err)
					continue
				}
				if len(mediaGroup) == 0 {
					setCaption(video, captionText)
				}
				mediaGroup = append(mediaGroup, video)
			}
		}
	}

	if len(mediaGroup) == 0 {
		log.Print("No mediaGroup to send, see previous error")
		return nil
	}

	if isEditedPost {
		if len(telegramRecord.MessageID) == 0 {
			return fmt.Errorf("telegram record for post %s has no message IDs", event.Commit.RKey)
		}
		edit := tgbotapi.NewEditMessageCaption(telegramRecord.ChannelID, telegramRecord.MessageID[0], captionText)
		edit.ParseMode = tgbotapi.ModeHTML // the caption is HTML, as on first send
		if _, err := h.tg.Send(edit); err != nil {
			return fmt.Errorf("failed to edit caption: %w", err)
		}
		return nil
	}

	resp, err := h.tg.SendMediaGroup(tgbotapi.NewMediaGroup(cid, mediaGroup))
	if err != nil {
		return fmt.Errorf("failed to send media group: %w", err)
	}
	if len(resp) == 0 {
		return fmt.Errorf("failed to send media group: no messages returned")
	}

	messageIDs := make([]int, 0, len(resp))
	for _, msg := range resp {
		messageIDs = append(messageIDs, msg.MessageID)
	}
	h.commitTelegramMessages(event, resp[0].Chat.ID, messageIDs)
	return nil
}

// sendText sends text to channel cid as an HTML message with the given link
// preview options and, on success, stores the resulting message ID for event.
func (h *handler) sendText(cid int64, text string, preview tgbotapi.LinkPreviewOptions, event *models.Event) error {
	m := tgbotapi.NewMessage(cid, text)
	m.ParseMode = tgbotapi.ModeHTML
	m.LinkPreviewOptions = preview

	resp, err := h.tg.Send(m)
	if err != nil {
		return fmt.Errorf("failed to send message: %w", err)
	}
	h.commitTelegramMessages(event, resp.Chat.ID, []int{resp.MessageID})
	return nil
}

// commitTelegramMessages stores which Telegram messages mirror the post in
// event, keyed by the post's record key.
func (h *handler) commitTelegramMessages(event *models.Event, channelID int64, messageIDs []int) {
	uri, postCid := getLink(event)
	h.bsky.Bluesky.CommitTelegramResponse(&bsky.TelegramRecord{
		ChannelID: channelID,
		MessageID: messageIDs,
		Link: &bsky.Link{
			Cid: postCid,
			URI: uri,
		},
	}, event.Commit.RKey)
}

// setCaption sets caption, parsed as HTML, on a video or photo media item.
// Other media types are left unchanged.
func setCaption(media tgbotapi.InputMedia, caption string) {
	switch m := media.(type) {
	case *tgbotapi.InputMediaVideo:
		m.Caption = caption
		m.ParseMode = tgbotapi.ModeHTML
	case *tgbotapi.InputMediaPhoto:
		m.Caption = caption
		m.ParseMode = tgbotapi.ModeHTML
	}
}

// splitATURI extracts the repo DID and record key from a URI shaped like
// at://<did>/<collection>/<rkey>. ok is false when uri has fewer than five
// slash-separated parts.
func splitATURI(uri string) (did, rkey string, ok bool) {
	parts := strings.Split(uri, "/")
	if len(parts) < 5 {
		return "", "", false
	}
	return parts[2], parts[4], true
}

// buildBlobURL returns the com.atproto.sync.getBlob URL on server for the blob
// cid in the repo did. Both query values are escaped.
func buildBlobURL(server string, did string, cid string) string {
	return server + "/xrpc/com.atproto.sync.getBlob?did=" + url.QueryEscape(did) + "&cid=" + url.QueryEscape(cid)
}

// getLink returns the at:// URI and the CID of the record in event.
func getLink(event *models.Event) (string, string) {
	return fmt.Sprintf("at://%s/%s/%s",
		event.Did, event.Commit.Collection, event.Commit.RKey), event.Commit.CID
}

// getVideoMetadata opens the video file fname with vidio.
func getVideoMetadata(fname string) (*vidio.Video, error) {
	return vidio.NewVideo(fname)
}